Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Use as expectativas para aplicar restrições de qualidade que validam os dados à medida que fluem pelos pipelines de ETL. As expectativas fornecem uma visão mais aprofundada das métricas de qualidade de dados e permitem que você falhe atualizações ou descarte registros ao detetar registros inválidos.
Este artigo tem uma visão geral das expectativas, incluindo exemplos de sintaxe e opções de comportamento. Para casos de uso mais avançados e práticas recomendadas, consulte Recomendações e padrões avançados.
O que são expectativas?
As expectativas são cláusulas opcionais na exibição materializada de pipeline, tabela de streaming ou instruções de criação de exibição que aplicam verificações de qualidade de dados em cada registro que passa por uma consulta. As expectativas utilizam declarações Booleanas SQL padrão para especificar restrições. Você pode combinar várias expectativas para um único conjunto de dados e definir expectativas em todas as declarações de conjunto de dados em um pipeline.
As seções a seguir apresentam os três componentes de uma expectativa e fornecem exemplos de sintaxe.
Nome da expectativa
Cada expectativa deve ter um nome, que é usado como um identificador para rastrear e monitorar a expectativa. Escolha um nome que comunique as métricas que estão sendo validadas. O exemplo a seguir define a expectativa valid_customer_age para confirmar que a idade está entre 0 e 120 anos:
Importante
Um nome de expectativa deve ser exclusivo para um determinado conjunto de dados. Você pode reutilizar expectativas em múltiplos conjuntos de dados num pipeline. Consulte Expectativas portáteis e reutilizáveis.
Python
@dp.table
@dp.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
return spark.readStream.table("datasets.samples.raw_customers")
SQL
CREATE OR REFRESH STREAMING TABLE customers(
CONSTRAINT valid_customer_age EXPECT (age BETWEEN 0 AND 120)
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);
Restrição para avaliar
A cláusula de restrição é uma instrução condicional SQL que deve ser avaliada como true ou false para cada registro. A restrição contém a lógica real para o que está sendo validado. Quando um registro falha nessa condição, a expectativa é acionada.
As restrições devem usar sintaxe SQL válida e não podem conter o seguinte:
- Funções Python personalizadas
- Chamadas de serviço externo
- Subconsultas que fazem referência a outras tabelas
A seguir estão exemplos de restrições que podem ser adicionadas às instruções de criação de conjunto de dados:
Python
A sintaxe para uma restrição em Python é:
@dp.expect(<constraint-name>, <constraint-clause>)
Várias restrições podem ser especificadas:
@dp.expect(<constraint-name>, <constraint-clause>)
@dp.expect(<constraint2-name>, <constraint2-clause>)
Examples:
# Simple constraint
@dp.expect("non_negative_price", "price >= 0")
# SQL functions
@dp.expect("valid_date", "year(transaction_date) >= 2020")
# CASE statements
@dp.expect("valid_order_status", """
CASE
WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
ELSE false
END
""")
# Multiple constraints
@dp.expect("non_negative_price", "price >= 0")
@dp.expect("valid_purchase_date", "date <= current_date()")
# Complex business logic
@dp.expect(
"valid_subscription_dates",
"""start_date <= end_date
AND end_date <= current_date()
AND start_date >= '2020-01-01'"""
)
# Complex boolean logic
@dp.expect("valid_order_state", """
(status = 'ACTIVE' AND balance > 0)
OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")
SQL
A sintaxe para uma restrição no SQL é:
CONSTRAINT <constraint-name> EXPECT ( <constraint-clause> )
Várias restrições devem ser separadas por uma vírgula:
CONSTRAINT <constraint-name> EXPECT ( <constraint-clause> ),
CONSTRAINT <constraint2-name> EXPECT ( <constraint2-clause> )
Examples:
-- Simple constraint
CONSTRAINT non_negative_price EXPECT (price >= 0)
-- SQL functions
CONSTRAINT valid_date EXPECT (year(transaction_date) >= 2020)
-- CASE statements
CONSTRAINT valid_order_status EXPECT (
CASE
WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
ELSE false
END
)
-- Multiple constraints
CONSTRAINT non_negative_price EXPECT (price >= 0),
CONSTRAINT valid_purchase_date EXPECT (date <= current_date())
-- Complex business logic
CONSTRAINT valid_subscription_dates EXPECT (
start_date <= end_date
AND end_date <= current_date()
AND start_date >= '2020-01-01'
)
-- Complex boolean logic
CONSTRAINT valid_order_state EXPECT (
(status = 'ACTIVE' AND balance > 0)
OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
)
Ação em registro inválido
Você deve especificar uma ação para determinar o que acontece quando um registro falha na verificação de validação. A tabela a seguir descreve as ações disponíveis:
| Ação | Sintaxe SQL | Sintaxe Python | Result |
|---|---|---|---|
| avisar (padrão) | EXPECT |
dp.expect |
Registros inválidos são gravados no destino. |
| drop | EXPECT ... ON VIOLATION DROP ROW |
dp.expect_or_drop |
Os registros inválidos são descartados antes que os dados sejam gravados no destino. A contagem de registros descartados é registrada junto com outras métricas do conjunto de dados. |
| erro | EXPECT ... ON VIOLATION FAIL UPDATE |
dp.expect_or_fail |
Registros inválidos impedem que a atualização seja bem-sucedida. É necessária uma intervenção manual antes do reprocessamento. Essa expectativa causa uma falha de um único fluxo e não faz com que outros fluxos em seu pipeline falhem. |
Você também pode implementar lógica avançada para colocar em quarentena registros inválidos sem falhar ou descartar dados. Consulte a Quarentena de registos inválidos
Métricas de acompanhamento de expectativas
Você pode ver métricas de acompanhamento para warn ou drop ações na UI do pipeline. Como fail faz com que a atualização falhe quando um registro inválido é detetado, as métricas não são registradas.
Para visualizar as métricas de expectativa, conclua as seguintes etapas:
- Na barra lateral do espaço de trabalho do Azure Databricks, clique em Trabalhos & Pipelines.
- Clique no Nome do seu pipeline.
- Clique em um conjunto de dados com uma expectativa definida.
- Selecione a guia Qualidade de dados na barra lateral direita.
Você pode exibir métricas de qualidade de dados consultando o log de eventos do Lakeflow Spark Declarative Pipelines. Consulte Qualidade de dados de consulta ou métricas de expectativas.
Reter registos inválidos
A retenção de registros inválidos é o comportamento padrão para as expectativas. Use o expect operador quando quiser manter registos que violem a expectativa, mas coletem métricas sobre quantos registos passam ou falham em relação a uma restrição. Os registros que violam a expectativa são adicionados ao conjunto de dados de destino junto com registros válidos:
Python
@dp.expect("valid timestamp", "timestamp > '2012-01-01'")
SQL
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')
Descartar registros inválidos
Use o expect_or_drop operador para evitar o processamento adicional de registros inválidos. Os registros que violam a expectativa são descartados do conjunto de dados de destino:
Python
@dp.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
SQL
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW
Falha em registros inválidos
Quando registos inválidos forem inaceitáveis, use o operador expect_or_fail para interromper a execução imediatamente quando um registo falhar na validação. Se a operação for uma atualização de tabela, o sistema reverte atomicamente a transação:
Python
@dp.expect_or_fail("valid_count", "count > 0")
SQL
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE
Importante
Se você tiver vários fluxos paralelos definidos em um pipeline, a falha de um único fluxo não fará com que outros fluxos falhem.
Solução de problemas de falhas em atualizações a partir de expectativas
Quando um pipeline falha devido a uma violação de expectativa, você deve corrigir o código do pipeline para manipular os dados inválidos corretamente antes de executar novamente o pipeline.
As expectativas configuradas para pipelines que falham modificam o plano de consulta do Spark das suas transformações para rastrear as informações necessárias para detetar e reportar violações. Você pode usar essas informações para identificar qual registro de entrada resultou na violação para muitas consultas. Lakeflow Spark Declarative Pipelines fornece uma mensagem de erro dedicada para relatar tais violações. Aqui está um exemplo de uma mensagem de erro de violação de expectativa:
[EXPECTATION_VIOLATION.VERBOSITY_ALL] Flow 'sensor-pipeline' failed to meet the expectation. Violated expectations: 'temperature_in_valid_range'. Input data: '{"id":"TEMP_001","temperature":-500,"timestamp_ms":"1710498600"}'. Output record: '{"sensor_id":"TEMP_001","temperature":-500,"change_time":"2024-03-15 10:30:00"}'. Missing input data: false
Gestão de múltiplas expectativas
Observação
Embora o SQL e o Python suportem várias expectativas em um único conjunto de dados, apenas o Python permite agrupar várias expectativas e especificar ações coletivas.
Você pode agrupar várias expectativas e especificar ações coletivas usando as funções expect_all, expect_all_or_drope expect_all_or_fail.
Estes decoradores aceitam um dicionário Python como argumento, onde a chave é o nome da expectativa e o valor é a condição de expectativa. Você pode reutilizar o mesmo conjunto de expectativas em vários conjuntos de dados em seu pipeline. A seguir mostra exemplos de cada um dos expect_all operadores Python:
valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}
@dp.table
@dp.expect_all(valid_pages)
def raw_data():
# Create a raw dataset
@dp.table
@dp.expect_all_or_drop(valid_pages)
def prepared_data():
# Create a cleaned and prepared dataset
@dp.table
@dp.expect_all_or_fail(valid_pages)
def customer_facing_data():
# Create cleaned and prepared to share the dataset
Limitações
- Como apenas tabelas de streaming e exibições materializadas dão suporte às expectativas, as métricas de qualidade de dados são suportadas apenas para esses tipos de objeto.
- As métricas de qualidade de dados não estão disponíveis quando:
- Nenhuma expectativa é definida em uma consulta.
- Um fluxo utiliza um operador que não suporta funcionalidades esperadas.
- O tipo de fluxo, como destinos, não é compatível com expectativas.
- Não há atualizações na tabela de streaming associada ou na visualização materializada para uma determinada execução de fluxo.
- A configuração do pipeline não inclui as configurações necessárias para capturar métricas, como
pipelines.metrics.flowTimeReporter.enabled.
- Para alguns casos, um
COMPLETEDfluxo pode não conter métricas. Em vez disso, as métricas são relatadas em cada microlote em umflow_progressevento com o statusRUNNING.