Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Dieses Rezept zeigt, wie SynapseML- und Azure AI-Dienste auf Apache Spark für die multivariate Anomalieerkennung verwendet werden. Die multivariate Anomalieerkennung umfasst die Erkennung von Anomalien zwischen vielen Variablen oder Zeitreihen, wobei alle Interkorrelationen und Abhängigkeiten zwischen den verschiedenen Variablen berücksichtigt werden. In diesem Szenario werden SynapseML und die Azure AI-Dienste verwendet, um ein Modell für die multivariate Anomalieerkennung zu trainieren. Anschließend verwenden wir das Modell, um multivariate Anomalien innerhalb eines Datasets abzuleiten, die synthetische Messungen von drei IoT-Sensoren enthalten.
Wichtig
Ab dem 20. September 2023 können Sie keine neuen Anomaliedetektorressourcen erstellen. Der Anomaliedetektordienst wird am 1. Oktober 2026 eingestellt.
Weitere Informationen zum Azure AI-Anomaliedetektor finden Sie in der Informationsressource "Anomaliedetektor ".
Voraussetzungen
- Azure-Abonnement: Kostenloses Azure-Konto
- Fügen Sie Ihr Notebook an ein Lakehouse an. Wählen Sie auf der linken Seite Hinzufügen aus, um ein vorhandenes Lakehouse hinzuzufügen oder ein Lakehouse zu erstellen.
Setup
Beginnend mit einer vorhandenen Anomaly Detector Ressource können Sie Möglichkeiten zum Behandeln von Daten verschiedener Formulare untersuchen. Der Katalog der Dienste in Azure AI bietet mehrere Optionen:
Erstellen einer Anomalieerkennungsressource
- Wählen Sie im Azure-Portal in Ihrer Ressourcengruppe die Option Erstellen und geben Sie dann Anomalie-Detektor ein. Wählen Sie die Ressource Anomalie-Detektor.
- Benennen Sie die Ressource, und verwenden Sie idealerweise dieselbe Region wie die restliche Ressourcengruppe. Verwenden Sie für den Rest die Standardoptionen, und wählen Sie dann Überprüfen + Erstellen und anschließend Erstellen.
- Nachdem Sie die Anomaliedetektorressource erstellt haben, öffnen Sie sie, und wählen Sie den
Keys and EndpointsBereich im linken Navigationsbereich aus. Kopieren Sie den Schlüssel für die Anomaly Detector-Ressource in dieANOMALY_API_KEY-Umgebungsvariable, oder speichern Sie ihn in deranomalyKey-Variablen.
Erstellen einer Speicherkontoressource
Zum Speichern von Zwischendaten müssen Sie ein Azure Blob Storage-Konto erstellen. Erstellen Sie in diesem Speicherkonto einen Container zum Speichern der Zwischendaten. Notieren Sie sich den Containernamen, und kopieren Sie die Verbindungszeichenfolge in den betreffenden Container. Sie benötigen sie, um die containerName Variable und die BLOB_CONNECTION_STRING Umgebungsvariable später aufzufüllen.
Eingeben Ihrer Dienstschlüssel
Richten Sie zunächst die Umgebungsvariablen für unsere Dienstschlüssel ein. Die nächste Zelle legt die ANOMALY_API_KEY Und die BLOB_CONNECTION_STRING Umgebungsvariablen basierend auf den werten fest, die in unserem Azure Key Vault gespeichert sind. Wenn Sie dieses Lernprogramm in Ihrer eigenen Umgebung ausführen, müssen Sie diese Umgebungsvariablen festlegen, bevor Sie fortfahren:
import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import find_secret
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
Lesen Sie die Umgebungsvariablen ANOMALY_API_KEY und BLOB_CONNECTION_STRING, und legen Sie die Variablen containerName und location fest.
# An Anomaly Dectector subscription key
anomalyKey = find_secret("anomaly-api-key") # use your own anomaly api key
# Your storage account name
storageName = "anomalydetectiontest" # use your own storage account name
# A connection string to your blob storage account
storageKey = find_secret("madtest-storage-key") # use your own storage key
# A place to save intermediate MVAD results
intermediateSaveDir = (
"wasbs://madtest@anomalydetectiontest.blob.core.windows.net/intermediateData"
)
# The location of the anomaly detector resource that you created
location = "westus2"
Stellen Sie eine Verbindung mit unserem Speicherkonto her, damit der Anomaliedetektor Zwischenergebnisse in diesem Speicherkonto speichern kann.
spark.sparkContext._jsc.hadoopConfiguration().set(
f"fs.azure.account.key.{storageName}.blob.core.windows.net", storageKey
)
Importieren Sie alle erforderlichen Module:
import numpy as np
import pandas as pd
import pyspark
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType
import matplotlib.pyplot as plt
import synapse.ml
from synapse.ml.cognitive import *
Lesen Sie die Beispieldaten in einem Spark DataFrame:
df = (
spark.read.format("csv")
.option("header", "true")
.load("wasbs://publicwasb@mmlspark.blob.core.windows.net/MVAD/sample.csv")
)
df = (
df.withColumn("sensor_1", col("sensor_1").cast(DoubleType()))
.withColumn("sensor_2", col("sensor_2").cast(DoubleType()))
.withColumn("sensor_3", col("sensor_3").cast(DoubleType()))
)
# Let's inspect the dataframe:
df.show(5)
Wir können nun ein estimator Objekt erstellen, mit dem wir unser Modell trainieren. Geben Sie die Start- und Endzeiten für die Trainingsdaten an. Geben Sie außerdem die zu verwendenden Eingabespalten und den Namen der Spalte an, die die Zeitstempel enthält. Schließlich spezifizieren wir die Anzahl der Datenpunkte für das Schiebefenster der Anomalieerkennung und konfigurieren die Verbindungszeichenfolge für das Azure Blob Storage-Konto.
trainingStartTime = "2020-06-01T12:00:00Z"
trainingEndTime = "2020-07-02T17:55:00Z"
timestampColumn = "timestamp"
inputColumns = ["sensor_1", "sensor_2", "sensor_3"]
estimator = (
FitMultivariateAnomaly()
.setSubscriptionKey(anomalyKey)
.setLocation(location)
.setStartTime(trainingStartTime)
.setEndTime(trainingEndTime)
.setIntermediateSaveDir(intermediateSaveDir)
.setTimestampCol(timestampColumn)
.setInputCols(inputColumns)
.setSlidingWindow(200)
)
Passen wir die estimator an die Daten an:
model = estimator.fit(df)
Sobald die Schulung abgeschlossen ist, können wir das Modell für die Ableitung verwenden. Der Code in der nächsten Zelle gibt die Anfangs- und Endzeiten für die Daten an, in denen wir die Anomalien erkennen möchten:
inferenceStartTime = "2020-07-02T18:00:00Z"
inferenceEndTime = "2020-07-06T05:15:00Z"
result = (
model.setStartTime(inferenceStartTime)
.setEndTime(inferenceEndTime)
.setOutputCol("results")
.setErrorCol("errors")
.setInputCols(inputColumns)
.setTimestampCol(timestampColumn)
.transform(df)
)
result.show(5)
In der vorherigen Zelle .show(5) haben wir die ersten fünf Datenframezeilen gezeigt. Die Ergebnisse waren alle null , weil sie außerhalb des Rückschlussfensters landeten.
Um die Ergebnisse nur für die abgeleiteten Daten anzuzeigen, wählen Sie die erforderlichen Spalten aus. Anschließend können die Zeilen im Datenrahmen nach aufsteigender Reihenfolge sortiert und das Ergebnis so gefiltert werden, dass nur die Zeilen im Bereich des Rückschlussfensters angezeigt werden.
inferenceEndTime Hier entspricht der letzten Zeile im Datenframe, sodass sie ignoriert werden kann.
Um die Ergebnisse besser zu zeichnen, konvertieren Sie den Spark-Datenframe in einen Pandas-Datenframe:
rdf = (
result.select(
"timestamp",
*inputColumns,
"results.contributors",
"results.isAnomaly",
"results.severity"
)
.orderBy("timestamp", ascending=True)
.filter(col("timestamp") >= lit(inferenceStartTime))
.toPandas()
)
rdf
Formatieren Sie die contributors-Spalte, in der die Beitragsbewertung der einzelnen Sensoren für die erkannten Anomalien gespeichert wird. Die nächste Zelle behandelt dies und teilt die Beitragsbewertung jedes Sensors in eine eigene Spalte auf:
def parse(x):
if type(x) is list:
return dict([item[::-1] for item in x])
else:
return {"series_0": 0, "series_1": 0, "series_2": 0}
rdf["contributors"] = rdf["contributors"].apply(parse)
rdf = pd.concat(
[rdf.drop(["contributors"], axis=1), pd.json_normalize(rdf["contributors"])], axis=1
)
rdf
Die Beitragsbewertungen der Sensoren 1, 2 und 3 befinden sich jetzt in den Spalten series_0, series_1 und series_2.
Führen Sie die nächste Zelle aus, um die Ergebnisse darzustellen. Der minSeverity Parameter gibt den minimalen Schweregrad der zu zeichnenden Anomalien an:
minSeverity = 0.1
####### Main Figure #######
plt.figure(figsize=(23, 8))
plt.plot(
rdf["timestamp"],
rdf["sensor_1"],
color="tab:orange",
linestyle="solid",
linewidth=2,
label="sensor_1",
)
plt.plot(
rdf["timestamp"],
rdf["sensor_2"],
color="tab:green",
linestyle="solid",
linewidth=2,
label="sensor_2",
)
plt.plot(
rdf["timestamp"],
rdf["sensor_3"],
color="tab:blue",
linestyle="solid",
linewidth=2,
label="sensor_3",
)
plt.grid(axis="y")
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.legend()
anoms = list(rdf["severity"] >= minSeverity)
_, _, ymin, ymax = plt.axis()
plt.vlines(np.where(anoms), ymin=ymin, ymax=ymax, color="r", alpha=0.8)
plt.legend()
plt.title(
"A plot of the values from the three sensors with the detected anomalies highlighted in red."
)
plt.show()
####### Severity Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.plot(
rdf["timestamp"],
rdf["severity"],
color="black",
linestyle="solid",
linewidth=2,
label="Severity score",
)
plt.plot(
rdf["timestamp"],
[minSeverity] * len(rdf["severity"]),
color="red",
linestyle="dotted",
linewidth=1,
label="minSeverity",
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("Severity of the detected anomalies")
plt.show()
####### Contributors Figure #######
plt.figure(figsize=(23, 1))
plt.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
plt.bar(
rdf["timestamp"], rdf["series_0"], width=2, color="tab:orange", label="sensor_1"
)
plt.bar(
rdf["timestamp"],
rdf["series_1"],
width=2,
color="tab:green",
label="sensor_2",
bottom=rdf["series_0"],
)
plt.bar(
rdf["timestamp"],
rdf["series_2"],
width=2,
color="tab:blue",
label="sensor_3",
bottom=rdf["series_0"] + rdf["series_1"],
)
plt.grid(axis="y")
plt.legend()
plt.ylim([0, 1])
plt.title("The contribution of each sensor to the detected anomaly")
plt.show()
Die Diagramme zeigen die Rohdaten der Sensoren (innerhalb des Rückschlussfensters) in Orange, Grün und Blau an. Die roten vertikalen Linien in der ersten Abbildung veranschaulichen die erkannten Anomalien, deren Schweregrad größer oder gleich minSeverity ist.
Das zweite Diagramm veranschaulicht den Schweregrad aller erkannten Anomalien, wobei der minSeverity-Schwellenwert als gestrichelte rote Linie angezeigt wird.
Schließlich veranschaulicht das letzte Diagramm den Beitrag der Daten der einzelnen Sensoren zu den erkannten Anomalien. Dies ist hilfreich, um die wahrscheinlichste Ursache der einzelnen Anomalien zu diagnostizieren und zu verstehen.