1. 어떤 파라미터가 Template 변수를 지원할까?
파라미터
python_callable(Callable | None)
op_kwargs
op_args
templates_dict
templates_exts
show_return_value_in_logs
참조자료 : https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/python/index.html
그러나, python 오퍼레이터에서 굳이 macro를 사용할 필요가 있을까?
날짜 연산을 DAG안에서 직접 할 수 있다면?
아래 @task(task_id='task_using_macros) 구문
실습코드
import pendulum
from airflow.models.dag import DAG
from airflow.decorators import task
with DAG(
dag_id="dags_python_with_macro",
schedule="10 0 * * *",
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
@task(
task_id='task_using_macros',
templates_dict={
'start_date': '{{ (data_interval_end.in_timezone("Asia/Seoul")+macros.dateutil.relativedelta.relativedelta(months=-1, day=1)) | ds}}',
'end_date': '{{ (data_interval_end.in_timezone("Asia/Seoul").replace(day=1)+macros.dateutil.relativedelta.relativedelta(days=-1)) | ds}}'
}
)
def get_datetime_macro(**kwargs):
templates_dict = kwargs.get('templates_dict') or {}
if templates_dict :
start_date = templates_dict.get('start_date') or 'start_date없음'
end_date = templates_dict.get('end_date') or 'end_date없음'
print(start_date)
print(end_date)
@task(task_id='task_direct_calc')
def get_datetime_calc(**kwargs):
from dateutil.relativedelta import relativedelta # 스케줄러 부하 경감을 이유로 여기다가 작성함.
data_interval_end = kwargs['data_interval_end']
prev_month_day_first = data_interval_end.in_timezone('Asia/Seoul')+relativedelta(months=-1, day=1)
prev_month_day_last = data_interval_end.in_timezone('Asia/Seoul').replace(day=1)+relativedelta(days=-1)
print(prev_month_day_first.strftime('%Y-%m-%d'))
print(prev_month_day_last.strftime('%Y-%m-%d'))
get_datetime_macro() >> get_datetime_calc()
'Airflow' 카테고리의 다른 글
Python 과 email operator xcom (0) | 2024.04.27 |
---|---|
S5-ch0601. python 오퍼레이터 with Xcom (0) | 2024.03.04 |
S3-ch0402. 외부 파이썬 함수 수행하기 (.env 설정) (0) | 2024.03.01 |
S4-ch0505. Bahs 오퍼레이터 with macro (0) | 2024.02.29 |
S4-0504. Python 오퍼레이터 with Template (0) | 2024.02.28 |