Freigeben über


Klassifizierungsaufgaben mit SynapseML

In diesem Artikel wird gezeigt, wie Sie eine bestimmte Klassifizierungsaufgabe mit zwei Methoden ausführen. Eine Methode verwendet nur pysparkeinfach, und eine Methode verwendet die synapseml Bibliothek. Obwohl die Methoden dieselbe Leistung erzielen, heben sie die Einfachheit synapseml im Vergleich zu pyspark.

Die in diesem Artikel beschriebene Aufgabe prognostiziert, ob eine bestimmte Kundenbewertung des auf Amazon verkauften Buchs gut (Bewertung > 3) oder schlecht ist, basierend auf dem Rezensionstext. Um die Aufgabe zu erstellen, trainieren Sie LogistikRegression-Lernende mit verschiedenen Hyperparametern und wählen dann das beste Modell aus.

Voraussetzungen

Fügen Sie Ihr Notizbuch an ein Seehaus an. Auf der linken Seite können Sie "Hinzufügen" auswählen, um ein vorhandenes Seehaus hinzuzufügen, oder Sie können ein neues Seehaus erstellen.

Konfiguration

Importieren Sie die erforderlichen Python-Bibliotheken, und rufen Sie eine Spark-Sitzung ab:

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

Lesen der Daten

Laden Sie die Daten herunter, und lesen Sie sie:

rawData = spark.read.parquet(
    "wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)

Extrahieren von Features und Prozessdaten

Echte Daten haben mehr Komplexität, verglichen mit dem Dataset, das wir zuvor heruntergeladen haben. Ein Dataset verfügt häufig über Features mehrerer Typen , z. B. Text, Numerisch und kategorisierend. Um die Schwierigkeiten beim Arbeiten mit diesen Datasets zu zeigen, fügen Sie dem Dataset zwei numerische Features hinzu: die Wortanzahl der Überprüfung und die mittlere Wortlänge:

from pyspark.sql.functions import udf
from pyspark.sql.types import *


def wordCount(s):
    return len(s.split())


def wordLength(s):
    import numpy as np

    ss = [len(w) for w in s.split()]
    return round(float(np.mean(ss)), 2)


wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer

wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
    inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
    inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline

data = (
    Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
    .fit(rawData)
    .transform(rawData)
    .withColumn("label", rawData["rating"] > 3)
    .drop("rating")
)
data.show(5)

Klassifizieren mithilfe von Pyspark

Um den besten LogisticsRegression-Klassifizierer mithilfe der pyspark Bibliothek auszuwählen, müssen Sie diese Schritte explizit ausführen:

  1. Verarbeiten der Features
    • Tokenisieren der Textspalte
    • Hashen der tokenisierten Spalte in einen Vektor mithilfe von Hashing
    • Zusammenführen der numerischen Features mit dem Vektor
  2. Um die Bezeichnungsspalte zu verarbeiten, wandeln Sie diese Spalte in den richtigen Typ um.
  3. Trainieren mehrerer LogisticsRegression-Algorithmen im train Dataset mit unterschiedlichen Hyperparametern
  4. Berechnen Sie den Bereich unter der ROC-Kurve für jedes der trainierten Modelle, und wählen Sie das Modell mit der höchsten Metrik aus, die im test Dataset berechnet wird.
  5. Bewerten des besten Modells mit dem validation-Satz
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler

# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
    inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)

# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)

# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
    "label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []

# Select the best model
for learner in logisticRegressions:
    model = learner.fit(train)
    models.append(model)
    scoredData = model.transform(test)
    metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]

# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))

Klassifizieren mithilfe von SynapseML

Die synapseml Option umfasst einfachere Schritte:

  1. Der Schätzer TrainClassifier wandelt die Daten intern in Features um, solange die in den Datasets train, test und validation ausgewählten Spalten die Features darstellen

  2. Der FindBestModel Estimator findet das beste Modell aus einem Pool trainierter Modelle. Dazu wird das Modell gefunden, das für das test Dataset mit der angegebenen Metrik am besten geeignet ist.

  3. Der ComputeModelStatistics Transformator berechnet die verschiedenen Metriken für ein bewertetes Dataset (in unserem Fall das validation Dataset) gleichzeitig

from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel

# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
    TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
    for lrm in logisticRegressions
]

# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)


# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
    "Best model's AUC on validation set = "
    + "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)