from airflow import DAG
import pendulum
from airflow.decorators import task
from airflow.operators.email import EmailOperator
with DAG(
dag_id = "dags_python_email_xcom",
schedule = "0 8 1 * *",
start_date = pendulum.datetime(2024, 1, 1, tz="Asia/Seoul"),
catchup = False
) as dag:
@task(task_id = 'something_task')
def some_logic(**kwargs):
from random import choice
return choice(['성공', '실패'])
send_email = EmailOperator(
task_id = 'send_email',
to = 'email_id@gmail.com',
subject = '{{ data_interval_end.in_timezone("Asia/Seoul") | ds }} some_logic 처리결과',
html_content = '{{ data_interval_end.in_timezone("Asia/Seoul") | ds }} 처리 결과는 <br> \
{{ ti.xcom_pull(task_ids="something_task") }} 했습니다. <br>'
)
some_logic() >> send_email
만약, 이메일 발송 에러가 발생한다면,
airflow 의 docker-compose.yaml 파일을 체크해봐야 함.
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
--------------------------------------------------------------------------------------
AIRFLOW__SMTP__SMTP_HOST: 'smtp.gmail.com'
AIRFLOW__SMTP__SMTP_USER: 'email-id@gmail.com'
AIRFLOW__SMTP__SMTP_PASSWORD: '계정에서 발급한 비밀번호'
AIRFLOW__SMTP__SMTP_PORT: 587
AIRFLOW__SMTP__SMTP_MAIL_FROM: ' email-id@gmail.com'
--------------------------------------------------------------------------------------
'Airflow' 카테고리의 다른 글
전역 공유변수 Variable (0) | 2024.04.27 |
---|---|
S5-ch0601. python 오퍼레이터 with Xcom (0) | 2024.03.04 |
S4-ch0506. Python 오퍼레이터 with macro (0) | 2024.03.04 |
S3-ch0402. 외부 파이썬 함수 수행하기 (.env 설정) (0) | 2024.03.01 |
S4-ch0505. Bahs 오퍼레이터 with macro (0) | 2024.02.29 |