본문 바로가기
Database

[Airflow] 병렬 처리 Task

by 알 수 없는 사용자 2023. 7. 17.

안녕하세요, 1-Tier 팀의 One입니다.

Airflow의 주요 기능 중 하나는 작업들을 병렬로 실행할 수 있는 기능입니다.

병렬 처리란 동시에 여러 작업을 실행하여, 작업의 처리 시간을 단축 시키는 방법을 말합니다.

 

Airflow에서는 작업을 병렬로 실행하기 위해, 병렬성(Parallelism)동시성(Concurrency) 개념을 사용합니다.

 

  • 병렬성(Parallelism)

Airflow에서 동시에 실행 가능한 작업의 최대 개수를 제어하는 설정 값

  • 동시성(Concurrency)

Airflow 스케줄러에서 제어되며, 작업을 실행할 수 있는 동시 작업수를 제어하는 설정 값

작업들 사이의 의존성 및 리소스 제약에 따라 조정이 가능 

 

위의 설정값들은 Airflow의 설정 파일 (airflow.cfg)에서 설정이 가능합니다.

그럼 같이 병렬 처리 task를 구성하러 가볼까요?

 

아래는 Airflow를 사용하여 병렬로 S3 버킷 내의 여러 폴더를 조사하는 예제입니다.

 

0. 병렬처리 설정

먼저 병렬 처리가 어떻게 설정되어있는지 먼저 알아봅시다.

위에서 말한 것 처럼 airflow 설정파일에서 확인할 수 있습니다.

vi /home/airflow/airflow.cfg

현재 동시에 실행 가능한 작업의 최대개수는 32개, 작업을 실행할 수 있는 동시 작업 수는 16개가 되겠네요 

저희는 아주 간단하게 동시에 세개의 3개만 처리할거라, 굳이 건들지 않아도 될 것 같습니다.

 

1. 모듈 Import

S3 객체를 조사하기 위해서 Python 용 aws sdk인 boto3를 사용하겠습니다.

서버에 aws cli가 설치되어 있다면, BashOperator를 사용하거나 os나 subprocess를 사용하여

aws cli로 하셔도 됩니다. 편하신 방법으로 구성해주세요

import boto3
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

 

2. S3 폴더 별 객체 조사 함수 생성

AWS 버킷 내의 특정 폴더에 있는 객체들을 조회하는 함수를 정의합니다.

boto3를 사용해서 객체를 조회하기 위해 list_objects_v2 메서드를 호출하여 사용합니다.

response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix)

메서드를 호출하면 AWS 서버로 부터 응답이 반환됩니다.

매개변수 bucke_name은 S3 버킷 이름, prefix는 조회 할 폴더의 경로를 나타냅니다.

 

응답 객체는 JSON 형식으로 구성되어있으며, 조회한 객체의 정보를 포함합니다.

주요한 정보는 'Content'라는 Key에 담겨있으며, 객체의 Key를 추출하기 위해 다음과 같이 구성합니다.

objects = [obj['Key'] for obj in response['Contents']]

 

그럼 s3.list_objects_v2 메서드를 이용해 버킷 내 폴더별 객체를 출력하는 함수를 생성 해 볼까요?

""" S3 Bucket Folder List """

def list_s3_objects(bucket_name, prefix):
    s3 = boto3.client('s3')
    response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix)

    if 'Contents' in response:
        objects = [obj['Key'] for obj in response['Contents']]
        print(f"Objects in folder '{prefix}': {objects}")
        
bucket_name = 'your-bucket-name'
s3_folders = ['test1', 'test2', 'test3']

코드엔 정답이 없으니, 다양한 방식으로 구성해보면 좋을 것 같습니다.

 

3. DAG & Task 정의

DAG는 정해진 Schedule 없이 한번만 돌아가도록 정의했습니다.

만약 테스트 용도가 아닌 실제 S3를 모니터링 하는거라면 적절한 스케줄을 구성해주면 될 것 같습니다.

dag = DAG(
        's3_folder_listing',
        description = 'Parallel S3 Folder Listing',
        schedule_interval = None,
        start_date = datetime.now(),
        catchup = False
)

 

위 Python 코드에서 정의한 s3_folders의 수만큼 task를 생성해줘야 하기 때문에,

for문을 통해 task를 생성해주고, 매개변수 또한 전달해줍니다. 

start = DummyOperator(
        task_id = 'start',
        dag = dag
)

s3_folder_tasks = []
for folder in s3_folders:
    task = PythonOperator(
            task_id = f'list_s3_folder_{folder}',
            python_callable=list_s3_objects,
            op_kwargs={'bucket_name': bucket_name, 'prefix': folder},
            retries=3,
            dag = dag
        )
    s3_folder_tasks.append(task)

start >> s3_folder_tasks

 

이제 DAG를 확인 해 볼까요?

병렬처리 Task가 아주 잘 구성이 된 모습을 볼 수 있습니다. 

그럼 실제로 조사를 잘 하는지 log 또한 살펴봅시다.

아주 잘 구성이 되었네요 😆

 

이렇게 Airflow의 핵심 기능이라고 할 수 있는 병렬 처리 Task를 구성해보았습니다. 

앞서 말한 병렬성(Parallelism) 동시성(Concurrency)의 작업은 Airflow의 성능과 자원 사용에 영향을 미칩니다.

 

설정값이 낮으면 동시에 실행 가능한 작업 수가 제한되어 작업 실행 시간이 늘어날 수 있으며,

반면에 값이 높으면 시스템 자원 부하가 증가하여 성능 저하나 예기치 않은 동작이 발생할 수 있습니다.

 

시스템 자원, 작업 로드, 작업 실행 시간 등을 고려하여 최적의 값을 설정하는게 좋을 것 같습니다.

 

그럼 다음 시간에 또 돌아오겠습니다.

베바👋

 

'Database' 카테고리의 다른 글

빅쿼리 INFORMATION_SCHEMA  (0) 2023.07.21
ElasticSearch  (0) 2023.07.21
Class를 활용한 Custom Operator 생성  (0) 2023.07.13
빅쿼리 보안  (0) 2023.07.07
Airflow Web을 통한 Dag 핸들링  (0) 2023.07.05

댓글