본문 바로가기
데이터 어쩌구/기술 써보기

C4. Airflow 콘텍스트를 사용하여 태스크 템플릿 작업하기

by annmunju 2024. 6. 2.
  • 오퍼레이터의 역할
  • 작동 방법
  • 언제 어떻게 실행되는지
  • 원격 시스템과 통신하는 방법

4.1 Airflow로 처리할 데이터 검사

  • 예시 : 위키피디아 페이지 뷰 수 수집
    • 뷰 증가 -> 긍정적 주식 전망 예측
    • 뷰 감소 -> 부정적 주식 전망 예측

4.1.1 증분 데이터를 적재하는 방법 결정하기

  • 형식 gzip (전체 크기 50mb, 압축 풀면 200~250mb)
    • 시간당 페이지 수 집계
  • 도메인 코드 살펴보기

4.2 태스크 콘텍스트와 Jinja 탬플릿 작업

  • 워크플로 첫 번째 버전

4.2.1 오퍼레이터의 인수 템플릿 작업

  • Jinja 템플릿 문자열 {{ }}
    • print("Hello {{ name }}")
  • 모든 오퍼레이터 인수가 템플릿이 될 수 있는 것은 아님. 공식 문서 참고 (PythonOperator.template_fields)

4.2.2 템플릿에 무엇이 사용 가능할까요?

  • airflow 활용하여 태스크 콘텍스트 출력하기
    import airflow.utils.dates
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    

dag=DAG(
dag_id="print_context",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="@daily",
)

def _print_context(**kwargs):
print(kwargs)

print_context=PythonOperator(
task_id="print_context",
python_callable=_print_context,
dag=dag,
)

![](https://velog.velcdn.com/images/annmunju/post/6577ebd6-3d9c-420a-8551-6a4b30dd3b07/image.png)
![](https://velog.velcdn.com/images/annmunju/post/97acdeaa-898c-4c22-9ac3-943812fcd905/image.png)

### 4.2.3 PythonOperator 템플릿
- 함수에서 키워드 인수를 받는 형태
```python
def _get_date(execution_data, **context):
    end=context["next_execution_date"]
    print(f"Start: {execution_data}, end: {end}")

4.2.4 PythonOperator에 변수 제공

  • 콜러블 커스텀 변수 제공
    get_date=PythonOperator(
      task_id="get_date",
      python_callable=_get_date,
      op_args={output_path:"/tmp/wikipageviews.gz"},
      dag=dag,
    )

4.2.5 템플릿의 인수 검사하기

4.3 다른 시스템과 연결하기

  1. gzip 열기 코드 작성

    extract_gz = BashOperator(
     task_id="extract_gz", bash_command="gunzip --force /tmp/wikipageviews.gz", dag=dag
    )
  2. 페이지 뷰 읽기 코드 작성

    def _fetch_pageviews(pagenames):
     result = dict.fromkeys(pagenames, 0)
     with open("/tmp/wikipageviews", "r") as f:
         for line in f:
             domain_code, page_title, view_counts, _ = line.split(" ")
             if domain_code == "en" and page_title in pagenames:
                 result[page_title] = view_counts
    
     print(result)
     # Prints e.g. "{'Facebook': '778', 'Apple': '20', 'Google': '451', 'Amazon': '9', 'Microsoft': '119'}"
    
    

fetch_pageviews = PythonOperator(
task_id="fetch_pageviews",
python_callable=_fetch_pageviews,
op_kwargs={"pagenames": {"Google", "Amazon", "Apple", "Microsoft", "Facebook"}},
dag=dag,
)

3. 해당 결과 데이터베이스에 쓰기
  - 데이터베이스 테이블 생성
  ```SQL
  CREATE TABLE pageview_counts ( 
    pagename VARCHAR(50) NOT NULL, 
    pageviewcount INT NOT NULL, 
    datetime TIMESTAMP NOT NULL
  );
  • 데이터베이스 테이블 결과 저장

    INSERT INTO pageview_counts VALUES ('Google', 333, '2019-07-17T00:00:00');
    • Airflow 태스크간 메모리 데이터 공유 불가 -> XCom 방식을 이용하면 개체 저장하고 읽을 수 있음. (혹은 Pickle)
    • 큰 데이터의 경우 디스크, DB에 영구적으로 저장하여 테스크간 데이터 공유 및 Airflow 외부에 데이터 유지
  • PostgresOperator를 사용하기 위한 라이브러리 설치

    pip install apache-airflow-providers-postgres
  • PostgresOperator를 사용하여 데이터베이스에 작성

    def _fetch_pageviews(pagenames, execution_date, **_):
      result = dict.fromkeys(pagenames, 0)
      with open("/tmp/wikipageviews", "r") as f:
          for line in f:
              domain_code, page_title, view_counts, _ = line.split(" ")
              if domain_code == "en" and page_title in pagenames:
                  result[page_title] = view_counts
    
      with open("/tmp/postgres_query.sql", "w") as f:
          for pagename, pageviewcount in result.items():
              f.write(
                  "INSERT INTO pageview_counts VALUES ("
                  f"'{pagename}', {pageviewcount}, '{execution_date}'"
                  ");\n"
              )
    
    fetch_pageviews = PythonOperator(
      task_id="fetch_pageviews",
      python_callable=_fetch_pageviews,
      op_kwargs={"pagenames": {"Google", "Amazon", "Apple", "Microsoft", "Facebook"}},
      dag=dag,
    )
    write_to_postgres = PostgresOperator(
      task_id="write_to_postgres",
      postgres_conn_id="my_postgres",
      sql="postgres_query.sql", # 쿼리문 자체를 넣어도 됨.
      dag=dag,
    )

  • DB 연결 세부

    • Postgres 연결을 위한 자격 증명 식별자 postgres_conn_id
    • CLI 이용하여 자격증명 저장
      airflow connections add \
        --conn-type postgres \
        --conn-host localhost \
        --conn-login postgres \
        --conn-password mysecretpassword \
        my_postgres # 연결 식별자
  • 파일 내용 템플릿화

    • 오퍼레이터에게 파일 경로를 제공해 템플릿화 가능
    • 예. PostgresOperator -> SQL 쿼리를 포함하는 파일 경로를 보낼 수 있음.
  • 실행 원리

    • PostgresOperator가 Hook으로 전달 -> Postgres 통신을 위한 Hook 인스턴스화 -> 연결 생성 / 쿼리 전송 / 연결 종료 처리

  1. 쌓인 데이터를 바탕으로 가장 인기 있는 시간대 확인 (with SQL)
    SELECT x.pagename, x.hr AS "hour", x.average AS "average pageviews" FROM (
    SELECT
    pagename,
    date_part('hour', datetime) AS hr,
    AVG(pageviewcount) AS average,
    ROW_NUMBER() OVER (PARTITION BY pagename ORDER BY AVG(pageviewcount) DESC)
     FROM pageview_counts
     GROUP BY pagename, hr
    ) AS x
    WHERE row_number=1;
728x90

'데이터 어쩌구 > 기술 써보기' 카테고리의 다른 글

C5. 테스크 간 의존성 정의하기  (0) 2024.06.02
C3. Airflow의 스케줄링  (0) 2024.06.02
C2. Airflow DAG의 구조  (0) 2024.06.02
C1. Apach Airflow 살펴보기  (0) 2024.06.02
[4주차] ReAct Agent 만들기  (1) 2024.04.02