返回列表
如何利用组织级别的cloudtrail监控指定EC2机型飞书告警
发布时间:2026-03-28 02:04
方案概述
本方案基于 AWS Organizations 组织级 CloudTrail,实现对组织内所有成员账号特定 EC2 实例类型(如 GPU 实例 g5 系列)启动行为的实时监控与飞书告警通知。方案采用 Serverless 架构,无需管理服务器,成本极低,适用于需要对高成本实例启动进行管控的多账号组织环境。
架构组件
组件 | 服务 | 作用 |
| 日志采集 | AWS CloudTrail(组织级) | 集中采集组织内所有账号的 API调用事件 |
| 日志存储 | Amazon S3 | 存储 CloudTrail 日志文件(.json.gz) |
| 事件通知 | Amazon SNS | CloudTrail 写入新日志文件时发送通知 |
| 消息缓冲 | Amazon SQS | 缓冲 SNS 通知消息 |
| 事件处理 | AWS Lambda | 解析日志、 匹配事件、发送告警 |
| 事件去重 | Amazon DynamoDB | 存储已处理的事件 ID,防止重复告警 |
| 告警通知 | 飞书 Webhook | 发送卡片消息到飞书群 |
架构图

架构描述:
1. 在Org master账号中开启组织级别的cloudtrail,
2. 开启组织级别的cloutrail之后, 日志每5-15分钟会收集到S3中
3. SNS+SQS发送通知消息和避免Lambda并发过高
4. Lambda 通过 SQS Event Source Mapping 自动拉取消息,发送Post https请求
日志采集阶段
组织级 CloudTrail 在 Management Account 中开启, 自动采集组织内所有成员账号的管理事件(Management Events) 。CloudTrail 将事件按账号、 区域、 日期分目录存储到指定的 S3 Bucket中, 日志文件格式为 gzip 压缩的 JSON 文件。
事件通知阶段
CloudTrail 原生支持配置 SNS 通知,每次向 S3 写入新的日志文件时, 自动向指定的 SNS Topic 发送通知。通知内容仅包含 S3 Bucket 名称和文件路径( Key),不包含事件内容本身。
消息缓冲阶段
SNS 通知投递到 SQS 标准队列进行缓冲。SQS 的作用:
. 削峰填谷: 当短时间内产生大量日志文件时,SQS 缓冲消息,避免 Lambda 并发过高
事件处理阶段
Lambda 通过 SQS Event Source Mapping 自动拉取消息,处理流程:
1. 解析 SQS 消息体,提取 S3 文件路径
2. 从 S3 下载 .json.gz 日志文件
3. 解压并解析 JSON ,遍历所有 CloudTrail 事件记录
4. 筛选 event Name 为 RunInstances 的事件
5. 检查 instanceType 是否匹配监控列表(如 g5. 系列)
6. 通过 DynamoDB 条件写入进行去重,防止同—事件重复告警
7. 对新事件调用飞书 Webhook 发送卡片消息
去重机制
CloudTrail 可能在不同日志文件中重复记录同—事件,SQS 标准队列也可能重复投递消息。方案使用DynamoDB 基于 CloudTrail 事件唯—标识 eventID 进行去重:
. 使用 ConditionExpression='attribute_not_exists(eventId)' 条件写入 ,保证原子性 . 写入成功表示新事件,执行告警
. 写入失败(条件不满足)表示重复事件,跳过
. TTL 设置为 24 小时,过期记录自动清理,表不会无限增长
告警通知
通过飞书 Webhook 发送交互式卡片消息,包含以下信息: . 事件类型、执行状态
. 账号 ID 、 区域
. 操作人( IAM User/Role/Root) . 来源 IP 、操作时间
. 实例类型、AMI 、实例 ID 、私有 IP . 完整请求参数
操作步骤:
1.创建SNS消息的通道

2.开启组织级别的cloudtail,开启这个功能的话,会将当前组织下所有的link Account下的cloudtrail 日志收集到org manager 账号当中

这里需要新创建—个MSK和配置刚刚创建好的SNS主题,配置完成之后直接创建即可3.配置SQS

创建完成之后需要订阅SNS

4.配置DynamoDB
通过cli来创建表
1 aws dynamodb create-table \
2 --table-name cloudtrail-alert-dedup \
3 --attribute-definitions AttributeName=eventId,AttributeType=S \
4 --key-schema AttributeName=eventId,KeyType=HASH \
5 --billing-mode PAY_PER_REQUEST \
6 --region us-east-1
开启生命周期
1 aws dynamodb update-time-to-live \
2 --table-name cloudtrail-alert-dedup \
3 --time-to-live-specification Enabled=true,AttributeName=ttl \
4 --region us-east-1
5.创建和配置Lambda

Lambda代码
1 import json
2 import gzip
3 import os
4 import time
5 import boto3
6 from urllib.parse import unquote_plus
7 from urllib import request
8
9 s3 = boto3.client('s3')
10 dynamodb = boto3.resource('dynamodb')
11 dedup_table = dynamodb.Table(os.environ['DEDUP_TABLE_NAME'])
12 FEISHU_WEBHOOK = os.environ['FEISHU_WEBHOOK_URL']
13
14 TARGET_EVENTS = {
15 'RunInstances',
16 }
17
18 EVENT_LABELS = {
19 'RunInstances': 'EC2 Create Instance',
20 }
21
22 # Instance families to alert on - add new prefixes here
23 ALERT_INSTANCE_PREFIXES = [
24 't3.',#测试用
25 'g5.',
26 ]
27
28
29 def lambda_handler(event, context):
30 failures = []
31 for record in event['Records']:
32 try:
33 process_record(record)
34 except Exception as e:
35 print("Failed: " + str(record.get('messageId')) + ", error: " + str(e))
36 failures.append({"itemIdentifier": record['messageId']})
37 return {"batchItemFailures": failures}
38
39
40 def is_duplicate(event_id):
41 try:
42 dedup_table.put_item(
43 Item={
44 'eventId': event_id,
45 'ttl': int(time.time()) + 86400
46 },
47 ConditionExpression='attribute_not_exists(eventId)'
48 )
49 return False
50 except dynamodb.meta.client.exceptions.ConditionalCheckFailedException:
51 return True
52
53
54 def should_alert(e):
55 event_name = e.get('eventName', '')
56 if event_name == 'RunInstances':
57
'') instance_type = (e.get('requestParameters') or {}).get('instanceType',
58 for prefix in ALERT_INSTANCE_PREFIXES:
59 if instance_type.startswith(prefix):
60 return True
61 return False
62
63
64 def process_record(record):
65 sns_msg = json.loads(record['body'])
66 s3_info = json.loads(sns_msg['Message'])
67
68 bucket = s3_info['s3Bucket']
69 for key in s3_info.get('s3ObjectKey', []):
70 if not key.endswith('.json.gz'):
71 continue
72 obj = s3.get_object(Bucket=bucket, Key=unquote_plus(key))
73 with gzip.open(obj['Body'], 'rt') as f:
74 log = json.loads(f.read())
75 for e in log.get('Records', []):
76 if not should_alert(e):
77 continue
78 event_id = e.get('eventID', '')
79 if event_id and not is_duplicate(event_id):
80 send_alert(e)
81 elif not event_id:
82 send_alert(e)
83
84
85 def send_alert(e):
86 event_name = e.get('eventName', '')
87 account_id = e.get('recipientAccountId', 'Unknown')
88 region = e.get('awsRegion', 'Unknown')
89 event_time = e.get('eventTime', 'Unknown')
90 source_ip = e.get('sourceIPAddress', 'Unknown')
91 error_code = e.get('errorCode', '')
92 params = e.get('requestParameters', {}) or {}
93 response = e.get('responseElements', {}) or {}
94
95 user_identity = e.get('userIdentity', {})
96 user_type = user_identity.get('type', '')
97 if user_type == 'Root':
98 operator = "Root (" + user_identity.get('accountId', '') + ")"
99 else:
100 operator = user_identity.get('arn', 'Unknown')
101
102 purchase_type = EVENT_LABELS.get(event_name, event_name)
103 status = "Failed" if error_code else "Success"
104
105 details = []
106 details.append("Instance Type: " + str(params.get('instanceType', 'N/A')))
107 items = params.get('instancesSet', {}).get('items', [])
108 if items:
109 details.append("AMI: " + str(items[0].get('imageId', 'N/A')))
110 min_c = str(items[0].get('minCount', 'N/A'))
111 max_c = str(items[0].get('maxCount', 'N/A'))
112 details.append("Count: " + min_c + " - " + max_c)
113 resp_items = response.get('instancesSet', {}).get('items', [])
114 for ri in resp_items:
115 details.append("Instance ID: " + str(ri.get('instanceId', 'N/A')))
116 details.append("Private IP: " + str(ri.get('privateIpAddress', 'N/A')))
117
118 detail_text = "\n".join(details) if details else "N/A"
119 params_text = json.dumps(params, indent=2, ensure_ascii=False, default=str)
120 params_text = params_text[:2000]
121 params_block = "、、、\n" + params_text + "\n、、、"
122
123 card = {
124 "msg_type": "interactive",
125 "card": {
126 "header": {
127 "title": {
128 "tag": "plain_text",
129 "content": "AWS Alert: " + purchase_type
130 },
131 "template": "red" if error_code else "orange"
132 },
133 "elements": [
134 {
135 "tag": "div",
136 "fields": [
137 {"is_short": True, "text": {"tag": "lark_md",
"content": "**Event**\n" + purchase_type}},
138 {"is_short": True, "text": {"tag": "lark_md",
"content": "**Status**\n" + status}},
139 {"is_short": True, "text": {"tag": "lark_md",
"content": "**Account**\n" + account_id}},
140 {"is_short": True, "text": {"tag": "lark_md",
"content": "**Region**\n" + region}},
141 {"is_short": True, "text": {"tag": "lark_md",
"content": "**Operator**\n" + operator}},
142 {"is_short": True, "text": {"tag": "lark_md",
"content": "**Source IP**\n" + source_ip}},
143 ]
144 },
145 {"tag": "div", "text": {"tag": "lark_md", "content":
"**Time**\n" + event_time}},
146 {"tag": "div", "text": {"tag": "lark_md", "content":
"**Details**\n" + detail_text}},
147 {"tag": "div", "text": {"tag": "lark_md", "content":
"**Parameters**\n" + params_block}},
148 ]
149 }
150 }
151
152 data = json.dumps(card).encode('utf-8')
153 req = request.Request(FEISHU_WEBHOOK, data=data, headers={'Content-Type': 'application/json'})
154 request.urlopen(req)
155
常规配置:


Lambdaj角色换权限需要额外添加以下权限
内联策略:
1 {
2 "Version": "2012-10-17",
3 "Statement": [
4 {
5 "Sid": "Statement1", 6 "Effect": "Allow",
7 "Action": [
8 "dynamodb:PutItem"
9 ],
10 "Resource": [
11 "*"
12 ]
13 }
14 ]
15 }
托管策略:
AWSLambdaSQSQueueExecutionRole
Lambda环境变量:

DEDUP_TABLE_NAME:填写dynamodb的表名
FEISHU_WEBHOOK_URL:填写飞书WebHook的url
添加触发器

Lambda测试json:
1 {
2 "Records": [
3 {
4 "messageId": "test-001",
5 "body": "{\"Message\": \"{\\\"s3Bucket\\\": \\\"aws-cloudtrail-logs-xxxxx- 9f3ffaba\\\", \\\"s3ObjectKey\\\": [\\\"AWSLogs/o-8jppajkuxv/{账号
ID}/CloudTrail/us-east-1/2026/03/24/376129859421_CloudTrail_us-east-
1_20260324T0720Z_vyrL7oNCnrplt30N.json.gz\\\"]}\"}"
6 } 7 ]
8 }
验证结果:

成本估算
以 100 个成员账号的组织为例:
组件 | 月用量估算 | 月成本 |
| CloudTrail(组织级) | 已有,不额外计费 | $0( 已有) |
S3 存储 | 已有,不额外计费 | $0( 已有) |
| SNS 通知 | ~数千条/月 | ~$0 |
| SQS 消息 | ~数千条/月 | $0(免费额度内) |
| Lambda 调用 | ~数千次/月 ,128MB | $0(免费额度内) |
| DynamoDB | 按需模式,极少量读写 | ~$0 |
| 数据传输 | 同区域 AWS 服务间 | $0 |
| 合计 | ~$0 |
方案优势
. 全组织覆盖:基于组织级 CloudTrail , 自动覆盖所有现有和新加入的成员账号,无需逐账号配置
. Serverless 架构 :无需管理服务器,按需付费,成本极低
. 可靠性高: SQS 消息持久化 + DLQ 兜底 + DynamoDB 去重,确保不丢消息、不重复告警 . 易于扩展:修改实例类型前缀列表即可扩展监控范围,也可扩展到监控其他 API 事件
. 维护成本低:全托管服务,无需运维,DynamoDB TTL 自动清理过期数据
局限性
. 非实时:CloudTrail 写入 S3 有 5-15 分钟延迟,告警存在分钟级延迟
. 依赖 CloudTrail 配置:需确保组织级 Trail 已正确配置且处于 Logging 状态
. 飞书 Webhook 依赖公网 :Lambda 不能放在 VPC 内,否则需要 NAT Gateway 访问公网