베하 ~!
안녕하세요 문땅훈과 루피입니다! 😆
저번 시간에는 중요한 부분만 골라서 Airflow 구성요소와 동작방식에 대해 설명해드렸는데요~
이제 본격적으로 Airflow에 대해 알아보도록 합시다 !!
이번에는 DAG의 개념과 동시에 DAG를 생성하기 위해 알아야 할 태스크(Task)와 오퍼레이터(Operator)에 대해 다뤄보겠습니다.
참고로, 지난 시간에 소개해드렸던 내용 링크 남겨놓을테니 구성요소 보고 오시면 더 이해하기 쉬우실거에요 ~!
먼저 Airflow에서 제일 중요한 개념인 DAG에 대해서 알아볼까요 ~?
DAG란?
- 순환하지 않고 시작에서 끝으로 진행되는 워크플로우 구조
- Directed Acyclic Graph의 약자로 방향이 있는 순환되지 않는 그래프 구조
- Dag는 Task로 구성되어 있음
실행할 작업들의 순서를 구성한 워크플로우는 airflow에서 DAG라는 형태로 사용됩니다. 위에서 말했듯이 DAG는 방향이 있는 순환되지 않는 그래프 구조인데요.
즉, 순차적으로 작업이 진행되며 무한 순환되지 않는다는 뜻입니다.
그리고 DAG는 Operator 집합에 대한 실행을 조정, 조율하는 역할을 합니다.
- Operator의 시작과 정지
- Operator가 완료되면 연속된 다음 Task의 시작
- Operator간의 의존성 보장 등
여기서 Task, Operator가 뭔지 궁금하시다구요 ?!
Airflow 개념을 알아가시면서 처음으로 헷갈려하시는데 Task와 Operator의 차이인데요 ㅎㅎ!
개념부터 정확하게 차근차근 알아갑시다 !!
Task
- Dag를 구성하는 수행할 작업 단위
- Operator가 인스턴스화 된 것
- 하나 또는 여러 개의 Task를 연결해서 Dag를 생성하며 Task에는 Operator, Sensor, Hook 등이 있음
쉽게 말해보자면 Airflow에서 Task는 작업의 올바른 실행을 보장하기 위한 즉, Operator의 상태를 관리하는 매니저 역할을 하신다고 생각하면 됩니다!
Operator
- Task를 어떻게 실행시킬지 실제 작업 실행을 담당
- 작업을 수행할 때 사용하는 기계로 Task를 만들고 이러한 Task들을 순서대로 배치해주면 하나의 DAG 완성
- 즉, Operator에 특정 인풋과 조건을 넣어주면 특정 Task(작업)가 됨
쉽게 말해 작업을 수행할 때 사용하는 기계라고 생각하시고 이러한 기계를 Task가 관리해주고, Task들을 순서대로 배치하여 하나의 DAG를 완성한다고 생각하시면 됩니다 ~!!
Operator를 통해서 운영자(airflow 사용자)가 python 함수, bash 명령, SQL 쿼리, API 트리거, e-mail 보내기, 조건부 작업 수행과 같은 다양한 작업을 실행할 수 있도록 해줍니다.
그럼 Airflow에서 어떤 Operator를 제공해주고 있는지 알아봅시다 !
대표적인 Operator
- BashOperator : Bash 명령을 수행하는 Operator
- 리눅스 명령어 실행도 가능하며 프로그램 실행도 가능
- ActionOperator : 실제 연산을 수행
- Transfer Operator : 데이터 옮기는 역할 수행
- PythonOperator : Python 함수(.py)를 실행을 담당하는 Operator
- EmailOperator : 수신자에게 이메일을 보내는 데 사용하는 Operator
- MySQL Operator : MySQL 데이터베이스에 대한 SQL 조회를 실행하는데 사용하는 Operator
종류가 정말 다양하죠 ~?
아래의 링크로도 확인할 수 있듯이 Airflow에서 저희가 다양한 작업을 할 수 있도록 많은 종류가 있습니다.
하기에는 대표적으로 쓰이는 Operator 예제 코드들입니다.
Operator 예제 코드
1. Bash Operator
- import로 bashoprator를 불러와줍니다.
- 사용할 bash command를 bash_command에 지정해주고, 환경 변수를 만들어 주어 실행한 예시입니다.
from airflow.operators.bash_operator import BashOperator
bash_task = BashOperator(
task_id="bash_task",
bash_command="echo \\"here is the message: '$message'\\"",
env={"message": '{{ dag_run.conf["message"] if dag_run else "" }}'},
)
2. Python Operator
- PythonOperator를 사용하기 위해서 airflow에서 제공해주는 operator를 불러옵니다.
- 실행할 파이썬 코드를 python_callable에서 불러오면 됩니다.
- DAG코드 외부 함수나 클래스를 사용하는 경우에는 DAG 코드 상단에 import 되어 있어야 합니다
from airflow.operators.python_operator import PythonOperator
external_classic = ExternalPythonOperator(
task_id="external_python_classic",
python=PATH_TO_PYTHON_BINARY,
python_callable=x,
)
3. Email Operator
- airflow에서 제공하는 EmailOperator 를 불러와줍니다.
- DAG를 만들어 준 후, Operator를 사용해서 메일을 보낼 주소, 내용 등을 작성합니다.
from datetime import datetime
from airflow import DAG
from airflow.operators.email import EmailOperator
with DAG(
'send_to_gmail',
start_date = datetime(2022, 1, 1),
schedule_interval = None
) as dag:
email = EmailOperator(
task_id = 'send_to_gmail',
to = '보낼 메일주소',
subject = 'Airflow Test 메일입니다',
html_content = """
이 메일은 테스트 메일입니다.<br/><br/>
{{ ds }}<br/>
"""
)
출처 : https://velog.io/@jskim/Airflow-Pipeline-만들기-Email-보내기
4. Mysql Operator
- MysqlOperator를 불러와줍니다.
- 실행할 sql 파일을 sql= 옵션에 경로를 지정해줍니다.
from airflow.providers.mysql.operators.mysql import MySqlOperator
mysql_task = MySqlOperator(
task_id="drop_table_mysql_external_file",
sql="/scripts/drop_table.sql",
dag=dag,
- 위 예시 말고도 많이 사용하는 Operator로는 DummyOperator도 있습니다.
오늘 설명한 개념을 토대로 Airflow의 장점을 말해보자면 하나 이상의 단계로 구성된 대규모 작업을 개별 태스크로 분할하고 DAG로 형성할 수 있다는 점입니다.
즉, 전체 파이프라인을 재실행할 필요가 없는 효율적인 파이프라인 구축이 가능합니다!
그리고 이제 개념을 알았다면 ! Airflow의 작업 순서도 이해하실 수 있습니다.
[작업 순서]
1. DAG 객체 생성
2. Operator를 사용하여 Task 작성
3. 작성한 Task를 순차적으로 연결하여 하나의 DAG를 구성
어때요 ?ㅎㅎ
이렇게 보니까 정말 반복작업에는 Airflow가 정말 편리하겠죠 ?!
개념으로만 설명드렸는데 더 이해하기 쉬울려면 실제 Airflow 웹 서버에서 실행해보면서 이해하는데 더 좋겠죠 ~?
그런 의미로 다음 장에서는 이제 Airflow를 간단하게 실습해보겠습니다 ~!!
그럼 다음 시간에 만나요 !
베빠 ~ 😉
'Database' 카테고리의 다른 글
MySQL CRUD 권한 설정 (0) | 2023.05.23 |
---|---|
Airflow Executor (0) | 2023.05.23 |
[SQL] 쿼리 사용시 주의사항 (0) | 2023.05.19 |
Apache Superset & 설치 (0) | 2023.05.15 |
Airflow 란? (0) | 2023.05.15 |
댓글