End-to-end Implementation of an ETL Pipeline from an API with dbt & BigQuery
Building a Weather Data ETL Pipeline with OpenWeatherMap API and dbt
In this article, I’ll walk through the process of creating an ETL (Extract, Transform, Load) pipeline that collects weather data from the OpenWeatherMap API, processes it, and creates analytical views using dbt (data build tool). This pipeline will provide valuable insights into weather patterns across major cities.
This article covers creating two separate but interconnected projects:
1. Python Project:
This is the data collection and processing service. It’s responsible for:
- Fetching data from the OpenWeatherMap API
- Transforming the data
- Storing the data in cloud storage (Google Cloud Storage)
- Running on a schedule (hourly) on a GCP compute service
2. dbt Project:
This is the data modeling and analysis project. It’s responsible for:
- Sourcing the data that the Python project stored
- Creating a staging layer
- Building analytical models
- Implementing tests and documentation
- Running on BigQuery
Part 1: Python-based Data Collection Service
First, we’ll create a Python service that fetches and processes weather data. This service will run on Google Cloud Platform (GCP) and perform the following tasks:
- Fetch current weather data for major cities from OpenWeatherMap API
- Transform the JSON response into a structured format (CSV)
- Export the data to cloud storage
- Run hourly on a GCP compute service
- Accept and validate configuration parameters
Get the OpenWeatherMap API Key
Navigate to the OpenWeatherMap API portal and sign up to be able to generate an API key.
This API key should be saved in a .env file at the root of the Python project.
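For example, the .env file could look like this (the variable name is an assumption; use whatever name your code reads):
OPENWEATHER_API_KEY=your_api_key_here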
Get Google Storage Account Credentials
To get the credentials for storing data in Google Cloud Storage after logging into your Google account, follow these steps:
1. Go to the Google Cloud Console (console.cloud.google.com).
2. Create a new project or select an existing one.
3. Navigate to “IAM & Admin” > “Service Accounts” in the console menu.
4. Click “Create Service Account” and follow the prompts to create a new service account.
5. Once created, click on the service account and go to the “Keys” tab.
6. Click “Add Key” > “Create new key” and select JSON as the key type.
Once you click “Create”, the JSON key will be downloaded locally. Add this file to the root of the project and name it gcp_credentials.json. Make sure to add it to .gitignore, since it stores sensitive data.
Create a Cloud Storage bucket
1. Create a new project or select an existing one in the Google Cloud Console.
2. Enable the Google Cloud Storage API for your project: navigate to “APIs & Services” and enable the “Google Cloud Storage” API if it’s not already enabled.
3. Create a Cloud Storage bucket:
- Go to the Cloud Storage section in the console
- Click “Create bucket”
- Choose a unique name for your bucket
- Select the appropriate region and storage class
- Configure access control settings
4. Set appropriate permissions for the service account on the bucket you created:
- Go to the Cloud Storage Buckets page
- Click on your bucket name
- Select the “Permissions” tab
- Click “Grant Access”
- Enter the service account email in the “New principals” field
- Select the appropriate role (e.g., Storage Object Viewer or Storage Object Admin)
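The same grant can also be done from the command line; a sketch, where the bucket name and service account email are placeholders:
gcloud storage buckets add-iam-policy-binding gs://your-weather-bucket --member="serviceAccount:your-sa@your-project.iam.gserviceaccount.com" --role="roles/storage.objectAdmin"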
Implementing the Python Package to fetch weather data
The details of the implementation can be found at this link on GitHub.
The following is the project structure:
weather_data_etl_pipeline_with_dbt/
│
├── config/
│ └── config.json
│
├── src/
│ ├── __init__.py
│ ├── data_collection.py
│ ├── data_transformation.py
│ ├── utils.py
│ └── weather_etl.py
│
├── tests/
│ └── test_weather_etl.py
│
├── .github/
│ └── workflows/
│ └── ci-cd.yml
│
├── .gitignore
├── Dockerfile
├── README.md
├── requirements.txt
└── gcp_credentials.json
This is a high-level overview of the ETL scripts:
import requests
import json
import logging
from google.cloud import storage


def fetch_weather_data(config):
    # Fetch current weather for the configured cities from the OpenWeatherMap API
    pass


def transform_data(data, config):
    # Flatten the JSON response into rows and drop the ignored columns
    pass


def export_to_storage(data):
    # Export the transformed data as a CSV file to Google Cloud Storage
    pass


def etl(config):
    try:
        data = fetch_weather_data(config)
        transformed_data = transform_data(data, config)
        export_to_storage(transformed_data)
    except Exception as e:
        logging.error(f"Error in data collection: {str(e)}")


# Main execution
if __name__ == "__main__":
    config = {
        "cities": ["New York", "London", "Tokyo"],
        "ignore_columns": ["visibility", "wind_deg"]
    }
    etl(config)
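The config/config.json file shown in the project structure can hold the same parameters; a sketch mirroring the snippet above (the keys are assumptions):
{
  "cities": ["New York", "London", "Tokyo"],
  "ignore_columns": ["visibility", "wind_deg"]
}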
After running this ETL script, the data should be stored in the Google Cloud Storage bucket, with the file name containing a timestamp, as shown in the screenshot below:
To ensure code quality and reliability:
- Implement proper error handling and logging
- Create unit tests covering the code; here this was done with Pytest (a sketch follows below).
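A minimal sketch of tests/test_weather_etl.py; the module path, the transform_data signature, and its output format are assumptions based on the overview above:
from src.weather_etl import transform_data


def test_transform_data_drops_ignored_columns():
    # Raw payload shaped like a (simplified) OpenWeatherMap response
    raw = {"name": "London", "main": {"temp": 10.5, "humidity": 80}, "visibility": 10000}
    config = {"ignore_columns": ["visibility"]}

    result = transform_data(raw, config)

    # Ignored columns should not appear in the transformed output
    assert "visibility" not in result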
Deployment
The next step is to deploy this package. The requirements are to:
- Use a private package registry for deployment
- Set up CI/CD to ensure only successfully tested code is uploaded
To implement the ETL Python package with the given requirements, we will use the following approach:
1. Use Artifact Registry as the private package registry:
- Create a Python repository in Artifact Registry.
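For example, the repository can be created from the command line (the repository name and location are assumptions; adjust to your setup):
gcloud artifacts repositories create weather-data-etl --repository-format=python --location=europe-west10 --description="Private Python packages for the weather ETL"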
- Configure the project to use the private registry for package management.
- Install the Google Cloud CLI using this link.
- Make sure you’re logged in with the gcloud CLI:
gcloud auth login
- Install the following libraries:
pip install build # for building our package
pip install twine # for uploading the package to GCP Artifact Registry
- Configure authentication for Artifact Registry in the development environment.
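One common way to do this (also used later in the Dockerfile) is the Artifact Registry keyring backend, which lets pip and twine authenticate with your Google credentials:
pip install keyrings.google-artifactregistry-auth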
- Enable the necessary roles for your service account, including “Artifact Registry Writer” and “Artifact Registry Reader” by navigating to the IAM and choosing the service account email as the principal.
- Build the Python package
python -m build
- Create a .pypirc file in your home directory. Make sure to set the REPOSITORY, PROJECT_ID, and LOCATION variables used below (or replace them inline).
echo "[distutils]
index-servers =
${REPOSITORY}
[${REPOSITORY}]
repository: https://${LOCATION}-python.pkg.dev/${PROJECT_ID}/${REPOSITORY}/" > ~/.pypirc
- Set the GOOGLE_APPLICATION_CREDENTIALS environment variable to point to the downloaded JSON key.
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/your/service-account-key.json"
- Upload the Python package to the Artifact Registry:
python -m twine upload -r weather-data-etl dist/* --verbose
After running this command, the package should appear in the repository.
2. Use Cloud Run for running the ETL job hourly:
- Deploy your ETL package as a containerized application on Cloud Run.
The process typically involves:
a. Creating a Dockerfile that installs your package from the Artifact Registry
FROM python:3.12.2-slim
WORKDIR /app
# Copy the service account key into the container
COPY gcp_credentials.json /app/gcp_credentials.json
# Set the GOOGLE_APPLICATION_CREDENTIALS environment variable
ENV GOOGLE_APPLICATION_CREDENTIALS=/app/gcp_credentials.json
# Install system dependencies
RUN apt-get update && apt-get install -y curl apt-transport-https ca-certificates
# Install Google Cloud SDK
RUN curl -sSL https://sdk.cloud.google.com | bash
ENV PATH $PATH:/root/google-cloud-sdk/bin
RUN pip install google-cloud-storage
# Authenticate gcloud
RUN gcloud auth activate-service-account --key-file=/app/gcp_credentials.json
# Configure docker to use gcloud
RUN gcloud auth configure-docker
# Install pip packages with authentication support
RUN pip install --upgrade pip && \
pip install --no-cache-dir keyrings.google-artifactregistry-auth && \
pip install --no-cache-dir weather-data-etl==0.1.0 --extra-index-url https://europe-west10-python.pkg.dev/dd-testtask-rf-016d/weather-data-etl/simple/
# Copy the source code
COPY src/ /app/src/
COPY config/config.json /app/config/config.json
COPY requirements.txt .
RUN pip install -r requirements.txt
CMD ["python", "/app/src/weather_etl.py"]
b. Building a Docker image from that Dockerfile
docker build -t weather-data-etl-docker-image .
- Run the Docker image locally to check that everything works:
docker run -p 8080:8080 --env-file .env -e PORT=8080 weather-data-etl-docker-image
- Tag the new image for the Artifact Registry:
docker tag weather-data-etl-docker-image europe-west10-docker.pkg.dev/dd-testtask-rf-016d/weather-data-etl-docker-image/weather-data-etl:0.1.0
- Authenticate Docker to GCP
gcloud auth configure-docker europe-west10-docker.pkg.dev
- Push the updated image to the Artifact Registry:
docker push europe-west10-docker.pkg.dev/dd-testtask-rf-016d/weather-data-etl-docker-image/weather-data-etl:0.1.0
When everything works, you should see the Docker image pushed to the Docker repository created in the Artifact Registry with the name ‘weather-data-etl-docker-image’.
c. Deploying the Docker image to Cloud Run
- Enable the Cloud Run API in your Google Cloud account
- Create a Cloud Run service in Google Cloud before deploying the application.
- Select the container image that we just pushed and fill out the service configuration form.
- The same deployment can also be done from the command line; either way, deploying to Cloud Run makes the service accessible via a URL:
gcloud run deploy weather-data-etl --image=europe-west10-docker.pkg.dev/dd-testtask-rf-016d/weather-data-etl-docker-image/weather-data-etl:0.1.0 --region=europe-west10 --allow-unauthenticated
- Set up a Cloud Scheduler to trigger the Cloud Run service every hour.
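A sketch of that scheduler job; the job name, region, and service URL are placeholders:
gcloud scheduler jobs create http weather-etl-hourly --schedule="0 * * * *" --uri="https://<your-cloud-run-service-url>" --http-method=GET --location=europe-west10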
3. Implement CI/CD with GitHub Actions for testing and deployment:
- Set up a GitHub Actions workflow to run tests on every push to your repository.
- If tests pass, build and upload the package to the Artifact Registry.
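A sketch of .github/workflows/ci-cd.yml; the job layout, secret name, and repository URL are assumptions and should be adapted to your project:
name: CI/CD
on:
  push:
    branches: [main]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - run: pip install -r requirements.txt
      - run: pytest tests/
  publish:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - run: pip install build twine keyrings.google-artifactregistry-auth
      # Assumption: the service account key is stored as a GitHub secret
      - run: echo '${{ secrets.GCP_CREDENTIALS }}' > gcp_credentials.json
      - env:
          GOOGLE_APPLICATION_CREDENTIALS: gcp_credentials.json
        run: |
          python -m build
          python -m twine upload --repository-url https://europe-west10-python.pkg.dev/dd-testtask-rf-016d/weather-data-etl/ dist/* --verbose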
Part 2: dbt Project for Data Analysis
Next, we’ll create a dbt project to source the weather data, create a staging layer, and build an analytical layer. This project will use BigQuery as the data warehouse.
Create a service account on GCP
- Create a new service account specifically for dbt.
- Assign only the necessary roles, such as ‘BigQuery Data Editor’ and ‘BigQuery Job User’.
- Generate a new JSON key file for this service account. This keyfile.json will be downloaded to your device and later used to authenticate the dbt service to BigQuery.
- Enable the BigQuery API by navigating to ‘APIs & Services’.
Project initialization
- First, install the following packages
python -m pip install dbt-core
python -m pip install dbt-bigquery
- Initialize the dbt project by running:
dbt init weather_analysis
Respond to the prompts, choosing BigQuery as the database and providing the GCP project ID.
Configure the BigQuery connection in ~/.dbt/profiles.yml:
weather_analysis:
  outputs:
    dev:
      dataset: weather
      job_execution_timeout_seconds: 300
      job_retries: 1
      keyfile: path/to/keyfile.json
      location: EU
      method: service-account
      priority: interactive
      project: <GCP-Project-ID>
      threads: 1
      type: bigquery
  target: dev
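You can verify that the profile and keyfile work by running dbt’s built-in connection check:
dbt debug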
Create a dataset in BigQuery
Create an external table that references the data in the Cloud Storage bucket written by the Python job from the OpenWeather API. After running this query (a sketch is shown below), you should have a weather_measurements table with its inferred schema.
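A sketch of the external table DDL; the project, dataset, and bucket names are assumptions, and omitting the column list lets BigQuery infer the schema from the CSV files:
CREATE OR REPLACE EXTERNAL TABLE `your-project.weather.weather_measurements`
OPTIONS (
  format = 'CSV',
  uris = ['gs://your-weather-bucket/*.csv'],
  skip_leading_rows = 1
);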
Note that creating an external table in BigQuery does not load the data into BigQuery storage. Instead, it creates a table definition that points to the data in Cloud Storage, allowing you to query the data directly from its source.
When you run the dbt script for the staging table, it will query the external table, which in turn reads the data from the CSV files in Cloud Storage.
Project Structure
weather_analysis/
├── models/
│ ├── staging/
│ │ └── stg_weather_data.sql
│ └── analytics/
│ ├── daily_temperature_averages.sql
│ ├── temperature_variations.sql
│ └── common_weather_conditions.sql
├── macros/
│ └── temp_conversion.sql
├── tests/
│ └── assert_temperature_ranges.sql
├── dbt_project.yml
└── sources.yml
Staging Layer
In stg_weather_data.sql, we'll perform basic cleansing:
{{ config(materialized='table') }}

SELECT
    city,
    DATE(timestamp) AS date,
    CAST(temperature AS FLOAT64) AS temperature_celsius,
    humidity,
    pressure,
    wind_speed
FROM {{ source('weather_data', 'weather_measurements') }}
WHERE temperature IS NOT NULL
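The source referenced above also needs to be declared; a minimal sources.yml sketch, where the project ID and dataset name are assumptions:
version: 2

sources:
  - name: weather_data
    database: your-gcp-project-id
    schema: weather
    tables:
      - name: weather_measurements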
Analytical Layer
For the analytical layer, we’ll create three models:
- Weather Analysis (weather_analysis.sql):
{{ config(materialized='table') }}

WITH daily_temps AS (
    SELECT
        city,
        date,
        AVG(temperature_celsius) AS avg_temp_celsius,
        {{ celsius_to_fahrenheit('AVG(temperature_celsius)') }} AS avg_temp_fahrenheit,
        MIN(temperature_celsius) AS min_temp,
        MAX(temperature_celsius) AS max_temp
    FROM {{ ref('stg_weather_data') }}
    GROUP BY city, date
)

SELECT
    dt.city,
    dt.date,
    dt.avg_temp_celsius,
    dt.avg_temp_fahrenheit,
    dt.max_temp - dt.min_temp AS temp_variation
FROM daily_temps dt
Macros
We’ll create a macro for temperature conversion in macros/temp_conversion.sql:
{% macro celsius_to_fahrenheit(column_name) %}
({{ column_name }} * 9/5) + 32
{% endmacro %}
Documentation
Generate documentation for your models using dbt’s built-in documentation features. Add descriptions to your models, columns, and sources in the YAML files.
version: 2

models:
  - name: weather_analysis
    description: "This model provides daily weather analysis for various cities"
    columns:
      - name: city
        description: "The name of the city"
        tests:
          - not_null
      - name: date
        description: "The date of the weather measurement"
        tests:
          - not_null
      - name: avg_temp_celsius
        description: "Average daily temperature in Celsius"
      - name: avg_temp_fahrenheit
        description: "Average daily temperature in Fahrenheit"
      - name: temp_variation
        description: "Temperature variation (max - min) for the day"
    tests:
      - dbt_utils.accepted_range:
          column_name: avg_temp_celsius
          min_value: -50
          max_value: 50
      - dbt_utils.accepted_range:
          column_name: avg_temp_fahrenheit
          min_value: -58
          max_value: 122
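Because these range tests come from the dbt_utils package, it has to be declared in packages.yml and installed with dbt deps; the version pin below is an assumption:
packages:
  - package: dbt-labs/dbt_utils
    version: [">=1.0.0", "<2.0.0"]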
To generate documentation, run dbt docs generate and dbt docs serve to view it locally.
- Run the project
dbt run
If everything runs successfully, navigate to BigQuery and you should find the newly created tables there.
You have now learned how to implement a dbt project that follows best practices, includes tests and proper documentation, and runs on BigQuery.
Thank you for reading!
Rihab