2.1 다양한 소스에서 데이터 수집
2.1.1 데이터 탐색
- 배경 : 로켓 뉴스 수집 프로그램 작성
- 예정된 로켓 발사 데이터, 이미지 API
2.2 첫 번째 Airflow DAG 작성
- Airflow에 매핑된 모형
- 태스크를 나누는 기준은 때에 따라 다름.
2.2.1 태스크와 오퍼레이터 차이점
- 오퍼레이터는 단일 작업을 수행할 수 있는 기능 제공.
- 태스크는 오퍼레이터의 래퍼/매니저 역할을 함.
- 둘은 유사한 개념으로 혼용해서 사용
2.2.2 임의 파이썬 코드 실행
- 라이브러리
import json import pathlib
import airflow
import airflow.utils
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
2. DAG 인스턴스 생성
```python
dag = DAG(
dag_id="listing_2_02",
start_date=airflow.utils.dates.days_ago(14),
schedule_interval=None,
)
- 배시 커맨드 실행을 위한 객체 인스턴스 생성
- curl 요청으로 다운로드
download_launches = BashOperator( task_id="download_launches", bash_command="curl -o /tmp/launches.json -L 'https://ll.thespacedevs.com/2.0.0/launch/upcoming'", # noqa: E501 dag=dag, )
- 파이썬 커맨드 실행을 위한 객체 인스턴스 생성
이미지 다운로드
def _get_pictures(): pathlib.Path("tmp/images").mkdir(parents=True, exist_ok=True) with open("tmp/launches.json") as f: launches = json.load(f) image_urls = [launch["image"] for launch in launches['results']] for image_url in image_urls: try: response = requests.get(image_url) image_fname = image_url.split("/")[-1] target_file = f"/tmp/images/{image_fname}" with open(target_file, "wb") as f: f.write(response.content) print(f"Downloaded {image_url} to {target_file}") except requests_exceptions.MissingSchema: print(f"{image_url} appears to be an invalid URL.") except requests_exceptions.ConnectionError: print(f"Could not connect to {image_url}.")
get_pictures = PythonOperator(
task_id="get_pictures", python_callable=_get_pictures, dag=dag
)
5. 배시 커맨드 실행을 위한 객체 인스턴스 생성
- 알람 문구 노출
```python
notify = BashOperator(
task_id="notify",
bash_command='echo "There are now $(ls /tmp/images | wc -l) images."',
dag=dag
)
2.3 Airflow에서 DAG 실행하기
2.3.1 파이썬 환경에서 Airflow 실행
pip install apach-airflow
2.3.2 도커 컨테이너에서 Airflow 실행
이 방식 대신 docker-compose 사용해서 설치하였습니다.
- workspace 생성
mkdir -p airflow/{dags,logs,plugins}
- 설정 파일 생성
echo -e "AIRFLOW_UID=$(id -u)" > .env
AIRFLOW_UID=501
3. Airflow 설치
```bash
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.3.3/docker-compose.yaml'
docker compose up airflow-init
docker-compose up
- 웹 UI 로그인
- docker-compose로 설치했을 때 초기 세팅은 id "airflow", pw "airflow"
- http://localhost:8080/
- DAG 실행
2.4 스케줄 간격으로 실행하기
dag=DAG(
dag_id="download_rocket_launches",
start_date=airflow.utils.dates.days_ago(14),
schedule_interval="@daily"
)
2.5 실패한 태스크에 대한 처리
- 로그 확인하여 문제 원인 파악
- 실패한 지점부터 다시 실행 (실패 태스크를 클릭한 후 Clear 버튼 누르기)
728x90
'데이터 어쩌구 > 기술 써보기' 카테고리의 다른 글
C4. Airflow 콘텍스트를 사용하여 태스크 템플릿 작업하기 (0) | 2024.06.02 |
---|---|
C3. Airflow의 스케줄링 (0) | 2024.06.02 |
C1. Apach Airflow 살펴보기 (0) | 2024.06.02 |
[4주차] ReAct Agent 만들기 (1) | 2024.04.02 |
이미지 생성 모델 (2024. 02) (0) | 2024.02.27 |