DAG1:
import airflow.utils.dates
import airflow.utils.helpers
from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.sensors import ExternalTaskSensor
from airflow.models.variable import Variable
from airflow.utils.state import State
from datetime import datetime, timedelta
import dateutil.parser
# Controller DAG "dag1": scheduled exactly once, with catchup disabled.
dag = DAG(
    dag_id="dag1",
    schedule_interval="@once",
    catchup=False,
    default_args=dict(
        owner="airflow",
        start_date=airflow.utils.dates.days_ago(2),
    ),
)
def trigger_dag_with_context(context, dag_run_obj):
    """Copy the triggering task's ``job`` param onto the outgoing run object.

    Intended as the ``python_callable`` of a ``TriggerDagRunOperator``.

    :param context: task context; ``context['params']['job']`` must exist
    :type context: dict
    :param dag_run_obj: run-order object whose ``payload`` attribute is set here
    :return: the same ``dag_run_obj``, with ``payload == {'job': <job>}``
    :raises KeyError: if ``params`` or ``job`` is missing from ``context``
    """
    dag_run_obj.payload = {'job': context['params']['job']}
    return dag_run_obj
# For every job, create a task that triggers a "dag2" run and a sensor that
# waits for that run. Each job gets its own minute offset so the triggered
# runs are requested with distinct execution dates.
trigger_tasks = []
sensor_tasks = []
for offset, job in enumerate(["job1", "job2"]):
    trigger = TriggerDagRunOperator(
        task_id=job + "_dag",
        trigger_dag_id="dag2",
        python_callable=trigger_dag_with_context,
        params={"job": job},
        dag=dag,
        # NOTE(review): TriggerDagRunOperator may not define provide_context
        # itself (it would then just be forwarded to the base operator) --
        # confirm against the installed Airflow version.
        provide_context=True,
        # Templated: this run's execution date truncated to the minute,
        # shifted forward by `offset` minutes.
        execution_date=(
            '{{execution_date.replace(second=0, microsecond=0)'
            ' + macros.timedelta(minutes=' + str(offset) + ')}}'
        ),
    )
    sensor = ExternalTaskSensor(
        task_id=job + '_sensor',
        external_dag_id='dag2',
        # No task id: wait on the dag2 run as a whole, not on a single task.
        external_task_id=None,
        dag=dag,
        allowed_states=['success'],
        # The sensor subtracts this delta from its own execution date, so a
        # negative value looks `offset` minutes ahead, mirroring the trigger.
        # NOTE(review): the trigger truncates to the minute but this delta
        # does not; the two only line up when this DAG's execution date has
        # zero seconds/microseconds -- confirm for manually started runs.
        execution_delta=timedelta(minutes=-offset),
        poke_interval=1,
    )
    trigger_tasks.append(trigger)
    sensor_tasks.append(sensor)

# Interleave as trigger1, sensor1, trigger2, sensor2, ... and chain the tasks
# so they run strictly in that order.
flat_ops = [op for pair in zip(trigger_tasks, sensor_tasks) for op in pair]
airflow.utils.helpers.chain(*flat_ops)
DAG2:
import airflow.utils.dates
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from helper import *
# Target DAG "dag2": schedule_interval=None, so it has no schedule of its own
# and only runs when something triggers it.
with DAG(
    dag_id="dag2",
    default_args={"start_date": airflow.utils.dates.days_ago(2), "owner": "Airflow"},
    schedule_interval=None,
    catchup=False,
) as dag:

    def run_this_func(**kwargs):
        """Print the task context and the conf attached to the current DagRun.

        :param kwargs: task context passed as keyword arguments (the operator
            below sets ``provide_context=True``); must contain ``dag_run``
        :type kwargs: dict
        """
        print(kwargs)
        print(kwargs["dag_run"].conf)

    run_this = PythonOperator(
        task_id="run_this",
        python_callable=run_this_func,
        provide_context=True,
        dag=dag,
    )

    # No explicit dag= here: the task is attached through the enclosing
    # `with DAG(...)` context.
    bash_this = BashOperator(
        task_id="bash_this",
        bash_command='echo "Bash Done"',
    )

    run_this >> bash_this
Comments
Post a Comment