본문 바로가기
Database

Airflow : API를 활용한 Image 다운로드

by BTC_문땅훈 2023. 8. 17.

 

베하 ~!

 

문땅훈과 루피입니다.

 

 

오늘은

'Launch 라이브러리에서 로켓 이미지 가져오기' 예제 실습과 동시에

API를 활용하여 이미지를 다운로드 하는 실습도 간단하게 진행해보겠습니다.

 

*Launch 라이브러리에서 로켓 이미지 가져오기' 실습은 Airflow 참고서적에서 참고하여 진행했습니다.*

 

 


Launch 라이브러리에서 로켓 이미지 가져오기

Launch library 2

  • 로켓 발사, 우주 관련 Event 데이터들을 오픈 API를 통해 제공

⇒ 가져올 데이터 : 예정된 10개의 로켓 발사에 대한 데이터와 로켓 이미지에 대한 URL을 가져오는 Launcher 데이터

 

 

 DAG 구조 및 설명

밑에서 간략하게 정리한것과 같이 API로부터 Image URL을 추출한 다음 해당 Image를 다운로드하여 성공하면 입력해준 이메일로 성공했다는 메세지를 보내도록 합니다.

(1) download_launches : API로부터 JSON 데이터 다운로드 및 JSON 데이터에서 Image URL 추출

(2) get_pictures : Image URL에 접속 및 Image 내려받기

(3) notify : 모든 Image를 download 받으면 E-mail로 성공 메세지 보내기

 

 

로켓 발사 데이터 다운로드 및 처리를 위한 DAG

import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator

import pathlib # 경로를 문자열이 아닌 객체로 처리하도록 해줌
import json
import requests
import requests.exceptions as requests_exceptions

ROCKET_LAUNCHES_URL = "https://ll.thespacedevs.com/2.0.0/launch/upcoming/"
JSON_PATH = '/tmp/launches.json'
TARGET_DIR = '/tmp/images'

def _get_pictures():
    # Path() : Path 객체 생성
    # mkdir() - exist_ok=True : 폴더가 없을 경우 자동으로 생성 
    pathlib.Path(TARGET_DIR).mkdir(parents=True, exist_ok=True)

    # launches.json 파일에 있는 모든 그림 파일 download
    with open(JSON_PATH) as f:
        try:
            launches = json.load(f)
            image_urls = [launch['image'] for launch in launches['results']]

            for i, image_url in enumerate(image_urls):
                try:
                    response = requests.get(image_url)
                    image_filename = image_url.split('/')[-1]
                    target_file = f'{TARGET_DIR}/{image_filename}'

                    with open(target_file, 'wb') as f:
                        f.write(response.content)
                    
                    print(f'Downloaded {image_url} to {target_file}')
                except requests_exceptions.MissingSchema: 
                    print(f'{image_url} appears to be an invalid URL.')
                except requests_exceptions.ConnectionError:
                    print(f'Could not connect to {image_url}.')
        except KeyError as e:
            with open(JSON_PATH) as f:
                print(json.load(f)) # ex : {'detail': 'Request was throttled. Expected available in 766 seconds.'}
            raise


with DAG(
    'download_rocket_launches',
    start_date = airflow.utils.dates.days_ago(14),
    schedule_interval = None
) as dag:
    
    download_launches = BashOperator(
        task_id = 'download_launches',
        bash_command = f'curl -o {JSON_PATH} -L {ROCKET_LAUNCHES_URL}'
    )

    get_pictures = PythonOperator(
        task_id = 'get_pictures',
        python_callable = _get_pictures
    )

    notify = EmailOperator(
        task_id = 'send_email',
        to = '이메일 주소 기입',
        subject = 'Rocket Launches Data Ingestion Completed.',
        html_content = """
            <h2>Rocket Launches Data Ingestion Completed.</h2>
            <br/>
            Date : {{ ds }}
        """
    )

    download_launches >> get_pictures >> notify

 

이미지 들어온거 확인

 

 

 

 

 

 

추가로 다른 API를 사용하여 이미지를 다운로드 받는 것도 진행해봤습니다~

저는 Unsplash에서 가져왔는데, 원하시는 API를 정하셔서 진행해보는 것도 좋을거 같습니다.

 

 

 

 

 

 

✅ Unsplash API 사용해서 랜덤이미지 가져오기

Unsplash Dag Code

import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator

import pathlib
import requests
import requests.exceptions as requests_exceptions

UNSPLASH_API_URL = "<https://api.unsplash.com/photos/random>"
UNSPLASH_ACCESS_KEY = "3TptV0Yc2Gt5kHwtKvFpQfQfURrPetejTaVSJ-6Lphk"
JSON_PATH = '/tmp/images.json'
TARGET_DIR = '/tmp/images'

def _get_images():
    pathlib.Path(TARGET_DIR).mkdir(parents=True, exist_ok=True)

    headers = {
        "Authorization": f"Client-ID {UNSPLASH_ACCESS_KEY}"
    }

    # Unsplash API 호출하여 이미지 다운로드 URL 가져오기
    image_urls = []
    for _ in range(10):
        response = requests.get(UNSPLASH_API_URL, headers=headers)
        data = response.json()
        image_urls.append(data["urls"]["regular"])

    # 이미지 다운로드
    for i, image_url in enumerate(image_urls):
        try:
            response = requests.get(image_url)
            image_filename = f"image_{i}.jpg"
            target_file = f"{TARGET_DIR}/{image_filename}"

            with open(target_file, 'wb') as f:
                f.write(response.content)

            print(f"Downloaded {image_url} to {target_file}")
        except requests_exceptions.MissingSchema:
            print(f"{image_url} appears to be an invalid URL.")
        except requests_exceptions.ConnectionError:
            print(f"Could not connect to {image_url}.")

with DAG(
    'download_images',
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval=None
) as dag:

    download_data = BashOperator(
        task_id='download_data',
        bash_command=f'curl -o {JSON_PATH} -L {UNSPLASH_API_URL}'
    )

    get_images = PythonOperator(
        task_id='get_images',
        python_callable=_get_images
    )

    notify = EmailOperator(
        task_id='send_email',
        to='silverbin0523@gmail.com',
        subject='Image Download Completed.',
        html_content="""
                   <h2>Image Download Completed.</h2>
            <br/>
            Date: {{ ds }}
        """
    )

    download_data >> get_images >> notify

 

 

이미지 들어온거 확인

 

 

 

 

 

그럼 다음 시간에 봐요 ~

베빠 ~ 😀

 

 

[참고 링크]

https://velog.io/@jskim/Airflow-Pipeline-%EB%A7%8C%EB%93%A4%EA%B8%B0-API%EB%A5%BC-%ED%99%9C%EC%9A%A9%ED%95%9C-Image-%EB%8B%A4%EC%9A%B4%EB%A1%9C%EB%93%9C

'Database' 카테고리의 다른 글

LangChain이란?  (0) 2023.09.04
SQL과 NoSQL 데이터베이스  (0) 2023.08.18
Airflow를 통한 Dataproc 연동  (0) 2023.08.16
[Airflow] Decorator  (0) 2023.08.07
[Airflow] Airflow CLI 명령어  (0) 2023.07.24

댓글