AWS IoT Device Shadow – Lambda

In this article, we will create an AWS Lambda function that is triggered by a file upload to AWS S3 and updates the AWS IoT Device Shadow.

The AWS SAM CLI is used to create and deploy the Lambda. The runtime is Python 3.8.

sam init --runtime python3.8 --package-type Zip --app-template hello-world --name iot-shadow-lambda
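
The files relevant to this article are laid out roughly as follows (test files and other generated content are omitted; the exact layout may differ slightly depending on the SAM CLI version).

iot-shadow-lambda/
├── README.md
├── events/
│   └── event.json
├── hello_world/
│   ├── __init__.py
│   ├── app.py
│   └── requirements.txt
└── template.yaml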

This Lambda accomplishes the following:

  1. Trigger Lambda when a file is created or updated in a specific S3 bucket
  2. Obtain information about the file created or updated in S3
  3. Write the file update information to the Device Shadow of each thing

Triggering Lambda on file creation/update in a specific S3 bucket

To trigger the Lambda when a file is created or updated in a specific S3 bucket, and to allow the Lambda to access the S3 object and the Device Shadow, update the template.yaml created by sam init as follows.

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  iot-shadow-lambda

  Sample SAM Template for iot-shadow-lambda

Parameters:
  BucketName:
    Type: String
    Description: bucket name to create

# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst
Globals:
  Function:
    Timeout: 15
    MemorySize: 5312

Resources:
  HelloWorldFunction:
    Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction
    Properties:
      CodeUri: hello_world/
      Handler: app.lambda_handler
      Runtime: python3.8
      Architectures:
        - x86_64
      Events:
        BucketEvent:
          Type: S3
          Properties:
            Bucket: !Ref IoTShadowBucket
            Events: # https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-how-to-event-types-and-destinations.html
              - 's3:ObjectCreated:*'
      Policies:
        - Statement:
          - Sid: S3Get
            Effect: Allow
            Action:
            - s3:GetObject
            Resource:
            - !Sub arn:aws:s3:::${BucketName}/*

          - Sid: IoT
            Effect: Allow
            Action:
            - iot:ListThings
            - iot:GetThingShadow
            - iot:UpdateThingShadow
            - iot:DescribeEndpoint
            Resource:
            - "*"

  IoTShadowBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Ref BucketName

For the Lambda trigger, the Type of the event is set to S3, the IoTShadowBucket bucket is referenced in Properties.Bucket, and the S3 creation event s3:ObjectCreated:* is specified in Properties.Events.

      Events:
        BucketEvent:
          Type: S3
          Properties:
            Bucket: !Ref IoTShadowBucket
            Events: # https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-how-to-event-types-and-destinations.html
              - 's3:ObjectCreated:*'

In the policies, s3:GetObject is used to retrieve the file created in the bucket, and iot:ListThings, iot:GetThingShadow, and iot:UpdateThingShadow are used to list the things and update their Device Shadows. iot:DescribeEndpoint is also allowed so that the Lambda can look up the IoT data endpoint dynamically.

      Policies:
        - Statement:
          - Sid: S3Get
            Effect: Allow
            Action:
            - s3:GetObject
            Resource:
            - !Sub arn:aws:s3:::${BucketName}/*

          - Sid: IoT
            Effect: Allow
            Action:
            - iot:ListThings
            - iot:GetThingShadow
            - iot:UpdateThingShadow
            - iot:DescribeEndpoint
            Resource:
            - "*"Code language: YAML (yaml)

The S3 bucket name is defined as a parameter so that it can be supplied from outside this template.yaml.

Parameters:
  BucketName:
    Type: String
    Description: bucket name to create

The following is also added to create an S3 bucket.

  IoTShadowBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Ref BucketName

Obtaining information about the file created or updated in S3

Now that template.yaml is complete, let's build and deploy it once. hello_world/app.py is modified to print the event argument of lambda_handler, as shown below.

import json

# import requests


def lambda_handler(event, context):
    print(event)
    return {
        "statusCode": 200,
        "body": json.dumps({
            "message": "hello world",
            # "location": ip.text.replace("\n", "")
        }),
    }

Build and deploy. The first deployment is performed with the --guided option; the second and subsequent deployments do not require it.

Specify the bucket name as xxxxxxxxxxxxxx-iot-test-files so that it can be reused in another article, replacing xxxxxxxxxxxxxx with your own account ID.

$ sam build
$ sam deploy --guided

Configuring SAM deploy
======================

	Looking for config file [samconfig.toml] :  Not found

	Setting default arguments for 'sam deploy'
	=========================================
	Stack Name [sam-app]: ==> any stack name can be used
	AWS Region [us-east-1]: ==> any region can be used
	Parameter BucketName []: ==> any GLOBALLY UNIQUE bucket name can be used
	#Shows you resources changes to be deployed and require a 'Y' to initiate deploy
	Confirm changes before deploy [y/N]: ==> blank is OK
	#SAM needs permission to be able to create roles to connect to the resources in your template
	Allow SAM CLI IAM role creation [Y/n]: ==> blank is OK
	#Preserves the state of previously provisioned resources when an operation fails
	Disable rollback [y/N]: ==> blank is OK
	Save arguments to configuration file [Y/n]: ==> blank is OK
	SAM configuration file [samconfig.toml]: ==> blank is OK
	SAM configuration environment [default]: ==> blank is OK

	Looking for resources needed for deployment:
	 Managed S3 bucket: aws-sam-cli-managed-default-samclisourcebucket-32vpmut42j6x
	 A different default S3 bucket can be set in samconfig.toml

	Saved arguments to config file
	Running 'sam deploy' for future deployments will use the parameters saved above.
	The above parameters can be changed by modifying samconfig.toml
	Learn more about samconfig.toml syntax at 
	https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-config.html

...

The Lambda is triggered when a file is created or updated in the S3 bucket specified above, so let's try uploading a file to it. In the example below, README.md is copied to the created bucket <created bucket name>.

aws s3 cp README.md s3://<created bucket name>/

Let's look at the logs in AWS CloudWatch Logs. In the following, the logs are viewed with the AWS CLI, but you can also view them from the AWS Console. Below, <created stack name> is the stack name specified with sam deploy --guided.

$ aws logs describe-log-groups | jq -r '.logGroups[] | select(.logGroupName | contains("<created stack name>")).logGroupName'
/aws/lambda/iot-shadow-lambda-HelloWorldFunction-xxxxxxxxxxxx
$ aws logs tail /aws/lambda/iot-shadow-lambda-HelloWorldFunction-xxxxxxxxxxxx
...
2022-12-13T13:38:51.916000+00:00 2022/12/13/[$LATEST]abfed18bb61448faa8c96857bd3b9fee {'Records': [...

The {'Records': [… part above, formatted for readability, looks like the following.

{
    "Records": [
        {
            "eventVersion": "2.1",
            ...
            },
            "s3": {
                "s3SchemaVersion": "1.0",
                "configurationId": "xxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxxxxxx",
                "bucket": {
                    "name": "<created bucket name>",
                    "ownerIdentity": {"principalId": "xxxxxxxxxxxx"},
                    "arn": "arn:aws:s3:::<created bucket name>",
                },
                "object": {
                    "key": "README.md",
                    "size": 8382,
                    "eTag": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
                    "sequencer": "xxxxxxxxxxxxxxxxx",
                },
            },
        }
    ]
}

You can see that event["Records"][0]["s3"]["bucket"]["name"] contains the bucket name and event["Records"][0]["s3"]["object"]["key"] contains the file path.
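
One caveat: object keys in S3 event notifications are URL-encoded (for example, spaces become +). The file name used in this article contains no such characters, so the code shown later uses the key as-is, but if your keys may contain spaces or non-ASCII characters, it is safer to decode them. A minimal sketch, not part of the final code:

from urllib.parse import unquote_plus

record = event["Records"][0]
bucket = record["s3"]["bucket"]["name"]
# S3 event notifications URL-encode the object key, so decode it before use
key = unquote_plus(record["s3"]["object"]["key"])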

Writing the file update information to the Device Shadow of each thing

The final hello_world/app.py is as follows.

import json
import boto3
from retrying import retry
from hashlib import sha256

# import requests

iot_client = boto3.client("iot")
endpoint = iot_client.describe_endpoint(endpointType="iot:Data-ATS")
endpoint_url = f"https://{endpoint['endpointAddress']}"
iot_data_client = boto3.client("iot-data", endpoint_url=endpoint_url)


def _get_s3_object(bucket, key):
    s3 = boto3.client("s3")
    m = sha256()

    with open("/tmp/s3obj", "wb") as f:
        s3.download_file(bucket, key, "/tmp/s3obj")

    with open("/tmp/s3obj", "rb") as f:
        while True:
            chunk = f.read(1024 * 1024)
            if len(chunk) == 0:
                break
            m.update(chunk)
        return m.hexdigest()


def _retry_if_throttling_exception(exception):
    return isinstance(exception, iot_data_client.exceptions.ThrottlingException)


@retry(
    stop_max_attempt_number=2,
    wait_fixed=1000,
    retry_on_exception=_retry_if_throttling_exception,
)
def _thing_shadow(iot_data, thing_name, bucket, key, digest):
    try:
        payload = {
            "state": {
                "desired": {
                    "s3": {
                        "url": f"s3://{bucket}/{key}",
                        "hash": digest,
                    }
                }
            }
        }
        iot_data.update_thing_shadow(thingName=thing_name, payload=json.dumps(payload))
    except iot_data.exceptions.ResourceNotFoundException:
        return None


def lambda_handler(event, context):
    things = []
    next_token = ""
    while True:
        list_things = iot_client.list_things(nextToken=next_token)
        things.extend(list_things["things"])
        next_token = list_things.get("nextToken")
        if not next_token:
            break

    bucket = event["Records"][0]["s3"]["bucket"]["name"]
    key = event["Records"][0]["s3"]["object"]["key"]
    digest = _get_s3_object(bucket, key)

    for thing in things:
        _thing_shadow(iot_data_client, thing["thingName"], bucket, key, digest)
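
One prerequisite: retrying is a third-party package (boto3 is already included in the Lambda Python runtime), so it needs to be listed in hello_world/requirements.txt for sam build to bundle it. Assuming the requirements.txt generated by the hello-world template, it is enough to add a line like the following.

retrying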

I will explain the code from top to bottom.

iot_client = boto3.client("iot")
endpoint = iot_client.describe_endpoint(endpointType="iot:Data-ATS")
endpoint_url = f"https://{endpoint['endpointAddress']}"
iot_data_client = boto3.client("iot-data", endpoint_url=endpoint_url)

The IoT data endpoint is obtained dynamically with describe_endpoint of the boto3 iot client, and the iot-data client is then created with that endpoint.
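
If you want to check the endpoint value yourself, the same address can also be retrieved with the AWS CLI (the address below is a placeholder):

$ aws iot describe-endpoint --endpoint-type iot:Data-ATS
{
    "endpointAddress": "xxxxxxxxxxxxxx-ats.iot.us-east-1.amazonaws.com"
}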

_get_s3_object downloads the object from S3 and calculates its SHA-256 hash. Note that /tmp is the only writable directory in Lambda.

def _get_s3_object(bucket, key):
    s3 = boto3.client("s3")
    m = sha256()

    with open("/tmp/s3obj", "wb") as f:
        s3.download_file(bucket, key, "/tmp/s3obj")

    with open("/tmp/s3obj", "rb") as f:
        while True:
            chunk = f.read(1024 * 1024)
            if len(chunk) == 0:
                break
            m.update(chunk)
        return m.hexdigest()

_thing_shadow updates the Device Shadow. The data to be updated are desired.s3.url and desired.s3.hash. Since this function is called once per thing, throttling errors may occur. Therefore, using retrying, the call is retried when an iot_data_client.exceptions.ThrottlingException is raised, waiting 1 second between attempts, up to 2 attempts in total.

def _retry_if_throttling_exception(exception):
    return isinstance(exception, iot_data_client.exceptions.ThrottlingException)


@retry(
    stop_max_attempt_number=2,
    wait_fixed=1000,
    retry_on_exception=_retry_if_throttling_exception,
)
def _thing_shadow(iot_data, thing_name, bucket, key, digest):
    try:
        payload = {
            "state": {
                "desired": {
                    "s3": {
                        "url": f"s3://{bucket}/{key}",
                        "hash": digest,
                    }
                }
            }
        }
        iot_data.update_thing_shadow(thingName=thing_name, payload=json.dumps(payload))
    except iot_data.exceptions.ResourceNotFoundException:
        return None

lambda_handler is the Lambda entry point. iot_client.list_things retrieves a page of things and adds it to the things list. If nextToken is present in the response, the next page is obtained by calling iot_client.list_things again with that token.

def lambda_handler(event, context):
    things = []
    next_token = ""
    while True:
        list_things = iot_client.list_things(nextToken=next_token)
        things.extend(list_things["things"])
        next_token = list_things.get("nextToken")
        if not next_token:
            break

    bucket = event["Records"][0]["s3"]["bucket"]["name"]
    key = event["Records"][0]["s3"]["object"]["key"]
    digest = _get_s3_object(bucket, key)

    for thing in things:
        _thing_shadow(iot_data_client, thing["thingName"], bucket, key, digest)
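
To confirm that the desired state was actually written, you can, for example, fetch the shadow of one of your things with the AWS CLI (the thing name is a placeholder; on Linux/macOS, /dev/stdout can be used as the output file):

$ aws iot-data get-thing-shadow --thing-name <thing name> /dev/stdout

The returned shadow document should contain the desired.s3.url and desired.s3.hash values written by the Lambda.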

That's all. In another article, I would like to write the code for the device side.

Reference

Tutorial: Using an Amazon S3 trigger to invoke a Lambda function - AWS Lambda
Event notification types and destinations - Amazon Simple Storage Service
AWS SAM policy templates - AWS Serverless Application Model
AWS IoT Core endpoints and quotas - AWS General Reference
Actions, resources, and condition keys for AWS IoT - Service Authorization Reference