본문 바로가기
강의록~스터디

C3. Airflow의 스케줄링

by annmunju 2024. 6. 2.

3.1 예시: 사용자 이벤트 처리하기

  • 사용자 이벤트를 가져오고 통계 및 계산하기 + 스케줄링으로 매일 특정 기간의 변화를 비교하기

3.2 정기적으로 실행하기

3.2.1 스케줄 간격 정의하기

dag = DAG(
    dag_id="02_daily_schedule",
    schedule_interval="@daily", # 매일
    start_date=datetime(2024, 4, 1), # 4월 1일 이후 자정부터 시작 (4월 2일 00시)
    end_date=datetime(2024, 5, 1),
)

3.2.2 Cron 기반의 스케줄 간격 설정하기

  • Cron 사용 예시
    • 0 * * * * = 매시간 정시 실행
    • 0 0 * * * = 매일 자정 실행
    • 0 0 * * 0 = 매주 일요일 자정에 실행
    • 0 0 1 * * = 매월 1일 자정에 실행
    • 45 23 * * SAT = 매주 토요일 23시 45분
    • 0 0 * * MON,WED,FRI = 매주 월, 화, 금요일 자정에 실행
    • 0 0 * * MON-FRI = 매주 월요일부터 금요일 자정에 실행
    • 0 0,12 * * * = 매일 자정 오후 12시에 실행
  • 자주 사용되는 스케줄 간격에 대한 Airflow 프리셋
    • @once 1회 실행
    • @hourly 매시간 변경 시 1회 실행
    • @daily 매일 자정에 1회 실행
    • @weekly 매주 일요일 자정에 1회 실행
    • @monthly 매월 1일 자정에 1회 실행
    • @yearly 매년 1월 1일 자정에 1회 실행

3.2.3 빈도 기반의 스케줄 간격 설정하기

dag = DAG(
    dag_id="02_daily_schedule",
    schedule_interval=timedelta(days=3),
    start_date=datetime(2019, 1, 1),
    end_date=datetime(2019, 1, 5),
)

3.3 데이터 증분 처리하기

  • 과거 특정 시점 이벤트를 가져오는 방법

3.3.1 이벤트 데이터 증분 가져오기

  • 스케줄 간격에 해당하는 일자의 이벤트만 로드하고 새로운 이벤트만 통계를 계산
    fetch_events=BashOperator(
      task_id="fetch_events",
      bash_command=(
          "mkdir -p /data && "
          "curl -o /data/events.json "
          "http://localhost:5000/events?"
          "start_date=2019-01-01&"
          "end_date=2019-01-02"
      ),
      dag=dag,
    )

3.3.2 실행 날짜를 사용하여 동적 시간 참조하기

  • 주어진 작업이 실행되는 특정 시간 간격을 정의할 수 있는 매개변수 execution_date
  • 종료 시간 next_execution_date
  • 과거 스케줄 간격의 시작 정의 previous_excution_date
  • 축약 표기법 사용
    • ds : YYYYMM-DD (excution_date)
    • next_ds : (next_excutioin_date)

3.3.3 데이터 파티셔닝

  • events.json 파일에 이벤트를 추가하여 하나의 json 파일에 모든 데이터를 작성하도록 함.
  • 혹은 테스크의 출력을 실행 날짜 이름이 적힌 파일에 기록해 데이터 세트를 일일 배치로 나눔. -> 파티셔닝

3.4 Airflow의 실행 날짜 이해

3.4.1 고정된 스케줄 간격으로 태스크 실행

  • 간격 기반 스케줄링 (간격 시작 - 다음 간격 시작 사이에 실행)
  • 시점 기반 스케줄링 (해당 시점에 실행)
  • DAG 실행으로만 정의되기 때문에 UI로 수동 실행 어려움

3.5 과거 데이터 간격을 메꾸기 위해 백필 사용하기

3.5.1 과거 시점의 작업 실행하기

  • DAG의 catchup 매개변수로 제어
  • 백필 사용시 원천 데이터가 이전 데이터를 제공해야 데이터 로드가 가능.
  • 코드 변경시에도 데이터 재처리에 활용

3.6 태스크 디자인을 위한 모범 사례

3.6.1 원자성

  • 나눌 수 없고 돌이킬 수 없는 일련의 데이터베이스와 같은 작업.

3.6.2 멱등성

  • 동일한 입력으로 동일한 태스크를 여러 번 호출해도 결과에 효력이 없어야 함.
728x90