DE by DL.AI - Course 2: Source Systems, Data Ingestion, and Pipelines (W1)

Anh-Thi Dinh

Plan for Course 2

  • This course focuses on the first 2 steps of the DE life cycle: data generation in source systems, and data ingestion from those source systems.
    • DE life cycle.
  • A common misconception is that 80% of the work is modeling and 20% is ingestion. In practice, it’s the opposite!
  • A lot of the time is actually spent thinking about the data → this holds for all AI workloads.
  • In this course, we not only work with structured data, but also with text, image data and so forth. The volume of unstructured data in the world is much greater than the volume of structured data.
The plan:
  • Week 1 - Common source systems
    • Databases, object storage, and streaming sources
    • Working with source systems on AWS
  • Week 2 - Setting up ingestion from source systems
  • Week 3 - DataOps undercurrent
    • Automating some of your pipeline tasks
    • Monitoring data quality
  • Week 4 - Orchestration, monitoring, and automating data pipelines
    • Setting up directed acyclic graphs using Airflow.
    • Working with infrastructure as code

Intro to Source Systems

Different types of source systems

  • 3 types of data:
    • Structured Data: data organized as tables of rows and columns. Eg: excel, SQL, csv,…
    • Semi-Structured data: Data that is not in tabular form but still has some structure. Eg: JSON format.
    • Unstructured data: Data that does not have any predefined structure. Eg: text, video, audio, images…
  • 3 types of source systems to ingest from (they don’t need to correspond 1-to-1 with the 3 types of data): Databases, Files and Streaming Systems.
    • Databases (in this figure): CRUD, DBMS, relational databases, non-relational (NoSQL) databases.
    • Files: any type of file. They’re one of the most common source systems you’ll work with as a DE.
As a data engineer, you'll extract raw data from various sources like databases, files, and streaming systems. This data can be structured, semi-structured, or unstructured.

Relational Databases

  • The most common type of source system you’ll interact with is the relational database.
  • Where? → Many web and mobile apps use a relational database as their backend, as do corporate systems such as customer relationship management, human resources, …. These are Online Transaction Processing (OLTP) systems, which execute a high volume of transactions concurrently, like banking and bookings.
  • Why? → If we use one big table (OBT) for everything → information may be duplicated across multiple rows.
  • Database schema
    • Primary key vs Foreign key.
  • Data normalization (developed in the 1970s)
    • Minimize redundancy
    • Ensure data integrity
  • Today, the One Big Table approach is still sometimes used, for use cases that need faster processing.

SQL Queries

  • To query the data, you'll have to understand the database schema.
    • Database for a fictitious DVD rental company called Rentio
  • Use SELECT * only when you really want every column of a table (eg. when exploring it); you can still filter the rows with a Boolean condition in a WHERE clause.
  • Data retrieval keywords: SELECT, FROM, JOIN (INNER JOIN, LEFT JOIN, RIGHT JOIN, FULL JOIN), WHERE, GROUP BY, ORDER BY (DESC, ASC), LIMIT, plus aggregate functions such as COUNT
    • An example.
  • Data Manipulation Operations: CREATE, INSERT INTO, UPDATE, DELETE.
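To make the retrieval keywords concrete, here is a minimal sketch using Python's built-in sqlite3 module. The two tables (customer, rental) and their rows are hypothetical and only loosely inspired by a rental business; they are not the actual Rentio schema from the course.

```python
import sqlite3

# In-memory database; table names, columns and rows are made up for illustration.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE customer (
        customer_id INTEGER PRIMARY KEY,
        name        TEXT NOT NULL
    );
    CREATE TABLE rental (
        rental_id   INTEGER PRIMARY KEY,
        customer_id INTEGER NOT NULL REFERENCES customer (customer_id),
        film_title  TEXT NOT NULL
    );
    INSERT INTO customer VALUES (1, 'Ana'), (2, 'Binh'), (3, 'Chris');
    INSERT INTO rental VALUES
        (10, 1, 'Alien'), (11, 1, 'Brazil'), (12, 2, 'Alien');
""")

# Count rentals per customer. LEFT JOIN keeps customers without any rental.
query = """
    SELECT c.name, COUNT(r.rental_id) AS n_rentals
    FROM customer AS c
    LEFT JOIN rental AS r ON r.customer_id = c.customer_id
    GROUP BY c.customer_id, c.name
    ORDER BY n_rentals DESC, c.name ASC
    LIMIT 5;
"""
rows = conn.execute(query).fetchall()
for name, n_rentals in rows:
    print(name, n_rentals)
```

The customer_id column is the primary key of customer and a foreign key in rental, which is what lets the JOIN stitch the normalized tables back together.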

Lab 1: Interacting with a Relational Database using SQL

  • This lab doesn’t need to be submitted.
  • Schema
  • Run SQL commands in a Jupyter notebook by loading the ipython-sql extension with %load_ext sql
  • Connect to the database
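    • import os
      import socket

      from dotenv import load_dotenv

      # Load the database settings from the lab's env file
      load_dotenv('./src/env', override=True)

      # Use this machine's hostname as the database host
      DBHOST = socket.gethostname()
      DBPORT = os.getenv('DBPORT')
      DBNAME = os.getenv('DBNAME')
      DBUSER = os.getenv('DBUSER')
      DBPASSWORD = os.getenv('DBPASSWORD')

      # Connection URL for MySQL through the PyMySQL driver
      connection_url = f'mysql+pymysql://{DBUSER}:{DBPASSWORD}@{DBHOST}:{DBPORT}/{DBNAME}'

      # Connect with the ipython-sql magic (requires %load_ext sql first)
      %sql {connection_url}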

NoSQL Databases

  • NoSQL databases address relational database limitations by prioritizing schema flexibility, scalability, and performance over strong consistency, joins, and fixed schemas.
  • NoSQL → “not only SQL” (not “no sql at all”). It can still support SQL or SQL-like query language.
  • Non-tabular structures
  • Many NoSQL databases operate under the principle of eventual consistency (vs the strong consistency of SQL databases). Suppose we have multiple servers (nodes) → one node can be updated first and the others later. This suits systems where speed, availability and scalability matter more than every read returning the latest value.
    • “Not updated” → given enough time → “updated”
  • Not all NoSQL databases guarantee ACID compliance (Atomicity, Consistency, Isolation, Durability), but some do. Eg: MongoDB supports ACID transactions.
  • Example
  • Two common types of NoSQL databases: Key-Value and Document
    • Key-Value is best for fast lookups, such as caching user session data. Eg: viewing different products, adding items to the shopping cart, checking out,… can all be stored under the user session id.
    • A Document database is a special kind of Key-Value database that stores data in JSON-like documents. Each document has a unique key. Documents are organized into collections (a collection is like a table and a document is like a row).
      • Easy to retrieve all the information about a user (locality)
      • Document stores typically don’t support joins (or only in a limited form).
      • Flexible schema.
      • Used in: content management, catalogs and sensor readings…
      • Be careful: the source system owner can easily change the structure of the data and break your pipeline

ACID Compliance

  • Both SQL and NoSQL databases are commonly used in Online Transaction Processing (OLTP) systems. ← these systems need to store rapidly changing application states.
  • ACID = Atomicity, Consistency, Isolation, Durability
  • ACID helps ensure transactions are processed reliably and accurately in an OLTP system.
  • Relational databases support ACID but not all NoSQL databases do by default.
  • Definition:
    • Atomicity = ensures that transactions are atomic, treated as a single indivisible unit. Atomicity ensures that either all of the operations within a transaction are executed successfully, or none of them are.
      • Eg: A money transfer between 2 accounts A → B. Either A is debited and B is credited, or A loses nothing and B gets nothing.
    • Consistency = any changes to the data made within a transaction follow the set of rules or constraints defined by the database schema. ← ensures that the db will transition from one valid state to another.
      • E.g.: The schema rule is that stock must be ≥ 0. If a user orders more items than are available, the stock would drop below 0, so the transaction is rejected!
    • Isolation = concurrent transactions don’t interfere with each other; the result is the same as if they had been executed one after another in sequential order.
      • E.g.: Two people order a product simultaneously while there’s only a limited quantity available → isolation ensures the orders are processed in sequence, so the same item isn’t sold twice
    • Durability: Once a transaction is completed, its effects are permanent and will survive any subsequent system failures.
      • E.g.: Even if the system crashes due to natural disasters, the state of successful transactions will still be preserved!
  • ACID keeps data consistent across the entire network (when there are many servers/nodes)
  • As a DE, understanding when your db needs to be ACID compliant can help you prevent disasters.
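Atomicity and consistency from the definitions above can be tried out with a small sketch in Python's built-in sqlite3 module. The account table, the names A and B, and the amounts are hypothetical; the CHECK constraint plays the role of the schema rule, and the transfer either commits both updates or rolls both back.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# CHECK constraint = the consistency rule: a balance may never go negative.
conn.execute(
    "CREATE TABLE account (name TEXT PRIMARY KEY, "
    "balance INTEGER NOT NULL CHECK (balance >= 0))"
)
conn.executemany("INSERT INTO account VALUES (?, ?)", [("A", 100), ("B", 50)])
conn.commit()


def transfer(conn, src, dst, amount):
    """Move `amount` from src to dst; return True if the transaction committed."""
    try:
        # `with conn` commits on success and rolls back if an exception is raised.
        with conn:
            conn.execute(
                "UPDATE account SET balance = balance + ? WHERE name = ?",
                (amount, dst),
            )
            conn.execute(
                "UPDATE account SET balance = balance - ? WHERE name = ?",
                (amount, src),
            )
        return True
    except sqlite3.IntegrityError:
        return False


def balances(conn):
    """Return the current balances as a {name: balance} dict."""
    return dict(conn.execute("SELECT name, balance FROM account"))


ok_small = transfer(conn, "A", "B", 30)    # fits within A's balance
ok_large = transfer(conn, "A", "B", 500)   # would make A negative
print(ok_small, ok_large, balances(conn))
```

In the second call the credit to B is executed first and succeeds, but the debit from A violates the constraint, so the whole transaction is rolled back and B keeps its earlier balance.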

Lab 2: Interacting with Amazon DynamoDB NoSQL Database

  • DynamoDB is a key-value database.
  • DynamoDB is schema-less: each item can have its own distinct attributes.
  • Simple primary key: partition key
  • Composite primary key: partition key & sort key (the combination of the two must be unique)
  • Boto3: the AWS SDK for Python, used to create, configure, and manage AWS services such as Amazon Elastic Compute Cloud (Amazon EC2) and Amazon Simple Storage Service (Amazon S3). The SDK provides an object-oriented API as well as low-level access to AWS services. → Boto3 for DynamoDB
  • In this lab, we focus on these methods
    • Create: create_table
    • Read: scan, get_item, query
    • Update: put_item, batch_write_item, update_item
    • Delete: delete_item
  • Connect
    • import boto3
      client = boto3.client('dynamodb')
  • After creating tables: Go to the AWS Console, search for DynamoDB, click on Tables on the left, and check that the tables have been created.
  • Marshal JSON (also called DynamoDB JSON): similar to regular JSON, but each value is wrapped with its data type.
    • {
          "<AttributeName>": {
              "<DataType>": "<Value>"
          },
          "<ListAttribute>": {
              "<DataType>": [
                  {
                      "<DataType>": "<Value1>"
                  },
                  {
                      "<DataType>": "<Value2>"
                  }
              ]
          }
      }
  • ⭐ To fully grasp the commands and their functionalities, it's essential to examine the notebook in detail.
  • We have to convert regular JSON to Marshal JSON before inserting it into a DynamoDB table, and we may need to convert Marshal JSON back to regular JSON to handle it later in other services.
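The conversion in both directions can be sketched in plain Python. This is a simplified illustration and not the lab's code: it only handles the S, N, BOOL, NULL, L and M type descriptors, and the helper names marshal_item and unmarshal_item are made up. Boto3 itself ships TypeSerializer and TypeDeserializer in boto3.dynamodb.types, which also cover sets, binary values and Decimal numbers.

```python
def marshal(value):
    """Wrap a plain Python value with its DynamoDB type descriptor."""
    if isinstance(value, bool):            # check bool first: bool is a subclass of int
        return {"BOOL": value}
    if value is None:
        return {"NULL": True}
    if isinstance(value, str):
        return {"S": value}
    if isinstance(value, (int, float)):
        return {"N": str(value)}           # numbers are sent as strings
    if isinstance(value, list):
        return {"L": [marshal(v) for v in value]}
    if isinstance(value, dict):
        return {"M": {k: marshal(v) for k, v in value.items()}}
    raise TypeError(f"unsupported type: {type(value).__name__}")


def unmarshal(node):
    """Inverse of marshal() for the descriptors handled above."""
    (dtype, raw), = node.items()           # each node holds exactly one descriptor
    if dtype in ("S", "BOOL"):
        return raw
    if dtype == "NULL":
        return None
    if dtype == "N":
        try:
            return int(raw)
        except ValueError:
            return float(raw)
    if dtype == "L":
        return [unmarshal(v) for v in raw]
    if dtype == "M":
        return {k: unmarshal(v) for k, v in raw.items()}
    raise TypeError(f"unsupported descriptor: {dtype}")


def marshal_item(item):
    """Marshal a whole item (attribute name -> plain value)."""
    return {name: marshal(value) for name, value in item.items()}


def unmarshal_item(item):
    """Unmarshal a whole item (attribute name -> typed value)."""
    return {name: unmarshal(node) for name, node in item.items()}


item = {"Id": 101, "Title": "Book", "Tags": ["a", "b"], "InStock": True}
typed = marshal_item(item)
print(typed)
print(unmarshal_item(typed) == item)
```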

Object Storage

  • Object storage has no hierarchy: even though we can create and organize folders and subfolders in S3 (in the UI), the actual storage mechanism is flat (all the files are stored at the top level). ← this allows quick and straightforward access to all objects without the overhead of a folder structure.
  • An object can be anything: csv (structured), text, json, mp4, png, bin,…
  • Storing semi-structured (eg. JSON) and unstructured data (eg. images) → good to serve data for training ML models.
  • Properties:
    • Each object is assigned a UUID (Universally Unique Identifier) as its key. Eg: 550e8400-e29b-41d4-a716-446655440000
    • Each object has metadata (creation date, file type, owner,…)
    • Objects are immutable.
    • We can enable versioning → metadata version
  • Why use Object Storage?
    • Store files of various data formats without a specific file system structure
    • Easily scale out to provide virtually limitless storage space
    • Replicate data across several availability zones. Eg: S3 ~99.999999999% data durability
    • Cheaper than other storage options

Lab 3: Interacting with Amazon S3 Object Storage

  • Interact with an S3 bucket using boto3 and the AWS CLI.
  • Commands
    • # list all created buckets
      aws s3 ls

      # list all objects stored inside the bucket
      aws s3 ls $BUCKET_NAME
  • Naming an object key in S3
    • object_key = 'csv/ratings_ml_training_dataset.csv'
      Although S3 stores objects in a flat structure, the UI displays a csv folder with the file inside.
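The idea that folders are only a view over flat keys can be illustrated with a toy sketch. The dictionary below is an in-memory stand-in for a bucket and not the S3 API; all keys except the one from the notes are made up, and list_folder is a hypothetical helper that mimics how a UI could group keys by prefix and delimiter.

```python
# A toy, in-memory stand-in for a bucket: one flat mapping of key -> bytes.
bucket = {
    "csv/ratings_ml_training_dataset.csv": b"user,rating\n1,5\n",
    "csv/archive/2023.csv": b"",
    "images/logo.png": b"",
    "readme.txt": b"",
}


def list_folder(bucket, prefix="", delimiter="/"):
    """Return (files, subfolders) that a UI would show under `prefix`."""
    files, folders = [], set()
    for key in bucket:
        if not key.startswith(prefix):
            continue
        rest = key[len(prefix):]
        if delimiter in rest:
            # Anything up to the next delimiter is displayed as a folder.
            folders.add(prefix + rest.split(delimiter, 1)[0] + delimiter)
        else:
            files.append(key)
    return sorted(files), sorted(folders)


print(list_folder(bucket))
print(list_folder(bucket, prefix="csv/"))
```

No folder object exists anywhere in the mapping; the csv/ folder appears only because several keys share that prefix.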

Logs

  • A log is just a record of information about events, which can serve to track the activity of a system.
  • Monitoring or debugging a system: user activities, database updating, error tracking.
  • We can apply logging systems to:
    • Web server logs (user activity data) → analysis of user behavior patterns.
    • Database system logs → track changes in source database.
    • Security system logs → ML anomaly detection.
  • Logs are a rich data source that can support downstream use cases.
  • Log levels: debug, info, warn, error, fatal
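As a small illustration of log levels, here is a sketch with Python's standard logging module. The logger name and messages are hypothetical. Note that Python names the two highest levels WARNING and CRITICAL rather than warn and fatal.

```python
import io
import logging

# Capture log records in memory so the example has no side effects.
buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setFormatter(logging.Formatter("%(levelname)s %(name)s %(message)s"))

logger = logging.getLogger("web_server")   # hypothetical logger name
logger.setLevel(logging.INFO)              # drop anything below INFO
logger.addHandler(handler)
logger.propagate = False                   # keep records out of the root logger

logger.debug("cache lookup for session 42")            # filtered out by the level
logger.info("GET /products/7 200")
logger.warning("slow response: 1.8s")
logger.error("payment service unreachable")
logger.critical("database connection pool exhausted")

log_lines = buffer.getvalue().splitlines()
print("\n".join(log_lines))
```

Setting the level is what controls how much of this data reaches a downstream consumer: with INFO, the debug record never leaves the application.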

Streaming Systems

  • Terminology:
    • Event: something that happened in the world or a change to the state of a system. Eg: a user clicking on a link, a sensor measuring a temperature change.
    • Message: a record of information about an event. It contains details, metadata, timestamp.
    • Stream: a sequence of messages.
  • If you want to handle chunks of this data all at once, like over a specific time interval, then that would be batch processing applied to a stream of messages.
  • A streaming system takes action as each message is received.
  • 3 components of a streaming system: event producer → event router / streaming broker → event consumer
  • Event router / Streaming broker:
    • Acts as a buffer to filter and distribute the messages
    • Decouples producer from consumer
    • Prevents messages from being lost
  • In the real world, “event” and “message” are often used interchangeably.
  • 2 main types of a streaming system: Message queue, event streaming platform.
    • Message Queue: a queue/buffer that accumulates messages and delivers those messages to consumers asynchronously. Eg: Amazon Simple Queue Service (Amazon SQS)
    • Event streaming platform: unlike in a message queue, the messages aren’t deleted from the log once consumed, so the data is persistent. Eg: Apache Kafka, Amazon Kinesis Data Streams.
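The difference between the two types can be sketched with two toy classes. This is a simplified in-memory model, not how Amazon SQS, Kafka or Kinesis are implemented; the class names, method names and consumer names are all made up for illustration.

```python
from collections import deque


class MessageQueue:
    """Toy message queue: a message is gone once a consumer has received it."""

    def __init__(self):
        self._messages = deque()

    def send(self, message):
        self._messages.append(message)

    def receive(self):
        # Deliver the oldest message and remove it from the queue.
        return self._messages.popleft() if self._messages else None


class EventLog:
    """Toy event streaming log: append-only, each consumer tracks its own offset."""

    def __init__(self):
        self._log = []
        self._offsets = {}

    def append(self, message):
        self._log.append(message)

    def poll(self, consumer):
        # Return everything this consumer has not read yet; nothing is deleted.
        offset = self._offsets.get(consumer, 0)
        batch = self._log[offset:]
        self._offsets[consumer] = len(self._log)
        return batch


queue, log = MessageQueue(), EventLog()
for event in ["click", "purchase"]:
    queue.send(event)
    log.append(event)

print(queue.receive(), queue.receive(), queue.receive())
print(log.poll("analytics"), log.poll("billing"), log.poll("analytics"))
```

Because the log keeps its messages, a second consumer (or a replay) can read the same events again, which a plain queue cannot offer.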

Connecting to Source Systems

Connecting to Source Systems

  • In some cases you may be unable to connect to a source system due to:
    • Improper identity and access management (IAM) definitions.
    • Broken networking configurations.
    • Wrong access credentials.
  • There is more than one way to connect to a database.
  • You can find the connection information in the AWS Console. However, this approach isn’t reliable because the AWS Console UI may change in the future, so prefer a programmatic way. ← use the AWS CLI.
  • Use an API connector (JDBC - Java Database Connectivity / ODBC - Open Database Connectivity)

Connecting to an Amazon RDS MySQL Database

  • Connect to MySQL using Python with pymysql
  • 🚸 If you created an RDS database instance for practice, please remove it when you are done.

Basics of IAM and Permissions

  • The #1 root cause of cloud data breaches is human error: insecure storage of passwords, IAM misconfigurations.
  • IAM is a framework for managing permissions. Permissions define which actions an identity
    can perform on a specific set of resources.
  • Other cloud providers have IAM too.
  • AWS IAM provides different identities and policies to grant permissions for actions on AWS.
    • Root user → unrestricted access.
    • IAM User → has specific permissions on certain resources (username, password, access key)
    • IAM Group → a collection of users that inherit the same permissions from the group policy.
    • IAM Role → an identity with temporary permissions that a user, application, or service can assume.
  • Example

Basics of Networking in the Cloud

  • The way you configure your network plays a key role in ensuring that the data flows properly throughout your data pipeline.
  • Region considerations:
    • Legal compliance
    • Latency: the closer your end users are to the region, the lower the latency.
    • Availability: the more availability zones, the better you will be able to recover from a disaster.
    • Cost
  • Region > VPC (Virtual Private Cloud) > Subnets. A VPC spans the Availability Zones of its region; each subnet sits in exactly one zone.

AWS Networking: VPCs & Subnets

  • Core concepts: Amazon VPCs, subnets, gateways, route tables, network access control lists (ACLs) and security groups.
  • VPC
  • Scenario: A web application running on an EC2 instance that allows you to query a database running on RDS.
  • There is a default VPC in each region. The default VPC includes a public subnet in each availability zone of that region + an internet gateway. ← However, using the default VPC should not be your default in the real world!
  • A region can have one or more VPCs in it.
    • Resources in each VPC can communicate with each other.
    • Communication between VPCs needs to be configured.
  • IPv4 CIDR (Classless Inter-Domain Routing) defines the range of private IP addresses that can be used within the VPC. Eg: 10.0.0.0/16
    • Each number in the IP address is 8 bits (from 0 to 255), so the entire IP address has 32 bits.
    • /16 = prefix length: how many bits are used for the network part of the address. Eg. 10.0.0.0/16 → the first 16 bits, “10.0”, are the fixed prefix, and the remaining 16 bits are free for host addresses. ← we need this when creating subnets.
    • Host address: any resource deployed into this network gets a private IP address from this range.
  • Subnet: a smaller division of the private IP space of the VPC that you can use to group resources based on their network access and security requirements.
    • Each subnet is associated with a specific AZ (Availability Zone).
    • It’s common to create at least one private and one public subnet per AZ that you intend to use.
    • Each subnet needs an IP range that is a subset of the IP range of the VPC (eg. 10.0.0.0/16) → eg. 10.0.1.0/24
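The CIDR arithmetic above can be checked with Python's standard ipaddress module. The two ranges are the ones from the notes; the host address 10.0.1.25 is a made-up example.

```python
import ipaddress

vpc = ipaddress.ip_network("10.0.0.0/16")      # VPC range from the notes
subnet = ipaddress.ip_network("10.0.1.0/24")   # example subnet from the notes

# A /16 prefix leaves 32 - 16 = 16 host bits, a /24 prefix leaves 8.
print(vpc.prefixlen, vpc.num_addresses)
print(subnet.prefixlen, subnet.num_addresses)

# The subnet range must sit inside the VPC range.
print(subnet.subnet_of(vpc))

# Carve the VPC into /24 blocks and take the first four as candidate subnets.
candidates = [str(n) for n in list(vpc.subnets(new_prefix=24))[:4]]
print(candidates)

# An address belongs to a network when its leading prefix bits match.
host = ipaddress.ip_address("10.0.1.25")       # hypothetical private address
print(host in subnet, host in vpc)
```

These are raw address counts; AWS reserves a few addresses in every subnet, so the number usable by your resources is slightly lower.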

AWS Networking: Internet Gateway & NAT Gateway

  • Up to now, the VPC we configured isn’t able to connect to the internet, even if we set up an EC2 instance inside the public subnet. ← A VPC and its subnets form an isolated network until an internet gateway is attached.
  • NAT Gateway (Network Address Translation Gateway): allows resources in a private subnet to connect to the internet or other AWS services, while preventing the internet from initiating connections to those resources.
    • Lets applications on EC2 download upgrades, patches… from the internet without exposing them.
  • ALB (Application Load Balancer): distributes incoming app traffic across multiple backend targets.
    • Gives users a way to submit requests to the app running on the EC2 instance.
  • A VPC can have only 1 internet gateway attached at a time.

AWS Networking: Route Tables

  • Route Tables:
    • Essential for directing network traffic within your VPC. Each subnet can be associated with a route table, which contains a set of rules or routes that determine where network traffic is directed
    • When a VPC is created, AWS automatically creates a default route table. ← it lets resources in different subnets communicate with each other.
  • You need to configure the routes for internet connectivity.
  • Without these routes, your subnets won't know how to direct traffic either to the Internet or within the VPC itself.
  • In AWS Console → VPC → Route tables → Create → associate it with a VPC and a specific subnet.
  • After creating route tables, you need to create routes.
    • 0.0.0.0/0 → can match any IP address (or entire internet) because zero bits are fixed.
    • Target of route configured for public subnet should be internet gateway.
    • Target of route configured for private subnet should be NAT gateway.
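How a route table picks a target can be sketched with the standard ipaddress module. The route lists and target names below are hypothetical, modeled on the public and private subnet routes described above; the lookup uses the most specific (longest prefix) matching route, which is why the 0.0.0.0/0 route only catches traffic that no narrower route matches.

```python
import ipaddress

# Hypothetical route tables: (destination CIDR, target).
public_routes = [
    ("10.0.0.0/16", "local"),              # traffic that stays inside the VPC
    ("0.0.0.0/0", "internet-gateway"),     # everything else
]
private_routes = [
    ("10.0.0.0/16", "local"),
    ("0.0.0.0/0", "nat-gateway"),
]


def resolve(routes, destination):
    """Return the target of the matching route with the longest prefix."""
    address = ipaddress.ip_address(destination)
    matches = [
        (ipaddress.ip_network(cidr), target)
        for cidr, target in routes
        if address in ipaddress.ip_network(cidr)
    ]
    if not matches:
        return None                        # no route: the traffic cannot be delivered
    return max(matches, key=lambda match: match[0].prefixlen)[1]


print(resolve(public_routes, "10.0.1.25"))       # address inside the VPC
print(resolve(public_routes, "93.184.216.34"))   # address outside the VPC
print(resolve(private_routes, "93.184.216.34"))
```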

AWS Networking: Network ACLs & Security Groups

  • By default, no traffic is allowed to reach instances inside private subnets (eg. Amazon RDS, EC2) even with the route tables in place. ← to change that, use Security Groups and Network Access Control Lists (ACLs)
    • Teal rectangles → ACL, Orange rectangles → Security groups.
  • Security Groups: instance-level virtual firewalls, controlling both inbound and outbound traffic. By default, outbound is allowed while inbound is denied. Security Groups are stateful (if an inbound request is allowed, its response is automatically allowed back out).
  • Network Access Control Lists (ACL): Provide an additional layer of security at the subnet level. It’s stateless (you have to define both inbound and outbound rules explicitly). It’s useful for implementing security policies at the subnet level.
  • Sum up:
  • If you encounter connectivity issues:
      1. Verify that your VPC has an internet gateway properly attached
      2. Verify that the route tables have appropriate rules to direct traffic correctly
      3. Verify that the route table associations with the subnets are configured correctly
      4. Check security groups to make sure they have the needed rules in place
      5. Review network ACLs to confirm they allow the necessary traffic
      6. Double-check instance configurations to ensure they are associated with the correct security groups and subnets
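The stateful versus stateless distinction between security groups and network ACLs can be made concrete with a toy model. This is a deliberately simplified sketch, not how AWS evaluates rules: rules are reduced to sets of allowed ports, and the class names, method names, addresses and ports are all made up.

```python
class SecurityGroup:
    """Toy stateful firewall: replies to an allowed request pass automatically."""

    def __init__(self, inbound_ports):
        self.inbound_ports = set(inbound_ports)
        self._flows = set()                 # (client_ip, client_port) of allowed requests

    def allow_inbound(self, client_ip, client_port, server_port):
        if server_port in self.inbound_ports:
            self._flows.add((client_ip, client_port))   # remember the connection
            return True
        return False

    def allow_reply(self, client_ip, client_port):
        # No outbound rule needed: the tracked connection is enough.
        return (client_ip, client_port) in self._flows


class NetworkAcl:
    """Toy stateless firewall: each direction is checked against its own rules."""

    def __init__(self, inbound_ports, outbound_ports):
        self.inbound_ports = set(inbound_ports)
        self.outbound_ports = set(outbound_ports)

    def allow_inbound(self, server_port):
        return server_port in self.inbound_ports

    def allow_reply(self, client_port):
        # The reply goes to the client's port and needs an explicit outbound rule.
        return client_port in self.outbound_ports


sg = SecurityGroup(inbound_ports={5432})
print(sg.allow_inbound("10.0.1.25", 50000, 5432), sg.allow_reply("10.0.1.25", 50000))

strict_acl = NetworkAcl(inbound_ports={5432}, outbound_ports=set())
open_acl = NetworkAcl(inbound_ports={5432}, outbound_ports=range(1024, 65536))
print(strict_acl.allow_inbound(5432), strict_acl.allow_reply(50000))
print(open_acl.allow_inbound(5432), open_acl.allow_reply(50000))
```

With the strict ACL the request gets in but the response is dropped on the way out, which is the kind of one-directional failure worth checking for in step 5 of the list above.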

Lab 4: Troubleshooting Database Connectivity on AWS

What we’re going to do in this lab is to handle connection and permission issues.
  • Suppose we create a new Cloud9 environment but don’t set the subnet configuration correctly. → We cannot connect to the db in RDS (it freezes right after entering the password)
    • # Check the host name of rds
      aws rds describe-db-instances --db-instance-identifier <PostgreSQL-DB-name> --output text --query "DBInstances[].Endpoint.Address"

      # connect
      psql --host=<PostgreSQLEndpoint> --username=postgres --password --port=5432
      🚨 It freezes!
  • Note that the default port for a PostgreSQL db is 5432
  • Problem 1: the Cloud9 environment and RDS aren’t in the same VPC.
    • Check RDS: AWS Console → RDS → Databases → DB identifier → (under Networking) VPC → take note “VPC ID”
    • Check Cloud9: AWS Console → Cloud9 → Environments → <name of env> → Network Settings → Check VPC id
    • Solution: delete the current Cloud9 environment and create a new one (this time choose the same VPC as the one of RDS)
  • Problem 2: the RDS instance doesn’t allow access from the Cloud9 instance.
    • Solution: AWS Console → RDS → Databases → DB identifier → Connectivity & security → Click VPC security groups → Security group ID → Edit inbound rules → add the security group of the Cloud9 EC2 instance
      • Add new rule: Custom TCP, Port range 5432 (the default port of PostgreSQL; for MySQL, the default port would be 3306). ⚠️ Don’t choose 0.0.0.0/0 (it allows all connections from outside) → paste the security group ID of the EC2 instance as the source instead.
      • Check the security group ID of the EC2 instance: AWS Console → EC2 → Instances → Instance ID → Security → Security Groups → Security Group ID → take note of it.
  • Problem 3: this time, we get this error
    • psql: error: connection to the server at "de-c2w1a1-rds.xxxxxxxxxxxx.us-east-1.rds.amazonaws.com" (xx.xx.xx.xx), port 5432 failed: FATAL:  password authentication failed for user "postgres"
      connection to the server at "de-c2w1a1-rds.xxxxxxxxxxxx.us-east-1.rds.amazonaws.com" (xx.xx.xx.xx), port 5432 failed: FATAL:  no pg_hba.conf entry for host "xx.xx.xx.xx", user "postgres", database "postgres", no encryption
    • This means the 2 resources are now talking to each other, but the password is incorrect!
  • Problem 4: After you run a Python script to download the items from S3, there is an error
    • Error downloading file: An error occurred (403) when calling the HeadObject operation: Forbidden
      You don’t have permission to download the data!
    • Solution: AWS Console → S3 → bucket name → Permissions → Bucket Policy → Edit → replace it with the policy below and then Save changes.
      • {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": "*",
                    "Action": "s3:GetObject",
                    "Resource": "arn:aws:s3:::<YOUR-DATA-BUCKET>/csv/*",
                    "Condition": {
                        "IpAddress": {
                            "aws:SourceIp": "<YOUR-CLOUD9-ENV-IP-ADDRESS>"
                        }
                    }
                }
            ]
        }
      • Where to find Cloud9 instance IP Address: AWS Console → EC2 → Instances → Instance ID → Instance summary → Copy Public IPv4 address.
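For the frozen connection in Problems 1 and 2, a quick TCP check with a short timeout can help separate network issues from credential issues before reaching for psql. The sketch below uses only Python's standard socket module; can_connect is a hypothetical helper name, and the endpoint placeholder in the usage comment is the same one used in the commands above.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port opens within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers timeouts, refused connections and DNS resolution failures.
        return False


# Usage (replace the placeholder with your RDS endpoint):
# can_connect("<PostgreSQLEndpoint>", 5432)
```

If this returns False, the likely suspects are the VPC, route table, security group or network ACL settings. If it returns True but psql still fails, the network path is fine and the problem is more likely authentication, as in Problem 3.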