The purpose of this project is to build an ETL pipeline that provides data analysts, immigration and climate researchers, and other interested parties with temperature, population and immigration statistics for different cities. The pipeline first extracts temperature, airport, immigration and demographic data from various datasets, performs some transformations on it and converts the data into JSON files using Apache Spark so that it can be uploaded to a Redshift database. Using Apache Airflow, the JSON files are migrated to S3, the data is then copied to Redshift, undergoes further transformation and is loaded into fact and dimension tables using a series of reusable tasks that allow for easy backfills. Finally, data quality checks are run against the fact and dimension tables to catch any discrepancies in the data.
Below are some of the use cases of the data collected:
- The data can be used to identify the impact that changes in temperature have on city population, going even further to categorize the impact by gender.
- Immigration researchers can use the data to identify the most common ports of entry and the types of visas that are commonly used, use arrival and departure times to identify periods of high traffic at the ports of entry, and assess the impact of temperature on the rate of immigration by city.
The kinds of queries that can be run against this data include:
- Find the average immigration traffic at a given port of entry in a given city, by month.
- Find the average age of immigrants at a given port of entry by month, by airline or by gender.
- Retrieve the average population and average temperature of a given city.
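For example, the last of these queries could be expressed against the fact and dimension tables described in the data model below. The snippet is a minimal sketch, not part of the project code: the connection details are placeholders, and the join on city name is an assumption about how the tables relate.

```python
# A minimal sketch: connection details are placeholders, and the join on
# city name is an assumption about how the fact and dimension tables relate.
import psycopg2

conn = psycopg2.connect(
    host="<your-redshift-endpoint>",
    dbname="<your-database>",
    user="<your-username>",
    password="<your-password>",
    port=5439,
)
cur = conn.cursor()

# Average temperature and average total population for a given city.
cur.execute("""
    SELECT f.city_name,
           AVG(f.average_temperature) AS avg_temperature,
           AVG(d.total_population)    AS avg_population
    FROM fact_city_data_table f
    JOIN dim_demographic_table d
      ON LOWER(d.city) = LOWER(f.city_name)
    WHERE LOWER(f.city_name) = LOWER(%s)
    GROUP BY f.city_name
""", ("Chicago",))

print(cur.fetchall())
conn.close()
```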
The project comprises a Redshift Postgres database in the cluster, with staging tables that hold all the data retrieved from the S3 bucket and copied into them. It also contains a fact table, `fact_city_data_table`, and three dimension tables, namely `dim_airport_table`, `dim_demographic_table` and `dim_visitor_table`. The data model for the fact and dimension tables is shown below:

`fact_city_data_table`

Column | Data Type | Description |
---|---|---|
city_id (PRIMARY KEY) | varchar(32) | auto-generated primary key |
city_name | varchar(32) | name of city |
country | varchar(32) | name of country |
latitude | varchar(10) | latitude value |
longitude | varchar(10) | longitude value |
average_temperature | numeric | average temperature of the city |
date | date | date of temperature recording |

`dim_airport_table`

Column | Data Type | Description |
---|---|---|
airport_id (PRIMARY KEY) | varchar(50) | auto-generated primary key |
airport_code | varchar(50) | airport short code |
name | varchar(500) | name of airport |
continent | varchar(50) | continent code |
country_code | varchar(32) | country code |
state | varchar(32) | state code |

`dim_visitor_table`

Column | Data Type | Description |
---|---|---|
visitor_id (PRIMARY KEY) | varchar(32) | auto-generated primary key |
year | int4 | year of visit |
month | int4 | month of visit |
city | varchar(32) | city of visit |
gender | varchar(32) | gender of the visitor |
arrival_date | date | arrival date of the visitor |
departure_date | date | departure date of the visitor |
airline | varchar(32) | airline code |

`dim_demographic_table`

Column | Data Type | Description |
---|---|---|
demographic_id (PRIMARY KEY) | varchar(100) | auto-generated primary key |
city | varchar(50) | city name |
state | varchar(50) | state code |
male_population | int4 | male population numbers by city |
female_population | int4 | female population numbers by city |
total_population | int4 | total population numbers by city |

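As an illustration of how these definitions translate into DDL, below is a minimal sketch of a CREATE TABLE statement for the fact table only, written as a Python string constant; the project's actual table-creation SQL may differ (for example in constraints or DISTKEY/SORTKEY choices).

```python
# A sketch of the fact table DDL based on the columns listed above; the project's
# actual statement may differ (e.g. constraints, DISTKEY/SORTKEY choices).
FACT_CITY_DATA_TABLE_CREATE = """
    CREATE TABLE IF NOT EXISTS fact_city_data_table (
        city_id             VARCHAR(32) PRIMARY KEY,
        city_name           VARCHAR(32),
        country             VARCHAR(32),
        latitude            VARCHAR(10),
        longitude           VARCHAR(10),
        average_temperature NUMERIC,
        date                DATE
    );
"""
```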
I settled on the above model because the common field across all the various datasets is the city, and from it I could derive the other data fields required for the pipeline. For the fact and dimension tables I used a star schema, which is well suited to and optimized for OLAP (Online Analytical Processing) operations.
The tools used in this project include:
- Apache Spark - This was needed to process the data from the large SAS and CSV files into dataframes and convert them into more readable JSON data files. In the process, it maps the columns from the datasets to the relevant columns required for the staging tables, and also maps some of the values, such as the city codes from the provided label descriptions data dictionary, to city names in the data Spark generates. To view how this is done, check out this python helper file and the data dictionary for city codes. A minimal sketch of this kind of transformation is shown after this list.
- Apache Airflow - This was required to automate the workflows, namely uploading the processed JSON files from the local filesystem to S3, creating the staging, fact and dimension tables, copying the S3 files into the Redshift staging tables, and then performing ETL to load the final data into the fact and dimension tables. This is all done by two pre-configured DAGs, the file upload DAG and the capstone DAG, which each perform a series of tasks.
- Amazon Redshift - The database lives in a Redshift cluster and stores the data copied over from S3 as well as the final data that gets loaded into the fact and dimension tables.
- Amazon S3 - Stores the JSON files generated by Spark that are uploaded from the local filesystem.
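The snippet below is a minimal sketch of the kind of Spark transformation described in the Apache Spark item above, shown for the demographics file only: read a raw dataset, map its columns to the staging-table columns, and write the result out as JSON. The paths and column names are assumptions about the OpenSoft export, so the project's own data_to_json.py will differ in detail.

```python
# A minimal sketch, not the project's data_to_json.py: paths and column names are assumptions.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("capstone-etl-sketch").getOrCreate()

# The OpenSoft demographics export is assumed to be semicolon-delimited with a header row.
demographics = (
    spark.read
    .option("header", True)
    .option("delimiter", ";")
    .csv("data/us-cities-demographics.csv")
)

# Map the raw columns to the names used by the demographic staging/dimension tables.
processed = demographics.select(
    F.col("City").alias("city"),
    F.col("State Code").alias("state"),
    F.col("Male Population").cast("int").alias("male_population"),
    F.col("Female Population").cast("int").alias("female_population"),
    F.col("Total Population").cast("int").alias("total_population"),
)

# Write readable JSON files that can later be uploaded to S3.
processed.write.mode("overwrite").json("data/processed-data/demographic-data")
```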
The datasets used and sources include:
- I94 Immigration Data: This data is retrieved from the US National Tourism and Trade Office and the source can be found here.
- World Temperature Data: This dataset came from Kaggle and you can find out about it here.
- U.S. City Demographic Data: This data comes from OpenSoft here.
- Airport Code Table: This is a simple table of airport codes and their corresponding cities, retrieved from here.
Clone the repo from github by running:
$ git clone [email protected]:jonathankamau/udend-capstone-project.git
Once cloned, create a virtualenv on your local machine that will be your local development environment:
$ virtualenv sparkify-project
$ source sparkify-project/bin/activate
If running on your local machine, ensure that you have the following main requirements installed in the virtualenv:
- pyspark
- apache-airflow
- psycopg2
Alternatively, you can install them from the provided `requirements.txt` file by running the following in your virtual environment:
$ pip install -r requirements.txt
- A sample of the JSON data files processed by Spark is already part of this project's files; you can find it in `data/processed-data`. If you wish to generate the files from scratch, you can follow the step-by-step process I went through here.
- On the terminal, ensure your virtualenv is activated, then add the AIRFLOW_HOME environment variable:
$ export AIRFLOW_HOME=~/path/to/project
- Run `airflow initdb` to set up the Airflow database locally, then run the `airflow scheduler` and `airflow webserver` commands in separate terminal windows. If you get a psycopg2-related error, you might need to check whether the postgresql service is running on your computer.
- Once you have run the above commands successfully, you can open the Airflow UI on localhost at port 8080: http://0.0.0.0:8080
- Navigate to your AWS S3 console and create a bucket named `udend-data`. If you wish to use another name, ensure you set it in the `capstone_dag` and `file_upload_dag` operator configs for `s3_bucket`.
- Create a Redshift cluster with a Redshift database. Once it has finished creating, take note of the endpoint and the database credentials.
- Add your AWS and Redshift credentials in the Airflow UI. You can accomplish this with the following steps:
  - Click on the Admin tab and select Connections.
  - Under Connections, select Create.
  - On the Create page, enter the following credentials:
    - Conn Id: `aws_credentials`
    - Conn Type: `Amazon Web Services`
    - Login: `<Your AWS Access Key ID>`
    - Password: `<Your AWS Secret Access Key>`
  - Once done, click on Save and Add Another.
  - On the new Create page, add the following:
    - Conn Id: `redshift`
    - Conn Type: `Postgres`
    - Host: `<Endpoint of your Redshift cluster>`
    - Schema: `<Redshift database name>`
    - Login: `<Database username>`
    - Password: `<Database password>`
    - Port: `<Database port, usually 5439>`
  - Click Save.
- Trigger the `file_upload_dag` first. This will upload the files to your S3 bucket. You can view the status of the dag's tasks in the Graph View of the dag.
- Once the files are uploaded, you can trigger the `capstone_dag`, which will create the necessary tables on Redshift and load data into them, as well as perform data quality checks.
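The snippet below is not the project's actual `capstone_dag`, just a hedged sketch of the shape it describes: a DAG whose final task uses the `redshift` connection created in the steps above to run a simple data quality check against one of the tables. Imports assume Airflow 1.10-style paths, matching the Python 2.7 setup listed at the end of this README; task and DAG names are illustrative.

```python
# A sketch only: task and DAG names here are illustrative, not the project's actual capstone_dag.
from datetime import datetime

from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator


def check_table_has_rows(table, **context):
    """Fail the task if the given table is empty."""
    redshift = PostgresHook(postgres_conn_id="redshift")  # the connection created in the Airflow UI
    records = redshift.get_records("SELECT COUNT(*) FROM {}".format(table))
    if not records or records[0][0] < 1:
        raise ValueError("Data quality check failed: {} returned no rows".format(table))


dag = DAG(
    "capstone_dag_sketch",
    start_date=datetime(2019, 1, 1),
    schedule_interval=None,
)

create_tables = DummyOperator(task_id="create_tables", dag=dag)
stage_from_s3 = DummyOperator(task_id="stage_from_s3", dag=dag)
load_fact_and_dimensions = DummyOperator(task_id="load_fact_and_dimensions", dag=dag)
quality_check = PythonOperator(
    task_id="check_fact_city_data_table",
    python_callable=check_table_has_rows,
    op_kwargs={"table": "fact_city_data_table"},
    provide_context=True,
    dag=dag,
)

create_tables >> stage_from_s3 >> load_fact_and_dimensions >> quality_check
```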
- Pull the immigration files in SAS format from the Udacity workspace or from the source, then create a new directory named `immigration-data` inside the `data` directory where you will store them.
- Retrieve the `GlobalLandTemperaturesByCity.csv` file from the Udacity workspace, create a `temperature-data` subdirectory within the `data` directory and save the file there. The other data files, namely `us-cities-demographics.csv` and `airport-codes.csv`, are already provided in this project in the `data` folder.
- Navigate to the `additional_helpers` directory and run the following command:
$ python data_to_json.py
This will create a new set of JSON data files in `data/processed-data`. NOTE: This may take a while due to the size of some of the datasets.
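As an optional sanity check before triggering the upload DAG, you can read one of the generated outputs back with Spark and confirm it is not empty. This is only a sketch; the path is an assumption, so point it at whichever output directory data_to_json.py produced.

```python
# Optional sanity check (a sketch): confirm the processed JSON is readable and non-empty.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("verify-processed-data").getOrCreate()

# Adjust the path to one of the generated output directories under data/processed-data.
df = spark.read.json("data/processed-data")
print("row count: {}".format(df.count()))
df.printSchema()
```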
The data should be updated daily if possible, so that the star schema tables are always updated with the most recent data for a more accurate analysis.
- If the data were increased by 100x:
  - The increased reads and writes to the database can be handled by increasing the number of compute nodes in the Redshift cluster with elastic resize, which also provides more storage.
  - Use distkeys on the tables in case there is a need to join them.
  - Compress the data stored in S3.
- If the pipelines needed to be run on a daily basis by 7am:
  - The dags can be scheduled to run daily by setting the start_date config to a datetime value containing both the date and the 7am time from which the dag should start running, and setting schedule_interval to a daily interval (a one-day timedelta anchored to that start_date, or the cron expression 0 7 * * *), which ensures the dag runs every day at that time. A sketch is shown after this list.
- If the database needed to be accessed by 100+ people:
  - Utilize elastic resize for better performance.
  - Utilize Concurrency Scaling on Redshift by setting it to auto and allocating its usage to specific user groups and workloads. This will boost query processing for an increasing number of users.
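Below is a minimal sketch of the daily-by-7am scheduling idea from the list above; the dag name is illustrative and the rest of the configuration (default_args, tasks) is omitted.

```python
# A sketch only: anchor the interval to a 7:00 am start_date so each daily run lines up with 7am.
from datetime import datetime, timedelta

from airflow import DAG

dag = DAG(
    "capstone_dag_daily",
    start_date=datetime(2019, 1, 1, 7, 0, 0),  # date and time the schedule is anchored to
    schedule_interval=timedelta(days=1),       # or the cron form: schedule_interval="0 7 * * *"
)
```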
- Python 2.7, Airflow and PySpark
- Jonathan Kamau - GitHub Profile