Is the COVID death rate related to nutrition? A data engineering setup

Rubén Chuliá Mena
Jan 3, 2023


In this article, we are going to explore a simple data analysis case (the underlying question is quite complex, but we are not going to dive deep into the analysis) as an excuse to use three essential data engineering tools: Terraform, AWS Redshift, and Airflow.

This project will allow you to understand the role of these tools, how to make them work together, and the importance of data modelling. You can use this project as a starting point for more complex use cases.

In the data engineering field, it is important to start with the end in mind, so you can create the data model and design your pipeline accordingly. Of course, the first version is never the final version, but things get a lot easier when your first attempt is accurate.

In our case, the final question we are trying to answer is:

Are COVID health effects related to nutrition?

To answer that question, we are going to load and explore data related to health factors in the USA. We will use data from three different sources so we can face the common problems that arise when merging different data sources.

With them, we are going to set up a data warehouse in AWS Redshift that is ready to be analyzed so one can discover relationships between the COVID impact in each USA county and different social and health variables such as, for example, how many COVID vaccines the population received, or what percentage of the population has low income or low access to food stores.

The creation of tables and loading of data is orchestrated in Airflow. This tool is already mature and used in many companies to manage their data pipelines, making it a good choice. The fact that it has a big community of users makes finding solutions to issues much easier.

The creation of the AWS infrastructure has been done using an Infrastructure-as-Code approach with the software Terraform. AWS is one of the three top companies that offer cloud services related to data engineering, and one of their services, AWS Redshift, is a very potent database system nicely designed for data warehousing activities, making it ideal for this project.

Concerning the choice of deploying the infrastructure with an Infrastructure-as-Code approach, it makes the development easier. Having to perform the same steps in the web user interface over and over is tedious and prone to errors. Having it coded allows for replicability, version control, and the ability to share the design easily. AWS has its own language for deploying infrastructure, called CloudFormation.

However, I chose Terraform because it is a platform-agnostic tool used by many users all over the world. I thought that if I am going to learn to use Infrastructure-as-Code, it would be more useful to learn a tool that allows me to work in other clouds too (e.g., GCP or Microsoft Azure).

The demonstration of how this data can be queried and analyzed has been done in a Jupyter Notebook. I chose this because it is the standard tool for data analysis. It is easy to set up, and good for exploring data sets, prototyping solutions, and explaining lines of thought. So, as the purpose of this project is not to come up with a production-ready system, it is a good choice.

An important note is that this is NOT a tutorial about Terraform, AWS, or Airflow. This article is just meant to inspire experienced users with an example of how to use these tools together. You are not going to learn to use these tools from reading this article. There are plenty of tutorials (official and non-official) on these topics. Please work through them, and understand them :)

All the code used in this article is put together in this GitHub repository. In the article, I will explain fragments of the code, but for having a working version it is better that you refer to the repository.

Datasets

I have used three different datasets:

  • Healthy food choices and availability in the US.
  • COVID cases registry.
  • COVID vaccination statistics.

All of them provide the data in CSV format, which is very convenient since AWS can work easily with CSV files.

Healthy food availability

Data about food choices is here:

Food Environment Atlas

COVID Cases

This is a dataset about COVID cases for each county and date:

US counties COVID-19 dataset

COVID Vaccination

This dataset is about vaccination:

COVID-19 Vaccinations in the United States, County | Data | Centers for Disease Control and Prevention

Another interesting source that could also be used for future extension

This dataset is quite promising:

USA COVID-19 Vaccinations

However, don’t get caught up with the specific datasets. You can use the ones I propose if you want to replicate my results, but you can also choose others. This is just for explanation purposes.

Data Model

Now we are going to define a data model. In the one I propose, there are four tables with different data about counties and states:

  • CountyHealthData
  • StateHealthData
  • CovidCases
  • CovidVaccination

and two tables that just contain basic information (name and population) about each county and each state.

The key variable that relates all tables is the FIPS (each region in the United States is identified by a numeric code called FIPS, Federal Information Processing Standards).

This is the final data model we want to have in the data warehouse. It is a convenient shape for analysis.

ETL Overview

ETL stands for Extract, Transform, Load. It is a process in data warehousing in which data is extracted from various sources, transformed into a format suitable for analysis and reporting, and then loaded into a data warehouse or other data repository.

The extract phase involves extracting data from various sources, such as databases, flat files, or APIs. The transformation phase involves cleaning and organizing the data, as well as performing any necessary calculations or conversions. This may include tasks such as filtering out unwanted data, aggregating data from multiple sources and applying business rules. The load phase involves loading the transformed data into the target data warehouse or repository.

ETL is a common way to move and integrate data from disparate sources, and this is exactly what we need to do.

Extract

In the extract phase, we will just get the data out of the CSV files and load it into staging tables. In this phase, there will be a first step in which we will load the files into AWS S3 so they are easily accessible in the AWS ecosystem. In a second step, we will load the raw data as it is into staging tables in the AWS data warehousing service called Redshift.

Wait a moment. What is a staging table? A staging table is a temporary table used to hold data that is being prepared for insertion into a target table. Staging tables are often used in ETL processes to store data that has been extracted from various sources, such as databases, flat files, or APIs, before it is transformed and loaded into the target table. In our case, these various sources are just CSV files.

Staging tables can be useful for several reasons:

  1. They allow data to be cleaned, transformed, and validated before it is loaded into the target table, which helps ensure the integrity and quality of the data.
  2. They can be used to stage data from multiple sources, which can then be merged and loaded into the target table in a single batch.
  3. They provide a temporary storage location for data that is being transformed or loaded, which can be useful if the transformation process takes a long time or if the load process needs to be retried.
  4. They can be used to create a history of changes to the data, as the data in the staging table can be compared to the data in the target table to track changes over time.

Overall, staging tables are a useful tool for managing and preparing data as it is moved from various sources into a target data repository.

For simplicity’s sake, we will only transform data in one of the datasets. The other two are going to be left as they are and loaded directly into the final tables that will be analyzed.

These two steps can be summarized in this diagram:

Transform

In the transformation step, we will perform three main tasks.

Task 1: pivot tables

To explain why this step is necessary, let’s take a look at the CSV file called StateAndCountyData.csv:

FIPS,State,County,Variable_Code,Value
1001,AL,Autauga,LACCESS_POP10,18428.43969
1001,AL,Autauga,LACCESS_POP15,17496.69304
1001,AL,Autauga,PCH_LACCESS_POP_10_15,-5.056025704
1001,AL,Autauga,PCT_LACCESS_POP10,33.7696573
1001,AL,Autauga,PCT_LACCESS_POP15,32.06225475

As we can see, the fourth column is called Variable_Code. In the transform phase, we are going to change the format from long to wide, i.e. we will perform a pivot operation to get all the data for a given county into the same row, instead of having it distributed across multiple rows as it is now:

+======+=======+=========+===============+===============+=======================+=====+
| FIPS | State | County | LACCESS_POP10 | LACCESS_POP15 | PCH_LACCESS_POP_10_15 | ... |
+======+=======+=========+===============+===============+=======================+=====+
| 1001 | AL | Autauga | 18428.43969 | 17496.69304 | -5.056025704 | ... |
+------+-------+---------+---------------+---------------+-----------------------+-----+
| 1002 | ... | ... | ... | ... | ... | ... |
+------+-------+---------+---------------+---------------+-----------------------+-----+

The same pivot operation will be applied to the state data, stored in the file SupplementalDataState.csv.
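As a rough illustration of this long-to-wide reshaping, here is a minimal pure-Python sketch that uses the five sample rows shown above. The function name pivot_long_to_wide is hypothetical; the real pipeline does this step in Redshift SQL, not in Python.

```python
import csv
import io

# Sample rows copied from StateAndCountyData.csv as shown above.
LONG_CSV = """FIPS,State,County,Variable_Code,Value
1001,AL,Autauga,LACCESS_POP10,18428.43969
1001,AL,Autauga,LACCESS_POP15,17496.69304
1001,AL,Autauga,PCH_LACCESS_POP_10_15,-5.056025704
1001,AL,Autauga,PCT_LACCESS_POP10,33.7696573
1001,AL,Autauga,PCT_LACCESS_POP15,32.06225475
"""


def pivot_long_to_wide(long_csv_text):
    """Return one dict per county, with one key per Variable_Code."""
    wide = {}
    for row in csv.DictReader(io.StringIO(long_csv_text)):
        # Create the county row the first time we see its FIPS code.
        county = wide.setdefault(
            row["FIPS"],
            {"FIPS": row["FIPS"], "State": row["State"], "County": row["County"]},
        )
        # Each variable code becomes its own column in the wide row.
        county[row["Variable_Code"]] = float(row["Value"])
    return list(wide.values())


wide_rows = pivot_long_to_wide(LONG_CSV)
print(wide_rows[0])
```

The five input rows collapse into a single row for Autauga, with one key per variable code.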

Task 2: divide state data into nutrition table and state population table

In our final data model, the general information about a state (like the population or the name) and the information about nutrition are divided into two different tables.

However, in the data sources, all that information comes in just one file, SupplementalDataState.csv.

The second task of the transformation phase will be precisely dividing this information into two tables.

Task 3: get state FIPS for the county general data table

In our data model, the general data (population and full name) of each county is in its own table, called County. In that table, the only reference to the state to which a county belongs is the state FIPS code. The problem is that the population data comes in a CSV file called SupplementalDataCounty.csv, in which the state FIPS is not available, only the state name. Hence, we need to combine that data with the information in the State table.
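To make the idea concrete, here is a small sketch of that lookup using SQLite from Python as a stand-in for Redshift. The table layouts and column names (CountyStaging, StateName, CountyName, StateFIPS) are simplified assumptions for illustration, not the real schemas from the repository.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    -- Hypothetical, simplified schemas (not the article's real column lists).
    CREATE TABLE State (FIPS TEXT PRIMARY KEY, Name TEXT);
    CREATE TABLE CountyStaging (FIPS TEXT, StateName TEXT, CountyName TEXT);

    INSERT INTO State VALUES ('01', 'Alabama'), ('02', 'Alaska');
    INSERT INTO CountyStaging VALUES
        ('01001', 'Alabama', 'Autauga'),
        ('01003', 'Alabama', 'Baldwin'),
        ('02020', 'Alaska', 'Anchorage');

    -- Look up each state's FIPS by name while building the County table.
    CREATE TABLE County AS
    SELECT c.FIPS, c.CountyName AS Name, s.FIPS AS StateFIPS
    FROM CountyStaging AS c
    JOIN State AS s ON s.Name = c.StateName;
    """
)

county_rows = conn.execute(
    "SELECT FIPS, Name, StateFIPS FROM County ORDER BY FIPS"
).fetchall()
print(county_rows)
```

Joining on the state name only works if the names match exactly in both sources, so in a real dataset it is worth checking for counties that fail to match.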

Load

The Load phase consists of just loading the transformed data into the target tables in the data warehouse.

Although conceptually transforming and loading are different phases, we can perform both with the same SQL statement in practice.

The final diagram of the ETL process is this:

ETL diagram

Infrastructure creation

The infrastructure we need will be located in two places: our local machine and the AWS cloud. On our local machine, we will have an Airflow instance (more on this below) that will orchestrate all the processes. The bulk of the processing will be performed by AWS. In particular, we will need an S3 bucket and a Redshift cluster.

For simplicity purposes, the Airflow instance will be run inside a Docker container, whereas the AWS infrastructure will be created using an Infrastructure-as-code approach using Terraform (more on this below).

Create AWS infrastructure with Terraform

Terraform is an open-source infrastructure-as-code software tool created by HashiCorp. It allows users to define and provision infrastructure resources in a safe, predictable, and reusable manner.

With Terraform, you can write configuration files that describe the infrastructure resources you need, and then use Terraform to create and manage those resources. This makes it easy to automate the process of setting up and maintaining infrastructure and enables you to use infrastructure as code to manage and version your resources.

Terraform supports a wide range of resource types, including low-level components such as compute instances and storage, as well as higher-level resources such as DNS records, container clusters or databases. It can be used to manage infrastructure resources on a variety of cloud providers, such as AWS, Azure, and Google Cloud, as well as on-premises environments. We will deal with AWS exclusively.

Terraform is a powerful tool for building, changing, and versioning infrastructure safely and efficiently. It is widely used in the DevOps community to automate infrastructure provisioning and management tasks.

The resources we are going to need to create are mainly two: an AWS S3 bucket for uploading our CSV files, and an AWS Redshift instance for our data warehouse.

The detailed process of how we can create these resources with Terraform is out of the scope of this article. There are thousands of tutorials in articles over the Internet and in the official documentation of these tools. If you have never used Terraform or AWS before, let me warn you that making it work is going to be a frustrating process because there are a lot of little details one must take into account. This is not easy, but it is worth the effort.

However, I am going to provide you with the code I used, so you can use it too. If you are an experienced user, you might make it work on the first attempt. If you are not, use it as a reference but beware that you will need to understand how all this works and follow some tutorials on your own until you get it.

The code is this:

variable "aws_region" {
  type    = string
  default = "eu-west-3"
}

variable "aws_profile" {
  type    = string
  default = "my_aws_profile"
}

variable "database_name" {
  type    = string
  default = "health_data"
}

variable "login" {
  type    = string
  default = "dwhusername"
}

variable "password" {
  type    = string
  default = "SomePassw0rd"
}

variable "dwh_iam_role_name" {
  type    = string
  default = "dwhRole"
}

variable "s3_bucket" {
  type    = string
  default = "covid-csv-files-bucket-name"
}


provider "aws" {
  region  = var.aws_region
  profile = var.aws_profile
}

module "storage" {
  source = "git::https://github.com/rubchume/TerraformAWSexamples//aws_redshift_example?ref=de3f1b6c4a6c0f70941e872f6df44ceaeaaba184"

  aws_region  = var.aws_region
  aws_profile = var.aws_profile

  dwh_iam_role_name = var.dwh_iam_role_name

  vpc_cidr               = "10.0.0.0/16"
  redshift_subnet_cidr_1 = "10.0.1.0/24"
  redshift_subnet_cidr_2 = "10.0.2.0/24"

  subnet_availability_zone = "${var.aws_region}a"

  rs_cluster_identifier      = "dwh-cluster"
  rs_database_name           = var.database_name
  rs_master_username         = var.login
  rs_master_pass             = var.password
  rs_nodetype                = "dc2.large"
  rs_cluster_type            = "single-node"
  rs_cluster_number_of_nodes = 1
}


# Write the cluster host and port to a local file so other scripts can read them.
resource "local_file" "output_variables" {
  filename = "${path.module}/redshift.env"
  content  = <<EOF
redshift_host=${module.storage.redshift_cluster_dns_name}
redshift_port=${module.storage.redshift_cluster_port}
EOF
}


resource "aws_s3_bucket" "s3" {
  bucket = var.s3_bucket

  tags = {
    Name        = "COVID CSV files bucket"
    Environment = "Dev"
  }
}


output "redshift_host" {
  value = module.storage.redshift_cluster_dns_name
}

output "redshift_port" {
  value = module.storage.redshift_cluster_port
}

Pay attention to the fact that I am using a public module from a GitHub repository that I created. You can review the code. You MUST review it.

AWS is not free. AWS Redshift has a cost that depends on the running time and the type of instance. I practised with this infrastructure lots of times (deploying it to work with it for one to two hours a day, and then destroying it every day when I no longer needed it) and the cost has never been higher than $30 a month. However, this can change. If you accidentally leave the AWS Redshift cluster active because you forget to destroy it, you can get an unpleasant surprise at the end of the month. So, ALWAYS review the Terraform code you are going to use.

Save this code in a file named something like main.tf. You can then deploy it by executing the following shell script:

source .env

terraform init
terraform get -update
terraform apply -lock=false \
  -var="database_name=$database_name" \
  -var="login=$login" \
  -var="password=$password" \
  -var="aws_region=$aws_region" \
  -var="aws_profile=$aws_profile" \
  -var="s3_bucket=$s3_bucket" \
  -var="dwh_iam_role_name=$redshiftIAMRole" \
  -auto-approve

where .env is a file containing your custom variable values:

database_name=my_database_name
login=whatever
...

You can also use a (generally) more convenient way, which is placing the custom variables in a file called terraform.tfvars and executing:

terraform apply -var-file="terraform.tfvars"

I used the first way because it facilitated the integration with Airflow when I was practising with it (check out the full Git repository for more details). However, you can use the solution you find more convenient. The important thing is that you have the S3 bucket and Redshift cluster created.

If you are new to this, you probably won’t have understood much of it. Don’t be afraid. Just take your time to research the topic on your own and follow some tutorials.

Infrastructure creation with Terraform

Run Airflow instance

Apache Airflow is an open-source platform to programmatically author, schedule, and monitor workflows. It is designed to simplify the management of complex workflows and data pipelines by providing a simple, flexible, and powerful platform for the definition, execution, and monitoring of workflows.

Airflow is often used in data engineering to build, schedule, and monitor pipelines that move and transform data from one location to another. It can also be used to automate tasks such as data ingestion, data transformation, data loading, and data quality checks.

Airflow is written in Python and uses a Pythonic syntax for defining and scheduling tasks. It also provides a web interface for visualizing and managing the workflows, as well as a command-line interface for interacting with the platform.

Airflow has a modular architecture and a large ecosystem of plugins that extend its functionality. It can be integrated with other tools and platforms, such as databases, messaging systems, and cloud services (including AWS), to create complex data pipelines and workflows.

The simplest way I have found to use Airflow is not to install it directly on my laptop, but to download the official docker-compose.yml file and run it with the Docker Compose tool (Docker is also out of the scope of this article; look for more information if needed). The file can be found here. Once downloaded, place it in a folder called airflow and execute it by running:

docker-compose -f airflow/docker-compose.yml up --detach

This will start multiple containers that are all part of the Airflow ecosystem. Access the GUI at http://localhost:8080/home and enter airflow as both username and password (as they appear in docker-compose.yml).

When the containers are up and running, three folders are created in the same folder where docker-compose.yml is located:

  • dags
  • logs
  • plugins

These folders correspond to volumes defined in the docker-compose.yml file. You can add other volumes, like a configuration folder for importing connections and other configurations:

volumes:
  - ./dags:/opt/airflow/dags
  - ./logs:/opt/airflow/logs
  - ./plugins:/opt/airflow/plugins
  - ./configuration:/opt/airflow/configuration

I recommend modifying the docker-compose.yml file to assign a predefined and easily identifiable name for the airflow scheduler container. We will use it later for easily configuring Airflow:

airflow-scheduler:
  container_name: airflow_scheduler

For Airflow to run any tasks, they must be organized in DAGs. A DAG (Directed Acyclic Graph) is a collection of all the tasks you want to run. It defines an execution plan for your workflows, allowing you to specify which tasks should run and in what order they should be executed. Tasks within a DAG can be organized and arranged in any way, as long as they form a directed acyclic graph, meaning that they do not form any loops and there is a clear start and end point. You can also set up dependencies between tasks so that certain tasks only run if others have been completed successfully. This allows you to build complex and scalable data pipelines that can handle large amounts of data and run reliably.
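To illustrate what "directed acyclic graph" means in practice, here is a small sketch that uses only Python's standard library (graphlib, available from Python 3.9), not Airflow itself. The task names are illustrative, loosely based on the steps in this article.

```python
from graphlib import CycleError, TopologicalSorter

# Each key is a task; its value is the set of tasks that must finish first.
dependencies = {
    "create_staging_table": set(),
    "load_staging_table": {"create_staging_table"},
    "get_variable_codes": {"load_staging_table"},
    "pivot_table": {"get_variable_codes"},
}

# A valid execution order exists because the graph has no loops.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)

# If two tasks depend on each other, no valid order exists.
cyclic = {"a": {"b"}, "b": {"a"}}
try:
    list(TopologicalSorter(cyclic).static_order())
    has_cycle = False
except CycleError:
    has_cycle = True
print(has_cycle)
```

Airflow resolves the order for you in a similar spirit: you declare the dependencies, and the scheduler decides when each task can run.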

A DAG is represented as a Python script, in which you define a set of tasks and specify the dependencies between them with pythonic syntax. Tasks can be any type of action, such as running a Python script, executing a SQL query, or calling an API. Your DAGs defined in Python should be loaded in the dags folder. Later we will see what those files look like.

From the Python code, we can use parameters that can be set by the user from the user interface or programmatically. These parameters can be mainly of two types: variables and connections. Variables are just what they sound like, variables. You assign an arbitrary value (a string, a number, etc.) to a variable name. You can set them with the next command:

docker exec airflow_scheduler airflow variables set variable_name "variable_value"

The first part of the command (docker exec airflow_scheduler) just tells Docker that we are going to execute a command inside the container called airflow_scheduler, which is the name we purposefully chose for the scheduler container.

The second part (airflow variables set variable_name "variable_value") is the Airflow CLI command used to set variables.

Connections are kind of collections of variables with a certain structure designed to store information about how to connect to external systems, such as databases or cloud storage services (e.g. AWS Redshift). They typically include credentials and other details required to establish a connection, such as hostnames, port numbers, and database names.

They can be set with the airflow connections add command providing the values in JSON format:

docker exec airflow_scheduler airflow connections add 'redshift_default' \
  --conn-json '{
    "conn_type": "redshift",
    "login": "'$login'",
    "password": "'$password'",
    "host": "'$REDSHIFT_HOST'",
    "port": '$REDSHIFT_PORT',
    "schema": "'$database_name'",
    "extra": {
      "region": "'$aws_region'"
    }
  }'

You might notice we are using some Bash variables ($login, $password, etc.). These must be the same ones we used for creating the AWS infrastructure with Terraform. That is why I found it more convenient to use a single .env file that contains all those variables.
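The quoting in that command is easy to get wrong, so one alternative is to build the JSON in Python and pass the resulting string to --conn-json. This is only a sketch: the helper name build_redshift_connection is hypothetical, and the host and port below are placeholder values, not real cluster details.

```python
import json


def build_redshift_connection(login, password, host, port, database, region):
    """Return a JSON string with the same fields as the command above."""
    connection = {
        "conn_type": "redshift",
        "login": login,
        "password": password,
        "host": host,
        "port": int(port),  # the port must be a JSON number, not a string
        "schema": database,
        "extra": {"region": region},
    }
    return json.dumps(connection)


# Placeholder values only; in practice they come from .env and Terraform.
conn_json = build_redshift_connection(
    login="dwhusername",
    password="SomePassw0rd",
    host="example-host",
    port="5439",
    database="health_data",
    region="eu-west-3",
)
print(conn_json)
```

Letting json.dumps do the escaping also keeps the command working if the password contains quotes or other special characters.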

There are two variables, REDSHIFT_HOST and REDSHIFT_PORT, that are not in the .env file. This is because we don’t know the Redshift cluster host and port until it is started, i.e. we cannot choose them. However, once Terraform starts it, it can provide us with those values. That is why I included those variables as outputs in the Terraform file. To query them from bash, we just do it like this:

REDSHIFT_HOST=$(terraform output --raw redshift_host)
REDSHIFT_PORT=$(terraform output --raw redshift_port)
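The Terraform code above also writes the same two values to a redshift.env file, so another option is to read them from there. Below is a minimal sketch of a parser for that kind of KEY=value file; the function name parse_env_text is hypothetical and the sample values are placeholders.

```python
def parse_env_text(text):
    """Parse simple KEY=value lines, skipping blank lines and # comments."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        # Split only on the first "=", so values may contain "=" themselves.
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values


# Example content with placeholder values, shaped like redshift.env.
sample = "redshift_host=example-host\nredshift_port=5439\n"
settings = parse_env_text(sample)
print(settings["redshift_host"], int(settings["redshift_port"]))
```

In a real script you would read the text from the file with open("redshift.env").read() instead of using a hard-coded sample.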

The complete script that runs the Airflow instance with the necessary variables and connections is scripts/airflow_up.sh in the GitHub repository. Review that file and see if you understand why every line is there.

The general schema of what we did looks like this:

ETL tasks

Upload CSV files to an S3 bucket

The first step is to upload the CSV files with the data we are going to analyze to the S3 bucket we created. This step is of course essential, but I think of it as separate from the rest of the tasks. Moreover, during development it is much more convenient to upload the files once and then test the rest of the tasks many times until they are completely debugged. Uploading the files again every time I change something small in my code is very inefficient. This is why I place it in a separate DAG. Of course, there is a lot of subjectivity in this approach. In the end, this is a demonstration article, so you can arrange your tasks however you see fit. So, how do we do this?

Remember I told you that Airflow is wonderfully integrated with cloud platforms through plugins? One type of plugin that can be added to Airflow is a provider. Airflow providers are extensions that allow Airflow to integrate with other tools and platforms, such as AWS.

The AWS provider has a very convenient Airflow operator called LocalFilesystemToS3Operator:

upload_vaccinations_csv = LocalFilesystemToS3Operator(
    task_id="upload_vaccinations_csv",
    filename="./data/COVID-19_Vaccinations_in_the_United_States_County.csv",
    dest_key=Variable.get("covid_vaccinations_csv_name"),
    dest_bucket=Variable.get("s3_bucket"),
    aws_conn_id="aws_s3_connection",
    replace=False,
)

This operator is pretty self-explanatory. Notice that we use the syntax Variable.get("variable_name") to get the value of variables.

The first DAG uses this operator 5 times, to upload the 5 CSV files. Simple and effective:

from datetime import datetime

from airflow.decorators import dag, task
from airflow.decorators.task_group import task_group
from airflow.models.variable import Variable
from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator


@dag(
    schedule_interval="@once",
    catchup=False,
    start_date=datetime.now()
)
def upload_csv_files_to_s3():
    # COVID vaccination data
    upload_vaccinations_csv = LocalFilesystemToS3Operator(
        task_id="upload_vaccinations_csv",
        filename="./data/COVID-19_Vaccinations_in_the_United_States_County.csv",
        dest_key=Variable.get("covid_vaccinations_csv_name"),
        dest_bucket=Variable.get("s3_bucket"),
        aws_conn_id="aws_s3_connection",
        replace=False,
    )

    # COVID cases data
    upload_covid_cases_csv = LocalFilesystemToS3Operator(
        task_id="upload_covid_cases_csv",
        filename="./data/CovidCases.csv",
        dest_key=Variable.get("covid_cases_data_csv_name"),
        dest_bucket=Variable.get("s3_bucket"),
        aws_conn_id="aws_s3_connection",
        replace=False,
    )

    # The three nutrition files are grouped so they appear together in the UI.
    @task_group(group_id="upload_nutrition_csvs")
    def upload_nutrition_csvs_group():
        upload_state_and_county_csv = LocalFilesystemToS3Operator(
            task_id="upload_state_and_county_csv",
            filename="./data/StateAndCountyData.csv",
            dest_key=Variable.get("county_state_data_csv_name"),
            dest_bucket=Variable.get("s3_bucket"),
            aws_conn_id="aws_s3_connection",
            replace=False,
        )

        upload_supplemental_data_county_csv = LocalFilesystemToS3Operator(
            task_id="upload_supplemental_data_county_csv",
            filename="./data/SupplementalDataCounty.csv",
            dest_key=Variable.get("county_supplemental_data_csv_name"),
            dest_bucket=Variable.get("s3_bucket"),
            aws_conn_id="aws_s3_connection",
            replace=False,
        )

        upload_supplemental_data_state_csv = LocalFilesystemToS3Operator(
            task_id="upload_supplemental_data_state_csv",
            filename="./data/SupplementalDataState.csv",
            dest_key=Variable.get("state_supplemental_data_csv_name"),
            dest_bucket=Variable.get("s3_bucket"),
            aws_conn_id="aws_s3_connection",
            replace=False,
        )

    upload_nutrition_csvs_group()


upload_csvs_dag = upload_csv_files_to_s3()

Notice how we use the @dag decorator to create the DAG, and how we configure it. The Airflow documentation is vast and daunting, but have patience and explore it as you encounter new needs.

Once the DAG file is located in the dags folder, you can visit http://localhost:8080/home and execute it from the UI.

After executing this in Airflow, we will have finished uploading the files to S3:

Extract: Load file from S3 to Redshift

Now that we have the files inside the AWS ecosystem, we can load the data easily to Redshift. The first step is of course to create the staging tables (or final tables if no data processing is required). This can be achieved with a simple CREATE TABLE SQL statement. For example, for the county nutrition data:

CREATE TABLE IF NOT EXISTS CountyStateHealthDataStaging (
    FIPS varchar(100) not null,
    State varchar(300) not null,
    County varchar(300),
    Variable_Code varchar(300),
    Value real
);

To trigger the execution of this statement in Redshift from Airflow, we can use the RedshiftSQLOperator. Just save the previous statement in a file called, for example, create_county_state_data_table.sql, and place it in the dags/sql directory. Then in the DAG file, you can use it like:

create_county_state_data_table = RedshiftSQLOperator(
    task_id="create_county_state_data_table",
    sql="sql/create_county_state_data_table.sql",
)

Once the table is created, we can populate it with data. AWS Redshift works with an extended dialect of SQL. One of its capabilities is the COPY command, which can read data in CSV format and load it directly into a table. The next example loads the data for the CountyStateHealthDataStaging table:

COPY CountyStateHealthDataStaging
FROM 's3://covid-csv-files-bucket-name/StateAndCountyData.csv'
IAM_ROLE 'arn:aws:iam::1234:role/redshiftrole'
IGNOREHEADER 1
FORMAT AS CSV

Here 1234 must be replaced by your AWS account ID and redshiftrole by the role associated with the Redshift cluster. For each file you want to load, you must use the right path to the file in the bucket.

However, as the bucket name, the file name, the account id and the Redshift role might change in different deploys, it is better to use a more parametrizable solution. Airflow can help us achieve that. First, we are going to save the file using a special templating syntax that Airflow provides:

COPY CountyStateHealthDataStaging
FROM 's3://{{params.s3_bucket}}/{{params.county_state_data_csv_name}}'
IAM_ROLE '{{params.iam_role}}'
IGNOREHEADER 1
FORMAT AS CSV

Now, we will use RedshiftSQLOperator again, and we will use the params argument to provide values for the template parameters. The example below is the equivalent task for the COVID cases file:

load_covid_cases = RedshiftSQLOperator(
    task_id="load_covid_cases_data",
    sql="sql/load_covid_cases_data.sql",
    params=dict(
        s3_bucket=Variable.get("s3_bucket"),
        covid_cases_data_csv_name=Variable.get("covid_cases_data_csv_name"),
        iam_role=Variable.get("iam_role_arn")
    )
)
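To see what the templating does, here is a tiny stand-in written with a regular expression. Airflow actually renders these files with the Jinja template engine; the render_params helper below is hypothetical and only imitates the {{ params.name }} substitution, using the example bucket, file, and role values shown earlier.

```python
import re

SQL_TEMPLATE = """COPY CountyStateHealthDataStaging
FROM 's3://{{params.s3_bucket}}/{{params.county_state_data_csv_name}}'
IAM_ROLE '{{params.iam_role}}'
IGNOREHEADER 1
FORMAT AS CSV"""


def render_params(template, params):
    """Replace {{ params.name }} placeholders; a tiny stand-in for Jinja."""

    def substitute(match):
        # group(1) is the parameter name that follows "params."
        return str(params[match.group(1)])

    return re.sub(r"\{\{\s*params\.(\w+)\s*\}\}", substitute, template)


rendered = render_params(
    SQL_TEMPLATE,
    {
        "s3_bucket": "covid-csv-files-bucket-name",
        "county_state_data_csv_name": "StateAndCountyData.csv",
        "iam_role": "arn:aws:iam::1234:role/redshiftrole",
    },
)
print(rendered)
```

The rendered text is the same COPY statement as the hard-coded version, which is what Redshift finally receives.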

Once we load the 5 files, we will have 3 staging tables and 2 final tables (ready to be queried and analysed) populated:

Transform

Pivot tables

To pivot tables we are going to use the same Airflow syntax as before, as well as the RedshiftSQLOperator. The big change is going to be made in the SQL statement.

CREATE TABLE CountyHealthData AS
SELECT * FROM (
    SELECT lpad(FIPS, 5, '0') AS FIPS, Variable_Code, Value
    FROM CountyStateHealthDataStaging
)
PIVOT (
    AVG(Value) FOR Variable_Code IN (
        {{ ti.xcom_pull(key="return_value") }}
    )
);

What the hell is all of this? Fair question. We just took a big step towards complexity.

Multiple small details must be explained to understand what this does. The first line is easy to explain. We are just creating a new table called CountyHealthData. We will populate that table with the results of the query that is performed from the second line onwards.

This query, like all queries, starts with the SELECT keyword. We are selecting everything from a subquery. In that subquery, the FROM clause indicates that our data comes from the CountyStateHealthDataStaging table, which we just created in the extract phase.

From that table, we are going to get three columns: FIPS, Variable_Code and Value. The FIPS is the unique code that identifies each county. So, each row will associate a county with a variable and a value for that variable. Not so difficult so far. Now the first strange element comes in.

Do you remember that our CSV files come from 3 different sources? In those sources, the FIPS code has different formats. In some of them, the 123 value would be written as “123” and in others, it would be written as “00123”. To unify the format, we use the lpad function to indicate that we want to pad the FIPS with 0s to the left. That way we ensure we get the same format as in the other sources.
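If it helps, this is roughly what that padding does, sketched in Python. The lpad function below is my own imitation of the SQL function, assuming a single-character fill; like the SQL version, it truncates the text if it is already longer than the requested length.

```python
def lpad(text, length, fill="0"):
    """Mimic SQL lpad: pad on the left, or truncate if already too long."""
    if len(text) >= length:
        return text[:length]
    return text.rjust(length, fill)


print(lpad("1001", 5))   # county FIPS missing its leading zero
print(lpad("01001", 5))  # already in the five-character form
```

Both calls produce the same five-character code, which is what lets the tables from different sources be joined on FIPS.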

Then the main element of this query comes in: the PIVOT clause. Pivoting a table means rotating the data in the table so that the distinct values of a given column become columns themselves. For example, if you have a table like this

+--------+---------+-------+
| Region | Product | Sales |
+--------+---------+-------+
| North  | A       | $100  |
| North  | B       | $200  |
| South  | A       | $300  |
| South  | B       | $400  |
| East   | A       | $500  |
| East   | B       | $600  |
+--------+---------+-------+

You can pivot the “Product” column and get something like this

+--------+-----------+-----------+
| Region | Product A | Product B |
+--------+-----------+-----------+
| North  | $100      | $200      |
| South  | $300      | $400      |
| East   | $500      | $600      |
+--------+-----------+-----------+
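If you are more used to pandas than to SQL, the same reshaping can be sketched with pivot_table. This is only an illustration of the concept with the toy sales table (the dollar amounts are written as plain numbers). It is not code from the project.

```python
import pandas as pd

# The long-format table from the example (sales as plain numbers)
sales = pd.DataFrame(
    {
        "Region": ["North", "North", "South", "South", "East", "East"],
        "Product": ["A", "B", "A", "B", "A", "B"],
        "Sales": [100, 200, 300, 400, 500, 600],
    }
)

# One row per region, one column per distinct product.
# aggfunc="mean" plays the role of AVG(...) in a SQL PIVOT clause: it decides
# what to do when several rows share the same region and product.
pivoted = sales.pivot_table(
    index="Region", columns="Product", values="Sales", aggfunc="mean"
)

print(pivoted)
```

Note that pandas discovers the new columns by itself, while a SQL PIVOT needs the list of values written explicitly in the IN (...) part.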

You can find the details about the PIVOT clause here. The PIVOT clause in our query says that the column we are going to pivot is Variable_Code. For each variable code, if it finds more than one row with the same FIPS (the same county), it takes the average value. Finally, it creates one new column for each variable code listed in {{ ti.xcom_pull(key="return_value") }}. What is that, you may be asking.

If you are familiar with Airflow, you will recognise this as the XCom interface for passing data from one task to the next. The task that precedes the pivoting task finds all the distinct variable codes in the CountyStateHealthDataStaging table.

In SQL it can be queried with just one line

SELECT DISTINCT Variable_Code FROM CountyStateHealthDataStaging

But to do this with Airflow and pass the data in the right format, you need to write quite a few lines. Here are the main parts of the code that performs this task (see the full code in the repository):

...

@task(task_id="get_variable_codes")
def get_county_state_variable_codes(**context):
    # Query the staging table for every distinct variable code
    hook = PostgresHook(postgres_conn_id="postgres_default")
    connection = hook.get_conn()
    cursor = connection.cursor()
    cursor.execute("SELECT DISTINCT Variable_Code FROM CountyStateHealthDataStaging")
    sources = cursor.fetchall()
    # Quote each code so the result can be placed inside PIVOT ... IN (...)
    variable_codes = [
        f"'{source[0]}'"
        for source in sources
    ]
    # The returned string is pushed to XCom under the key "return_value"
    return ", ".join(variable_codes)

get_county_state_variable_codes_task = get_county_state_variable_codes()

pivot_county_state_data_table = RedshiftSQLOperator(
    task_id="pivot_county_state_data_table",
    sql="sql/pivot_county_state_data_table.sql",
)

...

get_county_state_variable_codes_task >> pivot_county_state_data_table
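To see what the pivot statement looks like once the list of codes is filled in, here is a small Airflow-free sketch. Python's str.format stands in for Airflow's Jinja templating, the function name build_in_list is hypothetical, and the variable codes are made up.

```python
PIVOT_SQL_TEMPLATE = """CREATE TABLE CountyHealthData AS
SELECT * FROM (SELECT lpad(FIPS, 5, '0') AS FIPS, Variable_Code, Value FROM CountyStateHealthDataStaging)
PIVOT (
    AVG(Value) FOR Variable_Code IN (
        {variable_codes}
    )
);"""


def build_in_list(rows):
    """Turn cursor.fetchall()-style rows into a quoted, comma-separated list."""
    return ", ".join(f"'{row[0]}'" for row in rows)


# Made-up variable codes, shaped like the tuples that cursor.fetchall() returns
example_rows = [("VARIABLE_A",), ("VARIABLE_B",), ("VARIABLE_C",)]

in_list = build_in_list(example_rows)
rendered_sql = PIVOT_SQL_TEMPLATE.format(variable_codes=in_list)

print(in_list)
print(rendered_sql)
```

Because the codes are pasted into the SQL text as they are, this approach is only reasonable when the values come from a table you control, as is the case here.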

Divide state data into nutrition table and state population table

This task is pretty simple. Starting from the table with the state nutrition data, SupplementalDataStateStagingPivoted, we take a few columns to create a new table that contains only the non-health-related data (state name, FIPS, population), and use the rest of the columns to create another table with the health-related data.

CREATE TABLE StateHealthData AS SELECT * FROM SupplementalDataStageStagingPivoted;

ALTER TABLE StateHealthData DROP COLUMN State;
CREATE TABLE State AS (
SELECT DISTINCT State_FIPS AS FIPS, State, State_Population_2018 AS Population FROM SupplementalDataStageStagingPivoted
);

Notice that there are multiple SQL statements here. This is a problem because the RedshiftSQLOperator can only handle one statement at a time. To execute multiple statements I created a wrapper class that handles this:

from pathlib import Path

from airflow.models.baseoperator import BaseOperator
from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator
import sqlparse


class RedshiftSQLOperatorMultipleStatements(BaseOperator):
    def __init__(self, sql_file: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.sql_file = sql_file

    def execute(self, context):
        # Split the file into individual statements and run them in order
        sql_statements = self.get_sql_statements(self.sql_file)
        for sql_statement in sql_statements:
            redshift_sql_operator = RedshiftSQLOperator(
                task_id=self.task_id,
                sql=sql_statement
            )
            redshift_sql_operator.execute(dict())

    @staticmethod
    def get_sql_statements(sql_file):
        with Path(sql_file).open("r") as file:
            sql_string = file.read()
        return sqlparse.split(sql_string)

You can use it much like the original operator, passing the path to the SQL file as sql_file instead of sql:

divide_into_health_data_and_state_data_tables = RedshiftSQLOperatorMultipleStatements(
    sql_file="dags/sql/divide_into_health_data_and_state_data_tables.sql",
    task_id="divide_into_health_data_and_state_data_tables",
)
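If you are curious about what splitting a script into statements involves, here is a simplified sketch that uses only the standard library. It is a hypothetical stand-in for sqlparse.split, not a replacement: it only understands semicolons and single-quoted strings, and it ignores comments and other corner cases.

```python
def split_sql_statements(sql_string: str) -> list:
    """Split a SQL script on semicolons that are outside single-quoted strings."""
    statements = []
    current = []
    in_string = False
    for char in sql_string:
        if char == "'":
            in_string = not in_string
        current.append(char)
        if char == ";" and not in_string:
            statement = "".join(current).strip()
            if statement:
                statements.append(statement)
            current = []
    # Keep a trailing statement that has no terminating semicolon
    remainder = "".join(current).strip()
    if remainder:
        statements.append(remainder)
    return statements


script = """
CREATE TABLE StateHealthData AS SELECT * FROM SupplementalDataStageStagingPivoted;

ALTER TABLE StateHealthData DROP COLUMN State;
CREATE TABLE State AS (
    SELECT DISTINCT State_FIPS AS FIPS, State, State_Population_2018 AS Population FROM SupplementalDataStageStagingPivoted
);
"""

for number, statement in enumerate(split_sql_statements(script), start=1):
    print(f"--- statement {number} ---")
    print(statement)
```

In a real pipeline a proper parser such as sqlparse is the safer choice, which is why the wrapper class relies on it.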

Get county FIPS for the county general data table

As explained before, we need to combine the county data with the information in the recently created State table. Combining in SQL language means joining.

CREATE TABLE County AS (
SELECT DISTINCT lpad(c.FIPS, 5, '0') AS FIPS, s.FIPS AS StateFIPS, c.County, c.Value AS Population
FROM SupplementalDataCountyStaging AS c
INNER JOIN State AS s ON TRIM(' ' FROM c.State) = s.State
WHERE Variable_Code = 'Population_Estimate_2018'
)

There is not much to explain here: it is just an INNER JOIN of two tables. Pay attention to convenient functions like lpad and trim, which let us work with files in different formats by converting them to a single, consistent format.
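If you want to experiment with this kind of join without a Redshift cluster, a rough local sketch with Python's built-in sqlite3 module could look like this. The rows are made up, and because SQLite has no lpad function, the padding is emulated with substr. Treat it as an approximation of the query, not as the project's code.

```python
import sqlite3

connection = sqlite3.connect(":memory:")
connection.executescript(
    """
    CREATE TABLE State (FIPS TEXT, State TEXT);
    CREATE TABLE SupplementalDataCountyStaging (
        FIPS TEXT, State TEXT, County TEXT, Variable_Code TEXT, Value REAL
    );
    -- Made-up rows: unpadded FIPS and stray spaces around the state name
    INSERT INTO State VALUES ('01', 'AL');
    INSERT INTO SupplementalDataCountyStaging VALUES
        ('1001', ' AL ', 'County X', 'Population_Estimate_2018', 1000),
        ('1001', ' AL ', 'County X', 'Some_Other_Variable', 5);
    """
)

rows = connection.execute(
    """
    SELECT DISTINCT
        substr('00000' || c.FIPS, -5) AS FIPS,  -- SQLite substitute for lpad
        s.FIPS AS StateFIPS,
        c.County,
        c.Value AS Population
    FROM SupplementalDataCountyStaging AS c
    INNER JOIN State AS s ON TRIM(c.State) = s.State
    WHERE c.Variable_Code = 'Population_Estimate_2018'
    """
).fetchall()

print(rows)
connection.close()
```

Without the TRIM, the state name ' AL ' would not match 'AL' and the INNER JOIN would silently return no rows for that county.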

After these transformations, we have completed all the ETL steps.

Quality Checks

At the end of the pipeline, there is a last task that checks that the data has been loaded correctly. This step is always going to depend deeply on the nature of the data, but some general checks can be helpful in a great variety of projects.

For example, checking that the number of rows is what it should be is an easy way of checking the correctness of the pipeline. In my case, because the FIPS formats were different in different files and I used INNER JOIN operations, I sometimes ended up with twice as many rows as there should be, since each FIPS code was duplicated in two different formats. So with a quality check, I made sure that there were not more rows than there should be.
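The duplicated-format problem is easy to reproduce in plain Python. The following sketch uses a made-up list of codes and a hypothetical helper, find_duplicated_counties, to show why counting distinct raw codes is not enough to detect it.

```python
from collections import Counter


def find_duplicated_counties(fips_codes):
    """Return the normalised FIPS codes that appear more than once.

    Hypothetical helper: codes are compared after left-padding to 5
    characters, so "1001" and "01001" count as the same county.
    """
    counts = Counter(str(code).zfill(5) for code in fips_codes)
    return sorted(code for code, count in counts.items() if count > 1)


# Made-up sample: the first county shows up in two formats
loaded_codes = ["1001", "01001", "01003", "6037"]

print(len(loaded_codes))                       # 4 rows loaded
print(len(set(loaded_codes)))                  # 4 "distinct" raw codes
print(find_duplicated_counties(loaded_codes))  # ['01001']
```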

To perform the quality checks, you can use simple operators with simple SQL statements, or you can make it more sophisticated. In my case, because I was trying many different checks, I created a class for handling generic cases:

from dataclasses import dataclass
import logging
from typing import Any, Callable, List, Optional

from airflow.models import BaseOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

task_logger = logging.getLogger('airflow.task')


@dataclass
class DataQualityCheck:
    sql_statement: str
    expected_result: Any
    # Optional custom comparison; defaults to strict equality
    match_function: Optional[Callable] = None


class DataQualityOperator(BaseOperator):
    ui_color = '#89DA59'

    def __init__(
        self,
        checks: List[DataQualityCheck],
        postgres_conn_id="postgres_default",
        *args,
        **kwargs
    ):
        super(DataQualityOperator, self).__init__(*args, **kwargs)
        self.postgres_conn_id = postgres_conn_id
        self.checks = checks
        self.cursor = None

    def execute(self, context):
        # Connect here rather than in __init__, so that no connection is
        # opened every time the DAG file is parsed
        hook = PostgresHook(postgres_conn_id=self.postgres_conn_id)
        self.cursor = hook.get_conn().cursor()
        for check in self.checks:
            self.execute_check(check)
            task_logger.info("Check passed")

    def execute_check(self, check: DataQualityCheck):
        self.cursor.execute(check.sql_statement)
        response = self._get_cursor_response()
        if not response:
            raise ValueError(f"Data quality check returned no rows: {check.sql_statement}")
        match_function = check.match_function or self._equality_match_function
        if not match_function(response[0], check.expected_result):
            raise ValueError(
                f"Data quality check failed. Response: {response[0]}. Expected result: {check.expected_result}")

    @staticmethod
    def _equality_match_function(response, expected):
        return response == expected

    def _get_cursor_response(self):
        if self._is_response_empty():
            return None

        # Unwrap single-column rows so that a COUNT(*) comes back as a plain number
        return [
            row[0] if len(row) == 1
            else row
            for row in self.cursor
        ]

    def _is_response_empty(self):
        return self.cursor.description is None

The operator is then instantiated in the DAG with the list of checks to run:

run_data_quality_checks = DataQualityOperator(
    checks=[
        DataQualityCheck(
            sql_statement="""
                SELECT COUNT(*)
                FROM countyhealthdata AS chd
                JOIN county AS c ON chd.fips = c.fips
                JOIN covidcases AS cc ON cc.fips = chd.fips
                JOIN covidvaccination AS cv ON cv.fips = chd.fips AND cv.date = cc.date
                WHERE cc.date = '2020-12-31';
            """,
            expected_result=3142,
            match_function=lambda response, expected: response <= expected
        ),
        DataQualityCheck(
            sql_statement="""
                WITH distinct_fips AS (
                    SELECT COUNT(DISTINCT chd.fips) AS numrows
                    FROM countyhealthdata AS chd
                    JOIN county AS c ON chd.fips = c.fips
                    JOIN covidcases AS cc ON cc.fips = chd.fips
                    JOIN covidvaccination AS cv ON cv.fips = chd.fips AND cv.date = cc.date
                    WHERE cc.date = '2020-12-31'
                )

                SELECT COUNT(*) - (SELECT numrows FROM distinct_fips) AS numrows
                FROM countyhealthdata AS chd
                JOIN county AS c ON chd.fips = c.fips
                JOIN covidcases AS cc ON cc.fips = chd.fips
                JOIN covidvaccination AS cv ON cv.fips = chd.fips AND cv.date = cc.date
                WHERE cc.date = '2020-12-31';
            """,
            expected_result=0,
        ),
    ],
    task_id='run_data_quality_checks'
)
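To see how match_function changes what counts as a passing check, here is an Airflow-free sketch of the same comparison logic. The Check class and the check_passes function are hypothetical stand-ins for DataQualityCheck and the operator, and the responses are invented numbers, not results from the real database.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class Check:
    """Airflow-free stand-in for DataQualityCheck (hypothetical name)."""
    expected_result: Any
    match_function: Optional[Callable] = None


def check_passes(response, check: Check) -> bool:
    # Fall back to strict equality when no custom comparison is given
    match_function = check.match_function or (lambda got, expected: got == expected)
    return match_function(response, check.expected_result)


at_most = Check(expected_result=3142, match_function=lambda got, expected: got <= expected)
exactly_zero = Check(expected_result=0)

print(check_passes(3000, at_most))    # True: fewer rows than the limit is fine
print(check_passes(6284, at_most))    # False: more rows than the limit
print(check_passes(0, exactly_zero))  # True
print(check_passes(2, exactly_zero))  # False
```

The first check tolerates missing counties but not duplicated ones, while the second one only passes when no FIPS code is repeated.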

Complete Airflow DAG

The Airflow DAG can be visualised from the Airflow UI:

Airflow DAG

Retrieving and Analyzing the Data

Finally, we have our data in AWS Redshift with the data model we decided on at the beginning. This data is ready to be analyzed. If the sources are updated, we can just repeat the process by triggering the pipeline from Airflow. So, the main goal of this article has already been fulfilled. In this last section, I am going to show a little of the potential of having data unified in an optimal format and available in a cloud service such as Redshift.

There are many ways this data can be analysed, and once it is nicely stored in the data warehouse, it is easy to retrieve. To satisfy our curiosity, let’s explore it a little and show how the data warehouse can be queried and used.

To interact with AWS Redshift easily, two very useful Python packages are available: redshift-connector and psycopg2. The first one is the official package by AWS. The second one is the most popular PostgreSQL database adapter for Python.

Although I found redshift-connector to be extremely convenient, I had already developed a class based on psycopg2 for another project, so I will use that class in order not to reinvent the wheel. It is stored in notebooks/postgresql_connector.py. All the code I use for the analysis is in a Jupyter notebook called notebooks/FullExploration.ipynb.

Geographic visualization

We can retrieve the data from any column in the countyhealthdata table and represent it on a map. This can work as a visual way of finding correlations between variables. For example, if we want to explore the relationship between the COVID death ratio by the end of 2020 and the percentage of the population with low income and low access to a grocery store, we can retrieve the data with the following query:

SELECT cc.fips, deaths / chd."2010_census_population" AS death_ratio, chd.pct_laccess_lowi15
FROM covidcases AS cc
JOIN countyhealthdata AS chd ON cc.fips = chd.fips
WHERE date = '2020-12-31'

Then we can find correlations or, for example, plot a map (for the full code check notebooks/FullExploration.ipynb):

Death Ratio
Low income and low access to grocery stores

In both images, we see that the death ratio and the percentage of the population with low income and low access to grocery stores are higher in counties in the centre of the US, although there is not a clear relationship.
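Besides looking at the maps, the relationship can be quantified with a correlation coefficient. The sketch below uses a tiny made-up DataFrame shaped like the result of the query (the numbers are not real figures), so it only shows the mechanics, not an actual finding.

```python
import pandas as pd

# Made-up rows shaped like the query result; these are not real figures
county_data = pd.DataFrame(
    {
        "fips": ["00001", "00002", "00003", "00004"],
        "deaths": [10, 40, 30, 80],
        "population": [10_000, 20_000, 10_000, 20_000],
        "pct_laccess_lowi15": [5.0, 9.0, 12.0, 18.0],
    }
).set_index("fips")

# Python's / is true division. In SQL, dividing two integer columns may
# truncate the result, so the column types are worth checking.
county_data["death_ratio"] = county_data["deaths"] / county_data["population"]

# Pearson correlation between the death ratio and the low-access percentage
pearson = county_data["death_ratio"].corr(county_data["pct_laccess_lowi15"])
print(f"Pearson correlation: {pearson:.3f}")
```

With the real data, a value close to 0 would confirm the impression from the maps that there is no clear linear relationship.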

Decision tree regression

Another way of exploring the data is to fit a decision tree regression and see which variables help the most to predict the county death ratio. This is a very simplistic analysis, meant only to exemplify how the data warehouse can be used.

To do that, we are going to get all the available nutrition and health variables, as well as the percentage of the population vaccinated by the end of 2020. Then we will use this model to look for a relationship with the COVID death rate at the end of 2020.

The SQL query we must perform is this:

SELECT chd.*, cv.administered_dose1_pop_pct, cc.deaths / c.population AS death_ratio 
FROM countyhealthdata AS chd
JOIN county AS c ON chd.fips = c.fips
JOIN covidcases AS cc ON cc.fips = chd.fips
JOIN covidvaccination AS cv ON cv.fips = chd.fips AND cv.date = cc.date
WHERE cc.date = '2020-12-31';

The code fragment (from notebooks/FullExploration.ipynb) is:

import pandas as pd
from sklearn.tree import DecisionTreeRegressor

decision_tree_regression_data = redshift_connector.execute_sql("""
SELECT chd.*, cv.administered_dose1_pop_pct, cc.deaths / c.population AS death_ratio
FROM countyhealthdata AS chd
JOIN county AS c ON chd.fips = c.fips
JOIN covidcases AS cc ON cc.fips = chd.fips
JOIN covidvaccination AS cv ON cv.fips = chd.fips AND cv.date = cc.date
WHERE cc.date = '2020-12-31';
""")
decision_tree_regression_data_clean = decision_tree_regression_data.set_index("fips").fillna(0)
X = decision_tree_regression_data_clean.drop("death_ratio", axis="columns")
y = decision_tree_regression_data_clean["death_ratio"]
regressor = DecisionTreeRegressor(random_state = 0, max_depth=4)

regressor.fit(X, y)
print(f"R-squared coefficient: {regressor.score(X, y)}")
print(f"Five most relevant features: {list(pd.Series(regressor.feature_importances_, index=X.columns).sort_values(ascending=False).index[:5])}")

It turns out that with a tree of depth 4 (4 levels deep), the explained variance (R-squared) is only 0.3, and that is measured on the same data the tree was fitted on.

The 5 most relevant features for predicting COVID death rates are:

  • Farms with direct sales (%)
  • School Breakfast Program participants (change % children), 2012-17*
  • WIC women participants (change % women), 2014-16
  • SNAP online application, 2009*
  • Child poverty rate, 2015
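A natural next step, not done in the notebook, would be to measure the R-squared on counties the tree has not seen. Here is a minimal sketch of that idea with scikit-learn. It runs on synthetic data generated with make_regression as a stand-in for the county table, so the feature names and scores say nothing about the real dataset.

```python
import numpy as np
from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeRegressor

# Synthetic stand-in for the county table: 500 rows, 10 features
X, y = make_regression(
    n_samples=500, n_features=10, n_informative=3, noise=20.0, random_state=0
)
feature_names = [f"feature_{i}" for i in range(X.shape[1])]

# Hold out 25% of the rows to evaluate the tree on unseen data
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.25, random_state=0
)

regressor = DecisionTreeRegressor(random_state=0, max_depth=4)
regressor.fit(X_train, y_train)

train_r2 = regressor.score(X_train, y_train)
test_r2 = regressor.score(X_test, y_test)
print(f"R-squared on training rows: {train_r2:.3f}")
print(f"R-squared on held-out rows: {test_r2:.3f}")

# Rank the features by importance, from most to least relevant
order = np.argsort(regressor.feature_importances_)[::-1]
top_features = [feature_names[i] for i in order[:5]]
print(f"Five most relevant features: {top_features}")
```

The held-out score is usually the more honest number, and feature importances from a single shallow tree should be read as hints rather than conclusions.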

I hope you got some inspiration from this project and learned something you can apply to your future projects ;)

Written by Rubén Chuliá Mena

Artificial Intelligence Engineer with experience in ML for bank and insurance sector. I strive with passion to make a meaningful impact in the industry.
