1. Which parameters support template variables?

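Parameters of PythonOperator (of these, only op_kwargs, op_args, and templates_dict are template fields)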

python_callable(Callable | None)

op_kwargs

op_args

templates_dict

templates_exts

show_return_value_in_logs

Reference: https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/python/index.html
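To get a feel for why only some parameters accept Jinja, here is a toy sketch of the idea behind `template_fields`. It is not Airflow's implementation: `ToyOperator`, `render`, `render_template_fields`, and the `note` parameter are hypothetical names made up for illustration, and the regex only handles plain `{{ name }}` placeholders. The one thing taken from Airflow is the tuple of field names, which matches `PythonOperator.template_fields`.

```python
import re


class ToyOperator:
    """Toy stand-in for an operator; NOT Airflow's implementation."""

    # Same field names as PythonOperator.template_fields.
    template_fields = ("templates_dict", "op_args", "op_kwargs")

    def __init__(self, op_args=None, op_kwargs=None, templates_dict=None, note=""):
        self.op_args = op_args or []
        self.op_kwargs = op_kwargs or {}
        self.templates_dict = templates_dict or {}
        self.note = note  # hypothetical parameter that is not a template field


def render(value, context):
    """Recursively replace '{{ name }}' placeholders in strings, lists, and dicts."""
    if isinstance(value, str):
        return re.sub(r"\{\{\s*(\w+)\s*\}\}", lambda m: str(context[m.group(1)]), value)
    if isinstance(value, list):
        return [render(v, context) for v in value]
    if isinstance(value, dict):
        return {k: render(v, context) for k, v in value.items()}
    return value


def render_template_fields(op, context):
    """Render only the attributes named in template_fields; leave the rest untouched."""
    for name in op.template_fields:
        setattr(op, name, render(getattr(op, name), context))


op = ToyOperator(
    op_kwargs={"day": "{{ ds }}"},
    templates_dict={"start_date": "{{ ds }}"},
    note="{{ ds }}",
)
render_template_fields(op, {"ds": "2023-03-01"})
print(op.op_kwargs)       # {'day': '2023-03-01'}
print(op.templates_dict)  # {'start_date': '2023-03-01'}
print(op.note)            # {{ ds }}
```

The `note` value stays as the literal string because it is not listed in `template_fields`. In the same way, a Jinja expression passed to a parameter outside the three template fields would not be rendered.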

 

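However, is it really necessary to use macros in a Python operator?

What if the date arithmetic can be done directly inside the DAG?

Compare the @task(task_id='task_using_macros') block below with the task_direct_calc task that follows it.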

Practice code

import pendulum

from airflow.models.dag import DAG
from airflow.decorators import task

with DAG(
    dag_id="dags_python_with_macro",
    schedule="10 0 * * *",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    
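    # Macro version: Jinja renders both dates before the task runs.
    # start_date = first day of the previous month, end_date = last day of the previous month (KST).
    @task(
        task_id='task_using_macros',
        templates_dict={
            'start_date': '{{ (data_interval_end.in_timezone("Asia/Seoul") + macros.dateutil.relativedelta.relativedelta(months=-1, day=1)) | ds }}',
            'end_date': '{{ (data_interval_end.in_timezone("Asia/Seoul").replace(day=1) + macros.dateutil.relativedelta.relativedelta(days=-1)) | ds }}'
        }
    )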
    def get_datetime_macro(**kwargs):
        templates_dict = kwargs.get('templates_dict') or {}
        if templates_dict:
            start_date = templates_dict.get('start_date') or 'no start_date'
            end_date = templates_dict.get('end_date') or 'no end_date'
            print(start_date)
            print(end_date)


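    # Direct version: the same two dates computed in plain Python inside the task.
    @task(task_id='task_direct_calc')
    def get_datetime_calc(**kwargs):
        from dateutil.relativedelta import relativedelta  # imported here, inside the task, to reduce scheduler load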
        data_interval_end = kwargs['data_interval_end']

        prev_month_day_first = data_interval_end.in_timezone('Asia/Seoul')+relativedelta(months=-1, day=1)
        prev_month_day_last = data_interval_end.in_timezone('Asia/Seoul').replace(day=1)+relativedelta(days=-1)
        print(prev_month_day_first.strftime('%Y-%m-%d'))
        print(prev_month_day_last.strftime('%Y-%m-%d'))

    get_datetime_macro() >> get_datetime_calc()
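Both tasks compute the first and last day of the previous month, so the arithmetic can be checked outside Airflow. Below is a minimal sketch using only the standard library. It makes several assumptions: `prev_month_range` is a hypothetical helper, KST is modeled as a fixed UTC+9 offset rather than a named time zone, plain `timedelta` stands in for `relativedelta`, and the sample `data_interval_end` is the UTC value a run at 2023-03-01 00:10 KST would presumably receive.

```python
from datetime import datetime, timedelta, timezone

# Assumption: KST modeled as a fixed UTC+9 offset (Korea currently has no DST).
KST = timezone(timedelta(hours=9))


def prev_month_range(data_interval_end, tz=KST):
    """Return (first_day, last_day) of the month before data_interval_end as YYYY-MM-DD strings."""
    local = data_interval_end.astimezone(tz)       # same role as in_timezone("Asia/Seoul")
    first_of_this_month = local.replace(day=1)
    last_of_prev = first_of_this_month - timedelta(days=1)
    first_of_prev = last_of_prev.replace(day=1)
    return first_of_prev.strftime("%Y-%m-%d"), last_of_prev.strftime("%Y-%m-%d")


# 2023-03-01 00:10 KST expressed in UTC.
end_utc = datetime(2023, 2, 28, 15, 10, tzinfo=timezone.utc)

print(prev_month_range(end_utc))                   # ('2023-02-01', '2023-02-28')
print(prev_month_range(end_utc, tz=timezone.utc))  # ('2023-01-01', '2023-01-31')
```

The second call shows why the practice code converts to Asia/Seoul first: without the conversion, the same instant still falls in February in UTC, and the result shifts back by a whole month.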

 
