Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
In diesem Artikel wird gezeigt, wie Sie eine bestimmte Klassifizierungsaufgabe mit zwei Methoden ausführen. Eine Methode verwendet nur pysparkeinfach, und eine Methode verwendet die synapseml Bibliothek. Obwohl die Methoden dieselbe Leistung erzielen, heben sie die Einfachheit synapseml im Vergleich zu pyspark.
Die in diesem Artikel beschriebene Aufgabe prognostiziert, ob eine bestimmte Kundenbewertung des auf Amazon verkauften Buchs gut (Bewertung > 3) oder schlecht ist, basierend auf dem Rezensionstext. Um die Aufgabe zu erstellen, trainieren Sie LogistikRegression-Lernende mit verschiedenen Hyperparametern und wählen dann das beste Modell aus.
Voraussetzungen
Fügen Sie Ihr Notizbuch an ein Seehaus an. Auf der linken Seite können Sie "Hinzufügen" auswählen, um ein vorhandenes Seehaus hinzuzufügen, oder Sie können ein neues Seehaus erstellen.
Konfiguration
Importieren Sie die erforderlichen Python-Bibliotheken, und rufen Sie eine Spark-Sitzung ab:
from pyspark.sql import SparkSession
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
Lesen der Daten
Laden Sie die Daten herunter, und lesen Sie sie:
rawData = spark.read.parquet(
"wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)
Extrahieren von Features und Prozessdaten
Echte Daten haben mehr Komplexität, verglichen mit dem Dataset, das wir zuvor heruntergeladen haben. Ein Dataset verfügt häufig über Features mehrerer Typen , z. B. Text, Numerisch und kategorisierend. Um die Schwierigkeiten beim Arbeiten mit diesen Datasets zu zeigen, fügen Sie dem Dataset zwei numerische Features hinzu: die Wortanzahl der Überprüfung und die mittlere Wortlänge:
from pyspark.sql.functions import udf
from pyspark.sql.types import *
def wordCount(s):
return len(s.split())
def wordLength(s):
import numpy as np
ss = [len(w) for w in s.split()]
return round(float(np.mean(ss)), 2)
wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer
wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline
data = (
Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
.fit(rawData)
.transform(rawData)
.withColumn("label", rawData["rating"] > 3)
.drop("rating")
)
data.show(5)
Klassifizieren mithilfe von Pyspark
Um den besten LogisticsRegression-Klassifizierer mithilfe der pyspark Bibliothek auszuwählen, müssen Sie diese Schritte explizit ausführen:
- Verarbeiten der Features
- Tokenisieren der Textspalte
- Hashen der tokenisierten Spalte in einen Vektor mithilfe von Hashing
- Zusammenführen der numerischen Features mit dem Vektor
- Um die Bezeichnungsspalte zu verarbeiten, wandeln Sie diese Spalte in den richtigen Typ um.
- Trainieren mehrerer LogisticsRegression-Algorithmen im
trainDataset mit unterschiedlichen Hyperparametern - Berechnen Sie den Bereich unter der ROC-Kurve für jedes der trainierten Modelle, und wählen Sie das Modell mit der höchsten Metrik aus, die im
testDataset berechnet wird. - Bewerten des besten Modells mit dem
validation-Satz
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler
# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)
# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)
# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
"label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []
# Select the best model
for learner in logisticRegressions:
model = learner.fit(train)
models.append(model)
scoredData = model.transform(test)
metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]
# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))
Klassifizieren mithilfe von SynapseML
Die synapseml Option umfasst einfachere Schritte:
Der Schätzer
TrainClassifierwandelt die Daten intern in Features um, solange die in den Datasetstrain,testundvalidationausgewählten Spalten die Features darstellenDer
FindBestModelEstimator findet das beste Modell aus einem Pool trainierter Modelle. Dazu wird das Modell gefunden, das für dastestDataset mit der angegebenen Metrik am besten geeignet ist.Der
ComputeModelStatisticsTransformator berechnet die verschiedenen Metriken für ein bewertetes Dataset (in unserem Fall dasvalidationDataset) gleichzeitig
from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel
# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
for lrm in logisticRegressions
]
# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)
# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
"Best model's AUC on validation set = "
+ "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)