Compartir a través de


Comparación de Spark Connect con Spark classic

Spark Connect es un protocolo basado en gRPC en Apache Spark que especifica cómo una aplicación cliente puede comunicarse con un servidor Spark remoto. Permite la ejecución remota de cargas de trabajo de Spark mediante la API DataFrame.

Spark Connect se usa en lo siguiente:

  • Cuadernos de Scala con Databricks Runtime versión 13.3 y posteriores, en cómputo estándar
  • En cómputo estándar, cuadernos de Python con Databricks Runtime versión 14.3 y posteriores
  • Proceso sin servidor
  • Conexión de Databricks

Aunque Spark Connect y Spark Classic usan la ejecución diferida para las transformaciones, hay diferencias importantes que se deben saber para evitar problemas inesperados de comportamiento y rendimiento al migrar código existente de Spark clásico a Spark Connect o al escribir código que debe funcionar con ambos.

Perezoso frente a ansioso

La diferencia clave entre Spark Connect y Spark Classic es que Spark Connect aplaza el análisis y la resolución de nombres al tiempo de ejecución, como se resume en la tabla siguiente.

Aspecto Spark Classic Spark Connect
Ejecución de la consulta Perezoso Perezoso
Análisis de esquemas Ansioso Perezoso
Acceso al esquema de base de datos Local Desencadenadores RPC
Vistas temporales Plan incrustado Búsqueda de nombres
Serialización de UDF Al crear En ejecución

Ejecución de la consulta

Spark Classic y Spark Connect siguen el mismo modelo de ejecución diferida para la ejecución de consultas.

En Spark Classic, las transformaciones DataFrame (como filter y limit) son perezosas. Esto significa que no se ejecutan inmediatamente, pero se registran en un plan lógico. El cálculo real solo se desencadena cuando se invoca una acción (como show(), collect()).

Spark Connect sigue un modelo de evaluación diferida similar. Las transformaciones se construyen en el lado cliente y se envían como planes proto sin resolver al servidor. A continuación, el servidor realiza el análisis y la ejecución necesarios cuando se llama a una acción.

Aspecto Spark Classic Spark Connect
Transformaciones: df.filter(...), , df.select(...)df.limit(...) Ejecución diferida Ejecución diferida
Consultas SQL: spark.sql("select …") Ejecución diferida Ejecución diferida
Acciones: df.collect(), df.show() Ejecución diligente Ejecución diligente
Comandos SQL: spark.sql("insert …"), spark.sql("create …") Ejecución diligente Ejecución diligente

Análisis de esquemas

Spark Classic realiza análisis de esquemas diligentemente durante la fase de construcción del plan lógico. Al definir transformaciones, Spark analiza inmediatamente el esquema de DataFrame para asegurarse de que todas las columnas y tipos de datos a los que se hace referencia son válidas. Por ejemplo, ejecutar spark.sql("select 1 as a, 2 as b").filter("c > 1") producirá un error inmediatamente, lo que indica que no se encuentra la columna c.

Spark Connect crea en su lugar planes proto sin resolver durante la transformación. Al acceder a un esquema o ejecutar una acción, el cliente envía los planes sin resolver al servidor a través de RPC (llamada a procedimiento remoto). A continuación, el servidor realiza el análisis y la ejecución. Este diseño aplaza el análisis de esquemas. Por ejemplo, spark.sql("select 1 as a, 2 as b").filter("c > 1") no producirá ningún error porque el plan sin resolver es solo del lado cliente, pero en df.columns o df.show() se producirá un error porque el plan sin resolver se envía al servidor para su análisis.

A diferencia de la ejecución de consultas, Spark Classic y Spark Connect difieren en cuando se produce el análisis de esquemas.

Aspecto Spark Classic Spark Connect
Transformaciones: df.filter(...), , df.select(...)df.limit(...) Ansioso Perezoso
Acceso de esquema: df.columns, df.schema, df.isStreaming Ansioso Ansioso
Desencadena una solicitud RPC de análisis, a diferencia de Spark Classic
Acciones: df.collect(), df.show() Ansioso Ansioso
Estado de sesión dependiente: UDF, vistas temporales, configuraciones Ansioso Perezoso
Evaluado durante la ejecución

procedimientos recomendados

La diferencia entre el análisis perezoso y el diligente implica que existen algunas prácticas recomendadas a seguir para evitar comportamientos inesperados y problemas de rendimiento, específicamente los causados por la sobrescritura de nombres de vistas temporales, la captura de variables externas en funciones definidas por el usuario (UDF), la detección de errores retrasada y el acceso excesivo a esquemas en nuevos DataFrames.

Crea nombres de vista temporales únicos

En Spark Connect, el DataFrame almacena solo una referencia a la vista temporal por su nombre. Como resultado, si la vista temporal se reemplaza más adelante, los datos del DataFrame también cambiarán ya que se busca la vista por nombre en tiempo de ejecución.

Este comportamiento difiere del clásico de Spark, donde el plan lógico de la vista temporal se inserta en el plan del marco de datos en el momento de la creación. Cualquier reemplazo posterior de la vista temporal no afecta al marco de datos original.

Para reducir la diferencia, cree siempre nombres únicos para las vistas temporales. Por ejemplo, incluya un UUID en el nombre de la vista. Esto evita que afecte a los dataframes existentes que hagan referencia a una vista temporal registrada anteriormente.

Pitón

import uuid
def create_temp_view_and_create_dataframe(x):
  temp_view_name = f"`temp_view_{uuid.uuid4()}`"  # Use a random name to avoid conflicts.
  spark.range(x).createOrReplaceTempView(temp_view_name)
  return spark.table(temp_view_name)

df10 = create_temp_view_and_create_dataframe(10)
assert len(df10.collect()) == 10

df100 = create_temp_view_and_create_dataframe(100)
assert len(df10.collect()) == 10  # It works as expected now.
assert len(df100.collect()) == 100

Scala

import java.util.UUID

def createTempViewAndDataFrame(x: Int) = {
  val tempViewName = s"`temp_view_${UUID.randomUUID()}`"
  spark.range(x).createOrReplaceTempView(tempViewName)
  spark.table(tempViewName)
}

val df10 = createTempViewAndDataFrame(10)
assert(df10.collect().length == 10)

val df100 = createTempViewAndDataFrame(100)
assert(df10.collect().length == 10) // Works as expected
assert(df100.collect().length == 100)

Envolver definiciones de UDF

En Spark Connect, las UDF de Python son diferidas. Su serialización y registro se aplazan hasta el tiempo de ejecución. En el ejemplo siguiente, la UDF solo se serializa y carga en el clúster de Spark para su ejecución cuando se llama a show().

from pyspark.sql.functions import udf

x = 123

@udf("INT")
def foo():
  return x


df = spark.range(1).select(foo())
x = 456
df.show() # Prints 456

Este comportamiento difiere de Spark clásico, donde las UDF se crean con anticipación. En Spark Classic, el valor de x se captura en el momento de la creación de la UDF, por lo que los cambios posteriores en x no afectan a la UDF ya creada.

Si necesita modificar el valor de las variables externas de las que depende una UDF, use un generador de funciones (cierre con enlace anticipado) para capturar correctamente los valores de las variables. En concreto, encapsula la creación de UDF en una función auxiliar para capturar el valor de una variable dependiente.

Pitón

from pyspark.sql.functions import udf

def make_udf(value):
  def foo():
    return value
  return udf(foo)


x = 123
foo_udf = make_udf(x)
x = 456
df = spark.range(1).select(foo_udf())
df.show() # Prints 123 as expected

Scala

def makeUDF(value: Int) = udf(() => value)

var x = 123
val fooUDF = makeUDF(x)  // Captures the current value
x = 456
val df = spark.range(1).select(fooUDF())
df.show() // Prints 123 as expected

Al encapsular la definición de UDF dentro de otra función (make_udf), creamos un nuevo ámbito en el que el valor actual de x se pasa como argumento. Esto garantiza que cada UDF generada tenga su propia copia del campo, enlazada en el momento en que se crea la UDF.

Activar el análisis proactivo para la detección de errores

El siguiente control de errores es útil en Spark Classic porque realiza un análisis diligente, lo que permite que se produzcan excepciones rápidamente. Sin embargo, en Spark Connect, este código no causa ningún problema, ya que solo construye un plan proto sin resolver local sin desencadenar ningún análisis.

df = spark.createDataFrame([("Alice", 25), ("Bob", 30)], ["name", "age"])

try:
  df = df.select("name", "age")
  df = df.withColumn(
      "age_group",
      when(col("age") < 18, "minor").otherwise("adult"))
  df = df.filter(col("age_with_typo") > 6) # The use of non-existing column name will not throw analysis exception in Spark Connect
except Exception as e:
  print(f"Error: {repr(e)}")

Si el código se basa en la excepción de análisis y desea capturarla, puede desencadenar el análisis anticipado, por ejemplo, con df.columns, df.schema o df.collect().

Pitón

try:
  df = ...
  df.columns # This will trigger eager analysis
except Exception as e:
  print(f"Error: {repr(e)}")

Scala

import org.apache.spark.SparkThrowable
import org.apache.spark.sql.functions._

val df = spark.createDataFrame(Seq(("Alice", 25), ("Bob", 30))).toDF("name", "age")

try {
  val df2 = df.select("name", "age")
    .withColumn("age_group", when(col("age") < 18, "minor").otherwise("adult"))
    .filter(col("age_with_typo") > 6)
  df2.columns // Trigger eager analysis to catch the error
} catch {
  case e: SparkThrowable => println(s"Error: ${e.getMessage}")
}

Evitar demasiadas solicitudes de análisis ansiosas

El rendimiento se puede mejorar si evita un gran número de solicitudes de análisis evitando el uso excesivo de llamadas que desencadenan análisis diligente (como df.columns, df.schema).

Si no puede evitar esto y debe comprobar con frecuencia las columnas de nuevos marcos de datos, mantenga un conjunto para realizar un seguimiento de los nombres de columna para evitar solicitudes de análisis.

Pitón

df = spark.range(10)
columns = set(df.columns) # Maintain the set of column names
for i in range(200):
  new_column_name = str(i)
  if new_column_name not in columns: # Check the set
    df = df.withColumn(new_column_name, F.col("id") + i)
    columns.add(new_column_name)
df.show()

Scala

import org.apache.spark.sql.functions._

var df = spark.range(10).toDF
val columns = scala.collection.mutable.Set(df.columns: _*)
for (i <- 0 until 200) {
  val newColumnName = i.toString
  if (!columns.contains(newColumnName)) {
    df = df.withColumn(newColumnName, col("id") + i)
    columns.add(newColumnName)
  }
}
df.show()

Otro caso similar es crear un gran número de dataframes intermedios innecesarios y analizarlos. En su lugar, obtenga StructType información de campo directamente del esquema de DataFrame en lugar de crear DataFrames intermedios.

Pitón

from pyspark.sql.types import StructType

df = ...
struct_column_fields = {
    column_schema.name: [f.name for f in column_schema.dataType.fields]
    for column_schema in df.schema
    if isinstance(column_schema.dataType, StructType)
}
print(struct_column_fields)

Scala

import org.apache.spark.sql.types.StructType

df = ...
val structColumnFields = df.schema.fields
  .filter(_.dataType.isInstanceOf[StructType])
  .map { field =>
    field.name -> field.dataType.asInstanceOf[StructType].fields.map(_.name)
  }
  .toMap
println(structColumnFields)