Install from PyPi:
pip install pylint-airflow
And apply with:
pylint --load-plugins=pylint_airflow yourscript.py
************* Module airflow.contrib.example_dags.example_emr_job_flow_automatic_steps airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py:1:0: C0111: Missing module docstring (missing-docstring) airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py:1:0: C8306: For consistency match the DAG filename with the dag_id (match-dagid-filename) airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py:63:0: C8300: Operator variable name and task_id argument should match (different-operator-varname-taskid) airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py:71:0: C8300: Operator variable name and task_id argument should match (different-operator-varname-taskid)
Pylint-Airflow is developed for linting (static code analysis) Airflow DAG scripts. It runs on top of your default Pylint settings and will notify you about potential conventions, refactorings, warnings and errors. The plugin runs with Python 3.6 and higher. You can integrate the plugin in your CI pipeline in order to check for potential issues, and it is configurable both via .pylintrc and inline just like other Pylint messages.
The plugin is the result of one of our GDD Friday’s (plus some extra time), a 4-weekly Friday on which we have a full day for experimentation.
Based on experience at projects and open source contributions, I devised a few conventions I figured would be useful to apply to anybody’s DAG scripts. For example:
Mixed dependency directions
This message is given when the bitshift operator for setting task dependencies is applied in opposing directions on a single line:
t1 = DummyOperator(task_id="task1") t2 = DummyOperator(task_id="task2") t3 = DummyOperator(task_id="task3") t1 >> t3 t2
$ pylint --load-plugins=pylint_airflow example.py ************* Module example example.py:5:9: C8302: Avoid mixing task dependency directions (mixed-dependency-directions)
Mixing dependency directions is generally considered confusing, a better alternative would be to switch the direction or split the statement over multiple lines:
[t1, t2] >> t3 # Or: t1 >> t3 t2 >> t3
Match the task_id with the variable name
For readability I like to match an operator’s
task_id with the name of the variable it is assigned to:
# Correct do_something = DummyOperator(task_id="do_something") # Incorrect do_something_else = DummyOperator(task_id="random_name")
The second assignment is valid Python code but the plugin will return a convention message:
$ pylint --load-plugins=pylint_airflow example.py ************* Module example example.py:5:0: C8300: Operator variable name and task_id argument should match (different-operator-varname-taskid)
Warn for unused XComs
Perhaps a magic "feature" of Airflow; a return value from the PythonOperator’s callable is automatically stored as an XCom in the Airflow metastore. If the return statement is unintentional and the XCom is never used (i.e. pulled), the metastore will fill up with obsolete data which is never accessed. Besides the redundant values, this could become an issue in case the returned data is very large.
To avoid this situation, the plugin checks for
task_ids=[task_id of pushtask]in the script, so that it knows if the XCom is read and therefore intended or not. However, if no reference to the XCom key can be found, a message will be returned to notify of the potentially unused XCom (it can obviously not know if it is used in other scripts).
from airflow.operators.python_operator import PythonOperator def _pushtask(): print("do stuff") return "foobar" # value is returned here but not used elsewhere pushtask = PythonOperator(task_id="pushtask", python_callable=_pushtask) def _pulltask(): print("do something") pulltask = PythonOperator(task_id="pulltask", python_callable=_pulltask) # A valid pulltask would be: # def _pulltask(task_instance, **_): # print(task_instance.xcom_pull(task_ids="pushtask")) # pulltask = PythonOperator(task_id="pulltask", python_callable=_pulltask, provide_context=True)
"foobar" is returned by
_pushtask, however the XCom key it is stored in, is not pulled anywhere else in the script. Therefore a refactoring message is shown:
$ pylint --load-plugins=pylint_airflow example.py ************* Module example example.py:7:11: R8300: Return value from _pushtask is stored as XCom but not used anywhere (unused-xcom)
Pylint-Airflow is currently in alpha and requires more future work. In the current state, I tried to focus mostly on the stability so that it wouldn’t crash anywhere. However, there are still many things to complete:
- Make it more configurable.
- Adding more messages. Some messages in the readme are currently not implemented.
- Better message texts, e.g. different-operator-varname-taskid could also return the actual variable name and task_id.
- Write documentation. Current documenation is built automatically and hosted on Read the Docs but empty.
Tests and contributions are very welcome. If you encounter any issues or would like to contribute to the project, please create an issue or pull request on GitHub: https://github.com/basph/pylint-airflow.
Feel free to contact me about anything Airflow on the Airflow Slack @BasPH.
By the way, we have a great Apache Airflow course to teach you the internals, terminology, and best practices of working with Airflow, with
hands-on experience in writing an maintaining data pipelines.
Subscribe to our newsletter
Stay up to date on the latest insights and best-practices by registering for the GoDataDriven newsletter.