Partilhar via


Aplicar marcas d'água para controlar os limites de processamento de dados

Este artigo apresenta os conceitos básicos de marca d'água e fornece recomendações para o uso de marcas d'água em operações comuns de streaming com monitoração de estado. Você deve aplicar marcas d'água a operações de streaming com monitoração de estado para evitar expandir infinitamente a quantidade de dados mantidos no estado, o que poderia introduzir problemas de memória e aumentar as latências de processamento durante operações de streaming de longa duração.

O que é uma marca de água?

O Streaming Estruturado usa marcas d'água para controlar o limite por quanto tempo continuar processando atualizações para uma determinada entidade de estado. Exemplos comuns de entidades estatais incluem:

  • Agregações ao longo de uma janela temporal.
  • Chaves únicas numa união entre dois fluxos.

Quando declara uma marca temporal, especifica um campo de timestamp e um limiar de marca temporal em um DataFrame de fluxo contínuo. À medida que novos dados chegam, o gerente de estado rastreia o carimbo de data/hora mais recente no campo especificado e processa todos os registros dentro do limite de atraso.

O exemplo seguinte aplica um limiar de marca de água de 10 minutos a uma contagem em janelas:

from pyspark.sql.functions import window

(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
)

Neste exemplo:

  • A coluna event_time é utilizada para definir uma marca de água de 10 minutos e uma janela deslizante de 5 minutos.
  • Uma contagem é coletada para cada id janela de 5 minutos não sobreposta.
  • A informação do estado é mantida para cada contagem até que o final da janela seja 10 minutos mais antigo do que o último observado event_time.

Importante

Os limiares de marca de água garantem que os registos que chegam dentro do limiar especificado são processados de acordo com a semântica da consulta definida. Os registros que chegam tardiamente fora do limite especificado ainda podem ser processados usando métricas de consulta, mas isso não é garantido.

Como as marcas d'água afetam o tempo de processamento e o rendimento?

As marcas d'água interagem com os modos de saída para controlar quando os dados são gravados no coletor. Como as marcas d'água reduzem a quantidade total de informações de estado a serem processadas, o uso eficaz de marcas d'água é essencial para uma taxa de transferência de streaming com monitoração de estado eficiente.

Nota

Nem todos os modos de saída são suportados para todas as operações com monitoração de estado.

Marcas d'água e modo de saída para agregações em janela

Os detalhes da tabela a seguir explicam o processamento para consultas com agregação em uma marca temporal com uma marca de água definida.

Modo de saída Comportamento
Acrescentar As linhas são escritas na tabela alvo assim que o limiar da marca de água for ultrapassado. Todas as gravações são atrasadas com base no limite de atraso. O estado de agregação antigo é descartado assim que o limite é ultrapassado.
Atualização As linhas são escritas na tabela alvo à medida que os resultados são calculados, podendo ser atualizadas e sobrescrevidas à medida que chegam novos dados. O estado de agregação antigo é descartado assim que o limite é ultrapassado.
Concluído O estado de agregação não é descartado. A tabela de destino é reescrita a cada gatilho.

Marcas d'água e saída para junções stream-stream

As junções entre vários fluxos suportam apenas o modo de acréscimo, e os registros correspondentes são gravados em cada lote em que são descobertos. Para junções internas, o Databricks recomenda definir um limiar de watermark em cada fonte de dados em streaming. Isso permite que as informações de estado sejam descartadas para registros antigos. Sem watermarks, o Structured Streaming tenta juntar todas as chaves de ambos os lados da união com cada trigger.

O Streaming Estruturado tem semântica especial para suportar junções externas. A marca d'água é obrigatória para junções externas, pois indica quando uma chave deve ser escrita com um valor nulo depois de ficar incomparável. É importante notar que, embora as ligações externas possam ser úteis para registar registos que nunca são correspondidos durante o processamento de dados, porque as ligações só escrevem em tabelas como operações de adição, estes dados em falta só são registados após o limite de atraso ter sido ultrapassado.

Controla o limite de dados atrasados com uma política de múltiplos watermarks no Structured Streaming

Ao trabalhar com múltiplas entradas de Streaming Estruturado, pode definir múltiplas marcas de água para controlar os limiares de tolerância para dados que chegam tarde. A configuração de marcas d'água permite controlar as informações de estado e a latência de impactos.

Uma consulta de streaming pode ter vários fluxos de entrada que são unidos ou unidos. Cada um dos fluxos de entrada pode ter um limite diferente de dados atrasados que precisa ser tolerado para operações com monitoração de estado. Especifique esses limites usando withWatermarks("eventTime", delay) em cada um dos fluxos de entrada. Segue-se um exemplo de consulta com junções stream-stream.

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

Durante a execução da consulta, o Structured Streaming acompanha individualmente o tempo máximo de evento observado em cada fluxo de entrada, calcula watermarks com base no atraso correspondente e escolhe um único watermark global para ser usado em operações com estado. Por padrão, o mínimo é escolhido como ponto de referência global porque garante que nenhum dado é acidentalmente descartado como demasiado tarde se um dos fluxos se atrasar em relação aos outros (por exemplo, quando um dos fluxos deixa de receber dados devido a falhas no upstream). Ou seja, a marca d'água global move-se com segurança ao ritmo do fluxo de dados mais lento e a saída da consulta é atrasada em conformidade.

Se quiser obter resultados mais rápidos, pode definir a política de várias marcas de água para escolher o valor máximo como marca global, definindo a configuração SQL spark.sql.streaming.multipleWatermarkPolicy para max (o padrão é min). Isto permite que o indicador global de progresso avance ao ritmo da corrente mais rápida. No entanto, essa configuração descarta dados dos fluxos mais lentos. Por isso, a Databricks recomenda que você use essa configuração criteriosamente.

Colocar duplicados dentro da marca de água

No Databricks Runtime 13.3 LTS e superiores, pode desduplicar registos dentro de um limiar de marca de água usando um identificador único.

O Streaming Estruturado fornece garantias de processamento exatamente uma vez, mas não desduplica automaticamente os registros de fontes de dados. Pode usar dropDuplicatesWithinWatermark para desduplicar registos em qualquer campo especificado, permitindo-lhe remover duplicados de um fluxo mesmo que alguns campos diferem (como hora do evento ou hora de chegada).

Registos duplicados que chegam dentro do limite de tempo especificado são garantidamente eliminados. Essa garantia é rigorosa em apenas uma direção, e registros duplicados que chegam fora do limite especificado também podem ser descartados. Deve definir o limiar de atraso da marca temporal maior do que as diferenças máximas de carimbo de tempo entre eventos duplicados para remover todos os duplicados.

É necessário especificar uma marca de água para usar o método dropDuplicatesWithinWatermark, como no seguinte exemplo:

Python

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])
)

Scala

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])