Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Este artigo discute o uso de foreachBatch com Structured Streaming para gravar a saída de uma consulta de streaming nas fontes de dados que não têm um coletor de streaming existente.
O padrão do código streamingDF.writeStream.foreachBatch(...) permite que você aplique funções nos lote aos dados de saída de cada microlote da consulta de streaming. As funções usadas com foreachBatch usam dois parâmetros:
- Um DataFrame que tem os dados de saída de um microlote.
- A ID exclusiva do microlote.
Você deve usar foreachBatch para operações de mesclagem do Delta Lake no Streaming Estruturado. Consulte Executar upsert de streaming de consultas usando foreachBatch.
Aplicar operações adicionais de DataFrame
Muitas operações de DataFrame e Dataset não têm suporte em DataFrames de streaming porque o Spark não oferece suporte à geração de planos incrementais nesses casos. Usando foreachBatch(), você pode aplicar algumas dessas operações em cada saída de micro lote. Por exemplo, você pode usar foreachBatch() e a operação SQL MERGE INTO para gravar a saída de agregações de streaming em uma tabela Delta no modo Atualização. Veja mais detalhes em MERGE INTO.
Importante
-
foreachBatch()fornece apenas garantias de gravação pelo menos uma vez. No entanto, você pode usar obatchIdfornecido para a função como uma forma de eliminar a duplicação da saída e obter uma garantia exatamente uma vez. Em ambos os casos, você terá que usar a semântica de ponta a ponta por conta própria. -
foreachBatch()não funciona com o modo de processamento contínuo, pois ele depende fundamentalmente da execução de uma consulta de streaming no microlote. Se você gravar dados no modo contínuo, useforeach()em seu lugar. - Ao usar
foreachBatchcom um operador com estado, é importante consumir completamente cada lote antes que o processamento seja concluído. Ver Consumir integralmente cada DataFrame em lote
Um dataframe vazio pode ser invocado com foreachBatch() e o código do usuário precisa ser resiliente para permitir a operação adequada. Um exemplo é mostrado aqui:
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Mudanças de comportamento para foreachBatch no Databricks Runtime 14.0
No Databricks Runtime 14.0 e superior na computação configurada com o modo de acesso padrão, as seguintes alterações de comportamento se aplicam:
- Os comandos
print()gravam a saída nos logs do driver. - Você não pode acessar o submódulo
dbutils.widgetsdentro da função. - Quaisquer arquivos, módulos ou objetos referenciados na função devem ser serializáveis e disponíveis no Spark.
Reutilizar fontes de dados do lote existentes
Usando foreachBatch(), você pode usar os gravadores de dados nos lotes existentes nos coletores de dados que podem não ter suporte do Streaming Estruturado. Veja alguns exemplos:
Muitas outras fontes de dados em lotes podem ser usadas em foreachBatch(). Consulte Conectar-se a fontes de dados e serviços externos.
Gravar em vários locais
Se você precisar gravar a saída de uma consulta de streaming em vários locais, o Databricks recomendará o uso de vários gravadores de Streaming Estruturado para melhor paralelização e taxa de transferência.
Usar foreachBatch para gravar em vários coletores serializa a execução das gravações de streaming, o que pode aumentar a latência para cada microlote.
Se você usar foreachBatch para gravar em várias tabelas Delta, consulte Gravações de tabela idempotentes em foreachBatch.
Consumir completamente cada DataFrame em lote
Quando você estiver usando operadores com estado (por exemplo, usando dropDuplicatesWithinWatermark), cada iteração em lote deve consumir todo o DataFrame ou reiniciar a consulta. Se você não consumir todo o DataFrame, a consulta de streaming falhará com o próximo lote.
Isso pode acontecer em vários casos. Os exemplos a seguir mostram como corrigir consultas que não consomem corretamente um DataFrame.
Usar intencionalmente um subconjunto do lote
Se você se importar apenas com um subconjunto do lote, poderá ter um código como o seguinte.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def partial_func(batch_df, batch_id):
batch_df.show(2)
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Nesse caso, apenas o batch_df.show(2) manipula os dois primeiros itens no lote, o que é esperado, mas se houver mais itens, eles precisam ser consumidos. O código a seguir consome o DataFrame completo.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
# function to do nothing with a row
def do_nothing(row)
pass
def partial_func(batch_df, batch_id):
batch_df.show(2)
batch_df.foreach(do_nothing) # silently consume the rest of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Aqui, a do_nothing função ignora silenciosamente o restante do DataFrame.
Tratando um erro em um lote
Pode haver um erro durante a execução de um foreachBatch processo. Você pode ter um código como o seguinte (nesse caso, o exemplo gera intencionalmente um erro para mostrar o problema).
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Ao lidar com (e assimilar silenciosamente) o erro, o restante do lote pode não ser consumido. Há duas opções para lidar com essa situação.
Primeiro, você poderá gerar novamente o erro, passando-o para a camada de orquestração para tentar novamente o lote. Isso pode resolver o erro, se for um problema transitório, ou acioná-lo para que a equipe de operações tente corrigir manualmente. Para fazer isso, altere o partial_func código para ter esta aparência:
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
raise e # re-raise the issue
A segunda opção, se você quiser capturar a exceção e ignorar o restante do lote, é alterar o código para o seguinte.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
# function to do nothing with a row
def do_nothing(row)
pass
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
batch_df.foreach(do_nothing) # silently consume the remainder of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Esse código usa a do_nothing função para ignorar silenciosamente o restante do lote.