Partilhar via


Use foreachBatch para gravar em coletores de dados arbitrários

Este artigo discute o uso foreachBatch com Streaming estruturado para gravar a saída de uma consulta de streaming em fontes de dados que não têm um coletor de streaming existente.

O padrão streamingDF.writeStream.foreachBatch(...) de código permite que você aplique funções de lote aos dados de saída de cada microlote da consulta de streaming. As funções usadas com foreachBatch têm dois parâmetros:

  • Um DataFrame que tem os dados de saída de um microlote.
  • A ID exclusiva do microlote.

Você deve usar foreachBatch para operações de mesclagem Delta Lake no Structured Streaming. Consulte Upsert de consultas de streaming usando foreachBatch.

Aplicar operações adicionais do DataFrame

Muitas operações DataFrame e Dataset não são suportadas no streaming de DataFrames porque o Spark não oferece suporte à geração de planos incrementais nesses casos. Usando foreachBatch() você pode aplicar algumas dessas operações em cada saída de microlote. Por exemplo, você pode usar foreachBatch() e a operação SQL MERGE INTO para gravar a saída de agregações de streaming em uma tabela Delta no modo de atualização. Veja mais detalhes em MERGE INTO.

Importante

  • foreachBatch() fornece apenas garantias de escrita pelo menos uma vez. No entanto, você pode usar o batchId fornecido para a função como forma de desduplicar a saída e obter uma garantia exata uma vez. Em ambos os casos, você mesmo terá que raciocinar sobre a semântica de ponta a ponta.
  • foreachBatch()não funciona com o modo de processamento contínuo, pois depende fundamentalmente da execução em microlote de uma consulta de streaming. Se você gravar dados no modo contínuo, use foreach() em vez disso.
  • Ao usar foreachBatch com um operador com estado, é importante consumir completamente cada lote antes de concluir o processamento. Consulte Consumir completamente cada DataFrame em lote

Um dataframe vazio pode ser invocado e foreachBatch() o código do usuário precisa ser resiliente para permitir a operação adequada. Pode ver um exemplo aqui:

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid data frames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Alterações de comportamento para foreachBatch no Databricks Runtime 14.0

No Databricks Runtime 14.0 e superior na computação configurada com o modo de acesso padrão, aplicam-se as seguintes alterações de comportamento:

  • print() comandos gravam a saída nos logs do driver.
  • Não é possível acessar o dbutils.widgets submódulo dentro da função.
  • Todos os arquivos, módulos ou objetos referenciados na função devem ser serializáveis e estar disponíveis no Spark.

Reutilizar fontes de dados em lote existentes

Usando foreachBatch()o , você pode usar gravadores de dados em lote existentes para coletores de dados que podem não ter suporte a Streaming Estruturado. Eis alguns exemplos:

Muitas outras fontes de dados em lote podem ser usadas a partir de foreachBatch(). Consulte Conectar-se a fontes de dados e serviços externos.

Gravar em vários locais

Se você precisar gravar a saída de uma consulta de streaming em vários locais, o Databricks recomenda o uso de vários gravadores de Streaming Estruturado para melhor paralelização e taxa de transferência.

O uso foreachBatch para gravar em vários coletores serializa a execução de gravações de streaming, o que pode aumentar a latência para cada microlote.

Se utilizar foreachBatch para gravar em várias tabelas Delta, consulte Gravações idempotentes em tabela em foreachBatch.

Consuma completamente cada DataFrame em lote

Quando estiver a utilizar operadores com estado (por exemplo, usando dropDuplicatesWithinWatermark), cada iteração em lote deve consumir todo o DataFrame ou reiniciar a consulta. Se você não consumir o DataFrame inteiro, a consulta de streaming falhará com o próximo lote.

Isso pode acontecer em vários casos. Os exemplos a seguir mostram como corrigir consultas que não consomem corretamente um DataFrame.

Utilização intencional de um subconjunto do lote

Se você se preocupa apenas com um subconjunto do lote, você pode ter um código como o seguinte.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def partial_func(batch_df, batch_id):
  batch_df.show(2)

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

Neste caso, o batch_df.show(2) apenas manipula os dois primeiros itens do lote, o que é esperado, mas, se houver mais itens, estes devem ser consumidos. O código a seguir consome o DataFrame completo.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

# function to do nothing with a row
def do_nothing(row)
  pass

def partial_func(batch_df, batch_id):
  batch_df.show(2)
  batch_df.foreach(do_nothing) # silently consume the rest of the batch

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

Aqui, a do_nothing função ignora silenciosamente o resto do DataFrame.

Tratamento de um erro num lote

Pode haver um erro ao executar um foreachBatch processo. Você pode ter um código como o seguinte (neste caso, o exemplo intencionalmente gera um erro para mostrar o problema).

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
  # handle the row, but in this case, for the sample, will just raise an error:
  raise Exception('error')

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have


q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

Ao manusear (e engolir silenciosamente) o erro, o resto do lote não pode ser consumido. Existem duas opções para lidar com esta situação.

Primeiro, você pode gerar novamente o erro, que o passa para a camada de orquestração para tentar novamente o lote. Isso pode resolver o erro, se for um problema transitório, ou sinalizá-lo para a sua equipa de operações tentar corrigir manualmente. Para fazer isso, altere o partial_func código para ter esta aparência:

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have
    raise e # re-raise the issue

A segunda opção, se quiser capturar a exceção e ignorar o resto do conjunto, é alterar o código desta forma.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
  # handle the row, but in this case, for the sample, will just raise an error:
  raise Exception('error')

# function to do nothing with a row
def do_nothing(row)
    pass

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have
    batch_df.foreach(do_nothing) # silently consume the remainder of the batch

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

Esse código usa a do_nothing função para ignorar silenciosamente o resto do lote.