DE by DL.AI - C2 (W4 - Orchestration, Monitoring, and Automating your data pipelines)

Anh-Thi Dinh

Orchestration

Before Orchestration

  • Cron: a command-line utility introduced in the 1970s. Used to execute a particular command at a specified date and time.
  • Syntax
    • Example
  • Example: scheduling data pipeline with Cron (Pure scheduling approach)
  • Weakness: If one step failed → whole process failed
  • But cron is still useful for simple and repetitive tasks (ex: regular data downloads) or in the prototyping phase (ex: testing aspects of your data pipeline)
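
As a rough illustration of the cron syntax mentioned above: a cron schedule consists of five space-separated fields (minute, hour, day of month, month, day of week), followed by the command to run. The Python sketch below only labels those five fields. The helper name `describe_cron` is made up for these notes, and it does not validate value ranges or handle presets such as @daily.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expression: str) -> dict:
    """Label the five schedule fields of a cron expression (no validation of values)."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}: {expression!r}")
    return dict(zip(CRON_FIELDS, parts))


# "0 8 * * *" means: minute 0, hour 8, every day of month, every month, every weekday
print(describe_cron("0 8 * * *"))
```

Reading the expression this way shows why cron is a pure scheduler: the fields say when to run a command, and nothing about what must have finished beforehand.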

Evolution of Orchestration Tools

  • Dataswarm (Facebook, late 2000s) → Oozie (2010s) → Airbnb’s Airflow (2014, open source) → Apache Airflow (2019)
  • Airflow is used by many teams → worth knowing
  • Pros and Cons of Airflow
  • Others

Orchestration Basics

  • DAG = Directed Acyclic Graph
  • “directed” → data flows only in one direction
  • “acyclic” → no circles or cycles
  • Dependencies: the previous tasks are required to finish before the next task starts
  • Basic concepts
  • Orchestration in Airflow
    • Writing tasks like this
    • We can trigger based on time or event (eg: click)
    • We can set up data quality checks like checking null values, range of values…
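
To make the DAG idea above concrete, here is a minimal sketch using Python's standard-library `graphlib` (Python 3.9+), not Airflow itself. The task names are illustrative. It derives a valid execution order from the dependencies and shows that a cycle makes the graph impossible to schedule.

```python
from graphlib import CycleError, TopologicalSorter

# Map each task to the set of tasks it depends on (its upstream tasks).
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}


def execution_order(deps: dict) -> list:
    """Return one order in which every task runs after its dependencies."""
    return list(TopologicalSorter(deps).static_order())


def has_cycle(deps: dict) -> bool:
    """A graph with a cycle has no valid order, so it is not a DAG."""
    try:
        execution_order(deps)
    except CycleError:
        return True
    return False


print(execution_order(dependencies))          # ['extract', 'transform', 'load']
print(has_cycle({"a": {"b"}, "b": {"a"}}))    # True
```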

Airflow

Airflow - Core components

  • We (the users) only interact with the DAG Directory and the UI
    • The DAG Directory is connected to the Web Server → any DAG change shows up in the UI
  • Trigger: the user initiates a run via the UI, or the Scheduler triggers it automatically. The Scheduler continuously monitors all DAGs in the DAG Directory (once per minute by default). When tasks are ready, it pushes them to a queue for the Workers. To manage tasks and dispatch them to Workers, the Scheduler employs an Executor.
  • Status of a task: scheduled → queued → running → success / failed
    • The Scheduler and Workers store task statuses in the Metadata Database. The Web Server then retrieves this information to display in the UI.
  • In MWAA (Amazon Managed Workflows for Apache Airflow), all these components are automatically created and managed for you.
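
The task statuses listed above can be read as a small state machine. The sketch below is a simplified model written for these notes, not Airflow's own implementation. Real Airflow has additional states (for example for retries and skipped tasks), and the `can_transition` helper is hypothetical.

```python
from enum import Enum


class TaskState(Enum):
    SCHEDULED = "scheduled"
    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


# Simplified happy path plus failure; terminal states allow no further moves.
ALLOWED = {
    TaskState.SCHEDULED: {TaskState.QUEUED},
    TaskState.QUEUED: {TaskState.RUNNING},
    TaskState.RUNNING: {TaskState.SUCCESS, TaskState.FAILED},
    TaskState.SUCCESS: set(),
    TaskState.FAILED: set(),
}


def can_transition(current: TaskState, new: TaskState) -> bool:
    """Return True if the simplified model allows moving from current to new."""
    return new in ALLOWED[current]


print(can_transition(TaskState.QUEUED, TaskState.RUNNING))   # True
print(can_transition(TaskState.QUEUED, TaskState.SUCCESS))   # False
```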

Airflow — UI

Nothing to note.

Airflow — Creating a DAG

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


# placeholder callables: replace with the real extract/transform/load logic
def extract_data():
    pass


def transform_data():
    pass


def load_data():
    pass


# context manager
with DAG(
    dag_id="my_first_dag",
    description="ETL pipeline",
    tags=["example"],
    schedule="@daily",
    # cron expression: "0 8 * * *" (every day at 8:00 AM)
    # cron preset: @daily, @hourly, @weekly, @monthly
    # timedelta object (from datetime): timedelta(days=1)
    start_date=datetime(2024, 12, 1),
    catchup=False,  # defaults to True. Defines whether the DAG will be executed for all the data intervals between the start_date and the current date.
):
    # define tasks
    task_1 = PythonOperator(task_id="extract", python_callable=extract_data)
    task_2 = PythonOperator(task_id="transform", python_callable=transform_data)
    task_3 = PythonOperator(task_id="load", python_callable=load_data)

    # define task dependencies
    task_1 >> task_2 >> task_3  # bitshift operator
  • Defining dependencies
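
To see why a chain written with the bitshift operator reads as a sequence of dependencies, here is a toy sketch of the operator-overloading idea. The `Task` class is a stand-in written for these notes, not Airflow's operator class. In Airflow, `a >> b` sets `b` downstream of `a`.

```python
class Task:
    """Toy task that records its downstream tasks."""

    def __init__(self, task_id: str):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other: "Task") -> "Task":
        # a >> b: register b as downstream of a
        self.downstream.append(other)
        # return the right-hand task so that a >> b >> c keeps chaining
        return other


extract, transform, load = Task("extract"), Task("transform"), Task("load")
extract >> transform >> load

print([t.task_id for t in extract.downstream])    # ['transform']
print([t.task_id for t in transform.downstream])  # ['load']
```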

Lab 1 — Airflow 101: build your first data pipeline

  • In the lab, we use boto3 to interact with S3, but in real life we wouldn’t do that. Instead:
      1. Use Airflow-specific operators for storage and processing interactions (see S3 operator documentation).
      2. Airflow should orchestrate, not process. Delegate workloads to appropriate tools like databases or Spark clusters (see Spark operators).
  • Remember that the DAG's logical date can be obtained from context["ds"]
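
The logical date ties in with the catchup parameter: for a scheduled DAG, each run covers one data interval, and ds is the start of that interval formatted as YYYY-MM-DD. The sketch below is plain Python, not Airflow code. It lists the completed daily intervals between a start date and "now", which is roughly the set of runs that catchup=True would schedule. With catchup=False, only the most recent one would run. The function name and the dates are illustrative.

```python
from datetime import datetime, timedelta


def daily_intervals(start_date: datetime, now: datetime) -> list:
    """Return (ds, interval_start, interval_end) for each completed daily interval."""
    intervals = []
    interval_start = start_date
    # A run is only due once its whole interval has elapsed.
    while interval_start + timedelta(days=1) <= now:
        interval_end = interval_start + timedelta(days=1)
        ds = interval_start.strftime("%Y-%m-%d")
        intervals.append((ds, interval_start, interval_end))
        interval_start = interval_end
    return intervals


runs = daily_intervals(datetime(2024, 12, 1), datetime(2024, 12, 4, 9, 30))
for ds, start, end in runs:
    print(ds, start, "->", end)
```

With these dates, the sketch yields three runs, for 2024-12-01, 2024-12-02, and 2024-12-03. The interval starting on 2024-12-04 is not listed because it has not finished yet.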

Airflow — XCom and Variables

  • In the previous lab, we used S3 as intermediate storage to pass data from one task to another. ← this method is used for large data.
  • Small data → XCom (cross-communication). Small = metadata, dates, single-value metrics, simple computations.
  • context → Airflow’s built-in variables in the currently running task.
  • xcom_push() is a method of the task instance (context['ti']); the same goes for xcom_pull()
    • def extract_data(**context):
          # context["ti"] is the task instance: call its xcom_push() / xcom_pull() here
          pass

      with DAG(...):
          task_1 = PythonOperator(task_id="extract", python_callable=extract_data)
  • In Airflow UI, check XCom in Admin > XComs
  • We can define user variables in the Airflow UI (Admin > Variables) and use them in the code
    • from airflow.models import Variable

      Variable.get(key='number_post')
      Variable.get(key='locations', deserialize_json=True)['geo']
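
To illustrate the push/pull mechanics without a running Airflow instance, here is a dict-backed stand-in for the task instance. `FakeTaskInstance`, the key `number_of_posts`, and the stored JSON value are all made up for this sketch. Real XComs are stored in the metadata database, and the real methods accept more parameters. The last lines mimic what deserialize_json=True does for a variable: it parses the stored JSON string.

```python
import json


class FakeTaskInstance:
    """Dict-backed stand-in for the task instance found in context['ti']."""

    _store = {}  # shared across instances: (task_id, key) -> value

    def __init__(self, task_id: str):
        self.task_id = task_id

    def xcom_push(self, key: str, value) -> None:
        FakeTaskInstance._store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids: str, key: str = "return_value"):
        return FakeTaskInstance._store.get((task_ids, key))


def extract_data(**context):
    # push a small value for downstream tasks
    context["ti"].xcom_push(key="number_of_posts", value=42)


def print_data(**context):
    # pull the value by naming the task that pushed it
    return context["ti"].xcom_pull(task_ids="extract", key="number_of_posts")


extract_data(ti=FakeTaskInstance("extract"))
pulled = print_data(ti=FakeTaskInstance("print"))
print(pulled)  # 42

# A variable stored as a JSON string, parsed the way deserialize_json=True would
stored_variable = '{"geo": ["us", "fr"]}'
print(json.loads(stored_variable)["geo"])  # ['us', 'fr']
```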

Best practices for writing Airflow DAGs

  • Keep tasks simple and atomic: each task should represent a single operation
  • Avoid top-level code: any code that isn’t part of your DAG or operator instantiations is considered to be top-level code. This type of code will be executed at the time when the DAG is parsed by the scheduler. Any code that is part of an operator is executed when the task runs, not when the DAG is parsed.
  • Use variables (user-created variables, Airflow built-in variables and macros): variables are global and should be used for overall configs. To pass data from one task to another task, use XCom!
    • Use built-in variables with Jinja templating syntax (like {{ds}})
      • def load_to_s3(file_name):
            # code that loads data
            print(file_name)

        task_load_s3 = PythonOperator(
            task_id="load_to_s3",
            python_callable=load_to_s3,
            # {{ds}} is rendered to the run's logical date when the task runs
            op_kwargs={'file_name': "data/created{{ds}}/file.csv"},
        )
  • Using Task groups
    • from airflow.operators.empty import EmptyOperator  # replaces the deprecated DummyOperator
      from airflow.utils.task_group import TaskGroup

      with DAG(...):
          start = EmptyOperator(...)
          with TaskGroup('task_group') as task_group:
              task_a = PythonOperator(...)
              task_b = PythonOperator(...)
              task_a >> task_b
          end = EmptyOperator(...)
          start >> task_group >> end
  • Airflow is an orchestrator, not an executor:
    • Heavy processing should be done with another framework (e.g. Spark).
    • Large dataset → use intermediary data storage (e.g. S3) instead of XCom.
    • Keep any extra code (not part of your DAG) in a separate file.
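
As a rough picture of what templating does to a string such as "data/created{{ds}}/file.csv", the sketch below substitutes {{ name }} placeholders using only the standard library. Airflow itself renders templates with Jinja2. This `render` helper is a simplified stand-in written for these notes and only handles plain variable names.

```python
import re


def render(template: str, context: dict) -> str:
    """Replace {{ name }} placeholders with values from context (plain names only)."""

    def substitute(match: re.Match) -> str:
        return str(context[match.group(1)])

    return re.sub(r"\{\{\s*(\w+)\s*\}\}", substitute, template)


path = render("data/created{{ds}}/file.csv", {"ds": "2024-12-01"})
print(path)  # data/created2024-12-01/file.csv
```

Because the placeholder is filled in per run, the same task definition writes to a different path for each logical date.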

Lab 2 — Airflow 101: Best practices

  • Determinism means that the same input will always produce the same output. 
  • Idempotence means that executing the same operation multiple times yields the same result as executing it once.
  • The catchup parameter: defines whether the DAG will be executed for all the data intervals between the start_date and the current date. It is recommended that you set it to False to have more control over the execution of the DAG. You can also use the Backfill feature to execute the DAG for a specific date range.
  • Some notable code
    • import re

      S3_URI_PATTERN = r"^s3://[a-zA-Z0-9.\-_]+(/[a-zA-Z0-9.\-_]+)*$"
      assert re.match(S3_URI_PATTERN, source_s3_uri)
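
To make idempotence more tangible, here is a small sketch contrasting an append-style load with a load that overwrites the partition for a given logical date. Both functions and the in-memory "tables" are illustrative stand-ins for a real storage layer. They are not taken from the lab.

```python
def load_append(table: list, rows: list) -> None:
    """Not idempotent: every rerun adds the same rows again."""
    table.extend(rows)


def load_overwrite_partition(table: dict, ds: str, rows: list) -> None:
    """Idempotent: a rerun for the same ds replaces that partition."""
    table[ds] = list(rows)


rows = [{"id": 1}, {"id": 2}]
appended, partitioned = [], {}

# Simulate running the same task twice for one logical date (e.g. after a retry).
for _ in range(2):
    load_append(appended, rows)
    load_overwrite_partition(partitioned, "2024-12-01", rows)

print(len(appended))                   # 4 -> duplicates after the rerun
print(len(partitioned["2024-12-01"]))  # 2 -> same result as a single run
```

Keying the output on the logical date is one common way to keep reruns and backfills safe.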

Airflow — TaskFlow API

  • TaskFlow → use decorators to create the DAG and its tasks (fewer names to declare: by default, the function name serves as the dag_id or task_id)
  • DAG dependencies with TaskFlow
  • XCom with TaskFlow
    • from datetime import datetime
      from airflow.decorators import dag, task

      @dag(
          start_date=datetime(2024, 3, 13),
          description="First DAG",
          tags=["data_engineering_team"],
          schedule="@daily",
          catchup=False,
      )
      def example_xcom_taskapi():
          # define tasks here
          @task
          def extract_from_api():
              # code that connects to API
              ratio_senior_jobs = {}  # placeholder for the value computed from the API response
              return ratio_senior_jobs

          @task
          def print_data(geo_ratios: dict):
              print(geo_ratios)

          # define dependencies here: passing the return value creates both the dependency and the XCom
          data = extract_from_api()
          print_data(data)

          # OR, equivalently, in a single line (use one form or the other, not both):
          # print_data(extract_from_api())

      example_xcom_taskapi()
      Or
      # define tasks here
      @task
      def extract_from_api():
          # code that connects to API
          ratio_senior_jobs = {}  # placeholder for the value computed from the API response
          return ratio_senior_jobs

      @task
      def print_data(**context):
          print(context['ti'].xcom_pull(task_ids='extract_from_api'))

      # define dependencies here
      extract_from_api() >> print_data()
  • The decorator doesn’t replace all operators! ← use both paradigms.
  • Example on branching
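
For the branching bullet above, the core of a branch is a plain function that returns the task_id of the path to follow. In Airflow, such a function is used as the callable of a branch operator or a branch-decorated task, and the branches not chosen are skipped. The sketch below shows only that decision function. The task ids `load_data` and `notify_failure` and the threshold are hypothetical.

```python
def choose_next_task(null_ratio: float, threshold: float = 0.05) -> str:
    """Return the task_id of the branch to follow, based on a data quality metric."""
    if null_ratio <= threshold:
        return "load_data"       # hypothetical task id for the happy path
    return "notify_failure"      # hypothetical task id for the failure path


print(choose_next_task(0.01))  # load_data
print(choose_next_task(0.20))  # notify_failure
```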

Lab 3 — Building an advanced data pipeline with Data Quality Checks

  • Copy data to AWS S3 bucket and check
    • aws s3 sync work_zone s3://<RAW-DATA-BUCKET>/work_zone/
      aws s3 ls s3://<RAW-DATA-BUCKET>/work_zone/ --recursive
      where RAW-DATA-BUCKET can be found in CloudFormation > Stacks > <numeric-one> > Outputs
  • Airflow branching → decide which task should be triggered based on some conditions.
  • Useful utility: protect_undefineds
    • import re

      from jinja2 import StrictUndefined, Template
      from jinja2.exceptions import UndefinedError

      def protect_undefineds(template_str: str, config: dict):
          """Protects undefined jinja2 expressions by treating them as raw with placeholders.

          This function is useful if you want to avoid replacing a Jinja expression. For instance:
          `{{ data_interval_end }}` is defined while running your DAG, not in the config files.

          Args:
              template_str (str): template in string format
              config (dict): values defined to be replaced in the jinja2 expression.

          Returns:
              str: template_str with undefined parameters protected as raw text ({% raw %}).
          """
          # match every {{ ... }} expression in the template
          pattern = re.compile(r"(\{\{[^\{]*\}\})")
          for j2_expression in set(pattern.findall(template_str)):
              try:
                  # StrictUndefined makes rendering fail if a name is missing from config
                  Template(j2_expression, undefined=StrictUndefined).render(config)
              except UndefinedError:
                  # wrap the expression so that Jinja leaves it untouched on this pass
                  template_str = template_str.replace(
                      j2_expression, f"{{% raw %}}{j2_expression}{{% endraw %}}"
                  )
          return template_str
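
Since this lab is about data quality checks, here is a minimal sketch of the two kinds of check mentioned earlier in these notes: null values and value ranges. It uses plain Python over a list of dicts. The function name, the column name, and the bounds are made up for illustration. A real pipeline would more likely run such checks in the database or with a dedicated tool.

```python
def check_quality(rows: list, column: str, min_value: float, max_value: float) -> list:
    """Return a list of problems found in one column; an empty list means the check passed."""
    problems = []
    for i, row in enumerate(rows):
        value = row.get(column)
        if value is None:
            problems.append(f"row {i}: {column} is null")
        elif not (min_value <= value <= max_value):
            problems.append(f"row {i}: {column}={value} outside [{min_value}, {max_value}]")
    return problems


rows = [{"price": 10.0}, {"price": None}, {"price": -3.0}]
issues = check_quality(rows, "price", 0, 1000)
for issue in issues:
    print(issue)
```

The result of such a check is the kind of condition a branching task can use to decide which downstream task runs.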

Orchestration on AWS

  • Host Airflow on AWS → more control vs more convenience
    • Host the open-source version on EC2 ← full control over configuration and scaling, but you need to manage all infrastructure and integration yourself.
    • Using Amazon MWAA ← hosts Apache Airflow for you + integrates with other AWS services.
  • Alternatives:
    • AWS Glue Workflows → create, run and monitor complex ETL workflows
    • AWS Step Functions
  • Which one to choose? → based on your requirements + keep up to date with the available tools.