DE by DL.AI - C2: Source Systems, Data Ingestion, and Pipelines (W2 - Data Ingestion)

Anh-Thi Dinh

Data Ingestion Overview

Overview

  • Recall: As a DE, you get raw data somewhere → turn it into something useful → make it available for downstream use cases.
    • Data ingestion → “get raw data somewhere”
  • Recall all the labs we’ve done so far:
  • Plan for this week:
    • This week takes a closer look at batch and streaming ingestions.
    • Talk to Data Analyst to identify requirements for batch ingestion from a REST API.
    • Talk to Software Engineers to investigate requirements for streaming ingestion: the data payload and event rates + how to configure the streaming pipeline.

Data ingestion on a continuum

  • Data you’re working with is unbounded (a continuous stream of events) - the stream doesn’t have a particular beginning or end.
    • If we ingest events individually, one at a time → streaming ingestion
    • If we impose some boundaries and ingest all data within those boundaries → batch ingestion
  • Different ways to impose the boundaries:
    • Size-based batch ingestion: chunk by a size threshold, e.g. 10 GB per chunk or 1K events per chunk.
    • Time-based batch ingestion: weekly / daily chunks, every hour, …
    • The more we increase the ingestion frequency, the closer we get to streaming ingestion.
      Which one to use depends on the use case and the source system (see the sketch after this list).
  • Ways to ingest data from databases:
    • Connectors (JDBC/ODBC API) ← Lab 1 of this week.
      • How much can you ingest in one go? How frequently can you call the API? → no fixed measure → read the API docs + communicate with the data owners + write custom API connection code.
      • 👍 Recommend: For API data ingestion, use existing solutions when possible. Reserve custom connections for when no other options exist.
    • Ingestion Tool (AWS Glue ETL)
  • Ways to ingest data from files: use secure file transfer protocols like SFTP or SCP.
  • Ways to ingest data from streaming systems: choose batch or streaming ingestion, or set up a message queue. ← Lab 2 of this week.
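To make the size/time boundary idea concrete, here is a minimal Python sketch (not course code) that buffers an unbounded stream of events and flushes a chunk whenever either a size threshold or a time threshold is reached; event_source and write_batch are hypothetical callables.

```python
import time

# Minimal sketch (not course code): turn an unbounded event stream into batches
# by flushing a buffer whenever a size or time threshold is reached.
MAX_BATCH_SIZE = 1_000        # e.g. 1K events per chunk
MAX_BATCH_SECONDS = 60 * 60   # e.g. one chunk per hour

def ingest(event_source, write_batch):
    """event_source yields events forever; write_batch persists one chunk."""
    buffer, window_start = [], time.monotonic()
    for event in event_source:
        buffer.append(event)
        size_reached = len(buffer) >= MAX_BATCH_SIZE
        time_reached = time.monotonic() - window_start >= MAX_BATCH_SECONDS
        if size_reached or time_reached:
            write_batch(buffer)          # the actual batch ingestion step
            buffer, window_start = [], time.monotonic()
```

Shrinking MAX_BATCH_SECONDS toward zero is exactly the “increase the frequency” direction that approaches streaming ingestion.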

Batch and Streaming tools

  • Batch ingestion tools:
    • AWS Glue ETL: Serverless service for data ingestion and transformation from various AWS sources. Uses Apache Spark for distributed ETL jobs. Enables code-based solutions for data processing. See AWS Glue Components & AWS Glue ETL guidance (and the boto3 sketch at the end of this section).
    • Amazon EMR (Big Data platform): Managed cluster platform for big data frameworks like Hadoop and Spark. Useful for ingesting and transforming petabyte-scale data from databases to AWS data stores. Offers serverless and provisioned modes.
    • Glue vs EMR: see again this note.
    • AWS DMS (Database Migration Service): For data ingestion without transformations. Syncs data between databases or to data stores like S3. Supports database engine migrations. Available in serverless or provisioned modes.
    • AWS Snow family: For migrating large on-premise datasets (100+ TB) to the cloud. Offers physical transfer appliances like Snowball and Snowcone, avoiding slow and costly internet transfers.
    • AWS Transfer Family: Enables secure file transfers to/from S3 using SFTP, FTPS, and FTP protocols.
    • Other non-AWS: Airbyte, Matillion, Fivetran
  • Key considerations for Batch vs Streaming Ingestion:
    • Use cases: Evaluate benefits of real-time data vs. batch processing. Consider stakeholder needs for ML and reporting.
    • Latency: Determine if millisecond-level ingestion is necessary or if micro-batch approach suffices.
    • Cost: Assess complexities and expenses of streaming vs. batch ingestion, including maintenance and team capabilities.
    • Existing systems: Check compatibility with source and destination systems. Consider impact on production instances and suitability for data types (e.g., IoT sensors).
    • Reliability: Ensure streaming pipeline stability, redundancy, and high availability of compute resources compared to batch services.
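Relating back to the AWS Glue ETL bullet above, a minimal boto3 sketch of triggering and checking a batch ingestion run; it assumes a Glue job already exists, and the job name and region are placeholders.

```python
import boto3

# Hypothetical sketch: kick off an existing AWS Glue ETL job for a batch
# ingestion run, then check its state. "my-ingestion-job" must already be
# defined in Glue; the name and region are placeholders.
glue = boto3.client("glue", region_name="us-east-1")

run = glue.start_job_run(JobName="my-ingestion-job")
status = glue.get_job_run(JobName="my-ingestion-job", RunId=run["JobRunId"])
print(status["JobRun"]["JobRunState"])   # e.g. RUNNING, SUCCEEDED, FAILED
```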

Batch Ingestion

Conversation with a Marketing Analyst

  • Learn what actions your stakeholders plan to take with the data:
    • DA: We would like to look into what kind of music people are listening to across the various regions where we sell our products and then compare that with product sales.
    • DE (restating more clearly): you would like to pull in public data from some external sources to get information on the music people are listening to.
  • A follow-up conversation may be required:
    • DE: let me take a closer look at the Spotify API first.
  • Key point: to identify the key requirements for the system you’ll need to build.
    • The key thing you'll learn here is that you need to ingest data from a third-party API → batch ingestion (no need for real time).

ETL vs ELT

  • ETL (Extract - Transform - Load)
    • When warehouse resources were limited (in the past), data had to be transformed before loading so it could be stored efficiently.
    • It ensures data quality, but transformation takes time and may need to be redone if requirements change.
  • ELT (Extract - Load - Transform):
    • Has become popular nowadays because of the lower cost of infrastructure.
    • Store enormous amounts of data for relatively cheap.
    • Perform data transformation directly in the data warehouse.
    • Advantages:
      • It’s faster to implement.
      • It makes data available more quickly to end users.
      • Transformations can still be done efficiently. You can decide later to adopt different transformations.
    • Downsides: you may lose the “T” and just extract and load any data → Data Swamp problem (data becomes unorganized, unmanageable, and useless)
→ Both ETL & ELT are reasonable batch processing approaches (a toy ELT sketch follows).
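A toy ELT sketch, with SQLite standing in for a data warehouse (and assuming a SQLite build that includes the JSON1 functions): raw records are loaded as-is, and the transformation happens later with SQL inside the “warehouse”.

```python
import json
import sqlite3

# Extract: raw payloads from some source (hypothetical data)
raw_events = [
    {"user": "a", "country": "USA", "plays": 3},
    {"user": "b", "country": "VN", "plays": 7},
]

# Load: store the raw JSON untouched, no upfront transformation
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE raw_events (payload TEXT)")
conn.executemany("INSERT INTO raw_events VALUES (?)",
                 [(json.dumps(e),) for e in raw_events])

# Transform: done later, inside the "warehouse", with SQL (the T in ELT)
rows = conn.execute("""
    SELECT json_extract(payload, '$.country') AS country,
           SUM(json_extract(payload, '$.plays')) AS total_plays
    FROM raw_events
    GROUP BY country
""").fetchall()
print(rows)   # e.g. [('USA', 3), ('VN', 7)]
```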

REST API

  • REST = Representational state transfer
  • APIs solved inefficient data and service exchange between teams at Amazon and elsewhere. They established stable interfaces for sharing functionality, regardless of internal systems. This approach formed the basis for Amazon Web Services and influenced global data sharing practices.
  • API features: metadata, documentation, authentication, error handling.
  • The most common type is the REST API. ← it uses HTTP as the basis for communication (see the sketch below).
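A hedged sketch of batch ingestion from a REST API using the requests library; the URL, token, and the items/next response fields are placeholders rather than any specific real API.

```python
import requests

BASE_URL = "https://api.example.com/v1/listens"      # placeholder endpoint
HEADERS = {"Authorization": "Bearer <ACCESS_TOKEN>"}  # placeholder token

def fetch_all():
    """Page through a hypothetical paginated REST API and collect all records."""
    url, records = BASE_URL, []
    while url:
        resp = requests.get(url, headers=HEADERS, timeout=10)
        resp.raise_for_status()        # surface 4xx/5xx errors early
        body = resp.json()
        records.extend(body["items"])  # response shape is an assumption
        url = body.get("next")         # many REST APIs paginate this way
    return records
```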

Lab 1: Batch processing to get data from an API

In this lab, we practice extracting data from the Spotify API.

Streaming Ingestion

Conversation with a Software Engineer

  • Learn more about the existing data system.
  • Learn about the specific data payload and message rate of the existing data system. → 1K events per second, message size: a few hundred bytes, retention period: 1 day.

Streaming ingestion details

  • Recall: 2 main modalities of streaming ingestion: Message Queue & Event Streaming Platform.
  • We’re going to use Amazon Kinesis Data Streams and Apache Kafka. ← conceptually we can use them interchangeably.
  • Kafka:
    • It's the producer's job to decide which partition to send each message to (see the sketch after this list).
      Partitions are like lanes on a highway. More lanes allow more cars to pass through, and each partition handles a subset of the messages added to the topic.
    • The Kafka cluster retains messages for a configurable retention period.
    • ❤️ Gentle introduction into Kafka: Gently down the stream
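A minimal kafka-python producer sketch (not the course code) of the point above: passing a key, e.g. a user ID, means the producer side determines the partition, since messages with the same key hash to the same partition. The broker address and topic name are placeholders.

```python
import json
from kafka import KafkaProducer  # pip install kafka-python

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",  # placeholder broker
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

event = {"user_id": "user-42", "track": "song-123", "ts": "2024-01-01T00:00:00Z"}
# Messages with the same key land on the same partition of the topic.
producer.send("listen_events", key=event["user_id"], value=event)
producer.flush()  # block until the message has actually been delivered
```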

Kinesis Data Streams Details

  • Kinesis Data Streams has the same concepts as Kafka (compare with the Kafka picture above).
  • To scale your stream for more data ingestion, add more shards. Determine the necessary shard count based on expected write and read operation sizes and rates in your pipeline.
    • Write operations (from the event producer side)
      • up to 1K records per second per shard
      • max total write rate: 1 MB/s per shard
    • Read operations (from the event consumer side)
      • up to 5 read transactions per second per shard
      • max total read rate: 2 MB/s per shard
  • When you know your app’s traffic → use provisioned mode. ← more control over cost; manually add shards.
  • When you don’t know the read and write volumes → use on-demand mode. ← auto-scales, pay for what you use, more convenient.
  • Data sent from a producer → a data record, which contains a partition key (e.g. customerID), a sequence number, and a data blob (Binary Large Object, BLOB).
    • Shared fan-out: consumers share a shard’s read capacity (2 MB/s split across all consumers) ← there may be issues for some use cases.
    • Enhanced fan-out: each consumer can read at the full capacity of the shard (2 MB/s each).
  • We can use AWS Lambda, Amazon Managed Service for Apache Flink, or AWS Glue to process data stored in Kinesis data streams, or write custom consumers using the Amazon Kinesis Client Library (KCL). See the boto3 sketch after this list.
  • We can setup so that output of one stream becomes input of another.
  • Consumer can send data to other Amazon services like Amazon Data Firehose to store data in S3.
  • Kinesis data stream allows multiple applications to work with the same stream at the same time.
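A hedged boto3 sketch of the concepts above, assuming a stream named listen-events already exists: the producer writes a record with a partition key and a data blob, and a shared fan-out consumer walks one shard with a shard iterator. The stream name, region, and payload are placeholders.

```python
import json
import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")  # placeholder region

# Producer side: the partition key (e.g. a customer ID) decides which shard
# receives the record; the data blob is the payload itself.
kinesis.put_record(
    StreamName="listen-events",
    PartitionKey="customer-42",
    Data=json.dumps({"track": "song-123", "country": "USA"}).encode("utf-8"),
)

# Consumer side (shared fan-out): read one shard through a shard iterator.
shard_id = kinesis.describe_stream(StreamName="listen-events")[
    "StreamDescription"]["Shards"][0]["ShardId"]
iterator = kinesis.get_shard_iterator(
    StreamName="listen-events",
    ShardId=shard_id,
    ShardIteratorType="TRIM_HORIZON",   # start from the oldest retained record
)["ShardIterator"]
records = kinesis.get_records(ShardIterator=iterator, Limit=100)["Records"]
```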

Change Data Capture (CDC)

  • When you extract and load data from a database, you may need to update the target data after some time. There are 2 ways: take a full snapshot, or incrementally update only the changed rows (based on something like a last_updated column).
    • Full load → consistent but heavy.
    • Incremental load → CDC
  • Use cases for CDC:
    • Sync data between database systems, on-premise and cloud,…
    • CDC helps you capture all historical changes.
    • CDC enables microservices to track any change.
  • 2 approaches to CDC: push (the source pushes changes to the target) and pull (the target periodically pulls changes from the source).
  • CDC Implementation Patterns:
    • Batch-oriented or query-based CDC (pull-based): This method queries the database periodically to identify changes using a timestamp column like updated_at, which helps find modified rows. It adds computational overhead as it requires scanning the database (see the sketch after this list).
    • Continuous or log-based CDC (pull-based): This approach monitors the database log to capture changes in real-time without adding computational load or needing extra columns, often integrating with streaming platforms like Apache Kafka.
    • Trigger-based CDC (push-based method): This method uses triggers configured in the database to detect changes automatically, reducing CDC’s workload but potentially affecting write performance if overused.
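A toy sketch of query-based (pull) CDC using SQLite; the table, the updated_at column, and the watermark handling are illustrative only.

```python
import sqlite3

# A tiny source table with an updated_at column (illustrative data).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id INTEGER, name TEXT, updated_at TEXT)")
conn.executemany("INSERT INTO customers VALUES (?, ?, ?)", [
    (1, "Ann", "2023-12-31T09:00:00Z"),
    (2, "Bob", "2024-01-02T10:30:00Z"),  # changed after the last sync
])

# Pull-based CDC: fetch only rows modified since the stored watermark.
last_sync = "2024-01-01T00:00:00Z"       # watermark persisted by the pipeline
changes = conn.execute(
    "SELECT id, name, updated_at FROM customers WHERE updated_at > ?",
    (last_sync,),
).fetchall()
print(changes)  # only Bob's row needs to be re-ingested
# After loading these rows, store the new watermark (e.g. max(updated_at)).
```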

General Considerations for choosing ingestion tools

  • Consider the characteristics of the data (payload) when choosing an ingestion tool.
  • Data Characteristics:
    • Data type & structure: Structured, unstructured, or semi-structured data types influence tool choice and necessary transformations.
    • Data volume:
      • For batch ingestion, consider historical data size and potential need for chunking based on bandwidth.
      • For streaming ingestion, ensure the tool supports the maximum expected message size.
      • Account for future data growth to configure tools and anticipate costs.
    • Latency requirements: Determine if stakeholders need batch or near real-time processing and choose tools accordingly.
    • Data quality: Assess if source data is complete and accurate, and if post-processing is needed. Use tools that handle quality checks if required.
    • Schema changes: Anticipate frequent schema changes and select tools that detect changes automatically. Communication with stakeholders is crucial.
  • Reliability and Durability:
    • Reliability ensures the ingestion system performs as expected.
    • Durability prevents data loss or corruption.
    • Evaluate trade-offs between redundancy costs and the consequences of data loss.
  • For detailed insights, refer to chapter 7 of Fundamentals of Data Engineering.

Lab 2 (also referred to as Lab 1 - Streaming): Streaming ingestion

This lab contains 2 parts:
  • Part 1: Manually create and consume streams using Kinesis. We'll use two files: producer_from_cli.py to generate streams and consumer_from_cli.py to consume them.
    Before running these scripts, we need to create a data stream in Kinesis. Go to the Kinesis console on AWS and select "Create data stream." This stream must be running for our scripts to work properly.
  • Part 2: Act as a consumer only. We'll receive a given stream, transform the data, and create two new streams based on the user's country (USA or international). Finally, we'll save the results to S3. All these operations are handled in the src/etl/consumer.py file (a routing sketch follows).
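A hedged sketch of the Part 2 idea (not the lab's actual src/etl/consumer.py): route each record to one of two output streams depending on the user's country. The stream names and payload fields are assumptions.

```python
import json
import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")  # placeholder region

def route(records):
    """Send each consumed record to a USA or an international output stream."""
    for rec in records:
        event = json.loads(rec["Data"])               # Data arrives as bytes
        target = ("usa-stream" if event.get("country") == "USA"
                  else "international-stream")        # assumed stream names
        kinesis.put_record(
            StreamName=target,
            PartitionKey=event.get("user_id", "unknown"),
            Data=json.dumps(event).encode("utf-8"),
        )
```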