본문 바로가기
CSP (Cloud Service Provider)/AWS

[AWS] 실시간 데이터 파이프라인 구축(4)

by BTC_티모 2023. 11. 1.

탑신병자 듀오 팀 티모입니다.

AWS 실시간 데이터 파이프라인을 구축한 후, 대시보드를 통해 실시간 모니터링하는 아키텍처를 구현해보겠습니다.


실습 과정입니다.

1 - Kinesis Data Streams :  IoT 로그들을 Kinesis Data Streams의 버퍼 스토리지에 안전하게 수집합니다. 수집된 데이터들은 Kinesis 내 샤드에 저장되게 되고, 로그를 소비할 사용자들은 샤드에 저장된 데이터들을 가져갑니다.

2 - Kinesis Data Firehose : Kinesis Data Streams에 저장된 실시간 로그들을 설정한 버퍼 사이즈 또는 시간 주기에 따라 데이터들을 수집합니다. 수집한 데이터들은 Lambda를 통해 데이터를 추가 및 정제합니다. 정제한 데이터들은 Amazon OpenSearch에 Index를 기준으로 저장합니다.

3 - Lambda : Kinesis Data Firehose를 통해 설정한 버퍼 사이즈 또는 시간 주기에 따라 데이터를 추가 및 정제합니다. 실습에서는 생성된 IoT 센서 데이터에 섭씨 온도 레코드를 활용하여 화씨 온도 레코드를 추가합니다. 또한 Amazon OpenSearch에 적재하기 위해 데이터 형식을 지정합니다.

4 - Amazon OpenSearch : Kinesis Data Firehose를 통해 생성된 IoT 데이터들이 Index를 기준으로 저장됩니다. Amazon OpenSearch 내에서 저장된 데이터들을 Filter, Sum, Average등 다양한 기준으로 데이터를 그룹화 및 연산이 가능합니다.

5 - OpenSearch Dashboard : OpenSearch의 대시보드 기능을 활용해 실시간 데이터를 모니터링하는 대시보드를 구현합니다. 대시보드의 시간 주기를 설정하여 각 시간에 따라 대시보드가 업데이트 되고 IoT 센서 데이터의 실시간 현황을 확인 할 수 있습니다.


1~3 실습과 마찬가지로 CloudFormation 스택을 이용합니다.

템플릿 소스는 S3 URL로 지정, AWS workshop에서 제공된 URL를 이용합니다.

스택 이름 생성 후 모두 기본 옵션으로 두고 검토 단계에서 IAM 리소스 생성 확인 버튼을 체크합니다.

AWS가 실습용으로 제공하는 Module1, Module2 Cloudformation가 생성됩니다.

이번에는 Module2를 이용해 실습을 진행합니다.


우선, 실시간 데이터를 Amazon Opensearch에 저장하기 전에 데이터 추가 및 정제를 위한 Lambda를 구현합니다.

 

Lambda를 생성한 뒤 구성을 변경합니다.

  • 함수 이름 : iot-enrichment
  • 런타임 : Python 3.8

Lambda 일반 구성 설정 화면으로 이동하여 제한 시간을 1분으로 변경하고,
기존 역할에서 "IAM 콘솔에서 Lambda IAM 역할을 확인합니다"를 클릭한 뒤 AmazonKinesisFullAccess 권한을 추가하여 추후 Lambda가 Kinesis Data Firehose에 접근하여 데이터를 가져올 수 있도록 합니다.

Lambda의 코드 탭으로 이동하여 아래의 코드를 붙여넣은 후 Deploy 합니다.
코드는 Amazon Opensearch로 데이터를 저장하기 위해 형식(Timestamp)을 지정하고, 섭씨 온도를 기준으로 화씨 온도를 추가하는 레코드입니다.

import base64
import json
import datetime

print('Loading function')

def lambda_handler(event, context):
    
    output = []

    for record in event['records']:
        print(record['recordId'])
        payload = base64.b64decode(record['data']).decode('utf-8')
        
        source_dict = json.loads(payload)
        device_time_raw = source_dict["device_ts"]
        device_time_element = datetime.datetime.strptime(device_time_raw, "%Y-%m-%d %H:%M:%S.%f")
        
        destination_dic = {
            "uuid" : source_dict["uuid"],
            "timestamp" : device_time_element.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3], # Set Timestamp format
            "device_id" : int(source_dict["device_id"]),
            "temp_celsius" : int(source_dict["device_temp"]),
            "temp_fahrenheit" : float(int(source_dict["device_temp"])*1.8 + 32), # Add fahrenheit temperature
            "track_id" : int(int(source_dict["track_id"])),
            "activity_type" : source_dict["activity_type"]
        }
        print(destination_dic['timestamp'])
        print(type(destination_dic['timestamp']))
        
        payload = json.dumps(destination_dic) +"\n"
        
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(payload.encode('ascii')).decode('utf-8')
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))
    
    return {'records': output}

이제 Kinesis Data Firehose를 생성합니다.

  • 소스 : Amazon Kinesis Data Streams
  • 대상 : Amazon OpenSearch Service
  • 소스 설정 : iot-data-stream-2
  • 전송 스트림 이름 : kinesis-Opensearch

 

레코드 변형 메뉴에서 'AWS Lambda를 사용하여 소스 레코드 변형'에 데이터 변환 활성화를 클릭합니다.

Lambda 검색 버튼을 클릭하여 이전 세부 모듈에서 생성한 iot-enrichment를 클릭합니다.

  • AWS Lambda : iot-enrichment
  • 버퍼 크기 : 0.2MB
  • 버퍼 간격 : 60초

데이터가 저장 될 대상 설정에서 Cloudformation으로 생성한 opensearch 도메인을 클릭합니다.

  • OpenSearch Service 도메인 : builders
  • 인덱스 : iot-stream

 

Opensearch에 데이터를 저장하기 위한 버퍼를 설정하기 위해 버퍼 힌트를 클릭합니다.

버퍼의 크기는 1MiB, 버퍼 간격은 60초로 변경합니다.
해당 버퍼는 Kinesis Data Firehose에서 제공하는 가장 작은/빠른 버퍼 입니다.

  • 버퍼 크기 : 1 MiB
  • 버퍼 간격 : 60초

레코드 저장이 실패한 데이터들을 백업하기 위해서 백업 설정을 지정합니다.
S3 백업 버킷으로는 Cloudformation으로 생성한 버킷을 선택하고, 버킷 접두사로 opensearch_backup/을 입력합니다.

  • S3 백업 버킷 : {Account ID}-builders-analytics
  • S3 백업 버킷 접두사 : opensearch_backup/

이외 다른 설정은 기본으로 두고 전송 스트림을 생성합니다.


다음은 IoT 데이터를 Amazon Opensearch에 저장하기 위해 Opensearch에 kinesis-datafirehose 역할을 생성하고,
해당 역할에 Kinesis Data Firehose를 매핑시켜주는 작업을 진행합니다.

이를 통해 IoT 데이터들을 Lambda로 전처리 후 Opensearch에 저장 할 수 있습니다.

 

Opensearch에 Kinesis Data Firehose가 접근하도록 설정하려면 Opensearch에 Firehose의 IAM ARN을 등록해야 합니다. IAM ARN을 확인하기위해 Kinesis Data Firehose로 이동합니다.
앞서 생성한 kinesis-opensearch의 구성 탭 하단의 서비스 액세스 박스에서 IAM 역할을 클릭,

IAM 메뉴에서 Kinesis Firehose에 적용된 IAM의 ARN 정보를 복사해둡니다.

이후 Opensearch로 이동, 'builders' 도메인을 클릭합니다.

Opensearch 대시보드 URL을 클릭하여 Opensearch 대시보드로 이동합니다.

Cloudformation > Module2 > 출력에 나와있는 Opensearch ID/Password를 이용 로그인합니다.

Opensearch에 로그인 후 왼쪽 메뉴 >  Security > Role 선택 후 생성

  • Name : kinesis-firehose
  • cluster permission : cluster_monitor, cluster_composite_ops
  • index : *
  • index permission : crud, create_index, manage

Role을 생성한 후 해당 Role에 Kinesis Data Firehose를 매핑시켜주기위해 Mapped users를 클릭합니다.

Map users >  Backend roles에 Kinesis Data Firehose의 IAM ARN을 입력 후 Map 버튼을 클릭합니다.

 


다음 시간에는 Kinesis Data Generator를 통해 Data를 발생 시킨 후,
Kinesis Data Firehose가 Amazon Opensearch에 데이터를 저장하는 것으로 이어서 진행하겠습니다.

감사합니다.

댓글