AWS IoT Device Shadow – Lambda

AWS

この記事ではAWS S3へのファイルの更新をトリガに起動し、AWS IoT Device Shadowを更新するAWS Lambdaを作成していきます。

Lambdaの作成はAWS SAM CLIを使用します。AWS SAM CLIはこちらの記事でも使用しました。runtimeはpython3.8を使用します。

sam init --runtime python3.8 --package-type Zip --app-template hello-world --name iot-shadow-lambdaCode language: Bash (bash)

このLambdaでは以下のことを実現します。

  1. 特定のS3バケットのファイル作成・更新をトリガにLambdaを起動
  2. S3に作成・更新されたファイル情報の取得
  3. モノのDevice Shadowへファイル更新情報を書き込む

特定のS3バケットのファイル作成・更新をトリガにLambdaを起動

特定のS3バケットのファイル作成・更新をトリガにLambdaを起動させ、LambdaからS3ファイルの取得、Device Shadowへアクセスをする為、sam initで作成されたtemplate.yamlのを以下のように更新します。

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  iot-shadow-lambda

  Sample SAM Template for iot-shadow-lambda

Parameters:
  BucketName:
    Type: String
    Description: bucket name to create

# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst
Globals:
  Function:
    Timeout: 15
    MemorySize: 5312

Resources:
  HelloWorldFunction:
    Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction
    Properties:
      CodeUri: hello_world/
      Handler: app.lambda_handler
      Runtime: python3.8
      Architectures:
        - x86_64
      Events:
        BucketEvent:
          Type: S3
          Properties:
            Bucket: !Ref IoTShadowBucket
            Events: # https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-how-to-event-types-and-destinations.html
              - 's3:ObjectCreated:*'
      Policies:
        - Statement:
          - Sid: S3Get
            Effect: Allow
            Action:
            - s3:GetObject
            Resource:
            - !Sub arn:aws:s3:::{AWS::AccountId}-${BucketName}/*

          - Sid: IoT
            Effect: Allow
            Action:
            - iot:ListThings
            - iot:GetThingShadow
            - iot:UpdateThingShadow
            - iot:DescribeEndpoint
            Resource:
            - "*"

  IoTShadowBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub {AWS::AccountId}-${BucketName}Code language: YAML (yaml)

Lambdaの起動トリガはEventsTypeS3にし、Properties.BucketでバケットIoTShadowBucketを指定し、Properties.EventsS3のイベント作成イベントs3:ObjectCreated:*を指定しています。

      Events:
        BucketEvent:
          Type: S3
          Properties:
            Bucket: !Ref IoTShadowBucket
            Events: # https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-how-to-event-types-and-destinations.html
              - 's3:ObjectCreated:*'Code language: YAML (yaml)

またバケットに作成されたファイルの情報を取得するためs3:GetObjectと、モノの取得、モノのDevice Shadowの更新をするため、iot:ListThingsiot:GetThingShadowiot:UpdateThingShadowを許可しています。またLambdaで動的にendpointを取得するために、iot:DescribeEndpointを許可しています。

      Policies:
        - Statement:
          - Sid: S3Get
            Effect: Allow
            Action:
            - s3:GetObject
            Resource:
            - !Sub arn:aws:s3:::${BucketName}/*

          - Sid: IoT
            Effect: Allow
            Action:
            - iot:ListThings
            - iot:GetThingShadow
            - iot:UpdateThingShadow
            - iot:DescribeEndpoint
            Resource:
            - "*"Code language: YAML (yaml)

S3バケット名はParametersに記述しこのtemplate.yamlの外から指定できるようにします。

Parameters:
  BucketName:
    Type: String
    Description: bucket name to createCode language: YAML (yaml)

また、S3バケットを作成する為、以下を追加しています。

  IoTShadowBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Ref BucketNameCode language: YAML (yaml)

S3に作成・更新されたファイル情報の取得

template.yamlは完成していますので、一度ビルドとデプロイを実施してみます。hello_world/app.pyは以下のように、lambda_handlerの引数eventを表示するように修正しています。

import json

# import requests


def lambda_handler(event, context):
    print(context)
    return {
        "statusCode": 200,
        "body": json.dumps({
            "message": "hello world",
            # "location": ip.text.replace("\n", "")
        }),
    }Code language: Python (python)

ビルドしてデプロイします。ビルドの初回は--guidedオプションで実施します。2回目以降のデプロイはオプションが不要になります。

指定するバケット名をxxxxxxxxxx-iot-test-filesという名前にすることで、別の記事の導入がスムーズになります。なおxxxxxxxxxxにはご自身のアカウントIDを指定します。

$ sam build
$ sam deploy --guided

Configuring SAM deploy
======================

	Looking for config file [samconfig.toml] :  Not found

	Setting default arguments for 'sam deploy'
	=========================================
	Stack Name [sam-app]: ==> any stack name can be used
	AWS Region [us-east-1]: ==> any region can be used
	Parameter BucketName []: ==> any GLOBALLY UNIQUE bucket name can be used
	#Shows you resources changes to be deployed and require a 'Y' to initiate deploy
	Confirm changes before deploy [y/N]: ==> blank is OK
	#SAM needs permission to be able to create roles to connect to the resources in your template
	Allow SAM CLI IAM role creation [Y/n]: ==> blank is OK
	#Preserves the state of previously provisioned resources when an operation fails
	Disable rollback [y/N]: ==> blank is OK
	Save arguments to configuration file [Y/n]: ==> blank is OK
	SAM configuration file [samconfig.toml]: ==> blank is OK
	SAM configuration environment [default]: ==> blank is OK

	Looking for resources needed for deployment:
	 Managed S3 bucket: aws-sam-cli-managed-default-samclisourcebucket-32vpmut42j6x
	 A different default S3 bucket can be set in samconfig.toml

	Saved arguments to config file
	Running 'sam deploy' for future deployments will use the parameters saved above.
	The above parameters can be changed by modifying samconfig.toml
	Learn more about samconfig.toml syntax at 
	https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-config.html

...Code language: Bash (bash)

Lambdaは上記で指定したS3バケットにファイルを作成・更新するとそれとトリガに起動しますので、作成したS3バケットにファイルをアップロードしてみます。以下ではREADME.mdを作成したバケット名<created bucket name>にコピーしています。

aws s3 cp README.md s3://<created bucket name>/Code language: Bash (bash)

AWS CloudWatch Logsのログを見てみます。以下ではAWS CLIでCloudWatch Logsのログを確認しますが、AWS Consoleにログインしてログを確認しても問題ありません。下記<create stack name>sam deploy --guidedで指定したスタック名を入力します。

$ aws logs describe-log-groups | jq -r '.logGroups[] | select(.logGroupName | contains("<created stack name>")).logGroupName'
/aws/lambda/iot-shadow-lambda-HelloWorldFunction-xxxxxxxxxxxx
$ aws logs tail /aws/lambda/iot-shadow-lambda-HelloWorldFunction-xxxxxxxxxxxx
...
2022-12-13T13:38:51.916000+00:00 2022/12/13/[$LATEST]abfed18bb61448faa8c96857bd3b9fee {'Records': [...Code language: Bash (bash)

上記、{'Records': [...の部分を以下に整形します。

{
    "Records": [
        {
            "eventVersion": "2.1",
            ...
            },
            "s3": {
                "s3SchemaVersion": "1.0",
                "configurationId": "xxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxxxxxx",
                "bucket": {
                    "name": "<created bucket name>",
                    "ownerIdentity": {"principalId": "xxxxxxxxxxxx"},
                    "arn": "arn:aws:s3:::<created bucket name>",
                },
                "object": {
                    "key": "README.md",
                    "size": 8382,
                    "eTag": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
                    "sequencer": "xxxxxxxxxxxxxxxxx",
                },
            },
        }
    ]
}Code language: JSON / JSON with Comments (json)

event["Records"]["s3"]["bucket"]["name"]にバケット名、event["Records"]["s3"]["object"]["key"]にファイルパスが含まれていることがわかります。

モノのDevice Shadowへファイル更新情報を書き込む

最終的なhello_world/app.pyは以下のようになります。

import json
import boto3
from retrying import retry
from hashlib import sha256

# import requests

iot_client = boto3.client("iot")
endpoint = iot_client.describe_endpoint(endpointType="iot:Data-ATS")
endpoint_url = f"https://{endpoint['endpointAddress']}"
iot_data_client = boto3.client("iot-data", endpoint_url=endpoint_url)


def _get_s3_object(bucket, key):
    s3 = boto3.client("s3")
    m = sha256()

    with open("/tmp/s3obj", "wb") as f:
        s3.download_file(bucket, key, "/tmp/s3obj")

    with open("/tmp/s3obj", "rb") as f:
        while True:
            chunk = f.read(1024 * 1024)
            if len(chunk) == 0:
                break
            m.update(chunk)
        return m.hexdigest()


def _retry_if_throttling_exception(exception):
    return isinstance(exception, iot_data_client.exceptions.ThrottlingException)


@retry(
    stop_max_attempt_number=2,
    wait_fixed=1000,
    retry_on_exception=_retry_if_throttling_exception,
)
def _thing_shadow(iot_data, thing_name, bucket, key, digest):
    try:
        payload = {
            "state": {
                "desired": {
                    "s3": {
                        "url": f"s3://{bucket}/{key}",
                        "hash": digest,
                    }
                }
            }
        }
        iot_data.update_thing_shadow(thingName=thing_name, payload=json.dumps(payload))
    except iot_data.exceptions.ResourceNotFoundException:
        return None


def lambda_handler(event, context):
    things = []
    next_token = ""
    while True:
        list_things = iot_client.list_things(nextToken=next_token)
        things.extend(list_things["things"])
        next_token = list_things.get("nextToken")
        if not next_token:
            break

    bucket = event["Records"][0]["s3"]["bucket"]["name"]
    key = event["Records"][0]["s3"]["object"]["key"]
    digest = _get_s3_object(bucket, key)

    for thing in things:
        _thing_shadow(iot_data_client, thing["thingName"], bucket, key, digest)Code language: Python (python)

上から順番に説明します。

iot_client = boto3.client("iot")
endpoint = iot_client.describe_endpoint(endpointType="iot:Data-ATS")
endpoint_url = f"https://{endpoint['endpointAddress']}"
iot_data_client = boto3.client("iot-data", endpoint_url=endpoint_url)Code language: Python (python)

endpointはboto3 client iotのdescribe_endpointにより動的に取得します。

_get_s3_objectはs3からobjectを取得しそのhash値を計算します。Lambdaは書き込み可能なディレクトリが/tmpである点に注意します。

def _get_s3_object(bucket, key):
    s3 = boto3.client("s3")
    m = sha256()

    with open("/tmp/s3obj", "wb") as f:
        s3.download_file(bucket, key, "/tmp/s3obj")

    with open("/tmp/s3obj", "rb") as f:
        while True:
            chunk = f.read(1024 * 1024)
            if len(chunk) == 0:
                break
            m.update(chunk)
        return m.hexdigest()Code language: Python (python)

_thing_shadowはDevice Shadowを更新します。更新するデータはdesired.s3.urldesired.s3.hashです。この関数はthing(モノ)の数だけ呼び出されるため、スロットリングでエラーが発声する場合があります。そのため、retryingを使用してiot_data_client.exceptions.ThrottlingException例外が発生した場合は1秒待ってから最大2回呼び出します。

def _retry_if_throttling_exception(exception):
    return isinstance(exception, iot_data_client.exceptions.ThrottlingException)


@retry(
    stop_max_attempt_number=2,
    wait_fixed=1000,
    retry_on_exception=_retry_if_throttling_exception,
)
def _thing_shadow(iot_data, thing_name, bucket, key, digest):
    try:
        payload = {
            "state": {
                "desired": {
                    "s3": {
                        "url": f"s3://{bucket}/{key}",
                        "hash": digest,
                    }
                }
            }
        }
        iot_data.update_thing_shadow(thingName=thing_name, payload=json.dumps(payload))
    except iot_data.exceptions.ResourceNotFoundException:
        return NoneCode language: Python (python)

lambda_handlerはLambdaのエントリーです。iot_client.list_thingsでモノをリストを取得しthings配列に追加します。next_tokenが存在する場合はそのnext_tokenを使用してiot_client.list_thingsを呼び出すと続きのモノのリストが取得できます。

def lambda_handler(event, context):
    things = []
    next_token = ""
    while True:
        list_things = iot_client.list_things(nextToken=next_token)
        things.extend(list_things["things"])
        next_token = list_things.get("nextToken")
        if not next_token:
            break

    bucket = event["Records"][0]["s3"]["bucket"]["name"]
    key = event["Records"][0]["s3"]["object"]["key"]
    digest = _get_s3_object(bucket, key)

    for thing in things:
        _thing_shadow(iot_data_client, thing["thingName"], bucket, key, digest)Code language: Python (python)

以上です。別の記事ではデバイス側のコードを作成したいと思います。

参考

チュートリアル: Amazon S3 トリガーを使用して Lambda 関数を呼び出す - AWS Lambda
このチュートリアルでは、コンソールを使用して Lambda 関数を作成し、Amazon Simple Storage Service (Amazon S3) バケットのトリガーを設定します。Amazon S3 バケットにオブジェクトを追加するたびに関数を実行し、Amazon CloudWatch Logs にオブジェク...
Event notification types and destinations - Amazon Simple Storage Service
Amazon S3 supports several event notification types and destinations where the notifications can be published. You can specify the event type and destination wh...
AWS SAM policy templates - AWS Serverless Application Model
This section contains the full list of supported policy templates.
AWS IoT Core endpoints and quotas - AWS General Reference
The following are the service endpoints and service quotas for this service. To connect programmatically to an AWS service, you use an endpoint. In addition to ...
AWS IoT のアクション、リソース、および条件キー - サービス認可リファレンス
AWS IoT へのアクセスを制御するために IAM ポリシーで使用できるサービス固有のリソースやアクション、条件キーを一覧表示します。