- 오퍼레이터의 역할
- 작동 방법
- 언제 어떻게 실행되는지
- 원격 시스템과 통신하는 방법
4.1 Airflow로 처리할 데이터 검사
- 예시 : 위키피디아 페이지 뷰 수 수집
- 뷰 증가 -> 긍정적 주식 전망 예측
- 뷰 감소 -> 부정적 주식 전망 예측
4.1.1 증분 데이터를 적재하는 방법 결정하기
- 형식 gzip (전체 크기 50mb, 압축 풀면 200~250mb)
- 시간당 페이지 수 집계
- 도메인 코드 살펴보기
4.2 태스크 콘텍스트와 Jinja 탬플릿 작업
- 워크플로 첫 번째 버전
4.2.1 오퍼레이터의 인수 템플릿 작업
- Jinja 템플릿 문자열 {{ }}
print("Hello {{ name }}")
- 모든 오퍼레이터 인수가 템플릿이 될 수 있는 것은 아님. 공식 문서 참고 (PythonOperator.template_fields)
4.2.2 템플릿에 무엇이 사용 가능할까요?
- airflow 활용하여 태스크 콘텍스트 출력하기
import airflow.utils.dates from airflow import DAG from airflow.operators.python import PythonOperator
dag=DAG(
dag_id="print_context",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="@daily",
)
def _print_context(**kwargs):
print(kwargs)
print_context=PythonOperator(
task_id="print_context",
python_callable=_print_context,
dag=dag,
)
![](https://velog.velcdn.com/images/annmunju/post/6577ebd6-3d9c-420a-8551-6a4b30dd3b07/image.png)
![](https://velog.velcdn.com/images/annmunju/post/97acdeaa-898c-4c22-9ac3-943812fcd905/image.png)
### 4.2.3 PythonOperator 템플릿
- 함수에서 키워드 인수를 받는 형태
```python
def _get_date(execution_data, **context):
end=context["next_execution_date"]
print(f"Start: {execution_data}, end: {end}")
4.2.4 PythonOperator에 변수 제공
- 콜러블 커스텀 변수 제공
get_date=PythonOperator( task_id="get_date", python_callable=_get_date, op_args={output_path:"/tmp/wikipageviews.gz"}, dag=dag, )
4.2.5 템플릿의 인수 검사하기
4.3 다른 시스템과 연결하기
gzip 열기 코드 작성
extract_gz = BashOperator( task_id="extract_gz", bash_command="gunzip --force /tmp/wikipageviews.gz", dag=dag )
페이지 뷰 읽기 코드 작성
def _fetch_pageviews(pagenames): result = dict.fromkeys(pagenames, 0) with open("/tmp/wikipageviews", "r") as f: for line in f: domain_code, page_title, view_counts, _ = line.split(" ") if domain_code == "en" and page_title in pagenames: result[page_title] = view_counts print(result) # Prints e.g. "{'Facebook': '778', 'Apple': '20', 'Google': '451', 'Amazon': '9', 'Microsoft': '119'}"
fetch_pageviews = PythonOperator(
task_id="fetch_pageviews",
python_callable=_fetch_pageviews,
op_kwargs={"pagenames": {"Google", "Amazon", "Apple", "Microsoft", "Facebook"}},
dag=dag,
)
3. 해당 결과 데이터베이스에 쓰기
- 데이터베이스 테이블 생성
```SQL
CREATE TABLE pageview_counts (
pagename VARCHAR(50) NOT NULL,
pageviewcount INT NOT NULL,
datetime TIMESTAMP NOT NULL
);
데이터베이스 테이블 결과 저장
INSERT INTO pageview_counts VALUES ('Google', 333, '2019-07-17T00:00:00');
- Airflow 태스크간 메모리 데이터 공유 불가 -> XCom 방식을 이용하면 개체 저장하고 읽을 수 있음. (혹은 Pickle)
- 큰 데이터의 경우 디스크, DB에 영구적으로 저장하여 테스크간 데이터 공유 및 Airflow 외부에 데이터 유지
PostgresOperator를 사용하기 위한 라이브러리 설치
pip install apache-airflow-providers-postgres
PostgresOperator를 사용하여 데이터베이스에 작성
def _fetch_pageviews(pagenames, execution_date, **_): result = dict.fromkeys(pagenames, 0) with open("/tmp/wikipageviews", "r") as f: for line in f: domain_code, page_title, view_counts, _ = line.split(" ") if domain_code == "en" and page_title in pagenames: result[page_title] = view_counts with open("/tmp/postgres_query.sql", "w") as f: for pagename, pageviewcount in result.items(): f.write( "INSERT INTO pageview_counts VALUES (" f"'{pagename}', {pageviewcount}, '{execution_date}'" ");\n" ) fetch_pageviews = PythonOperator( task_id="fetch_pageviews", python_callable=_fetch_pageviews, op_kwargs={"pagenames": {"Google", "Amazon", "Apple", "Microsoft", "Facebook"}}, dag=dag, ) write_to_postgres = PostgresOperator( task_id="write_to_postgres", postgres_conn_id="my_postgres", sql="postgres_query.sql", # 쿼리문 자체를 넣어도 됨. dag=dag, )
DB 연결 세부
- Postgres 연결을 위한 자격 증명 식별자
postgres_conn_id
- CLI 이용하여 자격증명 저장
airflow connections add \ --conn-type postgres \ --conn-host localhost \ --conn-login postgres \ --conn-password mysecretpassword \ my_postgres # 연결 식별자
- Postgres 연결을 위한 자격 증명 식별자
파일 내용 템플릿화
- 오퍼레이터에게 파일 경로를 제공해 템플릿화 가능
- 예. PostgresOperator -> SQL 쿼리를 포함하는 파일 경로를 보낼 수 있음.
실행 원리
- PostgresOperator가 Hook으로 전달 -> Postgres 통신을 위한 Hook 인스턴스화 -> 연결 생성 / 쿼리 전송 / 연결 종료 처리
- 쌓인 데이터를 바탕으로 가장 인기 있는 시간대 확인 (with SQL)
SELECT x.pagename, x.hr AS "hour", x.average AS "average pageviews" FROM ( SELECT pagename, date_part('hour', datetime) AS hr, AVG(pageviewcount) AS average, ROW_NUMBER() OVER (PARTITION BY pagename ORDER BY AVG(pageviewcount) DESC) FROM pageview_counts GROUP BY pagename, hr ) AS x WHERE row_number=1;
728x90
'데이터 어쩌구 > 기술 써보기' 카테고리의 다른 글
C5. 테스크 간 의존성 정의하기 (0) | 2024.06.02 |
---|---|
C3. Airflow의 스케줄링 (0) | 2024.06.02 |
C2. Airflow DAG의 구조 (0) | 2024.06.02 |
C1. Apach Airflow 살펴보기 (0) | 2024.06.02 |
[4주차] ReAct Agent 만들기 (1) | 2024.04.02 |