본문 바로가기
Database

[Airflow] Celery Executer

by BTC_One 2023. 9. 4.

안녕하세요 BTC 1-tier 팀의 One입니다!

점점 선선해지는게 가을이 오고있다는 것이 느껴지네요ㅎㅎ

오늘은 Airflow의 Celery Executer에 대해서 알아봅시다.

 

CeleryExecutor?

Celery Worker의 통신 매커니즘

자 한대의 Master서버가 있고 여러개의 DAG가 있다고 생각해봅시다. 

데이터 처리량이 많은 여러개의 DAG를 한대의 서버에서 돌리면 어떻게 될까요?

 

Master서버는 자신의 일을 제대로 하지 못하겠죠 ....😥

그렇기 위해서 우리는 여러개의 Worker로 Task를 분산하여 동작을 시켜야합니다.

 

그 역할을 해주는 Executor가 바로 Celery Executor입니다.

Celery Executor는 Task 메시지를 브로커에 전달하고, Celery Worker가 Task를 가져가서 실행합니다.

당연히 Worker의 수는 확장이 가능합니다!

 

그럼 Celery Executor를 함께 구성하러 가볼까요?

 

멀티 클러스터 환경 구성

Celery Executor를 구성하는 방법에는 여러가지가 있는데요,

저는 Airflow를 따로 Docker 컨테이너에 구성하지않고 서버에 직접 설치를 했기때문에 

Celery Executor또한 서버 2대로 구성하는 방법을 소개하겠습니다.

 

만약 Docker 컨테이너에 구성을 했다면 Docker-Compose로 구성하는 방법이 공식 문서에 있기 때문에

그걸 참고하시면 될 것 같습니다.

 

Running Airflow in Docker — Airflow Documentation

 

airflow.apache.org

 

⚠ 주의사항

1. Celery Executor를 구성하기 위해선 Airflow MetaDB의 엔진이 MySQL이나 PostgreSQL여야 합니다!

2. 위의 통신 매커니즘 사진에서 Queue Broker가 보이는데 그 역할을 하기 위한 RedisRabbit MQ가 필요합니다.

저는 Redis를 사용했습니다.

 Master node

1. MySQL & Redis 패키지 설치

sudo apt-get install python3-dev gcc libmysqlclient-dev -y
pip3 install mysql-connector-python
pip3 install mysqlclient
pip3 install redis

2. Celery, Flower 설치

pip3 install celery
pip3 install flower

3. config 파일 수정

vi airflow/airflow.cfg
# full import path to the class when using a custom executor.
executor = CeleryExecutor

# The SqlAlchemy connection string to the metadata database.
sql_alchemy_conn = mysql+mysqldb://<user>:<passwd>@<host>:3306/<db>

# The Celery result_backend. When a job finishes, it needs to update the
result_backend = db+mysql+mysqlconnector://<user>:<passwd>@<host>:3306/<db>

# a sqlalchemy database. Refer to the Celery documentation for more information.
broker_url = redis://<host>:6379/0

4. Airflow DB 초기화 & Celery/Flower 실행

airflow db init
airflow shceduler -D
airflow webserver -D
airflow celery -D
airflow celery flower -D

ip:5555로 접속하여 Flower web확인 🔅

 

Worker Node

✔ 당연하지만 Airflow가 설치되어있어야 합니다.

 

1. MySQL & Redis 패키지 설치

sudo apt-get install python3-dev gcc libmysqlclient-dev -y
pip3 install mysql-connector-python
pip3 install mysqlclient
pip3 install redis

2. Celery 설치

pip3 install celery

3. config 파일 수정

vi airflow/airflow.cfg
# full import path to the class when using a custom executor.
executor = CeleryExecutor

# The SqlAlchemy connection string to the metadata database.
sql_alchemy_conn = mysql+mysqldb://<user>:<passwd>@<host>:3306/<db>

# The Celery result_backend. When a job finishes, it needs to update the
result_backend = db+mysql+mysqlconnector://<user>:<passwd>@<host>:3306/<db>

# a sqlalchemy database. Refer to the Celery documentation for more information.
broker_url = redis://<host>:6379/0

5. Worker 실행

airflow celery worker -q queue_1 -D

아래처럼 뜬다면 성공입니다 ㅎㅎ 

flower를 재시작하면 웹에서도 확인이 가능하다는 점! 

 

생각보다 간단하죠?

이제 웹서버에서 DAG를 동작시키면 Worker가 Task를 가져가서 실행하게 됩니다

 

설치가 간단하니 한번 시도해보는 것도 좋은 방법 일 것 같습니다ㅎㅎ

그럼 다음 포스팅에서 만나요

 

베바👋

'Database' 카테고리의 다른 글

Cloud Composer의 내부DB 접근  (0) 2023.09.14
문서 검색 챗봇 만들기  (0) 2023.09.13
LangChain이란?  (0) 2023.09.04
SQL과 NoSQL 데이터베이스  (0) 2023.08.18
Airflow : API를 활용한 Image 다운로드  (0) 2023.08.17

댓글