How to use PostgreSQL in Apache Airflow

Ertan Çelik
Jul 17, 2023


In our previous article, we walked through installing Airflow and running a shell-script task with it. Now let’s create a PostgreSQL task.

Apache Airflow has a rich collection of operators that can be used to implement the various tasks that make up your workflow. An Airflow pipeline is essentially a Directed Acyclic Graph (DAG) made up of tasks (nodes) and dependencies (edges).

Now it is time to explore how we can leverage PostgreSQL with Airflow. In this project we are going to schedule a DAG that creates a table in PostgreSQL and inserts data into it using the Postgres operator. To do this we will rely heavily on the PostgresOperator, so we should get familiar with it. The PostgresOperator will run any query you provide to it against the PostgreSQL database.

To use the PostgresOperator we should first import the required modules. Note that with Airflow 2 the operator lives in the Postgres provider package, which we install later in this article.

Importing modules

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime

Instantiate a DAG

Give the DAG a name, configure the schedule, and set the DAG settings:

with DAG('airflow_postgresql',
         start_date=datetime(2023, 7, 17),
         schedule_interval="@monthly",
         catchup=False
         ) as dag:

We can set the schedule with a preset or a cron expression, as you can see in the table below.
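For reference, the standard Airflow schedule presets and their cron equivalents are:

Preset      Cron equivalent
@once       schedule once and only once
@hourly     0 * * * *
@daily      0 0 * * *
@weekly     0 0 * * 0
@monthly    0 0 1 * *
@yearly     0 0 1 1 *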

Set the Tasks

sql_command_1 = "call public.airflow_proc(1,'example 1')"

exec_sp = PostgresOperator(
    task_id='exec_sp',
    sql=sql_command_1,
    postgres_conn_id='postgre_dwh',
    autocommit=True)

sql_command_2 = "call public.airflow_proc(2,'example 2')"

exec_sp2 = PostgresOperator(
    task_id='exec_sp2',
    sql=sql_command_2,
    postgres_conn_id='postgre_dwh',
    autocommit=True)

Creating a Postgres database table

create_table = PostgresOperator(task_id='create_table',postgres_conn_id='postgre_dwh' ,sql= """CREATE TABLE IF NOT EXISTS airflow_test (

id SERIAL PRIMARY KEY,

name VARCHAR NOT NULL,

type VARCHAR NOT NULL,

date DATE NOT NULL,

OWNER VARCHAR NOT NULL);""")

Inserting data into a Postgres database table with the Postgres stored procedure airflow_proc()

Then we can add the dependencies:

create_table >> exec_sp

create_table >> exec_sp2
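The two dependency lines above are equivalent to declaring both downstream tasks at once with a list:

create_table >> [exec_sp, exec_sp2]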

The complete Postgres Operator DAG
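Putting the snippets together, the complete DAG file looks roughly like this (note that the tasks are indented inside the with DAG(...) block; the postgre_dwh connection is set up in the next section):

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime

with DAG('airflow_postgresql',
         start_date=datetime(2023, 7, 17),
         schedule_interval="@monthly",
         catchup=False
         ) as dag:

    # create the target table if it does not exist yet
    create_table = PostgresOperator(
        task_id='create_table',
        postgres_conn_id='postgre_dwh',
        sql="""CREATE TABLE IF NOT EXISTS airflow_test (
            id SERIAL PRIMARY KEY,
            name VARCHAR NOT NULL,
            type VARCHAR NOT NULL,
            date DATE NOT NULL,
            owner VARCHAR NOT NULL);""")

    # call the stored procedure twice with different parameters
    exec_sp = PostgresOperator(
        task_id='exec_sp',
        sql="call public.airflow_proc(1,'example 1')",
        postgres_conn_id='postgre_dwh',
        autocommit=True)

    exec_sp2 = PostgresOperator(
        task_id='exec_sp2',
        sql="call public.airflow_proc(2,'example 2')",
        postgres_conn_id='postgre_dwh',
        autocommit=True)

    create_table >> exec_sp
    create_table >> exec_sp2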

Now we will start the operations related to the database.

But first we will need a few configurations.

Let’s see how to define the connection over the Web UI.

We go to the relevant screen by clicking Connections under the Admin tab in the Web UI. Then we can add a new connection by clicking the “+” sign.

At first, Postgres was not available as a connection type.

To fix this, we run the following command:

pip install apache-airflow-providers-postgres

After installing the provider package and restarting the Airflow webserver, Postgres appeared in the list of connection types.

We create and test the sample postgre_dwh connection.

Give the Conn Id any name you want (here, postgre_dwh), select Postgres as the Conn Type, set the Host to localhost, and specify the Schema (database) name. The default Postgres port is 5432; if your Postgres user has a password, enter it in the Password field, then save and test the connection.
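Alternatively, the same connection can be created from the command line. A minimal sketch, assuming a hypothetical database name dwh and the default postgres user (replace the credentials with your own):

airflow connections add 'postgre_dwh' \
    --conn-type 'postgres' \
    --conn-host 'localhost' \
    --conn-schema 'dwh' \
    --conn-login 'postgres' \
    --conn-password 'postgres' \
    --conn-port 5432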

Now let’s create the sample stored procedure on the PostgreSQL machine. In the DAG, the first task will create the table and the other two tasks will call the stored procedure.

Verifying the tasks

As seen below, we unpause the airflow_postgresql DAG.

Click on the “airflow_postgresql” name to open the DAG, then select the Graph view; as seen below, we have three tasks: one that creates the table and two that insert data.

Before running our dag, we select the postgresql table and make sure it is empty.

Our PostgreSQL stored procedure inserts a row into the airflow_test table on each call.
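A minimal sketch of such a procedure, assuming it simply inserts one row into airflow_test using its two parameters (the values chosen for the type, date, and owner columns are purely illustrative):

CREATE OR REPLACE PROCEDURE public.airflow_proc(p_id integer, p_name varchar)
LANGUAGE plpgsql
AS $$
BEGIN
    -- hypothetical body: insert one row using the two parameters
    INSERT INTO airflow_test (name, type, date, owner)
    VALUES (p_name, 'type_' || p_id, CURRENT_DATE, 'airflow');
END;
$$;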

Now let’s run our airflow_postgresql dag:
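We trigger it from the UI; equivalently, the DAG can be unpaused and triggered from the command line:

airflow dags unpause airflow_postgresql
airflow dags trigger airflow_postgresql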

Click on the log tab to check the log file.

To check in the log file how the query ran, click on the create_table task in the Graph view; you will then get the window below.

Looks great, it completed successfully.
Now we check our PostgreSQL table and we see that there are 2 records:
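For example, from psql:

SELECT id, name, type, date, owner FROM airflow_test ORDER BY id;

This should return the two rows written by the airflow_proc(1,'example 1') and airflow_proc(2,'example 2') calls.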

Here we learned how to use PostgreSQL in an Airflow DAG.
In this how-to guide we explored the Apache Airflow PostgresOperator.

Let’s quickly highlight the key takeaways. It is best practice to create a subdirectory called sql in your dags directory, where you can store your .sql files. This will make your code more elegant and more maintainable.
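For example, if the CREATE TABLE statement above were saved in a hypothetical file dags/sql/create_airflow_test.sql, the task could reference it by relative path (the sql field is templated, and .sql files are resolved from the DAG folder):

create_table = PostgresOperator(
    task_id='create_table',
    postgres_conn_id='postgre_dwh',
    sql='sql/create_airflow_test.sql')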

And finally, there are several ways to dynamically pass parameters into PostgresOperator tasks using the parameters or params attributes, and you can control session settings by passing options in the hook_params attribute.
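As a quick illustration of the parameters attribute (this task is not part of the DAG above; the values are hypothetical):

insert_row = PostgresOperator(
    task_id='insert_row',
    postgres_conn_id='postgre_dwh',
    # placeholders are filled from the parameters dict by the database driver
    sql="INSERT INTO airflow_test (name, type, date, owner) "
        "VALUES (%(name)s, %(type)s, CURRENT_DATE, %(owner)s);",
    parameters={'name': 'example 3', 'type': 'parameterized', 'owner': 'airflow'})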

I hope it was useful for you. See you in other articles.

