1. python 오퍼레이터에서 Template 변수 사용

파라미터

python_callable

op_kwargs

op_args

templates_dict

templates_exts

show_return_value_in _logs

 

dags_python_template.py

import pendulum
import datetime

from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.decorators import task

with DAG(
    dag_id="dags_python_template",
    schedule="30 9 * * *",
    start_date=pendulum.datetime(2023,3,10, tz="Asia/Seoul"),
    catchup=False
) as dag:
    
    def python_function1(start_date, end_date, **kwargs):
        print(start_date)
        print(end_date)
    
    python_t1 = PythonOperator(
        task_id='python_t1',
        python_callable=python_function1,
        op_kwargs={'start_date':'{{data_interval_start | ds}}', 'end_date':'{{data_interval_end | ds}}'}
    )

    @task(task_id='python_t2')
    def python_function2(**kwargs):
        print(kwargs)
        print('ds:'+kwargs['ds'])
        print('ts:'+str(kwargs['ts']))
        print('data_interval_start:' + str(kwargs['data_interval_start']))
        print('data_interval_end:' + str(kwargs['data_interval_end']))
        print('task_instance:' + str(kwargs['ti']))

    python_t1 >> python_function2()

결과:

python_t1

[2024-02-28, 08:35:26 UTC] {logging_mixin.py:188} INFO - 2024-02-27

[2024-02-28, 08:35:26 UTC] {logging_mixin.py:188} INFO - 2024-02-28

 

python_function2()

[2024-02-28, 08:40:48 UTC] {logging_mixin.py:188} INFO - {'conf': <***.configuration.AirflowConfigParser object at 0x7f51bb03d610>, 'dag': <DAG: dags_python_template>, 'dag_run': <DagRun dags_python_template @ 2024-02-27 00:30:00+00:00: scheduled__2024-02-27T00:30:00+00:00, state:running, queued_at: 2024-02-28 08:40:41.821461+00:00. externally triggered: False>, 'data_interval_end': DateTime(2024, 2, 28, 0, 30, 0, tzinfo=Timezone('UTC')), 'data_interval_start': DateTime(2024, 2, 27, 0, 30, 0, tzinfo=Timezone('UTC')), 'ds': '2024-02-27', 'ds_nodash': '20240227', 'execution_date': <Proxy at 0x7f51a323e780 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f5194bc4ca0>, 'execution_date', DateTime(2024, 2, 27, 0, 30, 0, tzinfo=Timezone('UTC')))>, 'expanded_ti_count': None, 'inlets': [], 'logical_date': DateTime(2024, 2, 27, 0, 30, 0, tzinfo=Timezone('UTC')), 'macros': <module '***.macros' from '/home/***/.local/lib/python3.8/site-packages/***/macros/__init__.py'>, 'next_ds': <Proxy at 0x7f51b06683c0 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f5194bc4ca0>, 'next_ds', '2024-02-28')>, 'next_ds_nodash': <Proxy at 0x7f51b0668400 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f5194bc4ca0>, 'next_ds_nodash', '20240228')>, 'next_execution_date': <Proxy at 0x7f51b0668440 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f5194bc4ca0>, 'next_execution_date', DateTime(2024, 2, 28, 0, 30, 0, tzinfo=Timezone('UTC')))>, 'outlets': [], 'params': {}, 'prev_data_interval_start_success': None, 'prev_data_interval_end_success': None, 'prev_ds': <Proxy at 0x7f51b0668480 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f5194bc4ca0>, 'prev_ds', '2024-02-26')>, 'prev_ds_nodash': <Proxy at 0x7f51b06684c0 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f5194bc4ca0>, 'prev_ds_nodash', '20240226')>, 'prev_execution_date': <Proxy at 0x7f51b0668500 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f5194bc4ca0>, 'prev_execution_date', DateTime(2024, 2, 26, 0, 30, 0, tzinfo=Timezone('UTC')))>, 'prev_execution_date_success': <Proxy at 0x7f51b0668540 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f5194bc4ca0>, 'prev_execution_date_success', None)>, 'prev_start_date_success': None, 'prev_end_date_success': None, 'run_id': 'scheduled__2024-02-27T00:30:00+00:00', 'task': <Task(_PythonDecoratedOperator): python_t2>, 'task_instance': <TaskInstance: dags_python_template.python_t2 scheduled__2024-02-27T00:30:00+00:00 [running]>, 'task_instance_key_str': 'dags_python_template__python_t2__20240227', 'test_mode': False, 'ti': <TaskInstance: dags_python_template.python_t2 scheduled__2024-02-27T00:30:00+00:00 [running]>, 'tomorrow_ds': <Proxy at 0x7f51b0668580 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f5194bc4ca0>, 'tomorrow_ds', '2024-02-28')>, 'tomorrow_ds_nodash': <Proxy at 0x7f51b06685c0 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f5194bc4ca0>, 'tomorrow_ds_nodash', '20240228')>, 'triggering_dataset_events': <Proxy at 0x7f5194bd9940 with factory <function _get_template_context.<locals>.get_triggering_events at 0x7f5194bd2280>>, 'ts': '2024-02-27T00:30:00+00:00', 'ts_nodash': '20240227T003000', 'ts_nodash_with_tz': '20240227T003000+0000', 'var': {'json': None, 'value': None}, 'conn': None, 'yesterday_ds': <Proxy at 0x7f51b0668600 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f5194bc4ca0>, 'yesterday_ds', '2024-02-26')>, 'yesterday_ds_nodash': <Proxy at 0x7f51b0668640 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f5194bc4ca0>, 'yesterday_ds_nodash', '20240226')>, 'templates_dict': None}

[2024-02-28, 08:40:48 UTC] {logging_mixin.py:188} INFO - ds:2024-02-27

[2024-02-28, 08:40:48 UTC] {logging_mixin.py:188} INFO - ts:2024-02-27T00:30:00+00:00

[2024-02-28, 08:40:48 UTC] {logging_mixin.py:188} INFO - data_interval_start:2024-02-27 00:30:00+00:00

[2024-02-28, 08:40:48 UTC] {logging_mixin.py:188} INFO - data_interval_end:2024-02-28 00:30:00+00:00

[2024-02-28, 08:40:48 UTC] {logging_mixin.py:188} INFO - task_instance:<TaskInstance:dags_python_template.python_t2 scheduled__2024-02-27T00:30:00+00:00 [running]>

+ Recent posts