Partilhar via


Preenchimento de dados históricos com pipelines

Em engenharia de dados, backfill refere-se ao processo de processamento retroativo de dados históricos através de um pipeline de dados que foi projetado para processar dados atuais ou de streaming.

Normalmente, esse é um fluxo separado que envia dados para suas tabelas existentes. A ilustração a seguir mostra um fluxo de preenchimento retroativo enviando dados históricos para as tabelas de bronze no pipeline.

Fluxo de backfill para adicionar dados históricos a um fluxo de trabalho existente

Alguns cenários que podem exigir uma reposição de dados:

  • Processe dados históricos de um sistema herdado para treinar um modelo de aprendizado de máquina (ML) ou criar um painel de análise de tendências históricas.
  • Reprocessar um subconjunto de dados devido a um problema de qualidade de dados com fontes de dados upstream.
  • Seus requisitos de negócios foram alterados e você precisa preencher dados para um período de tempo diferente que não foi coberto pelo pipeline inicial.
  • Sua lógica de negócios mudou e você precisa reprocessar dados históricos e atuais.

Um processo de preenchimento em pipelines declarativas do Lakeflow Spark é suportado com um fluxo especializado de acréscimo que utiliza a opção ONCE. Consulte append_flow ou CREATE FLOW (pipelines) para obter mais informações sobre a ONCE opção.

Considerações ao preencher dados históricos em uma tabela de streaming

  • Normalmente, anexe os dados à tabela de streaming bronze. As camadas de prata e ouro a jusante captarão os novos dados da camada de bronze.
  • Certifique-se de que seu pipeline possa lidar com dados duplicados normalmente, caso os mesmos dados sejam acrescentados várias vezes.
  • Verifique se o esquema de dados históricos é compatível com o esquema de dados atual.
  • Considere o tamanho do volume de dados e o SLA de tempo de processamento necessário e, consequentemente, configure os tamanhos de cluster e lote.

Exemplo: Adicionar um preenchimento a um pipeline existente

Neste exemplo, digamos que você tenha um pipeline que ingere dados brutos de registro de eventos de uma fonte de armazenamento em nuvem, a partir de 01º de janeiro de 2025. Mais tarde, você perceberá que deseja preencher os três anos anteriores de dados históricos para relatórios downstream e casos de uso de análise. Todos os dados estão em um único local, particionados por ano, mês e dia, no formato JSON.

Pipeline inicial

Aqui está o código de pipeline inicial que ingere incrementalmente os dados brutos de registro de eventos do armazenamento em nuvem.

Python

from pyspark import pipelines as dp

source_root_path = spark.conf.get("registration_events_source_root_path")
begin_year = spark.conf.get("begin_year")
incremental_load_path = f"{source_root_path}/*/*/*"

# create a streaming table and the default flow to ingest streaming events
@dp.table(name="registration_events_raw", comment="Raw registration events")
def ingest():
    return (
        spark
        .readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.maxFilesPerTrigger", 100)
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
        .option("modifiedAfter", "2025-01-01T00:00:00.000+00:00")
        .load(incremental_load_path)
        .where(f"year(timestamp) >= {begin_year}") # safeguard to not process data before begin_year
    )

SQL

-- create a streaming table and the default flow to ingest streaming events
CREATE OR REFRESH STREAMING LIVE TABLE registration_events_raw AS
SELECT * FROM read_files(
  "/Volumes/gc/demo/apps_raw/event_registration/*/*/*",
  format => "json",
  inferColumnTypes => true,
  maxFilesPerTrigger => 100,
  schemaEvolutionMode => "addNewColumns",
  modifiedAfter => "2024-12-31T23:59:59.999+00:00"
)
WHERE year(timestamp) >= '2025'; -- safeguard to not process data before begin_year

Aqui usamos a modifiedAfter opção Auto Loader para garantir que não estamos processando todos os dados do caminho de armazenamento em nuvem. O processamento incremental é cortado nesse limite.

Sugestão

Outras fontes de dados, como Kafka, Kinesis e Hubs de Eventos do Azure, têm opções de leitor equivalentes para obter o mesmo comportamento.

Preencher dados dos últimos 3 anos

Agora você deseja adicionar um ou mais fluxos para preencher dados anteriores. Neste exemplo, execute as seguintes etapas:

  • Use o append once fluxo. Este processo executa um preenchimento único sem continuar a ser executado após o primeiro preenchimento. O código permanece em seu pipeline e, se o pipeline for totalmente atualizado, o backfill será executado novamente.
  • Crie três fluxos de preenchimento, um para cada ano (neste caso, os dados são divididos por ano no caminho). Para Python, parametrizamos a criação dos fluxos, mas em SQL repetimos o código três vezes, uma para cada fluxo.

Se você estiver trabalhando em seu próprio projeto e não estiver usando computação sem servidor, convém atualizar o máximo de trabalhadores para o pipeline. Aumentar o número máximo de trabalhadores garante que o utilizador tenha os recursos para processar os dados históricos enquanto continua a processar os dados de streaming atuais dentro do SLA previsto.

Sugestão

Se você usar computação sem servidor com dimensionamento automático aprimorado (o padrão), o cluster aumentará automaticamente de tamanho quando a carga aumentar.

Python

from pyspark import pipelines as dp

source_root_path = spark.conf.get("registration_events_source_root_path")
begin_year = spark.conf.get("begin_year")
backfill_years = spark.conf.get("backfill_years") # e.g. "2024,2023,2022"
incremental_load_path = f"{source_root_path}/*/*/*"

# meta programming to create append once flow for a given year (called later)
def setup_backfill_flow(year):
    backfill_path = f"{source_root_path}/year={year}/*/*"
    @dp.append_flow(
        target="registration_events_raw",
        once=True,
        name=f"flow_registration_events_raw_backfill_{year}",
        comment=f"Backfill {year} Raw registration events")
    def backfill():
        return (
            spark
            .read
            .format("json")
            .option("inferSchema", "true")
            .load(backfill_path)
        )

# create the streaming table
dp.create_streaming_table(name="registration_events_raw", comment="Raw registration events")

# append the original incremental, streaming flow
@dp.append_flow(
        target="registration_events_raw",
        name="flow_registration_events_raw_incremental",
        comment="Raw registration events")
def ingest():
    return (
        spark
        .readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.maxFilesPerTrigger", 100)
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
        .option("modifiedAfter", "2024-12-31T23:59:59.999+00:00")
        .load(incremental_load_path)
        .where(f"year(timestamp) >= {begin_year}")
    )

# parallelize one time multi years backfill for faster processing
# split backfill_years into array
for year in backfill_years.split(","):
    setup_backfill_flow(year) # call the previously defined append_flow for each year

SQL

-- create the streaming table
CREATE OR REFRESH STREAMING TABLE registration_events_raw;

-- append the original incremental, streaming flow
CREATE FLOW
  registration_events_raw_incremental
AS INSERT INTO
  registration_events_raw BY NAME
SELECT * FROM STREAM read_files(
  "/Volumes/gc/demo/apps_raw/event_registration/*/*/*",
  format => "json",
  inferColumnTypes => true,
  maxFilesPerTrigger => 100,
  schemaEvolutionMode => "addNewColumns",
  modifiedAfter => "2024-12-31T23:59:59.999+00:00"
)
WHERE year(timestamp) >= '2025';


-- one time backfill 2024
CREATE FLOW
  registration_events_raw_backfill_2024
AS INSERT INTO ONCE
  registration_events_raw BY NAME
SELECT * FROM read_files(
  "/Volumes/gc/demo/apps_raw/event_registration/year=2024/*/*",
  format => "json",
  inferColumnTypes => true
);

-- one time backfill 2023
CREATE FLOW
  registration_events_raw_backfill_2023
AS INSERT INTO ONCE
  registration_events_raw BY NAME
SELECT * FROM read_files(
  "/Volumes/gc/demo/apps_raw/event_registration/year=2023/*/*",
  format => "json",
  inferColumnTypes => true
);

-- one time backfill 2022
CREATE FLOW
  registration_events_raw_backfill_2022
AS INSERT INTO ONCE
  registration_events_raw BY NAME
SELECT * FROM read_files(
  "/Volumes/gc/demo/apps_raw/event_registration/year=2022/*/*",
  format => "json",
  inferColumnTypes => true
);

Esta implementação destaca vários padrões importantes.

Separação de responsabilidades

  • O processamento incremental é independente das operações de enchimento.
  • Cada fluxo tem suas próprias configurações e configurações de otimização.
  • Existe uma distinção clara entre operações incrementais e operações de enchimento.

Execução controlada

  • Usar a ONCE opção garante que cada preenchimento seja executado exatamente uma vez.
  • O fluxo de backfill mantém-se no gráfico do pipeline, mas torna-se inativo após a sua conclusão. Ele está pronto para uso em atualização completa, automaticamente.
  • Existe uma pista de auditoria clara das operações de enchimento na definição do gasoduto.

Otimização do processamento

  • Você pode dividir o backfill grande em vários backfills menores para um processamento mais rápido ou para ter controle sobre o processamento.
  • O uso do dimensionamento automático aprimorado dimensiona dinamicamente o tamanho do cluster com base na carga atual do cluster.

Evolução do esquema

  • O schemaEvolutionMode="addNewColumns" lida com mudanças de esquema de forma eficaz.
  • Você tem inferência de esquema consistente entre dados históricos e atuais.
  • Há manipulação segura de novas colunas em dados mais recentes.

Recursos adicionais