AWS IoT Device Shadow – Lambda

AWS

この記事ではAWS S3へのファイルの更新をトリガに起動し、AWS IoT Device Shadowを更新するAWS Lambdaを作成していきます。

Lambdaの作成はAWS SAM CLIを使用します。AWS SAM CLIはこちらの記事でも使用しました。runtimeはpython3.8を使用します。

sam init --runtime python3.8 --package-type Zip --app-template hello-world --name iot-shadow-lambda
Code language: Bash (bash)

このLambdaでは以下のことを実現します。

  1. 特定のS3バケットのファイル作成・更新をトリガにLambdaを起動
  2. S3に作成・更新されたファイル情報の取得
  3. モノのDevice Shadowへファイル更新情報を書き込む

特定のS3バケットのファイル作成・更新をトリガにLambdaを起動

特定のS3バケットのファイル作成・更新をトリガにLambdaを起動させ、LambdaからS3ファイルの取得、Device Shadowへアクセスをする為、sam initで作成されたtemplate.yamlのを以下のように更新します。

AWSTemplateFormatVersion: '2010-09-09' Transform: AWS::Serverless-2016-10-31 Description: > iot-shadow-lambda Sample SAM Template for iot-shadow-lambda Parameters: BucketName: Type: String Description: bucket name to create # More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst Globals: Function: Timeout: 15 MemorySize: 5312 Resources: HelloWorldFunction: Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction Properties: CodeUri: hello_world/ Handler: app.lambda_handler Runtime: python3.8 Architectures: - x86_64 Events: BucketEvent: Type: S3 Properties: Bucket: !Ref IoTShadowBucket Events: # https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-how-to-event-types-and-destinations.html - 's3:ObjectCreated:*' Policies: - Statement: - Sid: S3Get Effect: Allow Action: - s3:GetObject Resource: - !Sub arn:aws:s3:::{AWS::AccountId}-${BucketName}/* - Sid: IoT Effect: Allow Action: - iot:ListThings - iot:GetThingShadow - iot:UpdateThingShadow - iot:DescribeEndpoint Resource: - "*" IoTShadowBucket: Type: AWS::S3::Bucket Properties: BucketName: !Sub {AWS::AccountId}-${BucketName}
Code language: YAML (yaml)

Lambdaの起動トリガはEventsTypeS3にし、Properties.BucketでバケットIoTShadowBucketを指定し、Properties.EventsS3のイベント作成イベントs3:ObjectCreated:*を指定しています。

Events: BucketEvent: Type: S3 Properties: Bucket: !Ref IoTShadowBucket Events: # https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-how-to-event-types-and-destinations.html - 's3:ObjectCreated:*'
Code language: YAML (yaml)

またバケットに作成されたファイルの情報を取得するためs3:GetObjectと、モノの取得、モノのDevice Shadowの更新をするため、iot:ListThingsiot:GetThingShadowiot:UpdateThingShadowを許可しています。またLambdaで動的にendpointを取得するために、iot:DescribeEndpointを許可しています。

Policies: - Statement: - Sid: S3Get Effect: Allow Action: - s3:GetObject Resource: - !Sub arn:aws:s3:::${BucketName}/* - Sid: IoT Effect: Allow Action: - iot:ListThings - iot:GetThingShadow - iot:UpdateThingShadow - iot:DescribeEndpoint Resource: - "*"
Code language: YAML (yaml)

S3バケット名はParametersに記述しこのtemplate.yamlの外から指定できるようにします。

Parameters: BucketName: Type: String Description: bucket name to create
Code language: YAML (yaml)

また、S3バケットを作成する為、以下を追加しています。

IoTShadowBucket: Type: AWS::S3::Bucket Properties: BucketName: !Ref BucketName
Code language: YAML (yaml)

S3に作成・更新されたファイル情報の取得

template.yamlは完成していますので、一度ビルドとデプロイを実施してみます。hello_world/app.pyは以下のように、lambda_handlerの引数eventを表示するように修正しています。

import json # import requests def lambda_handler(event, context): print(context) return { "statusCode": 200, "body": json.dumps({ "message": "hello world", # "location": ip.text.replace("\n", "") }), }
Code language: Python (python)

ビルドしてデプロイします。ビルドの初回は--guidedオプションで実施します。2回目以降のデプロイはオプションが不要になります。

指定するバケット名をxxxxxxxxxx-iot-test-filesという名前にすることで、別の記事の導入がスムーズになります。なおxxxxxxxxxxにはご自身のアカウントIDを指定します。

$ sam build $ sam deploy --guided Configuring SAM deploy ====================== Looking for config file [samconfig.toml] : Not found Setting default arguments for 'sam deploy' ========================================= Stack Name [sam-app]: ==> any stack name can be used AWS Region [us-east-1]: ==> any region can be used Parameter BucketName []: ==> any GLOBALLY UNIQUE bucket name can be used #Shows you resources changes to be deployed and require a 'Y' to initiate deploy Confirm changes before deploy [y/N]: ==> blank is OK #SAM needs permission to be able to create roles to connect to the resources in your template Allow SAM CLI IAM role creation [Y/n]: ==> blank is OK #Preserves the state of previously provisioned resources when an operation fails Disable rollback [y/N]: ==> blank is OK Save arguments to configuration file [Y/n]: ==> blank is OK SAM configuration file [samconfig.toml]: ==> blank is OK SAM configuration environment [default]: ==> blank is OK Looking for resources needed for deployment: Managed S3 bucket: aws-sam-cli-managed-default-samclisourcebucket-32vpmut42j6x A different default S3 bucket can be set in samconfig.toml Saved arguments to config file Running 'sam deploy' for future deployments will use the parameters saved above. The above parameters can be changed by modifying samconfig.toml Learn more about samconfig.toml syntax at https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-config.html ...
Code language: Bash (bash)

Lambdaは上記で指定したS3バケットにファイルを作成・更新するとそれとトリガに起動しますので、作成したS3バケットにファイルをアップロードしてみます。以下ではREADME.mdを作成したバケット名<created bucket name>にコピーしています。

aws s3 cp README.md s3://<created bucket name>/
Code language: Bash (bash)

AWS CloudWatch Logsのログを見てみます。以下ではAWS CLIでCloudWatch Logsのログを確認しますが、AWS Consoleにログインしてログを確認しても問題ありません。下記<create stack name>sam deploy --guidedで指定したスタック名を入力します。

$ aws logs describe-log-groups | jq -r '.logGroups[] | select(.logGroupName | contains("<created stack name>")).logGroupName' /aws/lambda/iot-shadow-lambda-HelloWorldFunction-xxxxxxxxxxxx $ aws logs tail /aws/lambda/iot-shadow-lambda-HelloWorldFunction-xxxxxxxxxxxx ... 2022-12-13T13:38:51.916000+00:00 2022/12/13/[$LATEST]abfed18bb61448faa8c96857bd3b9fee {'Records': [...
Code language: Bash (bash)

上記、{'Records': [...の部分を以下に整形します。

{ "Records": [ { "eventVersion": "2.1", ... }, "s3": { "s3SchemaVersion": "1.0", "configurationId": "xxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxxxxxx", "bucket": { "name": "<created bucket name>", "ownerIdentity": {"principalId": "xxxxxxxxxxxx"}, "arn": "arn:aws:s3:::<created bucket name>", }, "object": { "key": "README.md", "size": 8382, "eTag": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", "sequencer": "xxxxxxxxxxxxxxxxx", }, }, } ] }
Code language: JSON / JSON with Comments (json)

event["Records"]["s3"]["bucket"]["name"]にバケット名、event["Records"]["s3"]["object"]["key"]にファイルパスが含まれていることがわかります。

モノのDevice Shadowへファイル更新情報を書き込む

最終的なhello_world/app.pyは以下のようになります。

import json import boto3 from retrying import retry from hashlib import sha256 # import requests iot_client = boto3.client("iot") endpoint = iot_client.describe_endpoint(endpointType="iot:Data-ATS") endpoint_url = f"https://{endpoint['endpointAddress']}" iot_data_client = boto3.client("iot-data", endpoint_url=endpoint_url) def _get_s3_object(bucket, key): s3 = boto3.client("s3") m = sha256() with open("/tmp/s3obj", "wb") as f: s3.download_file(bucket, key, "/tmp/s3obj") with open("/tmp/s3obj", "rb") as f: while True: chunk = f.read(1024 * 1024) if len(chunk) == 0: break m.update(chunk) return m.hexdigest() def _retry_if_throttling_exception(exception): return isinstance(exception, iot_data_client.exceptions.ThrottlingException) @retry( stop_max_attempt_number=2, wait_fixed=1000, retry_on_exception=_retry_if_throttling_exception, ) def _thing_shadow(iot_data, thing_name, bucket, key, digest): try: payload = { "state": { "desired": { "s3": { "url": f"s3://{bucket}/{key}", "hash": digest, } } } } iot_data.update_thing_shadow(thingName=thing_name, payload=json.dumps(payload)) except iot_data.exceptions.ResourceNotFoundException: return None def lambda_handler(event, context): things = [] next_token = "" while True: list_things = iot_client.list_things(nextToken=next_token) things.extend(list_things["things"]) next_token = list_things.get("nextToken") if not next_token: break bucket = event["Records"][0]["s3"]["bucket"]["name"] key = event["Records"][0]["s3"]["object"]["key"] digest = _get_s3_object(bucket, key) for thing in things: _thing_shadow(iot_data_client, thing["thingName"], bucket, key, digest)
Code language: Python (python)

上から順番に説明します。

iot_client = boto3.client("iot") endpoint = iot_client.describe_endpoint(endpointType="iot:Data-ATS") endpoint_url = f"https://{endpoint['endpointAddress']}" iot_data_client = boto3.client("iot-data", endpoint_url=endpoint_url)
Code language: Python (python)

endpointはboto3 client iotのdescribe_endpointにより動的に取得します。

_get_s3_objectはs3からobjectを取得しそのhash値を計算します。Lambdaは書き込み可能なディレクトリが/tmpである点に注意します。

def _get_s3_object(bucket, key): s3 = boto3.client("s3") m = sha256() with open("/tmp/s3obj", "wb") as f: s3.download_file(bucket, key, "/tmp/s3obj") with open("/tmp/s3obj", "rb") as f: while True: chunk = f.read(1024 * 1024) if len(chunk) == 0: break m.update(chunk) return m.hexdigest()
Code language: Python (python)

_thing_shadowはDevice Shadowを更新します。更新するデータはdesired.s3.urldesired.s3.hashです。この関数はthing(モノ)の数だけ呼び出されるため、スロットリングでエラーが発声する場合があります。そのため、retryingを使用してiot_data_client.exceptions.ThrottlingException例外が発生した場合は1秒待ってから最大2回呼び出します。

def _retry_if_throttling_exception(exception): return isinstance(exception, iot_data_client.exceptions.ThrottlingException) @retry( stop_max_attempt_number=2, wait_fixed=1000, retry_on_exception=_retry_if_throttling_exception, ) def _thing_shadow(iot_data, thing_name, bucket, key, digest): try: payload = { "state": { "desired": { "s3": { "url": f"s3://{bucket}/{key}", "hash": digest, } } } } iot_data.update_thing_shadow(thingName=thing_name, payload=json.dumps(payload)) except iot_data.exceptions.ResourceNotFoundException: return None
Code language: Python (python)

lambda_handlerはLambdaのエントリーです。iot_client.list_thingsでモノをリストを取得しthings配列に追加します。next_tokenが存在する場合はそのnext_tokenを使用してiot_client.list_thingsを呼び出すと続きのモノのリストが取得できます。

def lambda_handler(event, context): things = [] next_token = "" while True: list_things = iot_client.list_things(nextToken=next_token) things.extend(list_things["things"]) next_token = list_things.get("nextToken") if not next_token: break bucket = event["Records"][0]["s3"]["bucket"]["name"] key = event["Records"][0]["s3"]["object"]["key"] digest = _get_s3_object(bucket, key) for thing in things: _thing_shadow(iot_data_client, thing["thingName"], bucket, key, digest)
Code language: Python (python)

以上です。別の記事ではデバイス側のコードを作成したいと思います。

参考

チュートリアル: Amazon S3 トリガーを使用して Lambda 関数を呼び出す - AWS Lambda
このチュートリアルでは、コンソールを使用して Lambda 関数を作成し、Amazon Simple Storage Service (Amazon S3) のトリガーを設定します。トリガーは、Amazon S3 バケットにオブジェクトを追加するたびに関数を呼び出します。
Event notification types and destinations - Amazon Simple Storage Service
Amazon S3 supports several event notification types and destinations where the notifications can be published. You can specify the event type and destination wh...
AWS SAM policy templates - AWS Serverless Application Model
This section contains the full list of supported policy templates.
AWS IoT Core endpoints and quotas - AWS General Reference
The following are the service endpoints and service quotas for this service. To connect programmatically to an AWS service, you use an endpoint. In addition to ...
AWS IoT のアクション、リソース、および条件キー - サービス認証リファレンス
AWS IoT へのアクセスを制御するために IAM ポリシーで使用できるサービス固有のリソースやアクション、条件キーを一覧表示します。