Testing and debugging Apache Airflow

21 Feb, 2019

Testing Airflow is hard

There’s a good reason for writing this blog post – testing Airflow code can be difficult. It often leads people to go through an entire deployment cycle to manually push the trigger button on a live system. Only then can they verify their Airflow code. This is a painfully long process and, as with any other software, people would like to write, test, and debug their Airflow code locally.

Running an Airflow DAG on your local machine is often not possible due to dependencies on external systems. To start, I’d like to point out this excellent blog post by ING WBAA about testing Airflow. It covers setting up DTAP and CI/CD for Airflow. Besides this, the blog post also describes a DAG integrity test for validating if your DAG files contain valid DAG objects, which is a good starting point. Also, there’s this Meetup talk about a local Airflow testing environment with Docker Compose by my colleague Bas Beelen, which will be open sourced in the near future.
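Such a DAG integrity test usually boils down to loading all DAG files and asserting that none of them fail to import – a minimal sketch of the idea (the dags/ folder path is an assumption, adjust it to your project layout):

from airflow.models import DagBag

def test_dag_integrity():
    # Load every DAG file from the project's dags/ folder, without
    # Airflow's bundled example DAGs.
    dag_bag = DagBag(dag_folder="dags/", include_examples=False)

    # Files that fail to import (syntax errors, missing imports, ...) end up
    # in import_errors; a healthy project has none.
    assert not dag_bag.import_errors

    # And the folder should actually yield at least one DAG object.
    assert dag_bag.dags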

The main goal of this post is to explain how to unit test Airflow components locally without the need for a production system. This applies to anything in Airflow – operators, hooks, utility functions, etc. The DAG itself is simply configuration that glues various operations together. This post does not cover testing complete DAGs (although you could do so with the tools shown in this blog post), but explains how to test individual operations.

All tests are written with pytest and Airflow 1.10.2. All code in this blog post is available on GitHub.
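For reference, the test dependencies used in the examples below can be installed with something along these lines (the postgres extra pulls in the database driver needed for the Postgres example later on):

$ pip install "apache-airflow[postgres]==1.10.2" pytest pytest-helpers-namespace pytest-mock pytest-docker-tools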

Pytest Airflow fixtures & helpers

Airflow jobs always run in the context of a DAG. The execution of a task in a DAG is controlled via a task instance, which provides the context of the current run to the task. Hence, testing an operator cannot be decoupled from running a DAG. So, in order to test operators, I use a dummy DAG throughout my tests.

Pytest has the concept of fixtures: objects which can be passed to test functions as input arguments. This is why I prefer pytest over Python unittest; these fixtures allow for reusable code and less code duplication. For Airflow, I have a test_dag fixture with which I test operators that require a DAG to run.

import datetime

import pytest
from airflow import DAG

@pytest.fixture
def test_dag():
    return DAG(
        "test_dag",
        default_args={"owner": "airflow", "start_date": datetime.datetime(2018, 1, 1)},
        schedule_interval=datetime.timedelta(days=1),
    )

Define this test_dag fixture in tests/conftest.py to use it in any test.

A nice plugin for pytest is the Pytest Helpers Namespace. It allows you to register any function under the pytest helpers namespace, to use anywhere in your tests. Install it with pip install pytest-helpers-namespace. For testing operators, I need to run a task within a DAG, and therefore define a run_task helper function:

import pytest

pytest_plugins = ["helpers_namespace"]

@pytest.helpers.register
def run_task(task, dag):
    dag.clear()
    task.run(
        start_date=dag.default_args["start_date"],
        end_date=dag.default_args["start_date"],
    )

Now the test_dag fixture and run_task helper function can be used to run tasks in a unit test:

import pytest
from airflow.operators.bash_operator import BashOperator

def test_dummy(test_dag, tmpdir):
    tmpfile = tmpdir.join("hello.txt")

    task = BashOperator(task_id="test", bash_command=f"echo 'hello' > {tmpfile}", dag=test_dag)
    pytest.helpers.run_task(task=task, dag=test_dag)

    assert len(tmpdir.listdir()) == 1
    assert tmpfile.read().replace("\n", "") == "hello"

The test_dummy test uses two pytest fixtures: the test_dag as described above and tmpdir. Tmpdir is one of the fixtures you get for free when using pytest. It provides a temporary directory which you’d normally create with the tempfile builtin. Simply put tmpdir as a test function argument and you can use this tmpdir in your test. In the test above, I run a BashOperator which writes a file and I verify the content of the file, without having to upload a DAG to an Airflow instance and test manually.

Mocking Airflow

Sometimes you need to fake objects in your tests, for example when you cannot access the Airflow metastore directly from your laptop and thus cannot read the connections. In these situations, you can mock these objects in your tests. For mocking I use pytest-mock, which provides a mocker fixture, a thin wrapper around the mock package. For example, this shows a test using the SimpleHttpOperator (code):

from datetime import datetime

import pytest
from airflow.hooks.base_hook import BaseHook
from airflow.models import Connection
from airflow.operators.http_operator import SimpleHttpOperator

def test_simple_http_operator(test_dag, mocker):
    mocker.patch.object(
        BaseHook,
        "get_connection",
        return_value=Connection(schema="https", host="api.sunrise-sunset.org"),
    )

    def _check_light(sunset_sunrise_response):
        results = sunset_sunrise_response.json()["results"]
        sunrise = datetime.strptime(results["sunrise"][:-6], "%Y-%m-%dT%H:%M:%S")
        sunset = datetime.strptime(results["sunset"][:-6], "%Y-%m-%dT%H:%M:%S")

        if sunrise < datetime.utcnow() < sunset:
            print("It is light!")
        else:
            print("It is dark!")

        return True

    is_it_light = SimpleHttpOperator(
        task_id="is_it_light",
        http_conn_id="random_name",
        endpoint="json",
        method="GET",
        data={"lat": "52.370216", "lng": "4.895168", "formatted": "0"},
        response_check=_check_light,
        dag=test_dag,
    )

    pytest.helpers.run_task(task=is_it_light, dag=test_dag)

This (hypothetical) task fetches sunrise and sunset times from https://api.sunrise-sunset.org, and the response_check function prints whether it’s currently light or dark. You might save the credentials for such an API in an Airflow connection; however, you cannot access the metastore from your local machine. So, we patch the BaseHook and mock the return value of get_connection to always return a Connection object with host="api.sunrise-sunset.org" for this test.

That way we can work with an Airflow Connection object and test the operator.

Mocking external systems

Mocking works for objects, but what if you want to verify the implementation of your component against a real external system? By spinning up Docker containers with the system you want to test against, you can verify the correct behaviour of your component!

There are several ways to do this, of course; for example, Airflow itself starts a set of containers with Docker Compose at the start of its test suite.

Another option that I like is, once again, a pytest package, called pytest_docker_tools. It provides a set of helpers for pytest to manage Docker containers. I like that it keeps the test dependencies within the test scripts, and that you can pass Docker containers as fixtures to your tests.

To demonstrate, I implemented a PostgresToLocalOperator:

import json

from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from psycopg2.extras import RealDictCursor

class PostgresToLocalOperator(BaseOperator):
    @apply_defaults
    def __init__(self, pg_query: str, local_path: str, postgres_conn_id: str, **kwargs):
        super().__init__(**kwargs)
        self._pg_query = pg_query
        self._local_path = local_path
        self._postgres_conn_id = postgres_conn_id

    def execute(self, context):
        postgres_hook = PostgresHook(postgres_conn_id=self._postgres_conn_id)
        conn = postgres_hook.get_conn()
        cursor = conn.cursor(cursor_factory=RealDictCursor)
        cursor.execute(self._pg_query)

        with open(self._local_path, "w") as f:
            json.dump(cursor.fetchall(), f, indent=4)

The PostgresToLocalOperator queries a Postgres database and stores the JSON-formatted result on local disk. Now I’d like to verify the correct behaviour of my PostgresToLocalOperator, however I cannot access the production Postgres database. So, let’s write a test and spin up a Postgres Docker container to query against:

from collections import namedtuple
from os import path

import pytest
from pytest_docker_tools import container, fetch

@pytest.fixture(scope="module")
def postgres_credentials():
    PostgresCredentials = namedtuple("PostgresCredentials", ["username", "password"])
    return PostgresCredentials("testuser", "testpass")

postgres_image = fetch(repository="postgres:11.1-alpine")
postgres = container(
    image="{postgres_image.id}",
    environment={
        "POSTGRES_USER": "{postgres_credentials.username}",
        "POSTGRES_PASSWORD": "{postgres_credentials.password}",
    },
    ports={"5432/tcp": None},
    volumes={
        path.join(path.dirname(__file__), "postgres-init.sql"): {
            "bind": "/docker-entrypoint-initdb.d/postgres-init.sql"
        }
    },
)

There are a couple of things going on here.

First, we create a namedtuple fixture holding the Postgres credentials. The way to pass variables to the pytest_docker_tools objects is via fixtures. As an added bonus, the postgres_credentials fixture can now be passed as an argument to all tests.

@pytest.fixture(scope="module")
def postgres_credentials():
    PostgresCredentials = namedtuple("PostgresCredentials", ["username", "password"])
    return PostgresCredentials("testuser", "testpass")

Next, pytest_docker_tools requires two statements for creating a Docker container fixture: fetch() for fetching Docker image metadata and container() for constructing the Docker container. It follows the same argument names as Docker itself, e.g. volumes for mounting volumes. I created a postgres-init.sql script which can be placed in /docker-entrypoint-initdb.d/ in a Postgres Docker container, to be executed at boot[^1], so that the dummy Postgres DB contains dummy data.

postgres-init.sql:

-- postgres-init.sql
SET search_path TO public;
CREATE TABLE dummy (
    id integer,
    name character varying(255)
);
INSERT INTO dummy (id,name) VALUES (1, 'dummy1');
INSERT INTO dummy (id,name) VALUES (2, 'dummy2');
INSERT INTO dummy (id,name) VALUES (3, 'dummy3');

With all this set up, we can now write the test to validate the PostgresToLocalOperator reading from Postgres and writing to the local filesystem:

import json
from pathlib import Path

import pytest
from airflow.models import Connection

from my_package.operators.postgres_to_local_operator import PostgresToLocalOperator, PostgresHook

def test_postgres_to_local_operator(test_dag, mocker, tmpdir, postgres, postgres_credentials):
    output_path = str(tmpdir / "pg_dump")

    mocker.patch.object(
        PostgresHook,
        "get_connection",
        return_value=Connection(
            host="localhost",
            conn_type="postgres",
            login=postgres_credentials.username,
            password=postgres_credentials.password,
            port=postgres.ports["5432/tcp"][0],
        ),
    )

    task = PostgresToLocalOperator(
        task_id="test",
        postgres_conn_id="postgres",
        pg_query="SELECT * FROM dummy",
        local_path=output_path,
        dag=test_dag,
    )
    pytest.helpers.run_task(task=task, dag=test_dag)

    # Assert if output file exists
    output_file = Path(output_path)
    assert output_file.is_file()

    # Assert file contents, should be the same as in postgres-init.sql
    expected = [
        {"id": 1, "name": "dummy1"},
        {"id": 2, "name": "dummy2"},
        {"id": 3, "name": "dummy3"},
    ]
    with open(output_file, "r") as f:
        assert json.load(f) == expected

The test takes a number of arguments:

  1. test_dag – DAG fixture
  2. mocker – pytest mock fixture
  3. tmpdir – pytest tmpdir fixture
  4. postgres – dummy Postgres Docker container fixture
  5. postgres_credentials – Postgres credentials fixture

First we define an output path to write the results to:

output_path = str(tmpdir / "pg_dump") 

Next, we patch the PostgresHook to return a mocked Connection object when get_connection is called, since we don’t have access to a running Airflow instance locally.

Important! One of the most common mistakes with Python mocking, if not the most common, is patching the incorrect location. The result is that the patch appears to have no effect. To patch the PostgresHook here, do not import it with from airflow.hooks.postgres_hook import PostgresHook! Instead, import the PostgresHook from the location where you actually use it: from my_package.operators.postgres_to_local_operator import PostgresToLocalOperator, PostgresHook.

from my_package.operators.postgres_to_local_operator import PostgresToLocalOperator, PostgresHook

mocker.patch.object(
    PostgresHook,
    "get_connection",
    return_value=Connection(
        host="localhost",
        conn_type="postgres",
        login=postgres_credentials.username,
        password=postgres_credentials.password,
        port=postgres.ports["5432/tcp"][0],
    ),
)
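The same principle holds if you patch by string path rather than with patch.object – a sketch, using the module path from this post’s example project:

# Patches the PostgresHook name as the operator module sees it, so the
# operator's PostgresHook(...) call hits the mock:
mocker.patch("my_package.operators.postgres_to_local_operator.PostgresHook")

# Patches the name in airflow.hooks.postgres_hook only; the operator module
# already holds its own reference to the real class, so this patch goes
# unnoticed there:
mocker.patch("airflow.hooks.postgres_hook.PostgresHook")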

Next we run our operator, simply querying SELECT * FROM dummy:

task = PostgresToLocalOperator(
    task_id="test",
    postgres_conn_id="postgres",
    pg_query="SELECT * FROM dummy",
    local_path=output_path,
    dag=test_dag,
)
pytest.helpers.run_task(task=task, dag=test_dag)

The operator has now completed its execution, so we expect one file in output_path, and we expect this file to contain whatever was in the dummy table:

output_file = Path(output_path)
assert output_file.is_file()

expected = [
    {"id": 1, "name": "dummy1"},
    {"id": 2, "name": "dummy2"},
    {"id": 3, "name": "dummy3"},
]
with open(output_file, "r") as f:
    assert json.load(f) == expected

The complete code for the PostgresToLocalOperator can be found here, and the complete code for testing the operator can be found here.

Debugging Airflow

There are various ways to debug a process running in Airflow. If running locally, e.g. in a unit test, you can place a breakpoint in your IDE of choice. I left remote debugging with an IDE out of scope for this blog post and I’ll explain a different method which works both locally and remotely.

PDB

Python comes with a builtin debugger called pdb. You can use it by placing this snippet at the location you want to start debugging:

import pdb
pdb.set_trace()

Or, if you’re on Python 3.7 (currently only supported on Airflow master), you can simply call breakpoint() somewhere in your code. There’s also ipdb, which offers more features such as color highlighting and autocompletion; it is, however, not a builtin, so you’ll have to install it with pip install ipdb. Once in a debug session, you can control the debugging with these shortcuts (source):

PDB cheatsheet
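For example, to inspect the API response in the SimpleHttpOperator test from earlier, you could drop a breakpoint into the response check (a sketch; run pytest with -s so the pdb prompt can read your input):

import pdb

def _check_light(sunset_sunrise_response):
    # Execution pauses on the next line; inspect
    # sunset_sunrise_response.json() at the pdb prompt and continue with c.
    pdb.set_trace()  # on Python 3.7+ a bare breakpoint() does the same
    return "results" in sunset_sunrise_response.json()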

In case you want to place a breakpoint but don’t know where to find the code, simply open up a Python terminal:

$ python

>>> import airflow.operators.bash_operator    # Import the module
>>> airflow.operators.bash_operator.__file__  # module.__file__
'./incubator-airflow/airflow/operators/bash_operator.py'

Finally, if you want to debug a "live" Airflow job, you can manually run a task with airflow test [dag_id] [task_id] [yyyy-mm-dd]. This runs the task without checking dependencies and without recording its state in the metastore, which makes it useful for debugging. In the example below, I show how to use this approach to debug an incorrect Pendulum.format() statement:

PDB
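For reference, airflow test takes a dag_id, a task_id and an execution date – for example, with hypothetical names (placeholders, not taken from this post):

$ airflow test my_dag my_task 2019-02-21

The task’s logs are printed straight to your terminal, and a pdb.set_trace() placed in the operator code drops you into a debug session on the machine running Airflow.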

Final words

With the examples in this post, hopefully you’ll be able to shorten your development time and verify the behaviour of your Airflow code locally. Testing operators locally using unit tests without an Airflow installation can feel like quite a breeze! If you have any questions, feel free to contact me on Airflow Slack as @BasPH.

Interested in Apache Airflow Training?

A quick heads up: we offer Apache Airflow as a public course in our Academy. Join us to learn everything you need to successfully work with Airflow!

[^1]: From the Postgres Docker Hub documentation: after initdb is called, any *.sql and executable *.sh file in /docker-entrypoint-initdb.d/ is run.
