본문 바로가기
Database

[Airflow] Airflow Xcom과 Trigger

by BTC_루피 2023. 5. 26.

베하 ~!

문땅훈과 루피입니다!

 

 

오늘은 Airflow Xcom과 Trigger에 대해서 배워보도록 하겠습니다😊

 


Xcom

 

💡 Xcom이란? : cross communication의 약자로 airflow task간 데이터를 주고 받을 때 사용합니다.

 

  • Xcom은 Task Instance간 데이터를 공유하지 않기 때문에 Xcom을 사용해서 데이터를 주고 받아야 합니다.
  • Xcom은 DAG Run 내에서만 존재하고 다른 DAG run에서는 공유하지 않습니다.
  • PythonOperator를 사용하면 return 값은 자동적으로 Xcom 변수로 등록됩니다.

 

Xcom 사용법

  • 먼저 사용법을 배우기 전에 ! Xcom을 사용하기 위해서는 DAGrun에 대해서 알아야 합니다!

DAGRuns

  • Task 인스턴스들을 DAG에 정의된 특정 execution_date에 실행하는 DAG 인스턴스 입니다.
  • 쉽게 말해, DAG의 실행 이력을 의미합니다.
  • 각 DAG는 Schedule이 있거나 없을 수 있으며, 각 schedule은 DAG Runs가 어떻게 생성되는지 정보를 제공합니다.
  • DAG Runs 은 각 상태를 가지며 해당 상태를 기반으로 어떤 task를 실행할지 알 수 있습니다.
    • DAG Runs의 상태
      • Queued : task가 Executor에 할당되었으며 실행되기를 기다리고 있는 상태
      • Success : task 성공
      • Running : task 실행 중
      • Failed : task를 실행하는 동안 오류가 발생하여 실패
    • Task의 상태
      • none → scheduled → queued → running → success 순으로 흘러갑니다.
    • DAG Runs의 확인법

Airflow web에서 DAGrun의 상태 확인이 가능합니다.

  • 또한, DAG Run 탭에서 상태를 확인할 수 있습니다.

 


DAGRuns 대충 감이 오시나요!?

이제 Xcom 사용법에 대해서 알아보겠습니다.😊😊

 

Xcom 사용법

  • PythonOperator return 값을 이용한 Xcom 사용
  • push-pull을 이용한 Xcom 사용
  • Jinja template를 이용해서 Xcom 구성

 

1. PythonOperator return 값 사용

def return_xcom():
    return "xcom!"
    
return_xcom = PythonOperator(
    task_id = 'return_xcom',
    python_callable = return_xcom,
    dag = dag
)
  • PythonOperator에서 return하면 Airflow Xcom에 자동으로 push 되며 return하는 함수를 만들어 하나의 task로 실행 시킵니다.
  • return값이 바로 Xcom으로 들어가게 되며, Xcom 사용이 가능합니다.

 

2. push-pull을 이용한 Xcom 사용

def xcom_push_test(**context):
    xcom_value = "xcom_push_value"
    context['task_instance'].xcom_push(key='xcom_push_value', value=xcom_value)

    return "xcom_return_value"

def xcom_pull_test(**context):
    xcom_return = context["task_instance"].xcom_pull(task_ids='return_xcom')
    xcom_push_value = context['test'].xcom_pull(key='xcom_push_value')
    xcom_push_return_value = context['test'].xcom_pull(task_ids='xcom_push_task')

    print("xcom_return : {}".format(xcom_return))
    print("xcom_push_value : {}".format(xcom_push_value))
    print("xcom_push_return_value : {}".format(xcom_push_return_value))
    
xcom_push_task = PythonOperator(
    task_id = 'xcom_push_task_test',
    python_callable = xcom_push_test,
    dag = dag
)

xcom_pull_task = PythonOperator(
    task_id = 'xcom_pull_task_test',
    python_callable = xcom_pull_test,
    dag = dag
)
  • 실제로 객체를 생성해서 push하고 pull로 당겨와서 확인하는 경우입니다.
  • context[’task_instance’]를 이용하여 Xcom에 push, pull하여 데이터를 주고 받는 것이 가능합니다.
  • test=task_instance로 간단하게 축약하여 사용할 수 있습니다.
  • return으로 Xcom을 사용하는 경우 xcom_pull(task_id)를 사용해 데이터를 전달받고 push하는 경우 key-value 형식에 따라 데이터를 주고 받을 수 있습니다.

 

3. Jinja Template를 이용한 Xcom

def pull_value_from_jinja(**context):
    res = context['task_instance'].xcom_pull(key='jinja_test')
    print(res)

jinja_template = BashOperator(
    task_id='jinja_template',
    bash_command='echo "{{task_instance.xcom_push(key="jinja_test", value="airflow")}}"'
)

pulled_value_jinja = PythonOperator(
    task_id='pulled_value',
    python_callable=pull_value_from_jinja
)
  • Jinja Template를 사용해서 Xcom을 구성할 수 있습니다.

 

이처럼 Xcom을 3가지 방법으로 사용할 수 있습니다. 😊

 

 

다음은 Trigger에 대해서 알아보겠습니다!

Trigger

  • 그리고 DAG를 실행하는 방법에 대해 알아봅시다 !

저희가 주로 하는 방법은 DAG의 ON_OFF 스위치를 ON(파란색) 상태로 만들어서 DAG를 실행할 수 있습니다. 이러한 방식을 “스케줄링”이라고 하는데 이렇게 실행된 DAG 실행이력은 굵은 테두리로 표시가 됩니다.

 

또 다른 방법으로 “즉시” DAG Run을 만들어 실행하는 방법이 있습니다.

지금 알려드릴 내용이 바로  Trigger(트리거) 라고 합니다 !

Airflow는 2가지 방법으로 Trigger 방법을 제공합니다.


  • 수동으로 트리거 : Airflow UI에서 또는 gcloud에서 Airflow CLI 명령어를 실행합니다.
  • 일정에 따라 트리거 : DAG를 만들 때 일정을 지정합니다. Airflow는 지정된 예약 매개변수를 기반으로 DAG를 자동으로 트리거합니다.

이 두 가지 방법에 대해 같이 알아볼까요 ?!

 

1. Airflow UI

참조 : https://heumsi.github.io/apache-airflow-tutorials-for-beginner/getting-started/04-running-a-dag/#dag-trigger-하기

위의 그림에서 볼 수 있듯이 우측 상단에 Trigger DAG 버튼이 있습니다.

이 버튼을 클릭하면 새로운 DAG Run이 생기고 실행됩니다.

 

이렇게 실행된 세번째 DAG Run(DAG 실행이력)은 스케줄링으로 실행된 DAG Run과 다르게 동그라미와 네모 주위에 굵은 테두리가 없는걸로 구분이 가능합니다.

 

 

2. CLI

 

다음으로는 CLI환경에서 트리거 명령을 실행하여 실행 시점을 지정하는 방법입니다.

 

만약 DAG가 OFF 상태로 되어있다면 트리거하기 위해 ON으로 변경을 해줍니다.

(1) DAG - ON상태 변경 명령어 (unpause)

# ON
airflow dags unpause <DAG_ID>

# OFF
airflow dags pause <DAG_ID>

 

그 다음, ON 상태가 되었으면 DAG를 Trigger 해줍니다.

(2) DAG trigger 명령어

airflow dags trigger [-h] [-c CONF] [-e EXEC_DATE] [--no-replace-microseconds]
                     [-o table, json, yaml, plain] [-r RUN_ID] [-S SUBDIR]
                     [-v]
                     dag_id

-e : DAG 실행 날짜

[참고] https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#triggerer

 


 

Airflow의 Xcom과 Trigger 기능에 대해서 이해가 되셨을까요!?

😊😊

 

다음주는 Airflow 환경을 구성하고 DAG를 생성하는 방법을 배워보겠습니다!

 

그럼 베빠!

 

 

 


[참고 링크]

DAG 실행하기 | Apache Airflow Tutorials for Beginner

[Airflow] Trigger

[Airflow] 자주 쓰는 CLI

DAG 트리거  |  Cloud Composer  |  Google Cloud

[Airflow 실습] Trigger를 이용하여 다른 DAG를 실행하기

Airflow Xcom 사용하기

댓글