【初学者向け】サーバーレスを学ぶための実践演習4
演習4: Lambda + DynamoDBのput/query
はじめに
- Lambdaは、AWSが提供するサーバーレスのコンピューティングサービスです。前回に引き続き、Lambda を基礎から学習するための演習を発信します。Lambdaを初めて使う方は、演習1からお読みください。
- 今回の演習では、LambdaからDynamoDBのテーブルにアクセスし、アイテムの読み込み/書き込みを行います。今回のゴールは、ランダム文字列を生成するAPI の作成となります。クライアントからUser IDを引数としたリクエストを投げます。リクエストのUser IDが事前にテーブルに登録したID と一致した場合のみ、ランダム文字列を生成してメールで通知を行います。
- 初めて、LambdaからDynamoDB にアクセスする方を想定して、ステップ1~3と順を追って理解を深める内容としております。ぜひ楽しんで、学んでください。
演習4 Lambda+DynamoDBのステップ1
概要
- ステップ1は、API GatewayのリクエストをトリガにLambdaが起動し、ランダム文字列の生成およびHistoryテーブルにタイムスタンプ, User ID, ランダム文字列の書き込みを行います。最後に、ランダム文字列を付けてレスポンスを返します。ステップ1では、アイテムのput のみ行います(queryは行いません)。
- 前提条件として、事前にLambdaの実行ロールを準備します。ロールには、下記のポリシーをアタッチします。
- AWSLambdaBasicExecutionRole
- AmazonDynamoDBFullAccess
- AmazonSESFullAccess(ステップ2,3 で使用)
システム構成
- 本演習のシステム構成は、下記の通りです。
構築してみる
DynamoDB を作る
- 先ず、DynamoDBにHistory Tableを作成します。DynamoDBのコンソールを開き、[テーブルの作成]を選択します。
- [テーブル名]を入力します。
- [プライマリキー]の[パーティションキー]に"timestamp"、属性に"文字列"を指定します。[ソートキー]に"userid"、属性に"文字列"を指定します。ソートキーは必須ではありませんが、今回の機能では、同時にリクエストがあった場合にタイムスタンプが重複する可能性があるため、プライマリキーには、パーティションキーとソートキーを使用します。
- その他の設定はデフォルトでも構いません。今回はテストのため、キャパシティーユニットは1ユニットを指定しています。
Lambda を作る
- 次に、Lambda関数を作成します。Lambdaのコンソールを開き、一から作成します。関数名(例: niikawa-randomtext)を入力、ランタイム(今回はPython 3.8とします)、実行ロールを選択します。
- Lambda関数が作成されたら、下記のコードを利用します。コードを貼りつけ、[デプロイ]を押します。
- 簡単ですが、処理の流れを説明します。
- 最初に、DynamoDBのテーブルをオブジェクトとして取得します。ここでテーブル名の設定は、os.environ['history_table’] の箇所となり、Lambdaの環境変数を使い"history_table" を指定しています。
- テーブルへ接続するためのコードはローカルスコープ(lambda_handler内)ではなく、グローバルスコープに記述するようにする。
- リクエストに指定されたクエリパラメータはqueryStringParametersとして渡され、Pythonでは event['queryStringParameters’]['arg’] で値を取得できます。取得した値は、変数の userid に格納します。
- stringライブラリのstring.digits, string.ascii_lowercase, string.ascii_uppercaseによって取得した数字,英字の大文字・小文字を元に、20文字のランダム文字列を生成します。
- put_itemでテーブルにアイテムを書き込みます。アイテムは、パーティションキーのタイムスタンプ, ソートキーのUser ID、そして先ほど生成したランダム文字列を指定します。
- この後作成するAPI Gatewayで、Lambdaプロキシ統合を使用するため、本演習で使用するレスポンスの形式には"statusCode", “headers", “body"が必要となります。
import json
import os
import boto3
import datetime
import string
import random
dynamodb_history = boto3.resource('dynamodb').Table(os.environ['history_table'])
def lambda_handler(event, context):
userid = event['queryStringParameters']['userid']
data = string.digits + string.ascii_lowercase + string.ascii_uppercase
randomtext = ''.join([random.choice(data) for i in range(20)])
dynamodb_history.put_item(
Item = {
"timestamp": datetime.datetime.now().strftime("%Y%m%d%H%M%S"),
"userid": userid,
"randomtext": randomtext
}
)
return {
'statusCode': 200,
'body': json.dumps({
'randomtext': randomtext
}),
'isBase64Encoded': False,
'headers': {}
}
- 続いて、Lambda の環境変数を設定します。変数の名前に"history_table"、値に先ほど作成したテーブル名を登録し、[保存]を押します。
API Gateway を作る
- 最後にAPI Gatewayを作成します。演習1 で作成したAPI Gatewayがある場合は、再利用しても構いません。新規で作成する場合は、API Gateway のコンソールを開き、[API を作成] を選択します。API タイプは、「REST API」 を選択します。
- APIに「新しい API」、API nameにAPI名、Endpoint Typeに「リージョン」or 「エッジ最適化」を選択します。
- [リソース]ペインにて、[アクション]から[メソッドの作成]を選択し、メソッドをPOST とします。次に、統合タイプにて、[Lambda 関数]を選択、[Lambda プロキシ統合の使用]にチェックを入れ、[Lambda リージョン]と[Lambda 関数]で先ほど作成したLambda関数を選択します。[保存]を押します。
- API GatewayにLambda関数を呼び出す権限の付与を、[OK]とします。
- APIをデプロイします。[アクション]から[API のデプロイ]を選択します。デプロイされるステージに[新しいステージ]、ステージ名に名前(今回はdev)を設定し、[デプロイ]を押します。
- Lambdaのコンソールに戻り、API GatewayがLambdaのトリガーとして設定されたことを確認します。
テストしてみる
- curlコマンドで、作成したAPI Gatewayにリクエストを投げます。レスポンスに20文字のランダム文字列が返れば、成功です。
niikawa@niikawa1:~$ curl -X POST https://yyyyyy79yh.execute-api.ap-northeast-1.amazonaws.com/dev/?userid=niikawa
{"randomtext": "zDbsD9ZrJw1FR9OwH4lR"}
演習4 Lambda+DynamoDBのステップ2
概要
- ステップ2は、ステップ1の続きです。演習2 で作成したsend_mailメソッドを利用してメールの送信を行います。また、メールの宛先を設定するため、事前にDynamoDBにUserIdテーブルを作成し、テスト用のUser ID, メールアドレスを登録しておきます。
- API Gatewayからのリクエスト後、クエリパラメータとして渡されたuserid を使ってUserIdテーブルを検索し、User IDに対応するメールアドレスを取得します。メールの宛先、メールの本文(ランダム文字列をアタッチ)をsend_mailメソッドに指定して、ランダム文字列をメール送信します。ステップ2では、アイテムのquery、put を行います。
- 前提条件として、事前にLambdaの実行ロールを準備します。ロールには、下記のポリシーをアタッチします。
- AWSLambdaBasicExecutionRole
- AmazonDynamoDBFullAccess
- AmazonSESFullAccess
- 事前に、SES(Simple Email Service)でメールアドレスの検証を行います。詳細は、演習2 を参照して下さい。
システム構成
- 本演習のシステム構成は、下記の通りです。
構築してみる
DynamoDB を作る
- 先ず、DynamoDBにUserId Tableを作成します。DynamoDBのコンソールを開き、[テーブルの作成]を選択します。
- [テーブル名]を入力します。
- [プライマリキー]の[パーティションキー]に"userid"、属性に"文字列"を指定します。ソートキーは使用しません。
- その他の設定はデフォルトでも構いません。キャパシティーユニットは1ユニットで十分のため、今回は"1″を指定します。
Lambda を作る
- ステップ1 にて作成したLambda関数のコードを変更します。コードを変更後、[デプロイ]を押します。
- 簡単ですが、ステップ2で変更した処理を説明します。
- DynamoDBのテーブルをオブジェクトとして取得します。ここでテーブル名の設定は、os.environ['userid_table’] の箇所となり、Lambdaの環境変数を使い"userid_table" を指定しています。
- テーブルへ接続するためのコードはローカルスコープ(lambda_handler内)ではなく、グローバルスコープに記述するようにする。
- 環境に合わせて、SES_REGION, MAIL_FROM, MAIL_SUBJECTの変数を設定し、SESを使ってメールを送信するための send_mailメソッドの定義を追加します。
- query の KeyConditionExpressionオプションを使用して、テーブルからアイテムを検索します。queryには、プライマリキーのuseridおよびその値を指定しています。次に、検索結果からメールアドレスを取得、mail_destinationに格納します。(mail_destination = query_response['Items’][0]['email’] の箇所)
- ランダム文字列をアタッチしたメール本文を作成、メールアドレスとメール本文を send_mailメソッドに渡してメールの送信を行います。
import json
import os
import boto3
from boto3.dynamodb.conditions import Key
import datetime
import string
import random
dynamodb_userid = boto3.resource('dynamodb').Table(os.environ['userid_table'])
dynamodb_history = boto3.resource('dynamodb').Table(os.environ['history_table'])
SES_REGION = 'us-east-1'
MAIL_FROM = 'admin@example.com'
MAIL_SUBJECT = 'Send the result of a random string'
def send_mail(to, subject, body):
client = boto3.client('ses', region_name=SES_REGION)
response = client.send_email(
Source = MAIL_FROM,
Destination = {
'ToAddresses' : [
to
]
},
Message = {
'Subject': {
'Data': subject,
'Charset': 'UTF-8'
},
'Body': {
'Text': {
'Data': body,
'Charset': 'UTF-8'
}
}
}
)
def lambda_handler(event, context):
userid = event['queryStringParameters']['userid']
query_response = dynamodb_userid.query(
KeyConditionExpression=Key('userid').eq(userid)
)
mail_destination = query_response['Items'][0]['email']
data = string.digits + string.ascii_lowercase + string.ascii_uppercase
randomtext = ''.join([random.choice(data) for i in range(20)])
mail_body = 'Your string is :'+randomtext
put_response = dynamodb_history.put_item(
Item = {
"timestamp": datetime.datetime.now().strftime("%Y%m%d%H%M%S"),
"userid": userid,
"randomtext": randomtext
}
)
send_mail(mail_destination, MAIL_SUBJECT, mail_body)
return {
'statusCode': 200,
'body': json.dumps({
'randomtext': randomtext
}),
'isBase64Encoded': False,
'headers': {}
}
- Lambda の環境変数に"userid_table"、値に先ほど作成したUserId Tableのテーブル名を登録し、[保存]を押します。
テストしてみる
- curlコマンドで、作成したAPI Gatewayにリクエストを投げます。事前にテーブルに登録したUser IDを指定することで、User IDに対応したメールアドレスへランダム文字列が送付されることを確認します。
- 次に、テーブルに未登録のUser IDを指定した場合、レスポンスにstatusCode 502の “Internal server error"が返されました。
niikawa@niikawa1:~$ curl -X POST https://yyyyyy79yh.execute-api.ap-northeast-1.amazonaws.com/dev/?userid=takahashi
{"message": "Internal server error"}
演習4 Lambda+DynamoDBのステップ3
概要
- ステップ3は、ステップ2の続きです。ステップ2の場合は、UserId Tableに登録されていないUser ID を指定すると、Internal Server Error となりました。ステップ3では、コードにエラー処理を追加します。
構築してみる
Lambda を作る
- ステップ2 にて作成したLambda関数のコードを変更します。コードを変更し、[デプロイ]を押します。
- queryおよびput_item においてエラーが検出された場合を想定して、try~except を追加します。queryにてエラーがあった場合に、CloudWatch Logsにエラーログを出力し、レスポンスにstatusCode 400 およびUserId Not Foundのメッセージを返します。
import json
import os
import logging
import boto3
from boto3.dynamodb.conditions import Key
import datetime
import string
import random
logger = logging.getLogger()
logger.setLevel(logging.ERROR)
dynamodb_userid = boto3.resource('dynamodb').Table(os.environ['userid_table'])
dynamodb_history = boto3.resource('dynamodb').Table(os.environ['history_table'])
SES_REGION = 'us-east-1'
MAIL_FROM = 'admin@example.com'
MAIL_SUBJECT = 'Send the result of a random string'
def send_mail(to, subject, body):
client = boto3.client('ses', region_name=SES_REGION)
response = client.send_email(
Source = MAIL_FROM,
Destination = {
'ToAddresses' : [
to
]
},
Message = {
'Subject': {
'Data': subject,
'Charset': 'UTF-8'
},
'Body': {
'Text': {
'Data': body,
'Charset': 'UTF-8'
}
}
}
)
def lambda_handler(event, context):
userid = event['queryStringParameters']['userid']
try:
query_response = dynamodb_userid.query(
KeyConditionExpression=Key('userid').eq(userid)
)
mail_destination = query_response['Items'][0]['email']
except Exception as e:
logger.error(e)
return {
'statusCode': 400,
'body': json.dumps({
'msg': 'UserId Not Found'
}),
'isBase64Encoded': False,
'headers': {}
}
data = string.digits + string.ascii_lowercase + string.ascii_uppercase
randomtext = ''.join([random.choice(data) for i in range(20)])
mail_body = 'Your string is :'+randomtext
try:
put_response = dynamodb_history.put_item(
Item = {
"timestamp": datetime.datetime.now().strftime("%Y%m%d%H%M%S"),
"userid": userid,
"randomtext": randomtext
}
)
except Exception as e:
logger.error(e)
send_mail(mail_destination, MAIL_SUBJECT, mail_body)
return {
'statusCode': 200,
'body': json.dumps({
'randomtext': randomtext
}),
'isBase64Encoded': False,
'headers': {}
}
テストしてみる
- curlコマンドで、作成したAPI Gatewayにリクエストを投げます。テーブルに未登録のUser IDを指定することで、レスポンスにstatusCode 400が返されました。
niikawa@niikawa1:~$ curl -X POST https://yyyyyy79yh.execute-api.ap-northeast-1.amazonaws.com/dev/?userid=takahashi
{"msg": "UserId Not Found"}
参考資料
バックナンバー
- 【初学者向け】サーバーレスを学ぶための実践演習1
- 【初学者向け】サーバーレスを学ぶための実践演習2
- 【初学者向け】サーバーレスを学ぶための実践演習3
- 【初学者向け】サーバーレスを学ぶための実践演習4
ドキュメント