Nachdem wir nun erfolgreich einen Azure IoT Hub inkl. Nachrichtenrouting und einem ersten IoT Gerät erstellt haben, ist es nun an der Zeit mit Hilfe einer kleinen C# Konsolenanwendung ein IoT Gerät zu simulieren und Daten an den IoT Hub zu senden.
MQTT und Firewall
Da das simulierte IoT Gerät das MQTT Protokoll für die Kommunikation mit dem IoT Hub verwendet, müssen wir sicherstellen das der Port 8883 in der Firewall freigegeben ist.
MQTT steht für Message Queuing Telemetry Transport und ist ein offenes Nachrichtenprotokoll für Machine-to-Machine-Kommunikation. Es ermöglicht den Austausch von Nachrichten zwischen Internet of Things-Geräten. Es ist an sich eine leichtgewichtige Publish/Subscribe-Lösung.
Die C# Konsolenanwendung um Daten an den IoT Hub zu senden
Wir starten Microsoft Visual Studio 2019 und erstellen eine neue Konsolen-App (.NET Core).
Wir benennen das Projekt IoTGarageSimDevice01 und klicken auf Erstellen.
Bevor wir entwickeln können müssen wir zuerst die Device SDK for Azure IoT Hub dem Projekt hinzufügen. Dazu öffnen wir die NuGet Package Manager Konsole über Extras > NuGet-Paket-Manager > Paket-Manager-Konsole.
In der Konsole geben wir nachfolgenden Befehl ein um die SDK zu installieren.
PM> Install-Package Microsoft.Azure.Devices.Client
Nachdem das Paket erfolgreich installiert wurde nutzen wir den nachfolgenden Code um folgende Dinge zu tun.
- Baut eine Verbindung zu IoT Hub auf
- Simuliert Temperatur und Feuchtigkeit
- Fügt eine Eigenschaft der Nachricht hinzu auf Basis der Größe des Wertes um damit das Nachrichtenrouting steuern zu können
- Packt diese Informationen als JSON Objekt welches in BASE64 encodiert wurde in den Body der Nachricht
using Microsoft.Azure.Devices.Client; using Newtonsoft.Json; using Newtonsoft.Json.Linq; using System; using System.Text; using System.Threading; using System.Threading.Tasks; namespace IoTGarageSimDevice01 { class Program { private static DeviceClient s_deviceClient; private readonly static string s_myDeviceId = "<Die Geräte-Id des im IoT Hub registrierten Gerätes>"; // simulatedDevice private readonly static string s_iotHubUri = "<Hostname des IoT Hubs>"; // iotghub.azure-devices.net // Im IoT Hub > Geräte > Primärschlüssel private readonly static string s_deviceKey = "PM2yvQu4/SwEhLZBLTCEqOjrYLWFbnDNf/kFJ0F0vUo="; private static async Task Main() { Console.WriteLine("Routing Tutorial: Simulated device\n"); s_deviceClient = DeviceClient.Create(s_iotHubUri, new DeviceAuthenticationWithRegistrySymmetricKey(s_myDeviceId, s_deviceKey), TransportType.Mqtt); using var cts = new CancellationTokenSource(); var messages = SendDeviceToCloudMessagesAsync(cts.Token); Console.WriteLine("Press the Enter key to stop."); Console.ReadLine(); cts.Cancel(); await messages; } private static async Task SendDeviceToCloudMessagesAsync(CancellationToken token) { double minTemperature = 20; double minHumidity = 60; Random rand = new Random(); while (!token.IsCancellationRequested) { double currentTemperature = minTemperature + rand.NextDouble() * 15; double currentHumidity = minHumidity + rand.NextDouble() * 20; string infoString; string levelValue; if (rand.NextDouble() > 0.7) { if (rand.NextDouble() > 0.5) { levelValue = "critical"; infoString = "This is a critical message."; } else { levelValue = "storage"; infoString = "This is a storage message."; } } else { levelValue = "normal"; infoString = "This is a normal message."; } var telemetryDataPoint = new { deviceId = s_myDeviceId, temperature = currentTemperature, humidity = currentHumidity, pointInfo = infoString }; var telemetryDataString = JsonConvert.SerializeObject(telemetryDataPoint); // You can encode this as ASCII, but if you want it to be the body of the message, // and to be able to search the body, it must be encoded in UTF with base64 encoding. using var message = new Message(Encoding.UTF32.GetBytes(telemetryDataString)); //Add one property to the message. message.Properties.Add("target", levelValue); // Submit the message to the hub. await s_deviceClient.SendEventAsync(message); // Print out the message. Console.WriteLine("{0} > Sent message: {1}", DateTime.Now, telemetryDataString); await Task.Delay(1000); } } } }
Wenn die Anwendung erfolgreich gestartet ist wird sie Nachrichten generieren und an den IoT Hub senden. Das sieht wie folgt aus.
In dem vorherigen Artikel hatten wir im IoT Hub ein Nachrichtenrouting für Nachrichten mit der Eigenschaft level = storage erstellt. Diese sollten in den Data Lake geroutet werden. Ob das auch wirklich passiert ist, können wir prüfen in dem wir den Data Lake Explorer starten uns anschauen ob dort Nachrichten weggeschrieben wurden. Dazu wechseln wir im Azure Portal zu unserem Speicherkonto iotgstorage und in der Übersicht klicken wir auf Container. In der Container Übersicht wählen wir unseren Data Lake iotgdatalake aus.
Hier klicken wir auf iotgdatalake. Automatisch öffnet sich der Azure Storage Explorer und wir können nun in den Verzeichnissen navigieren. Auf der Root-Ebene befindet sich ein Ordner iotghub. Das heißt dass das Nachrichtenrouting des IoT Hubs Daten in den Data Lake geschrieben hat. Wir drillen weiter runter in der Verzeichnisstruktur bis iotgdatalake / iotghub / 00 / 2021 / 02 / 07 / 15. Dort angekommen sehen wir eine JSON Datei. Das heißt, dass unser simuliertes IoT Gerät Nachrichten mit der Eigenschaft level = storage erzeugt hat. Wir können die Nachricht auch herunterladen und uns beispielsweise den Inhalt im Notepad anschauen oder auf den Abschnitt Bearbeiten klicken.
{"EnqueuedTimeUtc":"2021-02-07T15:01:47.6580000Z","Properties":{"target":"storage"},"SystemProperties":{"connectionDeviceId":"simulatedDevice","connectionAuthMethod":"{\"scope\":\"device\",\"type\":\"sas\",\"issuer\":\"iothub\",\"acceptingIpFilterRule\":null}","connectionDeviceGenerationId":"637480710472027598","enqueuedTime":"2021-02-07T15:01:47.6580000Z"},"Body":"ewAAACIAAABkAAAAZQAAAHYAAABpAAAAYwAAAGUAAABJAAAAZAAAACIAAAA6AAAAIgAAAHMAAABpAAAAbQAAAHUAAABsAAAAYQAAAHQAAABlAAAAZAAAAEQAAABlAAAAdgAAAGkAAABjAAAAZQAAACIAAAAsAAAAIgAAAHQAAABlAAAAbQAAAHAAAABlAAAAcgAAAGEAAAB0AAAAdQAAAHIAAABlAAAAIgAAADoAAAAzAAAAMAAAAC4AAAA0AAAAMAAAADIAAAA3AAAAMAAAADAAAAA0AAAANwAAADIAAAA4AAAAMQAAADAAAAA2AAAAMwAAADMAAAAsAAAAIgAAAGgAAAB1AAAAbQAAAGkAAABkAAAAaQAAAHQAAAB5AAAAIgAAADoAAAA2AAAANwAAAC4AAAA0AAAAOQAAADgAAAA3AAAAMwAAADYAAAAxAAAAOQAAADgAAAA3AAAANQAAADgAAAAxAAAAMgAAACwAAAAiAAAAcAAAAG8AAABpAAAAbgAAAHQAAABJAAAAbgAAAGYAAABvAAAAIgAAADoAAAAiAAAAVAAAAGgAAABpAAAAcwAAACAAAABpAAAAcwAAACAAAABhAAAAIAAAAHMAAAB0AAAAbwAAAHIAAABhAAAAZwAAAGUAAAAgAAAAbQAAAGUAAABzAAAAcwAAAGEAAABnAAAAZQAAAC4AAAAiAAAAfQAAAA=="} {"EnqueuedTimeUtc":"2021-02-07T15:02:15.6010000Z","Properties":{"target":"storage"},"SystemProperties":{"connectionDeviceId":"simulatedDevice","connectionAuthMethod":"{\"scope\":\"device\",\"type\":\"sas\",\"issuer\":\"iothub\",\"acceptingIpFilterRule\":null}","connectionDeviceGenerationId":"637480710472027598","enqueuedTime":"2021-02-07T15:02:15.6010000Z"},"Body":"ewAAACIAAABkAAAAZQAAAHYAAABpAAAAYwAAAGUAAABJAAAAZAAAACIAAAA6AAAAIgAAAHMAAABpAAAAbQAAAHUAAABsAAAAYQAAAHQAAABlAAAAZAAAAEQAAABlAAAAdgAAAGkAAABjAAAAZQAAACIAAAAsAAAAIgAAAHQAAABlAAAAbQAAAHAAAABlAAAAcgAAAGEAAAB0AAAAdQAAAHIAAABlAAAAIgAAADoAAAAyAAAANgAAAC4AAAAxAAAANQAAADcAAAAzAAAAMgAAADMAAAA5AAAAMQAAADYAAAA1AAAAMQAAADUAAAA5AAAANQAAADMAAAAsAAAAIgAAAGgAAAB1AAAAbQAAAGkAAABkAAAAaQAAAHQAAAB5AAAAIgAAADoAAAA2AAAAOAAAAC4AAAA0AAAAOQAAADIAAAA0AAAAOAAAADMAAAAyAAAANQAAADgAAAA0AAAANwAAADYAAAA3AAAAMQAAACwAAAAiAAAAcAAAAG8AAABpAAAAbgAAAHQAAABJAAAAbgAAAGYAAABvAAAAIgAAADoAAAAiAAAAVAAAAGgAAABpAAAAcwAAACAAAABpAAAAcwAAACAAAABhAAAAIAAAAHMAAAB0AAAAbwAAAHIAAABhAAAAZwAAAGUAAAAgAAAAbQAAAGUAAABzAAAAcwAAAGEAAABnAAAAZQAAAC4AAAAiAAAAfQAAAA=="} {"EnqueuedTimeUtc":"2021-02-07T15:02:17.9760000Z","Properties":{"target":"storage"},"SystemProperties":{"connectionDeviceId":"simulatedDevice","connectionAuthMethod":"{\"scope\":\"device\",\"type\":\"sas\",\"issuer\":\"iothub\",\"acceptingIpFilterRule\":null}","connectionDeviceGenerationId":"637480710472027598","enqueuedTime":"2021-02-07T15:02:17.9760000Z"},"Body":"ewAAACIAAABkAAAAZQAAAHYAAABpAAAAYwAAAGUAAABJAAAAZAAAACIAAAA6AAAAIgAAAHMAAABpAAAAbQAAAHUAAABsAAAAYQAAAHQAAABlAAAAZAAAAEQAAABlAAAAdgAAAGkAAABjAAAAZQAAACIAAAAsAAAAIgAAAHQAAABlAAAAbQAAAHAAAABlAAAAcgAAAGEAAAB0AAAAdQAAAHIAAABlAAAAIgAAADoAAAAzAAAAMQAAAC4AAAAxAAAAMAAAADcAAAA5AAAAOAAAADEAAAA4AAAANwAAADEAAAAzAAAAOQAAADgAAAAxAAAANgAAACwAAAAiAAAAaAAAAHUAAABtAAAAaQAAAGQAAABpAAAAdAAAAHkAAAAiAAAAOgAAADYAAAAxAAAALgAAADEAAAA5AAAANQAAADcAAAA0AAAAMAAAADcAAAA5AAAAMwAAADQAAAA1AAAANwAAADEAAAAzAAAANAAAACwAAAAiAAAAcAAAAG8AAABpAAAAbgAAAHQAAABJAAAAbgAAAGYAAABvAAAAIgAAADoAAAAiAAAAVAAAAGgAAABpAAAAcwAAACAAAABpAAAAcwAAACAAAABhAAAAIAAAAHMAAAB0AAAAbwAAAHIAAABhAAAAZwAAAGUAAAAgAAAAbQAAAGUAAABzAAAAcwAAAGEAAABnAAAAZQAAAC4AAAAiAAAAfQAAAA=="} {"EnqueuedTimeUtc":"2021-02-07T15:02:25.0850000Z","Properties":{"target":"storage"},"SystemProperties":{"connectionDeviceId":"simulatedDevice","connectionAuthMethod":"{\"scope\":\"device\",\"type\":\"sas\",\"issuer\":\"iothub\",\"acceptingIpFilterRule\":null}","connectionDeviceGenerationId":"637480710472027598","enqueuedTime":"2021-02-07T15:02:25.0850000Z"},"Body":"ewAAACIAAABkAAAAZQAAAHYAAABpAAAAYwAAAGUAAABJAAAAZAAAACIAAAA6AAAAIgAAAHMAAABpAAAAbQAAAHUAAABsAAAAYQAAAHQAAABlAAAAZAAAAEQAAABlAAAAdgAAAGkAAABjAAAAZQAAACIAAAAsAAAAIgAAAHQAAABlAAAAbQAAAHAAAABlAAAAcgAAAGEAAAB0AAAAdQAAAHIAAABlAAAAIgAAADoAAAAyAAAANAAAAC4AAAAyAAAAMgAAADYAAAA5AAAAMAAAADIAAAA5AAAAOAAAADgAAAAzAAAANwAAADkAAAAzAAAAMAAAADgAAAAsAAAAIgAAAGgAAAB1AAAAbQAAAGkAAABkAAAAaQAAAHQAAAB5AAAAIgAAADoAAAA3AAAAMwAAAC4AAAA1AAAANAAAADAAAAA2AAAANAAAADMAAAA4AAAAMgAAADMAAAAxAAAAMQAAADkAAAAxAAAAOAAAACwAAAAiAAAAcAAAAG8AAABpAAAAbgAAAHQAAABJAAAAbgAAAGYAAABvAAAAIgAAADoAAAAiAAAAVAAAAGgAAABpAAAAcwAAACAAAABpAAAAcwAAACAAAABhAAAAIAAAAHMAAAB0AAAAbwAAAHIAAABhAAAAZwAAAGUAAAAgAAAAbQAAAGUAAABzAAAAcwAAAGEAAABnAAAAZQAAAC4AAAAiAAAAfQAAAA=="} {"EnqueuedTimeUtc":"2021-02-07T15:02:30.9930000Z","Properties":{"target":"storage"},"SystemProperties":{"connectionDeviceId":"simulatedDevice","connectionAuthMethod":"{\"scope\":\"device\",\"type\":\"sas\",\"issuer\":\"iothub\",\"acceptingIpFilterRule\":null}","connectionDeviceGenerationId":"637480710472027598","enqueuedTime":"2021-02-07T15:02:30.9930000Z"},"Body":"ewAAACIAAABkAAAAZQAAAHYAAABpAAAAYwAAAGUAAABJAAAAZAAAACIAAAA6AAAAIgAAAHMAAABpAAAAbQAAAHUAAABsAAAAYQAAAHQAAABlAAAAZAAAAEQAAABlAAAAdgAAAGkAAABjAAAAZQAAACIAAAAsAAAAIgAAAHQAAABlAAAAbQAAAHAAAABlAAAAcgAAAGEAAAB0AAAAdQAAAHIAAABlAAAAIgAAADoAAAAyAAAAMgAAAC4AAAA4AAAAMgAAADQAAAAyAAAAMAAAADYAAAAxAAAANAAAADYAAAAzAAAAMgAAADkAAAA3AAAAMwAAADYAAAAsAAAAIgAAAGgAAAB1AAAAbQAAAGkAAABkAAAAaQAAAHQAAAB5AAAAIgAAADoAAAA3AAAAOAAAAC4AAAA5AAAANwAAADgAAAAwAAAAMwAAADEAAAAzAAAAMQAAADQAAAA0AAAAMwAAADMAAAA1AAAANwAAACwAAAAiAAAAcAAAAG8AAABpAAAAbgAAAHQAAABJAAAAbgAAAGYAAABvAAAAIgAAADoAAAAiAAAAVAAAAGgAAABpAAAAcwAAACAAAABpAAAAcwAAACAAAABhAAAAIAAAAHMAAAB0AAAAbwAAAHIAAABhAAAAZwAAAGUAAAAgAAAAbQAAAGUAAABzAAAAcwAAAGEAAABnAAAAZQAAAC4AAAAiAAAAfQAAAA=="}
Wenn wir uns den Body der Nachrichten beispielsweise der ersten Nachricht decodiert anschauen, sieht das Ergebnis wie folgt aus.
Base64
ewAAACIAAABkAAAAZQAAAHYAAABpAAAAYwAAAGUAAABJAAAAZAAAACIAAAA6AAAAIgAAAHMAAABpAAAAbQAAAHUAAABsAAAAYQAAAHQAAABlAAAAZAAAAEQAAABlAAAAdgAAAGkAAABjAAAAZQAAACIAAAAsAAAAIgAAAHQAAABlAAAAbQAAAHAAAABlAAAAcgAAAGEAAAB0AAAAdQAAAHIAAABlAAAAIgAAADoAAAAzAAAAMAAAAC4AAAA0AAAAMAAAADIAAAA3AAAAMAAAADAAAAA0AAAANwAAADIAAAA4AAAAMQAAADAAAAA2AAAAMwAAADMAAAAsAAAAIgAAAGgAAAB1AAAAbQAAAGkAAABkAAAAaQAAAHQAAAB5AAAAIgAAADoAAAA2AAAANwAAAC4AAAA0AAAAOQAAADgAAAA3AAAAMwAAADYAAAAxAAAAOQAAADgAAAA3AAAANQAAADgAAAAxAAAAMgAAACwAAAAiAAAAcAAAAG8AAABpAAAAbgAAAHQAAABJAAAAbgAAAGYAAABvAAAAIgAAADoAAAAiAAAAVAAAAGgAAABpAAAAcwAAACAAAABpAAAAcwAAACAAAABhAAAAIAAAAHMAAAB0AAAAbwAAAHIAAABhAAAAZwAAAGUAAAAgAAAAbQAAAGUAAABzAAAAcwAAAGEAAABnAAAAZQAAAC4AAAAiAAAAfQAAAA==
Decodiert
{"deviceId":"simulatedDevice", "temperature":30.402700472810633, "humidity":67.49873619875812, "pointInfo":"This is a storage message."}
Sehr schön! Das sieht doch gut aus. Wir haben ein IoT Gerät simuliert das Nachrichten an den IoT Hub in der Azure Cloud geschickt hat und zudem der IoT Hub auf Basis von Eigenschaften der Nachrichten, diese Nachricht an den Data Lake geroutet hat.
Lesen vom integrierten Endpunkt
Der IoT Hub stellt standardmäßig einen integrierten Event Hub kompatiblen Endpunkt unter messages/events zur Verfügung. Nun werden wir zum einen unser zuvor erstelltes simuliertes Gerät starten um Nachrichten in den IoT Hub zu stremen und auf der anderen Seite werden wir eine weitere Konsolenanwendung erstellen, welcher vom Endpunkt messages/events die Nachrichten lesen wird.
Wir benötigen zuerst ein Blob Storage. Der Azure Blog Storage wird als Prüfpunktspeicher verwendet, also erstellen wir im nächsten Schritt ein Blob Storage.
Einen Blob Storage für den Prüfpunktspeicher (Ckeckpoint) erstellen
Dazu gehen wir wieder in das Azure Portal und öffnen und Speicherkonto iotgstorage. Hier klicken wir auf Container und dann auf + Container.
Ich benenne den Blob Storage iotgcheckpointblob und klicke auf Erstellen.
Der Blob Storage wurde erfolgreich erstellt und erscheint in der Liste der Container im Speicherkonto.
C# Client für das Lesen des integrierten Endpunktes
Wir starten Microsoft Visual Studio 2019 und erstellen eine neue Konsolen-App (.NET Core).
Wir nennen das Projekt IoTGarage-Azure-02-IoTHub-ReadFromInternalEndpoint und klicken auf Erstellen.
Bevor wir entwickeln können, müssen wir zuerst die Bibliothek Azure.Messaging.EventHubs.Processor dem Projekt hinzufügen. Dazu öffnen wir die NuGet Package Manager Konsole über Extras > NuGet-Paket-Manager > Paket-Manager-Konsole.
In der Konsole geben wir nachfolgenden Befehl ein um die Bibliothek zu installieren.
PM> Install-Package Azure.Messaging.EventHubs.Processor
Nachdem das Paket erfolgreich installiert wurde können wir nun den Client für das Abrufen der Nachrichten aus den Endpunkt entwickeln.
Für die Verbindung zum Event Hub kompatiblen Endpunkt herzustellen, benötigen wir vier Informationen:
- Event Hub Connection String
- Event Hub Name
- Blob Storage Connection String
- Blob Storage Container Name
Der Event Hub Name ist einfach: es ist der Name unseres IoT Hubs. In unserem Fall iotghub. Beim Blob Storage Container Name ist ebenfalls einfach, das ist iotgcheckpointblob (was wir im vorherigen Abschnitt erstellt haben).
Den Connection String für den Event Hub finden wir wie folgt. Wir öffnen das Azure Portal. Dort öffnen wir die Ressource iotghub (unseren IoT Hub). Klicken im linken Menü unter Einstellungen auf Integrierte Endpunkte. Dort kopieren wir den Eintrag im Feld Event Hub-kompatibler Endpunkt.
Den Blob Storage Connection String finden wir wie folgt. Wir öffnen diesmal die Ressource iotgstorage (unser Speicherkonto). Wählen links im Menü unter Einstellungen > Zugriffsschlüssel den Eintrag im Abschnitt key1 > Verbindungszeichenfolge. Hinter dem Eintrag in diesem Feld verbirgt sich der Blob Storage Connection String. So, da wir jetzt alle Notwendigen Informationen haben, tragen wir diese im Code in den Konstanten ein und können die Anwendung starten.
using System; using System.Text; using System.Threading.Tasks; using Azure.Storage.Blobs; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Processor; namespace IoTGarage_Azure_04_EventHubProcessEvents { class Program { private const string ehubNamespaceConnectionString = "Endpoint=sb://..."; private const string eventHubName = "{hubname}"; private const string blobStorageConnectionString = "DefaultEndpointsProtocol=..."; private const string blobContainerName = "{blobcontainername}"; private static Task initializeEventHandler(PartitionInitializingEventArgs arg) { arg.DefaultStartingPosition = EventPosition.Latest; return Task.CompletedTask; } static async Task ProcessEventHandler(ProcessEventArgs eventArgs) { Console.WriteLine("\tReceived event: {0}", Encoding.UTF32.GetString(eventArgs.Data.Body.ToArray())); await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken); } static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs) { Console.WriteLine($"\tPartition '{ eventArgs.PartitionId}': an unhandled exception was encountered."); Console.WriteLine(eventArgs.Exception.Message); return Task.CompletedTask; } static async Task Main() { BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName); EventProcessorClient processor = new EventProcessorClient(storageClient, EventHubConsumerClient.DefaultConsumerGroupName, ehubNamespaceConnectionString, eventHubName); processor.PartitionInitializingAsync += initializeEventHandler; processor.ProcessEventAsync += ProcessEventHandler; processor.ProcessErrorAsync += ProcessErrorHandler; await processor.StartProcessingAsync(); Console.ReadLine(); } } }
Wenn alles richtig war, so sollten nun die Einträge aus dem Integrierten Event Hub Endpunkt ausgelesen und ausgegeben werden.
Weitere detaillierte Information für das verarbeiten von Events findet man hier und hier. Das ist eine Möglichkeit um die Nachrichten aus dem ioT Hub auszulesen. Alternativ gibt es die Möglichkeit Nachrichten via Azure Stream Analytics in Echtzeit zu verarbeiten und in verschiedene Quellen wegzuschreiben sowie mit Power BI ein Echtzeit Dashboard aufzubauen. Oder den Event Grid zu nutzen oder in den Event Hub zu schreiben. Oder den Strom an Nachrichten beispielsweise mit Azure Databricks zu analysieren oder die Nachrichten in einen Blob, Data Lake und anschließend in eine Datenbank zu schreiben oder für AI Szenarien einzusetzen. Oder oder oder oder … Es gibt also not einige Möglichkeiten auf die wir teilweise in dieser Serie eingehen werden.