Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Os dados são processados em cadeias de processamento por meio de fluxos. Cada fluxo consiste em uma consulta e, normalmente, um destino. O fluxo processa a consulta, seja como um lote ou incrementalmente como um fluxo de dados para o destino. Um fluxo vive dentro de um gasoduto em Lakeflow Spark Declarative Pipelines.
Normalmente, os fluxos são definidos automaticamente quando você cria uma consulta em um pipeline que atualiza um destino, mas também é possível definir explicitamente fluxos adicionais para processamento mais complexo, como anexar a um único destino a partir de várias fontes.
Atualizações
Um fluxo é executado cada vez que o pipeline que o define é atualizado. O fluxo criará ou atualizará tabelas com os dados mais recentes disponíveis. Dependendo do tipo de fluxo e do estado das alterações nos dados, a atualização pode executar uma atualização incremental, que processa apenas novos registros, ou executar uma atualização completa, que reprocessa todos os registros da fonte de dados.
- Para obter mais informações sobre atualizações de pipeline, consulte Executar uma atualização de pipeline.
- Para obter mais informações sobre como agendar e acionar atualizações, consulte Modo de pipeline acionado versus contínuo.
Criar um fluxo padrão
Quando você cria um pipeline, normalmente define uma tabela ou um modo de exibição junto com a consulta que oferece suporte a ele. Por exemplo, nesta consulta SQL, você cria uma tabela de streaming chamada customers_silver pela leitura da tabela chamada customers_bronze.
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)
Você também pode criar a mesma tabela de streaming em Python. Em Python, você usa pipelines criando uma função de consulta que retorna um dataframe, com decoradores para adicionar a funcionalidade Lakeflow Spark Declarative Pipelines:
from pyspark import pipelines as dp
@dp.table()
def customers_silver():
return spark.readStream.table("customers_bronze")
Neste exemplo, você criou uma tabela de streaming. Você também pode criar exibições materializadas com sintaxe semelhante em SQL e Python. Para obter mais informações, consulte Tabelas de streaming e Vistas materializadas.
Este exemplo cria um fluxo padrão junto com a tabela de streaming. O fluxo padrão para uma tabela de streaming é um fluxo de acrescentar, que adiciona novas linhas a cada gatilho. ** Esta é a maneira mais comum de usar pipelines: criar um fluxo e o alvo numa única etapa. Você pode usar esse estilo para ingerir dados ou para transformá-los.
Os fluxos de anexação também suportam processamento que requer a leitura de dados de várias fontes de streaming para atualizar um único destino. Por exemplo, você pode usar a funcionalidade de fluxo de acréscimo quando tiver uma tabela e um fluxo de streaming existentes e quiser adicionar uma nova fonte de streaming que grave nessa tabela de streaming existente.
Usando vários fluxos para gravar em um único destino
No exemplo anterior, você criou um fluxo e uma tabela de streaming em uma única etapa. Você também pode criar fluxos para uma tabela criada anteriormente. Neste exemplo, você pode ver a criação de uma tabela e o fluxo associado a ela em etapas separadas. Esse código tem resultados idênticos à criação de um fluxo padrão, incluindo o uso do mesmo nome para a tabela de streaming e o fluxo.
Python
from pyspark import pipelines as dp
# create streaming table
dp.create_streaming_table("customers_silver")
# add a flow
@dp.append_flow(
target = "customers_silver")
def customer_silver():
return spark.readStream.table("customers_bronze")
SQL
-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_silver;
-- add a flow
CREATE FLOW customers_silver
AS INSERT INTO customers_silver BY NAME
SELECT * FROM STREAM(customers_bronze);
Criar um fluxo independentemente do destino significa que você também pode criar vários fluxos que acrescentam dados ao mesmo destino.
Use o @dp.append_flow decorador na interface Python ou a CREATE FLOW...INSERT INTO cláusula na interface SQL para criar um novo fluxo, por exemplo, para direcionar uma tabela de streaming de várias fontes de streaming. Utilize o fluxo de acréscimo para tarefas de processamento, como as seguintes:
- Adicione fontes de streaming que acrescentam dados a uma tabela de streaming existente sem exigir uma atualização completa. Por exemplo, você pode ter uma tabela combinando dados regionais de cada região em que opera. À medida que novas regiões são distribuídas, você pode adicionar os novos dados de região à tabela sem executar uma atualização completa. Para obter um exemplo de adição de fontes de streaming à tabela de streaming existente, consulte Exemplo: Gravar em uma tabela de streaming a partir de vários tópicos do Kafka.
- Atualize uma tabela de streaming anexando dados históricos ausentes (backfilling). Você pode usar a
INSERT INTO ONCEsintaxe para criar um acréscimo de preenchimento histórico que é executado uma vez. Por exemplo, você tem uma tabela de streaming existente que é escrita por um tópico do Apache Kafka. Você também tem dados históricos armazenados em uma tabela que precisa ser inserida exatamente uma vez na tabela de streaming e não pode transmitir os dados porque seu processamento inclui a execução de uma agregação complexa antes de inserir os dados. Para ver um exemplo de backfill, consulte Preenchimento de dados históricos com pipelines. - Combine dados de várias fontes e grave numa tabela única de streaming em vez de usar a cláusula
UNIONnuma consulta. Usar o processamento de fluxo de acréscimo permite que você atualize a tabela deUNIONdestino incrementalmente sem executar uma atualização completa. Para obter um exemplo de uma união feita dessa maneira, consulte Exemplo: Usar processamento de fluxo de acréscimo em vez deUNION.
O destino para a saída de registos pelo processamento de fluxo de adição pode ser uma tabela existente ou uma nova tabela. Para consultas Python, use a função create_streaming_table() para criar uma tabela de destino.
O exemplo a seguir adiciona dois fluxos para o mesmo destino, criando uma união das duas tabelas de origem:
Python
from pyspark import pipelines as dp
# create a streaming table
dp.create_streaming_table("customers_us")
# add the first append flow
@dp.append_flow(target = "customers_us")
def append1():
return spark.readStream.table("customers_us_west")
# add the second append flow
@dp.append_flow(target = "customers_us")
def append2():
return spark.readStream.table("customers_us_east")
SQL
-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_us;
-- add the first append flow
CREATE FLOW append1
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_west);
-- add the second append flow
CREATE FLOW append2
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_east);
Importante
- Se você precisar definir restrições de qualidade de dados com expectativas, defina as expectativas na tabela de destino como parte da função ou em uma definição de
create_streaming_table()tabela existente. Não é possível definir expectativas na@append_flowdefinição. - Os fluxos são identificados por um nome de fluxo, e esse nome é usado para identificar pontos de verificação de streaming. O uso do nome do fluxo para identificar o ponto de verificação significa o seguinte:
- Se um fluxo existente numa canalização for renomeado, o ponto de controlo não será transferido e o fluxo renomeado será, efetivamente, um fluxo totalmente novo.
- Não é possível reutilizar um nome de fluxo em um pipeline, porque o ponto de verificação existente não corresponderá à nova definição de fluxo.
Tipos de fluxos
Os fluxos padrão para tabelas de streaming e visões materializadas são fluxos de acréscimo. Você também pode criar fluxos para ler a partir de fontes de dados de captura de dados de mudança. A tabela a seguir descreve os diferentes tipos de fluxos.
| Tipo de fluxo | Description |
|---|---|
| Append |
Os fluxos de acréscimo são o tipo mais comum de fluxo, onde novos registros na origem são gravados no destino a cada atualização. Eles correspondem ao modo de adição em streaming estruturado. Você pode adicionar o ONCE flag, indicando uma consulta por lote cujos dados devem ser inseridos no destino apenas uma vez, a menos que o destino seja totalmente atualizado. Qualquer número de fluxos de anexação pode gravar num destino específico.Os fluxos padrão (criados com a tabela de streaming de destino ou a exibição materializada) terão o mesmo nome que o destino. Outros alvos não têm fluxos padrão. |
| Auto CDC (anteriormente aplicar alterações) | Um fluxo CDC automático ingere uma consulta que contém dados de captura de dados de alteração (CDC). Os fluxos CDC automáticos só podem destinar-se a tabelas de streaming, e a origem deve ser streaming (mesmo no caso de fluxos ONCE). Vários fluxos CDC automáticos podem ter como alvo uma única tabela de streaming. Uma tabela de streaming que serve como destino para um fluxo CDC automático só pode ser direcionada por outros fluxos CDC automáticos.Para obter mais informações sobre dados CDC, consulte The AUTO CDC APIs: Simplify change data capture with pipelines. |
Informações adicionais
Para obter mais informações sobre fluxos e seu uso, consulte os seguintes tópicos:
- Exemplos de fluxos em Lakeflow Spark Declarative Pipelines
- AUTO CDC APIs: Simplifique a captura de dados de mudanças através de pipelines
- Preenchimento de dados históricos com pipelines
- Escrevendo pipelines em Python ou SQL
- Tabelas de streaming
- Visões materializadas
- Sumidouros em Oleodutos Declarativos Lakeflow Spark