AWS IoT Device Shadow – Lambda

AWS

In this article, We will create an AWS Lambda that is triggered by a file upload to AWS S3 and update the AWS IoT Device Shadow.

The AWS SAM CLI is used to create the Lambda. The AWS SAM CLI is also used in this article. Runtime is python 3.8.

sam init --runtime python3.8 --package-type Zip --app-template hello-world --name iot-shadow-lambda
Code language: Bash (bash)

This Lambda accomplishes the following

  1. Triggers Lambda on file creation/update in specific S3 bucket
  2. Obtaining information on files created and updated in S3
  3. Writes file update information to the device shadow of a thing

Triggers Lambda on file creation/update in specific S3 bucket

In order to trigger Lambda to start when a file is created or updated in a specific S3 bucket, and to access the S3 file and Device Shadow from Lambda, update the template.yaml created by sam init as follows.

AWSTemplateFormatVersion: '2010-09-09' Transform: AWS::Serverless-2016-10-31 Description: > iot-shadow-lambda Sample SAM Template for iot-shadow-lambda Parameters: BucketName: Type: String Description: bucket name to create # More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst Globals: Function: Timeout: 15 MemorySize: 5312 Resources: HelloWorldFunction: Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction Properties: CodeUri: hello_world/ Handler: app.lambda_handler Runtime: python3.8 Architectures: - x86_64 Events: BucketEvent: Type: S3 Properties: Bucket: !Ref IoTShadowBucket Events: # https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-how-to-event-types-and-destinations.html - 's3:ObjectCreated:*' Policies: - Statement: - Sid: S3Get Effect: Allow Action: - s3:GetObject Resource: - !Sub arn:aws:s3:::{AWS::AccountId}-${BucketName}/* - Sid: IoT Effect: Allow Action: - iot:ListThings - iot:GetThingShadow - iot:UpdateThingShadow - iot:DescribeEndpoint Resource: - "*" IoTShadowBucket: Type: AWS::S3::Bucket Properties: BucketName: !Sub {AWS::AccountId}-${BucketName}
Code language: YAML (yaml)

The Lambda launch trigger sets the Type of Events to S3, specifies the bucket IoTShadowBucket in Properties.Bucket, and the S3 creation event s3:ObjectCreated:* in Properties.Events.

Events: BucketEvent: Type: S3 Properties: Bucket: !Ref IoTShadowBucket Events: # https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-how-to-event-types-and-destinations.html - 's3:ObjectCreated:*'
Code language: YAML (yaml)

In addition, s3:GetObject is used to retrieve information about files created in the bucket, and iot:ListThings, iot:GetThingShadow, and iot:UpdateThingShadow are used to retrieve a list of things and update the Device Shadow of the things. In addition, iot:DescribeEndpoint is allowed to dynamically retrieve endpoints in Lambda.

Policies: - Statement: - Sid: S3Get Effect: Allow Action: - s3:GetObject Resource: - !Sub arn:aws:s3:::${BucketName}/* - Sid: IoT Effect: Allow Action: - iot:ListThings - iot:GetThingShadow - iot:UpdateThingShadow - iot:DescribeEndpoint Resource: - "*"
Code language: YAML (yaml)

The S3 bucket name is specified in the Parameters so that it can be specified from outside this template.yaml.

Parameters: BucketName: Type: String Description: bucket name to create
Code language: YAML (yaml)

The following is also added to create an S3 bucket.

IoTShadowBucket: Type: AWS::S3::Bucket Properties: BucketName: !Ref BucketName
Code language: YAML (yaml)

Obtaining information on files created and updated in S3

Since template.yaml is completed, let’s build and deploy this once. hello_world/app.py is modified to show the event argument of lambda_handler, as shown below.

import json # import requests def lambda_handler(event, context): print(context) return { "statusCode": 200, "body": json.dumps({ "message": "hello world", # "location": ip.text.replace("\n", "") }), }
Code language: Python (python)

Build and deploy. The first time the build is performed with the --guided option. The second and subsequent deployments do not require the option.

Specify the bucket name as xxxxxxxxxxxxxx-iot-test-files to facilitate the introduction of another article. Specify your own account ID for xxxxxxxxxxxxxxxx.

$ sam build $ sam deploy --guided Configuring SAM deploy ====================== Looking for config file [samconfig.toml] : Not found Setting default arguments for 'sam deploy' ========================================= Stack Name [sam-app]: ==> any stack name can be used AWS Region [us-east-1]: ==> any region can be used Parameter BucketName []: ==> any GLOBALLY UNIQUE bucket name can be used #Shows you resources changes to be deployed and require a 'Y' to initiate deploy Confirm changes before deploy [y/N]: ==> blank is OK #SAM needs permission to be able to create roles to connect to the resources in your template Allow SAM CLI IAM role creation [Y/n]: ==> blank is OK #Preserves the state of previously provisioned resources when an operation fails Disable rollback [y/N]: ==> blank is OK Save arguments to configuration file [Y/n]: ==> blank is OK SAM configuration file [samconfig.toml]: ==> blank is OK SAM configuration environment [default]: ==> blank is OK Looking for resources needed for deployment: Managed S3 bucket: aws-sam-cli-managed-default-samclisourcebucket-32vpmut42j6x A different default S3 bucket can be set in samconfig.toml Saved arguments to config file Running 'sam deploy' for future deployments will use the parameters saved above. The above parameters can be changed by modifying samconfig.toml Learn more about samconfig.toml syntax at https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-config.html ...
Code language: Bash (bash)

Lambda is triggered when a file is created or updated in the S3 bucket specified above, so let’s try uploading a file to the created S3 bucket. In the example below, README.md is copied to the created bucket name <created bucket name>.

aws s3 cp README.md s3://<created bucket name>/
Code language: Bash (bash)

Let’s look at the AWS CloudWatch Logs logs. In the following, CloudWatch Logs logs are viewed in the AWS CLI, but you can also log in to the AWS Console to view the logs. Below <create stack name> is the stack name specified with sam deploy --guided.

$ aws logs describe-log-groups | jq -r '.logGroups[] | select(.logGroupName | contains("<created stack name>")).logGroupName' /aws/lambda/iot-shadow-lambda-HelloWorldFunction-xxxxxxxxxxxx $ aws logs tail /aws/lambda/iot-shadow-lambda-HelloWorldFunction-xxxxxxxxxxxx ... 2022-12-13T13:38:51.916000+00:00 2022/12/13/[$LATEST]abfed18bb61448faa8c96857bd3b9fee {'Records': [...
Code language: Bash (bash)

Above the {'Records': [… part is formatted as follows.

{ "Records": [ { "eventVersion": "2.1", ... }, "s3": { "s3SchemaVersion": "1.0", "configurationId": "xxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxxxxxx", "bucket": { "name": "<created bucket name>", "ownerIdentity": {"principalId": "xxxxxxxxxxxx"}, "arn": "arn:aws:s3:::<created bucket name>", }, "object": { "key": "README.md", "size": 8382, "eTag": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", "sequencer": "xxxxxxxxxxxxxxxxx", }, }, } ] }
Code language: JSON / JSON with Comments (json)

You can see that event["Records"]["s3"]["bucket"]["name"] contains the bucket name and event["Records"]["s3"]["object"]["key"] contains the file path.

Writes file update information to the device shadow of a thing

The final hello_world/app.py is shown as follows.

import json import boto3 from retrying import retry from hashlib import sha256 # import requests iot_client = boto3.client("iot") endpoint = iot_client.describe_endpoint(endpointType="iot:Data-ATS") endpoint_url = f"https://{endpoint['endpointAddress']}" iot_data_client = boto3.client("iot-data", endpoint_url=endpoint_url) def _get_s3_object(bucket, key): s3 = boto3.client("s3") m = sha256() with open("/tmp/s3obj", "wb") as f: s3.download_file(bucket, key, "/tmp/s3obj") with open("/tmp/s3obj", "rb") as f: while True: chunk = f.read(1024 * 1024) if len(chunk) == 0: break m.update(chunk) return m.hexdigest() def _retry_if_throttling_exception(exception): return isinstance(exception, iot_data_client.exceptions.ThrottlingException) @retry( stop_max_attempt_number=2, wait_fixed=1000, retry_on_exception=_retry_if_throttling_exception, ) def _thing_shadow(iot_data, thing_name, bucket, key, digest): try: payload = { "state": { "desired": { "s3": { "url": f"s3://{bucket}/{key}", "hash": digest, } } } } iot_data.update_thing_shadow(thingName=thing_name, payload=json.dumps(payload)) except iot_data.exceptions.ResourceNotFoundException: return None def lambda_handler(event, context): things = [] next_token = "" while True: list_things = iot_client.list_things(nextToken=next_token) things.extend(list_things["things"]) next_token = list_things.get("nextToken") if not next_token: break bucket = event["Records"][0]["s3"]["bucket"]["name"] key = event["Records"][0]["s3"]["object"]["key"] digest = _get_s3_object(bucket, key) for thing in things: _thing_shadow(iot_data_client, thing["thingName"], bucket, key, digest)
Code language: Python (python)

I will explain from top to bottom.

iot_client = boto3.client("iot") endpoint = iot_client.describe_endpoint(endpointType="iot:Data-ATS") endpoint_url = f"https://{endpoint['endpointAddress']}" iot_data_client = boto3.client("iot-data", endpoint_url=endpoint_url)
Code language: Python (python)

The endpoint is dynamically obtained by describe_endpoint of the boto3 client iot.

_get_s3_object retrieves an object from s3 and calculates its hash value. Note that the Lambda has /tmp as its writable directory.

def _get_s3_object(bucket, key): s3 = boto3.client("s3") m = sha256() with open("/tmp/s3obj", "wb") as f: s3.download_file(bucket, key, "/tmp/s3obj") with open("/tmp/s3obj", "rb") as f: while True: chunk = f.read(1024 * 1024) if len(chunk) == 0: break m.update(chunk) return m.hexdigest()
Code language: Python (python)

_thing_shadow updates the Device Shadow. The data to be updated are desired.s3.url and desired.s3.hash. Since this function is called for every number of thing(s), throttling errors may be raised. Therefore, if a iot_data_client.exceptions.ThrottlingException exception is raised using retrying, wait 1 second and call it up to 2 times.

def _retry_if_throttling_exception(exception): return isinstance(exception, iot_data_client.exceptions.ThrottlingException) @retry( stop_max_attempt_number=2, wait_fixed=1000, retry_on_exception=_retry_if_throttling_exception, ) def _thing_shadow(iot_data, thing_name, bucket, key, digest): try: payload = { "state": { "desired": { "s3": { "url": f"s3://{bucket}/{key}", "hash": digest, } } } } iot_data.update_thing_shadow(thingName=thing_name, payload=json.dumps(payload)) except iot_data.exceptions.ResourceNotFoundException: return None
Code language: Python (python)

The lambda_handler is a Lambda entry. iot_client.list_things retrieves a list of things and adds it to the things array. If next_token exists, the next list of things can be obtained by calling iot_client.list_things with the next_token.

def lambda_handler(event, context): things = [] next_token = "" while True: list_things = iot_client.list_things(nextToken=next_token) things.extend(list_things["things"]) next_token = list_things.get("nextToken") if not next_token: break bucket = event["Records"][0]["s3"]["bucket"]["name"] key = event["Records"][0]["s3"]["object"]["key"] digest = _get_s3_object(bucket, key) for thing in things: _thing_shadow(iot_data_client, thing["thingName"], bucket, key, digest)
Code language: Python (python)

That’s all. In another article I would like to create the code for the device side.

Reference

チュートリアル: Amazon S3 トリガーを使用して Lambda 関数を呼び出す - AWS Lambda
このチュートリアルでは、コンソールを使用して Lambda 関数を作成し、Amazon Simple Storage Service (Amazon S3) バケットのトリガーを設定します。Amazon S3 バケットにオブジェクトを追加するたびに関数を実行し、Amazon CloudWatch Logs にオブジェク...
Event notification types and destinations - Amazon Simple Storage Service
Amazon S3 supports several event notification types and destinations where the notifications can be published. You can specify the event type and destination wh...
AWS SAM policy templates - AWS Serverless Application Model
This section contains the full list of supported policy templates.
AWS IoT Core endpoints and quotas - AWS General Reference
The following are the service endpoints and service quotas for this service. To connect programmatically to an AWS service, you use an endpoint. In addition to ...
AWS IoT のアクション、リソース、および条件キー - サービス認可リファレンス
AWS IoT へのアクセスを制御するために IAM ポリシーで使用できるサービス固有のリソースやアクション、条件キーを一覧表示します。