In this article, we will create an AWS Lambda function that is triggered by a file upload to Amazon S3 and updates an AWS IoT Device Shadow. The Lambda is created with the AWS SAM CLI, and the runtime is Python 3.8.
sam init --runtime python3.8 --package-type Zip --app-template hello-world --name iot-shadow-lambda
This Lambda accomplishes the following:
- Trigger Lambda on file creation/update in a specific S3 bucket
- Obtain information on files created and updated in S3
- Write file update information to the device shadow of a thing
Trigger Lambda on file creation/update in a specific S3 bucket
To trigger Lambda when a file is created or updated in a specific S3 bucket, and to allow Lambda to access the S3 file and the Device Shadow, update the template.yaml created by sam init as follows.
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  iot-shadow-lambda

  Sample SAM Template for iot-shadow-lambda

Parameters:
  BucketName:
    Type: String
    Description: bucket name to create

# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst
Globals:
  Function:
    Timeout: 15
    MemorySize: 5312

Resources:
  HelloWorldFunction:
    Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction
    Properties:
      CodeUri: hello_world/
      Handler: app.lambda_handler
      Runtime: python3.8
      Architectures:
        - x86_64
      Events:
        BucketEvent:
          Type: S3
          Properties:
            Bucket: !Ref IoTShadowBucket
            Events: # https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-how-to-event-types-and-destinations.html
              - 's3:ObjectCreated:*'
      Policies:
        - Statement:
            - Sid: S3Get
              Effect: Allow
              Action:
                - s3:GetObject
              Resource:
                - !Sub arn:aws:s3:::${BucketName}/*
            - Sid: IoT
              Effect: Allow
              Action:
                - iot:ListThings
                - iot:GetThingShadow
                - iot:UpdateThingShadow
                - iot:DescribeEndpoint
              Resource:
                - "*"
  IoTShadowBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Ref BucketName
The Lambda launch trigger sets the Type of the event to S3, specifies the bucket IoTShadowBucket in Properties.Bucket, and lists the S3 creation event s3:ObjectCreated:* in Properties.Events.
Events:
  BucketEvent:
    Type: S3
    Properties:
      Bucket: !Ref IoTShadowBucket
      Events: # https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-how-to-event-types-and-destinations.html
        - 's3:ObjectCreated:*'
s3:GetObject is used to retrieve files created in the bucket, while iot:ListThings, iot:GetThingShadow, and iot:UpdateThingShadow are used to retrieve the list of things and update their Device Shadows. In addition, iot:DescribeEndpoint is allowed so that the endpoint can be retrieved dynamically in Lambda.
Policies:
  - Statement:
      - Sid: S3Get
        Effect: Allow
        Action:
          - s3:GetObject
        Resource:
          - !Sub arn:aws:s3:::${BucketName}/*
      - Sid: IoT
        Effect: Allow
        Action:
          - iot:ListThings
          - iot:GetThingShadow
          - iot:UpdateThingShadow
          - iot:DescribeEndpoint
        Resource:
          - "*"
The S3 bucket name is specified in Parameters so that it can be set from outside this template.yaml.
Parameters:
  BucketName:
    Type: String
    Description: bucket name to create
The following is also added to create an S3 bucket.
IoTShadowBucket:
  Type: AWS::S3::Bucket
  Properties:
    BucketName: !Ref BucketName
Obtain information on files created and updated in S3
Now that template.yaml is complete, let's build and deploy once. hello_world/app.py is modified to print the event argument of lambda_handler, as shown below.
import json

# import requests


def lambda_handler(event, context):
    print(event)
    return {
        "statusCode": 200,
        "body": json.dumps({
            "message": "hello world",
            # "location": ip.text.replace("\n", "")
        }),
    }
Build and deploy. The first deployment is performed with the --guided option; the second and subsequent deployments do not require it. To make it easier to follow the companion article, specify the bucket name as xxxxxxxxxxxxxx-iot-test-files, replacing xxxxxxxxxxxxxx with your own account ID.
$ sam build
$ sam deploy --guided
Configuring SAM deploy
======================
Looking for config file [samconfig.toml] : Not found
Setting default arguments for 'sam deploy'
=========================================
Stack Name [sam-app]: ==> any stack name can be used
AWS Region [us-east-1]: ==> any region can be used
Parameter BucketName []: ==> any GLOBALLY UNIQUE bucket name can be used
#Shows you resources changes to be deployed and require a 'Y' to initiate deploy
Confirm changes before deploy [y/N]: ==> blank is OK
#SAM needs permission to be able to create roles to connect to the resources in your template
Allow SAM CLI IAM role creation [Y/n]: ==> blank is OK
#Preserves the state of previously provisioned resources when an operation fails
Disable rollback [y/N]: ==> blank is OK
Save arguments to configuration file [Y/n]: ==> blank is OK
SAM configuration file [samconfig.toml]: ==> blank is OK
SAM configuration environment [default]: ==> blank is OK
Looking for resources needed for deployment:
Managed S3 bucket: aws-sam-cli-managed-default-samclisourcebucket-32vpmut42j6x
A different default S3 bucket can be set in samconfig.toml
Saved arguments to config file
Running 'sam deploy' for future deployments will use the parameters saved above.
The above parameters can be changed by modifying samconfig.toml
Learn more about samconfig.toml syntax at
https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-config.html
...
Lambda is triggered when a file is created or updated in the S3 bucket specified above, so let's try uploading a file to the created bucket. In the example below, README.md is copied to the created bucket <created bucket name>.
aws s3 cp README.md s3://<created bucket name>/
Let's look at the logs in AWS CloudWatch Logs. Below, the logs are viewed with the AWS CLI, but you can also log in to the AWS Console to view them. <created stack name> is the stack name specified with sam deploy --guided.
$ aws logs describe-log-groups | jq -r '.logGroups[] | select(.logGroupName | contains("<created stack name>")).logGroupName'
/aws/lambda/iot-shadow-lambda-HelloWorldFunction-xxxxxxxxxxxx
$ aws logs tail /aws/lambda/iot-shadow-lambda-HelloWorldFunction-xxxxxxxxxxxx
...
2022-12-13T13:38:51.916000+00:00 2022/12/13/[$LATEST]abfed18bb61448faa8c96857bd3b9fee {'Records': [...
The {'Records': [... part above, formatted, looks like the following.
{
    "Records": [
        {
            "eventVersion": "2.1",
            ...
            "s3": {
                "s3SchemaVersion": "1.0",
                "configurationId": "xxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxxxxxx",
                "bucket": {
                    "name": "<created bucket name>",
                    "ownerIdentity": {"principalId": "xxxxxxxxxxxx"},
                    "arn": "arn:aws:s3:::<created bucket name>"
                },
                "object": {
                    "key": "README.md",
                    "size": 8382,
                    "eTag": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
                    "sequencer": "xxxxxxxxxxxxxxxxx"
                }
            }
        }
    ]
}
You can see that event["Records"][0]["s3"]["bucket"]["name"] contains the bucket name and event["Records"][0]["s3"]["object"]["key"] contains the file path. Note that Records is a list, so the first record must be indexed.
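To make those lookups concrete, here is a minimal sketch exercising the two paths against a pared-down sample event (the bucket name and field values are placeholders, not real AWS output):

```python
# Pared-down sample of the S3 event payload Lambda receives
# (placeholder values; a real event carries many more fields).
sample_event = {
    "Records": [
        {
            "eventVersion": "2.1",
            "s3": {
                "bucket": {"name": "123456789012-iot-test-files"},
                "object": {"key": "README.md", "size": 8382},
            },
        }
    ]
}

# Records is a list, so index the first record before drilling down.
record = sample_event["Records"][0]
bucket = record["s3"]["bucket"]["name"]
key = record["s3"]["object"]["key"]
print(bucket, key)  # 123456789012-iot-test-files README.md
```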
Write file update information to the device shadow of a thing
The final hello_world/app.py is as follows.
import json

import boto3
from retrying import retry
from hashlib import sha256

# import requests

iot_client = boto3.client("iot")
endpoint = iot_client.describe_endpoint(endpointType="iot:Data-ATS")
endpoint_url = f"https://{endpoint['endpointAddress']}"
iot_data_client = boto3.client("iot-data", endpoint_url=endpoint_url)


def _get_s3_object(bucket, key):
    s3 = boto3.client("s3")
    m = sha256()
    # /tmp is the only writable directory in the Lambda environment
    s3.download_file(bucket, key, "/tmp/s3obj")
    with open("/tmp/s3obj", "rb") as f:
        while True:
            chunk = f.read(1024 * 1024)
            if len(chunk) == 0:
                break
            m.update(chunk)
    return m.hexdigest()


def _retry_if_throttling_exception(exception):
    return isinstance(exception, iot_data_client.exceptions.ThrottlingException)


@retry(
    stop_max_attempt_number=2,
    wait_fixed=1000,
    retry_on_exception=_retry_if_throttling_exception,
)
def _thing_shadow(iot_data, thing_name, bucket, key, digest):
    try:
        payload = {
            "state": {
                "desired": {
                    "s3": {
                        "url": f"s3://{bucket}/{key}",
                        "hash": digest,
                    }
                }
            }
        }
        iot_data.update_thing_shadow(thingName=thing_name, payload=json.dumps(payload))
    except iot_data.exceptions.ResourceNotFoundException:
        return None


def lambda_handler(event, context):
    things = []
    next_token = ""
    while True:
        list_things = iot_client.list_things(nextToken=next_token)
        things.extend(list_things["things"])
        next_token = list_things.get("nextToken")
        if not next_token:
            break
    bucket = event["Records"][0]["s3"]["bucket"]["name"]
    key = event["Records"][0]["s3"]["object"]["key"]
    digest = _get_s3_object(bucket, key)
    for thing in things:
        _thing_shadow(iot_data_client, thing["thingName"], bucket, key, digest)
Let's go through it from top to bottom.
iot_client = boto3.client("iot")
endpoint = iot_client.describe_endpoint(endpointType="iot:Data-ATS")
endpoint_url = f"https://{endpoint['endpointAddress']}"
iot_data_client = boto3.client("iot-data", endpoint_url=endpoint_url)
The endpoint is obtained dynamically with describe_endpoint of the boto3 iot client.
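For illustration, given a response of that shape, the data-plane URL is built as follows (the endpoint address below is a fabricated example, not a real endpoint):

```python
# Shape of the describe_endpoint response; the address is made up.
endpoint = {"endpointAddress": "abc123example-ats.iot.us-east-1.amazonaws.com"}

# The iot-data client needs a full https:// URL, not just the hostname.
endpoint_url = f"https://{endpoint['endpointAddress']}"
print(endpoint_url)  # https://abc123example-ats.iot.us-east-1.amazonaws.com
```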
_get_s3_object retrieves an object from S3 and calculates its hash value. Note that /tmp is the only writable directory in Lambda.
def _get_s3_object(bucket, key):
    s3 = boto3.client("s3")
    m = sha256()
    # /tmp is the only writable directory in the Lambda environment
    s3.download_file(bucket, key, "/tmp/s3obj")
    with open("/tmp/s3obj", "rb") as f:
        while True:
            chunk = f.read(1024 * 1024)
            if len(chunk) == 0:
                break
            m.update(chunk)
    return m.hexdigest()
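As a local sanity check (no S3 involved), the same chunked loop produces the identical digest as hashing the whole payload in one call:

```python
import os
import tempfile
from hashlib import sha256


def chunked_sha256(path, chunk_size=1024 * 1024):
    # Same read-in-chunks loop as _get_s3_object, applied to a local file.
    m = sha256()
    with open(path, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if len(chunk) == 0:
                break
            m.update(chunk)
    return m.hexdigest()


# A payload large enough to span several 1 MiB chunks.
data = b"x" * (3 * 1024 * 1024 + 17)
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(data)
    path = tmp.name

digest = chunked_sha256(path)
assert digest == sha256(data).hexdigest()  # chunked == one-shot
os.unlink(path)
```

Reading in fixed-size chunks keeps memory use bounded regardless of the object size, which matters given the Lambda's memory limit.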
_thing_shadow updates the Device Shadow. The data written are desired.s3.url and desired.s3.hash. Since this function is called once per thing, throttling errors may occur. Therefore, using the retrying package, if an iot_data_client.exceptions.ThrottlingException is raised, the call waits 1 second and is attempted up to 2 times.
def _retry_if_throttling_exception(exception):
    return isinstance(exception, iot_data_client.exceptions.ThrottlingException)


@retry(
    stop_max_attempt_number=2,
    wait_fixed=1000,
    retry_on_exception=_retry_if_throttling_exception,
)
def _thing_shadow(iot_data, thing_name, bucket, key, digest):
    try:
        payload = {
            "state": {
                "desired": {
                    "s3": {
                        "url": f"s3://{bucket}/{key}",
                        "hash": digest,
                    }
                }
            }
        }
        iot_data.update_thing_shadow(thingName=thing_name, payload=json.dumps(payload))
    except iot_data.exceptions.ResourceNotFoundException:
        return None
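The retry policy used here (retry only on the throttling exception, wait a fixed time, give up after two attempts) boils down to roughly the hand-rolled loop below. This is an illustrative sketch of the semantics, not the retrying library's actual implementation; the ThrottlingException class and flaky_update function are stand-ins made up for the example:

```python
import time


class ThrottlingException(Exception):
    """Stand-in for iot_data_client.exceptions.ThrottlingException."""


def retry_on(exc_type, max_attempts=2, wait_seconds=1.0):
    # Roughly what @retry(stop_max_attempt_number=2, wait_fixed=1000,
    # retry_on_exception=...) does: catch only exc_type, wait, try again.
    def decorator(func):
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exc_type:
                    if attempt == max_attempts:
                        raise  # out of attempts: propagate
                    time.sleep(wait_seconds)
        return wrapper
    return decorator


calls = {"n": 0}


@retry_on(ThrottlingException, max_attempts=2, wait_seconds=0.01)
def flaky_update():
    calls["n"] += 1
    if calls["n"] == 1:
        raise ThrottlingException()  # first call is throttled
    return "updated"


result = flaky_update()
print(result, calls["n"])  # updated 2
```

Any other exception type (such as ResourceNotFoundException) is not caught by the decorator, which is why _thing_shadow handles it separately.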
lambda_handler is the Lambda entry point. iot_client.list_things retrieves a list of things, which is appended to the things array. If nextToken exists in the response, the next page of things is obtained by calling iot_client.list_things again with that next_token.
def lambda_handler(event, context):
    things = []
    next_token = ""
    while True:
        list_things = iot_client.list_things(nextToken=next_token)
        things.extend(list_things["things"])
        next_token = list_things.get("nextToken")
        if not next_token:
            break
    bucket = event["Records"][0]["s3"]["bucket"]["name"]
    key = event["Records"][0]["s3"]["object"]["key"]
    digest = _get_s3_object(bucket, key)
    for thing in things:
        _thing_shadow(iot_data_client, thing["thingName"], bucket, key, digest)
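The pagination loop can be exercised offline with a stub standing in for iot_client.list_things; the stub class, page contents, and thing names below are made up for illustration:

```python
class FakeIoTClient:
    """Stub mimicking list_things pagination: two pages of things."""

    def __init__(self):
        self._pages = [
            {"things": [{"thingName": "thing-1"}, {"thingName": "thing-2"}],
             "nextToken": "page2"},
            {"things": [{"thingName": "thing-3"}]},  # no nextToken: last page
        ]

    def list_things(self, nextToken=""):
        return self._pages[1] if nextToken == "page2" else self._pages[0]


client = FakeIoTClient()

# Same accumulation loop as in lambda_handler.
things = []
next_token = ""
while True:
    list_things = client.list_things(nextToken=next_token)
    things.extend(list_things["things"])
    next_token = list_things.get("nextToken")
    if not next_token:
        break

print([t["thingName"] for t in things])  # ['thing-1', 'thing-2', 'thing-3']
```

Using dict.get("nextToken") rather than indexing is what lets the loop terminate cleanly on the last page, where the key is absent.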
That's all. In another article, I would like to create the device-side code.