Build a pipeline with the Airflow image


I'm going to run Airflow in Docker. The Airflow website has a guide for setting up an Airflow project, but I will change a few details.

First, Airflow provides a default docker-compose.yaml to use. Create a project folder and download the file with:

mkdir docker-airflow
cd docker-airflow
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.2.4/docker-compose.yaml'

Then create 4 folders

mkdir -p ./dags ./logs ./plugins ./data

In the docker-compose.yaml file, uncomment the line that says build: . (around line 48), then go to the volumes section (around line 60) and add one more volume mapping like this: - ./data:/home/airflow/data
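After these edits, the relevant part of the x-airflow-common section should look roughly like this; treat it as a sketch, since the surrounding lines and the default volume paths can differ slightly between Airflow versions:

x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.4}
  build: .
  # ... environment and other settings stay unchanged ...
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
    - ./data:/home/airflow/data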

Then create a new Dockerfile and a requirements.txt; these let you add extra third-party libraries in case you want to try more than what I show here.

touch Dockerfile requirements.txt

In the Dockerfile, extend the official Airflow image:
FROM apache/airflow:2.2.4
USER airflow
COPY requirements.txt /tmp/requirements.txt
RUN pip install --no-cache-dir --user -r /tmp/requirements.txt

In the requirements.txt file, add as many third-party libraries as you want, one per line.
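For example, the file could contain something like the lines below; these two packages are just placeholders, and the pipeline in this post doesn't actually need them:

pandas
requests

If you change requirements.txt later, rebuild the custom image so the change is picked up, e.g. with docker compose build or docker compose up --build.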

Then start writing a pipeline for Airflow: create a mypipeline.py file in the dags folder. The Python file name can be whatever you want.

touch dags/mypipeline.py

Then put this code in it:
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

from datetime import date, timedelta

dag = DAG(
  'mycustompipeline',
  catchup=False,  # don't backfill past schedule intervals, only run the latest one
  description='Example for data pipelining',
  schedule_interval='*/1 * * * *',  # cron expression: run every minute
  default_args={
    'owner': 'Nuttawut',
    'start_date': days_ago(1),
    'depends_on_past': False,
    'email': ['nuttawut@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
  })

# Task 1: print a greeting, which shows up in the task's log
def print_message():
  print('Hello Task 1!')

# Task 2: append today's date to a file in the mounted data folder
def write_to_file():
  with open('/home/airflow/data/logtime.txt', 'a') as f:
    f.write(date.today().strftime('%B %d, %Y\n'))

t1 = PythonOperator(
  task_id='print_message',
  python_callable=print_message,
  dag=dag
)

t2 = PythonOperator(
  task_id='write_file',
  python_callable=write_to_file,
  dag=dag
)

t3 = BashOperator(
  task_id='print_file',
  bash_command='cat /home/airflow/data/logtime.txt',
  dag=dag
)

[t1, t2] >> t3  # t1 and t2 can run in parallel; t3 runs only after both succeed

To interpret my code: I created a pipeline with 3 tasks that runs every minute. Before starting this pipeline, create a .env file with:

echo -e "AIRFLOW_UID=$(id -u)" > .env
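This stores your user ID so that files Airflow creates in the mounted folders are owned by your user rather than root. On a typical Linux machine the resulting .env contains a single line similar to this (the number depends on your account):

AIRFLOW_UID=1000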

Now you can initialize Airflow by running:

docker compose up airflow-init

After initialization finishes, you can start all the Airflow services with

docker compose up

Wait a minute or so, then visit the Airflow webserver at localhost:8080. Log in with username and password both set to airflow.

In the DAGs section, find the pipeline named mycustompipeline and click it to inspect it. Then trigger the pipeline by clicking the play button at the top right. Once it starts running, check the data folder: a new file will be created there, and its content will be updated every minute. You can change this behavior by editing the dags/mypipeline.py file.
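For example, here is a minimal sketch of one such change (keeping the same file path): import datetime and replace write_to_file so it logs a full timestamp instead of just the date.

from datetime import datetime

def write_to_file():
  with open('/home/airflow/data/logtime.txt', 'a') as f:
    f.write(datetime.now().strftime('%B %d, %Y %H:%M:%S\n'))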

To stop the Airflow application, press Ctrl+C, then remove the containers, volumes, and images with docker compose down --volumes --rmi all.
