
DE by DL.AI - C2 W4 - Orchestration, Monitoring, and Automating your data pipelines

Anh-Thi Dinh
DeepLearning.AI
mooc-de
Data Science
Data Engineering
MOOC
AWS
API & Services
☝ List of notes for this specialization + Lecture notes & Repository & Quizzes + Home page on Coursera. Read this note alongside the lecture notes; some points aren't mentioned here as they're already covered in the lecture notes.

Orchestration

Before Orchestration

  • Cron: a command-line utility introduced in the 1970s, used to execute a particular command at a specified date and time.
  • Syntax
    • Example
  • Example: scheduling a data pipeline with cron (the pure scheduling approach); see the sketch after this list.
  • Weakness: if one step fails → the whole process fails
  • But cron is still useful for simple and repetitive tasks (e.g., regular data downloads) or in the prototyping phase (e.g., testing aspects of your data pipeline)
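A minimal sketch of the pure scheduling approach (the paths, step names, and crontab line are hypothetical): cron only starts the script on a schedule, so if one step raises, everything after it is skipped and nothing retries automatically.

# Hypothetical pipeline script started by a crontab entry such as:
#   0 8 * * * python /opt/pipeline/run_pipeline.py
# cron knows nothing about the steps inside; one failure breaks the whole chain.
def download_data():
    print("downloading raw data...")        # e.g., an API call or a file download

def transform_data():
    print("transforming data...")           # e.g., cleaning and aggregating

def load_data():
    print("loading data into the warehouse...")

if __name__ == "__main__":
    # Steps run strictly in order; if download_data() raises, nothing else runs.
    download_data()
    transform_data()
    load_data()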

Evolution of Orchestration Tools

  • Dataswarm (Facebook, late 2000s) → Oozie (2010s) → Airbnb’s Airflow (2014, open-sourced) → Apache Airflow (top-level Apache project, 2019)
  • Airflow is used by a lot of teams → worth knowing well
  • Pros and Cons of Airflow
  • Others

Orchestration Basics

  • DAG = Directed Acyclic Graph
  • “directed” → data flows only in one direction
  • “acyclic” → no circles or cycles
  • Dependencies: the previous tasks are required to finish before the next task starts
  • Basic concepts
  • Orchestration in Airflow
    • Writing tasks like this
    • We can trigger based on time or an event (e.g., a click)
    • We can set up data quality checks, like checking for null values or value ranges (a small sketch follows this list)
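A minimal sketch of such a check (the field name and threshold are hypothetical); in Airflow this function would be wrapped in a PythonOperator or an @task-decorated task.

# Hypothetical data-quality check: fail the task if the data violates expectations.
def check_data_quality(rows):
    # no missing values in the required field
    assert all(r.get("price") is not None for r in rows), "null values found in 'price'"
    # values fall inside the expected range
    assert all(0 <= r["price"] <= 10_000 for r in rows), "'price' out of expected range"

check_data_quality([{"price": 12.5}, {"price": 999.0}])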

Airflow

Airflow - Core components

  • We (users) only interact with the DAG directory and the UI
    • The DAG directory is connected to the Web Server → any DAG change shows up in the UI
  • Trigger: a user initiates a run via the UI, or the Scheduler triggers it automatically. The Scheduler continuously monitors all DAGs in the DAG Directory (once per minute by default). When tasks are ready, it pushes them to a queue for the Workers. To manage tasks and dispatch them to Workers, the Scheduler employs an Executor.
  • Status of a task: scheduled → queued → running → success / failed (see the sketch after this list)
    • The Scheduler and Workers store task statuses in the Metadata Database. The Web Server then retrieves this information to display in the UI.
  • In MWAA (Amazon Managed Workflows for Apache Airflow), all these components are automatically created and managed for you.
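A small, purely illustrative recap (plain Python, no Airflow needed) of the lifecycle above and which component records each status in the Metadata Database.

# Conceptual sketch: task statuses and the component that records them.
TASK_LIFECYCLE = [
    ("scheduled", "Scheduler: dependencies are met, task is ready to be queued"),
    ("queued",    "Scheduler/Executor: task handed to the queue for a Worker"),
    ("running",   "Worker: task is being executed"),
    ("success",   "Worker: task finished without errors"),
    ("failed",    "Worker: task raised an error"),
]
for status, recorded_by in TASK_LIFECYCLE:
    print(f"{status:<10} <- {recorded_by}")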

Airflow — UI

Nothing special to note here.

Airflow — Creating a DAG

  • Core Concepts — Airflow Documentation
  • Operators — Airflow Documentation
  • Bitshift Composition and Relationships
  • Cron & Time Intervals — Airflow Documentation
  • Parameters of DAG
  • Understand DAG scheduling: Data Interval, execution_date, Timetables, Data-aware scheduling.
  • An introduction to the Airflow UI | Astronomer Documentation
  • Built-in variables like ti, run_id, … available in context (a small sketch follows this list)
  • Defining dependencies
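A minimal sketch (the dag_id and task are hypothetical) of reading those built-in variables from the context inside a PythonOperator callable.

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def report(**context):
    print(context["ds"])       # logical date as YYYY-MM-DD (same value as the {{ ds }} template)
    print(context["run_id"])   # identifier of the current DAG run
    ti = context["ti"]         # TaskInstance object, used for XCom push/pull
    print(ti.task_id)

with DAG(dag_id="context_demo", start_date=datetime(2024, 1, 1),
         schedule="@daily", catchup=False):
    report_task = PythonOperator(task_id="report", python_callable=report)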

Lab 1 — Airflow 101: build your first data pipeline

  • Check these codes.
  • In the lab, we use boto3 to interact with S3, but in real life we wouldn't, because:
      1. We should use Airflow-specific operators for storage and processing interactions (see the S3 operator documentation and the sketch after this list).
      2. Airflow should orchestrate, not process. Delegate workloads to appropriate tools like databases or Spark clusters (see Spark operators).
  • Remember that the DAG's logical date can be obtained from context["ds"]
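A sketch of replacing a boto3 upload with a provider operator; it assumes the apache-airflow-providers-amazon package and its LocalFilesystemToS3Operator (the bucket name and paths are placeholders).

from datetime import datetime
from airflow import DAG
from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator

with DAG(dag_id="s3_operator_demo", start_date=datetime(2024, 1, 1),
         schedule="@daily", catchup=False):
    upload = LocalFilesystemToS3Operator(
        task_id="upload_to_s3",
        filename="data/created{{ ds }}/file.csv",    # templated with the logical date
        dest_key="work_zone/created{{ ds }}/file.csv",
        dest_bucket="<RAW-DATA-BUCKET>",             # placeholder bucket name
        replace=True,
    )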

Airflow — XCom and Variables

  • In the previous lab, we used S3 as intermediate storage to pass data from one task to another. ← this method is suited for large data.
  • Small data → XCom (cross-communication). Small = metadata, dates, single value metrics, simple computations.
  • context → Airflow’s built-in variables in the currently running task.
  • xcom_push() is a method of a task instance (context['ti']), and the same goes for xcom_pull() (a small sketch follows this list)
  • In Airflow UI, check XCom in Admin > XComs
  • We can define user variables in the Airflow UI (Admin > Variables) and use them in the code
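A minimal sketch (the dag_id and values are hypothetical) of pushing and pulling a small XCom value via the task instance.

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def extract(**context):
    # push a small, metadata-sized value to XCom
    context["ti"].xcom_push(key="row_count", value=42)

def report(**context):
    # pull the value pushed by the 'extract' task
    row_count = context["ti"].xcom_pull(task_ids="extract", key="row_count")
    print(f"extracted {row_count} rows")

with DAG(dag_id="xcom_demo", start_date=datetime(2024, 1, 1),
         schedule="@daily", catchup=False):
    t1 = PythonOperator(task_id="extract", python_callable=extract)
    t2 = PythonOperator(task_id="report", python_callable=report)
    t1 >> t2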

Best practices for writing Airflow DAGs

  • Keep tasks simple and atomic: each task should represent one operation
  • Avoid top-level code: any code that isn't part of your DAG or operator instantiations is considered top-level code. It is executed every time the scheduler parses the DAG file, whereas code that is part of an operator is executed only when the task runs (see the sketch after this list).
  • Use variables (user-created variables, Airflow built-in variables and macros): variables are global and should be used for overall configs. To pass data from one task to another task, use XCom!
    • Use built-in variables with Jinja templating syntax (like {{ds}})
  • Use Task groups
  • Airflow is an orchestrator, not an executor:
    • Heavy processing should be done with another framework (e.g., Spark).
    • Large datasets → use intermediary data storage (e.g., S3) instead of XCom.
    • Keep any extra code (not part of your DAG) in a separate file.
  • Best Practices — Airflow Documentation
  • DAG writing best practices in Apache Airflow | Astronomer Documentation
  • Functional Data Engineering — a modern paradigm for batch data processing | by Maxime Beauchemin | Medium
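A sketch of the top-level code pitfall (the API URL and helper name are hypothetical): the bad version would run at parse time, the good version only when the task executes.

# Anything outside operator/callable bodies runs every time the scheduler
# parses the DAG file, not when a task runs.

# BAD: executed at parse time, on every parse of the DAG file
# jobs = requests.get("https://api.example.com/jobs").json()

# GOOD: executed only when the task actually runs
def extract_jobs():
    import requests  # keep heavy imports and work inside the callable
    return requests.get("https://api.example.com/jobs").json()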

Lab 2 — Airflow 101: Best practices

  • Check these codes.
  • Determinism means that the same input will always produce the same output. 
  • Idempotence means that executing the same operation multiple times produces the same result (a small sketch follows this list).
  • The catchup parameter: defines whether the DAG will be executed for all the data intervals between the start_date and the current date. It is recommended that you set it to False to have more control over the execution of the DAG. You can also use the Backfill feature to execute the DAG for a specific date range.
  • SQL to Amazon S3 — apache-airflow-providers-amazon Documentation
  • Connections & Hooks — Airflow Documentation
  • Some notable code snippets
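A minimal sketch of an idempotent load (the paths are hypothetical), keyed by the logical date ds so that re-running the task for the same data interval overwrites the same partition instead of appending.

import json, os

def load_partition(records, ds):
    out_dir = f"warehouse/events/ds={ds}"     # one partition per logical date
    os.makedirs(out_dir, exist_ok=True)
    path = os.path.join(out_dir, "part-0000.json")
    with open(path, "w") as f:                # overwrite, never append
        json.dump(records, f)
    return path

print(load_partition([{"id": 1}], ds="2024-12-01"))
print(load_partition([{"id": 1}], ds="2024-12-01"))  # same end state as the first run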

Airflow — TaskFlow API

  • TaskFlow — Airflow Documentation
  • Tutorial on the TaskFlow API — Airflow Documentation
  • TaskFlow → use decorators to create the DAG and its tasks (fewer names and less boilerplate)
  • DAG dependencies with TaskFlow
  • XCom with TaskFlow
    • Or
  • The decorator doesn’t replace all operators! ← use both paradigms.
  • Example of branching (a small sketch follows this list)
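A sketch of branching with the TaskFlow API (assumes Airflow 2.3+, where @task.branch is available; the task names, placeholder value, and threshold are hypothetical).

from datetime import datetime
from airflow.decorators import dag, task

@dag(start_date=datetime(2024, 1, 1), schedule="@daily", catchup=False)
def branching_demo():
    @task
    def count_rows():
        return 120  # placeholder value

    @task.branch
    def choose_path(n_rows: int):
        # return the task_id (or list of task_ids) to run next
        return "process_data" if n_rows > 0 else "skip_processing"

    @task
    def process_data():
        print("processing...")

    @task
    def skip_processing():
        print("nothing to do")

    branch = choose_path(count_rows())
    branch >> [process_data(), skip_processing()]

branching_demo()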

Lab 3 — Building an advanced data pipeline with Data Quality Checks

  • Check these codes.
  • Copy data to AWS S3 bucket and check
  • Airflow branching → decide which task should be triggered based on some conditions.
  • Dynamic DAG Generation — Airflow Documentation ← keeps the DRY principle for DAGs (see the sketch after this list).
  • Useful utility protect_undefineds (code at the end of this note)
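A minimal sketch of the pattern from that documentation page (the source names are hypothetical): generate one DAG per config entry and register each in the module's globals() so the scheduler can discover it.

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

SOURCES = ["orders", "customers", "payments"]   # hypothetical config

for source in SOURCES:
    with DAG(dag_id=f"ingest_{source}", start_date=datetime(2024, 1, 1),
             schedule="@daily", catchup=False) as dag:
        PythonOperator(
            task_id="ingest",
            python_callable=lambda s=source: print(f"ingesting {s}"),
        )
    globals()[dag.dag_id] = dag   # register the generated DAG in the module namespace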

Orchestration on AWS

  • Host Airflow on AWS → trade-off between more control and more convenience
    • Host the open-source version on EC2 ← full control over configs and scaling, but you manage all infrastructure and integration yourself.
    • Use Amazon MWAA ← hosts Apache Airflow for you + integrates with other AWS services.
  • Alternatives:
    • AWS Glue Workflows → create, run and monitor complex ETL workflows
    • AWS Step Functions
  • Which one to choose? → based on your requirements + keep your tools up to date.
👉 Next note: DE by DL.AI - C3 W1 - Storage Ingredients & Storage Systems
# Defining a DAG with the context-manager style (see "Orchestration in Airflow" above)
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# context manager
with DAG(
    dag_id="my_first_dag",
    description="ETL pipeline",
    tags=["example"],
    schedule="@daily",
    # cron expression: 0 8 * * * (every day at 8:00 AM)
    # cron preset: @daily, @hourly, @weekly, @monthly
    # timedelta object (from datetime): timedelta(days=1)
    start_date=datetime(2024, 12, 1),
    catchup=False,  # defaults to True: whether the DAG runs for all data intervals between start_date and the current date
):
    # define tasks (extract_data, transform_data, load_data are plain Python callables defined elsewhere)
    task_1 = PythonOperator(task_id="extract", python_callable=extract_data)
    task_2 = PythonOperator(task_id="transform", python_callable=transform_data)
    task_3 = PythonOperator(task_id="load", python_callable=load_data)

    # define task dependencies
    task_1 >> task_2 >> task_3  # bitshift operators
# A task callable receiving Airflow's built-in variables via **context
def extract_data(**context):
    pass

with DAG(...):
    task_1 = PythonOperator(task_id="extract", python_callable=extract_data)
# Reading user-defined Airflow Variables (created in the UI under Admin > Variables)
from airflow.models import Variable

Variable.get(key='number_post')
Variable.get(key='locations', deserialize_json=True)['geo']
# Passing a Jinja-templated value ({{ds}}) to a task via op_kwargs
def load_to_s3(file_name):
    # code that loads data
    print(file_name)

task_load_s3 = PythonOperator(
    task_id="load_to_s3",
    python_callable=load_to_s3,
    op_kwargs={'file_name': "data/created{{ds}}/file.csv"},
)
# Grouping tasks with TaskGroup
from airflow.utils.task_group import TaskGroup

with DAG(...):
    start = DummyOperator(...)
    with TaskGroup('task_group') as task_group:
        task_a = PythonOperator(...)
        task_b = PythonOperator(...)
        task_a >> task_b
    end = DummyOperator(...)
    start >> task_group >> end
# Simple sanity check: validate an S3 URI with a regular expression
import re

S3_URI_PATTERN = r"^s3://[a-zA-Z0-9.\-_]+(/[a-zA-Z0-9.\-_]+)*$"
assert re.match(S3_URI_PATTERN, source_s3_uri)  # source_s3_uri is defined elsewhere in the lab code
# TaskFlow API: XComs are passed implicitly via return values
from datetime import datetime
from airflow.decorators import dag, task
from airflow.models import Variable

@dag(
    start_date=datetime(2024, 3, 13),
    description="First DAG",
    tags=["data_engineering_team"],
    schedule="@daily",
    catchup=False
)
def example_xcom_taskapi():
    # define tasks here
    @task
    def extract_from_api():
        # code that connects to the API
        ratio_senior_jobs = {}  # placeholder for the computed value
        return ratio_senior_jobs

    @task
    def print_data(geo_ratios: dict):
        print(geo_ratios)

    # define dependencies here
    data = extract_from_api()
    print_data(data)

    # OR
    print_data(extract_from_api())

example_xcom_taskapi()
# Alternative: pull the XCom explicitly from the task context
@task
def extract_from_api():
    # code that connects to the API
    ratio_senior_jobs = {}  # placeholder for the computed value
    return ratio_senior_jobs

@task
def print_data(**context):
    print(context['ti'].xcom_pull(task_ids='extract_from_api'))

# define dependencies here
extract_from_api() >> print_data()
# Copy local data to the raw-data S3 bucket and verify the upload
aws s3 sync work_zone s3://<RAW-DATA-BUCKET>/work_zone/
aws s3 ls s3://<RAW-DATA-BUCKET>/work_zone/ --recursive
where <RAW-DATA-BUCKET> can be found in CloudFormation/Stacks/<numeric-one>/Output.
import re

from jinja2 import StrictUndefined, Template
from jinja2.exceptions import UndefinedError

def protect_undefineds(template_str: str, config: dict):
    """Protects undefined jinja2 expressions by treating them as raw with placeholders.

    This function is useful if you want to avoid replacing a Jinja expression. For instance:
    `{{ data_interval_end }}` is defined while running your DAG, not in the config files.

    Args:
        template_str (str): template in string format
        config (dict): values defined to be replaced in the jinja2 expression.

    Returns:
        str: template_str with undefined parameters protected as raw text (% raw %).
    """
    pattern = re.compile(r"(\{\{[^\{]*\}\})")
    for j2_expression in set(pattern.findall(template_str)):
        try:
            Template(j2_expression, undefined=StrictUndefined).render(config)
        except UndefinedError:
            template_str = template_str.replace(
                j2_expression, f"{{% raw %}}{j2_expression}{{% endraw %}}"
            )
    return template_str