Microsoft Azure Stream Analytics ist eine Engine für die Verarbeitung von Datenströmen in Echtzeit. Sie bietet die Möglichkeit große Datenmengen die in einer hohen Geschwindigkeit eintreffen in Echtzeit abzufragen, zu filtern, mit weiteren Daten anzureichern, mit Hilfe von ML Modellen Muster und Anomalien zu erkennen und auf diese zu reagieren.

Einige Anwendungsbeispiele könnten sein:

  • Betrugserkennung bei Bezahldiensten in Echtzeit
  • Analysieren von Telemetriedatenströmen von IoT-Geräten in Echtzeit
  • Geoanalyse für das Flottenmanagement
  • Sentiment-Analyse von Social-Media Beiträgen
  • Echtzeitverarbeitung von POS-Daten (Point of Sale)

Funktionsweise von Azure Stream Analytics

Microsoft Azure Stream Analytics basiert auf Trill. Eine von Microsoft Research entwickelten High-Performance One-Pass In-Memory Streaming Analytics Engine. Sie kommt innerhalb von Microsoft in verschiedenen Produkten zum Einsatz.

Azure Stream Analytics besteht aus Jobs. Jedes dieser Jobs besteht wiederum aus Eingabe (Input), Abfrage (Query) und Ausgabe (Output). Daten können aus einem Event Hub, dem IoT Hub oder aus dem Blob Storage kommen.

Diese Daten können mit Hilfe der zur Verfügung gestellten Abfragesprache, welche auf SQL basiert, gefiltert, aggregiert und verknüpft werden. Die Dauer des Zeitfensters von Vorgängen kann in der Sprache mit angegeben werden.

Diese transformierten Daten werden dann an verschiedene Ziele/Ausgaben gesendet. Das kann beispielsweise ein Event Hub oder Azure Functions sein. Aber auch Power BI für die Echtzeitvisualisierung. Natürlich wird auch die Möglichkeit geboten Daten in einer SQL Datenbank, Cosmos DB oder beispielsweise einem Azure Data Lake Gen1 & Gen2 zu persistieren.

Stream Analytics-Einführung zur Pipeline
Quelle: Microsoft

Azure Stream Analytics Job erstellen

Im ersten Schritt erstellen wir einen Azure Stream Analytics Job im Azure Portal. Auf der Startseite klicken wir auf Create a resource.

Im Marketplace suchen wir nach dem Begriff Stream Analytics. Es sollte Stream Analytics job erscheinen.

Wir klicken auf Create und dann auf Stream Analytics job.

Bei den Eigenschaften nehme ich als Job name iotgstreamanalytics. Wählen die relevante Resource group und eine Location (West US 2) aus. Klicken auf Create und erstellen den Stream Analytics job.

Nach einigen Sekunden ist der Azure Stream Analytics Service erfolgreich erstellt.

Wiederverwendung des bereits erstellten Azure Data Lake Storage Gen2

Im Artikel Ein Microsoft Azure Speicherkonto erstellen wurde bereits beschrieben wie ein Speicherkonto für Azure Data Lake Gen2 erstellt wird. Darin haben wir einen Storage Account mit dem Namen iotgstorage, als auch bereits einen Container mit dem Namen iotgdatalake erstellt. Diesen Container können wir für die Eingabe als auch für die Ausgabe wiederverwenden.

Die Auftragseingabe konfigurieren

Nun gilt es dem Stream Analytics Job mitzuteilen wie Daten empfangen werden sollen. Dazu öffnen wir den zuvor erstellen Stream Analytics Job iotgstreamanalytics und clicken auf Job topology > Inputs im linken Menü.

Um einen neuen Input hinzufügen klicken wir auf + Add stream input und wählen aus der Liste IoT Hub aus. Im Dialog IoT Hub geben wir einen Input alias für den IoT Hub ein und wählen Select IoT Hub from your subscriptions. Anschließend wählen wir die gewünschte Subscription und den IoT Hub im DropDown IoT Hub aus. Mit klick auf Save speichern wir die Eingabe.

Die Ausgabe konfigurieren

Um die Ausgabe zu konfigurieren, klicken wir im linken Menü auf Job topology > Ouputs.

In Outputs klicken wir auf + Add und wählen Blob storage/ADLS Gen2 aus der Liste.

Im Dialog Blob storage/ADLS Gen2 geben wir einen Output alias für das Ausgabeziel an und wählen Select Blob storage/ADLS Gen2 from your subscription. Wählen dann den gewünschten Storage account aus und den Container (hier kann auch ein neuer Container erstellt werden). Als Path pattern gebe ich sajob1/raw/{date}. Als Authentication mode wählen wir Connection string aus.

Klicken auf Save und speichern die Einstellungen.

Datenströme generieren

Daten mit dem Rasperry Pi-Azure IoT-Onlinesimulator an die IoT Hub-Instanz senden

Eine erste Möglichkeit einfache Datenströme zu erzeugen ist der Rasperry Pi-Azure IoT-Onlinelinesimulator. Um diesen verwenden zu können, müssen wir zunächst ein IoT-Gerät im IoT Hub erstellen und uns den kopieren. Dazu wählen wir im Portal unseren IoT Hub iotghub aus und klicken im IoT Hub im linken Menü auf Explorers > IoT devices. Klicken dort anschließend auf + New und geben auf der Seite Create a device eine Device ID ein. Ich nehme simdevice01. Den Rest lasse ich bei den Standardwerten.

Klicken auf Save und legen somit das IoT-Gerät an.

Danach klicken wir in der Liste der IoT devices auf das zuvor erstellte IoT-Gerät mit der Device ID simdevice01. Hier kopieren wir die Primary Connection String in die Zwischenablage. Wenn sie es sich im Klartext anschauen möchte, so kann man auf das Auge Symbol klicken.

Nun öffnen wir den Rasperry Pi-Azure IoT-Onlinelinesimulator und ersetzen in der Zeile 15 den String [Your IoT hub device connection string] mit den zuvor kopierten Primary Connection String des IoT-Geräts.

Um die Simulation zu starten, klicken wir auf Run unterhalb des Codes.

Anschließend kann in dem Ausgabebereich geprüft werden was an den IoT Hub gesendet wird.

Daten mit dem C# Simulated Device an die IoT Hub-Instanz senden

Im Artikel Ein IoT Gerät in C# simulieren und den IoT Hub testen haben wir bereits ein simuliertes Device in C# entwickelt und erfolgreich getestet. Dieses simulierte Device können wir nun wieder für die Generierung von Datenströmen verwenden. Dazu öffnen die Eingabeaufforderung und navigieren in den Ordner mit dem Projekt.

Geben in der Eingabeaufforderung dotnet run ein und starten damit die Anwendung, welcher nun fleißig Daten generiert und an den IoT Hub sendet.

Daten abfragen und transformieren

Azure Stream Analytics bietet eine SQL-Abfragesprache zum Durchführen von Transformationen und Berechnungen über Daten-Ströme. Eine ausführliche Dokumentation dazu findet man in der Microsoft Dokumentation Referenz zur Stream Analytics-Abfragesprache. Mit Hilfe der Abfragesprache kann man Daten aggregieren, berechnen, auf Spalten/Attribute reduzieren, Distanzen berechnen, konvertieren etc. Es ist zudem möglich Ereignisse über Zeitfenster zusammenzufassen, so dass man beispielsweise die Abfrage-/Transformations-Operation über Ergeignisse durchführt, welche in 60 Sekunden Intervallen eintreffen (Windowing functions).

Jeder Stream Analytics besteht aus einer Query. Diese Query kann aufgerufen werden auf der Stream Analytics Job Seite im linken Menü unter Job topology > Query.

In mittleren Bereich finden wir alle verfügbaren Quellen und Ziele. Rechts steht uns der Query-Editor zur Verfügung in das wir die Abfrage(n) schreiben können.

Um eine einfache Pass-Through-Abfrage zu erstellen reicht es, wenn wir nachfolgende Abfrage verwenden. Diese nimmt den Datenstrom vom Input IoTGarageIoTHub entgegen und reicht diese 1:1 durch an den Output IoTGarageStorageDataLake.

SELECT
    *
INTO
    [IoTGarageStorageDataLake]

Eine Abfrage kann kann auch mehrere SELECT’s beinhalten die Daten aus unterschiedlichen Input-Quellen nimmt und an unterschiedliche Output-Ziele routet. Ein anderes Beispiel könnte sein, dass Daten mit kritischen Werten an eine weitere zusätzliche Quellen geroutet werden, wohingegen alles andere an eine andere Quelle geroutet werden würde Auch wäre es möglich Daten aus mehreren Quellen in einem Ziel zusammenzuführen.

Beispielsweise könnte das wie folgt aussehen:

WITH StreamReaderADLS AS (
    SELECT
        *
    FROM
        [IoTGarageStorageDataLake]
)

SELECT * INTO [IoTGarageRawArchiveBlob] FROM StreamReaderADLS

SELECT 
    MachineID,
    Temperature
INTO [IoTGarageIoTHub] 
FROM StreamReaderADLS
HAVING Temperature > 27

Um die Abfrage zu testen kann man diese mit einem Klick auf Test query ausführen. Das Ergebnis erscheint im unteren Bereich im Reiter Test results.

Mit Save query kann die Abfrage gespeichert werden.

Azure Streaming Analytics Job starten

Nachdem nun alles eingerichtet ist, können wir schauen ob auch der Datenstrom weggeschrieben wird. Dazu müssen wir den Stream Analytics Job starten und klicken auf Start auf der Übersichtsseite vom Stream Analytics Job.

Auf der Seite Start job können wir alles beim Standard belassen und klicken auf Start.

Im letzten Schritt prüfen wir noch einmal ob tatsächlich die Daten in den Data Lake geschrieben wurden indem wir unser Storage Account iotgstorage öffnen > im Menü links Containers anklicken und den Container iotgdatalake auswählen.

Wenn wir dann in dem Container in das Verzeichnis sajob1 > raw > … > … wechseln, sollten wir hier .json Dateien finden. Wir klicken auf eine JSON Datei und gehen in den Reiter Edit. Jetzt sollte der Inhalt die Daten beinhalten, die uns unser IoT Device gesendet hat.

Weitere nützliche Funktionen von Azure Stream Analytics

Neben der Möglichkeit Daten aus einem IoT Hub oder einem Event Hub in ein Ziel zu streamen, bietet Azure Stream Analytics, neben weiteren, noch zwei ganz interessante Funktionen an:

Insbesondere diese beiden Funktionen ermöglichen es in Echtzeit auf Ereignisse direkt im Datenstrom zu reagieren und so beispielsweise einen Betrugsversuch bei einer Zahlung aufzudecken oder Daten im Fluss mit Verweisdaten abzugleichen (e.g. ist das Parkticket bezahlt?).

Weiterführende Links