from typing import List, Mapping
import boto3
__all__ = ['AwsLogUtil', 'LogEvent']
[docs]class LogEvent(object):
"""A AWS Cloudwatch log event.
Args:
record: A dictionary of log metadata.
"""
def __init__(self, record: Mapping) -> None:
self.timestamp = record.get('timestamp')
self.message = record.get('message')
self.ingestion_time = record.get('ingestionTime')
def __repr__(self) -> str:
return (
f'{self.__class__.__qualname__}('
f'timestamp={repr(self.timestamp)}, '
f'message={repr(self.message)}, '
f'ingestion_time={repr(self.ingestion_time)})'
)
[docs]class AwsLogUtil(object):
"""AWS Cloudwatch cloud utility functions."""
def __init__(self) -> None:
self.client = boto3.client('logs')
[docs] def get_log_events(self, group_name: str, stream_name: str) -> List[LogEvent]:
"""Get all log events from a stream within a group."""
response = self.client.get_log_events(logGroupName=group_name, logStreamName=stream_name)
events = [LogEvent(record) for record in response['events']]
return events