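[AWS] Pushing pipeline status to Slack and Google Chat with Lambda
Motivation: show CodePipeline execution messages in a Slack channel.
Features:
- Automatically detect the account (Dev or Prod)
- Show whether a pipeline run succeeded or failed with a matching color
- Link to the pipeline URL
- Automatically extract the GitHub commit hash
The implementation consists of:
- Slack Webhook (plus a Google Chat webhook)
- CloudWatch Events
- Lambda (Python)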
Slack
Prepare a Slack webhook: see Sending messages using Incoming Webhooks.
The legacy way to build a formatted message is with attachments; the officially recommended newer approach is blocks: see Creating rich message layouts.
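For comparison with the attachments used in the sample code, here is a minimal sketch of the same information as a Block Kit payload for an Incoming Webhook. The function `build_slack_blocks` and its parameters are hypothetical; the top-level `text` acts as the plain-text fallback for notifications. Blocks on their own have no equivalent of the attachment `color` side bar.

```python
import json


def build_slack_blocks(account_name, pipeline_name, pipeline_url,
                       stage_name, state_name, revision_id):
    """Build a Block Kit payload for a Slack Incoming Webhook (illustrative sketch)."""
    return {
        # Plain-text fallback used in notifications and by clients without block support
        "text": "%s: %s stage %s %s" % (account_name, pipeline_name, stage_name, state_name),
        "blocks": [
            {
                # Header line: bold mrkdwn link <url|label> to the pipeline console
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": "*<%s|%s: %s>*" % (pipeline_url, account_name, pipeline_name),
                },
            },
            {
                # Two-column field layout, similar to attachment "fields"
                "type": "section",
                "fields": [
                    {"type": "mrkdwn", "text": "*Stage*\n%s" % stage_name},
                    {"type": "mrkdwn", "text": "*State*\n%s" % state_name},
                    {"type": "mrkdwn", "text": "*GitHub Hash*\n%s" % revision_id[:8]},
                ],
            },
        ],
    }


if __name__ == "__main__":
    payload = build_slack_blocks("DEV", "pipeline1", "https://example.com/pipeline",
                                 "Build", "SUCCEEDED", "abcdef1234567890")
    print(json.dumps(payload, indent=2))
```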
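CloudWatch
A CloudWatch Events rule can use CodePipeline as its event source. For example, this event pattern matches stage executions of pipeline1 and pipeline2 that end in FAILED or SUCCEEDED: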
```json
{
  "source": [
    "aws.codepipeline"
  ],
  "detail-type": [
    "CodePipeline Stage Execution State Change"
  ],
  "detail": {
    "state": [
      "FAILED",
      "SUCCEEDED"
    ],
    "pipeline": [
      "pipeline1",
      "pipeline2"
    ]
  }
}
```
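To sanity-check an event against the pattern locally before creating the rule, a rough sketch follows. It is a simplified matcher written for illustration, not the actual CloudWatch Events/EventBridge engine: it only models exact-value matching (each leaf lists the allowed values, nested objects are matched recursively) and ignores content filters such as prefix or numeric matching and array-valued event fields. `matches` and `PATTERN` are hypothetical names.

```python
# The event pattern from above, as a Python dict
PATTERN = {
    "source": ["aws.codepipeline"],
    "detail-type": ["CodePipeline Stage Execution State Change"],
    "detail": {
        "state": ["FAILED", "SUCCEEDED"],
        "pipeline": ["pipeline1", "pipeline2"],
    },
}


def matches(pattern, event):
    """Return True if every pattern key is present in the event with an allowed value.

    Simplified: leaves are lists of allowed exact values; dicts recurse.
    """
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            # Nested object: the event value must also be a dict that matches
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            return False
    return True


if __name__ == "__main__":
    sample_event = {
        "source": "aws.codepipeline",
        "detail-type": "CodePipeline Stage Execution State Change",
        "detail": {"pipeline": "pipeline1", "stage": "Build", "state": "FAILED"},
    }
    print(matches(PATTERN, sample_event))  # True
```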
Lambda
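The Lambda part is the most interesting. Points worth noting:
- The trick for capturing the GitHub hash
- Preparing a layer for the AWS CLI
- Spawning a child process with subprocess
- How def lambda_handler(event, context) behaves
- The HTTP request library, and details of the request header and body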
The CodePipeline event itself does not include the GitHub hash, so we run an AWS CLI query and parse the response (the exact query is in the code below).
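Pulling the hash out of the `get-pipeline-execution` output comes down to walking `pipelineExecution.artifactRevisions`. A small sketch, assuming a pipeline with a single GitHub source so that the first revision is the one wanted; `extract_revision` is a hypothetical helper, and the sample response is hypothetical data trimmed to the fields used here.

```python
def extract_revision(execution_response):
    """Return (revisionId, revisionUrl) of the first artifact revision, or (None, None)."""
    revisions = (execution_response
                 .get("pipelineExecution", {})
                 .get("artifactRevisions", []))
    if not revisions:
        return None, None
    first = revisions[0]  # assumes a single source action
    return first.get("revisionId"), first.get("revisionUrl")


if __name__ == "__main__":
    # Hypothetical, trimmed response from `aws codepipeline get-pipeline-execution`
    sample_response = {
        "pipelineExecution": {
            "artifactRevisions": [
                {
                    "revisionId": "0123456789abcdef0123456789abcdef01234567",
                    "revisionUrl": "https://github.com/example-org/example-repo/commit/0123456789abcdef0123456789abcdef01234567",
                }
            ]
        }
    }
    revision_id, revision_url = extract_revision(sample_response)
    print(revision_id[:8])  # 01234567
```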
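Lambda does not come with the AWS CLI, so you need a layer that contains it and attach that layer to the function. Layer contents are extracted to /opt, which is why the code calls /opt/aws. The steps are: create a local virtual environment, install the AWS CLI, fix the Python path, then zip and upload. See: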
https://bezdelev.com/hacking/aws-cli-inside-lambda-layer-aws-s3-sync/
https://alestic.com/2016/11/aws-lambda-awscli/
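The "fix the Python path" step means rewriting the shebang of the `aws` script: after `pip install awscli` it points at the local virtualenv's interpreter, which does not exist inside Lambda. A minimal sketch of that rewrite in Python; `patch_shebang` and `patch_file` are hypothetical helpers, and the default interpreter `/var/lang/bin/python3` is an assumption about the Lambda Python runtime, so check it against your runtime and the references above.

```python
# Assumed interpreter location inside the Lambda Python runtime; verify for your runtime
DEFAULT_INTERPRETER = "/var/lang/bin/python3"


def patch_shebang(script_text, interpreter=DEFAULT_INTERPRETER):
    """Replace (or add) the first-line shebang so the script runs with `interpreter`."""
    lines = script_text.splitlines(keepends=True)
    new_shebang = "#!" + interpreter + "\n"
    if lines and lines[0].startswith("#!"):
        lines[0] = new_shebang
    else:
        lines.insert(0, new_shebang)
    return "".join(lines)


def patch_file(path, interpreter=DEFAULT_INTERPRETER):
    """Rewrite the shebang of a script file in place, e.g. a virtualenv's bin/aws."""
    with open(path, encoding="utf-8") as f:
        text = f.read()
    with open(path, "w", encoding="utf-8") as f:
        f.write(patch_shebang(text, interpreter))
```

Usage would be something like `patch_file("venv/bin/aws")` (hypothetical path) before copying the script into the layer directory and zipping it.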
The function that runs the CLI is implemented with subprocess, again following the references above.
The subprocess module allows you to spawn new processes, connect to their input/output/error pipes, and obtain their return codes. This module intends to replace several older modules and functions: os.system, os.spawn*
References:
https://docs.python.org/3/library/subprocess.html
https://zhuanlan.zhihu.com/p/72829009
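The implementation below splits a command string on spaces before handing it to `subprocess.run`. A sketch of the same idea that takes the argument list directly, captures stderr, and fails loudly on a non-zero exit code via `check=True`; `run_json_command` is a hypothetical name, and the demo runs the current Python interpreter instead of the AWS CLI so it works anywhere.

```python
import json
import subprocess
import sys


def run_json_command(args, timeout=30):
    """Run a command given as an argument list and parse its stdout as JSON.

    Raises subprocess.CalledProcessError on a non-zero exit code,
    subprocess.TimeoutExpired after `timeout` seconds, and
    json.JSONDecodeError if stdout is not valid JSON.
    """
    result = subprocess.run(
        args,                    # list form: no shell, no quoting issues
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=True,
        timeout=timeout,
    )
    return json.loads(result.stdout.decode("utf-8"))


if __name__ == "__main__":
    # Stand-in for ["/opt/aws", "codepipeline", "get-pipeline-execution", ...]
    demo = [sys.executable, "-c", "import json; print(json.dumps({'ok': True}))"]
    print(run_json_command(demo))  # {'ok': True}
```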
Implementation:
```python
# Inside lambda_handler (excerpt):
command_input = '/opt/aws codepipeline --region ap-southeast-1 get-pipeline-execution --pipeline-name %s --pipeline-execution-id %s' % (pipeline_name, execution_id)
command_result = run_command(command_input)
logger.info("command_result: " + str(command_result))
revision_Id = command_result['pipelineExecution']['artifactRevisions'][0]['revisionId']


def run_command(command):
    """Run a CLI command and return its stdout parsed as JSON, or None on failure."""
    command_list = command.split(' ')
    try:
        # check=True turns a non-zero exit code into CalledProcessError
        result = subprocess.run(command_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
        return json.loads(result.stdout.decode('UTF-8'))
    except Exception as e:
        logger.error("Exception: {}".format(e))
        return None
```
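Once you configure a handler, it effectively becomes your main: Lambda calls that function for every invocation and never runs a separate main you define. Module-level code still runs once when Lambda imports the file, but an if __name__ == '__main__': block does not, because the module is imported rather than executed as a script.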
```python
def handler_name(event, context):
    ...
    return some_value
```
https://docs.aws.amazon.com/lambda/latest/dg/python-programming-model-handler-types.html
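A minimal handler that can also be smoke-tested locally. Lambda imports the file and calls `lambda_handler(event, context)`, so the `__main__` block only runs when you execute the file yourself. The sample event is hypothetical, trimmed to three `detail` fields of a CodePipeline stage event. For asynchronous invocations such as CloudWatch Events the return value is discarded, so it only matters for local testing and synchronous calls.

```python
def lambda_handler(event, context):
    """Minimal handler: Lambda calls this directly for every invocation."""
    detail = event.get("detail", {})
    summary = "%s/%s: %s" % (detail.get("pipeline"), detail.get("stage"), detail.get("state"))
    return {"summary": summary}


if __name__ == "__main__":
    # Local smoke test only; this block never runs inside Lambda
    sample_event = {"detail": {"pipeline": "pipeline1", "stage": "Build", "state": "SUCCEEDED"}}
    print(lambda_handler(sample_event, None))  # {'summary': 'pipeline1/Build: SUCCEEDED'}
```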
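The Lambda Python runtime does not include the requests library, so the code uses urllib.request.Request instead.
req2 is the more explicit form: it sets the Content-Type header and the POST method itself, while req relies on urllib's defaults.
```python
req = Request(SLACK_WEBHOOK_URL, json.dumps(slack_message).encode("utf-8"))
req2 = Request(GOOGLE_WEBHOOK_URL, data=json.dumps(google_message).encode("utf-8"), headers={'Content-Type': 'application/json; charset=UTF-8'}, method='POST')
```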
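The difference between the two forms can be checked without sending anything by inspecting the `Request` objects; the URL below is a placeholder and `urlopen` is never called. Passing `data` alone already makes urllib use POST, but `req` carries no Content-Type header, and when it is sent urllib fills in `application/x-www-form-urlencoded`. urllib stores header names via `str.capitalize()`, hence the `Content-type` spelling in the lookups.

```python
import json
from urllib.request import Request

payload = json.dumps({"text": "hello"}).encode("utf-8")
# Placeholder URL; nothing is sent because urlopen() is never called
url = "https://example.com/webhook"

req = Request(url, payload)
req2 = Request(url, data=payload,
               headers={"Content-Type": "application/json; charset=UTF-8"},
               method="POST")

print(req.get_method())                 # POST, implied by data
print(req.has_header("Content-type"))   # False
print(req2.get_method())                # POST, set explicitly
print(req2.get_header("Content-type"))  # application/json; charset=UTF-8
```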
Sample Code
```python
import calendar
import json
import logging
import os
import subprocess
import time
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen

# Read configuration from environment variables. The variable names are assumed to
# match the constant names; the defaults are the placeholders from the original post.
SLACK_WEBHOOK_URL = os.environ.get('SLACK_WEBHOOK_URL', 'https://hooks.slack.com/services/xxxxxxxxxxxxxxxx')
SLACK_CHANNEL = os.environ.get('SLACK_CHANNEL', 'notification')
SLACK_USER = os.environ.get('SLACK_USER', 'codepipeline-bot')
GOOGLE_WEBHOOK_URL = os.environ.get('GOOGLE_WEBHOOK_URL', 'https://chat.googleapis.com/v1/spaces/yyyyy/messages?key=zzzzzzzz')

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event, context):
    logger.info("Event: " + str(event))
    account_id = event['account']
    pipeline_name = event['detail']['pipeline']
    execution_id = event['detail']['execution-id']
    stage_name = event['detail']['stage']
    state_name = event['detail']['state']
    time_raw = event['time']

    # The event carries no commit hash, so ask the AWS CLI from the layer (/opt) for it
    command_input = '/opt/aws codepipeline --region ap-southeast-1 get-pipeline-execution --pipeline-name %s --pipeline-execution-id %s' % (pipeline_name, execution_id)
    command_result = run_command(command_input)
    logger.info("command_result: " + str(command_result))
    if not command_result:
        logger.error("Could not read pipeline execution %s", execution_id)
        return
    revision_Id = command_result['pipelineExecution']['artifactRevisions'][0]['revisionId']
    revision_Url = command_result['pipelineExecution']['artifactRevisions'][0]['revisionUrl']

    # Account detection
    if account_id == '00000000000':
        account_name = 'DEV'
    elif account_id == '11111111111':
        account_name = 'PROD'
    else:
        account_name = 'UNKNOWN ACCOUNT'

    # Attachment color by state: red for FAILED, green for SUCCEEDED, blue otherwise
    if state_name == 'FAILED':
        color_name = '#D50A21'
    elif state_name == 'SUCCEEDED':
        color_name = '#2eb886'
    else:
        color_name = '#39A3E3'

    # time_raw looks like 2020-01-17T10:59:29Z (UTC); timegm() interprets it as UTC
    timestamp = calendar.timegm(time.strptime(time_raw, '%Y-%m-%dT%H:%M:%SZ'))
    pipeline_url = 'https://ap-southeast-1.console.aws.amazon.com/codesuite/codepipeline/pipelines/%s/view?region=ap-southeast-1' % pipeline_name

    # Construct a Slack message (legacy attachments format)
    slack_message = {
        # App-based Incoming Webhooks ignore these channel/username overrides
        'channel': SLACK_CHANNEL,
        'username': SLACK_USER,
        'attachments': [
            {
                "fallback": "%s: %s stage %s %s" % (account_name, pipeline_name, stage_name, state_name),
                "color": color_name,
                "pretext": "CodePipeline Stage Execution State Change detected:",
                "author_name": "Pipeline",
                "title": "%s: %s" % (account_name, pipeline_name),
                "title_link": pipeline_url,
                "fields": [
                    {"title": "Stage", "value": stage_name, "short": False},
                    {"title": "State", "value": state_name, "short": False},
                    {"title": "GitHub Hash", "value": revision_Id[:8], "short": False},
                ],
                "footer": "Newshub notification bot",
                "footer_icon": "https://upload.wikimedia.org/wikipedia/commons/thumb/9/93/Amazon_Web_Services_Logo.svg/300px-Amazon_Web_Services_Logo.svg.png",
                "ts": timestamp,
            }
        ],
    }
    google_message = {
        'text': 'CodePipeline Stage Execution State Change detected: `%s`: `%s` with GitHub Hash `%s` Stage `%s` has `%s`.' % (account_name, pipeline_name, revision_Id[:8], stage_name, state_name)
    }

    # Post to Slack and Google Chat separately so one failure does not skip the other
    req = Request(SLACK_WEBHOOK_URL, data=json.dumps(slack_message).encode('utf-8'), headers={'Content-Type': 'application/json'}, method='POST')
    req2 = Request(GOOGLE_WEBHOOK_URL, data=json.dumps(google_message).encode('utf-8'), headers={'Content-Type': 'application/json; charset=UTF-8'}, method='POST')
    for target, http_req in (('Slack channel ' + SLACK_CHANNEL, req), ('Google Chat', req2)):
        try:
            with urlopen(http_req) as response:
                response.read()
            logger.info("Message posted to %s", target)
        except HTTPError as e:
            logger.error("Request to %s failed: %d %s", target, e.code, e.reason)
        except URLError as e:
            logger.error("Connection to %s failed: %s", target, e.reason)


def run_command(command):
    """Run a CLI command and return its stdout parsed as JSON, or None on failure."""
    command_list = command.split(' ')
    try:
        # check=True turns a non-zero exit code into CalledProcessError
        result = subprocess.run(command_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
        return json.loads(result.stdout.decode('UTF-8'))
    except Exception as e:
        logger.error("Exception: {}".format(e))
        return None
```
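The Slack `ts` field expects Unix epoch seconds, and the event's `time` field is in UTC. `time.mktime()` interprets its argument as local time, which gives the right answer only because Lambda's local time zone is UTC; `calendar.timegm()` treats it as UTC regardless of where the code runs. A small sketch; `iso_utc_to_epoch` is a hypothetical helper name.

```python
import calendar
import time


def iso_utc_to_epoch(time_raw):
    """Convert a CloudWatch Events timestamp like 2020-01-17T10:59:29Z to epoch seconds.

    calendar.timegm() reads the struct_time as UTC, independent of the local time zone.
    """
    return calendar.timegm(time.strptime(time_raw, "%Y-%m-%dT%H:%M:%SZ"))


if __name__ == "__main__":
    print(iso_utc_to_epoch("2020-01-17T10:59:29Z"))  # 1579258769
```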
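Possible follow-ups (doable, but not necessary)
- For Slack notifications about CloudWatch alarms, AWS now offers the official AWS Chatbot, which can push messages that include a screenshot.
- I would really like CloudWatch email alerts to include a screenshot as well. I looked at the official workaround from 2018, but it requires rendering the image for every metric yourself, which is inconvenient, so I am leaving it for later.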
References:
Detect and React to Changes in Pipeline State with Amazon CloudWatch Events