AWS IoT Device Shadow – IoT Device

AWS

こちらの記事では、AWS IoT, AWS IoT Device Shadowについて説明しました。またこちらの記事では、AWS S3へのファイルを更新をトリガに、AWS IoT Device Shadowを更新するAWS Lambdaについて説明しました。またこちらの記事ではAWS IoTデバイスからS3にアクセスする仕組みについて説明しました。

これらの記事のまとめとして、この記事では、AWS IoT Device Shadowの差分情報を元にIoTデバイスからS3のファイルを取得するプログラムについて説明します。

全体の流れとしては以下のとおりです。

  1. Lambda: 特定のS3バケットのファイル作成・更新をトリガにLambdaを起動 ← こちらで解説済み
  2. Lambda: S3に作成・更新されたファイル情報の取得 ← こちらで解説済み
  3. Lambda: モノのDevice Shadowへファイル更新情報を書き込む ← こちらで解説済み
  4. IoTデバイス: 証明書を使用しクレデンシャルプロバイダーからトークンを取得 ← こちらで解説済み
  5. IoTデバイス: S3からファイルを取得 ← ここを解説

なお、こちらの記事でIoT Device Shadowの説明に使用したサンプルプログラムaws-iot-device-sdk-python-v2/samples/shadow.pyをこの記事でも使用します。

事前にこちらこちらこちらの記事を参考に、すべてのセットアップを済ませて下さい。

こちらの記事で指定するバケット名をxxxxxxxxxx-iot-test-filesにして、こちらの記事と合わせておく必要があります。xxxxxxxxxxはご自身のアカウントIDです。

プログラム修正部分

aws-iot-device-sdk-python-v2/samples/shadow.pyの修正部分は以下のとおりです。なお全体のコードはhttps://github.com/otamajakusi/aws-iot-device-sdk-python-v2を参照して下さい。

 # - Overview -
 # This sample uses the AWS IoT Device Shadow Service to keep a property in
@@ -42,6 +49,7 @@ cmdUtils.register_command("client_id", "<str>", "Client ID to use for MQTT conne
 cmdUtils.register_command("thing_name", "<str>", "The name assigned to your IoT Thing", required=True)
 cmdUtils.register_command("shadow_property", "<str>", "The name of the shadow property you want to change (optional, default='color'", default="color")
 cmdUtils.register_command("is_ci", "<str>", "If present the sample will run in CI mode (optional, default='None'. Will publish shadow automatically if set)")
+cmdUtils.register_command("credential_endpoint", "<str>", "iot:CredentialProvider endpoint", True, str)
 # Needs to be called so the command utils parse the commands
 cmdUtils.get_args()
 
@@ -52,7 +60,7 @@ shadow_thing_name = cmdUtils.get_command_required("thing_name")
 shadow_property = cmdUtils.get_command("shadow_property")
 is_ci = cmdUtils.get_command("is_ci", None) != None
 
-SHADOW_VALUE_DEFAULT = "off"
+SHADOW_VALUE_DEFAULT = {}
 
 class LockedData:
     def __init__(self):
@@ -60,9 +68,73 @@ class LockedData:
         self.shadow_value = None
         self.disconnect_called = False
         self.request_tokens = set()
+        self.shadow_desired.get("url")
+        if url and expected_hash:
+            locked_data.shadow_desired["url"] = url
+            download_file(url, tmpfile)
+            actual_hash = hash_sha256(tmpfile)
+            if expected_hash == actual_hash:
+                return True
+            print("  invalid hash value {expected_hash} != {actual_hash}")
+        return False
+
+
 # Function for gracefully quitting this sample
 def exit(msg_or_exception):
     if isinstance(msg_or_exception, Exception):
@@ -103,11 +175,16 @@ def on_get_shadow_accepted(response):
                 return
 
         if response.state:
+            if response.state.desired:
+                locked_data.shadow_desired = response.state.desired.get(shadow_property)
+                print("  Save desired[{shadow_property}]: {locked_data.shadow_desired}")
+
             if response.state.delta:
                 value = response.state.delta.get(shadow_property)
                 if value:
                     print("  Shadow contains delta value '{}'.".format(value))
-                    change_shadow_value(value)
+                    if download_file_with_delta(value):
+                        change_shadow_value(value)
                     return
 
             if response.state.reported:
@@ -159,7 +236,8 @@ def on_shadow_delta_updated(delta):
                 print("  Delta reports that desired value is '{}'. Changing local value...".format(value))
                 if (delta.client_token is not None):
                     print ("  ClientToken is: " + delta.client_token)
-                change_shadow_value(value)
+                if download_file_with_delta(value):
+                    change_shadow_value(value)
         else:
             print("  Delta did not report a change in '{}'".format(shadow_property))
 
@@ -194,7 +272,6 @@ def on_update_shadow_accepted(response):
                     print ("Could not find shadow property with name: '{}'.".format(shadow_property)) # type: ignore
             else:
                 print("Shadow states cleared.") # when the shadow states are cleared, reported and desired are set to None
-            print("Enter desired value: ") # remind user they can input new values
         except:
             exit("Updated shadow is missing the target property") Code language: Diff (diff)

プログラムは以下のように実行します。

$ python3 aws-iot-device-sdk-python-v2/samples/shadow.py --endpoint xxxxxxxxxxxxxx-ats.iot.us-east-1.amazonaws.com --key private.key --cert certificate.pem --client_id test-thing --ca_file AmazonRootCA1.pem --thing_name test-thing --credential_endpoint xxxxxxxxxxxxx.credentials.iot.us-east-1.amazonaws.com/role-aliases/iot-role-alias-s3-read/credentials --shadow_property s3Code language: Bash (bash)

なおこちらの記事から追加した引数は以下の2つです。

  • --credential_endpoint xxxxxxxxxxxxx.credentials.iot.us-east-1.amazonaws.com/role-aliases/iot-role-alias-s3-read/credentials
    • このエンドポイントについてはこちらの記事を参照して下さい。
  • --shadow_property s3
    • これを指定することで、サンプルプログラムが参照するDevice ShadowのJSONのキーをs3にします。下で説明します。

以下順番に説明していきます。

credential_endpointプログラム引数の追加

IoTデバイスからS3にアセスする為、証明書を使用しクレデンシャルプロバイダーからトークンを取得します。そのクレデンシャルプロバイダーのエンドポイントをプログラム実行時に指定するため、--credential_endpoint引数を追加します。

+cmdUtils.register_command("credential_endpoint", "<str>", "iot:CredentialProvider endpoint", True, str)Code language: Diff (diff)

Device Shadowのデフォルト値の変更

こちらの記事で解説したDevice Shadowの形式を抜粋します。

            "state": {
                "desired": {
                    "s3": {
                        "url": f"s3://{bucket}/{key}",
                        "hash": digest,
                    }
                }
            }Code language: JSON / JSON with Comments (json)

プラグラムの引数に、--shadow_property s3を追加しました。そのためJSONのキーs3はオブジェクトとなるためデフォルト値を{}に修正します。

-SHADOW_VALUE_DEFAULT = "off"
+SHADOW_VALUE_DEFAULT = {} Code language: Diff (diff)

desiredの保存

Device ShadowがLambdaにより更新されたとき、その差分は以下のつの場合があります。

  1. desired.s3.urldesired.s3.hashの2つが差分となる場合
  2. desired.s3.hashだけが差分となる場合

1の場合urlの情報が差分情報コールバックの引数に渡るため問題ありませんが、2の場合urlの情報が差分情報コールバックの引数に渡らないため変数にurlを保存しておきます。差分情報コールバックについてはこの後説明します。

@@ -60,9 +68,73 @@ class LockedData:
         self.shadow_value = None
         self.disconnect_called = False
         self.request_tokens = set()
+        self.shadow_desired = {}
@@ -103,11 +175,16 @@ def on_get_shadow_accepted(response):
                 return
 
         if response.state:
+            if response.state.desired:
+                locked_data.shadow_desired = response.state.desired.get(shadow_property)
+                print("  Save desired[{shadow_property}]: {locked_data.shadow_desired}")
+Code language: Diff (diff)

クレデンシャルプロバイダーからトークンを取得

以下のコードでクレデンシャルプロバイダーからトークンを取得します。詳細はこちらの記事を参照して下さい。

+class Credential:
+    def __init__(self):
+        self.cert = cmdUtils.get_command('cert')
+        self.key = cmdUtils.get_command('key')
+        self.ca_file = cmdUtils.get_command('ca_file')
+        self.credential_endpoint = cmdUtils.get_command('credential_endpoint')
+        self.headers = {"x-amzn-iot-thingname": cmdUtils.get_command_required("thing_name")}
+
+    def get(self):
+        response = requests.get(
+            f"https://{self.credential_endpoint}",
+            verify=self.ca_file,
+            cert=(self.cert, self.key),
+            headers=self.headers,
+        )
+        response.raise_for_status()
+        return json.loads(response.text)Code language: Diff (diff)

S3からファイルをダウンロードしハッシュ値を計算

下記のdownload_file関数では、クレデンシャルプロバイダーからトークン(aws_access_key_idaws_secret_access_keyaws_session_token)を取得しboto3.clientの引数にそれぞれ渡します。これによりs3にアクセスすることができるboto3クライアントが作成できます。

なお、以下のコードは毎回トークンを取得していますが、前回取得済みのトークンの有効期限を確認して有効期限内であればトークンの取得処理を省略するといった処理も追加可能です。

+def hash_sha256(path):
+    m = sha256()
+    with open(path, "rb") as f:
+        while True:
+            chunk = f.read(1024 * 1024)
+            if len(chunk) == 0:
+                break
+            m.update(chunk)
+        return m.hexdigest()
+
+def download_file(url, dst):
+    parsed = urlparse(url)
+    if parsed.scheme != "s3":
+        print(f"{url=} is not s3")
+        return
+
+    credential = Credential()
+    cred = credential.get()["credentials"]
+
+    s3_client = boto3.client(
+        "s3",
+        aws_access_key_id=cred["accessKeyId"],
+        aws_secret_access_key=cred["secretAccessKey"],
+        aws_session_token=cred["sessionToken"],
+    )
+    s3_client.download_file(parsed.netloc, parsed.path.lstrip("/"), dst)
+
+def download_file_with_delta(value):
+    with locked_data.lock:
+        url = value.get("url")
+        expected_hash = value.get("hash")
+        tmpfile = "/tmp/tmpfile"
+        print(f"  {url=}, {expected_hash=}")
+        if url is None:
+            url = locked_data.shadow_desired.get("url")
+        if url and expected_hash:
+            locked_data.shadow_desired["url"] = url
+            download_file(url, tmpfile)
+            actual_hash = hash_sha256(tmpfile)
+            if expected_hash == actual_hash:
+                return True
+            print("  invalid hash value {expected_hash} != {actual_hash}")
+        return FalseCode language: Diff (diff)

Device Shadowの更新

Device Shadowを更新するタイミングは以下の2つの場合です。

  1. 起動時にDevice Shadowを確認したときに差分があった場合
  2. 起動中にDevice Shadowの差分が発生した場合

1の場合のプログラムの差分を下に掲載します。なお差分の前後を増やして関数全体を表示しています。

on_get_shadow_accepted関数は起動時にDevice Shadowの取得に成功した場合に呼び出される関数でresponse.state.deltaが含まれている場合、S3からファイルをダウンロードし、ダウンロードに成功した場合Device Shadowを更新します。

 def on_get_shadow_accepted(response):
     # type: (iotshadow.GetShadowResponse) -> None
     try:
         with locked_data.lock:
             # check that this is a response to a request from this session
             try:
                 locked_data.request_tokens.remove(response.client_token)
             except KeyError:
                 print("Ignoring get_shadow_accepted message due to unexpected token.")
                 return
 
             print("Finished getting initial shadow state.")
             if locked_data.shadow_value is not None:
                 print("  Ignoring initial query because a delta event has already been received.")
                 return
 
         if response.state:
+            if response.state.desired:
+                locked_data.shadow_desired = response.state.desired.get(shadow_property)
+                print("  Save desired[{shadow_property}]: {locked_data.shadow_desired}")
+
             if response.state.delta:
                 value = response.state.delta.get(shadow_property)
                 if value:
                     print("  Shadow contains delta value '{}'.".format(value))
-                    change_shadow_value(value)
+                    if download_file_with_delta(value):
+                        change_shadow_value(value)
                     return
 
             if response.state.reported:
                 value = response.state.reported.get(shadow_property)
                 if value:
                     print("  Shadow contains reported value '{}'.".format(value))
                     set_local_value_due_to_initial_query(response.state.reported[shadow_property])
                     return
 
         print("  Shadow document lacks '{}' property. Setting defaults...".format(shadow_property))
         change_shadow_value(SHADOW_VALUE_DEFAULT)
         return
 
     except Exception as e:
         exit(e)Code language: Diff (diff)

2の場合のプログラムの差分を下に掲載します。こちらも差分の前後を増やして関数全体を表示しています。

on_shadow_delta_updated関数は起動中にDevice Shadowのdesiredreportedに差分が発生したときに呼び出される関数でその差分情報が引数deltaに設定されます。この関数が差分情報コールバックです。on_get_shadow_accepted関数と同様にS3からファイルをダウンロードし、ダウンロードに成功した場合Device Shadowを更新します。

 def on_shadow_delta_updated(delta):
     # type: (iotshadow.ShadowDeltaUpdatedEvent) -> None
     try:
         print("Received shadow delta event.")
         if delta.state and (shadow_property in delta.state):
             value = delta.state[shadow_property]
             if value is None:
                 print("  Delta reports that '{}' was deleted. Resetting defaults...".format(shadow_property))
                 change_shadow_value(SHADOW_VALUE_DEFAULT)
                 return
             else:
                 print("  Delta reports that desired value is '{}'. Changing local value...".format(value))
                 if (delta.client_token is not None):
                     print ("  ClientToken is: " + delta.client_token)
-                change_shadow_value(value)
+                if download_file_with_delta(value):
+                    change_shadow_value(value)
         else:
             print("  Delta did not report a change in '{}'".format(shadow_property))
 
     except Exception as e:
         exit(e)Code language: Diff (diff)

実行

プログラムを実行してみます。

$ python3 aws-iot-device-sdk-python-v2/samples/shadow.py --endpoint xxxxxxxxxxxxxx-ats.iot.us-east-1.amazonaws.com --key private.key --cert certificate.pem --client_id test-thing --ca_file AmazonRootCA1.pem --thing_name test-thing --credential_endpoint xxxxxxxxxxxxx.credentials.iot.us-east-1.amazonaws.com/role-aliases/iot-role-alias-s3-read/credentials --shadow_property s3
Connecting to a15ks10d68w3hk-ats.iot.us-east-1.amazonaws.com with client ID 'test-thing'...
Connected!
Subscribing to Update responses...
Subscribing to Get responses...
Subscribing to Delta events...
Requesting current shadow state...
Thing has no shadow document. Creating with defaults...
Changed local shadow value to '{}'.
Updating reported shadow value to '{}'...
Update request published.
Finished updating reported shadow value to '{}'.Code language: Bash (bash)

別のターミナルから以下を実行します。

echo hoge > hoge.txt
aws s3 cp hoge.txt s3://xxxxxxxxxxxx-iot-test-files/hoge.txtCode language: Bash (bash)

shadow.pyを実行したターミナルに以下が表示されます。

Received shadow delta event.
  Delta reports that desired value is '{'url': 's3://xxxxxxxxxxxx-iot-test-files/hoge.txt', 'hash': '2e0390eb024a52963db7b95e84a9c2b12c004054a7bad9a97ec0c7c89d4681d2'}'. Changing local value...
  url='s3://xxxxxxxxxxxx-iot-test-files/hoge.txt', expected_hash='2e0390eb024a52963db7b95e84a9c2b12c004054a7bad9a97ec0c7c89d4681d2'
Changed local shadow value to '{'url': 's3://xxxxxxxxxxxx-iot-test-files/hoge.txt', 'hash': '2e0390eb024a52963db7b95e84a9c2b12c004054a7bad9a97ec0c7c89d4681d2'}'.
Updating reported shadow value to '{'url': 's3://xxxxxxxxxxxx-iot-test-files/hoge.txt', 'hash': '2e0390eb024a52963db7b95e84a9c2b12c004054a7bad9a97ec0c7c89d4681d2'}'...
Ignoring update_shadow_accepted message due to unexpected token.
Update request published.
Finished updating reported shadow value to '{'url': 's3://xxxxxxxxxxxx-iot-test-files/hoge.txt', 'hash': '2e0390eb024a52963db7b95e84a9c2b12c004054a7bad9a97ec0c7c89d4681d2'}'.Code language: Bash (bash)

別のターミナルから/tmp/tmpfileが作成され中身がhogeであることを確認します。

$ cat /tmp/tmpfile 
hoge

hoge.txtを書き換えてS3にアップロードしてみます。

echo fuga > hoge.txt
aws s3 cp hoge.txt s3://xxxxxxxxxxxx-iot-test-files/hoge.txtCode language: Bash (bash)

shadow.pyを実行したターミナルに以下が表示されます。url=Noneが表示されurlに差分が無いことが分かります。

Received shadow delta event.
  Delta reports that desired value is '{'hash': 'e712aff3705ac314b9a890e0ec208faa20054eee514d86ab913d768f94e01279'}'. Changing local value...
  url=None, expected_hash='e712aff3705ac314b9a890e0ec208faa20054eee514d86ab913d768f94e01279'
Changed local shadow value to '{'hash': 'e712aff3705ac314b9a890e0ec208faa20054eee514d86ab913d768f94e01279'}'.
Updating reported shadow value to '{'hash': 'e712aff3705ac314b9a890e0ec208faa20054eee514d86ab913d768f94e01279'}'...
Ignoring update_shadow_accepted message due to unexpected token.
Update request published.
Finished updating reported shadow value to '{'hash': 'e712aff3705ac314b9a890e0ec208faa20054eee514d86ab913d768f94e01279'}'.Code language: Bash (bash)

/tmp/tmpfileが作成され中身がfugaであることを確認します。

$ cat /tmp/tmpfile 
fugaCode language: Bash (bash)

うまく動いているようです。

説明をわかりやすくするためにサンプルの修正をできる限り少なくしましたが、本番で運用する際はもう少しクリーンナップしても良いかもしれません。

以上です。

参照

GitHub - otamajakusi/aws-iot-device-sdk-python-v2: Next generation AWS IoT Client SDK for Python using the AWS Common Runtime
Next generation AWS IoT Client SDK for Python using the AWS Common Runtime - GitHub - otamajakusi/aws-iot-device-sdk-python-v2: Next generation AWS IoT Client S...
GitHub - otamajakusi/aws-iot-device-sdk-python-v2: Next generation AWS IoT Client SDK for Python using the AWS Common Runtime
Next generation AWS IoT Client SDK for Python using the AWS Common Runtime - GitHub - otamajakusi/aws-iot-device-sdk-python-v2: Next generation AWS IoT Client S...