1. Xcom이란?
Xcom(Cross Communication)
Airflow DAG 안 Task 간 데이터 공유를 위해 사용되는 기술
ex) Task1의 수행 중 내용이나 결과를 Task2에서 사용 또는 입력으로 주고 싶은 경우
주로 작은 규몬의 데이터 공유를 위해 사용
(Xcom 내용은 메타 DB의 xcom 테이블에 값이 저장됨)
1GB 이상의 대용량 데이터 공유를 위해서는 외부 솔루션 사용 필요(AWS S3, HDFS 사용)
2. Python 오퍼레이터에서 Xcom사용하기
크게 두 가지 방법으로 Xcom 사용 가능
1) **kwargs에 존재하는 ti(task_instance) 객체 활용
template변수에서 task_instance라는 객체를 얻을 수 있으며
task_instance객체가 가진 xcom_push메서드를 활용 할 수 있음.
2) 파이썬 함수의 return값 활용 (1안) / (2안)
요약
Xcom push 방법
1. ti.xcom_push 명시적 사용
2. 함수 return
Xcom pull 방법
ti.xcom_pull 명시적 사용
return 값을 input으로 사용
소스코드
dags_python_with_xcom_eg1.py
import pendulum
from airflow.models.dag import DAG
from airflow.decorators import task
with DAG(
dag_id="dags_python_with_xcom_eg1",
schedule="30 6 * * *",
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
@task(task_id='python_xcom_push_task1')
def xcom_push1(**kwargs):
ti = kwargs['ti']
ti.xcom_push(key="result1", value="value_1")
ti.xcom_push(key="result2", value=[1,2,3])
@task(task_id='python_xcom_push_task2')
def xcom_push2(**kwargs):
ti = kwargs['ti']
ti.xcom_push(key="result1", value="value_2")
ti.xcom_push(key="result2", value=[1,2,3,4])
@task(task_id='python_xcom_pull_task')
def xcom_pull(**kwargs):
ti = kwargs['ti']
value1 = ti.xcom_pull(key="result1")
value2 = ti.xcom_pull(key="result2", task_ids='python_xcom_push_task1')
print(value1)
print(value2)
xcom_push1() >> xcom_push2() >> xcom_pull()
dags_python_with_xcom_eg2.py
import pendulum
from airflow.models.dag import DAG
from airflow.decorators import task
with DAG(
dag_id="dags_python_with_xcom_eg2",
schedule="30 6 * * *",
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
@task(task_id='python_xcom_push_by_return')
def xcom_push_result(**kwargs):
return 'Success'
@task(task_id='python_xcom_pull_1')
def xcom_pull_1(**kwargs):
ti = kwargs['ti']
value1 = ti.xcom_pull(task_ids='python_xcom_push_by_return')
print('xcom_pull 멘서드로 직접 찾은 리턴 값: '+ value1)
@task(task_id='python_xcom_pull_2')
def xcom_pull_2(status, **kwargs):
print('함수 입력값으로 받은 값: '+status)
python_xcom_push_by_return = xcom_push_result()
xcom_pull_2(python_xcom_push_by_return)
python_xcom_push_by_return >> xcom_pull_1()
#1.xcom_push_result > xcom_pull_2 > xcom_pull_1 순서로 실행됨.
'Airflow' 카테고리의 다른 글
전역 공유변수 Variable (0) | 2024.04.27 |
---|---|
Python 과 email operator xcom (0) | 2024.04.27 |
S4-ch0506. Python 오퍼레이터 with macro (0) | 2024.03.04 |
S3-ch0402. 외부 파이썬 함수 수행하기 (.env 설정) (0) | 2024.03.01 |
S4-ch0505. Bahs 오퍼레이터 with macro (0) | 2024.02.29 |