Blog

Introducing Pylint-Airflow

12 May, 2019
Xebia Background Header Wave

I’m happy to introduce the release of Pylint-Airflow, a Pylint plugin for Apache Airflow.

Install from PyPi:

pip install pylint-airflow

And apply with:

pylint --load-plugins=pylint_airflow yourscript.py

Example output:

************* Module airflow.contrib.example_dags.example_emr_job_flow_automatic_steps
airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py:1:0: C0111: Missing module docstring (missing-docstring)
airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py:1:0: C8306: For consistency match the DAG filename with the dag_id (match-dagid-filename)
airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py:63:0: C8300: Operator variable name and task_id argument should match (different-operator-varname-taskid)
airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py:71:0: C8300: Operator variable name and task_id argument should match (different-operator-varname-taskid)

Pylint-Airflow is developed for linting (static code analysis) Airflow DAG scripts. It runs on top of your default Pylint settings and will notify you about potential conventions, refactorings, warnings and errors. The plugin runs with Python 3.6 and higher. You can integrate the plugin in your CI pipeline in order to check for potential issues, and it is configurable both via .pylintrc and inline just like other Pylint messages.

Alpha release! Be aware this is an alpha release and might contain flaws.

The plugin is the result of one of our GDD Friday’s (plus some extra time), a 4-weekly Friday on which we have a full day for experimentation.

Examples

Based on experience at projects and open source contributions, I devised a few conventions I figured would be useful to apply to anybody’s DAG scripts. For example:

Mixed dependency directions

This message is given when the bitshift operator for setting task dependencies is applied in opposing directions on a single line:

t1 = DummyOperator(task_id="task1")
t2 = DummyOperator(task_id="task2")
t3 = DummyOperator(task_id="task3")

t1 >> t3  t2
$ pylint --load-plugins=pylint_airflow example.py
************* Module example
example.py:5:9: C8302: Avoid mixing task dependency directions (mixed-dependency-directions)

Mixing dependency directions is generally considered confusing, a better alternative would be to switch the direction or split the statement over multiple lines:

[t1, t2] >> t3

# Or:
t1 >> t3
t2 >> t3

Match the task_id with the variable name

For readability I like to match an operator’s task_id with the name of the variable it is assigned to:

# Correct
do_something = DummyOperator(task_id="do_something")

# Incorrect
do_something_else = DummyOperator(task_id="random_name")

The second assignment is valid Python code but the plugin will return a convention message:

$ pylint --load-plugins=pylint_airflow example.py
************* Module example
example.py:5:0: C8300: Operator variable name and task_id argument should match (different-operator-varname-taskid)

Warn for unused XComs

Perhaps a magic "feature" of Airflow; a return value from the PythonOperator’s callable is automatically stored as an XCom in the Airflow metastore. If the return statement is unintentional and the XCom is never used (i.e. pulled), the metastore will fill up with obsolete data which is never accessed. Besides the redundant values, this could become an issue in case the returned data is very large.

To avoid this situation, the plugin checks for xcom_pull() with task_ids=[task_id of pushtask]in the script, so that it knows if the XCom is read and therefore intended or not. However, if no reference to the XCom key can be found, a message will be returned to notify of the potentially unused XCom (it can obviously not know if it is used in other scripts).

from airflow.operators.python_operator import PythonOperator

def _pushtask():
    print("do stuff")
    return "foobar"  # value is returned here but not used elsewhere

pushtask = PythonOperator(task_id="pushtask", python_callable=_pushtask)

def _pulltask():
    print("do something")

pulltask = PythonOperator(task_id="pulltask", python_callable=_pulltask)

# A valid pulltask would be:

# def _pulltask(task_instance, **_):
#     print(task_instance.xcom_pull(task_ids="pushtask"))

# pulltask = PythonOperator(task_id="pulltask", python_callable=_pulltask, provide_context=True)

The value "foobar" is returned by _pushtask, however the XCom key it is stored in, is not pulled anywhere else in the script. Therefore a refactoring message is shown:

$ pylint --load-plugins=pylint_airflow example.py
************* Module example
example.py:7:11: R8300: Return value from _pushtask is stored as XCom but not used anywhere (unused-xcom)

The future

Pylint-Airflow is currently in alpha and requires more future work. In the current state, I tried to focus mostly on the stability so that it wouldn’t crash anywhere. However, there are still many things to complete:

  • Make it more configurable.
  • Adding more messages. Some messages in the readme are currently not implemented.
  • Better message texts, e.g. different-operator-varname-taskid could also return the actual variable name and task_id.
  • Write documentation. Current documenation is built automatically and hosted on Read the Docs but empty.

Tests and contributions are very welcome. If you encounter any issues or would like to contribute to the project, please create an issue or pull request on GitHub: https://github.com/basph/pylint-airflow.

Feel free to contact me about anything Airflow on the Airflow Slack @BasPH.

By the way, we have a great Apache Airflow course to teach you the internals, terminology, and best practices of working with Airflow, with
hands-on experience in writing an maintaining data pipelines.

Questions?

Get in touch with us to learn more about the subject and related solutions

Explore related posts