[CDK]SNS+SQS+DynamoDBでBounceとComplaint情報を収集するスタック構築
2022-04-1128 min read
目次
概要
AWS の CDK で SNS+SQS+DynamoDBでBounceとComplaint情報を収集するスタック構築です。
構築するもの
SES → SNS → SQS → Lambda → DynamoDB といった形で情報が連携される構成です。
CDK Stackの実装
CDK の Stack の中身は以下となります。
import {
aws_dynamodb as dynamodb,
aws_iam as iam,
aws_lambda as lambda,
aws_ses as ses,
aws_ses_actions as sesActions,
aws_sns as sns,
aws_sqs as sqs,
Duration,
Stack,
StackProps,
} from 'aws-cdk-lib';
import { NodejsFunction } from 'aws-cdk-lib/aws-lambda-nodejs';
import { Construct } from 'constructs';
/** */
export class CdkBounceStack extends Stack {
/**
* DB_TABLENAME
*/
private readonly dbTablename = `ses_notifications`;
constructor(scope: Construct, id: string, props?: StackProps) {
super(scope, id, props);
const queues = this.addQueue();
const sns = this.addSNS(queues);
this.addLambda(queues);
this.addDynamoDB();
// this.addSES(sns)
}
/**
* キューの構築
*
* @returns
*/
private addQueue() {
const bounceQueue = new sqs.Queue(this, 'bounceQueue');
const complaintQueue = new sqs.Queue(this, 'complaintQueue');
[bounceQueue, complaintQueue].forEach((queue) => {
const policy = new iam.PolicyStatement();
policy.addActions(
'SQS:SendMessage',
'SQS:DeleteMessage',
'SQS:GetQueueAttributes',
);
policy.addResources(queue.queueArn);
policy.addPrincipals(new iam.AnyPrincipal());
queue.addToResourcePolicy(policy);
});
return {
bounceQueue,
complaintQueue,
};
}
/**
* SNSトピックの追加
*
* @param props
*/
private addSNS(props: { bounceQueue: sqs.Queue; complaintQueue: sqs.Queue }) {
const bounceTopic = new sns.Topic(this, 'BounceTopic');
const complaintTopic = new sns.Topic(this, 'ComplaintTopic');
const bounceSubscription = new sns.Subscription(
this,
'BounceSubscription',
{
endpoint: props.bounceQueue.queueArn,
protocol: sns.SubscriptionProtocol.SQS,
topic: bounceTopic,
},
);
const complaintSubscription = new sns.Subscription(
this,
'ComplaintSubscription',
{
endpoint: props.complaintQueue.queueArn,
protocol: sns.SubscriptionProtocol.SQS,
topic: complaintTopic,
},
);
return {
bounceTopic,
complaintTopic,
bounceSubscription,
complaintSubscription,
};
}
/**
* ログLambda
*
* @param props
*/
private addLambda(props: {
bounceQueue: sqs.Queue;
complaintQueue: sqs.Queue;
}) {
// Lambda
const loggerLambdaPolicy = new iam.PolicyStatement();
loggerLambdaPolicy.addActions(
'sqs:ReceiveMessage',
'sqs:DeleteMessage',
'dynamodb:PutItem',
);
loggerLambdaPolicy.addResources(
`arn:aws:dynamodb:*:*:table/${this.dbTablename}`,
props.bounceQueue.queueArn,
props.complaintQueue.queueArn,
);
const loggerLambda = new NodejsFunction(this, 'LoggerLambda', {
entry: 'src/lambda-bounce-logger/index.ts',
timeout: Duration.seconds(10),
handler: 'index.handler',
runtime: lambda.Runtime.NODEJS_14_X,
initialPolicy: [loggerLambdaPolicy],
environment: {
// DynamoDBテーブル名
DB_TABLENAME: this.dbTablename,
},
});
new lambda.EventSourceMapping(this, 'BounceEventSourceMapping', {
batchSize: 10,
enabled: true,
eventSourceArn: props.bounceQueue.queueArn,
target: loggerLambda,
});
new lambda.EventSourceMapping(this, 'ComplaintEventSourceMapping', {
batchSize: 10,
enabled: true,
eventSourceArn: props.complaintQueue.queueArn,
target: loggerLambda,
});
}
/**
* DynamoDB
*/
private addDynamoDB() {
return new dynamodb.Table(this, 'SesNotificationsTable', {
tableName: 'ses_notifications',
partitionKey: {
name: 'SESMessageId',
type: dynamodb.AttributeType.STRING,
},
sortKey: {
name: 'SnsPublishTime',
type: dynamodb.AttributeType.STRING,
},
});
}
/**
* SES のルール設定
*
* @see https://github.com/aws/aws-cdk/issues/2584
* @param props
*/
private addSES(props: {
bounceTopic: sns.Topic;
complaintTopic: sns.Topic;
}) {
// todo:
// ap-northeast-1では利用できない
// Template format error: Unrecognized resource types: [AWS::SES::ReceiptRule, AWS::SES::ReceiptRuleSet]
new ses.ReceiptRuleSet(this, 'SesRuleSetBounce', {
rules: [
{
recipients: ['example.com'],
actions: [
new sesActions.Sns({
topic: props.bounceTopic,
}),
new sesActions.Sns({
topic: props.complaintTopic,
}),
],
},
],
});
}
}
バウンス情報を受け取るLambdaの実装
バウンス情報を受け取るLambdaの実装は以下となります。
import { SQSEvent, SQSHandler } from 'aws-lambda';
import { AWSError, DynamoDB } from 'aws-sdk';
/**
* DynamoDB
*/
const DynamoDBTableName = process.env.DB_TABLENAME;
const dynamodb = new DynamoDB({
params: { TableName: DynamoDBTableName },
});
/**
* 通知内容からDynamoDB登録用Itemを生成する
*
* @param {any} body
* @returns
*/
const parseRecordBody = (
body: any,
): DynamoDB.Types.PutItemInputAttributeMap | undefined => {
const SESMessage = JSON.parse(body.Message);
const SESMessageType = SESMessage.notificationType;
let commonItem = {
SESMessageId: { S: SESMessage.mail.messageId },
SnsPublishTime: { S: body.Timestamp },
SESMessageType: { S: SESMessageType },
SESDestinationAddress: { S: SESMessage.mail.destination.toString() },
};
let extendItem = {};
if (SESMessageType === 'Bounce') {
extendItem = {
SESreportingMTA: { S: SESMessage.bounce.reportingMTA },
SESbounceSummary: {
S: JSON.stringify(SESMessage.bounce.bouncedRecipients),
},
};
} else if (SESMessageType === 'Delivery') {
extendItem = {
SESsmtpResponse: { S: SESMessage.delivery.smtpResponse },
SESreportingMTA: { S: SESMessage.delivery.reportingMTA },
};
} else if (SESMessageType === 'Complaint') {
extendItem = {
SESComplaintFeedbackType: {
S: SESMessage.complaint.complaintFeedbackType,
},
SESFeedbackId: { S: SESMessage.complaint.feedbackId },
};
} else if (SESMessageType === 'AmazonSnsSubscriptionSucceeded') {
// memo: 明示的なスコープ
return;
} else {
return;
}
return {
...commonItem,
...extendItem,
};
};
/**
* handler
*
* @param {any} event
* @param {any} context
*/
export const handler: SQSHandler = (event, context) => {
for (let i = 0; i < event.Records.length; i++) {
const body = JSON.parse(event.Records[i].body);
console.log(body);
const Item = parseRecordBody(body);
if (!Item) {
continue;
}
dynamodb.putItem({
TableName: DynamoDBTableName,
Item,
}, (err: AWSError, data: DynamoDB.Types.PutItemOutput) => {
if (err) {
// context.fail(err);
return;
}
console.log(data);
// context.succeed();
});
}
};
Event オブジェクトの中身
ここで SES → SNS → SQS と経由されてきたバウンス情報は次のような形式で Lambda の handler の event に渡されます。
{
"Records": [
{
"messageId": "11989527-ae32-49e0-9424-d925f12a0f92",
"receiptHandle": "AQEB0eb...Aec0C==",
"body": { // note: 実際にはJSONエンコードされている
"Type": "Notification",
"MessageId": "33e3d3b7-5dc3-562a-9891-897b2d76593e",
"TopicArn": "arn:aws:sns:ap-northeast-1:XXXXXXXXXXXX:CdkBounceStack-ComplaintTopic5CB073F4-QTV1QXVHZN3M",
"Message": { // note: 実際にはJSONエンコードされている
"notificationType": "AmazonSnsSubscriptionSucceeded",
"message": "You have successfully subscribed your Amazon SNS topic 'arn:aws:sns:ap-northeast-1:XXXXXXXXXXXX:CdkBounceStack-ComplaintTopic5CB073F4-QTV1QXVHZN3M' to receive 'Complaint' notifications from Amazon SES for identity 'noreplay@example.com'."
},
"Timestamp": "2022-04-10T05:43:17.368Z",
"SignatureVersion": "1",
"Signature": "OQP8sOoz...WQeqHCg==",
"SigningCertURL": "https://sns.ap-northeast-1.amazonaws.com/SimpleNotificationService-XXXXXXXXXX.pem",
"UnsubscribeURL": "https://sns.ap-northeast-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:ap-northeast-1:XXXXXXXXXXXX:CdkBounceStack-ComplaintTopic5CB073F4-QTV1QXVHZN3M:XXXXXXXXXXXX0"
},
"attributes": {
"ApproximateReceiveCount": "11",
"AWSTraceHeader": "Root=1-62526e75-198ddbee8XXXXXXXXXXXXXXX;Sampled=0",
"SentTimestamp": "1649569397414",
"SenderId": "AIDAIERWYNSXXXXXXXXXX",
"ApproximateFirstReceiveTimestamp": "1649569397414"
},
"messageAttributes": {},
"md5OfBody": "02b7770ff7d2576efea5d88edc9a2b8b",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:ap-northeast-1:XXXXXXXXXXXX:CdkBounceStack-complaintQueue1AFA60FA-JFFvaDoNfQ0g",
"awsRegion": "ap-northeast-1"
}
]
}
参考にしたサイト
Recommends
New Posts
Hot posts!
Date
Tags
(110)
(54)
(54)
(47)
(45)
(36)
(30)
(29)
(24)
(24)
(22)
(21)
(21)
(20)
(19)
(17)
(16)
(16)
(15)
(14)
(12)
(12)
(12)
(12)
(12)
(12)
(11)
(10)
(10)
(10)
(10)
(10)
(9)
(9)
(8)
(8)
(8)
(8)
(7)
(7)
(6)
(6)
(6)
(6)
(6)
(5)
(5)
(5)
(5)
(4)
Author