Skip to content

Commit 90ab014

Browse files
committed
Test Composer DAG samples to verify they contain a valid DAG
Inspiration from Circle 1 of https://medium.com/wbaa/datas-inferno-7-circles-of-data-testing-hell-with-airflow-cef4adff58d8 Beyond just asserting there are no syntax errors, we can verify that the DAG files actually contain a DAG and that Airflow detects no cycles in that DAG.
1 parent cd8a168 commit 90ab014

13 files changed

+193
-14
lines changed

composer/workflows/bq_copy_across_locations.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,11 @@
5151
# Set default arguments
5252
# --------------------------------------------------------------------------------
5353

54+
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
55+
5456
default_args = {
5557
'owner': 'airflow',
56-
'start_date': datetime.datetime.today(),
58+
'start_date': yesterday,
5759
'depends_on_past': False,
5860
'email': [''],
5961
'email_on_failure': False,
@@ -122,17 +124,17 @@ def read_table_list(table_list_file):
122124
# Define a DAG (directed acyclic graph) of tasks.
123125
# Any task you create within the context manager is automatically added to the
124126
# DAG object.
125-
with models.DAG('bq_copy_us_to_eu_01',
126-
default_args=default_args,
127-
schedule_interval=None) as dag:
127+
with models.DAG(
128+
'comoser_sample_bq_copy_across_locations',
129+
default_args=default_args,
130+
schedule_interval=None) as dag:
128131
start = dummy_operator.DummyOperator(
129132
task_id='start',
130133
trigger_rule='all_success'
131134
)
132135

133136
end = dummy_operator.DummyOperator(
134137
task_id='end',
135-
136138
trigger_rule='all_success'
137139
)
138140

composer/workflows/bq_copy_across_locations_test.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
from airflow import models
2020
import pytest
2121

22+
from . import unit_testing
23+
2224

2325
@pytest.fixture(scope='module', autouse=True)
2426
def gcs_plugin():
@@ -35,7 +37,7 @@ def gcs_plugin():
3537
sys.path.remove(plugins_dir)
3638

3739

38-
def test_dag_import():
40+
def test_dag():
3941
"""Test that the DAG file can be successfully imported.
4042
4143
This tests that the DAG can be parsed, but does not run it in an Airflow
@@ -48,4 +50,5 @@ def test_dag_import():
4850
models.Variable.set('table_list_file_path', example_file_path)
4951
models.Variable.set('gcs_source_bucket', 'example-project')
5052
models.Variable.set('gcs_dest_bucket', 'us-central1-f')
51-
from . import bq_copy_across_locations # noqa
53+
from . import bq_copy_across_locations as module
54+
unit_testing.assert_has_valid_dag(module)

composer/workflows/bq_notify_test.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
from airflow import models
1616

17+
from . import unit_testing
18+
1719

1820
def test_dag_import():
1921
"""Test that the DAG file can be successfully imported.
@@ -26,4 +28,5 @@ def test_dag_import():
2628
models.Variable.set('gcp_project', 'example-project')
2729
models.Variable.set('gce_zone', 'us-central1-f')
2830
models.Variable.set('email', '[email protected]')
29-
from . import bq_notify # noqa
31+
from . import bq_notify as module
32+
unit_testing.assert_has_valid_dag(module)

composer/workflows/connections_test.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
from . import unit_testing
16+
1517

1618
def test_dag_import():
1719
"""Test that the DAG file can be successfully imported.
@@ -20,4 +22,5 @@ def test_dag_import():
2022
environment. This is a recommended sanity check by the official Airflow
2123
docs: https://airflow.incubator.apache.org/tutorial.html#testing
2224
"""
23-
from . import connections # noqa
25+
from . import connections as module
26+
unit_testing.assert_has_valid_dag(module)

composer/workflows/kubernetes_pod_operator_test.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
from . import unit_testing
16+
1517

1618
def test_dag_import():
1719
"""Test that the DAG file can be successfully imported.
@@ -20,4 +22,5 @@ def test_dag_import():
2022
environment. This is a recommended sanity check by the official Airflow
2123
docs: https://airflow.incubator.apache.org/tutorial.html#testing
2224
"""
23-
from . import kubernetes_pod_operator # noqa
25+
from . import kubernetes_pod_operator as module
26+
unit_testing.assert_has_valid_dag(module)

composer/workflows/quickstart_test.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
from airflow import models
1616

17+
from . import unit_testing
18+
1719

1820
def test_dag_import():
1921
"""Test that the DAG file can be successfully imported.
@@ -25,4 +27,5 @@ def test_dag_import():
2527
models.Variable.set('gcs_bucket', 'example_bucket')
2628
models.Variable.set('gcp_project', 'example-project')
2729
models.Variable.set('gce_zone', 'us-central1-f')
28-
from . import quickstart # noqa
30+
from . import quickstart as module
31+
unit_testing.assert_has_valid_dag(module)

composer/workflows/simple_test.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
from . import unit_testing
16+
1517

1618
def test_dag_import():
1719
"""Test that the DAG file can be successfully imported.
@@ -20,4 +22,5 @@ def test_dag_import():
2022
environment. This is a recommended sanity check by the official Airflow
2123
docs: https://airflow.incubator.apache.org/tutorial.html#testing
2224
"""
23-
from . import simple # noqa
25+
from . import simple as module
26+
unit_testing.assert_has_valid_dag(module)

composer/workflows/trigger_response_dag_test.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
from . import unit_testing
16+
1517

1618
def test_dag_import():
1719
"""Test that the DAG file can be successfully imported.
@@ -20,4 +22,5 @@ def test_dag_import():
2022
environment. This is a recommended sanity check by the official Airflow
2123
docs: https://airflow.incubator.apache.org/tutorial.html#testing
2224
"""
23-
from . import trigger_response_dag # noqa
25+
from . import trigger_response_dag as module
26+
unit_testing.assert_has_valid_dag(module)

composer/workflows/unit_testing.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# Copyright 2018 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Utilities for unit testing DAGs."""
16+
17+
# [START composer_dag_unit_testing]
18+
from airflow import models
19+
20+
21+
def assert_has_valid_dag(module):
22+
"""Assert that a module contains a valid DAG."""
23+
24+
no_dag_found = True
25+
26+
for dag in vars(module).values():
27+
if isinstance(dag, models.DAG):
28+
no_dag_found = False
29+
dag.test_cycle() # Throws if a task cycle is found.
30+
31+
if no_dag_found:
32+
raise AssertionError('module does not contain a valid DAG')
33+
# [END composer_dag_unit_testing]
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# Copyright 2018 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""An example DAG demonstrating a cyle in the task IDs."""
16+
17+
import datetime
18+
19+
from airflow import models
20+
from airflow.operators import dummy_operator
21+
22+
23+
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
24+
25+
default_dag_args = {
26+
'start_date': yesterday,
27+
}
28+
29+
with models.DAG(
30+
'composer_sample_cycle',
31+
schedule_interval=datetime.timedelta(days=1),
32+
default_args=default_dag_args) as dag:
33+
start = dummy_operator.DummyOperator(task_id='oops_a_cycle')
34+
end = dummy_operator.DummyOperator(task_id='oops_a_cycle')
35+
start >> end

0 commit comments

Comments
 (0)