Compartilhar via


Transformar dados com pipelines

Este artigo descreve como você pode usar pipelines para declarar transformações em conjuntos de dados e especificar como os registros são processados por meio da lógica de consulta. Ele também contém exemplos de padrões comuns de transformação para a criação de pipelines.

Você pode definir um conjunto de dados em relação a uma consulta que retorne um DataFrame. Você pode usar operações embutidas do Apache Spark, UDFs, lógica personalizada e modelos de MLflow para transformações em Pipelines Declarativos do Lakeflow Spark. Depois que os dados forem ingeridos em seu pipeline, você poderá definir novos conjuntos de dados contra fontes upstream para criar novas tabelas de streaming, visões materializadas e visões.

Para aprender a executar efetivamente o processamento com estado em um pipeline, consulte Otimizar o processamento com estado usando marcadores de água.

Quando usar exibições, exibições materializadas e tabelas de streaming

Ao implementar suas consultas de pipeline, escolha o melhor tipo de conjunto de dados para garantir que elas sejam eficientes e gerenciáveis.

Considere usar uma exibição para fazer o seguinte:

  • Quebre uma consulta grande ou complexa que você deseja em consultas mais fáceis de gerenciar.
  • Valide os resultados intermediários usando as expectativas.
  • Reduza os custos de armazenamento e computação para resultados que você não precisa manter. Como as tabelas são materializadas, elas requerem recursos adicionais de computação e armazenamento.

Considere o uso de uma exibição materializada quando:

  • Diversas consultas downstream consomem a tabela. Como as exibições são calculadas sob demanda, a exibição é recalculada toda vez que é consultada.
  • Outros pipelines, trabalhos ou consultas consomem a tabela. Como as exibições não são materializadas, só é possível usá-las no mesmo pipeline.
  • Você quiser exibir os resultados de uma consulta durante o desenvolvimento. Como as tabelas são materializadas e podem ser exibidas e consultadas fora do pipeline, o uso de tabelas durante o desenvolvimento pode ajudar a validar a exatidão dos cálculos. Após a validação, converta as consultas que não requerem materialização em exibições.

Considere o uso de uma tabela de streaming quando:

  • Uma consulta é definida em relação a uma fonte de dados que está crescendo de forma contínua ou incremental.
  • Os resultados da consulta devem ser computados de forma incremental.
  • O pipeline precisa de alta taxa de transferência e baixa latência.

Observação

As tabelas de streaming são sempre definidas em relação a fontes de streaming. Você também pode utilizar fontes de streaming com AUTO CDC ... INTO para aplicar atualizações de feeds da CDA. Confira as APIs AUTO CDC: Simplifique a captura de alterações de dados com pipelines.

Excluir as tabelas do esquema de destino

Se você precisar calcular tabelas intermediárias não destinadas ao consumo externo, poderá impedi-las de serem publicadas em um esquema usando a palavra-chave TEMPORARY. As tabelas temporárias ainda armazenam e processam dados de acordo com a semântica do Lakeflow Spark Declarative Pipelines, mas não devem ser acessadas fora do pipeline atual. Uma tabela temporária persiste durante o tempo de vida do pipeline que a cria. Use a seguinte sintaxe para declarar as tabelas temporárias:

SQL

CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;

Python

@dp.table(
  temporary=True)
def temp_table():
  return ("...")

Combinar as tabelas de streaming e exibições materializadas em um único pipeline

As tabelas de streaming herdam as garantias de processamento do Apache Spark Structured Streaming e são configuradas para processar consultas de fontes de dados somente acréscimo, em que novas linhas são sempre inseridas na tabela de origem, ao invés de modificadas.

Observação

Embora, por padrão, as tabelas de streaming exijam fontes de dados somente acréscimo, quando uma fonte de streaming é outra tabela de streaming que exige atualizações ou exclusões, você pode substituir esse comportamento pelo sinalizador skipChangeCommits

Um padrão de streaming comum envolve a ingestão de dados de origem para criar os conjuntos de dados iniciais em um pipeline. Esses conjuntos de dados iniciais são comumente chamados de tabelas bronze e geralmente executam transformações simples.

Por outro lado, as tabelas finais em um pipeline, geralmente chamadas de tabelas de ouro, geralmente exigem agregações complicadas ou leitura de destinos de uma operação AUTO CDC ... INTO. Como essas operações criam inerentemente atualizações em vez de acrescentar, elas não têm suporte como entradas para as tabelas de streaming. Essas transformações são mais adequadas para exibições materializadas.

Ao combinar as tabelas de streaming e as exibições materializadas em um único pipeline, você pode simplificar seu pipeline, evitar a dispendiosa reingestão ou o reprocessamento de dados brutos e ter todo o poder do SQL para computar agregações complexas em um conjunto de dados eficientemente codificado e filtrado. O exemplo a seguir ilustra esse tipo de processamento misto:

Observação

Esses exemplos utilizam o Carregador Automático para carregar arquivos do armazenamento em nuvem. Para carregar arquivos com o Carregador Automático em um pipeline habilitado para o Catálogo do Unity, você deve usar localizações externas. Para saber mais sobre como usar o Catálogo do Unity com pipelines, consulte Usar o Catálogo do Unity com pipelines.

Python

@dp.table
def streaming_bronze():
  return (
    # Since this is a streaming source, this table is incremental.
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://path/to/raw/data")
  )

@dp.table
def streaming_silver():
  # Since we read the bronze table as a stream, this silver table is also
  # updated incrementally.
  return spark.readStream.table("streaming_bronze").where(...)

@dp.materialized_view
def live_gold():
  # This table will be recomputed completely by reading the whole silver table
  # when it is updated.
  return spark.read.table("streaming_silver").groupBy("user_id").count()

SQL

CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM STREAM read_files(
  "abfss://path/to/raw/data",
  format => "json"
)

CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(streaming_bronze) WHERE...

CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM streaming_silver GROUP BY user_id

Saiba mais sobre como usar o Carregador Automático para ingerir incrementalmente arquivos JSON do armazenamento do Azure.

Junções de fluxo estático

As junções estáticas de fluxo são uma boa escolha ao desnormalizar um fluxo contínuo de dados somente acréscimo com uma tabela de dimensão principalmente estática.

A cada atualização do pipeline, os novos registros do fluxo são ingressados com o instantâneo mais atual da tabela estática. Se os registros forem adicionados ou atualizados na tabela estática depois que os dados correspondentes da tabela de streaming tiverem sido processados, os registros resultantes não serão recalculados, a menos que uma atualização completa seja executada.

Nos pipelines configurados para execução disparada, a tabela estática retorna resultados a partir do tempo em que a atualização foi iniciada. Em pipelines configurados para execução contínua, a versão mais recente da tabela estática é consultada sempre que a tabela processa uma atualização.

A seguir, um exemplo de uma junção de fluxo estático:

Python

@dp.table
def customer_sales():
  return spark.readStream.table("sales").join(spark.read.table("customers"), ["customer_id"], "left")

SQL

CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(sales)
  INNER JOIN LEFT customers USING (customer_id)

Calcular eficientemente as agregações

Você pode utilizar as tabelas de streaming para calcular de forma incremental as agregações distributivas simples, como contagem, mínimo, máximo ou soma, e agregações algébricas, como média ou desvio padrão. O Databricks recomenda a agregação incremental para consultas com um número limitado de grupos, como uma consulta com uma cláusula GROUP BY country. Somente os novos dados de entrada são lidos a cada atualização.

Para saber mais sobre como escrever consultas do Lakeflow Spark Declarative Pipelines que executam agregações incrementais, consulte Executar agregações em janelas com marcas d'água.

Usar modelos do MLflow em Pipelines Declarativos do Spark Lakeflow

Observação

Para usar os modelos do MLflow em um pipeline habilitado para o Catálogo do Unity, o pipeline precisa ser configurado para usar o canal preview. Para usar o canal current, você precisa configurar o pipeline para publicar no metastore do Hive.

Você pode usar modelos treinados pelo MLflow em pipelines. Os modelos do MLflow são tratados como transformações no Azure Databricks, o que significa que eles agem sobre uma entrada do DataFrame do Spark e retornam resultados como um DataFrame do Spark. Como o Lakeflow Spark Declarative Pipelines define conjuntos de dados em dataFrames, você pode converter cargas de trabalho do Apache Spark que usam o MLflow em pipelines com apenas algumas linhas de código. Para obter mais informações sobre MLflow, consulte MLflow para ciclo de vida de modelos de ML.

Se você já tiver um script Python chamando um modelo do MLflow, poderá adaptar esse código a um pipeline usando o decorador @dp.table e garantir que as funções @dp.materialized_view sejam definidas para retornar os resultados da transformação. O Lakeflow Spark Declarative Pipelines não instala o MLflow por padrão, portanto, confirme se você instalou as bibliotecas do MLflow com %pip install mlflow e importou mlflow e dp no topo do seu código. Para obter uma introdução à sintaxe do pipeline, consulte Desenvolver código de pipeline com Python.

Para usar modelos do MLflow em pipelines, conclua as seguintes etapas:

  1. Obtenha a ID de execução e o nome do modelo do modelo de MLflow. A ID de execução e o nome do modelo são usados para construir o URI do modelo de MLflow.
  2. Use o URI para definir uma UDF do Spark para carregar o modelo de MLflow.
  3. Chame a UDF em suas definições de tabela para usar o modelo de MLflow.

O exemplo a seguir mostra a sintaxe básica para esse padrão:

%pip install mlflow

from pyspark import pipelines as dp
import mlflow

run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

@dp.materialized_view
def model_predictions():
  return spark.read.table(<input-data>)
    .withColumn("prediction", loaded_model_udf(<model-features>))

Como um exemplo concluído, o código a seguir define um Spark UDF chamado loaded_model_udf que carrega um modelo MLflow treinado em dados de risco de empréstimo. As colunas de dados utilizadas para fazer a previsão são passadas como um argumento para o UDF. A tabela loan_risk_predictions calcula as previsões para cada linha em loan_risk_input_data.

%pip install mlflow

from pyspark import pipelines as dp
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dp.materialized_view(
  comment="GBT ML predictions of loan risk",
  table_properties={
    "quality": "gold"
  }
)
def loan_risk_predictions():
  return spark.read.table("loan_risk_input_data")
    .withColumn('predictions', loaded_model_udf(struct(features)))

Manter as exclusões ou atualizações manuais

O Lakeflow Spark Declarative Pipelines permite que você exclua ou atualize manualmente registros de uma tabela e faça uma operação de atualização para recompor tabelas downstream.

Por padrão, os pipelines recomputam os resultados da tabela com base nos dados de entrada sempre que são atualizados, portanto, você deve garantir que o registro excluído não seja recarregado dos dados de origem. A definição da propriedade da tabela pipelines.reset.allowed como false impede a atualização de uma tabela, mas não impede gravações incrementais nas tabelas ou o fluxo de novos dados para a tabela.

O diagrama a seguir ilustra um exemplo usando duas tabelas de streaming:

  • raw_user_table ingere dados brutos do usuário de uma fonte.
  • bmi_table computa incrementalmente as pontuações de IMC usando o peso e altura de raw_user_table.

Você deseja excluir ou atualizar manualmente os registros de usuários do raw_user_table e computar novamente o bmi_table.

Manter o diagrama de dados

O código a seguir demonstra a configuração da propriedade da tabela pipelines.reset.allowed como false para desabilitar a atualização completa para raw_user_table, de modo que as alterações pretendidas sejam mantidas ao longo do tempo, mas as tabelas de downstream sejam recomputadas quando uma atualização do pipeline for executada:

CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM STREAM read_files("/databricks-datasets/iot-stream/data-user", format => "csv");

CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(raw_user_table);