Airflow DAGs
What is a DAG
Directed Acyclic Graph (DAG)
- Directed - If multiple tasks exist, each must have at least one defined upstream (previous) or downstream (subsequent) task, although it could easily have both.
- Acyclic - No task can create data that goes on to reference itself. This could cause an infinite loop that would be, um, it’d be bad. Don’t do that.
- Graph - All tasks are laid out in a clear structure with discrete processes occurring at set points and clear relationships made to other tasks.
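The three properties above can be sketched in plain Python, without Airflow. This is only an illustration: the task names are hypothetical, and the standard-library graphlib module (Python 3.9+) stands in for whatever checks Airflow performs internally. It shows that a directed, acyclic set of dependencies has a valid run order, while a dependency that loops back is rejected.

```python
from graphlib import TopologicalSorter, CycleError

# Hypothetical tasks: each key maps a task to the set of tasks directly upstream of it.
upstream = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}


def run_order(upstream_map):
    """Return a valid execution order, or raise CycleError if the graph has a cycle."""
    return list(TopologicalSorter(upstream_map).static_order())


order = run_order(upstream)
print(order)

# Making "extract" depend on "load" closes a loop, so the graph is no longer acyclic.
cyclic = dict(upstream, extract={"load"})
try:
    run_order(cyclic)
    has_cycle = False
except CycleError:
    has_cycle = True
print("cycle detected:", has_cycle)
```

A linear chain like this one has exactly one valid order; graphs with branches may have several.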
Learning from an example
Step 1. Import the required libraries
from datetime import datetime, timedelta
from airflow import DAG
Step 2. Define our default arguments with a dictionary (optional)
# some_function, some_other_function, another_function and yet_another_function
# are placeholders for callables you define yourself.
default_args = {
    'owner': 'arthur',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'queue': 'bash_queue',
    'pool': 'backfill',
    'priority_weight': 10,
    'end_date': datetime(2016, 1, 1),
    'wait_for_downstream': False,
    'sla': timedelta(hours=2),
    'execution_timeout': timedelta(seconds=300),
    'on_failure_callback': some_function,
    'on_success_callback': some_other_function,
    'on_retry_callback': another_function,
    'sla_miss_callback': yet_another_function,
    'trigger_rule': 'all_success',
}
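To make the word "default" concrete: values in this dictionary are applied to every task in the DAG, and an argument passed explicitly to a task takes precedence. The sketch below only mimics that precedence rule in plain Python. The helper effective_args is hypothetical and is not part of Airflow.

```python
from datetime import timedelta

default_args = {
    "owner": "arthur",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


def effective_args(defaults, **task_kwargs):
    """Hypothetical helper: explicit task arguments win over the defaults."""
    merged = dict(defaults)      # copy, so the shared defaults are never mutated
    merged.update(task_kwargs)   # per-task values override the defaults
    return merged


task_a = effective_args(default_args)             # uses every default
task_b = effective_args(default_args, retries=3)  # overrides only retries

print(task_a["retries"], task_b["retries"])
```

Here task_b retries three times but still inherits the owner and the retry delay from the defaults.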
Keys explanation:
owner (String): Who owns this DAG. The name appears in the DAG list on the web page.
depends_on_past (Boolean): If True, a task instance runs only when the same task succeeded in the previous DAG run.
email (List of Strings): The address(es) that receive the notification emails.
email_on_failure (Boolean): If True, send an email notification when a task fails.
email_on_retry (Boolean): If True, send an email notification when a task is retried.
retries (Int): The number of times a failed task is retried before it is marked as failed.
retry_delay (timedelta obj.): The interval between retries.
queue (String): The queue the task is sent to for execution. Not every executor supports queues.
pool (String): The pool the task runs in. Pools can be used to limit the execution parallelism on arbitrary sets of tasks.
priority_weight (Int): The priority of the task relative to other tasks. Tasks with a higher weight are picked up first when execution slots are scarce.
end_date (datetime obj.): The date after which the scheduler stops creating new runs of the task.
wait_for_downstream (Boolean): If True, an instance of a task waits until the tasks directly downstream of its previous instance have finished successfully before it runs.
sla (timedelta obj.): Service Level Agreement (SLA), the time by which a task or DAG should have succeeded. It can be set at the task level as a timedelta. If one or many instances have not succeeded by that time, an alert email is sent detailing the list of tasks that missed their SLA.
execution_timeout (timedelta obj.): The maximum time allowed for the execution of this task instance. If the task runs longer, an exception is raised and the task fails.
on_failure_callback (callable): A function called when a task instance of this task fails. A context dictionary is passed as the single parameter to this function. The context contains references to objects related to the task instance and is documented under the macros section of the API.
on_success_callback (callable): Much like on_failure_callback, except that it is executed when the task succeeds.
on_retry_callback (callable): Much like on_failure_callback, except that it is executed when a retry occurs.
sla_miss_callback (callable): A function called when an SLA is missed.
trigger_rule (String): All operators have a trigger_rule argument, which defines the rule by which the generated task gets triggered. The default value for trigger_rule is all_success, which can be defined as “trigger this task when all directly upstream tasks have succeeded”. The other rules, such as all_failed, all_done, one_failed and one_success, are likewise based on direct parent tasks and can be passed to any operator when creating tasks.
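The callback entries expect a function that takes the context dictionary as its single parameter. Below is a minimal sketch of such a function. The name notify_failure is hypothetical, and the keys "task_instance" and "exception" are assumed to be present in the context of a failure callback; check the context documentation of your Airflow version. A stand-in context is used so the function can be tried without a running scheduler.

```python
from types import SimpleNamespace


def notify_failure(context):
    """Hypothetical failure callback: build a short message from the context dict."""
    ti = context.get("task_instance")           # assumed key: the failed task instance
    task_id = getattr(ti, "task_id", "unknown")
    error = context.get("exception")            # assumed key: the exception that was raised
    return f"Task {task_id} failed: {error}"


# Stand-in for the context Airflow would pass in; all values here are made up.
fake_context = {
    "task_instance": SimpleNamespace(task_id="print_date"),
    "exception": ValueError("boom"),
}
message = notify_failure(fake_context)
print(message)
```

In a real DAG the function itself, not its result, would be passed, for example 'on_failure_callback': notify_failure, and it would typically send the message somewhere instead of returning it.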
Step 3. Create a DAG Object
# days_ago returns a start date relative to today
from airflow.utils.dates import days_ago

dag = DAG(
    dag_id='tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    tags=['example'],
)
Keys explanation:
dag_id (String): The ID of your DAG, which appears on the DAGs web page.
default_args (Dictionary): The dictionary we defined in the previous step.
description (String): A description of this DAG.
schedule_interval (timedelta or String): How often the DAG is scheduled to run. There are two main options to set it up: a timedelta object or a string. I highly recommend using the following option here.
start_date (datetime obj.): The initial/starting date of the DAG.
tags (List): A list of tags, which also shows on the DAGs web page.
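How start_date and schedule_interval work together can be sketched with plain datetime arithmetic. The sketch assumes the usual Airflow behaviour that a run covering one interval is triggered only once that interval has ended. The helper first_runs and the date 2021-01-01 are hypothetical and only serve the illustration.

```python
from datetime import datetime, timedelta


def first_runs(start_date, interval, count):
    """Return (logical_date, earliest_trigger_time) pairs for the first `count` runs."""
    runs = []
    for i in range(count):
        logical_date = start_date + i * interval  # start of the interval the run covers
        trigger_time = logical_date + interval    # the run starts after the interval ends
        runs.append((logical_date, trigger_time))
    return runs


runs = first_runs(datetime(2021, 1, 1), timedelta(days=1), 3)
for logical_date, trigger_time in runs:
    print(logical_date.date(), "is triggered no earlier than", trigger_time.date())
```

With a daily interval, the run for January 1 is therefore not started before January 2.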