Microsoft Azure Stream Analytics ist eine Engine f√ľr die Verarbeitung von Datenstr√∂men in Echtzeit. Sie bietet die M√∂glichkeit gro√üe Datenmengen die in einer hohen Geschwindigkeit eintreffen in Echtzeit abzufragen, zu filtern, mit weiteren Daten anzureichern, mit Hilfe von ML Modellen Muster und Anomalien zu erkennen und auf diese zu reagieren.

Einige Anwendungsbeispiele könnten sein:

  • Betrugserkennung bei Bezahldiensten in Echtzeit
  • Analysieren von Telemetriedatenstr√∂men von IoT-Ger√§ten in Echtzeit
  • Geoanalyse f√ľr das Flottenmanagement
  • Sentiment-Analyse von Social-Media Beitr√§gen
  • Echtzeitverarbeitung von POS-Daten (Point of Sale)

Funktionsweise von Azure Stream Analytics

Microsoft Azure Stream Analytics basiert auf Trill. Eine von Microsoft Research entwickelten High-Performance One-Pass In-Memory Streaming Analytics Engine. Sie kommt innerhalb von Microsoft in verschiedenen Produkten zum Einsatz.

Azure Stream Analytics besteht aus Jobs. Jedes dieser Jobs besteht wiederum aus Eingabe (Input), Abfrage (Query) und Ausgabe (Output). Daten können aus einem Event Hub, dem IoT Hub oder aus dem Blob Storage kommen.

Diese Daten k√∂nnen mit Hilfe der zur Verf√ľgung gestellten Abfragesprache, welche auf SQL basiert, gefiltert, aggregiert und verkn√ľpft werden. Die Dauer des Zeitfensters von Vorg√§ngen kann in der Sprache mit angegeben werden.

Diese transformierten Daten werden dann an verschiedene Ziele/Ausgaben gesendet. Das kann beispielsweise ein Event Hub oder Azure Functions sein. Aber auch Power BI f√ľr die Echtzeitvisualisierung. Nat√ľrlich wird auch die M√∂glichkeit geboten Daten in einer SQL Datenbank, Cosmos DB oder beispielsweise einem Azure Data Lake Gen1 & Gen2 zu persistieren.

Stream Analytics-Einf√ľhrung zur Pipeline
Quelle: Microsoft

Azure Stream Analytics Job erstellen

Im ersten Schritt erstellen wir einen Azure Stream Analytics Job im Azure Portal. Auf der Startseite klicken wir auf Create a resource.

Im Marketplace suchen wir nach dem Begriff Stream Analytics. Es sollte Stream Analytics job erscheinen.

Wir klicken auf Create und dann auf Stream Analytics job.

Bei den Eigenschaften nehme ich als Job name iotgstreamanalytics. Wählen die relevante Resource group und eine Location (West US 2) aus. Klicken auf Create und erstellen den Stream Analytics job.

Nach einigen Sekunden ist der Azure Stream Analytics Service erfolgreich erstellt.

Wiederverwendung des bereits erstellten Azure Data Lake Storage Gen2

Im Artikel Ein Microsoft Azure Speicherkonto erstellen wurde bereits beschrieben wie ein Speicherkonto f√ľr Azure Data Lake Gen2 erstellt wird. Darin haben wir einen Storage Account mit dem Namen iotgstorage, als auch bereits einen Container mit dem Namen iotgdatalake erstellt. Diesen Container k√∂nnen wir f√ľr die Eingabe als auch f√ľr die Ausgabe wiederverwenden.

Die Auftragseingabe konfigurieren

Nun gilt es dem Stream Analytics Job mitzuteilen wie Daten empfangen werden sollen. Dazu √∂ffnen wir den zuvor erstellen Stream Analytics Job iotgstreamanalytics und clicken auf Job topology > Inputs im linken Men√ľ.

Um einen neuen Input hinzuf√ľgen klicken wir auf + Add stream input und w√§hlen aus der Liste IoT Hub aus. Im Dialog IoT Hub geben wir einen Input alias f√ľr den IoT Hub ein und w√§hlen Select IoT Hub from your subscriptions. Anschlie√üend w√§hlen wir die gew√ľnschte Subscription und den IoT Hub im DropDown IoT Hub aus. Mit klick auf Save speichern wir die Eingabe.

Die Ausgabe konfigurieren

Um die Ausgabe zu konfigurieren, klicken wir im linken Men√ľ auf Job topology > Ouputs.

In Outputs klicken wir auf + Add und wählen Blob storage/ADLS Gen2 aus der Liste.

Im Dialog Blob storage/ADLS Gen2 geben wir einen Output alias f√ľr das Ausgabeziel an und w√§hlen Select Blob storage/ADLS Gen2 from your subscription. W√§hlen dann den gew√ľnschten Storage account aus und den Container (hier kann auch ein neuer Container erstellt werden). Als Path pattern gebe ich sajob1/raw/{date}. Als Authentication mode w√§hlen wir Connection string aus.

Klicken auf Save und speichern die Einstellungen.

Datenströme generieren

Daten mit dem Rasperry Pi-Azure IoT-Onlinesimulator an die IoT Hub-Instanz senden

Eine erste M√∂glichkeit einfache Datenstr√∂me zu erzeugen ist der Rasperry Pi-Azure IoT-Onlinelinesimulator. Um diesen verwenden zu k√∂nnen, m√ľssen wir zun√§chst ein IoT-Ger√§t im IoT Hub erstellen und uns den kopieren. Dazu w√§hlen wir im Portal unseren IoT Hub iotghub aus und klicken im IoT Hub im linken Men√ľ auf Explorers > IoT devices. Klicken dort anschlie√üend auf + New und geben auf der Seite Create a device eine Device ID ein. Ich nehme simdevice01. Den Rest lasse ich bei den Standardwerten.

Klicken auf Save und legen somit das IoT-Gerät an.

Danach klicken wir in der Liste der IoT devices auf das zuvor erstellte IoT-Gerät mit der Device ID simdevice01. Hier kopieren wir die Primary Connection String in die Zwischenablage. Wenn sie es sich im Klartext anschauen möchte, so kann man auf das Auge Symbol klicken.

Nun öffnen wir den Rasperry Pi-Azure IoT-Onlinelinesimulator und ersetzen in der Zeile 15 den String [Your IoT hub device connection string] mit den zuvor kopierten Primary Connection String des IoT-Geräts.

Um die Simulation zu starten, klicken wir auf Run unterhalb des Codes.

Anschlie√üend kann in dem Ausgabebereich gepr√ľft werden was an den IoT Hub gesendet wird.

Daten mit dem C# Simulated Device an die IoT Hub-Instanz senden

Im Artikel Ein IoT Ger√§t in C# simulieren und den IoT Hub testen haben wir bereits ein simuliertes Device in C# entwickelt und erfolgreich getestet. Dieses simulierte Device k√∂nnen wir nun wieder f√ľr die Generierung von Datenstr√∂men verwenden. Dazu √∂ffnen die Eingabeaufforderung und navigieren in den Ordner mit dem Projekt.

Geben in der Eingabeaufforderung dotnet run ein und starten damit die Anwendung, welcher nun fleißig Daten generiert und an den IoT Hub sendet.

Daten abfragen und transformieren

Azure Stream Analytics bietet eine SQL-Abfragesprache zum Durchf√ľhren von Transformationen und Berechnungen √ľber Daten-Str√∂me. Eine ausf√ľhrliche Dokumentation dazu findet man in der Microsoft Dokumentation Referenz zur Stream Analytics-Abfragesprache. Mit Hilfe der Abfragesprache kann man Daten aggregieren, berechnen, auf Spalten/Attribute reduzieren, Distanzen berechnen, konvertieren etc. Es ist zudem m√∂glich Ereignisse √ľber Zeitfenster zusammenzufassen, so dass man beispielsweise die Abfrage-/Transformations-Operation √ľber Ergeignisse durchf√ľhrt, welche in 60 Sekunden Intervallen eintreffen (Windowing functions).

Jeder Stream Analytics besteht aus einer Query. Diese Query kann aufgerufen werden auf der Stream Analytics Job Seite im linken Men√ľ unter Job topology > Query.

In mittleren Bereich finden wir alle verf√ľgbaren Quellen und Ziele. Rechts steht uns der Query-Editor zur Verf√ľgung in das wir die Abfrage(n) schreiben k√∂nnen.

Um eine einfache Pass-Through-Abfrage zu erstellen reicht es, wenn wir nachfolgende Abfrage verwenden. Diese nimmt den Datenstrom vom Input IoTGarageIoTHub entgegen und reicht diese 1:1 durch an den Output IoTGarageStorageDataLake.

SELECT
    *
INTO
    [IoTGarageStorageDataLake]

Eine Abfrage kann kann auch mehrere SELECT’s beinhalten die Daten aus unterschiedlichen Input-Quellen nimmt und an unterschiedliche Output-Ziele routet. Ein anderes Beispiel k√∂nnte sein, dass Daten mit kritischen Werten an eine weitere zus√§tzliche Quellen geroutet werden, wohingegen alles andere an eine andere Quelle geroutet werden w√ľrde Auch w√§re es m√∂glich Daten aus mehreren Quellen in einem Ziel zusammenzuf√ľhren.

Beispielsweise könnte das wie folgt aussehen:

WITH StreamReaderADLS AS (
    SELECT
        *
    FROM
        [IoTGarageStorageDataLake]
)

SELECT * INTO [IoTGarageRawArchiveBlob] FROM StreamReaderADLS

SELECT 
    MachineID,
    Temperature
INTO [IoTGarageIoTHub] 
FROM StreamReaderADLS
HAVING Temperature > 27

Um die Abfrage zu testen kann man diese mit einem Klick auf Test query ausf√ľhren. Das Ergebnis erscheint im unteren Bereich im Reiter Test results.

Mit Save query kann die Abfrage gespeichert werden.

Azure Streaming Analytics Job starten

Nachdem nun alles eingerichtet ist, k√∂nnen wir schauen ob auch der Datenstrom weggeschrieben wird. Dazu m√ľssen wir den Stream Analytics Job starten und klicken auf Start auf der √úbersichtsseite vom Stream Analytics Job.

Auf der Seite Start job können wir alles beim Standard belassen und klicken auf Start.

Im letzten Schritt pr√ľfen wir noch einmal ob tats√§chlich die Daten in den Data Lake geschrieben wurden indem wir unser Storage Account iotgstorage √∂ffnen > im Men√ľ links Containers anklicken und den Container iotgdatalake ausw√§hlen.

Wenn wir dann in dem Container in das Verzeichnis sajob1 > raw > … > … wechseln, sollten wir hier .json Dateien finden. Wir klicken auf eine JSON Datei und gehen in den Reiter Edit. Jetzt sollte der Inhalt die Daten beinhalten, die uns unser IoT Device gesendet hat.

Weitere n√ľtzliche Funktionen von Azure Stream Analytics

Neben der Möglichkeit Daten aus einem IoT Hub oder einem Event Hub in ein Ziel zu streamen, bietet Azure Stream Analytics, neben weiteren, noch zwei ganz interessante Funktionen an:

Insbesondere diese beiden Funktionen ermöglichen es in Echtzeit auf Ereignisse direkt im Datenstrom zu reagieren und so beispielsweise einen Betrugsversuch bei einer Zahlung aufzudecken oder Daten im Fluss mit Verweisdaten abzugleichen (e.g. ist das Parkticket bezahlt?).

Weiterf√ľhrende Links