안녕하세요. 이쁜이와 멋쟁이의 "BTC_준호" 입니다.
오늘은 S3 Logging 버킷에 쌓인 AccessLog를 Lambda를 사용하여 원하는 정보만 필터링 후 CloudWatch로 옮기는작업을 소개하겠습니다.
Lambda 함수 생성
- 관련 정책 설정
- 함수 생성 시 역할이 함께 생성되는데 그 역할에는 Lambda 실행 로그를 CloudWatch 로그그룹에 저장할 수 있는 CloudWatch 관련 정책이 함께 생성되어 있음 (고객관리형 인라인정책)
- S3에 엑세스 하여 로그를 가져오기 위해서는 인라인 정책과 로그그룹에 대한 정책을 추가 해줘야 한다.
(권한 추가 → 인라인 정책 생성 → JSON →코드작성→ 변경사항 저장)
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "s3:GetObject",
"Resource": "arn:aws:s3:::[bucket name]/*"
}
]
}
[예시]
- 동일한 방식으로 람다 함수 실행으로 만들어질 로그그룹 및 로그 스트림에 대한 권한을 얻기 위해 아래의 정책 추가
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents",
"logs:DescribeLogStreams"
],
"Resource": [
"arn:aws:logs:ap-northeast-2:[account id]:*"
]
}
]
}
[예시]
Lambda 함수 생성
- Lambda 함수 생성
- 함수 생성시에 role 이 자동으로 생성되기 때문에 해당 롤을 편집하여 권한 수정 필요
(함수 생성 후 Deploy 필수)
import boto3
from botocore.exceptions import ClientError
from datetime import datetime
import json
# AWS 클라이언트 생성
s3_client = boto3.client('s3')
logs_client = boto3.client('logs')
# 로그 그룹 및 스트림 이름 설정
log_group_name = 'your-log-group-name' # 변경 가능한 변수
log_stream_name = 'your-log-stream-name' # 변경 가능한 변수
# 로그 그룹이 없다면 생성
def ensure_log_group_exists():
try:
logs_client.create_log_group(logGroupName=log_group_name)
print(f'Created log group: {log_group_name}')
except ClientError as e:
if e.response['Error']['Code'] == 'ResourceAlreadyExistsException':
print(f'Log group already exists: {log_group_name}')
else:
print(f"Error creating log group: {e}")
raise
# 로그 스트림이 없다면 생성
def ensure_log_stream_exists():
try:
logs_client.create_log_stream(logGroupName=log_group_name, logStreamName=log_stream_name)
print(f'Created log stream: {log_stream_name}')
except ClientError as e:
if e.response['Error']['Code'] == 'ResourceAlreadyExistsException':
print(f'Log stream already exists: {log_stream_name}')
else:
print(f"Error creating log stream: {e}")
raise
# CloudWatch에 로그 이벤트를 전송
def send_to_cloudwatch(log_data):
ensure_log_group_exists()
ensure_log_stream_exists()
# 마지막 로그 이벤트의 sequenceToken 가져오기
try:
response = logs_client.describe_log_streams(logGroupName=log_group_name, logStreamNamePrefix=log_stream_name)
streams = response['logStreams']
sequence_token = None
if streams:
sequence_token = streams[0].get('uploadSequenceToken')
except ClientError as e:
print(f"Error getting log stream: {e}")
raise
log_events = [{
'timestamp': int(datetime.now().timestamp() * 1000),
'message': log_data if isinstance(log_data, str) else json.dumps(log_data)
}]
try:
put_event_args = {
'logGroupName': log_group_name,
'logStreamName': log_stream_name,
'logEvents': log_events
}
if sequence_token:
put_event_args['sequenceToken'] = sequence_token
response = logs_client.put_log_events(**put_event_args)
print('Successfully put log events')
except ClientError as e:
print(f"Error putting log events: {e}")
raise
# Lambda 핸들러
def lambda_handler(event, context):
for record in event.get('Records', []):
bucket_name = record['s3']['bucket']['name']
object_key = record['s3']['object']['key']
try:
response = s3_client.get_object(Bucket=bucket_name, Key=object_key)
file_content = response['Body'].read().decode('utf-8').splitlines()
for line in file_content:
if 'GET' in line: # 'GET' 요청이 포함된 라인만 선택
send_to_cloudwatch(line)
except ClientError as e:
print(f"Error getting object {object_key} from bucket {bucket_name}: {e}")
return
Trigger 설정
- Lambda 함수에 트리거 설정 소스 S3
- 트리거 추가시 소스를 S3로 선택 후 객체 생성 시 트리거 작동으로 람다 함수 실행
- Test 코드 실행 오류가 나는 이유는 S3 객체 생성이 안되면 트리거 작동을 안하기 때문
S3 → CloudWatch 로 옮겨 올 때 필터링 방법
조건문을 사용하여 필터링 된 로그만 CloudWatch로 전달 가능
GET 부분을 원하는 문자열로 변경하여 필터링 가능
# Lambda 핸들러
def lambda_handler(event, context):
for record in event.get('Records', []):
bucket_name = record['s3']['bucket']['name']
object_key = record['s3']['object']['key']
try:
response = s3_client.get_object(Bucket=bucket_name, Key=object_key)
file_content = response['Body'].read().decode('utf-8').splitlines()
for line in file_content:
if 'GET' in line: # 'GET' 요청이 포함된 라인만 선택
send_to_cloudwatch(line)
except ClientError as e:
print(f"Error getting object {object_key} from bucket {bucket_name}: {e}")
return
EX) GET 문자로 필터링 했을 때 예시
'CSP (Cloud Service Provider) > AWS' 카테고리의 다른 글
AWS Quantum Ledger Database (QLDB) (1) | 2023.12.22 |
---|---|
[AWS] Athena로 S3 AcceseLog 분석하기 (0) | 2023.12.21 |
[AWS] S3 Access log 활성화 방법 (0) | 2023.12.20 |
[AWS] AWS DataSync-EFS (0) | 2023.12.20 |
ELB 504 에러 해결방안 (0) | 2023.12.15 |
댓글