How To Write Your First Airflow Pipeline

SeattleDataGuy - Dec 5 '19 - Dev Community

This tutorial discusses the basic steps it takes to create your first workflow using Apache Airflow.

Before getting started, you need to set up an Airflow environment to be able to follow along with each of the steps discussed in this article. If you haven't already done that, we find this article to be one of our personal favorites.


Why Airflow?

You might be asking: why use Airflow at all? Airflow automates workflows and takes over boring, repetitive manual tasks.

By definition, Apache Airflow is a platform to programmatically author, schedule, and monitor workflows, also known as DAGs (see Github).

You can use Airflow to write ETLs, machine learning pipelines, and general job scheduling (e.g., Cron), including database backups and scheduling of code/config deployment.

We discussed some of the benefits of using Airflow in our comparison of Airflow and Luigi.


Understanding Airflow Pipelines

An Airflow pipeline is essentially a set of parameters written in Python that define an Airflow Directed Acyclic Graph (DAG) object. Various tasks within a workflow form a graph, which is Directed because the tasks are ordered. To avoid getting stuck in an infinite loop, this graph does not have any cycles, hence Acyclic.

For example, if we had three tasks named Foo, Bar, and FooBar, it might be the case that Foo runs first and Bar and FooBar depend on Foo finishing.

This would create a basic graph like the one below. As you can see, there's a clear path. Now imagine this with tens or hundreds of tasks. Having a clear structure for how those tasks run, and in what order, is important.
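As a rough illustration of the Foo, Bar, and FooBar example, the sketch below uses Python's standard-library graphlib module (Python 3.9+), not Airflow itself, to show how a dependency graph turns into a run order. The execution_waves helper is a hypothetical name made up for this illustration.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Map each task to the set of tasks it depends on (its upstream tasks).
dependencies = {
    "Foo": set(),
    "Bar": {"Foo"},
    "FooBar": {"Foo"},
}


def execution_waves(deps):
    """Group tasks into waves; tasks in the same wave could run in parallel."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


waves = execution_waves(dependencies)
print(waves)  # [['Foo'], ['Bar', 'FooBar']]
```

Foo lands in the first wave on its own, and Bar and FooBar become runnable only once Foo is done. A graph with a cycle is rejected up front, which is the "Acyclic" part of the name.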

With that basic explanation out of the way, let's create your first DAG.

If you followed the link above for setting up Airflow, then you should have set the AIRFLOW_HOME variable to point to a folder. By default, this is a folder called airflow. In that folder, you will need to create a dags folder. Create your first DAG in the dags folder, as below.

airflow                  # airflow root directory.
├── dags                 # the dag root folder
│   ├── first_dag.py        # where you put your first task
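
If you would rather create the folder from Python, the following is a minimal sketch. It assumes the default layout described above (a dags folder directly under AIRFLOW_HOME, falling back to ~/airflow when the variable is not set). The ensure_dags_folder helper is a hypothetical name, and the usage lines point it at a temporary directory so nothing in your home folder is touched.

```python
import os
import tempfile
from pathlib import Path


def ensure_dags_folder(environ=os.environ):
    """Return the dags folder under AIRFLOW_HOME, creating it if missing."""
    # Assumption: fall back to ~/airflow when AIRFLOW_HOME is not set.
    home = Path(environ.get("AIRFLOW_HOME", "~/airflow")).expanduser()
    dags = home / "dags"
    dags.mkdir(parents=True, exist_ok=True)
    return dags


# Usage: try it against a throwaway directory instead of the real home folder.
with tempfile.TemporaryDirectory() as tmp:
    dags = ensure_dags_folder({"AIRFLOW_HOME": tmp})
    print(dags.name, dags.is_dir())  # dags True
```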


Set default_args

Breaking this down, we will need to set up a Python dictionary containing all the arguments applied to all the tasks in your workflow. If you take a look at the code below, there are some basic arguments, including owner (basically just the name of the DAG owner) and start_date (which determines the execution day of the first DAG task instance).

Airflow is built to handle both incremental and historical runs. Sometimes you don't want to schedule the workflow and just want to run the task for today. You may also want to start running tasks from a specific day in the past (e.g., one day ago), which is what is set up in the first code snippet below.


from datetime import timedelta

from airflow.utils.dates import days_ago

default_args = {
    'owner': 'default_user',
    'start_date': days_ago(1),
    # With this set to True, a task won't run if its previous scheduled run failed
    'depends_on_past': True,
    'email': ['info@example.com'],
    # Upon failure, this pipeline will send an email to the address set above
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=30),
}


In this case, the start_date is one day ago. Your first DAG will run yesterday's data, then any day after that.
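
To make "one day ago" concrete, here is a standard-library approximation of what days_ago(1) evaluates to: midnight UTC of the previous day. This reflects our understanding of the helper and is not Airflow's own code. The name midnight_days_ago and its now parameter are hypothetical, added so the result can be checked against a fixed date.

```python
from datetime import datetime, timedelta, timezone


def midnight_days_ago(n, now=None):
    """Return midnight (UTC) n days before `now`."""
    now = now or datetime.now(timezone.utc)
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight - timedelta(days=n)


# Usage: pretend "now" is the afternoon of Dec 5, 2019.
fixed_now = datetime(2019, 12, 5, 14, 30, tzinfo=timezone.utc)
start = midnight_days_ago(1, now=fixed_now)
print(start.isoformat())  # 2019-12-04T00:00:00+00:00
```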

Here are some other key parameters.

  • end_date determines the last execution date. Specifying an end date stops Airflow from scheduling runs beyond that date. If you don't set an end date, Airflow will keep scheduling runs indefinitely.
  • depends_on_past is a Boolean value. If you set it to true, the current task instance will rely on the status of the previous task instance. For example, suppose you set this argument to true for a daily workflow. If yesterday's task run failed, then today's task will not be triggered, because it depends on the status of the previous run.
  • email is the address (or list of addresses) that will receive the email notifications. You can configure the mail server (SMTP) settings in the Airflow configuration file.
  • email_on_failure defines whether you want to receive a notification when a failure happens.
  • email_on_retry defines whether you want to receive an email every time a retry happens.
  • retries dictates the number of times Airflow will attempt to retry a failed task.
  • retry_delay is the duration between consecutive retries.
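
The depends_on_past rule from the list above can be modeled in a few lines of plain Python. This is a simplified sketch for a daily schedule, not Airflow's scheduler logic. The can_run function and the history dictionary are hypothetical names used only for illustration.

```python
from datetime import date, timedelta


def can_run(run_date, history, depends_on_past, start_date):
    """Decide whether the task for run_date may start (daily schedule)."""
    if not depends_on_past or run_date == start_date:
        return True  # the very first run has no previous run to wait for
    previous = run_date - timedelta(days=1)
    return history.get(previous) == "success"


start = date(2019, 12, 3)
history = {date(2019, 12, 3): "success", date(2019, 12, 4): "failed"}

print(can_run(date(2019, 12, 4), history, True, start))   # True
print(can_run(date(2019, 12, 5), history, True, start))   # False
print(can_run(date(2019, 12, 5), history, False, start))  # True
```

The run for Dec 5 is blocked only when depends_on_past is true, because the Dec 4 run failed.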

In the example, Airflow will retry a failed task up to five times, waiting 30 minutes between attempts.
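
The interplay of retries and retry_delay can be sketched as a plain retry loop. This is a standard-library illustration of the idea, not how Airflow schedules retries internally. The run_with_retries helper and the flaky task are hypothetical, and the waits are recorded in a list instead of actually sleeping.

```python
import time
from datetime import timedelta


def run_with_retries(task, retries, retry_delay, sleep=time.sleep):
    """Call task(); on failure wait retry_delay and try again, up to `retries` retries."""
    attempt = 0
    while True:
        attempt += 1
        try:
            return task(), attempt
        except Exception:
            if attempt > retries:
                raise  # retries exhausted: the failure is final
            sleep(retry_delay.total_seconds())


# Usage: a task that fails twice before succeeding.
calls = {"n": 0}


def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


waits = []  # record the waits instead of really sleeping
result, attempts = run_with_retries(
    flaky, retries=5, retry_delay=timedelta(minutes=30), sleep=waits.append
)
print(result, attempts, waits)  # ok 3 [1800.0, 1800.0]
```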

A quality workflow should be able to alert/report on failures, and this is one of the key things we aim to achieve in this step. Airflow is specially designed to simplify coding in this area. This is where emailing on failures can be helpful.


Configure DAG Schedule

This step is about instantiating a DAG by giving it a name and passing in the default arguments to your DAG here: default_args=default_args.

Then set the schedule interval to specify how often the DAG should be triggered and executed. In this case, it is just once per day.

Below is one way you can set up your DAG.

dag = DAG(
    'basic_dag_1',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
)


If you want to run your schedule daily, then use the following code parameters: schedule_interval='@daily'. Or you can use cron instead, like this: schedule_interval='0 0 * * *'.
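
To see what a daily interval means in terms of dates, here is a small standard-library sketch. The preset table lists the cron expressions behind a few of Airflow's schedule presets. The execution_dates helper is a hypothetical name, and the fixed start date is only an example value.

```python
from datetime import datetime, timedelta

# Cron expressions behind some of Airflow's schedule presets.
CRON_PRESETS = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
    "@monthly": "0 0 1 * *",
}


def execution_dates(start_date, interval, count):
    """Return the first `count` execution dates for a fixed interval."""
    return [start_date + i * interval for i in range(count)]


dates = execution_dates(datetime(2019, 12, 4), timedelta(days=1), 3)
for d in dates:
    print(d.isoformat())
# 2019-12-04T00:00:00
# 2019-12-05T00:00:00
# 2019-12-06T00:00:00
```

One thing to keep in mind: as far as we know, the Airflow versions this tutorial targets trigger a run only after its interval has ended, so the run for 2019-12-04 would start shortly after midnight on 2019-12-05.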


Lay Out All the Tasks

In the example below, we have three tasks using the PythonOperator, DummyOperator, and BashOperator.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator


def my_func():
    print('Hello from my_func')


# dag is the DAG object created in the previous step
bashtask = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

dummy_task = DummyOperator(task_id='dummy_task', retries=3, dag=dag)

python_task = PythonOperator(task_id='python_task', python_callable=my_func, dag=dag)


These tasks are all pretty straightforward. What you will notice is that each has a different function and requires different parameters.

The DummyOperator is just a blank operator you can use to create a step that doesn't really do anything except signify the pipeline is done.

The PythonOperator allows you to call a Python function and even pass it parameters.
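
As a sketch of how parameters reach the function: the PythonOperator accepts op_args (a list) and op_kwargs (a dictionary) and passes them to the callable when the task executes. The snippet below mimics that call in plain Python so it runs without Airflow. The greet function and the greet_task id are hypothetical examples.

```python
def greet(name, punctuation="!"):
    return "Hello, " + name + punctuation


op_args = ["Airflow"]
op_kwargs = {"punctuation": "?"}

# Roughly what the operator does when the task executes.
message = greet(*op_args, **op_kwargs)
print(message)  # Hello, Airflow?

# Inside a DAG file this would be wired up as (not executed here):
# PythonOperator(task_id='greet_task', python_callable=greet,
#                op_args=op_args, op_kwargs=op_kwargs, dag=dag)
```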

The BashOperator allows you to call bash commands.
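
Conceptually, this is close to running the command in a subprocess and treating a non-zero exit code as a failure. The sketch below shows that idea with the standard library only. It is not the operator's implementation, run_bash is a hypothetical helper, and it assumes bash is available on your PATH.

```python
import subprocess


def run_bash(command):
    """Run a command with bash; raise CalledProcessError on a non-zero exit code."""
    completed = subprocess.run(
        ["bash", "-c", command],
        capture_output=True,
        text=True,
        check=True,
    )
    return completed.stdout.strip()


print(run_bash("echo hello"))  # hello
```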

The snippet above only defines the tasks. Nothing will run until you add all the pieces together.

Using these basic tasks, you can now start to define dependencies, the orders in which the tasks should be executed.


Define Dependencies

There are two ways to define the dependencies among the tasks.

The first way is to use set_downstream and set_upstream. In this case, you can use set_upstream to make the python_task depend on the BASH task or do the same with the downstream version.

# This means that python_task will depend on bashtask
# running successfully to run.
python_task.set_upstream(bashtask)
# Similar to above, where dummy_task will depend on bashtask
bashtask.set_downstream(dummy_task)

Using this basic set up, if the BASH task is successful, then the Python task will run. Similarly, the dummy_task is dependent on the BASH task finishing.

The second way you can define a dependency is by using the bit shift operator. For those unfamiliar with the bit shift operator, it looks like >> or <<.

For example, if you would like to reference the Python task being dependent on the BASH task, you could write it as bashtask >> python_task.

Now what if you have a few tasks dependent on one?

Then you can put them as a list. In this case, the Python task and dummy_task both depend on the BASH task and are executed in parallel following the completion of the BASH task. You can use either the set_downstream method or the bit shift operator.

bashtask.set_downstream([python_task, dummy_task])
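
If you are curious how set_upstream, set_downstream, and the bit shift operators relate to each other, the toy class below mimics them in plain Python. It is a simplified stand-in written for this illustration, not Airflow's operator class, and it only tracks task ids.

```python
class Task:
    """Toy stand-in for an Airflow operator; tracks upstream/downstream ids."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()
        self.downstream = set()

    def set_downstream(self, others):
        # Accept a single task or a list of tasks.
        for other in (others if isinstance(others, list) else [others]):
            self.downstream.add(other.task_id)
            other.upstream.add(self.task_id)

    def set_upstream(self, others):
        for other in (others if isinstance(others, list) else [others]):
            other.set_downstream(self)

    def __rshift__(self, others):  # self >> others
        self.set_downstream(others)
        return others

    def __lshift__(self, others):  # self << others
        self.set_upstream(others)
        return others


bashtask = Task("print_date")
python_task = Task("python_task")
dummy_task = Task("dummy_task")

# Same effect as bashtask.set_downstream([python_task, dummy_task])
bashtask >> [python_task, dummy_task]

print(sorted(bashtask.downstream))  # ['dummy_task', 'python_task']
print(python_task.upstream)         # {'print_date'}
```

Reading the arrows as "runs before" is a handy way to remember the direction: bashtask >> python_task means bashtask runs before python_task.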


Your First Airflow Pipeline

Now that we have gone over each of the different pieces, we can put it all together. Below is your first basic Airflow pipeline.

from datetime import timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'default_user',
    'start_date': days_ago(1),
    # With this set to True, a task won't run if its previous scheduled run failed
    'depends_on_past': True,
    'email': ['info@example.com'],
    # Upon failure, this pipeline will send an email to the address set above
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=30),
}

dag = DAG(
    'basic_dag_1',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
)


def my_func():
    print('Hello from my_func')


bashtask = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

dummy_task = DummyOperator(task_id='dummy_task', retries=3, dag=dag)

python_task = PythonOperator(task_id='python_task', python_callable=my_func, dag=dag)

# python_task and dummy_task both run after bashtask succeeds
bashtask.set_downstream([python_task, dummy_task])


Adding the DAG to the Airflow Scheduler

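Assuming you have already initialized your Airflow database, save first_dag.py in your dags folder and start the webserver and the scheduler, each in its own terminal. The scheduler picks up DAG files from the dags folder, and the webserver lets you view and manage them. Use the following commands.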

airflow webserver
airflow scheduler

The end result will appear on your Airflow dashboard as below.
