til: A simple ETL task in Airflow using PostgresHook
This week at work I needed to build a small ETL (Extract, Transform, Load) process to move some data from PostgreSQL database A (the primary relational database our application uses to serve customer traffic) to PostgreSQL database B (a back-of-house instance used for metering and other usage analytics).
We already use Apache Airflow to orchestrate our metering tasks, data syncs, and Stripe API interactions, so building this process in Airflow was my first choice.
Airflow has native support for managing connections to third-party data sources and APIs through “Hooks”, which individual tasks can consume. This solution uses Airflow's PostgresHook API to export a CSV from one database and load it into the other.
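As a quick illustration of how a hook consumes one of those managed connections, here is a minimal sketch. It assumes a connection with the conn id `database_a` has already been registered, either through the Airflow UI (Admin > Connections) or as an `AIRFLOW_CONN_DATABASE_A` URI environment variable; the hook resolves that id to host, port, and credentials at runtime, so the task code never handles them directly.

```python
from airflow.hooks.postgres_hook import PostgresHook

# "database_a" is the conn id of an Airflow-managed connection;
# the hook looks the connection details up when the task runs
hook = PostgresHook(postgres_conn_id="database_a")

# run an ad-hoc query over that connection and fetch the result rows
rows = hook.get_records("SELECT count(*) FROM table")
```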
```python
### within the context of a DAG definition
# with DAG(
#     ...
# ) as dag:

# the DAG task definition
def copy_data_from_a_to_b(*args, **kwargs):
    import tempfile
    from airflow.hooks.postgres_hook import PostgresHook

    # fetch the airflow-managed PostgreSQL connections for A/B
    database_a_hook = PostgresHook(postgres_conn_id="database_a")
    database_b_hook = PostgresHook(postgres_conn_id="database_b")

    # create a tempfile for the exported data
    table_tmpfile = tempfile.NamedTemporaryFile()

    # export from A as CSV
    database_a_hook.copy_expert(
        "COPY (SELECT table.foo, table.bar FROM table) TO STDOUT CSV HEADER",
        filename=table_tmpfile.name,
    )

    # load the CSV into B
    database_b_hook.copy_expert(
        "COPY table(foo, bar) FROM STDIN WITH CSV HEADER",
        filename=table_tmpfile.name,
    )

# the DAG operator
copy_data_from_a_to_b_task = PythonOperator(
    task_id="copy_data_from_a_to_b",
    python_callable=copy_data_from_a_to_b,
    op_args=[],
)
```
NB: this method relies on being able to store the entire exported table as a tempfile on disk before loading it into the second database. If your table is large, the task might run out of disk space and fail to complete.
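If that limitation bites, one possible workaround (a rough sketch I'm adding here, not part of the original task) is to skip the temp file and stream the COPY output of A directly into the COPY input of B through an OS pipe, using the raw psycopg2 connections that PostgresHook exposes via `get_conn()` and psycopg2's own `copy_expert`, which accepts any file-like object.

```python
import os
import threading

from airflow.hooks.postgres_hook import PostgresHook


def stream_data_from_a_to_b(*args, **kwargs):
    # raw psycopg2 connections behind the Airflow-managed hooks
    src_conn = PostgresHook(postgres_conn_id="database_a").get_conn()
    dst_conn = PostgresHook(postgres_conn_id="database_b").get_conn()

    read_fd, write_fd = os.pipe()

    def export():
        # COPY rows out of A as CSV, writing into the pipe, then close it
        with os.fdopen(write_fd, "wb") as write_end:
            with src_conn.cursor() as cursor:
                cursor.copy_expert(
                    "COPY (SELECT table.foo, table.bar FROM table) TO STDOUT CSV HEADER",
                    write_end,
                )

    writer = threading.Thread(target=export)
    writer.start()

    # COPY rows into B as they arrive on the read end of the pipe
    with os.fdopen(read_fd, "rb") as read_end:
        with dst_conn.cursor() as cursor:
            cursor.copy_expert("COPY table(foo, bar) FROM STDIN WITH CSV HEADER", read_end)
    dst_conn.commit()
    writer.join()
```

Nothing is spooled to the worker's disk with this approach, at the cost of holding both database connections open for the whole transfer.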