ref. https://www.inflearn.com/course/lecture?courseSlug=airflow-%EB%A7%88%EC%8A%A4%ED%84%B0-%ED%81%B4%EB%9E%98%EC%8A%A4&unitId=173873

import pendulum

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.models import Variable

with DAG(
    dag_id="dags_bash_with_variable",
    schedule="10 0 * * *",
    start_date=pendulum.datetime(2024, 2, 1, tz="Asia/Seoul"),
    catchup=False,
) as dag:
    
    var_value = Variable.get("sample_key")
    bash_var_1 = BashOperator(
        task_id = "bash_var_1",
        bash_command = f"echo variable:{var_value}"
    )

    bash_var_2 = BashOperator(
        task_id = "bash_var_2",
        bash_command= "echo variable:{{var.value.sample_key}}"
    )

 

전역변수를 Admin에서 설정을 할 수 있다.

 

전역변수 사용하기는 2가지 방법이 있는데

2안을 권고한다. 이유는 1안은 스케줄러가 많아지면 부하가 발생 할 수 있다.

from airflow import DAG
import pendulum
from airflow.decorators import task

from airflow.operators.email import EmailOperator

with DAG(
    dag_id = "dags_python_email_xcom",
    schedule = "0 8 1 * *",
    start_date = pendulum.datetime(2024, 1, 1, tz="Asia/Seoul"),
    catchup = False
) as dag:

    @task(task_id = 'something_task')
    def some_logic(**kwargs):
        from random import choice
        return choice(['성공', '실패'])
    
    send_email = EmailOperator(
        task_id = 'send_email',
        to = 'email_id@gmail.com',       
        subject = '{{ data_interval_end.in_timezone("Asia/Seoul") | ds }} some_logic 처리결과',
        html_content = '{{ data_interval_end.in_timezone("Asia/Seoul") | ds }} 처리 결과는 <br> \
            {{ ti.xcom_pull(task_ids="something_task") }} 했습니다. <br>'
    )

    some_logic() >> send_email

 

만약, 이메일 발송 에러가 발생한다면,

airflow 의 docker-compose.yaml 파일을 체크해봐야 함.

 

_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}

--------------------------------------------------------------------------------------

AIRFLOW__SMTP__SMTP_HOST: 'smtp.gmail.com'
AIRFLOW__SMTP__SMTP_USER: 'email-id@gmail.com'
AIRFLOW__SMTP__SMTP_PASSWORD: '계정에서 발급한 비밀번호'
AIRFLOW__SMTP__SMTP_PORT: 587
AIRFLOW__SMTP__SMTP_MAIL_FROM: ' email-id@gmail.com'

--------------------------------------------------------------------------------------

문제1유형
#
기초 : 데이터마님 (https://www.datamanim.com)
데이터 분석 시험
▶1.빅데이터 분석기사 실기(PYTHON) ▶1.2 데이터 전처리 100문제
#
연습 : 1.6 기출문제 및 모의고사 기출변형(2~6) 1유형만 풀면서 이해하기.
#마무리 : 인프런강의 섹션12~섹션16 기출문제 1유형만 실전처럼 혼자 풀고. 해설 듣고 이해하기. <중요>

문제2유형
#
연습 : 작업시나리오 외우고, 회귀/분류모델을 쓸지만 구분할 줄 알면 됨.
#
마무리 : 인프런강의 섹션12~섹션16 기출문제 2유형만 실전처럼 혼자 풀고. 해설 듣고 이해하기. <중요>

문제3유형
#
기초 : 섹션 7, 8, 9 학습
#
연습 :
1.6 기출문제 및 모의고사 기출변형(2~6) 1유형만 풀면서 이해하기.
#
마무리 : 섹션16 기출 3유형 머리 속에 집어넣기


문제2유형 작업시나리오
*
빅분기 실기 유형2 코드 작성 순서 (유튜브채널: 코딩 대한민국)
제목: 시험의 당락을 좌우하는 유형 2번 답코드가 저절로 암기 되는 영상
https://youtu.be/fQf_oOkV_SY?si=v7I6LYFqfnUtEIOo

import pandas as pd

train = pd.read_csv('https://raw.githubusercontent.com/Datamanim/datarepo/main/audit/x_train.csv')
y_train = pd.read_csv('https://raw.githubusercontent.com/Datamanim/datarepo/main/audit/y_train.csv')
test = pd.read_csv('https://raw.githubusercontent.com/Datamanim/datarepo/main/audit/x_test.csv')
y_test = pd.read_csv('https://raw.githubusercontent.com/Datamanim/datarepo/main/audit/y_test.csv')

# print(train.isnull().sum())
# print(test.isnull().sum())
# print(y_train.isnull().sum())
# print(y_test.isnull().sum())
# print(train.info())
# print(test.info())
# print(y_train.info())
# print(train.isnull().sum())
# print(test.isnull().sum())

# 널처리를 drop 하는데,
# train 과 y_train 행 수가 틀릴 수 있으니
# 우선 train과 y_train을 열로 합친 후 결측치(null)을 제거
train = pd.concat([train,y_train['Risk']], axis=1)
test = pd.concat([test,y_test['Risk']], axis=1)
train = train.dropna()
test = test.dropna()
test_id = test['ID'].copy()

# 결측치제거한 train에서 y_train을 분리.
y_train = train[['ID','Risk']].copy()
y_test = test[['ID','Risk']].copy()

# y_train 컬럼을 train 데이터셋에서 삭제
train = train.drop(['Risk'], axis=1)
test = test.drop(['Risk'], axis=1)

# 스케일링을 위한 컬럼구분
ocols = train.select_dtypes(include='object').columns.tolist()
ncols = train.select_dtypes(exclude='object').columns.tolist()

# print(train.describe(include='object'))
# print("="*100)
# print(test.describe(include='object'))
# print(allt.info())

LOCATION_ID
===============
트레인
===============
count 619
unique 42
top 8
freq 68
===============
테스트
===============
count 155
unique 34
top 8
freq 17

# train과 test의 object에서 unique 값이 서로 다르므로, train과 test값을 합침.
allt = pd.concat([train,test],axis=0)
oallt = allt[ocols].copy()
oallt = pd.get_dummies(oallt)

# 원핫인코딩 값을 다시 train 과 test로 나눔
otrain = oallt.iloc[:train.shape[0],:]
otest = oallt.iloc[train.shape[0]:,:]
# print(type(otrain))
ntrain = train[ncols].copy()
ntest = test[ncols].copy()

from sklearn.preprocessing import MinMaxScaler
mm = MinMaxScaler()
ntrain[ncols] = mm.fit_transform(ntrain[ncols])
ntest[ncols] = mm.transform(ntest[ncols])

# 스케일링이 모두 마친 데이터셋을 다시 하나로 합침.

train = pd.concat([otrain,ntrain],axis=1)
test = pd.concat([otest,ntest],axis=1)
# print(y_test['Risk'].value_counts())

# 분류예측
from sklearn.ensemble import RandomForestClassifier
model = RandomForestClassifier()
model.fit(train, y_train['Risk'])
pred = model.predict(test)
# print(pred)

ans = pd.DataFrame({
"ID": test_id,
"pred": pred
})

print(ans)

3유형 검정 및 분석 키워드 정리
단일표본 t 검정

from scipy import stats

stats.ttest_1samp ( 집단, 비교값, alternative )

stats.wilcoxon( 집단 - 비교값, alternative )

대응표본(쌍체) paired t 검정

stats.ttest_rel( , , alternative )

stats.wilcoxon( diff, alternative )

독립표본 t 검정

stats.ttest_ind( 집단1, 집단2, equal_var = True, alternative ) # 등분산( levene ) 검정, False

정규성X

stats.ranksums( 집단1, 집단2, alternative ) # 윌콕슨 순위합 검정

stats.mannwhitneyu( 집단1, 집단2, alternative ) # 만휘트니유 검정


분산분석 ANOVA : 3개이상 집단의 평균이 궁금?

일원 분산분석

stats.f_oneway( 집단1, 집단2, 집단3 )

stats.kruskal( 집단1, 집단2, 집단3 )

이원 분산분석

df_melt = df.melt()

from statsmodels.formula import api

model = api.ols( '종속 ~ C(독립1) + C(독립2) + C(독립1):C(독립2)', data=df_melt).fit()

# C로 감싼건 범주형데이터라서.

model = api.ols( '종속 ~ C(독립1) * C(독립2)', data=df ).fit()

from statsmodels.stats import anova

anova.anova_lm( model )

사후검증까진.... 안함.


카이제곱

적합도 검정 : 빈도수로 값을 변경 해야함.

from scipy import stats

stats.chisquare( 관찰값, 기대값, alternative )

독립성 검정 : pd.crosstab ( index=컬럼, columns=컬럼, values=컬럼, aggfunc=sum )

from scipy import stats

stats.chi2_contingency( 크로스탭 )


상관분석

from scipy import stats

stats.pearsonr( x, y )


 

다중회귀 분석

from statsmodels.formula import api

model = api.ols( '종속 ~ 독립1 + 독립2', data = df ).fit()

model.summary()

예측, 신뢰

newdata = pd.DataFrame({ '컬럼' : [예측값] })

pred = model.get_prediction( newdata )

pred.summary_frame( alpha=0.05 ) # 95% 신뢰구간

# 신뢰구간 : mean_ci_lower ~ mean_ci_upper

# 예측구간 : obs_ci_lower ~ obs_ci_upper


로지스틱 회귀

from statsmodels.formula import api

model = api.logit( '종속 ~ C(독립1) + 독립2 + 독립3', data=df ).fit()

model.summary()

오즈비

import numpy as np

np.exp( model.params['독립변수'] )

미국 보조금 : 지원금과 대출이 혼합된 형태의 보조금 2022년 8월 발효된 반도체 지원법(CHIPS and Science Act)

인텔 총 76조 챈들러 캠퍼스에 공장 2곳 추가 건설할 계획 발표 애리조나를 미국 내 반도체 생산기지로 계획함.

* 반도체 생산 보조금 52조 (390억 달러)

* 연구.개발 지원금 14조 6천 (110억 달러)

 

TSMC 2020년 애리조나 피닉스에 생산시설 건설 발표.

 

바이든 행정부는 지난달 미 반도체 기업 글로벌 파운드리스의

뉴욕주 말타 설비투자와 버몬트주 벌링턴 생산시설 확장을 위해 약 2조 (15억 달러)의 보조금을 지급하는 계획 발표

 

 

1. Bash 오퍼레이터에서 Xcom 사용하기

Bash오퍼레이터는 env, bash_command파라미터에서 Template 이용하여 push/pull

- 마지막 출력문은 자동으로 return_value에 저장됨.

- task_ids만 지정하면 key값은 return_value를 의미한다.

 

dags_bash_with_xcom.py

import datetime
import pendulum

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="dags_bash_with_xcom",
    schedule="10 0 * * *",
    start_date=pendulum.datetime(2024, 2, 1, tz="Asia/Seoul"),
    catchup=False,
) as dag:
    
    bash_push=BashOperator(
        task_id='bash_push',
        bash_command="echo START && "
                     "echo XCOM_PUSHED "
                     "{{ ti.xcom_push(key='bash_pushed',value='first_bash_message') }} && "
                     "echo COMPLETE"
    )

    bash_pull = BashOperator(
        task_id='bash_pull',
        env={'PUSHED_VALUE' : "{{ ti.xcom_pull(key='bash_pushed') }}",
             'RETURN_VALUE' : "{{ ti.xcom_pull(task_ids='bash_push') }}"},
        bash_command="echo $PUSHED_VALUE && echo $RETRUN_VALUE ",
        do_xcom_push=False
    )

    bash_push >> bash_pull

1. Xcom이란?

Xcom(Cross Communication)

Airflow DAG 안 Task 간 데이터 공유를 위해 사용되는 기술

ex) Task1의 수행 중 내용이나 결과를 Task2에서 사용 또는 입력으로 주고 싶은 경우

 

주로 작은 규몬의 데이터 공유를 위해 사용

(Xcom 내용은 메타 DB의 xcom 테이블에 값이 저장됨)

1GB 이상의 대용량 데이터 공유를 위해서는 외부 솔루션 사용 필요(AWS S3, HDFS 사용)

 

2. Python 오퍼레이터에서 Xcom사용하기

크게 두 가지 방법으로 Xcom 사용 가능

1) **kwargs에 존재하는 ti(task_instance) 객체 활용

template변수에서 task_instance라는 객체를 얻을 수 있으며

task_instance객체가 가진 xcom_push메서드를 활용 할 수 있음.

 

2) 파이썬 함수의  return값 활용 (1안) / (2안)

 

요약

Xcom push 방법

1. ti.xcom_push 명시적 사용

2. 함수 return

 

Xcom pull 방법

ti.xcom_pull 명시적 사용

return 값을 input으로 사용

 

소스코드

dags_python_with_xcom_eg1.py

import pendulum

from airflow.models.dag import DAG
from airflow.decorators import task

with DAG(
    dag_id="dags_python_with_xcom_eg1",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    
    @task(task_id='python_xcom_push_task1')
    def xcom_push1(**kwargs):
        ti = kwargs['ti']
        ti.xcom_push(key="result1", value="value_1")
        ti.xcom_push(key="result2", value=[1,2,3])

    @task(task_id='python_xcom_push_task2')
    def xcom_push2(**kwargs):
        ti = kwargs['ti']
        ti.xcom_push(key="result1", value="value_2")
        ti.xcom_push(key="result2", value=[1,2,3,4])

    @task(task_id='python_xcom_pull_task')
    def xcom_pull(**kwargs):
        ti = kwargs['ti']
        value1 = ti.xcom_pull(key="result1")
        value2 = ti.xcom_pull(key="result2", task_ids='python_xcom_push_task1')
        print(value1)
        print(value2)

    xcom_push1() >> xcom_push2() >> xcom_pull()

 

dags_python_with_xcom_eg2.py

import pendulum

from airflow.models.dag import DAG
from airflow.decorators import task

with DAG(
    dag_id="dags_python_with_xcom_eg2",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    
    @task(task_id='python_xcom_push_by_return')
    def xcom_push_result(**kwargs):
        return 'Success'
    
    @task(task_id='python_xcom_pull_1')
    def xcom_pull_1(**kwargs):
        ti = kwargs['ti']
        value1 = ti.xcom_pull(task_ids='python_xcom_push_by_return')
        print('xcom_pull 멘서드로 직접 찾은 리턴 값: '+ value1)

    @task(task_id='python_xcom_pull_2')
    def xcom_pull_2(status, **kwargs):
        print('함수 입력값으로 받은 값: '+status)

    python_xcom_push_by_return = xcom_push_result()
    xcom_pull_2(python_xcom_push_by_return)
    python_xcom_push_by_return >> xcom_pull_1()

    #1.xcom_push_result > xcom_pull_2 > xcom_pull_1 순서로 실행됨.

1. 어떤 파라미터가 Template 변수를 지원할까?

파라미터

python_callable(Callable | None)

op_kwargs

op_args

templates_dict

templates_exts

show_return_value_in_logs

참조자료 : https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/python/index.html

 

그러나,  python 오퍼레이터에서 굳이 macro를 사용할 필요가 있을까?

날짜 연산을 DAG안에서 직접 할 수 있다면?

아래 @task(task_id='task_using_macros) 구문

 

실습코드

import pendulum

from airflow.models.dag import DAG
from airflow.decorators import task

with DAG(
    dag_id="dags_python_with_macro",
    schedule="10 0 * * *",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    
    @task(
        task_id='task_using_macros',
        templates_dict={
              'start_date': '{{ (data_interval_end.in_timezone("Asia/Seoul")+macros.dateutil.relativedelta.relativedelta(months=-1, day=1)) | ds}}',
              'end_date': '{{ (data_interval_end.in_timezone("Asia/Seoul").replace(day=1)+macros.dateutil.relativedelta.relativedelta(days=-1)) | ds}}'
        }
    )
    def get_datetime_macro(**kwargs):
        templates_dict = kwargs.get('templates_dict') or {}
        if templates_dict :
            start_date = templates_dict.get('start_date') or 'start_date없음'
            end_date = templates_dict.get('end_date') or 'end_date없음'
            print(start_date)
            print(end_date)


    @task(task_id='task_direct_calc')
    def get_datetime_calc(**kwargs):
        from dateutil.relativedelta import relativedelta # 스케줄러 부하 경감을 이유로 여기다가 작성함.
        data_interval_end = kwargs['data_interval_end']

        prev_month_day_first = data_interval_end.in_timezone('Asia/Seoul')+relativedelta(months=-1, day=1)
        prev_month_day_last = data_interval_end.in_timezone('Asia/Seoul').replace(day=1)+relativedelta(days=-1)
        print(prev_month_day_first.strftime('%Y-%m-%d'))
        print(prev_month_day_last.strftime('%Y-%m-%d'))

    get_datetime_macro() >> get_datetime_calc()

 

.env 파일 설정

WORKSPACE_FOLDER=C:\vscode_workspace\airflow
PYTHONPATH=${WORKSPACE_FOLDER}/plugins

 

1. VScode 설정(왼쪽하단 톱니바퀴)

2. 설정메뉴에서 Settings 선택

3. Settings화면에서 User메뉴의 Workbench > Appearance 선택

4. Color Customizations 에서 'Edit in settings.json 선택

5. "tab.activeBackground": "#129c3c" 구문 추가 후 ctrl+S 저장.

6. 현재 탭 배경색 변경을 확인 한다.

 

1. Macro 변수의 이해

Macro변수의 필요성

DAG스케줄은 매일 말일에 도는 스케줄인데 BETWEEN값을 전월 마지막일부터 어제 날짜까지 주고 싶은데

어떻게 하지?

예를 들어 배치일이 1월 31일이면 12월 31일부터 1월 30일까지

배치일이 2월 28일이면 1월 31일부터 2월 27일까지 BETWEEN이 설정되었으면 좋겠어.등 요건사항 처리.

 

2. 파이썬 datetime+dateutil 라이브러리 이해

3. Bash 오퍼레이터에서 Macro변수 활용하기

dags_bash_with_macro_eg1.py

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

import pendulum

with DAG(
    dag_id="dags_bash_with_macro_eg1",
    schedule="10 0 L * *", # 매월 말일에 실행 되는 DAG
    start_date=pendulum.datetime(2024, 1, 1, tz="Asia/Seoul"),
    catchup=False    
) as dag:
    
    # Start_date : 전일 말일, END_DATE: 1일 전
    bash_task_1 = BashOperator(
        task_id='bash_task_1',
        env={'START_DATE': '{{ data_interval_start.in_timezone("Asia/Seoul") | ds}}',
             'END_DATE': '{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelta.relativedelta(days=1)) | ds}}'
        },
        bash_command='echo "START_DATE: $START_DATE" && echo "END_DATE : $END_DATE"'
    )

 

dags_bash_with_macro_eg2.py

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

import pendulum

with DAG(
    dag_id="dags_bash_with_macro_eg2",
    schedule="10 0 * * 6#2", # 매월 2째주 토요일 
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False    
) as dag:
    
     # Start_date : 2주전 월요일, END_DATE: 2주전 토요일
    bash_task_2 = BashOperator(
        task_id='bash_task_2',
        env={'START_DATE':  '{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelta.relativedelta(days=19)) | ds}}',
             'END_DATE':    '{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelta.relativedelta(days=14)) | ds}}'
        },
        bash_command='echo "START_DATE: $START_DATE" && echo "END_DATE : $END_DATE"'
    )

 

+ Recent posts