Apache Airflow® is a platform created by the community to programmatically author, schedule, and monitor workflows.

A DAG (Directed Acyclic Graph) is the core concept of Airflow, collecting Tasks together, organized with dependencies and relationships to say how they should run. You declare a DAG in a Python file in the $AIRFLOW_HOME/dags folder of your Airflow instance.
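
For example, a minimal sketch of a DAG file looks like the following. The DAG ID, task, and schedule here are placeholders for illustration, not part of the integration below:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python import PythonOperator

    # A placeholder task: any Python callable can become a task.
    def hello():
        print("hello from Airflow")

    # Saving this file in $AIRFLOW_HOME/dags makes the DAG visible to Airflow.
    dag = DAG('example_dag', start_date=datetime(2023, 1, 1), schedule_interval='@daily')

    hello_task = PythonOperator(task_id='hello', python_callable=hello, dag=dag)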

This page shows you how to use a Python connector in a DAG to integrate Apache Airflow with a Timescale Cloud service.

Before integrating:

This example DAG uses the company table you create in Create regular PostgreSQL tables for relational data.
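
If you have not yet created that table, the following SQL is a minimal sketch based on the columns this DAG inserts into (symbol and name); see the linked page for the exact definition used in the getting-started guide:

    CREATE TABLE company (
        symbol TEXT NOT NULL,
        name   TEXT NOT NULL
    );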

To install the Python libraries required to connect to Timescale Cloud:

  1. Enable PostgreSQL connections between Airflow and Timescale Cloud

    pip install psycopg2-binary
  2. Enable PostgreSQL connection types in the Airflow UI

    pip install apache-airflow-providers-postgres
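
To verify the install, you can connect to your service with psycopg2 directly. This is a quick sketch; the connection string is a placeholder for the one shown in your service's connection info in Timescale Console:

    import psycopg2

    # Placeholder DSN: copy the real connection string from Timescale Console.
    dsn = "postgres://tsdbadmin:<password>@<host>:<port>/tsdb?sslmode=require"

    conn = psycopg2.connect(dsn)
    cursor = conn.cursor()
    cursor.execute("SELECT version();")
    print(cursor.fetchone())  # prints the PostgreSQL version string
    cursor.close()
    conn.close()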

In your Airflow instance, securely connect to your Timescale Cloud service:

  1. Run Airflow

    On your development machine, run the following command:

    airflow standalone

    The username and password for the Airflow UI are displayed in the standalone | Login with username line of the output.

  2. Add a connection from Airflow to your Timescale Cloud service

    1. In your browser, navigate to localhost:8080, then select Admin > Connections.
    2. Click + (Add a new record), then use your connection info to fill in the form. The Connection Type is Postgres.
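
    To confirm that Airflow can resolve the connection, you can run a quick check from a Python shell on the same machine. This sketch assumes you entered tsdb_connection as the Connection Id; that name is a placeholder, use your own:

      from airflow.providers.postgres.hooks.postgres import PostgresHook

      # 'tsdb_connection' is a placeholder: use the Connection Id from the form.
      hook = PostgresHook(postgres_conn_id='tsdb_connection')
      conn = hook.get_conn()
      cursor = conn.cursor()
      cursor.execute("SELECT 1;")
      print(cursor.fetchone())  # (1,) means Airflow reached your service
      cursor.close()
      conn.close()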

To exchange data between Airflow and your Timescale Cloud service:

  1. Create and execute a DAG

    To insert data in your Timescale Cloud service from Airflow:

    1. In $AIRFLOW_HOME/dags/timescale_dag.py, add the following code:

      from datetime import datetime

      from airflow import DAG
      from airflow.operators.python import PythonOperator
      from airflow.providers.postgres.hooks.postgres import PostgresHook

      def insert_data_to_timescale():
          # Use the Connection Id you created in the Airflow UI.
          hook = PostgresHook(postgres_conn_id='the ID of the connection you created')
          conn = hook.get_conn()
          cursor = conn.cursor()
          # This could be any query. This example inserts data into the table
          # you create in:
          # https://docs.timescale.com/getting-started/latest/tables-hypertables/#create-regular-postgresql-tables-for-relational-data
          cursor.execute(
              "INSERT INTO company (symbol, name) VALUES (%s, %s)",
              ('new_company_symbol', 'New Company Name'),
          )
          conn.commit()
          cursor.close()
          conn.close()

      default_args = {
          'owner': 'airflow',
          'start_date': datetime(2023, 1, 1),
          'retries': 1,
      }

      dag = DAG('timescale_dag', default_args=default_args, schedule_interval='@daily')

      insert_task = PythonOperator(
          task_id='insert_data',
          python_callable=insert_data_to_timescale,
          dag=dag,
      )

      This DAG uses the company table created in Create regular PostgreSQL tables for relational data.

    2. In your browser, refresh the Airflow UI.

    3. In Search DAGs, type timescale_dag and press ENTER.

    4. Press the play icon to trigger the DAG.
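
    Alternatively, you can exercise the DAG once from the command line; airflow dags test performs a single, local run of the DAG for the given date:

      airflow dags test timescale_dag 2023-01-01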

  2. Verify that the data appears in Timescale Cloud

    1. In Timescale Console, navigate to your service and click SQL editor.

    2. Run a query to view your data. For example: SELECT symbol, name FROM company;

      You see the new rows inserted into the table.

You have successfully integrated Apache Airflow with Timescale Cloud and created a data pipeline.
