Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Este artigo discute a seleção de um modo de saída para streaming com monitoração de estado. Somente fluxos com monitoração de estado contendo agregações exigem uma configuração de modo de saída.
As junções suportam apenas o modo de saída de acréscimo e o modo de saída não afeta a desduplicação. Os operadores mapGroupsWithState com estado arbitrário e flatMapGroupsWithState emitem registros usando sua própria lógica personalizada, para que o modo de saída do fluxo não afete seu comportamento.
Para streaming sem monitoração de estado, todos os modos de saída se comportam da mesma forma.
Para configurar o modo de saída corretamente, você deve entender o streaming com monitoração de estado, marcas d'água e gatilhos. Consulte os seguintes artigos:
- O que é streaming stateful?
- Aplicar marcas d'água para controlar os limites de processamento de dados
- Configurar intervalos de gatilho do Streaming Estruturado
O que é o modo de saída?
O modo de saída de uma consulta de Streaming Estruturado determina quais registos os operadores da consulta emitem durante cada gatilho. Os três tipos de registos que podem ser emitidos são:
- Registra que o processamento futuro não é alterado.
- Os registros que foram alterados desde o último gatilho.
- Todos os registros na tabela de estados.
Saber quais tipos de registros emitir é importante para operadores com monitoração de estado, porque uma linha específica produzida por um operador com estado pode mudar de gatilho para gatilho. Por exemplo, à medida que um operador de agregação de streaming recebe mais linhas para uma janela específica, os valores de agregação dessa janela podem mudar entre gatilhos.
Para operadores sem monitoração de estado, a distinção entre tipos de registro não afeta o comportamento do operador. Os registros que um operador sem estado emite durante um gatilho são sempre os registros de origem processados durante esse disparador.
Modos de saída disponíveis
Há três modos de saída que informam a um operador quais registros emitir durante um determinado gatilho:
| Modo de saída | Descrição |
|---|---|
| Modo de acréscimo (padrão) | Por padrão, as consultas de streaming são executadas no modo de acréscimo. Nesse modo, os operadores emitem apenas linhas que permanecem inalteradas em gatilhos futuros. Os operadores com estado usam a marca d'água para determinar quando isso acontece. |
| Modo de atualização | No modo de atualização, os operadores emitem todas as linhas que foram alteradas durante o gatilho, mesmo que o registro emitido possa mudar em um gatilho subsequente. |
| Modo completo | O modo completo só funciona com agregações de streaming. No modo completo, todas as linhas resultantes já produzidas pelo operador são emitidas a jusante. |
Considerações sobre a produção
Para muitas operações de streaming com monitoração de estado, você deve escolher entre os modos de acréscimo e atualização. As seções a seguir descrevem considerações que podem informar sua decisão.
Nota
O modo completo tem algumas aplicações, mas pode ter um desempenho fraco à medida que os dados são dimensionados. O Databricks recomenda o uso de vistas materializadas para obter garantias semânticas associadas ao modo completo com processamento incremental para muitas operações com estado. Ver Vistas materializadas.
Semântica da aplicação
A semântica do aplicativo descreve como os aplicativos downstream usam os dados de streaming.
Se os serviços downstream precisarem executar uma única ação para cada gravação downstream, use o modo de acréscimo na maioria dos casos. Por exemplo, se você tiver um serviço de notificação downstream enviando notificações para cada novo registro gravado no coletor, o modo de acréscimo garante que cada registro seja gravado apenas uma vez. O modo de atualização grava o registro sempre que as informações de estado são alteradas, o que resultaria em várias atualizações.
Se os serviços downstream precisarem de novos resultados, o modo de atualização garante que o coletor permaneça up-too mais atualizado possível. Os exemplos incluem um modelo de aprendizado de máquina que lê recursos em tempo real ou um painel de análise que rastreia agregações em tempo real.
Compatibilidade do operador e do lavatório
O Streaming Estruturado não suporta todas as operações disponíveis no Apache Spark e algumas operações de streaming não são suportadas em todos os modos de saída. Para obter mais informações sobre as limitações do operador, consulte os documentos de streaming do OSS.
Nem todos os coletores suportam todos os modos de saída. Tanto o Delta Lake, que suporta todas as tabelas gerenciadas do Unity Catalog, quanto o Kafka suportam todos os modos de saída. Para obter mais informações sobre a compatibilidade do coletor, consulte os documentos de streaming do OSS.
Latência e custo
O modo de saída afeta quanto tempo deve decorrer antes de gravar um registro, e a frequência e a quantidade de dados gravados podem afetar os custos associados aos pipelines de streaming.
O modo de acréscimo força os operadores com estado a emitirem resultados somente depois que os resultados com estado forem finalizados, o que é pelo menos tão longo quanto o atraso da marca d'água. Um atraso de marca d'água de 1 hour no modo de saída de acréscimo significa que seus registros têm pelo menos um atraso de 1 hora antes de serem emitidos a jusante.
O modo de atualização resulta em uma gravação por gatilho por valor agregado. Se o coletor cobrar por gravação por registro, isso pode ser caro se os registros forem atualizados muitas vezes antes que o atraso da marca d'água passe.
Exemplos de configuração
Os exemplos de código a seguir mostram a configuração do modo de saída para streaming de atualizações para tabelas do Unity Catalog:
Python
# Append output mode (default)
(df.writeStream
.toTable("target_table")
)
# Append output mode (same as default behavior)
(df.writeStream
.outputMode("append")
.toTable("target_table")
)
# Update output mode
(df.writeStream
.outputMode("update")
.toTable("target_table")
)
# Complete output mode
(df.writeStream
.outputMode("complete")
.toTable("target_table")
)
linguagem de programação Scala
// Append output mode (default)
df.writeStream
.toTable("target_table")
// Append output mode (same as default behavior)
df.writeStream
.outputMode("append")
.toTable("target_table")
// Update output mode
df.writeStream
.outputMode("update")
.toTable("target_table")
// Complete output mode
df.writeStream
.outputMode("complete")
.toTable("target_table")
Consulte os documentos OSS para PySpark DataStreamWriter.outputMode ou Scala DataStreamWriter.outputMode.
Exemplo de modos de streaming e saída com monitoração de estado
O exemplo a seguir destina-se a ajudá-lo a raciocinar sobre como o modo de saída interage com marcas d'água para streaming com monitoração de estado.
Considere uma agregação de streaming que calcule a receita total gerada a cada hora em uma loja com um atraso de marca d'água de 15 minutos. O primeiro microlote processa os seguintes registos:
- $15 às 14:40
- $10 às 14:30
- $30 às 15:10
Neste ponto, a marca d'água do motor é 2:55pm porque subtrai 15 minutos (o atraso) do tempo máximo visto (3:10pm). O operador de agregação de streaming tem o seguinte em seu estado:
-
[2pm, 3pm]: $25 -
[3pm, 4pm]: 30 dólares
A tabela a seguir descreve o que aconteceria em cada modo de saída:
| Modo de saída | Resultado e razão |
|---|---|
| Acrescentar | O operador de agregação de streaming não emite nada a jusante. Isso ocorre porque ambas as janelas podem mudar à medida que novos valores aparecem com um evento subsequente: a marca d'água de 14h55 indica que os registros depois das 14h55 ainda podem chegar, e esses registros poderão pertencer à janela [2pm, 3pm] ou à janela [3pm, 4pm]. |
| Atualizar | O operador emite ambos os registros, porque ambos os registros receberam atualizações. |
| Concluído | O operador emite todos os registos. |
Agora, suponha que o fluxo receba mais um registro:
- $20 às 15:20
A marca d'água atualiza para 15h05 porque o motor subtrai 15 minutos das 15h20. Neste ponto, o operador de agregação de streaming tem o seguinte em seu estado:
-
[2pm, 3pm]: $25 -
[3pm, 4pm]: 50 dólares
A tabela a seguir descreve o que aconteceria em cada modo de saída:
| Modo de saída | Resultado e razão |
|---|---|
| Acrescentar | O operador de agregação de streaming observa que a marca temporal de 15h05 é superior ao final da janela de [2pm, 3pm]. Pela definição da marca d'água, essa janela não pode mais mudar, então ela emite a janela [2pm, 3pm]. |
| Atualizar | O operador de agregação de streaming emite a janela de [3pm, 4pm] porque o valor do estado mudou de US$ 30 para US$ 50. |
| Concluído | O operador emite todos os registos. |
O seguinte resume como os operadores com estado se comportam em cada modo de acréscimo:
- No modo de acréscimo, grave os registros somente após o atraso da marca d'água.
- No modo de atualização, grave os registros que foram alterados desde o gatilho anterior.
- No modo completo, escreva todos os registros já produzidos pelo operador stateful.