こちらの記事では、AWS IoT, AWS IoT Device Shadowについて説明しました。またこちらの記事では、AWS S3へのファイルを更新をトリガに、AWS IoT Device Shadowを更新するAWS Lambdaについて説明しました。またこちらの記事ではAWS IoTデバイスからS3にアクセスする仕組みについて説明しました。
これらの記事のまとめとして、この記事では、AWS IoT Device Shadowの差分情報を元にIoTデバイスからS3のファイルを取得するプログラムについて説明します。
全体の流れとしては以下のとおりです。
- Lambda: 特定のS3バケットのファイル作成・更新をトリガにLambdaを起動 ← こちらで解説済み
- Lambda: S3に作成・更新されたファイル情報の取得 ← こちらで解説済み
- Lambda: モノのDevice Shadowへファイル更新情報を書き込む ← こちらで解説済み
- IoTデバイス: 証明書を使用しクレデンシャルプロバイダーからトークンを取得 ← こちらで解説済み
- IoTデバイス: S3からファイルを取得 ← ここを解説
なお、こちらの記事でIoT Device Shadowの説明に使用したサンプルプログラムaws-iot-device-sdk-python-v2/samples/shadow.py
をこの記事でも使用します。
事前にこちらとこちらとこちらの記事を参考に、すべてのセットアップを済ませて下さい。
こちらの記事で指定するバケット名をxxxxxxxxxx-iot-test-files
にして、こちらの記事と合わせておく必要があります。xxxxxxxxxx
はご自身のアカウントIDです。
プログラム修正部分
aws-iot-device-sdk-python-v2/samples/shadow.py
の修正部分は以下のとおりです。なお全体のコードはhttps://github.com/otamajakusi/aws-iot-device-sdk-python-v2を参照して下さい。
# - Overview -
# This sample uses the AWS IoT Device Shadow Service to keep a property in
@@ -42,6 +49,7 @@ cmdUtils.register_command("client_id", "<str>", "Client ID to use for MQTT conne
cmdUtils.register_command("thing_name", "<str>", "The name assigned to your IoT Thing", required=True)
cmdUtils.register_command("shadow_property", "<str>", "The name of the shadow property you want to change (optional, default='color'", default="color")
cmdUtils.register_command("is_ci", "<str>", "If present the sample will run in CI mode (optional, default='None'. Will publish shadow automatically if set)")
+cmdUtils.register_command("credential_endpoint", "<str>", "iot:CredentialProvider endpoint", True, str)
# Needs to be called so the command utils parse the commands
cmdUtils.get_args()
@@ -52,7 +60,7 @@ shadow_thing_name = cmdUtils.get_command_required("thing_name")
shadow_property = cmdUtils.get_command("shadow_property")
is_ci = cmdUtils.get_command("is_ci", None) != None
-SHADOW_VALUE_DEFAULT = "off"
+SHADOW_VALUE_DEFAULT = {}
class LockedData:
def __init__(self):
@@ -60,9 +68,73 @@ class LockedData:
self.shadow_value = None
self.disconnect_called = False
self.request_tokens = set()
+ self.shadow_desired.get("url")
+ if url and expected_hash:
+ locked_data.shadow_desired["url"] = url
+ download_file(url, tmpfile)
+ actual_hash = hash_sha256(tmpfile)
+ if expected_hash == actual_hash:
+ return True
+ print(" invalid hash value {expected_hash} != {actual_hash}")
+ return False
+
+
# Function for gracefully quitting this sample
def exit(msg_or_exception):
if isinstance(msg_or_exception, Exception):
@@ -103,11 +175,16 @@ def on_get_shadow_accepted(response):
return
if response.state:
+ if response.state.desired:
+ locked_data.shadow_desired = response.state.desired.get(shadow_property)
+ print(" Save desired[{shadow_property}]: {locked_data.shadow_desired}")
+
if response.state.delta:
value = response.state.delta.get(shadow_property)
if value:
print(" Shadow contains delta value '{}'.".format(value))
- change_shadow_value(value)
+ if download_file_with_delta(value):
+ change_shadow_value(value)
return
if response.state.reported:
@@ -159,7 +236,8 @@ def on_shadow_delta_updated(delta):
print(" Delta reports that desired value is '{}'. Changing local value...".format(value))
if (delta.client_token is not None):
print (" ClientToken is: " + delta.client_token)
- change_shadow_value(value)
+ if download_file_with_delta(value):
+ change_shadow_value(value)
else:
print(" Delta did not report a change in '{}'".format(shadow_property))
@@ -194,7 +272,6 @@ def on_update_shadow_accepted(response):
print ("Could not find shadow property with name: '{}'.".format(shadow_property)) # type: ignore
else:
print("Shadow states cleared.") # when the shadow states are cleared, reported and desired are set to None
- print("Enter desired value: ") # remind user they can input new values
except:
exit("Updated shadow is missing the target property")
Code language: Diff (diff)
プログラムは以下のように実行します。
$ python3 aws-iot-device-sdk-python-v2/samples/shadow.py --endpoint xxxxxxxxxxxxxx-ats.iot.us-east-1.amazonaws.com --key private.key --cert certificate.pem --client_id test-thing --ca_file AmazonRootCA1.pem --thing_name test-thing --credential_endpoint xxxxxxxxxxxxx.credentials.iot.us-east-1.amazonaws.com/role-aliases/iot-role-alias-s3-read/credentials --shadow_property s3
Code language: Bash (bash)
なおこちらの記事から追加した引数は以下の2つです。
--credential_endpoint xxxxxxxxxxxxx.credentials.iot.us-east-1.amazonaws.com/role-aliases/iot-role-alias-s3-read/credentials
- このエンドポイントについてはこちらの記事を参照して下さい。
--shadow_property s3
- これを指定することで、サンプルプログラムが参照するDevice ShadowのJSONのキーを
s3
にします。下で説明します。
- これを指定することで、サンプルプログラムが参照するDevice ShadowのJSONのキーを
以下順番に説明していきます。
credential_endpointプログラム引数の追加
IoTデバイスからS3にアセスする為、証明書を使用しクレデンシャルプロバイダーからトークンを取得します。そのクレデンシャルプロバイダーのエンドポイントをプログラム実行時に指定するため、--credential_endpoint
引数を追加します。
+cmdUtils.register_command("credential_endpoint", "<str>", "iot:CredentialProvider endpoint", True, str)
Code language: Diff (diff)
Device Shadowのデフォルト値の変更
こちらの記事で解説したDevice Shadowの形式を抜粋します。
"state": {
"desired": {
"s3": {
"url": f"s3://{bucket}/{key}",
"hash": digest,
}
}
}
Code language: JSON / JSON with Comments (json)
プラグラムの引数に、--shadow_property s3
を追加しました。そのためJSONのキーs3
はオブジェクトとなるためデフォルト値を{}
に修正します。
-SHADOW_VALUE_DEFAULT = "off"
+SHADOW_VALUE_DEFAULT = {}
Code language: Diff (diff)
desiredの保存
Device ShadowがLambdaにより更新されたとき、その差分は以下のつの場合があります。
desired.s3.url
とdesired.s3.hash
の2つが差分となる場合desired.s3.hash
だけが差分となる場合
1の場合url
の情報が差分情報コールバックの引数に渡るため問題ありませんが、2の場合url
の情報が差分情報コールバックの引数に渡らないため変数にurl
を保存しておきます。差分情報コールバックについてはこの後説明します。
@@ -60,9 +68,73 @@ class LockedData:
self.shadow_value = None
self.disconnect_called = False
self.request_tokens = set()
+ self.shadow_desired = {}
@@ -103,11 +175,16 @@ def on_get_shadow_accepted(response):
return
if response.state:
+ if response.state.desired:
+ locked_data.shadow_desired = response.state.desired.get(shadow_property)
+ print(" Save desired[{shadow_property}]: {locked_data.shadow_desired}")
+
Code language: Diff (diff)
クレデンシャルプロバイダーからトークンを取得
以下のコードでクレデンシャルプロバイダーからトークンを取得します。詳細はこちらの記事を参照して下さい。
+class Credential:
+ def __init__(self):
+ self.cert = cmdUtils.get_command('cert')
+ self.key = cmdUtils.get_command('key')
+ self.ca_file = cmdUtils.get_command('ca_file')
+ self.credential_endpoint = cmdUtils.get_command('credential_endpoint')
+ self.headers = {"x-amzn-iot-thingname": cmdUtils.get_command_required("thing_name")}
+
+ def get(self):
+ response = requests.get(
+ f"https://{self.credential_endpoint}",
+ verify=self.ca_file,
+ cert=(self.cert, self.key),
+ headers=self.headers,
+ )
+ response.raise_for_status()
+ return json.loads(response.text)
Code language: Diff (diff)
S3からファイルをダウンロードしハッシュ値を計算
下記のdownload_file
関数では、クレデンシャルプロバイダーからトークン(aws_access_key_id
、aws_secret_access_key
、aws_session_token
)を取得しboto3.client
の引数にそれぞれ渡します。これによりs3にアクセスすることができるboto3クライアントが作成できます。
なお、以下のコードは毎回トークンを取得していますが、前回取得済みのトークンの有効期限を確認して有効期限内であればトークンの取得処理を省略するといった処理も追加可能です。
+def hash_sha256(path):
+ m = sha256()
+ with open(path, "rb") as f:
+ while True:
+ chunk = f.read(1024 * 1024)
+ if len(chunk) == 0:
+ break
+ m.update(chunk)
+ return m.hexdigest()
+
+def download_file(url, dst):
+ parsed = urlparse(url)
+ if parsed.scheme != "s3":
+ print(f"{url=} is not s3")
+ return
+
+ credential = Credential()
+ cred = credential.get()["credentials"]
+
+ s3_client = boto3.client(
+ "s3",
+ aws_access_key_id=cred["accessKeyId"],
+ aws_secret_access_key=cred["secretAccessKey"],
+ aws_session_token=cred["sessionToken"],
+ )
+ s3_client.download_file(parsed.netloc, parsed.path.lstrip("/"), dst)
+
+def download_file_with_delta(value):
+ with locked_data.lock:
+ url = value.get("url")
+ expected_hash = value.get("hash")
+ tmpfile = "/tmp/tmpfile"
+ print(f" {url=}, {expected_hash=}")
+ if url is None:
+ url = locked_data.shadow_desired.get("url")
+ if url and expected_hash:
+ locked_data.shadow_desired["url"] = url
+ download_file(url, tmpfile)
+ actual_hash = hash_sha256(tmpfile)
+ if expected_hash == actual_hash:
+ return True
+ print(" invalid hash value {expected_hash} != {actual_hash}")
+ return False
Code language: Diff (diff)
Device Shadowの更新
Device Shadowを更新するタイミングは以下の2つの場合です。
- 起動時にDevice Shadowを確認したときに差分があった場合
- 起動中にDevice Shadowの差分が発生した場合
1の場合のプログラムの差分を下に掲載します。なお差分の前後を増やして関数全体を表示しています。
on_get_shadow_accepted
関数は起動時にDevice Shadowの取得に成功した場合に呼び出される関数でresponse.state.delta
が含まれている場合、S3からファイルをダウンロードし、ダウンロードに成功した場合Device Shadowを更新します。
def on_get_shadow_accepted(response):
# type: (iotshadow.GetShadowResponse) -> None
try:
with locked_data.lock:
# check that this is a response to a request from this session
try:
locked_data.request_tokens.remove(response.client_token)
except KeyError:
print("Ignoring get_shadow_accepted message due to unexpected token.")
return
print("Finished getting initial shadow state.")
if locked_data.shadow_value is not None:
print(" Ignoring initial query because a delta event has already been received.")
return
if response.state:
+ if response.state.desired:
+ locked_data.shadow_desired = response.state.desired.get(shadow_property)
+ print(" Save desired[{shadow_property}]: {locked_data.shadow_desired}")
+
if response.state.delta:
value = response.state.delta.get(shadow_property)
if value:
print(" Shadow contains delta value '{}'.".format(value))
- change_shadow_value(value)
+ if download_file_with_delta(value):
+ change_shadow_value(value)
return
if response.state.reported:
value = response.state.reported.get(shadow_property)
if value:
print(" Shadow contains reported value '{}'.".format(value))
set_local_value_due_to_initial_query(response.state.reported[shadow_property])
return
print(" Shadow document lacks '{}' property. Setting defaults...".format(shadow_property))
change_shadow_value(SHADOW_VALUE_DEFAULT)
return
except Exception as e:
exit(e)
Code language: Diff (diff)
2の場合のプログラムの差分を下に掲載します。こちらも差分の前後を増やして関数全体を表示しています。
on_shadow_delta_updated
関数は起動中にDevice Shadowのdesired
とreported
に差分が発生したときに呼び出される関数でその差分情報が引数delta
に設定されます。この関数が差分情報コールバックです。on_get_shadow_accepted
関数と同様にS3からファイルをダウンロードし、ダウンロードに成功した場合Device Shadowを更新します。
def on_shadow_delta_updated(delta):
# type: (iotshadow.ShadowDeltaUpdatedEvent) -> None
try:
print("Received shadow delta event.")
if delta.state and (shadow_property in delta.state):
value = delta.state[shadow_property]
if value is None:
print(" Delta reports that '{}' was deleted. Resetting defaults...".format(shadow_property))
change_shadow_value(SHADOW_VALUE_DEFAULT)
return
else:
print(" Delta reports that desired value is '{}'. Changing local value...".format(value))
if (delta.client_token is not None):
print (" ClientToken is: " + delta.client_token)
- change_shadow_value(value)
+ if download_file_with_delta(value):
+ change_shadow_value(value)
else:
print(" Delta did not report a change in '{}'".format(shadow_property))
except Exception as e:
exit(e)
Code language: Diff (diff)
実行
プログラムを実行してみます。
$ python3 aws-iot-device-sdk-python-v2/samples/shadow.py --endpoint xxxxxxxxxxxxxx-ats.iot.us-east-1.amazonaws.com --key private.key --cert certificate.pem --client_id test-thing --ca_file AmazonRootCA1.pem --thing_name test-thing --credential_endpoint xxxxxxxxxxxxx.credentials.iot.us-east-1.amazonaws.com/role-aliases/iot-role-alias-s3-read/credentials --shadow_property s3
Connecting to a15ks10d68w3hk-ats.iot.us-east-1.amazonaws.com with client ID 'test-thing'...
Connected!
Subscribing to Update responses...
Subscribing to Get responses...
Subscribing to Delta events...
Requesting current shadow state...
Thing has no shadow document. Creating with defaults...
Changed local shadow value to '{}'.
Updating reported shadow value to '{}'...
Update request published.
Finished updating reported shadow value to '{}'.
Code language: Bash (bash)
別のターミナルから以下を実行します。
echo hoge > hoge.txt
aws s3 cp hoge.txt s3://xxxxxxxxxxxx-iot-test-files/hoge.txt
Code language: Bash (bash)
shadow.py
を実行したターミナルに以下が表示されます。
Received shadow delta event.
Delta reports that desired value is '{'url': 's3://xxxxxxxxxxxx-iot-test-files/hoge.txt', 'hash': '2e0390eb024a52963db7b95e84a9c2b12c004054a7bad9a97ec0c7c89d4681d2'}'. Changing local value...
url='s3://xxxxxxxxxxxx-iot-test-files/hoge.txt', expected_hash='2e0390eb024a52963db7b95e84a9c2b12c004054a7bad9a97ec0c7c89d4681d2'
Changed local shadow value to '{'url': 's3://xxxxxxxxxxxx-iot-test-files/hoge.txt', 'hash': '2e0390eb024a52963db7b95e84a9c2b12c004054a7bad9a97ec0c7c89d4681d2'}'.
Updating reported shadow value to '{'url': 's3://xxxxxxxxxxxx-iot-test-files/hoge.txt', 'hash': '2e0390eb024a52963db7b95e84a9c2b12c004054a7bad9a97ec0c7c89d4681d2'}'...
Ignoring update_shadow_accepted message due to unexpected token.
Update request published.
Finished updating reported shadow value to '{'url': 's3://xxxxxxxxxxxx-iot-test-files/hoge.txt', 'hash': '2e0390eb024a52963db7b95e84a9c2b12c004054a7bad9a97ec0c7c89d4681d2'}'.
Code language: Bash (bash)
別のターミナルから/tmp/tmpfile
が作成され中身がhoge
であることを確認します。
$ cat /tmp/tmpfile
hoge
hoge.txtを書き換えてS3にアップロードしてみます。
echo fuga > hoge.txt
aws s3 cp hoge.txt s3://xxxxxxxxxxxx-iot-test-files/hoge.txt
Code language: Bash (bash)
shadow.py
を実行したターミナルに以下が表示されます。url
=Noneが表示されurl
に差分が無いことが分かります。
Received shadow delta event.
Delta reports that desired value is '{'hash': 'e712aff3705ac314b9a890e0ec208faa20054eee514d86ab913d768f94e01279'}'. Changing local value...
url=None, expected_hash='e712aff3705ac314b9a890e0ec208faa20054eee514d86ab913d768f94e01279'
Changed local shadow value to '{'hash': 'e712aff3705ac314b9a890e0ec208faa20054eee514d86ab913d768f94e01279'}'.
Updating reported shadow value to '{'hash': 'e712aff3705ac314b9a890e0ec208faa20054eee514d86ab913d768f94e01279'}'...
Ignoring update_shadow_accepted message due to unexpected token.
Update request published.
Finished updating reported shadow value to '{'hash': 'e712aff3705ac314b9a890e0ec208faa20054eee514d86ab913d768f94e01279'}'.
Code language: Bash (bash)
/tmp/tmpfile
が作成され中身がfuga
であることを確認します。
$ cat /tmp/tmpfile
fuga
Code language: Bash (bash)
うまく動いているようです。
説明をわかりやすくするためにサンプルの修正をできる限り少なくしましたが、本番で運用する際はもう少しクリーンナップしても良いかもしれません。
以上です。