본문 바로가기
데이터 어쩌구/기술 써보기

C2. Airflow DAG의 구조

by annmunju 2024. 6. 2.

2.1 다양한 소스에서 데이터 수집

2.1.1 데이터 탐색

  • 배경 : 로켓 뉴스 수집 프로그램 작성
    • 예정된 로켓 발사 데이터, 이미지 API

2.2 첫 번째 Airflow DAG 작성

  • Airflow에 매핑된 모형
    • 태스크를 나누는 기준은 때에 따라 다름.

2.2.1 태스크와 오퍼레이터 차이점

  • 오퍼레이터는 단일 작업을 수행할 수 있는 기능 제공.
  • 태스크는 오퍼레이터의 래퍼/매니저 역할을 함.
  • 둘은 유사한 개념으로 혼용해서 사용

2.2.2 임의 파이썬 코드 실행

  1. 라이브러리
    import json
    import pathlib
    

import airflow
import airflow.utils
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


2. DAG 인스턴스 생성
```python
dag = DAG(
    dag_id="listing_2_02",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval=None,
)
  1. 배시 커맨드 실행을 위한 객체 인스턴스 생성
  • curl 요청으로 다운로드
    download_launches = BashOperator(
      task_id="download_launches",
      bash_command="curl -o /tmp/launches.json -L 'https://ll.thespacedevs.com/2.0.0/launch/upcoming'",  # noqa: E501
      dag=dag,
    )
  1. 파이썬 커맨드 실행을 위한 객체 인스턴스 생성
  • 이미지 다운로드

    def _get_pictures():
      pathlib.Path("tmp/images").mkdir(parents=True, exist_ok=True)
    
      with open("tmp/launches.json") as f:
          launches = json.load(f)
          image_urls = [launch["image"] for launch in launches['results']]
          for image_url in image_urls:
              try:
                  response = requests.get(image_url)
                  image_fname = image_url.split("/")[-1]
                  target_file = f"/tmp/images/{image_fname}"
                  with open(target_file, "wb") as f:
                      f.write(response.content)
                  print(f"Downloaded {image_url} to {target_file}")
              except requests_exceptions.MissingSchema:
                  print(f"{image_url} appears to be an invalid URL.")
              except requests_exceptions.ConnectionError:
                  print(f"Could not connect to {image_url}.")
    

get_pictures = PythonOperator(
task_id="get_pictures", python_callable=_get_pictures, dag=dag
)


5. 배시 커맨드 실행을 위한 객체 인스턴스 생성
- 알람 문구 노출
```python
notify = BashOperator(
    task_id="notify",
    bash_command='echo "There are now $(ls /tmp/images | wc -l) images."',
    dag=dag
)

2.3 Airflow에서 DAG 실행하기

2.3.1 파이썬 환경에서 Airflow 실행

  • pip install apach-airflow

2.3.2 도커 컨테이너에서 Airflow 실행

이 방식 대신 docker-compose 사용해서 설치하였습니다.

  1. workspace 생성
    mkdir -p airflow/{dags,logs,plugins}
  2. 설정 파일 생성
    echo -e "AIRFLOW_UID=$(id -u)" > .env
    

AIRFLOW_UID=501

3. Airflow 설치
```bash
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.3.3/docker-compose.yaml'
docker compose up airflow-init
docker-compose up
  1. 웹 UI 로그인
  1. DAG 실행

2.4 스케줄 간격으로 실행하기

dag=DAG(
    dag_id="download_rocket_launches",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval="@daily"
)

2.5 실패한 태스크에 대한 처리

  • 로그 확인하여 문제 원인 파악
  • 실패한 지점부터 다시 실행 (실패 태스크를 클릭한 후 Clear 버튼 누르기)
728x90