본문 바로가기
CSP (Cloud Service Provider)/AWS

S3 Access log & Lambda &CloudWatch

by 알 수 없는 사용자 2023. 12. 20.

안녕하세요. 이쁜이와 멋쟁이의 "BTC_준호" 입니다.

오늘은 S3 Logging 버킷에 쌓인 AccessLog를 Lambda를 사용하여 원하는 정보만 필터링 후 CloudWatch로 옮기는작업을 소개하겠습니다.

S3 -> Lambda -> CloudWatch


Lambda 함수 생성

  1. 관련 정책 설정
  • 함수 생성 시 역할이 함께 생성되는데 그 역할에는 Lambda 실행 로그를 CloudWatch 로그그룹에 저장할 수 있는 CloudWatch 관련 정책이 함께 생성되어 있음 (고객관리형 인라인정책)

  • S3에 엑세스 하여 로그를 가져오기 위해서는 인라인 정책과 로그그룹에 대한 정책을 추가 해줘야 한다.
    (권한 추가 → 인라인 정책 생성 → JSON →코드작성→ 변경사항 저장)
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "s3:GetObject",
      "Resource": "arn:aws:s3:::[bucket name]/*"
    }
  ]
}

[예시]

  • 동일한 방식으로 람다 함수 실행으로 만들어질 로그그룹 및 로그 스트림에 대한 권한을 얻기 위해 아래의 정책 추가
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents",
        "logs:DescribeLogStreams"
      ],
      "Resource": [
        "arn:aws:logs:ap-northeast-2:[account id]:*"
      ]
    }
  ]
}

[예시]


Lambda 함수 생성

  1. Lambda 함수 생성
  • 함수 생성시에 role 이 자동으로 생성되기 때문에 해당 롤을 편집하여 권한 수정 필요
    (함수 생성 후 Deploy 필수)
import boto3
from botocore.exceptions import ClientError
from datetime import datetime
import json

# AWS 클라이언트 생성
s3_client = boto3.client('s3')
logs_client = boto3.client('logs')

# 로그 그룹 및 스트림 이름 설정
log_group_name = 'your-log-group-name'  # 변경 가능한 변수
log_stream_name = 'your-log-stream-name'  # 변경 가능한 변수

# 로그 그룹이 없다면 생성
def ensure_log_group_exists():
    try:
        logs_client.create_log_group(logGroupName=log_group_name)
        print(f'Created log group: {log_group_name}')
    except ClientError as e:
        if e.response['Error']['Code'] == 'ResourceAlreadyExistsException':
            print(f'Log group already exists: {log_group_name}')
        else:
            print(f"Error creating log group: {e}")
            raise

# 로그 스트림이 없다면 생성
def ensure_log_stream_exists():
    try:
        logs_client.create_log_stream(logGroupName=log_group_name, logStreamName=log_stream_name)
        print(f'Created log stream: {log_stream_name}')
    except ClientError as e:
        if e.response['Error']['Code'] == 'ResourceAlreadyExistsException':
            print(f'Log stream already exists: {log_stream_name}')
        else:
            print(f"Error creating log stream: {e}")
            raise

# CloudWatch에 로그 이벤트를 전송
def send_to_cloudwatch(log_data):
    ensure_log_group_exists()
    ensure_log_stream_exists()
    # 마지막 로그 이벤트의 sequenceToken 가져오기
    try:
        response = logs_client.describe_log_streams(logGroupName=log_group_name, logStreamNamePrefix=log_stream_name)
        streams = response['logStreams']
        sequence_token = None
        if streams:
            sequence_token = streams[0].get('uploadSequenceToken')
    except ClientError as e:
        print(f"Error getting log stream: {e}")
        raise

    log_events = [{
        'timestamp': int(datetime.now().timestamp() * 1000),
        'message': log_data if isinstance(log_data, str) else json.dumps(log_data)
    }]

    try:
        put_event_args = {
            'logGroupName': log_group_name,
            'logStreamName': log_stream_name,
            'logEvents': log_events
        }
        if sequence_token:
            put_event_args['sequenceToken'] = sequence_token
        response = logs_client.put_log_events(**put_event_args)
        print('Successfully put log events')
    except ClientError as e:
        print(f"Error putting log events: {e}")
        raise

# Lambda 핸들러
def lambda_handler(event, context):
    for record in event.get('Records', []):
        bucket_name = record['s3']['bucket']['name']
        object_key = record['s3']['object']['key']
        try:
            response = s3_client.get_object(Bucket=bucket_name, Key=object_key)
            file_content = response['Body'].read().decode('utf-8').splitlines()
            for line in file_content:
                if 'GET' in line:  # 'GET' 요청이 포함된 라인만 선택
                    send_to_cloudwatch(line)
        except ClientError as e:
            print(f"Error getting object {object_key} from bucket {bucket_name}: {e}")
    return

Trigger 설정

  1. Lambda 함수에 트리거 설정 소스 S3
  • 트리거 추가시 소스를 S3로 선택 후 객체 생성 시 트리거 작동으로 람다 함수 실행
  • Test 코드 실행 오류가 나는 이유는 S3 객체 생성이 안되면 트리거 작동을 안하기 때문


S3 → CloudWatch 로 옮겨 올 때 필터링 방법

조건문을 사용하여 필터링 된 로그만 CloudWatch로 전달 가능
GET 부분을 원하는 문자열로 변경하여 필터링 가능

# Lambda 핸들러
def lambda_handler(event, context):
    for record in event.get('Records', []):
        bucket_name = record['s3']['bucket']['name']
        object_key = record['s3']['object']['key']
        try:
            response = s3_client.get_object(Bucket=bucket_name, Key=object_key)
            file_content = response['Body'].read().decode('utf-8').splitlines()
            for line in file_content:
                if 'GET' in line:  # 'GET' 요청이 포함된 라인만 선택
                    send_to_cloudwatch(line)
        except ClientError as e:
            print(f"Error getting object {object_key} from bucket {bucket_name}: {e}")
    return

EX) GET 문자로 필터링 했을 때 예시

 

'CSP (Cloud Service Provider) > AWS' 카테고리의 다른 글

AWS Quantum Ledger Database (QLDB)  (1) 2023.12.22
[AWS] Athena로 S3 AcceseLog 분석하기  (0) 2023.12.21
[AWS] S3 Access log 활성화 방법  (0) 2023.12.20
[AWS] AWS DataSync-EFS  (0) 2023.12.20
ELB 504 에러 해결방안  (0) 2023.12.15

댓글