1. Xcom이란?

Xcom(Cross Communication)

Airflow DAG 안 Task 간 데이터 공유를 위해 사용되는 기술

ex) Task1의 수행 중 내용이나 결과를 Task2에서 사용 또는 입력으로 주고 싶은 경우

 

주로 작은 규몬의 데이터 공유를 위해 사용

(Xcom 내용은 메타 DB의 xcom 테이블에 값이 저장됨)

1GB 이상의 대용량 데이터 공유를 위해서는 외부 솔루션 사용 필요(AWS S3, HDFS 사용)

 

2. Python 오퍼레이터에서 Xcom사용하기

크게 두 가지 방법으로 Xcom 사용 가능

1) **kwargs에 존재하는 ti(task_instance) 객체 활용

template변수에서 task_instance라는 객체를 얻을 수 있으며

task_instance객체가 가진 xcom_push메서드를 활용 할 수 있음.

 

2) 파이썬 함수의  return값 활용 (1안) / (2안)

 

요약

Xcom push 방법

1. ti.xcom_push 명시적 사용

2. 함수 return

 

Xcom pull 방법

ti.xcom_pull 명시적 사용

return 값을 input으로 사용

 

소스코드

dags_python_with_xcom_eg1.py

import pendulum

from airflow.models.dag import DAG
from airflow.decorators import task

with DAG(
    dag_id="dags_python_with_xcom_eg1",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    
    @task(task_id='python_xcom_push_task1')
    def xcom_push1(**kwargs):
        ti = kwargs['ti']
        ti.xcom_push(key="result1", value="value_1")
        ti.xcom_push(key="result2", value=[1,2,3])

    @task(task_id='python_xcom_push_task2')
    def xcom_push2(**kwargs):
        ti = kwargs['ti']
        ti.xcom_push(key="result1", value="value_2")
        ti.xcom_push(key="result2", value=[1,2,3,4])

    @task(task_id='python_xcom_pull_task')
    def xcom_pull(**kwargs):
        ti = kwargs['ti']
        value1 = ti.xcom_pull(key="result1")
        value2 = ti.xcom_pull(key="result2", task_ids='python_xcom_push_task1')
        print(value1)
        print(value2)

    xcom_push1() >> xcom_push2() >> xcom_pull()

 

dags_python_with_xcom_eg2.py

import pendulum

from airflow.models.dag import DAG
from airflow.decorators import task

with DAG(
    dag_id="dags_python_with_xcom_eg2",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    
    @task(task_id='python_xcom_push_by_return')
    def xcom_push_result(**kwargs):
        return 'Success'
    
    @task(task_id='python_xcom_pull_1')
    def xcom_pull_1(**kwargs):
        ti = kwargs['ti']
        value1 = ti.xcom_pull(task_ids='python_xcom_push_by_return')
        print('xcom_pull 멘서드로 직접 찾은 리턴 값: '+ value1)

    @task(task_id='python_xcom_pull_2')
    def xcom_pull_2(status, **kwargs):
        print('함수 입력값으로 받은 값: '+status)

    python_xcom_push_by_return = xcom_push_result()
    xcom_pull_2(python_xcom_push_by_return)
    python_xcom_push_by_return >> xcom_pull_1()

    #1.xcom_push_result > xcom_pull_2 > xcom_pull_1 순서로 실행됨.

+ Recent posts