Using Apache Airflow with SingleStoreDB

Akmal Chaudhri - Oct 16 '22 - Dev Community

Abstract

Apache Airflow can be used to manage workflows for data engineering pipelines. It can easily be used with SingleStoreDB and, in this article, we'll walk through the installation process and the creation of a simple workflow that uses several tables in a SingleStoreDB database.

The code files used in this article are available on GitHub.

Introduction

SingleStoreDB is MySQL wire-compatible, so we'll adapt a simple Apache Airflow MySQL example and show it working with SingleStoreDB.

Create a SingleStoreDB Cloud account

A previous article showed the steps required to create a free SingleStoreDB Cloud account. We'll use Airflow Demo Group as our Workspace Group Name and airflow-demo as our Workspace Name. We'll make a note of our password and host name.
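
Optionally, we can sanity-check connectivity from Python before going any further. The following is a minimal sketch, assuming the pymysql package is installed (pip install pymysql); replace <host> and <password> with the values noted above.

# Minimal connectivity check for SingleStoreDB Cloud (a sketch).
# SingleStoreDB Cloud may require TLS; if so, pass pymysql's ssl arguments.
import pymysql

conn = pymysql.connect(
    host='<host>',
    user='admin',
    password='<password>',
    port=3306
)

with conn.cursor() as cur:
    cur.execute('SELECT 1')
    print(cur.fetchone())  # Expected: (1,)

conn.close()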

Create the database and tables

Our example is derived from the one described in the article Scheduling a SQL script, using Apache Airflow, with an example.

Using the SQL Editor in SingleStoreDB, we'll create a database and several tables:



CREATE DATABASE IF NOT EXISTS airflow_demo;

USE airflow_demo;

DROP TABLE IF EXISTS event;
CREATE TABLE IF NOT EXISTS event (
    event_id INT,
    spend_amt FLOAT,
    user_id INT,
    date DATE
);

DROP TABLE IF EXISTS event_stats;
CREATE TABLE IF NOT EXISTS event_stats (
    date DATE,
    user_id INT,
    total_spend_amt FLOAT,
    PRIMARY KEY (date, user_id)
);



We'll populate the event table, as follows:



INSERT INTO event VALUES
(1,  34.36, 2,   CURRENT_DATE()),
(2,  94.92, 2,   CURRENT_DATE()),
(3,  70.76, 9,   CURRENT_DATE()),
(4,  34.26, 7,   CURRENT_DATE()),
(5,  58.36, 1,   CURRENT_DATE()),
(6,  39.64, 2,   CURRENT_DATE()),
(7,  64.83, 10,  CURRENT_DATE()),
(8,  39.33, 1,   CURRENT_DATE()),
(9,  100,   -99, CURRENT_DATE()),
(9,  69.06, 10,  ADDDATE(CURRENT_DATE(), 1)),
(10, 63.79, 3,   ADDDATE(CURRENT_DATE(), 1)),
(11, 40.87, 3,   ADDDATE(CURRENT_DATE(), 1)),
(12, 32.76, 10,  ADDDATE(CURRENT_DATE(), 1)),
(13, 11.84, 3,   ADDDATE(CURRENT_DATE(), 1)),
(14, 88.07, 2,   ADDDATE(CURRENT_DATE(), 1)),
(15, 100,   -99, ADDDATE(CURRENT_DATE(), 1));



Example output:



+----------+-----------+---------+------------+
| event_id | spend_amt | user_id | date       |
+----------+-----------+---------+------------+
|        1 |     34.36 |       2 | 2022-10-15 |
|        2 |     94.92 |       2 | 2022-10-15 |
|        3 |     70.76 |       9 | 2022-10-15 |
|        4 |     34.26 |       7 | 2022-10-15 |
|        5 |     58.36 |       1 | 2022-10-15 |
|        6 |     39.64 |       2 | 2022-10-15 |
|        7 |     64.83 |      10 | 2022-10-15 |
|        8 |     39.33 |       1 | 2022-10-15 |
|        9 |       100 |     -99 | 2022-10-15 |
|        9 |     69.06 |      10 | 2022-10-16 |
|       10 |     63.79 |       3 | 2022-10-16 |
|       11 |     40.87 |       3 | 2022-10-16 |
|       12 |     32.76 |      10 | 2022-10-16 |
|       13 |     11.84 |       3 | 2022-10-16 |
|       14 |     88.07 |       2 | 2022-10-16 |
|       15 |       100 |     -99 | 2022-10-16 |
+----------+-----------+---------+------------+



Install Apache Airflow

We'll use a virtual machine running Ubuntu 22.04.2 as our test environment. An alternative would be to use a Python virtual environment (venv).

First, we'll upgrade pip:



pip install --upgrade pip



Since we'll be using the MySQL client libraries, we'll need to install the following system packages:



sudo apt install pkg-config

sudo apt install libmysqlclient-dev



and then install the Airflow MySQL provider:



pip install apache-airflow-providers-mysql



Next, we'll follow the official Airflow installation instructions, slightly modified, as follows:



export AIRFLOW_HOME=~/airflow

AIRFLOW_VERSION=2.10.2

PYTHON_VERSION="$(python3 -c 'import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}")')"

CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"



We'll also need:



pip install "apache-airflow-providers-common-sql>=1.17.0"



Now, we'll run the following:



airflow standalone



The last set of messages displayed by the above command should be similar to the following:



standalone | 
standalone | Airflow is ready
standalone | Login with username: admin  password: kRgbcnSCXEQ8SYQ6
standalone | Airflow Standalone is for development purposes only. Do not use this in production!
standalone |



Make a note of the password in your environment.

Launching a web browser and entering http://localhost:8080 will show a login screen similar to Figure 1.

Figure 1. Sign In screen.

We'll use the credentials that we received earlier. After logging in, we should see a screen similar to Figure 2.

Figure 2. DAGs.

Create SingleStoreDB connection

In Apache Airflow, selecting Admin > Connections, we'll see a large list of connections. If we scroll down, we'll find mysql_default. Using the pencil icon, we'll edit the connection. Here is what we need to enter:

  • Connection Id: mysql_default
  • Connection Type: MySQL
  • Host: <host>
  • Schema: airflow_demo
  • Login: admin
  • Password: <password>
  • Port: 3306

We'll replace the <host> and <password> with the values from our SingleStoreDB Cloud account.

We'll then use the Save button to save the connection.
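
As an aside, Airflow can also read connections from environment variables named AIRFLOW_CONN_<CONN_ID>. The following sketch builds the matching connection URI from the same values; it is not required for this walkthrough.

# Build the URI that could be exported as AIRFLOW_CONN_MYSQL_DEFAULT
# (a sketch; replace <host> and <password> as before).
from airflow.models import Connection

conn = Connection(
    conn_id='mysql_default',
    conn_type='mysql',
    host='<host>',
    schema='airflow_demo',
    login='admin',
    password='<password>',
    port=3306
)

print(conn.get_uri())
# Example output: mysql://admin:<password>@<host>:3306/airflow_demo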

Create DAG and SQL files

Let's create a new Python file called airflow_demo_dag.py with the following code, derived from the article mentioned earlier:



from airflow import DAG
from airflow.providers.mysql.operators.mysql import MySqlOperator
from datetime import date

# Default arguments applied to every task in this DAG.
default_arg = {
    'owner': 'airflow',
    'start_date': str(date.today())
}

# Schedule the DAG to run daily at midnight.
dag = DAG(
    'simple-s2-dag',
    default_args=default_arg,
    schedule_interval='0 0 * * *'
)

# Run the templated SQL file against the mysql_default connection.
s2_task = MySqlOperator(
    task_id='s2_task',
    mysql_conn_id='mysql_default',
    autocommit=True,
    sql='airflow_demo.sql',
    params={'test_user_id': -99},
    dag=dag
)

s2_task


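Note that newer releases of the MySQL provider deprecate MySqlOperator in favour of SQLExecuteQueryOperator from the common-sql provider we installed earlier. As a sketch, the task above could be replaced with the following drop-in equivalent:

# A drop-in replacement for s2_task using SQLExecuteQueryOperator
# (replaces, rather than accompanies, the MySqlOperator task above).
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

s2_task = SQLExecuteQueryOperator(
    task_id='s2_task',
    conn_id='mysql_default',
    autocommit=True,
    sql='airflow_demo.sql',
    params={'test_user_id': -99},
    dag=dag
)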

Next, we'll create a SQL file called airflow_demo.sql with the following code:



USE airflow_demo;

DROP TABLE IF EXISTS event_stats_staging;

-- Aggregate the day's spend per user, excluding the test user.
CREATE TABLE event_stats_staging AS
    SELECT date, user_id, SUM(spend_amt) AS total_spend_amt
    FROM event
    WHERE date = '{{ ds }}' AND user_id <> {{ params.test_user_id }}
    GROUP BY date, user_id;

-- Upsert the aggregates into event_stats (keyed on date, user_id).
INSERT INTO event_stats (date, user_id, total_spend_amt)
    SELECT date, user_id, total_spend_amt
    FROM event_stats_staging
    ON DUPLICATE KEY UPDATE total_spend_amt = VALUES(total_spend_amt);

DROP TABLE IF EXISTS event_stats_staging;



The DAG is scheduled to run at midnight. In the SQL file, {{ ds }} is an Airflow template variable that resolves to the run's logical date, and {{ params.test_user_id }} comes from the params dictionary in the DAG file, so rows from the test user (-99) are excluded. The SQL code aggregates the data in the event table and stores the result in the event_stats table. For example, user 2 spent 34.36 + 94.92 + 39.64 = 168.92 on the first day. The first run of the code would produce the following:



+------------+---------+-----------------+
| date       | user_id | total_spend_amt |
+------------+---------+-----------------+
| 2022-10-15 |       1 |           97.69 |
| 2022-10-15 |       2 |          168.92 |
| 2022-10-15 |       7 |           34.26 |
| 2022-10-15 |       9 |           70.76 |
| 2022-10-15 |      10 |           64.83 |
+------------+---------+-----------------+


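We can sanity-check these numbers without a database, using plain Python over the day-one sample rows:

# Recompute the day-one aggregation in pure Python as a cross-check.
from collections import defaultdict

rows = [  # (event_id, spend_amt, user_id) for the first day
    (1, 34.36, 2), (2, 94.92, 2), (3, 70.76, 9), (4, 34.26, 7),
    (5, 58.36, 1), (6, 39.64, 2), (7, 64.83, 10), (8, 39.33, 1),
    (9, 100.0, -99)
]

totals = defaultdict(float)
for _, spend_amt, user_id in rows:
    if user_id != -99:  # mirrors the test_user_id filter in the SQL
        totals[user_id] += spend_amt

for user_id in sorted(totals):
    print(user_id, round(totals[user_id], 2))
# 1 97.69
# 2 168.92
# 7 34.26
# 9 70.76
# 10 64.83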

The second run of the code would produce the following:



+------------+---------+-----------------+
| date       | user_id | total_spend_amt |
+------------+---------+-----------------+
| 2022-10-15 |       1 |           97.69 |
| 2022-10-15 |       2 |          168.92 |
| 2022-10-15 |       7 |           34.26 |
| 2022-10-15 |       9 |           70.76 |
| 2022-10-15 |      10 |           64.83 |
| 2022-10-16 |       2 |           88.07 |
| 2022-10-16 |       3 |           116.5 |
| 2022-10-16 |      10 |          101.82 |
+------------+---------+-----------------+



We'll save our two files in the directory ~/airflow/dags. We may need to create the directory if it does not exist.
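
Before opening the UI, we can optionally confirm that the DAG file parses. The following is a sketch using Airflow's DagBag, run on the machine where Airflow is installed:

# Check that every file in the dags folder parses and our DAG is found.
import os
from airflow.models import DagBag

dag_bag = DagBag(
    dag_folder=os.path.expanduser('~/airflow/dags'),
    include_examples=False
)

print(dag_bag.import_errors)            # {} means all files parsed cleanly
print('simple-s2-dag' in dag_bag.dags)  # True once our DAG is picked up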

Run the code

In Apache Airflow, we should see a new DAG called simple-s2-dag in the list of DAGs. If it is not visible, it may be necessary to rerun the command:



airflow standalone



We can toggle the button to the left of the name to enable the DAG. On the far right-hand side, there is an arrow button (▶) that allows us to run the DAG immediately. Doing so should complete successfully. Clicking on the DAG name will provide additional details, as shown in Figure 3.

Figure 3. simple-s2-dag.
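
Alternatively, a run can be started from the command line with airflow dags trigger simple-s2-dag, or through Airflow's stable REST API. The following is a sketch of the latter, assuming the basic-auth API backend is enabled (airflow.api.auth.backend.basic_auth in the [api] auth_backends setting) and using the admin credentials printed at startup:

# Trigger simple-s2-dag via the stable REST API (a sketch).
# Assumes basic-auth API access and the standalone server on localhost:8080.
import requests

resp = requests.post(
    'http://localhost:8080/api/v1/dags/simple-s2-dag/dagRuns',
    auth=('admin', '<password>'),
    json={'conf': {}}
)

print(resp.status_code, resp.json())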

From SingleStoreDB, we can check the event_stats table:



SELECT * FROM event_stats ORDER BY user_id;



The result should be similar to the following:



+------------+---------+-----------------+
| date       | user_id | total_spend_amt |
+------------+---------+-----------------+
| 2022-10-15 |       1 |           97.69 |
| 2022-10-15 |       2 |          168.92 |
| 2022-10-15 |       7 |           34.26 |
| 2022-10-15 |       9 |           70.76 |
| 2022-10-15 |      10 |           64.83 |
+------------+---------+-----------------+
5 rows in set (0.02 sec)

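The same check can also be run from Python by reusing the Airflow connection we configured earlier. A sketch using the MySQL provider's hook, run where Airflow is installed:

# Query event_stats through the mysql_default Airflow connection.
from airflow.providers.mysql.hooks.mysql import MySqlHook

hook = MySqlHook(mysql_conn_id='mysql_default')
records = hook.get_records(
    'SELECT * FROM airflow_demo.event_stats ORDER BY user_id'
)

for row in records:
    print(row)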




Summary

In this article, we have used a straightforward example to demonstrate Apache Airflow with SingleStoreDB. Since SingleStoreDB is MySQL wire-compatible, many existing tools and techniques can be used out of the box. This makes it easy for MySQL and MariaDB developers to transition to SingleStoreDB.
