【初学者向け】サーバーレスを学ぶための実践演習5

【初学者向け】実践演習,AWS,Lambda,SQS

演習5: Lambda + SQSキュー

 

はじめに

  • Lambdaは、AWSが提供するサーバーレスのコンピューティングサービスです。前回に引き続き、Lambda を基礎から学習するための演習を発信します。Lambdaを初めて使う方は、演習1からお読みください。
  • 今回の演習では、Amazon SQS(Amazon Simple Queue Service) 使用します。SQS は、フルマネージド型のメッセージキューイングサービスです。キューイングはいわゆる待ち行列のことです。キューイングを使用することで、AWS上の分散アプリケーションが疎結合で構成できます。アプリケーションがこれから処理する内容を一度キューに溜めることができ、順次任意のタイミング(非同期)に取り出して処理ができます。
  • SQSをトリガとして使用するパターン, SQSにポーリングして使用するパターンを記載しています。ぜひ楽しんで、学んでください。

 

演習5 Lambda+SQSキューのステップ1

概要

  • ステップ1は、SQSキューをトリガにしてLambdaを起動させます。Lambda は、単純にSQSキューから受け取ったメッセージをCloudWatch Logs に出力します。
  • 前提条件として、事前にLambdaの実行ロールを準備します。ロールには、下記のポリシーをアタッチします。
    • AWSLambdaBasicExecutionRole
    • AmazonSQSFullAccess

 

システム構成

  • 本演習ステップ1 のシステム構成は、下記の通りです。
  • ステップ1 は、AWS CLI を利用します。

 

構築してみる

SQSキューを作る

  • 先ず、Lambda のトリガに使用するSQS キューを作成します。以下の例はAWS CLI を利用していますが、AWSコンソールを利用しても良いです。AWS CLI を利用する場合は、事前にアクセスキーを設定する必要があります。アクセスキーの設定は、こちらを参考にしてください。
niikawa@niikawa1:~$ QUEUE_NAME=niikawa-test-queue
niikawa@niikawa1:~$ aws sqs create-queue --queue-name ${QUEUE_NAME} --region ap-northeast-1
{
    "QueueUrl": "https://ap-northeast-1.queue.amazonaws.com/111111111111/niikawa-test-queue"
}

 

Lambda を作る

  • 次に、Lambda 関数を作成します。Lambdaのコンソールを開き、一から作成します。関数名(例: niikawa-test-sqs)を入力、ランタイム(今回はPython 3.9とします)、先ほど準備した実行ロールを選択します。
  • Lambda関数が作成されたら、下記のコードを利用します。コードを貼りつけ、[デプロイ]を押します。このコードは、こちらのドキュメントに記載のサンプルとなります。
from __future__ import print_function

def lambda_handler(event, context):
    for record in event['Records']:
        print("test")
        payload = record["body"]
        print(str(payload))

 

Lambda にトリガを設定する

  • 次に、Lambda にトリガを設定します。「トリガーを追加」を選択、トリガーのソースに「SQS」、SQS キューに先ほど作成したSQS キューを指定、トリガーの有効化を選択します。「追加」を押します。

 

テストしてみる

  • AWS CLI からSQSキューにメッセージを送信します。SQSキューをトリガにして、Lambdaが起動します。
niikawa@niikawa1:~$ QUEUE_URL="https://ap-northeast-1.queue.amazonaws.com/111111111111/niikawa-test-queue"
niikawa@niikawa1:~$ aws sqs send-message --queue-url ${QUEUE_URL} --message-body "Hello from SQS!" --region ap-northeast-1
{
    "MD5OfMessageBody": "7b270e59b47ff90a553787216d55d91d",
    "MessageId": "a5b97bfd-3707-4036-b832-03d38a4a3f66"
}

 

  • CloudWatch Logs のログストリームを確認します。テスト時に送信したメッセージが出力されました。
  • これでステップ1 は、完了です。

 

 

演習5 Lambda+SQSキューのステップ2

概要

  • ステップ2は、Lambda が起動すると、SQSキューからメッセージの取得を行います。Lambda は、SQSキューから受け取ったメッセージをCloudWatch Logs に出力します。
  • ステップ2は、トリガを使用しません。
  • 前提条件として、事前にLambdaの実行ロールを準備します。ロールには、下記のポリシーをアタッチします。
    • AWSLambdaBasicExecutionRole
    • AmazonSQSFullAccess

 

システム構成

  • 本演習ステップ2 のシステム構成は、下記の通りです。
  • ステップ2 も、AWS CLI を利用します。SQSキューは、ステップ1 で作成したリソースを使用します。

 

構築してみる

Lambda を作る①

  • Lambda 関数を作成します。Lambdaのコンソールを開き、一から作成します。関数名(例: niikawa-test-sqs-rcv)を入力、ランタイム(今回はPython 3.9とします)、実行ロールを選択します。
  • Lambda関数が作成されたら、下記のコードを利用します。コードを貼りつけ、[デプロイ]を押します。
  • receive_messages は、指定されたキューから1つもしくは複数のメッセージを取得します。Return type はdict型になります。for文で出力した後、メッセージの削除を行っています。
import os
import boto3

name = os.environ['queue_name']
sqs = boto3.resource('sqs')
queue = sqs.get_queue_by_name(QueueName=name)

def lambda_handler(event, context):
    message_list = queue.receive_messages(MaxNumberOfMessages=10)
    for message in message_list:
        print(message.body)
        message.delete()
  • 環境変数 queue_name に、SQS キューの名前を登録します。

 

 

  • 動作を確認するため、AWS CLI からSQS キューにメッセージを送信します。
QUEUE_URL="https://ap-northeast-1.queue.amazonaws.com/111111111111/niikawa-test-queue"
aws sqs send-message --queue-url ${QUEUE_URL} --message-body "Hello 1" --region ap-northeast-1
aws sqs send-message --queue-url ${QUEUE_URL} --message-body "Hello 2" --region ap-northeast-1
aws sqs send-message --queue-url ${QUEUE_URL} --message-body "Hello 3" --region ap-northeast-1
aws sqs send-message --queue-url ${QUEUE_URL} --message-body "Hello 4" --region ap-northeast-1
aws sqs send-message --queue-url ${QUEUE_URL} --message-body "Hello 5" --region ap-northeast-1

 

  • [Test] ボタンを押し、Lambda を起動します。"Hello 1″から"Hello 5″ のメッセージが計5件が出力されると期待しましたが、メッセージは2件しか表示されませんね。

 

  • ドキュメントには、下記の記載があります。MaxNumberOfMessages は、返されるメッセージの最大数ですが、返されるメッセージが少ない場合もあります。
  • MaxNumberOfMessages (integer) — The maximum number of messages to return. Amazon SQS never returns more messages than this value (however, fewer messages might be returned). Valid values: 1 to 10. Default: 1.

 

Lambda を作る②

  • 下記のコードに変更します。コードを貼りつけ、[デプロイ]を押します。
import os
import boto3

name = os.environ['queue_name']
sqs = boto3.resource('sqs')
queue = sqs.get_queue_by_name(QueueName=name)

def lambda_handler(event, context):
    while True:
        message_list = queue.receive_messages(MaxNumberOfMessages=10)
        if message_list:
            for message in message_list:
                print(message.body)
                message.delete()
        else:
            break

 

  • 動作を確認するため、AWS CLI からSQS キューにメッセージを送信します。
QUEUE_URL="https://ap-northeast-1.queue.amazonaws.com/111111111111/niikawa-test-queue"
aws sqs send-message --queue-url ${QUEUE_URL} --message-body "Hello 1" --region ap-northeast-1
aws sqs send-message --queue-url ${QUEUE_URL} --message-body "Hello 2" --region ap-northeast-1
aws sqs send-message --queue-url ${QUEUE_URL} --message-body "Hello 3" --region ap-northeast-1
aws sqs send-message --queue-url ${QUEUE_URL} --message-body "Hello 4" --region ap-northeast-1
aws sqs send-message --queue-url ${QUEUE_URL} --message-body "Hello 5" --region ap-northeast-1

 

  • 再度、[Test] ボタンを押し、Lambda のテストを行います。今回は、"Hello 1″から"Hello 5″ のメッセージが計5件が出力されました。

 

 

演習5 Lambda+SQSキューのステップ3

概要

  • ステップ3は、ステップ2 の継続となります。SQSキューにメッセージを送信するためのLambda を作成します。そして、ステップ2 で作成したLambda のトリガにEventBridge を設定します。受信用のLambda をスケジュールで起動させます。受信用のLambda は、SQSキューからメッセージを取得してCloudWatch Logs に出力します。
  • 前提条件として、事前にLambdaの実行ロールを準備します。ロールには、下記のポリシーをアタッチします。
      • AWSLambdaBasicExecutionRole
      • AmazonSQSFullAccess

 

システム構成

  • 本演習ステップ3 のシステム構成は、下記の通りです。
  • ステップ2でAWS CLI を利用していた処理を送信用Lambda として作成しています。ステップ2 のLambdaが受信用Lambda となり、トリガにEventBridge を設定しています。

 

 

構築してみる

送信用Lambda を作る

  • Lambda 関数を作成します。Lambdaのコンソールを開き、一から作成します。関数名(例: niikawa-test-sqs-snd)を入力、ランタイム(今回はPython 3.9とします)、実行ロールを選択します。
  • Lambda関数が作成されたら、下記のコードを利用します。コードを貼りつけ、[デプロイ]を押します。
import os
import boto3

name = os.environ['queue_name']
sqs = boto3.resource('sqs')
queue = sqs.get_queue_by_name(QueueName=name)

def lambda_handler(event, context):
    entries = [{'Id' : '{}'.format(i+1), 'MessageBody' : 'Hello {}'.format(i+1)} for i in range(3)]
    response = queue.send_messages(Entries=entries)
    print(response)
  • 環境変数 queue_name に、SQS キューの名前を登録します。

 

受信用Lambda にトリガを設定する

  • ステップ2で作成したLambda(例: niikawa-test-sqs-rcv)を選択します。
  • 「トリガーを追加」を選択、トリガーのソースに「EventBridge」、ルールに「新規ルールの作成」、ルール名の指定、ルールタイプに「スケジュール式」、スケジュール式にcronまたはrate 式を使用したスケジュールを指定します。「追加」を押します。

 

テストしてみる

  • 送信用Lambda にて、[Test] ボタンを押し、SQSキューにメッセージを送ります。
  • SQSキューに、テスト用のメッセージが送信されました。CloudWatch Logs からログを確認します。

 

  • 受信用Lambda のトリガに設定したスケジュールを待ちます。
  • EventBridge に設定したスケジュールをトリガにしてLambda が起動しました。CloudWatch Logs からログを確認します。SQSキューからメッセージを取得し、ログに出力されました。

 

参考資料

バックナンバー

 

ドキュメント