Partilhar via


O streaming de tabela delta lê e grava

Esta página descreve como transmitir alterações de uma tabela Delta. O Delta Lake está profundamente integrado com o Spark Structured Streaming através readStream e writeStream. O Delta Lake supera muitas das limitações tipicamente associadas a sistemas de transmissão em fluxo e ficheiros, incluindo:

  • Coalescência de pequenos arquivos produzidos por ingestão de baixa latência.
  • Manter o processamento "exatamente uma vez" com mais de um fluxo (ou trabalhos em lote simultâneos).
  • Descobrir com eficiência quais arquivos são novos ao usar arquivos como fonte para um fluxo.

Note

Este artigo descreve o uso de tabelas Delta Lake como fontes e coletores de streaming. Para saber como carregar dados usando tabelas de streaming no Databricks SQL, consulte Usar tabelas de streaming no Databricks SQL.

Para obter informações sobre junções estáticas de fluxo com o Delta Lake, consulte Junções estáticas de fluxo.

Alterações de fluxo

Quando se trata de streaming de alterações de uma tabela Delta para processamento incremental, há duas opções a considerar:

  1. Efetuar streaming a partir de um feed de captura de alteração de dados (CDC) de uma tabela Delta.
  2. Fluxo da própria tabela Delta.

A opção 1 é a solução mais robusta e seu código define como você deseja processar diferentes tipos de eventos de alteração, incluindo inserções, atualizações e exclusões. A opção 2 é mais simples porque você não precisa escrever código para processar eventos de alteração. No entanto, a opção 2 só é recomendada quando a tabela Delta de origem é somente acréscimo. Quando há alterações (por exemplo, atualizações e exclusões) na tabela Delta de origem, o mecanismo de Streaming Estruturado lança uma exceção. Você pode lidar com essa exceção reprocessando todos os dados da tabela de origem ou configurando-a para ignorar as alterações na tabela de origem. Para obter mais detalhes, consulte Ignorar atualizações e exclusões.

O Databricks recomenda o streaming a partir do feed CDC de uma tabela Delta (opção 1) em vez da própria tabela Delta (opção 2) sempre que possível.

Opção 1: Transmitir a partir de um feed de captura de dados de alteração (CDC)

O feed de dados de alteração do Delta Lake registra as alterações em uma tabela Delta, incluindo atualizações e exclusões. Com esta função ativada, é possível transmitir de um feed de dados de alteração e escrever lógica para processar inserções, atualizações e exclusões em tabelas subsequentes. Embora a saída de dados do feed de dados de alteração difira ligeiramente da tabela Delta descrita, ainda assim permite propagar alterações incrementais para tabelas subsequentes em uma arquitetura medalhão.

Important

No Databricks Runtime 12.2 LTS e versões anteriores, não é possível fazer streaming do feed de dados de alteração para uma tabela Delta com mapeamento de colunas habilitado que tenha sofrido evolução do esquema não aditiva, como renomear ou remover colunas. Consulte Streaming com mapeamento de colunas e alterações de esquema.

Opção 2: Transmitir a partir de uma tabela Delta

O Streaming estruturado lê incrementalmente tabelas Delta. Enquanto uma consulta de streaming está ativa em relação a uma tabela Delta, novos registros são processados idempotentemente à medida que novas versões de tabela são confirmadas na tabela de origem.

Os exemplos de código a seguir mostram a configuração de uma leitura de streaming usando o nome da tabela ou o caminho do arquivo.

Python

spark.readStream.table("table_name")

spark.readStream.load("/path/to/table")

Scala

spark.readStream.table("table_name")

spark.readStream.load("/path/to/table")

Important

Se o esquema de uma tabela Delta for alterado após uma leitura de transmissão em fluxo começar na tabela, a consulta falhará. Para a maioria das alterações de esquema, pode reiniciar o fluxo para resolver o erro de correspondência do esquema e continuar o processamento.

No Databricks Runtime 12.2 LTS e inferior, não é possível transmitir de uma tabela Delta com mapeamento de colunas habilitado que tenha sofrido evolução de esquema não aditiva, como renomear ou soltar colunas. Para obter detalhes, veja Transmissão em fluxo com mapeamento de colunas e alterações de esquema.

Limitar a taxa de entrada

As seguintes opções estão disponíveis para controlar microlotes:

  • maxFilesPerTrigger: Quantos novos arquivos devem ser considerados em cada microlote. A predefinição é 1000.
  • maxBytesPerTrigger: Quantidade de dados processados em cada microlote. Esta opção define um "soft max", o que significa que um lote processa aproximadamente essa quantidade de dados e pode processar mais do que o limite para fazer a consulta de streaming avançar nos casos em que a menor unidade de entrada é maior do que esse limite. Isso não é definido por padrão.

Se você usar maxBytesPerTrigger em conjunto com maxFilesPerTrigger, o micro-lote processará dados até que o limite de maxFilesPerTrigger ou maxBytesPerTrigger seja atingido.

Note

Nos casos em que as transações da tabela de origem são limpas devido à logRetentionDurationconfiguração e a consulta de streaming tenta processar essas versões, por padrão, a consulta não consegue evitar a perda de dados. Você pode definir a opção failOnDataLoss para false ignorar dados perdidos e continuar o processamento.

Ignorar atualizações e exclusões

Ao transmitir a partir de uma tabela Delta, o Streaming Estruturado não manipula entradas que não sejam um acréscimo e lança uma exceção se ocorrerem modificações na tabela que está sendo usada como fonte. Existem duas estratégias principais para lidar com alterações que não podem ser propagadas automaticamente a jusante:

  • Você pode excluir a saída e o ponto de verificação e reiniciar o fluxo desde o início.
  • Você pode definir uma das seguintes opções:
    • skipChangeCommits (recomendado): ignora transações que excluem ou modificam registros existentes. Esta opção subsume ignoreDeletes.
    • ignoreDeletes (legado): ignora transações que excluem dados nos limites da partição. Esta opção lida apenas com a eliminação total de partições.

Note

A Databricks recomenda o uso do skipChangeCommits.

No Databricks Runtime 12.2 LTS e superior, skipChangeCommits substitui a configuração ignoreChangesanterior. No Databricks Runtime 11.3 LTS e inferior, ignoreChanges é a única opção suportada.

A semântica para ignoreChanges difere muito de skipChangeCommits. Com ignoreChanges ativado, os ficheiros de dados reescritos na tabela de origem são reemitidos após uma operação de alteração de dados, como UPDATE, MERGE INTO, DELETE (dentro de partições) ou OVERWRITE. Muitas vezes, as linhas inalteradas são emitidas juntamente com novas linhas, pelo que os consumidores posteriores devem ser capazes de lidar com duplicados. As eliminações não são propagadas mais abaixo. ignoreChanges engloba ignoreDeletes.

skipChangeCommits ignora totalmente as operações de alteração de ficheiros. Os ficheiros de dados que são reescritos na tabela de origem devido a uma operação de alteração de dados, como UPDATE, MERGE INTO, DELETEe OVERWRITE são ignorados na totalidade. Para refletir as alterações nas tabelas de origem a montante, tem de implementar lógicas separadas para propagar estas alterações.

As cargas de trabalho configuradas com ignoreChanges continuam a operar usando semântica conhecida, mas o Databricks recomenda o uso skipChangeCommits para todas as novas cargas de trabalho. A migração de cargas de trabalho usando ignoreChanges para skipChangeCommits requer lógica de refatoração.

Example

Por exemplo, suponha que você tenha uma tabela user_events com date, user_emaile action colunas particionada por date. Você sai da user_events tabela e precisa excluir dados dela devido ao GDPR.

Quando você exclui nos limites da partição (ou seja, o WHERE está em uma coluna de partição), os arquivos já estão segmentados por valor, então a exclusão apenas descarta esses arquivos dos metadados. Ao excluir uma partição inteira de dados, você pode usar o seguinte:

spark.readStream
  .option("ignoreDeletes", "true")
  .table("user_events")

Se você excluir dados em várias partições (neste exemplo, filtrando em user_email), use a seguinte sintaxe:

spark.readStream
  .option("skipChangeCommits", "true")
  .table("user_events")

Se atualizar o user_email com a declaração UPDATE, o ficheiro que contém o user_email em questão será reescrito. Use skipChangeCommits para ignorar os arquivos de dados alterados.

Databricks recomenda utilizar skipChangeCommits em vez de ignoreDeletes, a menos que tenha a certeza de que as eliminações são sempre eliminação total de partições.

Especificar posição inicial

Você pode usar as opções a seguir para especificar o ponto inicial da fonte de streaming Delta Lake sem processar a tabela inteira.

  • startingVersion: A versão do Delta Lake a partir da qual começar. O Databricks recomenda omitir essa opção para a maioria das cargas de trabalho. Quando não está configurado, o fluxo começa a partir da versão mais recente disponível, incluindo um instantâneo completo da tabela naquele momento e alterações futuras como dados de alterações.

    Caso especificado, o fluxo lê todas as alterações na tabela Delta começando pela versão especificada (inclusivamente). Se a versão especificada não estiver mais disponível, o fluxo não será iniciado. Você pode obter as versões de confirmação na coluna version da saída do comando DESCRIBE HISTORY.

    Para retornar apenas as alterações mais recentes, especifique latest.

  • startingTimestamp: O carimbo de data/hora a partir do qual começar. Todas as alterações de tabela confirmadas no ou após o carimbo de data/hora (inclusive) são lidas pelo leitor de streaming. Se o carimbo de data/hora fornecido preceder todas as confirmações da tabela, a leitura de streaming começará com o carimbo de data/hora mais antigo disponível. Um dos seguintes:

    • Uma cadeia de caracteres que representa um carimbo de data/hora. Por exemplo, "2019-01-01T00:00:00.000Z".
    • Uma cadeia de caracteres de data. Por exemplo, "2019-01-01".

Não é possível definir as duas opções ao mesmo tempo. Eles entram em vigor somente ao iniciar uma nova consulta de streaming. Se uma consulta de streaming tiver sido iniciada e o progresso tiver sido registrado em seu ponto de verificação, essas opções serão ignoradas.

Important

Embora tu possas iniciar a fonte de streaming a partir de uma versão especificada ou de um timestamp, o esquema da fonte de streaming é sempre o esquema mais recente da tabela Delta. Você deve garantir que não haja nenhuma alteração de esquema incompatível na tabela Delta após a versão especificada ou o timestamp. Caso contrário, a fonte de streaming pode retornar resultados incorretos ao ler os dados com um esquema incorreto.

Example

Por exemplo, suponha que você tenha uma tabela user_events. Se você quiser ler as alterações desde a versão 5, use:

spark.readStream
  .option("startingVersion", "5")
  .table("user_events")

Se quiser ler as alterações desde 2018-10-18, utilize:

spark.readStream
  .option("startingTimestamp", "2018-10-18")
  .table("user_events")

Processar snapshot inicial sem perda de dados

Esse recurso está disponível no Databricks Runtime 11.3 LTS e superior.

Ao usar uma tabela Delta como fonte de fluxo, a consulta primeiro processa todos os dados presentes na tabela. A tabela Delta nesta versão é chamada de instantâneo inicial. Por padrão, os arquivos de dados da tabela Delta são processados com base em qual arquivo foi modificado pela última vez. No entanto, a hora da última modificação não representa necessariamente a ordem de tempo do evento de registro.

Em uma consulta de streaming com monitoração de estado com uma marca d'água definida, o processamento de arquivos por tempo de modificação pode resultar em registros sendo processados na ordem errada. Isso pode levar a que os registos sejam classificados como eventos tardios pela marca temporal.

Você pode evitar o problema de queda de dados ativando a seguinte opção:

  • withEventTimeOrder: Se o instantâneo inicial deve ser processado com a ordem de hora do evento.

Com a ordem de tempo do evento habilitada, o intervalo de tempo do evento dos dados iniciais do instantâneo é dividido em intervalos de tempo. Cada microlote processa um bucket filtrando dados dentro do intervalo de tempo. As opções de configuração maxFilesPerTrigger e maxBytesPerTrigger ainda são aplicáveis para controlar o tamanho do microlote, mas apenas de forma aproximada devido à natureza do processamento.

O gráfico abaixo mostra esse processo:

Instantâneo inicial

Informações notáveis sobre este recurso:

  • O problema de perda de dados só acontece quando o instantâneo Delta inicial de uma consulta de streaming com estado é processado na ordem padrão.
  • Não é possível alterar withEventTimeOrder depois que a consulta de fluxo é iniciada enquanto o instantâneo inicial ainda está sendo processado. Para reiniciar com withEventTimeOrder alterado, você precisa excluir o ponto de verificação.
  • Se estiver a executar uma consulta de fluxo com o withEventTimeOrder ativado, não pode fazer downgrade para uma versão Databricks Runtime que não suporta esta funcionalidade até que o processamento inicial do snapshot esteja concluído. Se precisar fazer downgrade, aguarde a conclusão do snapshot inicial ou exclua o ponto de verificação e reinicie a consulta.
  • Este recurso não é suportado nos seguintes cenários incomuns:
    • A coluna de tempo do evento é uma coluna gerada e há transformações sem projeção entre a fonte Delta e a marca d'água.
    • Há uma marca d'água que tem mais de uma fonte Delta na consulta de fluxo.
  • Com a ordem de tempo do evento ativada, o desempenho do processamento inicial do snapshot Delta pode ser mais lento.
  • Cada microlote verifica o instantâneo inicial para filtrar dados dentro do intervalo de tempo de evento correspondente. Para uma ação de filtro mais rápida, é aconselhável usar uma coluna de origem Delta como o tempo do evento, para que o salto de dados possa ser aplicado (verifique Saída de dados para Delta Lake para saber quando é aplicável). Além disso, o particionamento de tabelas ao longo da coluna de tempo do evento pode acelerar ainda mais o processamento. Você pode verificar a interface do usuário do Spark para ver quantos arquivos delta são verificados para um microlote específico.

Example

Suponha que você tenha uma tabela user_events com uma event_time coluna. Sua consulta de streaming é uma consulta de agregação. Se quiser garantir que não haja queda de dados durante o processamento inicial do snapshot, você pode usar:

spark.readStream
  .option("withEventTimeOrder", "true")
  .table("user_events")
  .withWatermark("event_time", "10 seconds")

Note

Você também pode habilitar isso com a configuração do Spark no cluster, que se aplicará a todas as consultas de streaming: spark.databricks.delta.withEventTimeOrder.enabled true

Mesa delta como lavatório

Você também pode gravar dados em uma tabela Delta usando o Structured Streaming. O registo de transações permite que o Delta Lake garanta processamento exatamente uma vez, mesmo quando há outros fluxos ou consultas em lote a serem executados simultaneamente na tabela.

Ao escrever em uma tabela Delta usando um coletor de Streaming Estruturado, você pode observar confirmações vazias com epochId = -1. Estes são esperados e normalmente ocorrem:

  • No primeiro lote de cada execução da consulta de streaming (isso acontece a cada lote para Trigger.AvailableNow).
  • Quando um esquema é alterado (como adicionar uma coluna).

Essas confirmações vazias não afetam a correção ou o desempenho da consulta de forma relevante. São intencionais e não indicam erro.

Note

A função Delta Lake VACUUM remove todos os arquivos não gerenciados pelo Delta Lake, mas ignora todos os diretórios que começam com _. Você pode armazenar pontos de verificação com segurança ao lado de outros dados e metadados para uma tabela Delta usando uma estrutura de diretórios como <table-name>/_checkpoints.

Metrics

Você pode descobrir o número de bytes e o número de arquivos ainda a serem processados em uma consulta de streaming através das métricas e numBytesOutstanding. As métricas adicionais incluem:

  • numNewListedFiles: Número de arquivos Delta Lake que foram listados para calcular a lista de pendências para este lote.
    • backlogEndOffset: A versão da tabela usada para calcular a lista de pendências.

Se estiveres a executar o fluxo num caderno, poderás ver essas métricas na guia de dados brutos no painel de progresso da consulta em streaming:

{
  "sources": [
    {
      "description": "DeltaSource[file:/path/to/source]",
      "metrics": {
        "numBytesOutstanding": "3456",
        "numFilesOutstanding": "8"
      }
    }
  ]
}

Modo de acréscimo

Por padrão, os fluxos são executados no modo de acréscimo, que adiciona novos registros à tabela.

Use o toTable método ao transmitir para tabelas, como no exemplo a seguir:

Python

(events.writeStream
   .outputMode("append")
   .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
   .toTable("events")
)

Scala

events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .toTable("events")

Modo completo

Você também pode usar o Streaming estruturado para substituir a tabela inteira por cada lote. Um exemplo de caso de uso é calcular um resumo usando agregação:

Python

(spark.readStream
  .table("events")
  .groupBy("customerId")
  .count()
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .toTable("events_by_customer")
)

Scala

spark.readStream
  .table("events")
  .groupBy("customerId")
  .count()
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .toTable("events_by_customer")

O exemplo anterior atualiza continuamente uma tabela que contém o número agregado de eventos por cliente.

Para aplicativos com requisitos de latência mais brandos, você pode economizar recursos de computação com gatilhos únicos. Use-os para atualizar tabelas de agregação resumidas em um determinado cronograma, processando apenas novos dados que chegaram desde a última atualização.

Upsert de consultas em streaming usando foreachBatch

Você pode usar uma combinação de merge e foreachBatch para escrever upserts complexos a partir de uma consulta de streaming numa tabela Delta. Veja Utilizar foreachBatch para escrever em sinks de dados arbitrários.

Esse padrão tem muitas aplicações, incluindo as seguintes:

  • Escrever agregados de streaming no Modo de Atualização: Isso é muito mais eficiente do que o Modo Completo.
  • Escrever um fluxo de alterações de base de dados numa tabela Delta: a consulta de mesclagem para escrever dados de alteração pode ser usada foreachBatch para aplicar de forma contínua um fluxo de alterações a uma tabela Delta.
  • Escrever um fluxo de dados na tabela Delta com deduplicação: A consulta de mesclagem para inserção única pode ser usada em foreachBatch para gravar dados continuamente (com duplicatas) numa tabela Delta com deduplicação automática.

Note

  • Certifique-se de que sua merge instrução dentro foreachBatch é idempotente, pois as reinicializações da consulta de streaming podem aplicar a operação no mesmo lote de dados várias vezes.
  • Quando merge é usado no foreachBatch, a taxa de dados de entrada da consulta de streaming (reportada através de StreamingQueryProgress e visível no gráfico de taxa do notebook) pode ser relatada como um múltiplo da taxa real à qual os dados são gerados na fonte. Isto acontece porque merge lê os dados de entrada várias vezes, fazendo com que as métricas de entrada sejam multiplicadas. Se isto for um estrangulamento, pode colocar o DataFrame de lote em cache antes de merge e, em seguida, desfazer a cache depois de merge.

O exemplo a seguir demonstra como você pode usar o SQL dentro foreachBatch para realizar essa tarefa:

Scala

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  // Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  // Use the view name to apply MERGE
  // NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
  microBatchOutputDF.sparkSession.sql(s"""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  # Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  # Use the view name to apply MERGE
  # NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe

  # In Databricks Runtime 10.5 and below, you must use the following:
  # microBatchOutputDF._jdf.sparkSession().sql("""
  microBatchOutputDF.sparkSession.sql("""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

Você também pode optar por usar as APIs do Delta Lake para executar upserts de streaming, como no exemplo a seguir:

Scala

import io.delta.tables.*

val deltaTable = DeltaTable.forName(spark, "table_name")

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  deltaTable.as("t")
    .merge(
      microBatchOutputDF.as("s"),
      "s.key = t.key")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "table_name")

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  (deltaTable.alias("t").merge(
      microBatchOutputDF.alias("s"),
      "s.key = t.key")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
  )

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

Tabela idempotente escreve em foreachBatch

Note

O Databricks recomenda configurar uma gravação de streaming separada para cada coletor que você deseja atualizar em vez de usar foreachBatcho . Isso ocorre porque as gravações em várias tabelas são serializadas ao usar 'foreachBatch', o que reduz a paralelização e aumenta a latência geral.

As tabelas delta suportam as seguintes DataFrameWriter opções para fazer gravações em várias tabelas dentro foreachBatch do idempotent:

  • txnAppId: Uma cadeia de caracteres exclusiva que tu podes passar a cada gravação de DataFrame. Por exemplo, você pode usar a ID StreamingQuery como txnAppId.
  • txnVersion: Um número monotonicamente crescente que atua como versão de transação.

Delta Lake usa a combinação de txnAppId e txnVersion para identificar gravações duplicadas e ignorá-las.

Se uma gravação em lote for interrompida com uma falha, a nova execução do lote usará o mesmo aplicativo e ID de lote para ajudar o tempo de execução a identificar corretamente gravações duplicadas e ignorá-las. O ID do aplicativo (txnAppId) pode ser qualquer cadeia de caracteres exclusiva gerada pelo usuário e não precisa estar relacionado ao ID do fluxo. Veja Utilizar foreachBatch para escrever em sinks de dados arbitrários.

Warning

Caso exclua o ponto de verificação de streaming e reinicie a consulta com um novo ponto de verificação, deverá fornecer um txnAppId. Novos pontos de verificação começam com um ID de lote de 0. O Delta Lake usa o ID do lote e txnAppId como uma chave exclusiva, e ignora os lotes com valores já vistos.

O exemplo de código a seguir demonstra esse padrão:

Python

app_id = ... # A unique string that is used as an application ID.

def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2

streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()

Scala

val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 1
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 2
}