👉 Lecture notes & repository.
- This course focuses on the first two steps of the DE life cycle — data generation in source systems, and data ingestion from those source systems.
- People assume that 80% of the work goes into modeling and 20% into ingestion, but it’s not true — it’s the opposite!
- A lot of the time is actually spent thinking about the data → this applies to all AI workloads.
- In this course, we work not only with structured data, but also with text, image data, and so forth. The volume of unstructured data in the world is much greater than the volume of structured data.
The plan:
- Week 1 - Common source systems
- Databases, object storage, and streaming sources
- Working with source systems on AWS
- Week 2 - Setting up ingestion from source systems
- Week 3 - DataOps undercurrent
- Automating some of your pipeline tasks
- Monitoring data quality
- Week 4 - Orchestration, monitoring, and automating data pipelines
- Setting up directed acyclic graphs using Airflow.
- Working with infrastructure as code
- 3 types of data:
- Structured Data: data organized as tables of rows and columns. Eg: Excel, SQL tables, CSV files, …
- Semi-Structured Data: data that is not in tabular form but still has some structure. Eg: JSON format.
- Unstructured Data: data that does not have any predefined structure. Eg: text, video, audio, images, …
- 3 types of source system to ingest from (they don’t correspond one-to-one to the 3 types of data): databases, files, and streaming systems.
As a data engineer, you'll extract raw data from various sources like databases, files, and streaming systems. This data can be structured, semi-structured, or unstructured.
- The most common type of source system you’ll interact with is the relational database.
- Where? → many web and mobile apps use a relational database as the backend, as do corporate systems like customer relationship management, human resources, …. Online Transaction Processing (OLTP) = executing a high volume of transactions concurrently, like banking and bookings.
- Why? → if we use one big table (OBT) for everything → information may be duplicated across multiple rows.
- Database schema
- Data normalization (developed in the 1970s)
- Minimize redundancy
- Ensure data integrity
- Today, we sometimes still use the One Big Table approach for use cases that need faster processing.
- To interact with a relational database, we use a Relational Database Management System (RDBMS): MySQL, PostgreSQL, Oracle Database, SQL Server. ← all support the SQL language.
- To query the data, you'll have to understand the database schema.
- You can use `SELECT *` to retrieve all the data in a table, filtering the results with a Boolean condition (`WHERE`) when you only need a subset.
- Retrieve data commands: `SELECT`, `COUNT`, `FROM`, `JOIN` (`INNER JOIN`, `LEFT JOIN`, `RIGHT JOIN`, `FULL JOIN`), `WHERE`, `GROUP BY`, `ORDER BY` (`DESC`, `ASC`), `LIMIT`.
- Data manipulation operations: `CREATE`, `INSERT INTO`, `UPDATE`, `DELETE`.
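- A quick, runnable sketch of these commands using Python’s built-in sqlite3 (the `orders` table and its columns are made up for illustration):

```python
import sqlite3

# In-memory database with a hypothetical `orders` table
conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Data manipulation: CREATE, INSERT INTO, UPDATE, DELETE
cur.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, customer TEXT, amount REAL)")
cur.executemany(
    "INSERT INTO orders (customer, amount) VALUES (?, ?)",
    [("alice", 30.0), ("bob", 15.5), ("alice", 20.0)],
)
cur.execute("UPDATE orders SET amount = 25.0 WHERE id = 2")
cur.execute("DELETE FROM orders WHERE amount < 21.0")

# Data retrieval: SELECT ... FROM ... WHERE ... GROUP BY ... ORDER BY ... LIMIT
cur.execute(
    """
    SELECT customer, COUNT(*) AS n, SUM(amount) AS total
    FROM orders
    WHERE amount > 0
    GROUP BY customer
    ORDER BY total DESC
    LIMIT 10
    """
)
rows = cur.fetchall()
print(rows)  # one row per customer, highest total first
conn.close()
```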
- This lab doesn’t need to be submitted.
- Schema
- Run SQL commands in a Jupyter notebook using `%load_ext sql`, thanks to the `ipython-sql` extension.
- Connect to the database
```python
import os
import socket

from dotenv import load_dotenv

load_dotenv('./src/env', override=True)

DBHOST = socket.gethostname()
DBPORT = os.getenv('DBPORT')
DBNAME = os.getenv('DBNAME')
DBUSER = os.getenv('DBUSER')
DBPASSWORD = os.getenv('DBPASSWORD')

connection_url = f'mysql+pymysql://{DBUSER}:{DBPASSWORD}@{DBHOST}:{DBPORT}/{DBNAME}'

%sql {connection_url}
```
- NoSQL databases address relational database limitations by prioritizing schema flexibility, scalability, and performance over strong consistency, joins, and fixed schemas.
- NoSQL → “not only SQL” (not “no sql at all”). It can still support SQL or SQL-like query language.
- Non-tabular structures
- NoSQL operates under the principle of eventual consistency (vs the strong consistency of SQL) ← suppose we have multiple servers (nodes) → one can be updated now and the others updated later ← suitable when speed, system availability, and scalability are the priorities.
- Not all NoSQL databases guarantee ACID compliance (Atomicity, Consistency, Isolation, Durability). Eg: MongoDB.
- Example
- Two common types of NoSQL databases — Key-Value and Document.
- Key-Value is best for fast lookups, such as caching user session data. Eg: viewing different products, adding items to the shopping cart, checking out, … can all be stored under the user’s session ID.
- A document database is a special kind of key-value database that stores data in JSON-like documents. Each document has a unique key. Documents are organized into collections (analogous to tables), and each document is like a row.
- Easy to retrieve all the information about a user (locality)
- Document stores don’t support joins.
- Flexible schema.
- Used in: content management, catalogs, sensor readings, …
- Be careful: the source system owner can easily change the data, which can break the pipeline downstream.
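- The two models can be sketched with plain Python dicts (the keys and fields here are made up):

```python
# Key-value store sketch: session ID -> session state, O(1) lookup by key
sessions = {}
sessions["sess-123"] = {"viewed": ["p1", "p2"], "cart": ["p2"]}
state = sessions["sess-123"]  # fast lookup, like cached session data

# Document store sketch: a "collection" of JSON-like documents, each with a unique key
users = {
    "u1": {"name": "Ada", "orders": [{"sku": "p2", "qty": 1}]},
}
doc = users["u1"]  # one read returns the whole document (locality), no joins needed
print(state, doc)
```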
- Both SQL and NoSQL are commonly used in Online Transaction Processing (OLTP) systems. ← these systems need to store rapidly changing application state.
- ACID = Atomicity, Consistency, Isolation, Durability
- ACID helps ensure transactions are processed reliably and accurately in an OLTP system.
- Relational databases support ACID but not all NoSQL do by default.
- Definition:
- Atomicity = ensures that transactions are atomic, treated as a single indivisible unit. Atomicity ensures that either all of the operations within a transaction are executed successfully, or none of them are.
- Eg: Money transfers between 2 accounts A → B. Either A- and B+ or A lose nothing and B gets nothing.
- Consistency = any changes to the data made within a transaction follow the set of rules or constraints defined by the database schema. ← ensures that the database transitions from one valid state to another.
- E.g.: The schema rule is that stock must be ≥ 0. If a user orders more items than are in stock (which would make the stock negative), the transaction will not be accepted!
- Isolation = transactions execute as if independently and in sequential order, even when they run concurrently.
- E.g.: Two people order products simultaneously while there's only a limited quantity available → this principle orders and isolates the transactions so the same item isn't sold twice.
- Durability: Once a transaction is completed, its effects are permanent and will survive any subsequent system failures.
- E.g.: Even if the system crashes due to natural disasters, the state of successful transactions will still be preserved!
- ACID keeps data consistent across the entire network (when there are many servers/nodes).
- As a DE, understanding when your db needs to be ACID compliant can help you prevent disasters.
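- Atomicity can be demonstrated with sqlite3, which is ACID compliant (the `accounts` table and amounts below are made up):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE accounts (name TEXT PRIMARY KEY, balance REAL CHECK (balance >= 0))")
conn.execute("INSERT INTO accounts VALUES ('A', 100.0), ('B', 0.0)")
conn.commit()

try:
    with conn:  # one transaction: both updates commit, or neither does
        conn.execute("UPDATE accounts SET balance = balance - 150.0 WHERE name = 'A'")
        conn.execute("UPDATE accounts SET balance = balance + 150.0 WHERE name = 'B'")
except sqlite3.IntegrityError:
    pass  # CHECK constraint violated (A would go negative) -> whole transfer rolled back

balances = dict(conn.execute("SELECT name, balance FROM accounts"))
print(balances)  # A keeps 100.0 and B stays at 0.0 — no partial transfer
conn.close()
```

Either A loses the money and B gains it, or neither balance changes — never a half-applied transfer.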
Lab 2: Interacting with Amazon DynamoDB NoSQL Database
- DynamoDB is a key-value database.
- DynamoDB is schema-less: each item can have its own distinct attributes.
- Simple primary key: partition key
- Composite primary key: partition key & sort key (the combination of the two must be unique)
- Boto3: the AWS SDK for Python (Boto3) to create, configure, and manage AWS services, such as Amazon Elastic Compute Cloud (Amazon EC2) and Amazon Simple Storage Service (Amazon S3). The SDK provides an object-oriented API as well as low-level access to AWS services. → Boto3 for DynamoDB
- Sample data in the lab.
- In this lab, we focus on these methods:
- Create: `create_table`
- Read: `scan`, `get_item`, `query`
- Update: `put_item`, `batch_write_item`, `update_item`
- Delete: `delete_item`
- Connect
```python
import boto3

client = boto3.client('dynamodb')
```
- DynamoDB data type descriptors: `"N"` for number, `"S"` for string, and many more.
- After creating tables: Go to the AWS Console, search for DynamoDB, click on Tables on the left, and check that the tables have been created.
- Marshal JSON: similar to a regular JSON document, but it also includes the data type of each value.
```json
{
  "<AttributeName>": {
    "<DataType>": "<Value>"
  },
  "<ListAttribute>": {
    "<DataType>": [
      {
        "<DataType>": "<Value1>"
      },
      {
        "<DataType>": "<Value2>"
      }
    ]
  }
}
```
- ⭐ To fully grasp the commands and their functionalities, it's essential to examine the notebook in detail.
- We have to convert regular JSON to marshal JSON before inserting into a DynamoDB table, and we may need to convert marshal JSON back to regular JSON for later handling in other services.
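- A minimal sketch of that conversion for the common types (in practice boto3’s `TypeSerializer`/`TypeDeserializer` in `boto3.dynamodb.types` do this; the sample item below is made up):

```python
def to_marshal(value):
    """Convert a plain Python value to DynamoDB marshal format (common types only)."""
    if isinstance(value, str):
        return {"S": value}
    if isinstance(value, bool):  # must come before int: bool is a subclass of int
        return {"BOOL": value}
    if isinstance(value, (int, float)):
        return {"N": str(value)}  # DynamoDB sends numbers as strings
    if isinstance(value, list):
        return {"L": [to_marshal(v) for v in value]}
    if isinstance(value, dict):
        return {"M": {k: to_marshal(v) for k, v in value.items()}}
    raise TypeError(f"unsupported type: {type(value)}")

item = {"user_id": "u1", "age": 30, "tags": ["a", "b"]}  # hypothetical item
marshalled = {k: to_marshal(v) for k, v in item.items()}
print(marshalled)
```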
- Object storage has no hierarchy: even though we can create and organize folders and subfolders in the S3 UI, the actual storage mechanism is flat (all files are stored right at the top level). ← this allows quick and straightforward access to all objects without the overhead of a folder structure.
- An object can be anything: CSV (structured), text, JSON, MP4, PNG, binary files, …
- Storing semi-structured (eg. JSON) and unstructured data (eg. images) → good for serving data to train ML models.
- Properties:
- Each object is assigned a UUID (Universally Unique Identifier) key. Eg: 550e8400-e29b-41d4-a716-446655440000
- Each object has metadata (creation date, file type, owner,…)
- Objects are immutable.
- We can enable versioning → metadata version
- Why use Object Storage?
- Store files of various data formats without a specific file system structure
- Easily scale out to provide virtually limitless storage space
- Replicate data across several availability zones. Eg: S3 ~99.999999999% data durability
- Cheaper than other storage options
- Interact with S3 bucket with boto3.
- Commands
```shell
# list all created buckets
aws s3 ls

# list all objects stored inside the bucket
aws s3 ls $BUCKET_NAME
```
- Naming an object key in S3:
```python
object_key = 'csv/ratings_ml_training_dataset.csv'
```
Although S3 stores objects flat, the UI displays the `csv` folder and the file inside it.
- Query the data from csv file by using AWS Athena (check lab C1W2). More sources to read:
- A log is just a record of information about events that can serve to track the activity of a system.
- Monitoring or debugging a system: user activities, database updating, error tracking.
- We can apply logging systems to:
- Web server logs (user activity data) → analysis of user behavior patterns.
- Database system logs → track changes in source database.
- Security system logs → ML anomaly detection.
- Rich data source can support downstream use cases.
- Log levels: debug, info, warn, error, fatal
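- A quick sketch of log levels with Python’s `logging` module (the logger name and messages are made up; records are collected in a list here just so we can inspect them):

```python
import logging

# Collect records in memory (a real system would write to files/stdout)
records = []

class ListHandler(logging.Handler):
    def emit(self, record):
        records.append((record.levelname, record.getMessage()))

logger = logging.getLogger("app")
logger.setLevel(logging.INFO)  # messages below INFO (i.e. DEBUG) are filtered out
logger.propagate = False
logger.addHandler(ListHandler())

logger.debug("cache miss for key %s", "k1")      # below threshold, dropped
logger.info("user %s logged in", "alice")        # user activity
logger.warning("slow query: %.1fs", 2.5)         # something worth watching
logger.error("payment failed for order %s", "o-42")  # error tracking

print(records)
```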
- Terminology:
- Event: something that happened in the world or a change to the state of a system. Eg: a user clicking on a link, a sensor measuring a temperature change.
- Message: a record of information about an event. It contains details, metadata, timestamp.
- Stream: a sequence of messages.
- If you want to handle chunks of this data all at once, like over a specific time interval, then that would be batch processing applied to a stream of messages.
- A streaming system takes action when each message is received.
- 3 components of a streaming system: event producer → event router / streaming broker → event consumer
- Event router / Streaming broker:
- Acts as a buffer to filter and distribute the messages
- Decouples producer from consumer
- Prevents message from being lost
- In the real world, “event” and “message” are often used interchangeably.
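- The producer → broker → consumer pattern can be sketched with Python’s `queue` module standing in for the broker (the event names are made up; a real broker would run as a separate service):

```python
from queue import Queue
from threading import Thread

broker = Queue()  # stands in for the event router / streaming broker

def producer():
    for event in ["click:p1", "click:p2", "checkout"]:
        broker.put(event)  # the producer only knows the broker, not the consumer
    broker.put(None)       # sentinel: no more messages

processed = []

def consumer():
    while True:
        message = broker.get()  # blocks until a message arrives (buffering)
        if message is None:
            break
        processed.append(message)  # take action on each message as it is received

t_prod, t_cons = Thread(target=producer), Thread(target=consumer)
t_prod.start(); t_cons.start()
t_prod.join(); t_cons.join()
print(processed)
```

The queue decouples the two sides: the producer can run ahead of the consumer without messages being lost.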
- 2 main types of a streaming system: Message queue, event streaming platform.
- Message Queue: a queue/buffer that accumulates messages and delivers those messages to consumers asynchronously. Eg: Amazon Simple Queue Service (Amazon SQS)
- Event streaming platform: unlike in a message queue, the messages aren’t deleted from the log and the data is persistent. Eg: Kafka, Amazon Kinesis Data Streams.
- You may run into cases where you cannot connect to a source system due to:
- Improper identity and access management (IAM) definitions.
- Broken networking configurations.
- Wrong set of access credentials.
- There is more than one way to connect to the database.
- You can find the connection information in the AWS Console. However, this way isn’t consistent because the AWS Console UI may change in the future, so let’s use a programmatic way ← the AWS CLI.
- Use SDK (boto3).
- Use API Connector (JDBC - Java Database Connectivity / ODBC - Open Database Connectivity API)
Connecting to an Amazon RDS MySQL Database
- Connect to MySQL using Python with pymysql
- 🚸 If you created an RDS database instance for practice, please remove it when you are done.
- The #1 root cause of cloud data breaches is human error: insecure storage of passwords, IAM misconfigurations.
- IAM is a framework for managing permissions. Permissions define which actions an identity
can perform on a specific set of resources.
- Other cloud providers have IAM too.
- AWS IAM gives different identities and policies to grant permissions for actions on AWS.
- Root user → unrestricted access.
- IAM User → has specific permission to certain resources (username, password, access key)
- IAM Group → A collection of users that inherit the same permission from the group policy.
- IAM Role → A user, application, or service that’s been granted temporary permissions.
- Example
- The way you configure your network plays a key role in ensuring that the data flows properly throughout your data pipeline.
- Region considerations:
- Legal compliance
- Latency: the closer your end users are to the region, the lower the latency.
- Availability: the more availability zones, the better you will be able to recover from a disaster.
- Cost
- Region > Zone > VPC (Virtual Private Cloud) > Subnets
- Core concepts: Amazon VPCs, subnets, gateways, route tables, network access control lists (ACLs) and security groups.
- VPC
- Scenario: A web application running on an EC2 instance that allows you to query a database running on RDS.
- There is a default VPC in each region. The default VPC includes a public subnet in each availability zone in that region + an internet gateway. ← However, in the real world, your default should not be to use the default VPC!
- A region can have one or more VPCs in it.
- Resources in each VPC can communicate with each other.
- Communication between VPCs need to be configured.
- IPv4 CIDR (Classless Inter-Domain Routing) notation defines the range of private IP addresses that can be used within the VPC. Eg: `10.0.0.0/16`
- Each number in the IP address is 8 bits (from 0 to 255), for a maximum of 32 bits in the entire IP address.
- `/16` = prefix length: how many bits are used for the network part of the address. Eg: in `10.0.0.0/16`, “10.0” is the fixed prefix. ← we need this when creating subnets.
- Host address: any resource deployed into this network gets a private IP address from this range.
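- Python’s standard `ipaddress` module makes the prefix-length arithmetic concrete:

```python
import ipaddress

vpc = ipaddress.ip_network("10.0.0.0/16")
print(vpc.prefixlen)      # 16 bits fixed for the network part
print(vpc.num_addresses)  # 2**(32-16) = 65536 host addresses available
print(vpc.netmask)        # 255.255.0.0 — the first 16 bits set to 1
```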
- Subnet: smaller divisions of the private IP space of the VPC that you can use to group resources based on their network access and security requirements.
- Each subnet is associated to a specific AZ (Availability Zone).
- It's common to create at least one private and one public subnet per AZ that you intend to use here.
- Each subnet needs an IP range that is a subset of the IP range of the VPC (eg. starting with `10.0.`) → eg. `10.0.1.0/24`
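- The subset relationship can be checked with `ipaddress` too (the `10.0.1.0/24` subnet is the example above; `192.168.1.0/24` is a made-up outsider):

```python
import ipaddress

vpc = ipaddress.ip_network("10.0.0.0/16")
public_subnet = ipaddress.ip_network("10.0.1.0/24")
outside = ipaddress.ip_network("192.168.1.0/24")

print(public_subnet.subnet_of(vpc))  # the subnet's range lies inside the VPC's range
print(outside.subnet_of(vpc))        # this range does not — it can't be a subnet here
print(public_subnet.num_addresses)   # 256 addresses (AWS reserves a few in practice)
```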
- Up to now, the VPC we configured isn’t able to connect to the internet, even if we set up an EC2 instance inside the public subnet. ← the VPC and subnets create an isolated network.
- An internet gateway is like the “door” of the VPC “house”.
- NAT Gateway (Network Address Translation Gateway): allows resources in a private subnet to connect to the internet or other AWS services. + prevent the internet from initiating connections with those resources.
- Allows applications on EC2 to download upgrades, patches, … from the internet without exposing them.
- ALB (Application Load Balancer): distributes incoming application traffic across multiple backend targets.
- Give users a way to submit requests to the app running on the EC2 instance.
- One VPC can only have 1 internet gateway at a time.
- Route Tables:
- Essential for directing network traffic within your VPC. Each subnet can be associated with a route table, which contains a set of rules or routes that determine where network traffic is directed
- When a VPC is created, AWS automatically creates a default route table. ← so resources in different subnets can communicate with each other.
- You need to configure the routes for internet connectivity.
- Without these routes, your subnets won't know how to direct traffic either to the Internet or within the VPC itself.
- In AWS Console → VPC → Route table → create → associate to a VPC and specific subnet.
- After creating route tables, you need to create routes.
- `0.0.0.0/0` matches any IP address (the entire internet) because zero bits are fixed.
- The target of the route configured for a public subnet should be the internet gateway.
- The target of the route configured for a private subnet should be the NAT gateway.
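- Route selection picks the most specific matching route (longest prefix), so `0.0.0.0/0` only catches traffic that nothing else matches. A sketch with a hypothetical public-subnet route table:

```python
import ipaddress

# Hypothetical route table: destination CIDR -> target
routes = {
    ipaddress.ip_network("10.0.0.0/16"): "local",           # traffic staying inside the VPC
    ipaddress.ip_network("0.0.0.0/0"): "internet-gateway",  # everything else
}

def route_for(ip):
    """Pick the matching route with the longest prefix (most specific match)."""
    addr = ipaddress.ip_address(ip)
    candidates = [net for net in routes if addr in net]
    return routes[max(candidates, key=lambda net: net.prefixlen)]

print(route_for("10.0.1.25"))      # inside 10.0.0.0/16 -> stays local
print(route_for("93.184.216.34"))  # only 0.0.0.0/0 matches -> out the internet gateway
```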
- By default, no traffic is allowed to reach instances inside private subnets (eg. Amazon RDS, EC2) even with the route tables in place. ← to change that: Security Groups and Network Access Control Lists (ACLs).
- Security Groups: instance-level virtual firewalls, controlling both inbound and outbound traffic. By default, outbound is allowed while inbound is denied. Security Groups are stateful (when an inbound rule is created, the corresponding outbound traffic is allowed automatically).
- Security Group Chaining:
- In AWS Console → VPC → Security Groups.
- Security Group Rules.
- Network Access Control Lists (ACL): Provide an additional layer of security at the subnet level. It’s stateless (you have to define both inbound and outbound rules explicitly). It’s useful for implementing security policies at the subnet level.
- Sum up:
- If you encounter connectivity issues:
- Verify that your VPC has an internet gateway properly attached
- Verify that the route tables have appropriate rules to direct traffic correctly
- Verify that the route table associations with the subnets are configured correctly
- Check security groups to make sure they have the needed rules in place
- Review network ACLs to confirm they allow the necessary traffic
- Double-check instance configurations to ensure they are associated with the correct security groups and subnets
What we’re going to do in this lab is to handle connection and permission issues.
- In case we create a new environment in Cloud9 but don’t set the subnet configs correctly → we cannot connect to the DB in RDS (it freezes right after entering the password).
```shell
# Check the host name of RDS
aws rds describe-db-instances --db-instance-identifier <PostgreSQL-DB-name> --output text --query "DBInstances[].Endpoint.Address"

# Connect
psql --host=<PostgreSQLEndpoint> --username=postgres --password --port=5432
```
🚨 It freezes!
- Note that the default port for a PostgreSQL DB is `5432`.
- Problem 1: the Cloud9 environment and RDS aren’t in the same VPC.
- Check RDS: AWS Console → RDS → Databases → DB identifier → (under Networking) VPC → take note “VPC ID”
- Check Cloud9: AWS Console → Cloud9 → Environments → <name of env> → Network Settings → Check VPC id
- Solution: delete the current Cloud9 environment and create a new one (this time choose the same VPC as the one of RDS).
- Problem 2: RDS instance isn’t allowed access from instance of Cloud9.
- Solution: AWS Console → RDS → Databases → DB identifier → Connectivity & security → Click VPC security groups → Security group ID → Edit inbound rules → add the security group of the instance from EC2 (Cloud9)
- Add a new rule: Custom TCP, port range `5432` (the default port of PostgreSQL; for MySQL, the default port would be `3306`). ⚠️ Don’t choose `0.0.0.0/0` (which allows all connections from outside) → paste the security group ID of the EC2 (Cloud9) instance instead.
- Check the security group ID of the EC2 instance: AWS Console → EC2 → Instances → Instance ID → Security → Security Groups → take note of the Security Group ID.
- Problem 3: this time, we get this error:
```
psql: error: connection to the server at "de-c2w1a1-rds.xxxxxxxxxxxx.us-east-1.rds.amazonaws.com" (xx.xx.xx.xx), port 5432 failed: FATAL: password authentication failed for user "postgres"
connection to the server at "de-c2w1a1-rds.xxxxxxxxxxxx.us-east-1.rds.amazonaws.com" (xx.xx.xx.xx), port 5432 failed: FATAL: no pg_hba.conf entry for host "xx.xx.xx.xx", user "postgres", database "postgres", no encryption
```
- This means the two resources are talking to each other, but the password is incorrect!
- Problem 4: after you run a Python script to download the items from S3, there is an error:
```
Error downloading file: An error occurred (403) when calling the HeadObject operation: Forbidden
```
You don’t have permission to download the data!
- Solution: AWS Console → S3 → bucket name → Permissions → Bucket Policy → Edit → replace with the policy below, then Save changes.
- Where to find the Cloud9 instance IP address: AWS Console → EC2 → Instances → Instance ID → Instance summary → copy the Public IPv4 address.
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": "*",
      "Action": "s3:GetObject",
      "Resource": "arn:aws:s3:::<YOUR-DATA-BUCKET>/csv/*",
      "Condition": {
        "IpAddress": {
          "aws:SourceIp": "<YOUR-CLOUD9-ENV-IP-ADDRESS>"
        }
      }
    }
  ]
}
```
Check more in the documentation: IAM JSON policy elements: Condition operators - AWS Identity and Access Management