# How to Write a Python ETL Pipeline

August 2021 · 9 minute read

Building ETL pipelines has become a crucial skill to master for data engineers, software developers, and data scientists alike.

In this tutorial, we cover the principles of building efficient and maintainable ETL data pipelines.

### What is an ETL pipeline?

ETL stands for Extract-Transform-Load. Essentially, it is a 3-part batch process for migrating data from one place to another.

1. Extract. In this first part, we define our data sources and potentially determine whether any filtering needs to be done. This is especially important when dealing with large amounts of data. Any data quality checks on the data source may also be included here.
2. Transform. Once the source data has been extracted, a series of transformations can be applied.
3. Load. In the final part, the transformed data is usually merged into the target, using an upsert (update + insert).
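As a minimal sketch of the three parts (function and column names here are illustrative, not from a specific dataset), each part maps naturally onto its own function:

```python
import pandas as pd


def extract(path: str) -> pd.DataFrame:
    # Extract: read the raw source; filtering could also happen here.
    return pd.read_csv(path)


def transform(df: pd.DataFrame) -> pd.DataFrame:
    # Transform: apply cleaning and business logic (here, just drop nulls).
    return df.dropna()


def load(df: pd.DataFrame, path: str) -> None:
    # Load: write the transformed data to the target destination.
    df.to_csv(path, index=False)
```

Chaining `load(transform(extract(source)), target)` gives the whole batch process; the rest of the article builds this shape out with real logic.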

### Batch vs. Streaming

There are two distinct data pipeline processing approaches: batch processing and streaming. While the focus of this article is on batch processing, knowing the differences between the two helps us understand which scenarios ETL pipelines are best suited for.

In the batch processing approach, often synonymous with ETL, the data is processed by extracting large portions of the source data (or even the entire data set), transforming it, then fully loading it into the target destination.

Stream-based processing on the other hand aims to bring in the data as close to real-time as possible, at the expense of complexity.

| Feature | Batch | Streaming |
| --- | --- | --- |
| Easy implementation | ✔️ | |
| Real-time | | ✔️ |
| Easy to debug | ✔️ | |
| Easy to monitor | | |

### ETL Principles

• Separation of concerns. Each step in the pipeline should be concerned with one thing only. This helps keep the code well-structured and maintainable.
• Idempotency. Job runs should have the same effect no matter how many times they are executed. If a job is run 10 times on the same partition of the data, each data point should still appear only once in the target. Oftentimes this is easier said than done, but it’s important to strive for idempotent operations.
• Incremental load. The tables should be timestamped, such that subsequent runs of an ETL job may process only the rows added since the last processing time. This ensures runs of the ETL job are fast. Of course, the initial load of the data will be slower.
• Checkpointing. Intermediate steps in the ETL process should optionally be saved down, so they can be inspected in case bugs appear.
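To make the idempotency principle concrete, here is a minimal upsert sketch (the column names are made up for illustration): new rows replace existing rows with the same key, so re-running the same batch leaves the target unchanged.

```python
import pandas as pd


def upsert(target: pd.DataFrame, batch: pd.DataFrame, key: str) -> pd.DataFrame:
    # Rows from `batch` replace rows in `target` that share the same key.
    # Applying the same batch twice produces an identical result.
    combined = pd.concat([target, batch])
    return combined.drop_duplicates(subset=key, keep='last').reset_index(drop=True)


target = pd.DataFrame({'crime_id': [1, 2], 'number': [10, 20]})
batch = pd.DataFrame({'crime_id': [2, 3], 'number': [21, 30]})

once = upsert(target, batch, key='crime_id')
twice = upsert(once, batch, key='crime_id')
# `once` and `twice` are identical: the load is idempotent.
```

Contrast this with a naive append, where each re-run would duplicate every row in the batch.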

### When to create ETL pipelines?

Any piece of code that is not interactive and needs to be scheduled can be designed as an ETL job.

By interactive, we mean something where a user works with a service (e.g. via a UI, or via a command line) and expects almost immediate responses.

### ETL in Practice

Now, let’s get our hands dirty with an example ETL pipeline written in Python and pandas. Why Python and pandas? Because Python has become the lingua franca of data engineering, and pandas is its most common library for data wrangling. The aim is to run the entire process in memory and on local disk, without having to set up any database. Plain CSVs and in-memory processing should illustrate the principles just as well as processing data within a database, which is often the natural system within which ETL jobs run.

📝 One great framework for ETL processing and scheduling is Airflow. We do not explore it here because it would distract us from the core principles of ETL.

#### Atlanta crime example

The example I have chosen to illustrate an ETL pipeline is based on a dataset containing information about crimes in Atlanta. It is a smallish dataset of 25 MB, so it should be pretty fast to run.

Let’s imagine we are in charge of optimising the dispatching of police officers from our police stations. In the first phase, we will do that by looking at individual datapoints and computing the distance between the places where the crimes happen and the police stations. We can then use this information to create outposts closer to the crime hotspots (we won’t do this in the current article; we’ll focus on the ETL pipeline).

Let’s start by importing the required packages. We use pandas and numpy for data wrangling, and pandera for schema checks.

```python
import pandas as pd
import numpy as np
import pandera as pa
from pandera import Column, DataFrameSchema, Check, Index
from typing import Callable, Union, Optional
```


Using pandera, it’s easy to define schemas for your dataframes, with checks for column types and nullability, among others.

```python
CRIME_TYPES = ['LARCENY-NON VEHICLE', 'AUTO THEFT', 'LARCENY-FROM VEHICLE',
               'BURGLARY-RESIDENCE', 'ROBBERY-PEDESTRIAN', 'AGG ASSAULT', 'RAPE',
               'BURGLARY-NONRES', 'HOMICIDE', 'ROBBERY-RESIDENCE',
               'ROBBERY-COMMERCIAL']

atlanta_crime_schema = DataFrameSchema(
    {
        "crime_id": Column(int),
        "crime": Column(str, Check(lambda s: all(crime in CRIME_TYPES for crime in s))),
        "number": Column(int),
        "date": Column(np.datetime64),
        "location": Column(str, nullable=True),
        "beat": Column(int, nullable=True),
        "neighborhood": Column(str, nullable=True),
        "npu": Column(str, nullable=True),
        "lat": Column(float),
        "long": Column(float),
    },
    index=Index(int),
    strict=True,
)
```


The extract step is defined below and involves a series of transformations to the raw columns. Note how everything is immutable: we take in a dataframe, and each transformation returns a new one, so the previous dataframe is never modified. This way we ensure we do not accidentally modify state. Just think about all those times you lost track of the changes made to your dataframes in a Jupyter notebook - immutability avoids that!

```python
@pa.check_output(atlanta_crime_schema)
def preprocess_atlanta_crime(df):
    return (
        df.
        assign(date=lambda x: pd.to_datetime(x.date)).
        assign(beat=lambda x: pd.to_numeric(x.beat, errors='coerce', downcast='integer')).
        rename(columns={'Unnamed: 0': 'crime_id'}).
        astype({'beat': 'Int64'})
    )
```


Here we define the three police stations. Nothing fancy, just some hard-coded random values that I eye-balled by looking at sample latitude and longitude data.

```python
police_stations_df = pd.DataFrame({
    'lat': [33.768, 33.752, 33.711],
    'long': [-84.505, -84.360, -84.444],
    'name': ['Atlanta PD Zone 1', 'Atlanta PD Zone 2', 'Atlanta PD Zone 3']
})
```


Now let’s define the functions for the more complex transformations. First, a helper function that computes the distance between the location of a crime and a police station. Then, a function that takes in a dataframe with the crimes and a dataframe with the stations and computes the distances between all of them, in essence performing a Cartesian product. And finally, the apply_closest_police_station function, which computes the distances and selects the closest station based on the minimum distance.

```python
def distance_from_station(row):
    # Euclidean distance in degree space between a crime and a station.
    return np.sqrt((row.lat - row.lat_station)**2 + (row.long - row.long_station)**2)

@pa.check_input(atlanta_crime_schema, 'crime_df')
def apply_dist_to_police_station(crime_df: pd.DataFrame, stations_df):
    crime_copy_df = crime_df.copy(deep=True)
    # Cartesian product: every crime paired with every station.
    crime_and_stations_df = crime_copy_df.merge(stations_df, how='cross', suffixes=('', '_station'))
    crime_and_stations_df['dist_to_station'] = crime_and_stations_df.apply(distance_from_station, axis='columns')
    return crime_and_stations_df

@pa.check_input(atlanta_crime_schema, 'crime_df')
def apply_closest_police_station(crime_df: pd.DataFrame, stations_df):
    crime_copy_df = crime_df.copy(deep=True)
    crime_with_dist_df = apply_dist_to_police_station(crime_copy_df, stations_df)
    # For each crime, keep only the row with the minimum distance.
    min_dist_to_station = crime_with_dist_df.groupby('crime_id')['dist_to_station'].min()
    return crime_with_dist_df.merge(
        min_dist_to_station,
        how='inner',
        on=['crime_id', 'dist_to_station'],
    )
```


Finally, the end-to-end pipeline is defined simply by chaining together all the building blocks we wrote above, using the pipe method. This method is incredibly useful for designing pipelines in pandas, as it allows you to define your workflow in a declarative manner. The pipeline below reads from CSV, then preprocesses the data, applies some more complex business logic transformation, and finally loads the data back into a CSV.

```python
(
    pd.read_csv('atlanta_crime.csv').  # extract step; the file name is illustrative
    pipe(preprocess_atlanta_crime).
    pipe(apply_closest_police_station, stations_df=police_stations_df).
    pipe(pd.DataFrame.to_csv, path_or_buf='crime_output.csv')
)
```


It is very similar to how piping works in bash or in PowerShell. You just pass in your functions, and pipe can even forward additional parameters to them. In our case, we pass the stations_df and the path_or_buf arguments. The method then returns a new dataframe (if you wrote your code in the functional paradigm 😉) that can then be piped into another transformation.
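To isolate how pipe forwards keyword arguments, here is a tiny self-contained example (the function and column names are made up for illustration):

```python
import pandas as pd


def add_constant(df: pd.DataFrame, value: int) -> pd.DataFrame:
    # Pure function: returns a new dataframe, leaving the input untouched.
    return df.assign(total=df['x'] + value)


df = pd.DataFrame({'x': [1, 2, 3]})

# `pipe` passes the dataframe as the first argument and forwards extra
# keyword arguments to the function, just like `stations_df` above.
result = df.pipe(add_constant, value=10)
```

Because `add_constant` never mutates its input, `df` is untouched after the call, which is exactly the immutability property the pipeline relies on.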

What is nice about the way we have defined our pipeline is that we can very easily swap in-and-out different steps, as well as add or remove steps. This level of abstraction is flexible and powerful.

For example, instead of loading the transformed data back into a CSV, we can save it to a database. Only the last line of the ETL pipeline needs to change. If we need to change our business logic to compute the distance using actual street-level distances instead of straight lines, we only need to modify the business logic transformation functions. Or even better, we can parameterise that function to accept a strategy function that determines how the distance is computed. Additional transformations are also easy to include - just pipe another method.
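As a sketch of that parameterisation (the function names here are hypothetical, not part of the pipeline above), the distance computation can be injected as a strategy function:

```python
import numpy as np
import pandas as pd
from typing import Callable


def euclidean(row: pd.Series) -> float:
    # Straight-line distance in degree space, as in the pipeline above.
    return np.sqrt((row.lat - row.lat_station)**2 + (row.long - row.long_station)**2)


def apply_distances(df: pd.DataFrame,
                    distance_fn: Callable[[pd.Series], float]) -> pd.DataFrame:
    # The distance metric is injected as a strategy, so swapping in
    # street-level distances needs no change to the pipeline itself.
    return df.assign(dist_to_station=df.apply(distance_fn, axis='columns'))


df = pd.DataFrame({'lat': [33.7], 'long': [-84.4],
                   'lat_station': [33.7], 'long_station': [-84.4]})
```

Swapping metrics then means passing a different `distance_fn`; everything downstream of the transformation stays untouched.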

We follow all four basic principles. We have separation of concerns because each step concentrates solely on extracting, transforming, or loading the data, without interfering with the other steps. We have idempotency thanks to immutability and lack of side-effects. We have the possibility of adding incremental load by using timestamps and by adding date range parameters to the extract step. We also have the ability to add checkpointing by saving intermediate results to local disk, or to S3, without interfering with other steps because each method is well abstracted.

### What next?

We have this beautiful pipeline, but how do we schedule it to run regularly? Our pipeline does not have any logic related to the time of execution, and that’s by design. It helps keep responsibilities separated. The pipeline is only tasked with executing the code, not with scheduling it. For scheduling, I would recommend Airflow, or if you do not have the time to learn it and deploy it, plain old crontabs with a wrapper around the pipeline.

One feature that should be added to facilitate incremental loads is a date range parameter on the extract method, such that newly added crime datapoints can be picked up without having to load the entire dataset.
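A sketch of what such an extract step could look like (the `since` parameter and file layout are assumptions for illustration):

```python
import pandas as pd
from typing import Optional


def extract_crimes(path: str, since: Optional[pd.Timestamp] = None) -> pd.DataFrame:
    # Read the full file, then keep only rows newer than the last run.
    # With a database source, this filter would be pushed into the query.
    df = pd.read_csv(path, parse_dates=['date'])
    if since is not None:
        df = df[df['date'] > since]
    return df
```

The scheduler (or a small wrapper) would then record the high-water mark of each run and pass it in as `since` on the next one.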

📝 Note that, in the example with the Atlanta crime, we assume crime datapoints arrive in chronological order - a new datapoint cannot have a date earlier than the greatest date of an already existing datapoint. If the datapoints had both the date and the timestamp at which the datapoint was registered, we would be able to do the incremental load based on the timestamp instead, which is the proper way to do it.

### Tools

Other tools that may be useful are:

• Apache Airflow for orchestration.
• dbt for transformations.
• Kubernetes for infrastructure.
• Great Expectations for data quality.
• Apache Beam for combining batch and streaming processing. The Python SDK is rather immature at the moment, so I would wait until more features are added before jumping in.

### Conclusion

In this article, we briefly went over the theory and principles of well-designed ETL pipelines, and then we explored a realistic example of how to build one such pipeline in Python. Hopefully this article taught you something new and convinced you that functional pipelines are useful. These principles and tips should be applicable to both data engineers and data scientists.