この記事ではAWS S3へのファイルの更新をトリガに起動し、AWS IoT Device Shadowを更新するAWS Lambdaを作成していきます。
Lambdaの作成はAWS SAM CLIを使用します。AWS SAM CLIはこちらの記事でも使用しました。runtimeはpython3.8を使用します。
sam init --runtime python3.8 --package-type Zip --app-template hello-world --name iot-shadow-lambda
Code language: Bash (bash)
このLambdaでは以下のことを実現します。
- 特定のS3バケットのファイル作成・更新をトリガにLambdaを起動
- S3に作成・更新されたファイル情報の取得
- モノのDevice Shadowへファイル更新情報を書き込む
特定のS3バケットのファイル作成・更新をトリガにLambdaを起動
特定のS3バケットのファイル作成・更新をトリガにLambdaを起動させ、LambdaからS3ファイルの取得、Device Shadowへアクセスをする為、sam init
で作成されたtemplate.yaml
のを以下のように更新します。
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
iot-shadow-lambda
Sample SAM Template for iot-shadow-lambda
Parameters:
BucketName:
Type: String
Description: bucket name to create
# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst
Globals:
Function:
Timeout: 15
MemorySize: 5312
Resources:
HelloWorldFunction:
Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction
Properties:
CodeUri: hello_world/
Handler: app.lambda_handler
Runtime: python3.8
Architectures:
- x86_64
Events:
BucketEvent:
Type: S3
Properties:
Bucket: !Ref IoTShadowBucket
Events: # https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-how-to-event-types-and-destinations.html
- 's3:ObjectCreated:*'
Policies:
- Statement:
- Sid: S3Get
Effect: Allow
Action:
- s3:GetObject
Resource:
- !Sub arn:aws:s3:::{AWS::AccountId}-${BucketName}/*
- Sid: IoT
Effect: Allow
Action:
- iot:ListThings
- iot:GetThingShadow
- iot:UpdateThingShadow
- iot:DescribeEndpoint
Resource:
- "*"
IoTShadowBucket:
Type: AWS::S3::Bucket
Properties:
BucketName: !Sub {AWS::AccountId}-${BucketName}
Code language: YAML (yaml)
Lambdaの起動トリガはEvents
のType
をS3
にし、Properties.Bucket
でバケットIoTShadowBucket
を指定し、Properties.Events
でS3
のイベント作成イベントs3:ObjectCreated:*
を指定しています。
Events:
BucketEvent:
Type: S3
Properties:
Bucket: !Ref IoTShadowBucket
Events: # https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-how-to-event-types-and-destinations.html
- 's3:ObjectCreated:*'
Code language: YAML (yaml)
またバケットに作成されたファイルの情報を取得するためs3:GetObject
と、モノの取得、モノのDevice Shadowの更新をするため、iot:ListThings
、iot:GetThingShadow
、iot:UpdateThingShadow
を許可しています。またLambdaで動的にendpointを取得するために、iot:DescribeEndpoint
を許可しています。
Policies:
- Statement:
- Sid: S3Get
Effect: Allow
Action:
- s3:GetObject
Resource:
- !Sub arn:aws:s3:::${BucketName}/*
- Sid: IoT
Effect: Allow
Action:
- iot:ListThings
- iot:GetThingShadow
- iot:UpdateThingShadow
- iot:DescribeEndpoint
Resource:
- "*"
Code language: YAML (yaml)
S3バケット名はParameters
に記述しこのtemplate.yaml
の外から指定できるようにします。
Parameters:
BucketName:
Type: String
Description: bucket name to create
Code language: YAML (yaml)
また、S3バケットを作成する為、以下を追加しています。
IoTShadowBucket:
Type: AWS::S3::Bucket
Properties:
BucketName: !Ref BucketName
Code language: YAML (yaml)
S3に作成・更新されたファイル情報の取得
template.yaml
は完成していますので、一度ビルドとデプロイを実施してみます。hello_world/app.py
は以下のように、lambda_handler
の引数event
を表示するように修正しています。
import json
# import requests
def lambda_handler(event, context):
print(context)
return {
"statusCode": 200,
"body": json.dumps({
"message": "hello world",
# "location": ip.text.replace("\n", "")
}),
}
Code language: Python (python)
ビルドしてデプロイします。ビルドの初回は--guided
オプションで実施します。2回目以降のデプロイはオプションが不要になります。
指定するバケット名をxxxxxxxxxx-iot-test-files
という名前にすることで、別の記事の導入がスムーズになります。なおxxxxxxxxxx
にはご自身のアカウントIDを指定します。
$ sam build
$ sam deploy --guided
Configuring SAM deploy
======================
Looking for config file [samconfig.toml] : Not found
Setting default arguments for 'sam deploy'
=========================================
Stack Name [sam-app]: ==> any stack name can be used
AWS Region [us-east-1]: ==> any region can be used
Parameter BucketName []: ==> any GLOBALLY UNIQUE bucket name can be used
#Shows you resources changes to be deployed and require a 'Y' to initiate deploy
Confirm changes before deploy [y/N]: ==> blank is OK
#SAM needs permission to be able to create roles to connect to the resources in your template
Allow SAM CLI IAM role creation [Y/n]: ==> blank is OK
#Preserves the state of previously provisioned resources when an operation fails
Disable rollback [y/N]: ==> blank is OK
Save arguments to configuration file [Y/n]: ==> blank is OK
SAM configuration file [samconfig.toml]: ==> blank is OK
SAM configuration environment [default]: ==> blank is OK
Looking for resources needed for deployment:
Managed S3 bucket: aws-sam-cli-managed-default-samclisourcebucket-32vpmut42j6x
A different default S3 bucket can be set in samconfig.toml
Saved arguments to config file
Running 'sam deploy' for future deployments will use the parameters saved above.
The above parameters can be changed by modifying samconfig.toml
Learn more about samconfig.toml syntax at
https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-config.html
...
Code language: Bash (bash)
Lambdaは上記で指定したS3バケットにファイルを作成・更新するとそれとトリガに起動しますので、作成したS3バケットにファイルをアップロードしてみます。以下ではREADME.mdを作成したバケット名<created bucket name>にコピーしています。
aws s3 cp README.md s3://<created bucket name>/
Code language: Bash (bash)
AWS CloudWatch Logsのログを見てみます。以下ではAWS CLIでCloudWatch Logsのログを確認しますが、AWS Consoleにログインしてログを確認しても問題ありません。下記<create stack name>
はsam deploy --guided
で指定したスタック名を入力します。
$ aws logs describe-log-groups | jq -r '.logGroups[] | select(.logGroupName | contains("<created stack name>")).logGroupName'
/aws/lambda/iot-shadow-lambda-HelloWorldFunction-xxxxxxxxxxxx
$ aws logs tail /aws/lambda/iot-shadow-lambda-HelloWorldFunction-xxxxxxxxxxxx
...
2022-12-13T13:38:51.916000+00:00 2022/12/13/[$LATEST]abfed18bb61448faa8c96857bd3b9fee {'Records': [...
Code language: Bash (bash)
上記、{'Records': [...
の部分を以下に整形します。
{
"Records": [
{
"eventVersion": "2.1",
...
},
"s3": {
"s3SchemaVersion": "1.0",
"configurationId": "xxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxxxxxx",
"bucket": {
"name": "<created bucket name>",
"ownerIdentity": {"principalId": "xxxxxxxxxxxx"},
"arn": "arn:aws:s3:::<created bucket name>",
},
"object": {
"key": "README.md",
"size": 8382,
"eTag": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
"sequencer": "xxxxxxxxxxxxxxxxx",
},
},
}
]
}
Code language: JSON / JSON with Comments (json)
event["Records"]["s3"]["bucket"]["name"]
にバケット名、event["Records"]["s3"]["object"]["key"]
にファイルパスが含まれていることがわかります。
モノのDevice Shadowへファイル更新情報を書き込む
最終的なhello_world/app.py
は以下のようになります。
import json
import boto3
from retrying import retry
from hashlib import sha256
# import requests
iot_client = boto3.client("iot")
endpoint = iot_client.describe_endpoint(endpointType="iot:Data-ATS")
endpoint_url = f"https://{endpoint['endpointAddress']}"
iot_data_client = boto3.client("iot-data", endpoint_url=endpoint_url)
def _get_s3_object(bucket, key):
s3 = boto3.client("s3")
m = sha256()
with open("/tmp/s3obj", "wb") as f:
s3.download_file(bucket, key, "/tmp/s3obj")
with open("/tmp/s3obj", "rb") as f:
while True:
chunk = f.read(1024 * 1024)
if len(chunk) == 0:
break
m.update(chunk)
return m.hexdigest()
def _retry_if_throttling_exception(exception):
return isinstance(exception, iot_data_client.exceptions.ThrottlingException)
@retry(
stop_max_attempt_number=2,
wait_fixed=1000,
retry_on_exception=_retry_if_throttling_exception,
)
def _thing_shadow(iot_data, thing_name, bucket, key, digest):
try:
payload = {
"state": {
"desired": {
"s3": {
"url": f"s3://{bucket}/{key}",
"hash": digest,
}
}
}
}
iot_data.update_thing_shadow(thingName=thing_name, payload=json.dumps(payload))
except iot_data.exceptions.ResourceNotFoundException:
return None
def lambda_handler(event, context):
things = []
next_token = ""
while True:
list_things = iot_client.list_things(nextToken=next_token)
things.extend(list_things["things"])
next_token = list_things.get("nextToken")
if not next_token:
break
bucket = event["Records"][0]["s3"]["bucket"]["name"]
key = event["Records"][0]["s3"]["object"]["key"]
digest = _get_s3_object(bucket, key)
for thing in things:
_thing_shadow(iot_data_client, thing["thingName"], bucket, key, digest)
Code language: Python (python)
上から順番に説明します。
iot_client = boto3.client("iot")
endpoint = iot_client.describe_endpoint(endpointType="iot:Data-ATS")
endpoint_url = f"https://{endpoint['endpointAddress']}"
iot_data_client = boto3.client("iot-data", endpoint_url=endpoint_url)
Code language: Python (python)
endpointはboto3 client iotのdescribe_endpoint
により動的に取得します。
_get_s3_object
はs3からobjectを取得しそのhash値を計算します。Lambdaは書き込み可能なディレクトリが/tmp
である点に注意します。
def _get_s3_object(bucket, key):
s3 = boto3.client("s3")
m = sha256()
with open("/tmp/s3obj", "wb") as f:
s3.download_file(bucket, key, "/tmp/s3obj")
with open("/tmp/s3obj", "rb") as f:
while True:
chunk = f.read(1024 * 1024)
if len(chunk) == 0:
break
m.update(chunk)
return m.hexdigest()
Code language: Python (python)
_thing_shadow
はDevice Shadowを更新します。更新するデータはdesired.s3.url
とdesired.s3.hash
です。この関数はthing(モノ)の数だけ呼び出されるため、スロットリングでエラーが発声する場合があります。そのため、retrying
を使用してiot_data_client.exceptions.ThrottlingException
例外が発生した場合は1秒待ってから最大2回呼び出します。
def _retry_if_throttling_exception(exception):
return isinstance(exception, iot_data_client.exceptions.ThrottlingException)
@retry(
stop_max_attempt_number=2,
wait_fixed=1000,
retry_on_exception=_retry_if_throttling_exception,
)
def _thing_shadow(iot_data, thing_name, bucket, key, digest):
try:
payload = {
"state": {
"desired": {
"s3": {
"url": f"s3://{bucket}/{key}",
"hash": digest,
}
}
}
}
iot_data.update_thing_shadow(thingName=thing_name, payload=json.dumps(payload))
except iot_data.exceptions.ResourceNotFoundException:
return None
Code language: Python (python)
lambda_handler
はLambdaのエントリーです。iot_client.list_things
でモノをリストを取得しthings配列に追加します。next_tokenが存在する場合はそのnext_tokenを使用してiot_client.list_things
を呼び出すと続きのモノのリストが取得できます。
def lambda_handler(event, context):
things = []
next_token = ""
while True:
list_things = iot_client.list_things(nextToken=next_token)
things.extend(list_things["things"])
next_token = list_things.get("nextToken")
if not next_token:
break
bucket = event["Records"][0]["s3"]["bucket"]["name"]
key = event["Records"][0]["s3"]["object"]["key"]
digest = _get_s3_object(bucket, key)
for thing in things:
_thing_shadow(iot_data_client, thing["thingName"], bucket, key, digest)
Code language: Python (python)
以上です。別の記事ではデバイス側のコードを作成したいと思います。