Microsoft Azure Stream Analytics ist eine Engine f├╝r die Verarbeitung von Datenstr├Âmen in Echtzeit. Sie bietet die M├Âglichkeit gro├če Datenmengen die in einer hohen Geschwindigkeit eintreffen in Echtzeit abzufragen, zu filtern, mit weiteren Daten anzureichern, mit Hilfe von ML Modellen Muster und Anomalien zu erkennen und auf diese zu reagieren.

Einige Anwendungsbeispiele k├Ânnten sein:

  • Betrugserkennung bei Bezahldiensten in Echtzeit
  • Analysieren von Telemetriedatenstr├Âmen von IoT-Ger├Ąten in Echtzeit
  • Geoanalyse f├╝r das Flottenmanagement
  • Sentiment-Analyse von Social-Media Beitr├Ągen
  • Echtzeitverarbeitung von POS-Daten (Point of Sale)

Funktionsweise von Azure Stream Analytics

Microsoft Azure Stream Analytics basiert auf Trill. Eine von Microsoft Research entwickelten High-Performance One-Pass In-Memory Streaming Analytics Engine. Sie kommt innerhalb von Microsoft in verschiedenen Produkten zum Einsatz.

Azure Stream Analytics besteht aus Jobs. Jedes dieser Jobs besteht wiederum aus Eingabe (Input), Abfrage (Query) und Ausgabe (Output). Daten k├Ânnen aus einem Event Hub, dem IoT Hub oder aus dem Blob Storage kommen.

Diese Daten k├Ânnen mit Hilfe der zur Verf├╝gung gestellten Abfragesprache, welche auf SQL basiert, gefiltert, aggregiert und verkn├╝pft werden. Die Dauer des Zeitfensters von Vorg├Ąngen kann in der Sprache mit angegeben werden.

Diese transformierten Daten werden dann an verschiedene Ziele/Ausgaben gesendet. Das kann beispielsweise ein Event Hub oder Azure Functions sein. Aber auch Power BI f├╝r die Echtzeitvisualisierung. Nat├╝rlich wird auch die M├Âglichkeit geboten Daten in einer SQL Datenbank, Cosmos DB oder beispielsweise einem Azure Data Lake Gen1 & Gen2 zu persistieren.

Stream Analytics-Einf├╝hrung zur Pipeline
Quelle: Microsoft

Azure Stream Analytics Job erstellen

Im ersten Schritt erstellen wir einen Azure Stream Analytics Job im Azure Portal. Auf der Startseite klicken wir auf Create a resource.

Im Marketplace suchen wir nach dem Begriff Stream Analytics. Es sollte Stream Analytics job erscheinen.

Wir klicken auf Create und dann auf Stream Analytics job.

Bei den Eigenschaften nehme ich als Job name iotgstreamanalytics. W├Ąhlen die relevante Resource group und eine Location (West US 2) aus. Klicken auf Create und erstellen den Stream Analytics job.

Nach einigen Sekunden ist der Azure Stream Analytics Service erfolgreich erstellt.

Wiederverwendung des bereits erstellten Azure Data Lake Storage Gen2

Im Artikel Ein Microsoft Azure Speicherkonto erstellen wurde bereits beschrieben wie ein Speicherkonto f├╝r Azure Data Lake Gen2 erstellt wird. Darin haben wir einen Storage Account mit dem Namen iotgstorage, als auch bereits einen Container mit dem Namen iotgdatalake erstellt. Diesen Container k├Ânnen wir f├╝r die Eingabe als auch f├╝r die Ausgabe wiederverwenden.

Die Auftragseingabe konfigurieren

Nun gilt es dem Stream Analytics Job mitzuteilen wie Daten empfangen werden sollen. Dazu ├Âffnen wir den zuvor erstellen Stream Analytics Job iotgstreamanalytics und clicken auf Job topology > Inputs im linken Men├╝.

Um einen neuen Input hinzuf├╝gen klicken wir auf + Add stream input und w├Ąhlen aus der Liste IoT Hub aus. Im Dialog IoT Hub geben wir einen Input alias f├╝r den IoT Hub ein und w├Ąhlen Select IoT Hub from your subscriptions. Anschlie├čend w├Ąhlen wir die gew├╝nschte Subscription und den IoT Hub im DropDown IoT Hub aus. Mit klick auf Save speichern wir die Eingabe.

Die Ausgabe konfigurieren

Um die Ausgabe zu konfigurieren, klicken wir im linken Men├╝ auf Job topology > Ouputs.

In Outputs klicken wir auf + Add und w├Ąhlen Blob storage/ADLS Gen2 aus der Liste.

Im Dialog Blob storage/ADLS Gen2 geben wir einen Output alias f├╝r das Ausgabeziel an und w├Ąhlen Select Blob storage/ADLS Gen2 from your subscription. W├Ąhlen dann den gew├╝nschten Storage account aus und den Container (hier kann auch ein neuer Container erstellt werden). Als Path pattern gebe ich sajob1/raw/{date}. Als Authentication mode w├Ąhlen wir Connection string aus.

Klicken auf Save und speichern die Einstellungen.

Datenstr├Âme generieren

Daten mit dem Rasperry Pi-Azure IoT-Onlinesimulator an die IoT Hub-Instanz senden

Eine erste M├Âglichkeit einfache Datenstr├Âme zu erzeugen ist der Rasperry Pi-Azure IoT-Onlinelinesimulator. Um diesen verwenden zu k├Ânnen, m├╝ssen wir zun├Ąchst ein IoT-Ger├Ąt im IoT Hub erstellen und uns den kopieren. Dazu w├Ąhlen wir im Portal unseren IoT Hub iotghub aus und klicken im IoT Hub im linken Men├╝ auf Explorers > IoT devices. Klicken dort anschlie├čend auf + New und geben auf der Seite Create a device eine Device ID ein. Ich nehme simdevice01. Den Rest lasse ich bei den Standardwerten.

Klicken auf Save und legen somit das IoT-Ger├Ąt an.

Danach klicken wir in der Liste der IoT devices auf das zuvor erstellte IoT-Ger├Ąt mit der Device ID simdevice01. Hier kopieren wir die Primary Connection String in die Zwischenablage. Wenn sie es sich im Klartext anschauen m├Âchte, so kann man auf das Auge Symbol klicken.

Nun ├Âffnen wir den Rasperry Pi-Azure IoT-Onlinelinesimulator und ersetzen in der Zeile 15 den String [Your IoT hub device connection string] mit den zuvor kopierten Primary Connection String des IoT-Ger├Ąts.

Um die Simulation zu starten, klicken wir auf Run unterhalb des Codes.

Anschlie├čend kann in dem Ausgabebereich gepr├╝ft werden was an den IoT Hub gesendet wird.

Daten mit dem C# Simulated Device an die IoT Hub-Instanz senden

Im Artikel Ein IoT Ger├Ąt in C# simulieren und den IoT Hub testen haben wir bereits ein simuliertes Device in C# entwickelt und erfolgreich getestet. Dieses simulierte Device k├Ânnen wir nun wieder f├╝r die Generierung von Datenstr├Âmen verwenden. Dazu ├Âffnen die Eingabeaufforderung und navigieren in den Ordner mit dem Projekt.

Geben in der Eingabeaufforderung dotnet run ein und starten damit die Anwendung, welcher nun flei├čig Daten generiert und an den IoT Hub sendet.

Daten abfragen und transformieren

Azure Stream Analytics bietet eine SQL-Abfragesprache zum Durchf├╝hren von Transformationen und Berechnungen ├╝ber Daten-Str├Âme. Eine ausf├╝hrliche Dokumentation dazu findet man in der Microsoft Dokumentation Referenz zur Stream Analytics-Abfragesprache. Mit Hilfe der Abfragesprache kann man Daten aggregieren, berechnen, auf Spalten/Attribute reduzieren, Distanzen berechnen, konvertieren etc. Es ist zudem m├Âglich Ereignisse ├╝ber Zeitfenster zusammenzufassen, so dass man beispielsweise die Abfrage-/Transformations-Operation ├╝ber Ergeignisse durchf├╝hrt, welche in 60 Sekunden Intervallen eintreffen (Windowing functions).

Jeder Stream Analytics besteht aus einer Query. Diese Query kann aufgerufen werden auf der Stream Analytics Job Seite im linken Men├╝ unter Job topology > Query.

In mittleren Bereich finden wir alle verf├╝gbaren Quellen und Ziele. Rechts steht uns der Query-Editor zur Verf├╝gung in das wir die Abfrage(n) schreiben k├Ânnen.

Um eine einfache Pass-Through-Abfrage zu erstellen reicht es, wenn wir nachfolgende Abfrage verwenden. Diese nimmt den Datenstrom vom Input IoTGarageIoTHub entgegen und reicht diese 1:1 durch an den Output IoTGarageStorageDataLake.

SELECT
    *
INTO
    [IoTGarageStorageDataLake]

Eine Abfrage kann kann auch mehrere SELECT’s beinhalten die Daten aus unterschiedlichen Input-Quellen nimmt und an unterschiedliche Output-Ziele routet. Ein anderes Beispiel k├Ânnte sein, dass Daten mit kritischen Werten an eine weitere zus├Ątzliche Quellen geroutet werden, wohingegen alles andere an eine andere Quelle geroutet werden w├╝rde Auch w├Ąre es m├Âglich Daten aus mehreren Quellen in einem Ziel zusammenzuf├╝hren.

Beispielsweise k├Ânnte das wie folgt aussehen:

WITH StreamReaderADLS AS (
    SELECT
        *
    FROM
        [IoTGarageStorageDataLake]
)

SELECT * INTO [IoTGarageRawArchiveBlob] FROM StreamReaderADLS

SELECT 
    MachineID,
    Temperature
INTO [IoTGarageIoTHub] 
FROM StreamReaderADLS
HAVING Temperature > 27

Um die Abfrage zu testen kann man diese mit einem Klick auf Test query ausf├╝hren. Das Ergebnis erscheint im unteren Bereich im Reiter Test results.

Mit Save query kann die Abfrage gespeichert werden.

Azure Streaming Analytics Job starten

Nachdem nun alles eingerichtet ist, k├Ânnen wir schauen ob auch der Datenstrom weggeschrieben wird. Dazu m├╝ssen wir den Stream Analytics Job starten und klicken auf Start auf der ├ťbersichtsseite vom Stream Analytics Job.

Auf der Seite Start job k├Ânnen wir alles beim Standard belassen und klicken auf Start.

Im letzten Schritt pr├╝fen wir noch einmal ob tats├Ąchlich die Daten in den Data Lake geschrieben wurden indem wir unser Storage Account iotgstorage ├Âffnen > im Men├╝ links Containers anklicken und den Container iotgdatalake ausw├Ąhlen.

Wenn wir dann in dem Container in das Verzeichnis sajob1 > raw > … > … wechseln, sollten wir hier .json Dateien finden. Wir klicken auf eine JSON Datei und gehen in den Reiter Edit. Jetzt sollte der Inhalt die Daten beinhalten, die uns unser IoT Device gesendet hat.

Weitere n├╝tzliche Funktionen von Azure Stream Analytics

Neben der M├Âglichkeit Daten aus einem IoT Hub oder einem Event Hub in ein Ziel zu streamen, bietet Azure Stream Analytics, neben weiteren, noch zwei ganz interessante Funktionen an:

Insbesondere diese beiden Funktionen erm├Âglichen es in Echtzeit auf Ereignisse direkt im Datenstrom zu reagieren und so beispielsweise einen Betrugsversuch bei einer Zahlung aufzudecken oder Daten im Fluss mit Verweisdaten abzugleichen (e.g. ist das Parkticket bezahlt?).

Weiterf├╝hrende Links