본문 바로가기
CSP (Cloud Service Provider)/AWS

[AWS] 실시간 데이터 파이프라인 구축(2)

by BTC_티모 2023. 10. 6.

탑신병자 듀오 팀 티모입니다.

오늘은 AWS 기반 실시간 데이터 파이프라인을 이어서 구축해보겠습니다.


실습 과정입니다.

  1. Kinesis Data Streams : 지속적으로 생산되는 실시간 IoT 로그들을 Kinesis Data Streams 내 Buffer Storage에 수집합니다. 수집된 데이터들은 Shard에 저장되며 Consumer들은 Shard에 저장된 데이터들을 가져가 사용합니다.
  2. Glue Streaming : ETL 서비스인 AWS Glue을 통해 Kinesis Data Streams에 쌓이는 실시간 데이터를 처리합니다.
    처리가 완료된 데이터들은 Amazon S3에 저장합니다.
  3. Glue Data Catalog : 실시간 유입되는 데이터들을 처리하기 위해 데이터의 스키마 형식을 Glue Data Catalog에 저장합니다. Glue Streaming(2)에서는 사전에 정의된 스키마를 기준으로 데이터를 처리합니다.
  4. Reference Data(S3) : Glue Streaming(2)에서 실시간으로 유입되는 데이터들을 Join하기 위한 Reference 데이터를 S3에 저장합니다
  5. Glue Crawler / Glue Data Catalog : 2~4 과정을 거쳐 최종적으로 S3에 저장된 데이터들을 스캔하여 데이터 카탈로그를 생성, 하나 이상의 테이블을 생성합니다. 여기서 정의된 데이터 카탈로그를 기준으로 Athena를 통해 데이터를 분석합니다.
  6. Amazon Athena :  Glue Data Catalog를 참조하여 S3의 데이터에 대해 SQL 기반의 분석을 진행합니다.
    이를 통해 ETL이 완료된 데이터를 간편하게 데이터를 분석 할 수 있습니다.
  7. Amazon Quicksight : Athena(6)를 통해 분석한 결과들을 다양한 대시보드 기능을 통해 시각화하여 사용자가 원하는 방식으로 데이터 분석 결과를 보여줍니다.

데이터 처리를 위한 사전 작업을 진행했고, streaming job을 생성하는 것으로 이어서 진행합니다. (2. Glue Streaming)

Glue >  ETL jobs 선택 후 Visual with blank canvas로 생성합니다.

Glue Job을 시각화로 생성하는 메뉴로 이동했다면, 우선 kinesis부터 설정합니다.

  • Amazon Kinesis Source : Data Catalog table
  • Database : iotstream
  • Table : iot-stream-raw
  • Detect schema inactive(체크 해제)

Kinesis로 수집한 데이터들을 Reference 데이터와 조인하기 위해 스키마 설정을 변경해줍니다.

add nodes(좌측 위 +)를 눌러 Transforms > Change Schema을 선택합니다.

  • Node Parent : Amazon Kinesis
  • Change Schema : track_id(Int -> String)

다음은 Kinesis로부터 수집한 데이터들을 Join하기 위한 Reference Data를 추가합니다.

sources > AWS Glue Data Catalog를 선택합니다.

  • Database : iotstream
  • Table : reference

지금까지 생성한 테이블을 Join합니다.
Transforms > Join을 선택합니다.

  • Node Parent : AWS Glue Data Catalog, Change Schema
  • Join Condition : track_id = track_id

Join까지 완료한 테이블을 추가로 Change Schema 작업을 진행하여 특정 레코드 값을 제거합니다.

Transforms > Change Schema을 선택합니다.

  • Change Schema : .track_id Drop(체크)

마지막으로 지금까지 작업한 데이터들을 S3에 저장하기 위한 과정을 진행합니다.
기존의 JSON 타입의 데이터들을 빠른 데이터 분석을 위한 Parquet 형식으로 변환 및 압축하는 작업도 함께 진행합니다.
Targets > Amazon S3를 선택합니다.

  • Node Parent : Change Schema(.track_id Drop한 JOIN이 완료된 테이블)
  • Format : Parquet
  • Compression Type : Snappy
  • S3 Target Location : {AWS Account}-builders-analytics/transformed/

Job details로 이동하여 Job Name과 IAM 그리고 Worker Node의 수를 지정한 뒤 save합니다.

  • Job Name : iotStreaming
  • IAM : Glueadmin
  • Worker Type : G 0.25X
  • Requested number of workers = 2

ETL jobs로 돌아가 확인합니다.

 

이제 Streaming Job을 실행해봅니다.

먼저 Kinesis Data Generator로 이동하여 실시간 데이터를 Kinesis로 전송합니다.

전송 대상은 iot-data-stream-1입니다. 데이터 전송 후 iotStreaming을 실행합니다.

Job run monitroing 메뉴를 통해서 Glue Streaming Job의 상태를 확인 할 수 있습니다.

Amazon S3에서 ETL이 완료된 데이터들이 정상적으로 저장된 것을 확인할 수 있습니다.

경로는 builders-analytics/transformed/ingest_(year)/ingest_(month)/ingest_(day)/ingest_(hour)/ 입니다.

여기까지가 Glue Streaming Job을 생성하고 실행하는 과정입니다.

실습이 잘 진행된 것을 확인한 뒤에는 job을 중지하고 데이터 전송 역시 중지합니다.

 


다음은 위 실습으로 ETL 작업이 완료된 데이터를 대상으로 SQL 기반의 데이터 분석을 해보겠습니다.

Glue Crawler를 이용 S3에 저장된 데이터를 스캔하고 데이터 스키마를 정의합니다.

Crawler를 생성합니다.

  • Crawler Name : Transformed
  • Data Source : S3
  • S3 path : {AWS Account ID}-builders-analytics/transformed/
  • IAM Role : glueadmin
  • Target database : iotstream

Crwaler 실행 후 Data Catalog -> Database -> Tables로 이동, transformed의 Table data를 클릭합니다.

Athena로 이동한 뒤, 쿼리문을 수정해 실행해봅니다.

SELECT * FROM "AwsDataCatalog"."iotstream"."transformed" limit 10;

쿼리 결과에서 track_name과 artist_name이 추가 된 것을 확인할 수 있습니다.

Athena를 통하여 데이터 분석을 실행할 수 있습니다.
새로운 쿼리를 실행하기 위해 우측의 + 버튼을 클릭하여 새로운 쿼리 화면을 생성합니다.

온도가 40도 이상인 디바이스들의 Activity_type과 수를 계산하고 이를 Table1 이라는 테이블에 데이터를 적재합니다.
아래 쿼리 실행 시 external_location의 {AWS Account ID}개인 계정 ID를 넣어 수정합니다.

CREATE TABLE "iotstream"."tab1"
WITH(
    external_location = 's3://{AWS Account ID}-builders-analytics-athena/tab1',
    format = 'Parquet'
)
AS SELECT "activity_type", count("device_id") as Number_of_Record
FROM "AwsDataCatalog"."iotstream"."transformed" 
where "iotstream"."transformed"."device_temp" >= 40 
GROUP BY "activity_type"
ORDER BY "number_of_record" desc;

위 쿼리를 실행 했을때 tab1 이라는 테이블이 생성 된 것을 확인할 수 있으며,
tab1을 클릭 후 테이블 미리보기를 클릭하여 데이터를 조회 할 수 있습니다.


다음 시간에는 Amazon Quicksight를 통해 시각화하는 과정을 이어서 진행하겠습니다.

감사합니다.

'CSP (Cloud Service Provider) > AWS' 카테고리의 다른 글

Amazon VPC Lattice  (1) 2023.10.11
[AWS] Trusted Advisor  (0) 2023.10.10
[AWS] AWS Amplify  (0) 2023.10.05
[AWS] 스토리지 관련 다양한 서비스  (1) 2023.10.04
[AWS] CloudShell  (0) 2023.10.02

댓글