Compartilhar via


Comparar o Spark Connect com o Spark Classic

O Spark Connect é um protocolo baseado em gRPC no Apache Spark que especifica como um aplicativo cliente pode se comunicar com um Servidor Spark remoto. Ele permite a execução remota de cargas de trabalho do Spark usando a API DataFrame.

O Spark Connect é usado no seguinte:

  • Notebooks Scala com Databricks Runtime versão 13.3 e superior, na computação padrão
  • Notebooks Python com o Databricks Runtime versão 14.3 ou superior, na computação padrão
  • Computação sem servidor
  • Conexão de Databricks

Embora o Spark Connect e o Spark Classic utilizem a execução lenta para transformações, há diferenças importantes para saber para evitar problemas inesperados de comportamento e desempenho ao migrar o código existente do Spark Classic para o Spark Connect ou ao escrever código que deve funcionar com ambos.

Preguiçoso vs ávido

A principal diferença entre o Spark Connect e o Spark Classic é que o Spark Connect adia a análise e a resolução de nomes para o tempo de execução, conforme resumido na tabela a seguir.

Aspecto Spark Classic Spark Connect
Execução da consulta Preguiçoso Preguiçoso
Análise de esquema Ansioso Preguiçoso
Acesso ao esquema Local Aciona RPC
Visões temporárias Planejar inserido Pesquisa de nome
Serialização de UDF Na criação Em execução

Execução da consulta

O Spark Classic e o Spark Connect seguem o mesmo modelo de execução lenta para execução de consulta.

No Spark Classic, as transformações de DataFrame (como filter e limit) são lentas. Isso significa que eles não são executados imediatamente, mas são registrados em um plano lógico. A computação real é disparada somente quando uma ação (como show(), collect()) é invocada.

O Spark Connect segue um modelo de avaliação lento semelhante. As transformações são construídas no lado do cliente e enviadas como planos proto não resolvidos para o servidor. Em seguida, o servidor executa a análise e a execução necessárias quando uma ação é chamada.

Aspecto Spark Classic Spark Connect
Transformações: df.filter(...), , df.select(...)df.limit(...) Execução lenta Execução lenta
Consultas SQL: spark.sql("select …") Execução lenta Execução lenta
Ações: df.collect(), df.show() Execução imediata Execução imediata
Comandos SQL: spark.sql("insert …"), spark.sql("create …") Execução imediata Execução imediata

Análise de esquema

O Spark Classic executa a análise de esquema ansiosamente durante a fase de construção do plano lógico. Quando você define transformações, o Spark analisa imediatamente o esquema do DataFrame para garantir que todas as colunas e tipos de dados referenciados sejam válidos. Por exemplo, executar spark.sql("select 1 as a, 2 as b").filter("c > 1") gerará imediatamente um erro, indicando que a coluna c não foi encontrada.

Em vez disso, o Spark Connect constrói planos de proto não resolvidos durante a transformação. Ao acessar um esquema ou executar uma ação, o cliente envia os planos não resolvidos para o servidor por meio de RPC (chamada de procedimento remoto). Em seguida, o servidor executa a análise e a execução. Esse design adia a análise de esquema. Por exemplo, spark.sql("select 1 as a, 2 as b").filter("c > 1") não gerará nenhum erro porque o plano não resolvido é somente do lado do cliente, mas, em df.columns ou df.show(), um erro será gerado porque o plano não resolvido é enviado ao servidor para análise.

Ao contrário da execução da consulta, o Spark Classic e o Spark Connect diferem quando ocorre a análise de esquema.

Aspecto Spark Classic Spark Connect
Transformações: df.filter(...), , df.select(...)df.limit(...) Ansioso Preguiçoso
Acesso ao esquema: df.columns, df.schema, df.isStreaming Ansioso Ansioso
Dispara uma solicitação RPC de análise, diferente do comportamento observado no Spark Classic.
Ações: df.collect(), df.show() Ansioso Ansioso
Estado de sessão dependente: UDFs, exibições temporárias, configurações Ansioso Preguiçoso
Avaliado durante a execução

Práticas recomendadas

A diferença entre análise preguiçosa e rápida significa que há algumas práticas recomendadas a seguir para evitar problemas inesperados de comportamento e desempenho, especificamente aqueles causados pela sobrescrita de nomes de exibição temporários, captura de variáveis externas em funções definidas pelo usuário (UDFs), detecção de erros atrasada e acesso excessivo ao esquema em novos DataFrames.

Criar nomes de exibição temporários exclusivos

No Spark Connect, o DataFrame armazena apenas uma referência à exibição temporária por nome. Como resultado, se a visualização temporária for substituída posteriormente, os dados no DataFrame também serão alterados porque o DataFrame procurará a visualização pelo nome no momento da execução.

Esse comportamento difere do Spark Classic, em que o plano lógico da exibição temporária é inserido no plano do quadro de dados no momento da criação. Qualquer substituição subsequente da exibição temporária não afeta o quadro de dados original.

Para atenuar a diferença, sempre crie nomes de exibição temporários exclusivos. Por exemplo, inclua uma UUID no nome da exibição. Isso evita afetar os DataFrames existentes que fazem referência a uma exibição temporária registrada anteriormente.

Python

import uuid
def create_temp_view_and_create_dataframe(x):
  temp_view_name = f"`temp_view_{uuid.uuid4()}`"  # Use a random name to avoid conflicts.
  spark.range(x).createOrReplaceTempView(temp_view_name)
  return spark.table(temp_view_name)

df10 = create_temp_view_and_create_dataframe(10)
assert len(df10.collect()) == 10

df100 = create_temp_view_and_create_dataframe(100)
assert len(df10.collect()) == 10  # It works as expected now.
assert len(df100.collect()) == 100

Scala

import java.util.UUID

def createTempViewAndDataFrame(x: Int) = {
  val tempViewName = s"`temp_view_${UUID.randomUUID()}`"
  spark.range(x).createOrReplaceTempView(tempViewName)
  spark.table(tempViewName)
}

val df10 = createTempViewAndDataFrame(10)
assert(df10.collect().length == 10)

val df100 = createTempViewAndDataFrame(100)
assert(df10.collect().length == 10) // Works as expected
assert(df100.collect().length == 100)

Encapsular as definições de UDF

No Spark Connect, as UDFs do Python são lentas. Sua serialização e registro são adiados até o tempo de execução. No exemplo a seguir, a UDF só é serializada e carregada no cluster Spark para execução quando show() é chamada.

from pyspark.sql.functions import udf

x = 123

@udf("INT")
def foo():
  return x


df = spark.range(1).select(foo())
x = 456
df.show() # Prints 456

Esse comportamento difere do Spark Classic, em que as UDFs são criadas ansiosamente. No Spark Classic, o valor de x no momento da criação do UDF é capturado, então as alterações subsequentes em x não afetam o UDF já criado.

Se você precisar modificar o valor de variáveis externas das quais um UDF depende, use uma fábrica de funções (fechamento com associação antecipada) para capturar corretamente os valores de variável. Especificamente, embrulhe a criação da UDF em uma função auxiliar para capturar o valor de uma variável dependente.

Python

from pyspark.sql.functions import udf

def make_udf(value):
  def foo():
    return value
  return udf(foo)


x = 123
foo_udf = make_udf(x)
x = 456
df = spark.range(1).select(foo_udf())
df.show() # Prints 123 as expected

Scala

def makeUDF(value: Int) = udf(() => value)

var x = 123
val fooUDF = makeUDF(x)  // Captures the current value
x = 456
val df = spark.range(1).select(fooUDF())
df.show() // Prints 123 as expected

Ao encapsular a definição de UDF dentro de outra função (make_udf), criamos um novo escopo no qual o valor x atual é passado como um argumento. Isso garante que cada UDF gerada tenha sua própria cópia do campo, associada no momento em que a UDF é criada.

Iniciar análise antecipada para detecção de erros

O tratamento de erros a seguir é útil no Spark Classic porque ele executa uma análise ansiosa, o que permite que exceções sejam lançadas prontamente. No entanto, no Spark Connect, esse código não causa nenhum problema, pois ele só constrói um plano proto local não resolvido sem disparar nenhuma análise.

df = spark.createDataFrame([("Alice", 25), ("Bob", 30)], ["name", "age"])

try:
  df = df.select("name", "age")
  df = df.withColumn(
      "age_group",
      when(col("age") < 18, "minor").otherwise("adult"))
  df = df.filter(col("age_with_typo") > 6) # The use of non-existing column name will not throw analysis exception in Spark Connect
except Exception as e:
  print(f"Error: {repr(e)}")

Se seu código depende da exceção de análise e você quiser capturá-la, é possível iniciar a análise antecipada, por exemplo, com df.columns, df.schema ou df.collect().

Python

try:
  df = ...
  df.columns # This will trigger eager analysis
except Exception as e:
  print(f"Error: {repr(e)}")

Scala

import org.apache.spark.SparkThrowable
import org.apache.spark.sql.functions._

val df = spark.createDataFrame(Seq(("Alice", 25), ("Bob", 30))).toDF("name", "age")

try {
  val df2 = df.select("name", "age")
    .withColumn("age_group", when(col("age") < 18, "minor").otherwise("adult"))
    .filter(col("age_with_typo") > 6)
  df2.columns // Trigger eager analysis to catch the error
} catch {
  case e: SparkThrowable => println(s"Error: ${e.getMessage}")
}

Evitar muitas solicitações de análise ansiosas

O desempenho poderá ser melhorado se você evitar um grande número de solicitações de análise evitando o uso excessivo de chamadas que disparam análises ansiosas (como df.columns, df.schema).

Se você não puder evitar isso e precisar verificar com frequência colunas de novos quadros de dados, mantenha um conjunto para acompanhar nomes de colunas para evitar solicitações de análise.

Python

df = spark.range(10)
columns = set(df.columns) # Maintain the set of column names
for i in range(200):
  new_column_name = str(i)
  if new_column_name not in columns: # Check the set
    df = df.withColumn(new_column_name, F.col("id") + i)
    columns.add(new_column_name)
df.show()

Scala

import org.apache.spark.sql.functions._

var df = spark.range(10).toDF
val columns = scala.collection.mutable.Set(df.columns: _*)
for (i <- 0 until 200) {
  val newColumnName = i.toString
  if (!columns.contains(newColumnName)) {
    df = df.withColumn(newColumnName, col("id") + i)
    columns.add(newColumnName)
  }
}
df.show()

Outro caso semelhante é criar um grande número de DataFrames intermediários desnecessários e analisá-los. Em vez disso, obtenha StructType informações de campo diretamente do esquema do DataFrame em vez de criar DataFrames intermediários.

Python

from pyspark.sql.types import StructType

df = ...
struct_column_fields = {
    column_schema.name: [f.name for f in column_schema.dataType.fields]
    for column_schema in df.schema
    if isinstance(column_schema.dataType, StructType)
}
print(struct_column_fields)

Scala

import org.apache.spark.sql.types.StructType

df = ...
val structColumnFields = df.schema.fields
  .filter(_.dataType.isInstanceOf[StructType])
  .map { field =>
    field.name -> field.dataType.asInstanceOf[StructType].fields.map(_.name)
  }
  .toMap
println(structColumnFields)