返回列表

如何利用组织级别的cloudtrail监控指定EC2机型飞书告警

发布时间:2026-03-28 02:04

方案概述

本方案基于 AWS Organizations 组织级 CloudTrail,实现对组织内所有成员账号特定 EC2 实例类型(如 GPU 实例 g5 系列)启动行为的实时监控与飞书告警通知。方案采用 Serverless 架构,无需管理服务器,成本极低,适用于需要对高成本实例启动进行管控的多账号组织环境。


架构组件

组件

服务

作用

日志采集
AWS CloudTrail(组织级)
集中采集组织内所有账号的 API调用事件
日志存储
Amazon S3
存储 CloudTrail 日志文件(.json.gz)
事件通知
Amazon SNS
CloudTrail 写入新日志文件时发送通知
消息缓冲
Amazon SQS
缓冲 SNS 通知消息
事件处理
AWS Lambda
解析日志、 匹配事件、发送告警
事件去重
Amazon DynamoDB
存储已处理的事件 ID,防止重复告警
告警通知
飞书 Webhook
发送卡片消息到飞书群

架构图


架构描述:


1. 在Org master账号中开启组织级别的cloudtrail,

2. 开启组织级别的cloutrail之后, 日志每5-15分钟会收集到S3中

3. SNS+SQS发送通知消息和避免Lambda并发过高

4. Lambda 通过 SQS Event Source Mapping 自动拉取消息,发送Post https请求




日志采集阶段

组织级 CloudTrail 在 Management Account 中开启, 自动采集组织内所有成员账号的管理事件(Management Events) 。CloudTrail 将事件按账号、 区域、 日期分目录存储到指定的 S3 Bucket中, 日志文件格式为 gzip 压缩的 JSON 文件。



事件通知阶段

CloudTrail 原生支持配置 SNS 通知,每次向 S3 写入新的日志文件时, 自动向指定的 SNS Topic 发送通知。通知内容仅包含 S3 Bucket 名称和文件路径( Key),不包含事件内容本身。



消息缓冲阶段

SNS 通知投递到 SQS 标准队列进行缓冲。SQS 的作用:

.  削峰填谷: 当短时间内产生大量日志文件时,SQS 缓冲消息,避免 Lambda 并发过高




事件处理阶段

Lambda 通过 SQS Event Source Mapping 自动拉取消息,处理流程:

1. 解析 SQS 消息体,提取 S3 文件路径

2. 从 S3 下载 .json.gz 日志文件

3. 解压并解析 JSON ,遍历所有 CloudTrail 事件记录

4. 筛选 event Name 为 RunInstances 的事件

5. 检查 instanceType 是否匹配监控列表(如 g5. 系列)

6. 通过 DynamoDB 条件写入进行去重,防止同—事件重复告警

7. 对新事件调用飞书 Webhook 发送卡片消息




去重机制

CloudTrail 可能在不同日志文件中重复记录同—事件,SQS 标准队列也可能重复投递消息。方案使用DynamoDB 基于 CloudTrail 事件唯—标识 eventID 进行去重:

.  使用 ConditionExpression='attribute_not_exists(eventId)' 条件写入 ,保证原子性 .  写入成功表示新事件,执行告警

.  写入失败(条件不满足)表示重复事件,跳过

.  TTL 设置为 24 小时,过期记录自动清理,表不会无限增长



告警通知

通过飞书 Webhook 发送交互式卡片消息,包含以下信息: .  事件类型、执行状态

.  账号 ID 、 区域

.  操作人( IAM User/Role/Root) .  来源 IP 、操作时间

.  实例类型、AMI 、实例 ID 、私有 IP .  完整请求参数


操作步骤:

1.创建SNS消息的通道






2.开启组织级别的cloudtail,开启这个功能的话,会将当前组织下所有的link Account下的cloudtrail 日志收集到org manager 账号当中





这里需要新创建—个MSK和配置刚刚创建好的SNS主题,配置完成之后直接创建即可3.配置SQS





创建完成之后需要订阅SNS





4.配置DynamoDB

通过cli来创建表

1  aws dynamodb create-table \
2     --table-name cloudtrail-alert-dedup \
3     --attribute-definitions AttributeName=eventId,AttributeType=S \
4     --key-schema AttributeName=eventId,KeyType=HASH \
5     --billing-mode PAY_PER_REQUEST \
6     --region us-east-1

开启生命周期



1  aws dynamodb update-time-to-live \
2     --table-name cloudtrail-alert-dedup \
3     --time-to-live-specification Enabled=true,AttributeName=ttl \
4     --region us-east-1

5.创建和配置Lambda





Lambda代码


1  import json
2  import gzip
3  import os
4  import time
5  import boto3
6  from urllib.parse import unquote_plus
7  from urllib import request
8
9  s3 = boto3.client('s3')
10  dynamodb = boto3.resource('dynamodb')
11  dedup_table = dynamodb.Table(os.environ['DEDUP_TABLE_NAME'])
12  FEISHU_WEBHOOK = os.environ['FEISHU_WEBHOOK_URL']
13
14  TARGET_EVENTS = {
15        'RunInstances',
16  }
17
18  EVENT_LABELS = {
19        'RunInstances': 'EC2 Create Instance',
20  }
21
22  # Instance families to alert on - add new prefixes here
23  ALERT_INSTANCE_PREFIXES = [
24        't3.',#测试用
25        'g5.',
26  ]
27
28
29  def lambda_handler(event, context):
30        failures = []
31       for record in event['Records']:
32             try:
33                   process_record(record)
34             except Exception as e:
35                   print("Failed: " + str(record.get('messageId')) + ", error: " + str(e))
36                   failures.append({"itemIdentifier": record['messageId']})
37        return {"batchItemFailures": failures}
38
39

 
40	def	is_duplicate(event_id):
41		try:
42		dedup_table.put_item(
43		Item={
44		'eventId': event_id,
45		'ttl': int(time.time()) + 86400
46		},
47		ConditionExpression='attribute_not_exists(eventId)'
48		)
49		return False
50		except dynamodb.meta.client.exceptions.ConditionalCheckFailedException:
51		return True
52		
53		
54	def	should_alert(e):
55		event_name = e.get('eventName', '')
56		if event_name == 'RunInstances':
57	
'')	instance_type = (e.get('requestParameters') or {}).get('instanceType',
58		for prefix in ALERT_INSTANCE_PREFIXES:
59		if instance_type.startswith(prefix):
60		return True
61		return False
62		
63		
64	def	process_record(record):
65		sns_msg = json.loads(record['body'])
66		s3_info = json.loads(sns_msg['Message'])
67		
68		bucket = s3_info['s3Bucket']
69		for key in s3_info.get('s3ObjectKey', []):
70		if not key.endswith('.json.gz'):
71		continue
72		obj = s3.get_object(Bucket=bucket, Key=unquote_plus(key))
73		with gzip.open(obj['Body'], 'rt') as f:
74		log = json.loads(f.read())
75		for e in log.get('Records', []):
76		if not should_alert(e):
77		continue
78		event_id = e.get('eventID', '')
79		if event_id and not is_duplicate(event_id):

 
80		send_alert(e)
81		elif not event_id:
82		send_alert(e)
83		
84		
85	def	send_alert(e):
86		event_name = e.get('eventName', '')
87		account_id = e.get('recipientAccountId', 'Unknown')
88		region = e.get('awsRegion', 'Unknown')
89		event_time = e.get('eventTime', 'Unknown')
90		source_ip = e.get('sourceIPAddress', 'Unknown')
91		error_code = e.get('errorCode', '')
92		params = e.get('requestParameters', {}) or {}
93		response = e.get('responseElements', {}) or {}
94		
95		user_identity = e.get('userIdentity', {})
96		user_type = user_identity.get('type', '')
97		if user_type == 'Root':
98		operator = "Root (" + user_identity.get('accountId', '') + ")"
99		else:
100		operator = user_identity.get('arn', 'Unknown')
101		
102		purchase_type = EVENT_LABELS.get(event_name, event_name)
103		status = "Failed" if error_code else "Success"
104		
105		details = []
106		details.append("Instance Type: " + str(params.get('instanceType', 'N/A')))
107		items = params.get('instancesSet', {}).get('items', [])
108		if items:
109		details.append("AMI: " + str(items[0].get('imageId', 'N/A')))
110		min_c = str(items[0].get('minCount', 'N/A'))
111		max_c = str(items[0].get('maxCount', 'N/A'))
112		details.append("Count: " + min_c + " - " + max_c)
113		resp_items = response.get('instancesSet', {}).get('items', [])
114		for ri in resp_items:
115		details.append("Instance ID: " + str(ri.get('instanceId', 'N/A')))
116		details.append("Private IP: " + str(ri.get('privateIpAddress', 'N/A')))
117		
118		detail_text = "\n".join(details) if details else "N/A"
119		params_text = json.dumps(params, indent=2, ensure_ascii=False, default=str)

 
120       params_text =  params_text[:2000]
121	params_block =  "、、、\n"  +  params_text  +  "\n、、、"
122	
123	card =  {
124	"msg_type":  "interactive",
125	"card":  {
126	"header":  {
127	"title":  {
128	"tag":  "plain_text",
129	"content":  "AWS Alert:  " +  purchase_type
130	},
131	"template":  "red"  if error_code else  "orange"
132	},
133	"elements":  [
134	{
135	"tag":  "div",
136	"fields":  [
137	{"is_short": True,  "text":  {"tag":  "lark_md",
"content":  "**Event**\n" +  purchase_type}},
138	{"is_short": True,  "text":  {"tag":  "lark_md",
"content":  "**Status**\n" +  status}},
139	{"is_short": True,  "text":  {"tag":  "lark_md",
"content":  "**Account**\n" + account_id}},
140	{"is_short": True,  "text":  {"tag":  "lark_md",
"content":  "**Region**\n" +  region}},
141	{"is_short": True,  "text":  {"tag":  "lark_md",
"content":  "**Operator**\n" + operator}},
142	{"is_short": True,  "text":  {"tag":  "lark_md",
"content":  "**Source  IP**\n" +  source_ip}},
143	]
144	},
145	{"tag":  "div",  "text":  {"tag":  "lark_md",  "content":
"**Time**\n" + event_time}},
146	{"tag":  "div",  "text":  {"tag":  "lark_md",  "content":
"**Details**\n" + detail_text}},
147	{"tag":  "div",  "text":  {"tag":  "lark_md",  "content":
"**Parameters**\n" +  params_block}},
148	]
149	}
150	}
151	
152	data =  json.dumps(card).encode('utf-8')

 
153       req = request.Request(FEISHU_WEBHOOK, data=data, headers={'Content-Type': 'application/json'})
154        request.urlopen(req)
155

常规配置:





Lambdaj角色换权限需要额外添加以下权限

内联策略:

1  {
2        "Version": "2012-10-17",
3        "Statement": [
4             {
5                   "Sid": "Statement1", 6                   "Effect": "Allow",
7                   "Action": [
8                         "dynamodb:PutItem"
9                  ],
10                   "Resource": [
11                        "*"
12                    ]
13             }
14       ]
15  }

托管策略:

AWSLambdaSQSQueueExecutionRole

AmazonS3ReadOnlyAccess


Lambda环境变量:






DEDUP_TABLE_NAME:填写dynamodb的表名

FEISHU_WEBHOOK_URL:填写飞书WebHook的url

添加触发器




Lambda测试json:

1  {
2     "Records": [
3       {
4          "messageId": "test-001",
5          "body": "{\"Message\": \"{\\\"s3Bucket\\\": \\\"aws-cloudtrail-logs-xxxxx- 9f3ffaba\\\", \\\"s3ObjectKey\\\": [\\\"AWSLogs/o-8jppajkuxv/{账号
ID}/CloudTrail/us-east-1/2026/03/24/376129859421_CloudTrail_us-east-
1_20260324T0720Z_vyrL7oNCnrplt30N.json.gz\\\"]}\"}"
6       } 7    ]
8  }

验证结果:





成本估算

以 100 个成员账号的组织为例:


组件

月用量估算

月成本

CloudTrail(组织级)
已有,不额外计费

$0( 已有)

S3 存储

已有,不额外计费
$0( 已有)
SNS 通知
~数千条/月
~$0
SQS 消息
~数千条/月
$0(免费额度内)
Lambda 调用
~数千次/月 ,128MB
$0(免费额度内)
DynamoDB
按需模式,极少量读写
~$0
数据传输
同区域 AWS 服务间
$0
合计

~$0


方案优势

.  全组织覆盖:基于组织级 CloudTrail , 自动覆盖所有现有和新加入的成员账号,无需逐账号配置

.  Serverless 架构 :无需管理服务器,按需付费,成本极低

.  可靠性高: SQS 消息持久化 + DLQ 兜底 + DynamoDB 去重,确保不丢消息、不重复告警 .  易于扩展:修改实例类型前缀列表即可扩展监控范围,也可扩展到监控其他 API 事件

.  维护成本低:全托管服务,无需运维,DynamoDB TTL 自动清理过期数据




局限性

.  非实时:CloudTrail 写入 S3 有 5-15 分钟延迟,告警存在分钟级延迟

.  依赖 CloudTrail 配置:需确保组织级 Trail 已正确配置且处于 Logging 状态

.  飞书 Webhook 依赖公网 :Lambda 不能放在 VPC 内,否则需要 NAT Gateway 访问公网


顶部