SQL Joins

Image
Joins are used to combine rows from two or more tables based on related columns between them. Joins allow you to retrieve data from multiple tables simultaneously, enabling you to create complex queries that fetch data from different sources. There are different types of joins in SQL, including: INNER JOIN Returns only the rows that have matching values in both tables based on the specified join condition. It discards non-matching rows from both tables. Example:           create table t1(x int); insert into t1 values(1); insert into t1 values(1); insert into t1 values(0); create table t2(y int); insert into t2 values(0); insert into t2 values(1); insert into t2 values(1);           select * from t1 inner join t2 on t1.x = t2.y Output: 2. LEFT JOIN (or) LEFT OUTER JOIN Returns all the rows from the left (or first) table and the matching rows from the rig...

Airflow dynamic dag creation and chain sequentially



DAG1:

import airflow.utils.dates
import airflow.utils.helpers
from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.python_operator import PythonOperator

from airflow.operators.sensors import ExternalTaskSensor

from airflow.models.variable import Variable
from airflow.utils.state import State

from datetime import datetime, timedelta
import dateutil.parser

dag = DAG(
    dag_id="dag1",
    default_args={"owner": "airflow", "start_date": airflow.utils.dates.days_ago(2)},
    schedule_interval="@once", 
    catchup=False
)

def trigger_dag_with_context(context, dag_run_obj):   
    dag_run_obj.payload = {'job': context['params']['job']}
    return dag_run_obj

l1 = []
l2 = []
i = 0
for job in ["job1", "job2"]: 
    trigger = TriggerDagRunOperator(
        task_id= job + "_dag",
        trigger_dag_id="dag2",
        python_callable=trigger_dag_with_context,
        params={"job":job},     
        dag=dag,
        provide_context=True,
        execution_date= '{{execution_date.replace(second=0, microsecond=0) +  macros.timedelta(minutes='+ str(i) +')}}'
    )

    external_task = ExternalTaskSensor(
        task_id= job + '_sensor',
        external_dag_id='dag2',     
        external_task_id = None,
        dag=dag,     
        allowed_states=['success'],       
        execution_delta=timedelta(minutes=-i),     
        poke_interval=1           
    )

    i = i + 1
    l1.append(trigger)
    l2.append(external_task)

zip_ops = list(zip(l1, l2))
flat_ops = [op for tuple in zip_ops for op in tuple]

airflow.utils.helpers.chain(*flat_ops)




DAG2

import airflow.utils.dates
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

from helper import *

with DAG(
    dag_id="dag2",
    default_args={"start_date": airflow.utils.dates.days_ago(2), "owner": "Airflow"},
    schedule_interval=None, 
    catchup=False
    ) as dag:

    def run_this_func(**kwargs):
        """
        Print the payload "message" passed to the DagRun conf attribute.
        :param context: The execution context
        :type context: dict
        """ 
        print(kwargs) 
        print(kwargs["dag_run"].conf)
     
     
    run_this = PythonOperator(
        task_id="run_this",
        python_callable=run_this_func,
        provide_context=True,
        dag=dag
    )

    bash_this = BashOperator(
        task_id="bash_this",
        bash_command='echo "Bash Done"'
    )


    run_this >> bash_this

Comments

Popular posts from this blog

Machine Learning Foundations - Deep Learning Summary - Quiz

Machine Learning Foundation - Deep Learning - Programming Assignment

SQL Access WebApi with Basic Authentication