ETL Pipelines
All about ETL pipelines and their scheduling.
Data Pipeline Design Patterns
All about design patterns and their implementation.
Pipeline Terms
Idempotent
- One can run a data pipeline multiple times with the same input, and the output will not change.
Backfilling
- Adding or modifying data in an already existing dataset. E.g., applying a change in logic to a processed dataset, or adding a new column from the source to a processed dataset. Most orchestration tools support backfilling.
Source Replayability
- The ability of a source to provide historical data. This is a requirement for backfilling. You can make a non-replayable source replayable by dumping the incoming data into a raw/loading zone at a certain frequency, say, daily. The data is then replayable to a daily degree, but not hourly.
- Replayable sources: event streams, web server logs, a dump of the database WAL (write-ahead log) showing all the create/update/delete statements (CDC - Change Data Capture), etc.
- Non-replayable sources: application tables that are constantly modified, APIs that only provide the current state of the data, etc.
Source Ordering
- Is the data coming from the source in order? E.g., the checkout event happens after the click, log-out happens after log-in.
Sink Overwritability
- The ability to modify sink data based on a unique key or run ID. Overwritability is crucial to prevent duplicates or partial records when a pipeline fails.
- Non-overwritable sinks: a Kafka queue, table entries without a unique key.
Multi-hop pipeline
- The data state is saved after each transformation. This makes debugging easy and lets the pipeline restart from the failed state, so only the failed transformations are reprocessed.
Extraction Patterns
Time Ranged
- Extract the data from the source based on a time frame. E.g., pull all of yesterday's data.
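A minimal sketch of a time-ranged pull, assuming a hypothetical SQLite source with an events table whose event_ts column stores ISO timestamps:

import sqlite3
from datetime import date, timedelta

import pandas as pd

def extract_yesterday(db_file: str) -> pd.DataFrame:
    # Yesterday's window as ISO strings (hypothetical schema)
    start = (date.today() - timedelta(days=1)).isoformat()
    end = date.today().isoformat()
    conn = sqlite3.connect(db_file)
    try:
        # Pull only the records that fall inside yesterday's window
        return pd.read_sql(
            "SELECT * FROM events WHERE event_ts >= ? AND event_ts < ?",
            conn,
            params=(start, end),
        )
    finally:
        conn.close()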
Full Snapshot
- The entire dataset is fetched.
- Add a run_id to build a historic snapshot, using which you can go back in time.
- Replace the full data if history is not required.
Lookback
- This pull is used to build aggregated metrics, e.g., active users in a month. It is used when the source data is constantly updated and the end user only cares about the current state. Usually required for dashboard needs.
Streaming
- Each record flows through the pipeline individually.
Fail Recovery Patterns
Idempotent
The ability of code to produce the same result despite being executed multiple times. E.g., if an ETL job is run multiple times, it should not load duplicate data. E.g., pull daily data and dump it into a dated folder; if you re-run this pipeline, the folder content gets overwritten and the output stays the same.
A re-run should not produce duplicates or incorrect values.
E.g., you need to re-run the ETL because you have to apply a new transformation that filters out records of customers without a DOB. You rerun the pipeline (also called a backfill) and it repopulates the table, overwriting the customers that have a DOB. However, you may still have stale records without a DOB from the previous run. In this case you should do delete-write: delete all data from the loading area before reloading with the new transformation. Most libraries and frameworks offer an overwrite option (e.g. Spark overwrite, Snowflake overwrite), which is safer than deleting and then writing.
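A minimal sketch of the dated-folder idea, assuming a local filesystem; the path layout and function name are illustrative only:

from pathlib import Path

import pandas as pd

def load_daily(df: pd.DataFrame, run_date: str, base_dir: str = "data/raw") -> None:
    # The output path is derived from the run date, e.g. data/raw/2024-01-31
    out_dir = Path(base_dir) / run_date
    out_dir.mkdir(parents=True, exist_ok=True)
    # Overwrite instead of append, so re-running the same day's job
    # produces the same output and never duplicates records.
    df.to_csv(out_dir / "dump.csv", index=False)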
Link: SDE - How to make data pipelines idempotent
Self Healing
The pipeline catches up on missed data after a failure. E.g., if the data is inserted based on the ID difference between source and sink, then the re-run will fetch the IDs that failed to load in the previous run.
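A minimal sketch of this ID-difference catch-up, assuming hypothetical source_table/sink_table names and SQLite-style connections:

import pandas as pd

def catch_up(source_conn, sink_conn) -> None:
    # Highest ID already present in the sink (0 if the sink is empty)
    max_id = pd.read_sql(
        "SELECT COALESCE(MAX(id), 0) AS max_id FROM sink_table", sink_conn
    )["max_id"].iloc[0]
    # Any rows the previous (failed) run did not load have IDs above max_id
    missing = pd.read_sql(
        "SELECT * FROM source_table WHERE id > ?", source_conn, params=(int(max_id),)
    )
    missing.to_sql("sink_table", sink_conn, if_exists="append", index=False)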
Coding Data Pipeline Patterns
How to code in different patterns
Coding to a pattern can make a pipeline robust, easy to test, and maintainable. Different patterns are required to cater to different use cases and needs.
Functional Design
This is simple: you define three functions, extract(), transform() and load(), and then define a run() function which calls all three.
This is good for simple use cases; however, try to make the functions follow these standards (a minimal sketch follows the list):
- Atomicity - one function does only one piece of work. E.g., load() should not create db_conn.
- Idempotency - a rerun should not produce duplicates or incorrect data. E.g., load() should delete then write.
- No Side Effects - one function should not affect another. E.g., load() should not close db_conn, as it is created in another function and passed in.
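A minimal sketch of this functional layout, assuming a CSV source, a SQLite sink, and a hypothetical customers table with a customer_id column:

import sqlite3

import pandas as pd

def extract(csv_path: str) -> pd.DataFrame:
    # Only extracts; no connections or writes here (atomicity)
    return pd.read_csv(csv_path)

def transform(df: pd.DataFrame) -> pd.DataFrame:
    # Pure function: returns a new frame, touches neither source nor sink
    return df.dropna(subset=["customer_id"])

def load(df: pd.DataFrame, db_conn: sqlite3.Connection, table: str) -> None:
    # 'replace' drops and recreates the table (delete then write), so a
    # re-run does not duplicate rows (idempotency). Do not close db_conn
    # here -- it was created by the caller (no side effects).
    df.to_sql(table, db_conn, if_exists="replace", index=False)

def run(csv_path: str, db_file: str) -> None:
    db_conn = sqlite3.connect(db_file)
    try:
        load(transform(extract(csv_path)), db_conn, table="customers")
    finally:
        db_conn.close()  # whoever creates the connection closes it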
Factory Pattern
If you have similar ETL jobs, e.g., pulling data from Twitter/Reddit/Quora using their APIs, you can code them using the factory pattern. If your ETL jobs are not similar, this may not be a good implementation choice.
In the factory pattern:
- You define an abstract class - this is the structure, with no logic.
- You define concrete classes - these implement the abstract class with logic and ensure that the structure stays the same as defined in the abstract class.
- You define the factory - which is nothing but a function that takes in a param telling it which concrete class to return.
Eg:
import os
from abc import ABC, abstractmethod  # python module to define abstract interfaces

import praw
import tweepy

# Abstract class with abstract methods
class SocialETL(ABC):
    @abstractmethod
    def extract(self, id, num_records, client):
        pass

    @abstractmethod
    def transform(self, social_data):
        pass

    @abstractmethod
    def load(self, social_data, db_conn):
        pass

    @abstractmethod
    def run(self, db_conn, client, id, num_records):
        pass

# Concrete implementation of the abstract class
class RedditETL(SocialETL):
    def extract(self, id, num_records, client):
        pass  # code to extract reddit data

    def transform(self, social_data):
        pass  # code to transform reddit data

    def load(self, social_data, db_conn):
        pass  # code to load reddit data into the final table

    def run(self, db_conn, client, id, num_records):
        pass  # code to run extract, transform and load

# Concrete implementation of the abstract class
class TwitterETL(SocialETL):
    def extract(self, id, num_records, client):
        pass  # code to extract twitter data

    def transform(self, social_data):
        pass  # code to transform twitter data

    def load(self, social_data, db_conn):
        pass  # code to load twitter data into the final table

    def run(self, db_conn, client, id, num_records):
        pass  # code to run extract, transform and load

# This "factory" will accept an input and give you the appropriate client
# and ETL object that you can use to perform the ETL
def etl_factory(source):
    factory = {
        'Reddit': (
            praw.Reddit(
                client_id=os.environ['REDDIT_CLIENT_ID'],
                client_secret=os.environ['REDDIT_CLIENT_SECRET'],
                user_agent=os.environ['REDDIT_USER_AGENT'],
            ),
            RedditETL(),
        ),
        'Twitter': (
            tweepy.Client(bearer_token=os.environ['BEARER_TOKEN']),
            TwitterETL(),
        ),
    }
    if source in factory:
        return factory[source]
    else:
        raise ValueError(
            f"source {source} is not supported. Please pass a valid source."
        )

# calling code
client, social_etl = etl_factory(source)
social_etl.run(db_conn, client, ...)
# code credit goes to link below
Now in main() you can call etl_factory().
Note:
- Please use the factory pattern only when your data pipelines have a similar structure.
- Ensure proper logging to know which function is executed.
Link: SDE - Factory Pattern - Coding Pattern
Strategy Pattern
In simple terms, it takes the actual transformation logic out into separate functions instead of keeping it inside the transformation() function itself.
This ensures separation of concerns (definition, creation & use) and hence enables easier code maintenance and testability.
E.g., clean_name(), format_date(), calc_std_dev() can be some of the transformations; these are now separate functions.
- There can be a factory to return these functions as and when required. The factory is nothing but a function holding a dictionary whose keys are "some name for a function" and whose values are the functions themselves. It returns the dictionary value for the key passed in.
- Depending on what you want to do in the transformation, you pass that transformation's name to the factory and it returns that transformation function.
Note: The signature of the transformation functions (input and output) is the same, hence it is possible to make them switchable using the factory pattern; see the sketch below.
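A minimal sketch of such a transformation factory, assuming the transformations operate on pandas DataFrames with hypothetical name and dob columns (only two of the strategies are implemented here):

from typing import Callable

import pandas as pd

# Each strategy has the same signature: DataFrame in, DataFrame out
def clean_name(df: pd.DataFrame) -> pd.DataFrame:
    df["name"] = df["name"].str.strip().str.title()
    return df

def format_date(df: pd.DataFrame) -> pd.DataFrame:
    df["dob"] = pd.to_datetime(df["dob"]).dt.date
    return df

def transformation_factory(name: str) -> Callable[[pd.DataFrame], pd.DataFrame]:
    # The factory is just a dict lookup from "some name" to the function itself
    factory = {
        "clean_name": clean_name,
        "format_date": format_date,
    }
    if name not in factory:
        raise ValueError(f"transformation {name} is not supported.")
    return factory[name]

# calling code: pick the strategy at runtime and apply it
df = pd.DataFrame({"name": ["  alice  "], "dob": ["1990-01-01"]})
df = transformation_factory("clean_name")(df)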
Singleton & Object Pool Patterns
Singleton - Only one object can be created and used. E.g., a db_conn object: there can be only one database connection object, and it can be used across the app (a minimal sketch follows below).
Object Pool - There is a pool of objects from which an object can be taken to do the work and then returned back to the pool once done. The object should be returned in its original state. E.g., a pool of database connections from which connections can be taken when required, enabling parallel processing.
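A minimal singleton sketch, assuming a SQLite connection and a hypothetical app.db file; every instantiation returns the same object:

import sqlite3

class SingletonDBConnection:
    # Classic singleton: every call to SingletonDBConnection() returns the
    # same instance, so the whole app shares one connection object.
    _instance = None

    def __new__(cls, db_file: str = "app.db"):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.conn = sqlite3.connect(db_file)
        return cls._instance

conn_a = SingletonDBConnection()
conn_b = SingletonDBConnection()
assert conn_a is conn_b  # same object everywhere in the app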
Python Helpers
Python Type Validation
- It helps define param/return types so that type errors can be caught before runtime.
- The typing package has classes that can be used to define complex types.
- E.g. usage:
from typing import Callable, List

# SocialMediaData is a dataclass assumed to be defined elsewhere
def transformation_factory(value: str) -> Callable[[List[SocialMediaData]], List[SocialMediaData]]:
    pass
Here, Callable declares that the return type of the function is itself a callable function. The first param to Callable is the list of input types of that callable, and the second param is its return type.
MyPy
It is a Python library that validates Python type hints. It is an additional code check, in the same way that a coding-standard tool like flake8 checks formatting.
DataClass
It helps store data as objects of a dataclass. Instead of a dictionary, you can use class-object notation for handling data.
Using Data Class with JSON, like working with an API
Suppose you have JSON data coming from an API and you want to load it into a class.
from dataclasses import dataclass
from dataclasses_json import dataclass_json

@dataclass_json
@dataclass
class Person:
    is_active: int = 0        # data type with a default value
    title_code: str = 'Mr'    # default value
    first_name: str = ''
    middle_name: str = ''
    last_name: str = ''

    # define functions like this
    def get_name(self):
        name = self.first_name + ' ' + self.last_name
        return name
To build the objects:
# build obj from json
person_items = None
if res_json is not None:
    person_items = []
    for item in res_json['items']:
        person = Person.from_dict(item)
        person_items.append(person)
Here, you can also use Person.from_json(item) if item is a JSON string.
Link https://realpython.com/python-data-classes/#default-values
Context Managers
When we hold an external resource, like an open file or a database connection, we should close it once done to avoid leaks and free up resources.
A context manager makes a function context-managed. A context-managed function can be called using the with keyword, e.g., with open(file):.
Upon the call, a context-managed function provides an object, which it hands back using yield.
At the end of the with block, execution returns to the context-managed function, which can handle closing the connection in a finally block.
import sqlite3
from contextlib import contextmanager

class DatabaseConnection:
    def __init__(self, db_file):
        self._db_file = db_file  # path to the SQLite file

    @contextmanager
    def managed_cursor(self):
        _conn = sqlite3.connect(self._db_file)
        cur = _conn.cursor()
        try:
            yield cur  # is provided to the `with` block
        finally:
            _conn.commit()  # executed after the `with` block
            cur.close()
            _conn.close()

db = DatabaseConnection("app.db")
with db.managed_cursor() as cur:  # cursor and connection are open
    cur.execute("YOUR SQL QUERY")
# cursor and connection are closed
Testing with pytest
Testing ensures the code is correct and lets you modify code with confidence.
- Fixtures - let you provide mock API data (see the sketch after this list).
- Schema Setup - lets you define the db schema for tests, basically setup and teardown at different levels.
- Mocking functions - lets you run the function being tested while modifying the behaviour of its dependencies.
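A minimal pytest sketch; the transform function, column names, and test data are hypothetical, and the fixture plays the role of mock API data:

import pandas as pd
import pytest

def transform(df: pd.DataFrame) -> pd.DataFrame:
    # function under test (hypothetical): drop records without a customer_id
    return df.dropna(subset=["customer_id"])

@pytest.fixture
def mock_api_data() -> pd.DataFrame:
    # Fixture standing in for data that would normally come from the API
    return pd.DataFrame({"customer_id": [1, None, 3], "name": ["a", "b", "c"]})

def test_transform_drops_rows_without_customer_id(mock_api_data):
    result = transform(mock_api_data)
    assert len(result) == 2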
Decorators
Decorators add functionality to other functions.
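For example, a minimal logging decorator (the logger setup and function names are illustrative):

import functools
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def log_call(func):
    # Wraps any function and logs when it starts and finishes
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        logger.info("starting %s", func.__name__)
        result = func(*args, **kwargs)
        logger.info("finished %s", func.__name__)
        return result
    return wrapper

@log_call
def extract():
    return [1, 2, 3]

extract()  # logs "starting extract" and "finished extract"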
Python Best Practice
Makefile
You make aliases, like one word for a long command. This can be done for the commands that run pytest, lint checks, type checks, formatting checks, the ETL itself, docker up or docker down, etc.
Githooks
You run pytest, lint checks, type checks, formatting checks, etc. while approving a PR, but you can make this mandatory and have it run before every commit by adding a pre-commit hook. This will run all these checks automatically and enforce them before each commit.
ETL Pipeline
- Simple - Using Pandas to read data, transform it, and load it is a pipeline. Doing this in a distributed environment makes it a big data pipeline.
- Distributed - Using PySpark, reading data from a DB is the extraction. Transformations are operations like groupBy, mean, or join.
- Code organization - say in etl_somejob.py:
  - define extract functions, e.g., def extract_table1_to_df(): that returns a df.
  - define transform functions, e.g., def transform_avg_score(df1, df2): that returns a df.
  - define a load function, e.g., def load_df_to_db(df):
  - finally, code to execute them.
- Now you can use this file to run the job or schedule it (a minimal sketch follows).
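A minimal sketch of such a file, assuming a SQLite warehouse and hypothetical table/column names:

# etl_somejob.py -- minimal sketch (tables and columns are hypothetical)
import sqlite3

import pandas as pd

def extract_table1_to_df(conn) -> pd.DataFrame:
    return pd.read_sql("SELECT user_id, score FROM table1", conn)

def extract_table2_to_df(conn) -> pd.DataFrame:
    return pd.read_sql("SELECT user_id, region FROM table2", conn)

def transform_avg_score(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
    # join the two extracts, then average the score per user
    joined = df1.merge(df2, on="user_id")
    return joined.groupby("user_id", as_index=False)["score"].mean()

def load_df_to_db(df: pd.DataFrame, conn) -> None:
    df.to_sql("avg_scores", conn, if_exists="replace", index=False)

if __name__ == "__main__":
    conn = sqlite3.connect("warehouse.db")
    try:
        df1 = extract_table1_to_df(conn)
        df2 = extract_table2_to_df(conn)
        load_df_to_db(transform_avg_score(df1, df2), conn)
    finally:
        conn.close()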