👉 Lecture notes & Repository.
- Cron: a command-line utility introduced in the 1970s, used to execute a particular command at a specified date and time.
- Syntax
- Example
- Example: scheduling data pipeline with Cron (Pure scheduling approach)
- Weakness: cron only knows about time, not about dependencies between steps, so if one step fails → the whole process fails
- But cron is still useful for simple and repetitive tasks (ex: regular data downloads) or in the prototyping phase (ex: testing aspects of your data pipeline)
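As a reminder of the syntax, a cron expression has five space-separated fields: minute, hour, day of month, month, day of week. The sketch below only labels those fields; `describe_cron` is a made-up helper for illustration, and it does not validate ranges or handle presets such as @daily.

```python
# Field order of a standard five-field cron expression.
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expression: str) -> dict:
    """Label each field of a five-field cron expression (hypothetical helper)."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


# "0 8 * * *": minute 0, hour 8, any day of month, any month, any day of week,
# i.e. every day at 8:00 AM.
print(describe_cron("0 8 * * *"))
```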
- Dataswarm (Facebook, late 2000s) → Oozie (2010s) → Airbnb’s Airflow (2014, open source) → Apache Airflow (2019)
- Airflow is used by a lot of teams, so it is worth knowing.
- Pros and Cons of Airflow
- Others
- DAG = Directed Acyclic Graph
- “directed” → data flows only in one direction
- “acyclic” → no circles or cycles
- Dependencies: the previous tasks are required to finish before the next task starts
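To make "directed" and "acyclic" concrete, here is a small sketch using Python's standard-library `graphlib` (Python 3.9+), not Airflow itself. The task names extract, transform and load are illustrative. It shows that a valid run order exists for a DAG and that adding a back edge makes ordering impossible.

```python
from graphlib import CycleError, TopologicalSorter

# Map each task to the set of tasks it depends on (its upstream tasks).
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}

# A valid execution order exists because the graph has no cycles.
order = list(TopologicalSorter(dependencies).static_order())
print(order)

# Making "extract" depend on "load" creates a cycle,
# so the graph is no longer a DAG and no valid order exists.
cyclic = dict(dependencies, extract={"load"})
try:
    list(TopologicalSorter(cyclic).static_order())
except CycleError:
    print("cycle detected: not a DAG")
```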
- Basic concepts
- Orchestration in Airflow
- Writing tasks like this
- We can trigger based on time or event (eg: click)
- We can set up data quality checks like checking null values, range of values…
- We - users - only interact with DAG directory and UI
- Trigger: the user initiates a run via the UI, or the Scheduler triggers it automatically. The Scheduler continuously monitors all DAGs in the DAG Directory (once per minute by default). When tasks are ready, it pushes them to a queue for the Workers. To manage tasks and dispatch them to Workers, the Scheduler employs an Executor.
- Status of a task: scheduled → queued → running → success or failed
The Scheduler and Workers store task statuses in the Metadata Database. The Web Server then retrieves this information to display in the UI.
- In MWAA (Amazon Managed Workflows for Apache Airflow), all these components are automatically created and managed for you.
Nothing to note about.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


# Placeholder callables; the real extract/transform/load logic goes here.
def extract_data():
    pass


def transform_data():
    pass


def load_data():
    pass


# context manager
with DAG(
    dag_id="my_first_dag",
    description="ETL pipeline",
    tags=["example"],
    schedule="@daily",
    # cron expression: 0 8 * * * (every day at 8:00 AM)
    # cron preset: @daily, @hourly, @weekly, @monthly
    # timedelta object (from datetime): timedelta(days=1)
    start_date=datetime(2024, 12, 1),
    # Defaults to True. Defines whether the DAG will be executed for all the
    # data intervals between the start_date and the current date.
    catchup=False,
):
    # define tasks
    task_1 = PythonOperator(task_id="extract", python_callable=extract_data)
    task_2 = PythonOperator(task_id="transform", python_callable=transform_data)
    task_3 = PythonOperator(task_id="load", python_callable=load_data)

    # define task dependencies
    task_1 >> task_2 >> task_3  # bitshift operator
- Understand DAG scheduling: Data Interval, execution_date, Timetables, Data-aware scheduling.
- Built-in variables like ti, run_id, … in context
- Defining dependencies
- Check this code.
- In the lab, we use boto3 to interact with S3, but in real life we wouldn’t do that:
- Use Airflow-specific operators for storage and processing interactions (see S3 operator documentation).
- Airflow should orchestrate, not process. Delegate workloads to appropriate tools like databases or Spark clusters (see Spark operators).
- Remember that the DAG's logical date can be obtained from context["ds"]
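A minimal sketch of using the logical date inside a task callable. The function name `build_s3_key` and the key layout are assumptions made for illustration, not the lab's actual code. The only Airflow-specific fact relied on is that context["ds"] is a "YYYY-MM-DD" string.

```python
def build_s3_key(**context) -> str:
    """Build a date-partitioned object key from the run's logical date."""
    # context["ds"] is the logical date as a "YYYY-MM-DD" string.
    return f"work_zone/{context['ds']}/data.json"  # hypothetical key layout


# Outside Airflow, plain keyword arguments can stand in for the context.
print(build_s3_key(ds="2024-12-01"))  # work_zone/2024-12-01/data.json
```

Partitioning outputs by logical date like this also helps when a run is repeated, because the same run always writes to the same location.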
- In the previous lab, we used S3 as intermediate storage to pass data from one task to another. ← this method is used for large data.
- Small data → XCom (cross-communication). Small = metadata, dates, single value metrics, simple computations.
- context → Airflow’s built-in variables for the currently running task.
- xcom_push() is a method of the task instance (context['ti']); the same goes for xcom_pull().
def extract_data(**context):
    # context holds Airflow's built-in variables, including context["ti"]
    pass


with DAG(...):  # DAG arguments omitted
    task_1 = PythonOperator(task_id="extract", python_callable=extract_data)
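A sketch of pushing and pulling a small value through XCom from PythonOperator-style callables. The calls `xcom_push(key=..., value=...)` and `xcom_pull(task_ids=..., key=...)` are the real task-instance methods. The metric value, the key name and the `FakeTaskInstance` class are assumptions: the fake class is only an in-memory stand-in so the callables can be tried outside Airflow.

```python
def extract_data(**context):
    """Push a small value to XCom under an explicit key."""
    ratio = 0.25  # made-up metric, standing in for a real computation
    context["ti"].xcom_push(key="ratio_senior_jobs", value=ratio)


def print_data(**context):
    """Pull the value pushed by the task whose task_id is "extract"."""
    ratio = context["ti"].xcom_pull(task_ids="extract", key="ratio_senior_jobs")
    print(ratio)
    return ratio


class FakeTaskInstance:
    """Hypothetical in-memory stand-in for Airflow's task instance (local runs only)."""

    def __init__(self, task_id, store):
        self.task_id = task_id
        self.store = store  # shared dict playing the role of the metadata database

    def xcom_push(self, key, value):
        self.store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids, key="return_value"):
        return self.store.get((task_ids, key))


store = {}
extract_data(ti=FakeTaskInstance("extract", store))
print_data(ti=FakeTaskInstance("print", store))  # prints 0.25
```

In a real DAG, Airflow supplies context["ti"] itself, and both functions would be passed as python_callable to PythonOperator tasks.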
- In Airflow UI, check XCom in Admin > XComs
- We can define user variables in the Airflow UI (Admin > Variables) and use them in the code
from airflow.models import Variable

Variable.get(key='number_post')
Variable.get(key='locations', deserialize_json=True)['geo']
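Variable values are stored as strings, and `deserialize_json=True` parses the stored string as JSON before returning it. The sketch below reproduces that step with the standard `json` module instead of calling Airflow. The stored value is made up for illustration.

```python
import json

# What might be typed into Admin > Variables as the value of "locations"
# (the content is a made-up example).
stored_value = '{"geo": ["us", "ca"]}'

# Without deserialize_json, the raw string is what comes back.
print(type(stored_value).__name__)  # str

# deserialize_json=True corresponds to parsing that string as JSON,
# which is why the result can be indexed with ['geo'].
locations = json.loads(stored_value)
print(locations["geo"])  # ['us', 'ca']
```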
- Keep tasks simple and atomic: Keep your tasks simple such that each task represents one operation
- Avoid top-level code: any code that isn’t part of your DAG or operator instantiations is considered to be top-level code. This type of code will be executed at the time when the DAG is parsed by the scheduler. Any code that is part of an operator is executed when the task runs, not when the DAG is parsed.
- Use variables (user-created variables, Airflow built-in variables and macros): variables are global and should be used for overall configs. To pass data from one task to another task, use XCom!
- Use built-in variables with Jinja templating syntax (like {{ds}})
from airflow.operators.python import PythonOperator


def load_to_s3(file_name):
    # code that loads data
    print(file_name)


task_load_s3 = PythonOperator(
    task_id="load_to_s3",
    python_callable=load_to_s3,
    # op_kwargs is a templated field, so {{ds}} is rendered at run time
    op_kwargs={"file_name": "data/created{{ds}}/file.csv"},
)
- Using Task groups
from airflow.utils.task_group import TaskGroup

with DAG(...):
    start = DummyOperator(...)
    with TaskGroup('task_group') as task_group:
        task_a = PythonOperator(...)
        task_b = PythonOperator(...)
        task_a >> task_b
    end = DummyOperator(...)
    start >> task_group >> end
- Airflow is an orchestrator, not an executor:
- Heavy processing should be done with another framework (e.g. Spark).
- Large dataset → use intermediary data storage (e.g. S3) instead of XCom.
- Keep any extra code (not part of your DAG) in a separate file.
- Check this code.
- Determinism means that the same input will always produce the same output.
- Idempotence means if you execute the same operation multiple times, you will obtain the same result.
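A small sketch of why idempotence matters when a task is retried. The two load functions and the in-memory storage are assumptions made for illustration: appending duplicates data on every rerun, while overwriting a partition keyed by the logical date gives the same result however many times it runs.

```python
def append_load(storage: list, rows: list) -> None:
    """Not idempotent: each rerun appends the same rows again."""
    storage.extend(rows)


def partition_load(storage: dict, ds: str, rows: list) -> None:
    """Idempotent: each rerun overwrites the partition for that logical date."""
    storage[ds] = list(rows)


rows = [{"id": 1}, {"id": 2}]

appended = []
partitioned = {}
for _ in range(3):  # simulate the same task being retried three times
    append_load(appended, rows)
    partition_load(partitioned, "2024-12-01", rows)

print(len(appended))                   # 6: duplicates pile up
print(len(partitioned["2024-12-01"]))  # 2: same result as a single run
```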
- The catchup parameter: defines whether the DAG will be executed for all the data intervals between the start_date and the current date. It is recommended that you set it to False to have more control over the execution of the DAG. You can also use the Backfill feature to execute the DAG for a specific date range.
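A plain-Python sketch of what catchup changes for a daily schedule. The helper `daily_intervals` and the chosen dates are assumptions. It only mimics the idea that one run is created per completed data interval, and that with catchup=False only the most recent completed interval is scheduled.

```python
from datetime import datetime, timedelta


def daily_intervals(start_date: datetime, now: datetime):
    """List the completed daily data intervals between start_date and now."""
    intervals = []
    start = start_date
    while start + timedelta(days=1) <= now:
        intervals.append((start, start + timedelta(days=1)))
        start += timedelta(days=1)
    return intervals


start_date = datetime(2024, 12, 1)
now = datetime(2024, 12, 4, 9, 30)  # hypothetical moment the DAG is first enabled

# catchup=True: one run per completed interval since start_date.
all_runs = daily_intervals(start_date, now)
print(len(all_runs))  # 3

# catchup=False: only the most recent completed interval is scheduled.
latest_only = all_runs[-1:]
print(latest_only[0][0].date())  # 2024-12-03
```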
- Some notable code
import re
# source_s3_uri is assumed to be defined earlier in the lab code
S3_URI_PATTERN = r"^s3://[a-zA-Z0-9.\-_]+(/[a-zA-Z0-9.\-_]+)*$"
assert re.match(S3_URI_PATTERN, source_s3_uri)
- TaskFlow → use decorators to create the DAG and its tasks (less boilerplate, fewer names such as task_id to manage)
- DAG dependencies with TaskFlow
- XCom with TaskFlow
from datetime import datetime
from airflow.decorators import dag, task
from airflow.models import Variable


@dag(
    start_date=datetime(2024, 3, 13),
    description="First DAG",
    tags=["data_engineering_team"],
    schedule="@daily",
    catchup=False,
)
def example_xcom_taskapi():
    # define tasks here
    @task
    def extract_from_api():
        # code that connects to API
        ratio_senior_jobs = ...  # placeholder for the code that computes the value
        return ratio_senior_jobs

    @task
    def print_data(geo_ratios: dict):
        print(geo_ratios)

    # define dependencies here: passing the return value creates both the
    # dependency and the XCom hand-off
    data = extract_from_api()
    print_data(data)

    # OR, equivalently, in one line (use one form, not both):
    # print_data(extract_from_api())


example_xcom_taskapi()
Or
# define tasks here
@task
def extract_from_api():
    # code that connects to API
    ratio_senior_jobs = ...  # placeholder for the code that computes the value
    return ratio_senior_jobs


@task
def print_data(**context):
    print(context['ti'].xcom_pull(task_ids='extract_from_api'))


# define dependencies here
extract_from_api() >> print_data()
- The decorator doesn’t replace all operators! ← use both paradigms.
- Example on branching
- Check this code.
- Copy data to AWS S3 bucket and check
aws s3 sync work_zone s3://<RAW-DATA-BUCKET>/work_zone/
aws s3 ls s3://<RAW-DATA-BUCKET>/work_zone/ --recursive
Where RAW-DATA-BUCKET can be found in CloudFormation/Stacks/<numeric-one>/Output
- Airflow branching → decide which task should be triggered based on some conditions.
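A sketch of a branching callable. In Airflow, a function like this is typically given to BranchPythonOperator (or decorated with @task.branch). It returns the task_id of the branch to follow, and the other branches are skipped. The condition and the task ids full_refresh and incremental_load are made up for illustration.

```python
def choose_branch(**context) -> str:
    """Return the task_id of the task that should run next."""
    # Hypothetical condition: do a full refresh on the first day of the month.
    day_of_month = int(context["ds"].split("-")[2])
    return "full_refresh" if day_of_month == 1 else "incremental_load"


# Outside Airflow, plain keyword arguments can stand in for the context.
print(choose_branch(ds="2024-12-01"))  # full_refresh
print(choose_branch(ds="2024-12-15"))  # incremental_load
```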
- Dynamic DAG Generation — Airflow Documentation ← keep the DRY principle true for DAGs.
- Useful utility: protect_undefineds
import re

from jinja2 import StrictUndefined, Template, UndefinedError


def protect_undefineds(template_str: str, config: dict):
    """Protects undefined jinja2 expressions by treating them as raw with placeholders.

    This function is useful if you want to avoid replacing a Jinja expression. For instance:
    `{{ data_interval_end }}` is defined while running your DAG, not in the config files.

    Args:
        template_str (str): template in string format
        config (dict): values defined to be replaced in the jinja2 expression.

    Returns:
        str: template_str with undefined parameters protected as raw text (% raw %).
    """
    # Match every {{ ... }} expression in the template.
    pattern = re.compile(r"(\{\{[^\{]*\}\})")
    for j2_expression in set(pattern.findall(template_str)):
        try:
            # StrictUndefined makes rendering fail if a name is missing from config.
            Template(j2_expression, undefined=StrictUndefined).render(config)
        except UndefinedError:
            # Wrap the expression so Jinja leaves it untouched for Airflow to render later.
            template_str = template_str.replace(
                j2_expression, f"{{% raw %}}{j2_expression}{{% endraw %}}"
            )
    return template_str
- Host Airflow on AWS → more control vs more convenience
- Host the open-source version on EC2 ← full control over configuration and scaling, but you need to manage all infrastructure and integration yourself.
- Use Amazon MWAA ← hosts Apache Airflow for you + integrates with other AWS services.
- Alternatives:
- AWS Glue Workflows → create, run and monitor complex ETL workflows
- AWS Step Functions
- Which one to choose? → based on requirements + keep up to date with the tools.