1. Using Template Variables in the Python Operator
Parameters
python_callable
op_kwargs
op_args
templates_dict
templates_exts
show_return_value_in_logs
dags_python_template.py
import pendulum
import datetime
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.decorators import task

with DAG(
    dag_id="dags_python_template",
    schedule="30 9 * * *",
    start_date=pendulum.datetime(2023, 3, 10, tz="Asia/Seoul"),
    catchup=False
) as dag:

    def python_function1(start_date, end_date, **kwargs):
        print(start_date)
        print(end_date)

    python_t1 = PythonOperator(
        task_id='python_t1',
        python_callable=python_function1,
        op_kwargs={'start_date': '{{data_interval_start | ds}}', 'end_date': '{{data_interval_end | ds}}'}
    )

    @task(task_id='python_t2')
    def python_function2(**kwargs):
        print(kwargs)
        print('ds:' + kwargs['ds'])
        print('ts:' + str(kwargs['ts']))
        print('data_interval_start:' + str(kwargs['data_interval_start']))
        print('data_interval_end:' + str(kwargs['data_interval_end']))
        print('task_instance:' + str(kwargs['ti']))

    python_t1 >> python_function2()
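The DAG above relies on two ways of getting values into a callable: explicit op_kwargs for python_function1, and the runtime context arriving through **kwargs for python_function2. The sketch below is a simplified, standard-library-only model of that idea and is not Airflow's actual implementation. The helper name call_with_context and the sample functions are hypothetical, introduced only for illustration.

```python
import inspect


def call_with_context(func, op_kwargs, context):
    """Toy model (an assumption, not Airflow code): merge op_kwargs over the
    context and pass only what the callable is able to accept."""
    params = inspect.signature(func).parameters
    accepts_var_kw = any(
        p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()
    )
    merged = {**context, **op_kwargs}
    if accepts_var_kw:
        # A **kwargs parameter receives every remaining key.
        return func(**merged)
    # Otherwise pass only the keys the function names explicitly.
    return func(**{k: v for k, v in merged.items() if k in params})


def needs_two(start_date, end_date, **kwargs):
    # Mirrors python_function1: two named arguments plus the rest of the context.
    return start_date, end_date, sorted(kwargs)


def only_ds(ds):
    # A callable that asks for a single context key by name.
    return ds


# Hypothetical, already-rendered values standing in for the real context.
context = {"ds": "2024-02-27", "ts": "2024-02-27T00:30:00+00:00"}

result1 = call_with_context(
    needs_two, {"start_date": "2024-02-27", "end_date": "2024-02-28"}, context
)
result2 = call_with_context(only_ds, {}, context)
print(result1)
print(result2)
```

In this model, a function that declares **kwargs sees the whole context, while a function that names only ds receives just that key.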
Result:
python_t1
[2024-02-28, 08:35:26 UTC] {logging_mixin.py:188} INFO - 2024-02-27
[2024-02-28, 08:35:26 UTC] {logging_mixin.py:188} INFO - 2024-02-28
python_function2()
[2024-02-28, 08:40:48 UTC] {logging_mixin.py:188} INFO - {'conf': <***.configuration.AirflowConfigParser object at 0x7f51bb03d610>, 'dag': <DAG: dags_python_template>, 'dag_run': <DagRun dags_python_template @ 2024-02-27 00:30:00+00:00: scheduled__2024-02-27T00:30:00+00:00, state:running, queued_at: 2024-02-28 08:40:41.821461+00:00. externally triggered: False>, 'data_interval_end': DateTime(2024, 2, 28, 0, 30, 0, tzinfo=Timezone('UTC')), 'data_interval_start': DateTime(2024, 2, 27, 0, 30, 0, tzinfo=Timezone('UTC')), 'ds': '2024-02-27', 'ds_nodash': '20240227', 'execution_date': <Proxy at 0x7f51a323e780 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f5194bc4ca0>, 'execution_date', DateTime(2024, 2, 27, 0, 30, 0, tzinfo=Timezone('UTC')))>, 'expanded_ti_count': None, 'inlets': [], 'logical_date': DateTime(2024, 2, 27, 0, 30, 0, tzinfo=Timezone('UTC')), 'macros': <module '***.macros' from '/home/***/.local/lib/python3.8/site-packages/***/macros/__init__.py'>, 'next_ds': <Proxy at 0x7f51b06683c0 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f5194bc4ca0>, 'next_ds', '2024-02-28')>, 'next_ds_nodash': <Proxy at 0x7f51b0668400 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f5194bc4ca0>, 'next_ds_nodash', '20240228')>, 'next_execution_date': <Proxy at 0x7f51b0668440 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f5194bc4ca0>, 'next_execution_date', DateTime(2024, 2, 28, 0, 30, 0, tzinfo=Timezone('UTC')))>, 'outlets': [], 'params': {}, 'prev_data_interval_start_success': None, 'prev_data_interval_end_success': None, 'prev_ds': <Proxy at 0x7f51b0668480 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f5194bc4ca0>, 'prev_ds', '2024-02-26')>, 'prev_ds_nodash': <Proxy at 
0x7f51b06684c0 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f5194bc4ca0>, 'prev_ds_nodash', '20240226')>, 'prev_execution_date': <Proxy at 0x7f51b0668500 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f5194bc4ca0>, 'prev_execution_date', DateTime(2024, 2, 26, 0, 30, 0, tzinfo=Timezone('UTC')))>, 'prev_execution_date_success': <Proxy at 0x7f51b0668540 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f5194bc4ca0>, 'prev_execution_date_success', None)>, 'prev_start_date_success': None, 'prev_end_date_success': None, 'run_id': 'scheduled__2024-02-27T00:30:00+00:00', 'task': <Task(_PythonDecoratedOperator): python_t2>, 'task_instance': <TaskInstance: dags_python_template.python_t2 scheduled__2024-02-27T00:30:00+00:00 [running]>, 'task_instance_key_str': 'dags_python_template__python_t2__20240227', 'test_mode': False, 'ti': <TaskInstance: dags_python_template.python_t2 scheduled__2024-02-27T00:30:00+00:00 [running]>, 'tomorrow_ds': <Proxy at 0x7f51b0668580 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f5194bc4ca0>, 'tomorrow_ds', '2024-02-28')>, 'tomorrow_ds_nodash': <Proxy at 0x7f51b06685c0 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f5194bc4ca0>, 'tomorrow_ds_nodash', '20240228')>, 'triggering_dataset_events': <Proxy at 0x7f5194bd9940 with factory <function _get_template_context.<locals>.get_triggering_events at 0x7f5194bd2280>>, 'ts': '2024-02-27T00:30:00+00:00', 'ts_nodash': '20240227T003000', 'ts_nodash_with_tz': '20240227T003000+0000', 'var': {'json': None, 'value': None}, 'conn': None, 'yesterday_ds': <Proxy at 0x7f51b0668600 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f5194bc4ca0>, 'yesterday_ds', 
'2024-02-26')>, 'yesterday_ds_nodash': <Proxy at 0x7f51b0668640 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x7f5194bc4ca0>, 'yesterday_ds_nodash', '20240226')>, 'templates_dict': None}
[2024-02-28, 08:40:48 UTC] {logging_mixin.py:188} INFO - ds:2024-02-27
[2024-02-28, 08:40:48 UTC] {logging_mixin.py:188} INFO - ts:2024-02-27T00:30:00+00:00
[2024-02-28, 08:40:48 UTC] {logging_mixin.py:188} INFO - data_interval_start:2024-02-27 00:30:00+00:00
[2024-02-28, 08:40:48 UTC] {logging_mixin.py:188} INFO - data_interval_end:2024-02-28 00:30:00+00:00
[2024-02-28, 08:40:48 UTC] {logging_mixin.py:188} INFO - task_instance:<TaskInstance: dags_python_template.python_t2 scheduled__2024-02-27T00:30:00+00:00 [running]>
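The dates in the logs follow from the schedule: 09:30 in Asia/Seoul is 00:30 UTC, and the ds filter keeps only the date part of the interval boundaries. The sketch below reproduces that arithmetic with the standard library alone. It assumes a fixed +09:00 offset as a stand-in for Asia/Seoul, and the ds helper is a hypothetical local function that mimics the YYYY-MM-DD formatting, not an import from Airflow.

```python
from datetime import datetime, timedelta, timezone

# Assumption: a fixed +09:00 offset stands in for the Asia/Seoul timezone.
KST = timezone(timedelta(hours=9), name="KST")


def ds(value: datetime) -> str:
    """Hypothetical stand-in for the `ds` filter: format as YYYY-MM-DD."""
    return value.strftime("%Y-%m-%d")


# The run shown in the log: schedule "30 9 * * *" with a one-day interval.
interval_start_kst = datetime(2024, 2, 27, 9, 30, tzinfo=KST)
interval_end_kst = interval_start_kst + timedelta(days=1)

# The logged context values are expressed in UTC.
data_interval_start = interval_start_kst.astimezone(timezone.utc)
data_interval_end = interval_end_kst.astimezone(timezone.utc)

rendered = {
    "start_date": ds(data_interval_start),
    "end_date": ds(data_interval_end),
}
print(data_interval_start.isoformat())
print(rendered)
```

The computed values match the python_t1 output (2024-02-27 and 2024-02-28) and the data_interval_start and data_interval_end lines printed by python_t2.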