Automate Your Data Workflows: Why Pressing the Download Button Isn’t Always Enough!

Dhanvina N - Aug 25 - Dev Community

Ever found yourself downloading datasets from Kaggle or other online sources, only to get bogged down by repetitive tasks like data cleaning and splitting? Imagine if you could automate these processes, making data management as breezy as a click of a button! That’s where Apache Airflow comes into play. Let’s dive into how you can set up an automated pipeline for handling massive datasets, complete with a NAS (Network-Attached Storage) for seamless data management. 🚀

Why Automate?

Before we dive into the nitty-gritty, let’s explore why automating data workflows can save you time and sanity:

Reduce Repetition: Automate repetitive tasks to focus on more exciting aspects of your project.
Increase Efficiency: Quickly handle updates or new data without manual intervention.
Ensure Consistency: Maintain consistent data processing standards every time.

Step-by-Step Guide to Your Data Pipeline

Let’s walk through setting up a data pipeline using Apache Airflow, focusing on automating dataset downloads, data cleaning, and splitting—all while leveraging your NAS for storage.

File structure

/your_project/
│
├── dags/
│   └── kaggle_data_pipeline.py      # Airflow DAG script for automation
│
├── scripts/
│   ├── cleaning_script.py           # Data cleaning script
│   └── split_script.py              # Data splitting script
│
├── data/
│   ├── raw/                        # Raw dataset files
│   ├── processed/                 # Cleaned and split dataset files
│   └── external/                  # External files or archives
│
├── airflow_config/
│   └── airflow.cfg                 # Airflow configuration file (if customized)
│
├── Dockerfile                       # Optional: Dockerfile for containerizing
├── docker-compose.yml               # Optional: Docker Compose configuration
├── requirements.txt                # Python dependencies for your project
└── README.md                       # Project documentation


1. Set Up Apache Airflow
First things first, let’s get Airflow up and running.

Install Apache Airflow:

# Create and activate a virtual environment
python3 -m venv airflow_env
source airflow_env/bin/activate

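# Install Airflow (pinned to the 2.5.1 release used in the Docker image later on;
# the commands below assume the Airflow 2.x CLI)
pip install "apache-airflow==2.5.1"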

Initialize the Airflow Database:

airflow db init

Create an Admin User:

airflow users create --username admin --firstname Admin --lastname User --role Admin --email admin@example.com

Start Airflow:

# Run each command in its own terminal; both stay in the foreground
airflow webserver --port 8080
airflow scheduler

Access Airflow UI: Go to http://localhost:8080 in your web browser.

2. Connect Your NAS
Mount NAS Storage: Ensure your NAS is mounted on your system. For instance:

sudo mount -t nfs <NAS_IP>:/path/to/nas /mnt/nas
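
If the NAS is not actually mounted, /mnt/nas is just an empty local directory and the pipeline would quietly fill your system disk instead. Here is a minimal sketch of a pre-flight check you could call at the start of a task. The helper name nas_is_ready is hypothetical, and /mnt/nas is simply the mount point used in this guide.

```python
import os


def nas_is_ready(mount_point: str = "/mnt/nas") -> bool:
    """Return True if mount_point is a mounted filesystem we can write to."""
    # os.path.ismount is False for a plain directory, which is what an
    # unmounted /mnt/nas usually looks like.
    if not os.path.ismount(mount_point):
        return False
    # The pipeline writes downloads here, so a read-only mount does not count.
    return os.access(mount_point, os.W_OK | os.X_OK)
```

Raising an exception when this returns False makes the Airflow task fail early, before anything is downloaded to the wrong place.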

3. Create Your Data Pipeline DAG
Create a Python file (e.g., kaggle_data_pipeline.py) in the ~/airflow/dags directory with the following code:

from airflow import DAG
from airflow.operators.python import PythonOperator  # python_operator is the deprecated Airflow 1.x path
from datetime import datetime, timedelta
import os
import subprocess

# Default arguments
default_args = {
    'owner': 'your_name',
    'depends_on_past': False,
    'start_date': datetime(2024, 8, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Define the DAG
dag = DAG(
    'kaggle_data_pipeline',
    default_args=default_args,
    description='Automated Pipeline for Kaggle Datasets',
    schedule_interval=timedelta(days=1),
    catchup=False,  # do not backfill one run for every day since start_date
)

# Define Python functions for each task
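def download_data(**kwargs):
    # Replace <DATASET_ID> with your dataset (owner/dataset-name). The Kaggle CLI
    # reads credentials from ~/.kaggle/kaggle.json or KAGGLE_USERNAME/KAGGLE_KEY.
    # check=True makes the task fail if the command exits with an error.
    subprocess.run(["kaggle", "datasets", "download", "-d", "<DATASET_ID>", "-p", "/mnt/nas/data"], check=True)

def extract_data(**kwargs):
    # Extract the downloaded archive; change dataset.zip to the actual file name.
    # -o overwrites existing files so reruns do not stop at a prompt.
    subprocess.run(["unzip", "-o", "/mnt/nas/data/dataset.zip", "-d", "/mnt/nas/data"], check=True)

def clean_data(**kwargs):
    # Example cleaning script call
    subprocess.run(["python", "/path/to/cleaning_script.py", "--input", "/mnt/nas/data"], check=True)

def split_data(**kwargs):
    # Example splitting script call
    subprocess.run(["python", "/path/to/split_script.py", "--input", "/mnt/nas/data"], check=True)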

# Define tasks
download_task = PythonOperator(
    task_id='download_data',
    python_callable=download_data,
    dag=dag,
)

extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    dag=dag,
)

clean_task = PythonOperator(
    task_id='clean_data',
    python_callable=clean_data,
    dag=dag,
)

split_task = PythonOperator(
    task_id='split_data',
    python_callable=split_data,
    dag=dag,
)

# Set task dependencies
download_task >> extract_task >> clean_task >> split_task
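
The extract step above assumes the archive is called dataset.zip, but the Kaggle CLI typically names the file after the dataset, so the hard-coded path may not match. One possible alternative is to unpack whatever .zip files are sitting in the data directory using Python's standard zipfile module. This is only a sketch, and extract_archives is a hypothetical helper name, not something from Airflow or Kaggle.

```python
import zipfile
from pathlib import Path
from typing import List


def extract_archives(data_dir: str) -> List[str]:
    """Unpack every .zip found directly in data_dir; return the archive names."""
    extracted = []
    for archive in sorted(Path(data_dir).glob("*.zip")):
        with zipfile.ZipFile(archive) as zf:
            # Existing files with the same name are overwritten.
            zf.extractall(data_dir)
        extracted.append(archive.name)
    return extracted
```

You could call extract_archives("/mnt/nas/data") from the body of extract_data instead of shelling out to unzip, and fail the task when the returned list is empty.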

Create Data Processing Scripts
scripts/cleaning_script.py

import argparse
import os

def clean_data(input_path):
    # Implement your data cleaning logic here
    print(f"Cleaning data in {input_path}...")

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', required=True, help="Path to the data directory")
    args = parser.parse_args()

    clean_data(args.input)
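
The cleaning function above is a placeholder. What "clean" means depends entirely on your dataset, but to give a feel for it, here is a minimal sketch that assumes the data is a CSV file with a header row and treats incomplete rows and exact duplicates as noise. The function name clean_csv and both rules are assumptions made for illustration.

```python
import csv


def clean_csv(src: str, dst: str) -> int:
    """Copy src to dst, dropping incomplete and duplicate rows. Returns rows kept."""
    seen = set()
    kept = 0
    with open(src, newline="", encoding="utf-8") as fin, \
         open(dst, "w", newline="", encoding="utf-8") as fout:
        reader = csv.reader(fin)
        writer = csv.writer(fout)
        header = next(reader, None)
        if header is None:
            return 0  # empty input file, nothing to clean
        writer.writerow(header)
        for row in reader:
            cells = tuple(cell.strip() for cell in row)
            # Skip rows with the wrong column count or any empty cell.
            if len(cells) != len(header) or any(cell == "" for cell in cells):
                continue
            # Skip exact duplicates of a row we already wrote.
            if cells in seen:
                continue
            seen.add(cells)
            writer.writerow(cells)
            kept += 1
    return kept
```

Writing to a separate output file keeps the raw download untouched, which fits the data/raw and data/processed folders in the project layout.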

scripts/split_script.py

import argparse
import os

def split_data(input_path):
    # Implement your data splitting logic here
    print(f"Splitting data in {input_path}...")

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', required=True, help="Path to the data directory")
    args = parser.parse_args()

    split_data(args.input)
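
The splitting logic is also left open above. A common approach is a seeded shuffle followed by slicing, so the same input always produces the same training, validation, and testing sets. The sketch below assumes an 80/10/10 split and a fixed seed of 42; both values, and the name split_rows, are illustrative choices rather than anything this pipeline requires.

```python
import random
from typing import List, Sequence, Tuple


def split_rows(rows: Sequence, train: float = 0.8, val: float = 0.1,
               seed: int = 42) -> Tuple[List, List, List]:
    """Shuffle rows reproducibly and cut them into train/val/test lists."""
    if train < 0 or val < 0 or train + val > 1:
        raise ValueError("train and val must be non-negative and sum to at most 1")
    shuffled = list(rows)
    # A dedicated Random instance keeps the global random state untouched.
    random.Random(seed).shuffle(shuffled)
    n_train = int(len(shuffled) * train)
    n_val = int(len(shuffled) * val)
    # Whatever is left after train and val becomes the test set.
    return (shuffled[:n_train],
            shuffled[n_train:n_train + n_val],
            shuffled[n_train + n_val:])
```

Because the remainder goes to the test set, no row is dropped when the fractions do not divide the row count evenly.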

Dockerize Your Setup

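Dockerfile

FROM apache/airflow:2.5.1

# Install additional packages as the airflow user; the official image
# refuses to run pip as root
USER airflow
RUN pip install --no-cache-dir kaggle

# Copy DAGs and scripts, owned by the airflow user
COPY --chown=airflow:root dags/ /opt/airflow/dags/
COPY --chown=airflow:root scripts/ /opt/airflow/scripts/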

docker-compose.yml

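# Note: each container below gets its own SQLite file, and neither is
# initialized here. For a working stack, point both services at one shared
# Postgres or MySQL database and run `airflow db init` against it once.
version: '3'
services:
  airflow-webserver:
    build: .   # build from the Dockerfile above so the kaggle CLI is installed
    ports:
      - "8080:8080"
    environment:
      # Airflow 2.3+ reads the connection from the [database] section,
      # and SQLite needs an absolute path
      - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=sqlite:////opt/airflow/airflow.db
      # SQLite only works with SequentialExecutor; LocalExecutor needs Postgres or MySQL
      - AIRFLOW__CORE__EXECUTOR=SequentialExecutor
    volumes:
      - ./dags:/opt/airflow/dags
      - ./scripts:/opt/airflow/scripts
    command: webserver

  airflow-scheduler:
    build: .
    environment:
      - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=sqlite:////opt/airflow/airflow.db
      - AIRFLOW__CORE__EXECUTOR=SequentialExecutor
    volumes:
      - ./dags:/opt/airflow/dags
      - ./scripts:/opt/airflow/scripts
      - /mnt/nas:/mnt/nas   # expose the NAS mount to the tasks
    command: scheduler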

Run Your Pipeline
Start Airflow Services:

docker-compose up

Monitor Pipeline:

Access the Airflow UI at http://localhost:8080 to trigger and monitor the pipeline

GitHub Actions Setup
GitHub Actions allows you to automate workflows directly within your GitHub repository. Here’s how you can set it up to run your Dockerized pipeline:

Create GitHub Actions Workflow
Create a .github/workflows Directory:

mkdir -p .github/workflows

Create a Workflow File:

.github/workflows/ci-cd.yml

name: CI/CD Pipeline

on:
  push:
    branches:
      - main
  pull_request:
    branches:
      - main

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v3

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v2

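      # Pushing needs registry credentials. DOCKERHUB_USERNAME and DOCKERHUB_TOKEN
      # are assumed repository secrets; create them under whatever names you prefer.
      - name: Log in to Docker Hub
        uses: docker/login-action@v2
        with:
          username: ${{ secrets.DOCKERHUB_USERNAME }}
          password: ${{ secrets.DOCKERHUB_TOKEN }}

      - name: Build and push Docker image
        uses: docker/build-push-action@v4
        with:
          context: .
          push: true
          tags: your_dockerhub_username/your_image_name:latest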

      - name: Run Docker container
        run: |
          docker run -d --name airflow_container -p 8080:8080 your_dockerhub_username/your_image_name:latest

4. What’s Happening Here?

The DAG from step 3 runs four tasks in order:

  • download_data: Automatically downloads the dataset from Kaggle to your NAS.
  • extract_data: Unzips the dataset if needed.
  • clean_data: Cleans the data using your custom script.
  • split_data: Splits the data into training, validation, and testing sets.
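
The download task only works if the Kaggle CLI can find your API credentials, which it looks for in the KAGGLE_USERNAME and KAGGLE_KEY environment variables or in a kaggle.json file (by default under ~/.kaggle). A small check along these lines can give a clearer error than a failed subprocess. This is a rough sketch: the function name is hypothetical, and the env and home parameters exist only to make the check easy to test.

```python
import os
from pathlib import Path
from typing import Mapping, Optional


def kaggle_credentials_available(env: Optional[Mapping[str, str]] = None,
                                 home: Optional[Path] = None) -> bool:
    """Return True if Kaggle API credentials appear to be configured."""
    env = os.environ if env is None else env
    home = Path.home() if home is None else home
    # Option 1: both environment variables are set and non-empty.
    if env.get("KAGGLE_USERNAME") and env.get("KAGGLE_KEY"):
        return True
    # Option 2: a kaggle.json file in the config directory.
    config_dir = Path(env.get("KAGGLE_CONFIG_DIR", home / ".kaggle"))
    return (config_dir / "kaggle.json").is_file()
```

Inside a container, remember that the credentials have to be available to the Airflow worker, not just to your own shell.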

5. Run and Monitor Your Pipeline
Access the Airflow UI to manually trigger the DAG or monitor its execution.
Check Logs for detailed information on each task.
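
Besides clicking the trigger button in the UI, Airflow 2 exposes a stable REST API where a POST to /api/v1/dags/<dag_id>/dagRuns starts a run. The sketch below only builds that request with the standard library. It assumes basic authentication is enabled for the API on your installation, and build_trigger_request is a hypothetical helper name.

```python
import base64
import json
import urllib.request
from typing import Optional


def build_trigger_request(base_url: str, dag_id: str, username: str,
                          password: str,
                          conf: Optional[dict] = None) -> urllib.request.Request:
    """Build (but do not send) a request that triggers one run of dag_id."""
    url = f"{base_url.rstrip('/')}/api/v1/dags/{dag_id}/dagRuns"
    # "conf" is passed through to the DAG run; an empty dict means no overrides.
    payload = json.dumps({"conf": conf or {}}).encode("utf-8")
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url,
        data=payload,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
    )
```

To actually trigger the run, pass the result to urllib.request.urlopen, for example with base_url set to http://localhost:8080 and dag_id set to kaggle_data_pipeline.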

6. Optimize and Scale
As your dataset grows or your needs change:

  • Adjust Task Parallelism: Configure Airflow to handle multiple tasks concurrently.
  • Enhance Data Cleaning: Update your cleaning and splitting scripts as needed.
  • Add More Tasks: Integrate additional data processing steps into your pipeline.

Conclusion

Automating your data workflows with Apache Airflow can transform how you manage and process datasets. From downloading and cleaning to splitting and scaling, Airflow’s orchestration capabilities streamline your data pipeline, allowing you to focus on what really matters—analyzing and deriving insights from your data.

So, set up your pipeline today, kick back, and let Airflow do the heavy lifting!
