5.1 기본 의존성 유형
5.1.1 선형 의존성 유형
- 명시적으로 테스크 의존성을 지정
- 의존성 충족된 뒤 다음 태스크 실행
- 다운스트림 태스크로 전달
5.1.2 팬인/팬아웃 의존성
- 두가지 소스에서 데이터가 정제되어 결합해야하는 테스크에서
- DummyOperator 사용
from airflow.operators.dummy import DummyOperator
- DummyOperator 사용
start = DummyOperator(task_id='start')
start >> [fetch_weather, fetch_sales]
[clean_sales, clean_weather] >> join_datasets
join_datasets >> train_model >> deploy_model
![](https://velog.velcdn.com/images/annmunju/post/3b4e1b3f-c4f0-4a16-96d2-406eb49e1270/image.png)
## 5.2 브랜치하기
### 5.2.1 태스크 내에서 브랜치하기
- 시스템에서 케이스별 서로 다른 태스크 세트를 실행할 때
- if - else로 분기? -> 더 좋은 방법 있다
### 5.2.2 DAG 내부에서 브랜치하기
- DAG 내에서 둘 중 하나만 실행하도록 하기
- 트리거 규칙에 의해 태스트 실행 시기 제어 (trigger_rule)
```python
with DAG(
dag_id="03_branch_dag",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="@daily",
) as dag:
start = DummyOperator(task_id="start")
...
fetch_weather = DummyOperator(task_id="fetch_weather")
clean_weather = DummyOperator(task_id="clean_weather")
# Using the wrong trigger rule ("all_success") results in tasks being skipped downstream.
# join_datasets = DummyOperator(task_id="join_datasets")
# 실패 없이 실행되면 실행하라는 트리거 추가됨
join_datasets = DummyOperator(task_id="join_datasets", trigger_rule="none_failed")
train_model = DummyOperator(task_id="train_model")
deploy_model = DummyOperator(task_id="deploy_model")
start >> [pick_erp_system, fetch_weather]
pick_erp_system >> [fetch_sales_old, fetch_sales_new]
fetch_sales_old >> clean_sales_old
fetch_sales_new >> clean_sales_new
fetch_weather >> clean_weather
[clean_sales_old, clean_sales_new, clean_weather] >> join_datasets
join_datasets >> train_model >> deploy_model
5.3 조건부 태스크
- 조건에 따라 특정 태스크 건너뛰기
5.3.1 태스크 내에서 조건
- if else 분기? -> 내부에서 제어되어 파악 어려움
5.3.2 조건부 태스크 만들기
- 최근 실행이 아니라면 다운 스트림을 건너뛰도록 함수 작성, AirflowSkipException 함수 실행
from airflow.exceptions import AirflowSkipException
def _latest_only(**context):
now = pendulum.now("UTC")
left_window = context["dag"].following_schedule(context["execution_date"])
right_window = context["dag"].following_schedule(left_window)
if not left_window < now <= right_window:
raise AirflowSkipException()
### 5.3.3 내장 오퍼레이터 사용하기
- 가장 최근에 실행한 DAG만 실행하도록 구현
```python
from airflow.operators.latest_only import LatestOnlyOperator
...
latest_only = LatestOnlyOperator(task_id="latest_only", dag=dag)
...
join_datasets >> train_model >> deploy_model
latest_only >> deploy_model
5.4 트리거 규칙에 대한 추가 정보
5.4.1 트리거 규칙이란?
- 태스크 의존성 기능과 같이 실행 준비가 되었는지 여부를 결정하기 위한 필수 조건
- 시작할 때 태스크가 필요없는 start 실행하는 경우도 있음
- 의존성이 없는 시작 태스크
5.4.2 실패의 영향
- 의존성 태스크들이 건너뛰거나 실패하는 경우 전파됨. 업스트림 실행 못하면 다운스트림 실행 안됨
5.4.3 기타 트리거 규칙
- 5.2절에서 실패해도 다운스트림 태스크 실행하도록 브랜치 결합하는 예를 봤음. 건너뛰는 경우
- none_failed 트리거 규칙은 모든 업스트림 태스크에 실패 없이 완료되었는지 여부만 확인. 두 브랜치 결합하기 적합
- 다른 여러 기능도 있음
)
5.5 태스크 간 데이터 공유
5.5.1 XCom 사용해 데이터 공유하기
- 인메모리에서 변수 공유하는 방식
- 예시. 태스크 사이에 model_id 공유하기
def _train_model(**context): model_id = str(uuid.uuid4()) context["task_instance"].xcom_push(key="model_id", value=model_id)
...
deploy_model = PythonOperator(
task_id="deploy_model",
python_callable=_deploy_model,
templates_dict={
"model_id": "{{task_instance.xcom_pull(task_ids='train_model', key='model_id')}}"
},
)
### 5.5.2 XCom 사용 시 고려사항
- 묵시적 의존성 필요 (명시적으로 드러나지 않으므로 올바르게 실행됨을 주의)
- 오퍼레이터의 원자성 무너질 수 있음
- API 접근 토큰 만료 문제 발생시 재실행 어려움 등
- XCom이 저장하는 모든 값은 직렬화를 지원해야함.
- 다중 멀티 프로세스, 람다 등은 저장 불가.
- 저장 크기 제한될 수 있음
### 5.5.3 커스텀 XCom 백엔드 사용하기
- BaseXCom 클래스 상속받아 직렬화 -> 역직렬화 두가지 메소드 구현해서 직접 만들어볼 수 있음.
## 5.6 Taskflow API로 파이썬 태스크 연결하기
### 5.6.1 파이썬 태스크 단순화하기
- 데코레이터 기반으로 정의하는 단순화
```python
import uuid
import airflow
from airflow import DAG
from airflow.decorators import task # 데코레이터
with DAG(
dag_id="12_taskflow",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="@daily",
) as dag:
@task
def train_model():
model_id = str(uuid.uuid4())
return model_id
@task
def deploy_model(model_id: str):
print(f"Deploying model {model_id}")
model_id = train_model() # @task로 인수로 전달받도록 함
deploy_model(model_id)
- 두 태스크가 마치 하나의 태스크처럼 작성되었으나 DAG에서는 분리되어 작성됨. (의존성 있는 태스크)
5.6.2 Taskflow API를 사용하지 않는 경우
- 파이썬 태스크로 제한됨. 다른 태스크는 일반 API(XCom)로 의존성을 정의해야 함.
728x90
'데이터 어쩌구 > 기술 써보기' 카테고리의 다른 글
C4. Airflow 콘텍스트를 사용하여 태스크 템플릿 작업하기 (0) | 2024.06.02 |
---|---|
C3. Airflow의 스케줄링 (0) | 2024.06.02 |
C2. Airflow DAG의 구조 (0) | 2024.06.02 |
C1. Apach Airflow 살펴보기 (0) | 2024.06.02 |
[4주차] ReAct Agent 만들기 (1) | 2024.04.02 |