I'm going to run Airflow in Docker. The Airflow website has a guide for setting up an Airflow project, but I will change a few details.
First, they provide a default docker-compose.yaml to use. You can download it with:
mkdir docker-airflow
cd docker-airflow
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.2.4/docker-compose.yaml'
Then create four folders:
mkdir -p ./dags ./logs ./plugins ./data
In the docker-compose.yaml file, uncomment the line that reads build: . (the 48th line in the 2.2.4 file), then go to the volumes section (around the 60th line) and add one more volume mapping like this: - ./data:/home/airflow/data
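After those two edits, the x-airflow-common section of docker-compose.yaml should look roughly like this (a sketch; everything else in your copy stays as it is):
x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.4}
  build: .
  # ... environment section unchanged ...
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
    - ./data:/home/airflow/data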
Then create a new Dockerfile and a requirements.txt; these are for adding more third-party libraries in case you want to try more than what I'm going to show.
touch Dockerfile requirements.txt
In the Dockerfile, add:
FROM apache/airflow:2.2.4
USER airflow
COPY requirements.txt /tmp/requirements.txt
RUN pip install --no-cache-dir --user -r /tmp/requirements.txt
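One note: if you add packages to requirements.txt later on, rebuild the image so the change is actually baked in, for example with:
docker compose build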
In the requirements.txt file, list as many third-party libraries as you want.
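For example, the file could contain something like this (pandas and requests are just placeholders; use whatever packages you actually need):
pandas
requests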
Then start writing a pipeline for Airflow: create mypipeline.py in the dags folder. The Python file name can be whatever you want.
touch dags/mypipeline.py
Put the following code in it:
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datetime import date, timedelta

# Run the DAG every minute; catchup=False skips the runs missed while it was off
dag = DAG(
    'mycustompipeline',
    catchup=False,
    description='Example for data pipelining',
    schedule_interval='*/1 * * * *',
    default_args={
        'owner': 'Nuttawut',
        'start_date': days_ago(1),
        'depends_on_past': False,
        'email': ['nuttawut@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    })

def print_message():
    print('Hello Task 1!')

def write_to_file():
    # /home/airflow/data is the container path mapped to ./data on the host
    with open('/home/airflow/data/logtime.txt', 'a') as f:
        f.write(date.today().strftime('%B %d, %Y\n'))

t1 = PythonOperator(
    task_id='print_message',
    python_callable=print_message,
    dag=dag
)

t2 = PythonOperator(
    task_id='write_file',
    python_callable=write_to_file,
    dag=dag
)

t3 = BashOperator(
    task_id='print_file',
    bash_command='cat /home/airflow/data/logtime.txt',
    dag=dag
)

# t3 runs only after both t1 and t2 have finished
[t1, t2] >> t3
To interpret my code: I created a pipeline with three tasks that is scheduled to run every minute. t1 prints a message, t2 appends today's date to /home/airflow/data/logtime.txt, and t3 prints that file's content; t3 runs only after both t1 and t2 have finished.
Before starting this pipeline, create a .env file with:
echo -e "AIRFLOW_UID=$(id -u)" > .env
Now you can initialize Airflow by running:
docker compose up airflow-init
After it has been initialized, you can start everything with:
docker compose up
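Give it a moment, then you can check that all the services are up and healthy with:
docker compose ps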
Wait a minute, then you can visit the Airflow webserver at localhost:8080. Log in with username airflow and password airflow.
In the DAGs section, find the pipeline named mycustompipeline and click it to inspect the pipeline. After that, trigger the pipeline by clicking the play button at the top right.
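If you prefer the terminal, you can also unpause and trigger the same DAG through the Airflow CLI inside one of the containers (airflow-scheduler is the service name used in the official compose file; adjust it if yours differs):
docker compose exec airflow-scheduler airflow dags unpause mycustompipeline
docker compose exec airflow-scheduler airflow dags trigger mycustompipeline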
The pipeline will start running. Now check the data folder; a new file will have been created there.
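Because ./data on the host is mapped to /home/airflow/data inside the containers, you can read that file directly from your project directory:
cat data/logtime.txt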
That file's content will be updated every minute (a new line is appended on each run). You can change this behavior by editing the dags/mypipeline.py file.
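For example, a small change (just a sketch) that logs the full timestamp instead of only the date:
from datetime import datetime

def write_to_file():
    # append date and time instead of only the date
    with open('/home/airflow/data/logtime.txt', 'a') as f:
        f.write(datetime.now().strftime('%B %d, %Y %H:%M:%S\n'))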
To stop the Airflow application, press Ctrl+C, and then remove the containers, volumes, and images with docker-compose down --volumes --rmi all.