본문 바로가기
데이터 어쩌구/기술 써보기

C5. 테스크 간 의존성 정의하기

by annmunju 2024. 6. 2.

5.1 기본 의존성 유형

5.1.1 선형 의존성 유형

  • 명시적으로 테스크 의존성을 지정
    • 의존성 충족된 뒤 다음 태스크 실행
    • 다운스트림 태스크로 전달

5.1.2 팬인/팬아웃 의존성

  • 두가지 소스에서 데이터가 정제되어 결합해야하는 테스크에서
    • DummyOperator 사용
      from airflow.operators.dummy import DummyOperator
      

start = DummyOperator(task_id='start')
start >> [fetch_weather, fetch_sales]
[clean_sales, clean_weather] >> join_datasets
join_datasets >> train_model >> deploy_model

![](https://velog.velcdn.com/images/annmunju/post/3b4e1b3f-c4f0-4a16-96d2-406eb49e1270/image.png)

## 5.2 브랜치하기

### 5.2.1 태스크 내에서 브랜치하기
- 시스템에서 케이스별 서로 다른 태스크 세트를 실행할 때
  - if - else로 분기? -> 더 좋은 방법 있다

### 5.2.2 DAG 내부에서 브랜치하기
- DAG 내에서 둘 중 하나만 실행하도록 하기
  - 트리거 규칙에 의해 태스트 실행 시기 제어 (trigger_rule)
```python
with DAG(
    dag_id="03_branch_dag",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="@daily",
) as dag:
    start = DummyOperator(task_id="start")
    ...
    fetch_weather = DummyOperator(task_id="fetch_weather")
    clean_weather = DummyOperator(task_id="clean_weather")

    # Using the wrong trigger rule ("all_success") results in tasks being skipped downstream.
    # join_datasets = DummyOperator(task_id="join_datasets")

    # 실패 없이 실행되면 실행하라는 트리거 추가됨
    join_datasets = DummyOperator(task_id="join_datasets", trigger_rule="none_failed") 
    train_model = DummyOperator(task_id="train_model")
    deploy_model = DummyOperator(task_id="deploy_model")

    start >> [pick_erp_system, fetch_weather]
    pick_erp_system >> [fetch_sales_old, fetch_sales_new]
    fetch_sales_old >> clean_sales_old
    fetch_sales_new >> clean_sales_new
    fetch_weather >> clean_weather
    [clean_sales_old, clean_sales_new, clean_weather] >> join_datasets
    join_datasets >> train_model >> deploy_model

5.3 조건부 태스크

  • 조건에 따라 특정 태스크 건너뛰기

5.3.1 태스크 내에서 조건

  • if else 분기? -> 내부에서 제어되어 파악 어려움

5.3.2 조건부 태스크 만들기

  • 최근 실행이 아니라면 다운 스트림을 건너뛰도록 함수 작성, AirflowSkipException 함수 실행
    from airflow.exceptions import AirflowSkipException
    

def _latest_only(**context):
now = pendulum.now("UTC")
left_window = context["dag"].following_schedule(context["execution_date"])
right_window = context["dag"].following_schedule(left_window)

if not left_window < now <= right_window:
    raise AirflowSkipException()

### 5.3.3 내장 오퍼레이터 사용하기
- 가장 최근에 실행한 DAG만 실행하도록 구현
```python
from airflow.operators.latest_only import LatestOnlyOperator
    ...
    latest_only = LatestOnlyOperator(task_id="latest_only", dag=dag)
    ...
    join_datasets >> train_model >> deploy_model
    latest_only >> deploy_model

5.4 트리거 규칙에 대한 추가 정보

5.4.1 트리거 규칙이란?

  • 태스크 의존성 기능과 같이 실행 준비가 되었는지 여부를 결정하기 위한 필수 조건
  • 시작할 때 태스크가 필요없는 start 실행하는 경우도 있음
    • 의존성이 없는 시작 태스크

5.4.2 실패의 영향

  • 의존성 태스크들이 건너뛰거나 실패하는 경우 전파됨. 업스트림 실행 못하면 다운스트림 실행 안됨

5.4.3 기타 트리거 규칙

  • 5.2절에서 실패해도 다운스트림 태스크 실행하도록 브랜치 결합하는 예를 봤음. 건너뛰는 경우
  • none_failed 트리거 규칙은 모든 업스트림 태스크에 실패 없이 완료되었는지 여부만 확인. 두 브랜치 결합하기 적합
  • 다른 여러 기능도 있음
    )

5.5 태스크 간 데이터 공유

5.5.1 XCom 사용해 데이터 공유하기

  • 인메모리에서 변수 공유하는 방식
  • 예시. 태스크 사이에 model_id 공유하기
    def _train_model(**context):
      model_id = str(uuid.uuid4())
      context["task_instance"].xcom_push(key="model_id", value=model_id)
    

...

deploy_model = PythonOperator(
    task_id="deploy_model",
    python_callable=_deploy_model,
    templates_dict={
        "model_id": "{{task_instance.xcom_pull(task_ids='train_model', key='model_id')}}"
    },
)

### 5.5.2 XCom 사용 시 고려사항
- 묵시적 의존성 필요 (명시적으로 드러나지 않으므로 올바르게 실행됨을 주의)
- 오퍼레이터의 원자성 무너질 수 있음 
  - API 접근 토큰 만료 문제 발생시 재실행 어려움 등
- XCom이 저장하는 모든 값은 직렬화를 지원해야함.
  - 다중 멀티 프로세스, 람다 등은 저장 불가.
  - 저장 크기 제한될 수 있음

### 5.5.3 커스텀 XCom 백엔드 사용하기
- BaseXCom 클래스 상속받아 직렬화 -> 역직렬화 두가지 메소드 구현해서 직접 만들어볼 수 있음.

## 5.6 Taskflow API로 파이썬 태스크 연결하기

### 5.6.1 파이썬 태스크 단순화하기
- 데코레이터 기반으로 정의하는 단순화
```python
import uuid

import airflow

from airflow import DAG
from airflow.decorators import task # 데코레이터


with DAG(
    dag_id="12_taskflow",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="@daily",
) as dag:

    @task
    def train_model():
        model_id = str(uuid.uuid4())
        return model_id

    @task
    def deploy_model(model_id: str):
        print(f"Deploying model {model_id}")

    model_id = train_model() # @task로 인수로 전달받도록 함
    deploy_model(model_id)
  • 두 태스크가 마치 하나의 태스크처럼 작성되었으나 DAG에서는 분리되어 작성됨. (의존성 있는 태스크)

5.6.2 Taskflow API를 사용하지 않는 경우

  • 파이썬 태스크로 제한됨. 다른 태스크는 일반 API(XCom)로 의존성을 정의해야 함.
728x90