본문 바로가기
Database

[Airflow] Decorator

by BTC_One 2023. 8. 7.

안녕하세요 BTC 1-tier팀의 One입니다! 

무더위 다들 잘 보내고 계신가요??

 

오늘은 Airflow의 Decorator 모듈에 대해서 알아보겠습니다. 

 

Decorator

파이썬을 사용하시는 분들이라면 Decorator가 익숙하실텐데요, 모르시는 분들을 위해서 간략하게 설명을 드리자면

Decorator는 파이썬에서 함수나 클래스의 기능을 확장하거나 수정할 수 있게 해주는 기능입니다. 

 

그렇다면 데코레이터를 사용하는 이유는 무엇일까요?

 

1. 코드의 재사용성모듈성을 높임

2. 코드의 가독성을 높일 수 있는 강력한 도구 중 하나

3. 코드의 중복을 줄이고 유지 보수성을 높일 수 있음 

 

위처럼 데코레이터를 사용하면 기존 코드를 건드리지 않고 함수나 클래스의 동작을 수정할 수 있습니다.

말만 들어서는 이해가 잘 안될 수도 있는데요, 간단한 예제와 함께 살펴봅시다.

 

데코레이터의 사용방법은 간단합니다. 

데코레이터가 필요한 함수 위에 @ + (데코레이터로 사용 할)함수명으로 선언을 해줍니다.

def my_decorator(func):
    def wrapper():
        print("Something is happening before the function is called.")
        func()
        print("Something is happening after the function is called.")
    return wrapper

@my_decorator
def say_hello():
    print("Hello!")

say_hello()

코드를 천천히 살펴 봅시다.

 

'my_decorator' 함수는 func인자로 전달 된 함수를 래핑하고, 래핑된 함수를 반환하는 함수 입니다.

'wrapper' 함수는 래핑된 함수를 실행하기 전과 후에 추가적인 동작을 수행하는 함수입니다.

'@my_decorator'는 'say_hello' 함수에 'my_decorator' 함수를 데코레이터로 사용한다는 의미겠죠?

 

즉, 'say_hello' 함수는 'my_decorator' 함수에 의해 래핑된 함수인 'wrapper' 함수를 호출합니다.

 

say_hello를 호출했을 때의 결과값은 다음과 같습니다.

Something is happening before the function is called.
Hello!
Something is happening after the function is called.

이러한 데코레이터 기능을 에어플로우에서도 사용이 가능합니다.

 

airflow.decorators

airflow.decorators라는 Airflow에서 제공하는 데코레이터 모듈을 통해서 사용이 가능합니다.

제공하는 모듈에 뭐가 있는지 하나씩 살펴보도록 합시다. 

 

@dag

DAG를 정의할 때 사용되는 데코레이터

DAG 객체를 생성하고, DAG 속성을 설정하는 데 사용됩니다.

 

@task

Task를 정의할 때 사용되는 데코레이터

Python 함수를 task로 등록하고, task의 실행을 제어하는데 사용됩니다.

 

@task_decorator

커스텀 task 데코레이터를 정의할 때 사용되는 데코레이터

이를 사용하여 task에 사용자 지정 로직이나 기능을 추가할 수 있습니다.

 

@task_group

task_group을 정의할 때 사용되는 데코레이터

여러 task를 그룹으로 묶어서 실행 순서나 의존성을 관리할 수 있습니다.

 

@task_trigger_dag

다른 DAG를 트리거할 때 사용되는 데코레이터

해당 task의 실행 후, 지정된 dag를 트리거 할 수 있습니다. 

 

그럼 기존 DAG 파일을 작성하는 기존 방식과, 데코레이터를 사용한 방식을 비교해 볼까요?

 

Example

먼저 기존 방식을 살펴봅시다.

세 개의 task는 PythonOperator를 통해 생성이됩니다.

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

def print_message(message):
    print(message)

default_args = {
    'start_date': days_ago(1),
}

dag = DAG(
    'my_dag',
    default_args=default_args,
    schedule_interval='@once',
    catchup=False
)

task1 = PythonOperator(
    task_id='task1',
    python_callable=print_message,
    op_args=['Task 1'],
    dag=dag
)

task2 = PythonOperator(
    task_id='task2',
    python_callable=print_message,
    op_args=['Task 2'],
    dag=dag
)

task3 = PythonOperator(
    task_id='task3',
    python_callable=print_message,
    op_args=['Task 3'],
    dag=dag
)

task1 >> task2 >> task3

task 3개를 반복하여 정의하니 코드가 매우 길어집니다.

그럼 데코레이터를 사용하는 코드와 비교해볼까요?

from datetime import datetime
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

default_args = {
    'start_date': days_ago(1),
}

@dag(default_args=default_args, schedule_interval='@once', catchup=False)
def decorator_test_dag():
    
    @task
    def print_message(message):
        print(message)
    
    task1 = print_message('Task 1')
    task2 = print_message('Task 2')
    task3 = print_message('Task 3')

    task1 >> task2 >> task3

dag = decorator_test_dag()

@dag 데코레이터를 사용하여 DAG 객체를 생성하고

@task 데코레이터를 사용하여 세 개의 테스트를 정의합니다.

 

어떤가요? 데코레이터를 사용하면 기존 방식보다 코드가 훨씬 간결해진 모습을 확인할 수 있습니다.  

데코레이터 모듈을 사용하여  Airflow DAG를 정의하면 간결하고 가독성이 높은 코드를 짤 수 있습니다.

 

여러분도 데코레이터를 활용해보면 좋겠죠?! 

다음 시간에 또 만나요

베바👋

'Database' 카테고리의 다른 글

Airflow : API를 활용한 Image 다운로드  (0) 2023.08.17
Airflow를 통한 Dataproc 연동  (0) 2023.08.16
[Airflow] Airflow CLI 명령어  (0) 2023.07.24
빅쿼리 INFORMATION_SCHEMA  (0) 2023.07.21
ElasticSearch  (0) 2023.07.21

댓글