from airflow import DAG
import pendulum
from airflow.decorators import task

from airflow.operators.email import EmailOperator

# DAG that runs one TaskFlow task and then emails that task's return value,
# which the email templates read back through XCom.
with DAG(
    dag_id="dags_python_email_xcom",
    schedule="0 8 1 * *",  # cron: minute 0, hour 8, day-of-month 1
    start_date=pendulum.datetime(2024, 1, 1, tz="Asia/Seoul"),
    catchup=False,
) as dag:

    @task(task_id="something_task")
    def some_logic(**kwargs):
        """Return '성공' or '실패', picked at random.

        The email task below reads this return value with
        ``ti.xcom_pull(task_ids="something_task")``. ``kwargs`` is accepted
        but not used by this function.
        """
        # Imported locally so the module is only loaded when the task runs.
        from random import choice

        return choice(["성공", "실패"])

    send_email = EmailOperator(
        task_id="send_email",
        to="email_id@gmail.com",
        subject='{{ data_interval_end.in_timezone("Asia/Seoul") | ds }} some_logic 처리결과',
        # Adjacent string literals are concatenated by the parser. Unlike a
        # backslash continuation inside a single literal, this keeps the
        # source indentation out of the rendered email body.
        html_content=(
            '{{ data_interval_end.in_timezone("Asia/Seoul") | ds }} 처리 결과는 <br> '
            '{{ ti.xcom_pull(task_ids="something_task") }} 했습니다. <br>'
        ),
    )

    # The email must run after the task whose XCom value it pulls.
    some_logic() >> send_email

 

만약 이메일 발송 에러가 발생한다면,

Airflow의 docker-compose.yaml 파일에 아래와 같은 SMTP 설정이 들어 있는지 확인해야 한다.

 

_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}

--------------------------------------------------------------------------------------

AIRFLOW__SMTP__SMTP_HOST: 'smtp.gmail.com'
AIRFLOW__SMTP__SMTP_USER: 'email-id@gmail.com'
AIRFLOW__SMTP__SMTP_PASSWORD: '계정에서 발급한 비밀번호'
AIRFLOW__SMTP__SMTP_PORT: 587
AIRFLOW__SMTP__SMTP_MAIL_FROM: 'email-id@gmail.com'

--------------------------------------------------------------------------------------

+ Recent posts