Partilhar via


Carregar dados nos pipelines

Você pode carregar dados de qualquer fonte de dados suportada pelo Apache Spark no Azure Databricks usando pipelines. Você pode definir conjuntos de dados (tabelas e visualizações) nos Lakeflow Spark Declarative Pipelines para qualquer consulta que retorne um Spark DataFrame, incluindo Spark DataFrames em streaming e Pandas para Spark DataFrames. Para tarefas de ingestão de dados, o Databricks recomenda o uso de tabelas de streaming para a maioria dos casos de uso. As tabelas de streaming são ótimas para a ingestão de dados do armazenamento de objetos na nuvem usando o Auto Loader ou de barramentos de mensagens como o Kafka.

Observação

  • Nem todas as fontes de dados têm suporte SQL para ingestão. Você pode misturar códigos-fonte SQL e Python em pipelines para usar Python onde for necessário e SQL para outras operações no mesmo pipeline.
  • Para obter detalhes sobre como trabalhar com bibliotecas não empacotadas no Lakeflow Spark Declarative Pipelines por padrão, consulte Manage Python dependencies for pipelines.
  • Para obter informações gerais sobre ingestão no Azure Databricks, consulte Conectores padrão no Lakeflow Connect.

Os exemplos abaixo demonstram alguns padrões comuns.

Carregar a partir de uma tabela existente

Carregue dados de qualquer tabela existente no Azure Databricks. Você pode transformar os dados usando uma consulta ou carregar a tabela para processamento adicional em seu pipeline.

O exemplo a seguir lê dados de uma tabela existente:

Python

@dp.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    spark.read.table("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
  )

SQL

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC

Carregar arquivos do armazenamento de objetos na nuvem

O Databricks recomenda o uso do Auto Loader em pipelines para a maioria das tarefas de ingestão de dados do armazenamento de objetos na nuvem ou de arquivos em um volume do Catálogo Unity. O Auto Loader e os pipelines são projetados para carregar dados cada vez maiores de forma incremental e idimpotente à medida que chegam ao armazenamento em nuvem.

Consulte O que é Auto Loader? e Carregar dados do armazenamento de objetos.

O exemplo a seguir lê dados do armazenamento em nuvem usando o Auto Loader:

Python

@dp.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json")
  )

SQL

CREATE OR REFRESH STREAMING TABLE sales
  AS SELECT *
  FROM STREAM read_files(
    'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
    format => "json"
  );

Os exemplos a seguir usam o Auto Loader para criar conjuntos de dados a partir de arquivos CSV em um volume do Catálogo Unity:

Python

@dp.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/Volumes/my_catalog/retail_org/customers/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
  "/Volumes/my_catalog/retail_org/customers/",
  format => "csv"
)

Observação

  • Se utilizar o Auto Loader com notificações de arquivo e realizar uma atualização completa do seu pipeline ou da sua tabela de streaming, deverá limpar manualmente os seus recursos. Você pode usar o CloudFilesResourceManager num notebook para executar a limpeza.
  • Para carregar arquivos com o Auto Loader num pipeline com Unity Catalog, você deve usar locais externos. Para saber mais sobre como usar o Unity Catalog com pipelines, consulte Usar o Unity Catalog com pipelines.

Carregar dados de um barramento de mensagens

Você pode configurar pipelines para incorporar dados de buses de mensagens. A Databricks recomenda o uso de tabelas de streaming com execução contínua e dimensionamento automático aprimorado para fornecer a ingestão mais eficiente para carregamento de baixa latência a partir de barramentos de mensagens. Consulte Otimize a utilização do cluster de pipelines declarativos do Lakeflow Spark com dimensionamento automático.

Por exemplo, o código a seguir configura uma tabela de streaming para ingerir dados de Kafka, usando a função read_kafka :

Python

from pyspark import pipelines as dp

@dp.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka_server:9092")
      .option("subscribe", "topic1")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_raw AS
  SELECT *
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'topic1'
  );

Para ingerir a partir de outras fontes de barramento de mensagens, consulte:

Carregar dados dos Hubs de Eventos do Azure

Os Hubs de Eventos do Azure são um serviço de streaming de dados que fornece uma interface compatível com Apache Kafka. Você pode usar o conector Kafka de Streaming Estruturado, incluído no runtime do Lakeflow Spark Declarative Pipelines, para carregar mensagens a partir dos Hubs de Eventos do Azure. Para saber mais sobre como carregar e processar mensagens dos Hubs de Eventos do Azure, consulte Usar Hubs de Eventos do Azure como uma fonte de dados de pipeline.

Carregar dados de sistemas externos

O Lakeflow Spark Declarative Pipelines dá suporte ao carregamento de dados de qualquer fonte de dados suportada pelo Azure Databricks. Consulte Conectar-se a fontes de dados e serviços externos. Você também pode carregar dados externos usando Lakehouse Federation para fontes de dados suportadas. Como a Lakehouse Federation requer o Databricks Runtime 13.3 LTS ou superior, para usar a Lakehouse Federation o seu pipeline deve ser configurado para usar o canal de pré-visualização .

Algumas fontes de dados não têm suporte equivalente em SQL. Se você não puder usar a Lakehouse Federation com uma dessas fontes de dados, poderá usar Python para ingerir dados da fonte. Você pode adicionar arquivos de origem Python e SQL ao mesmo pipeline. O exemplo a seguir declara uma exibição materializada para acessar o estado atual dos dados em uma tabela remota do PostgreSQL:

import dp

@dp.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

Carregue conjuntos de dados pequenos ou estáticos a partir do armazenamento de objetos na nuvem

Você pode carregar conjuntos de dados pequenos ou estáticos usando a sintaxe de carregamento do Apache Spark. O Lakeflow Spark Declarative Pipelines suporta todos os formatos de arquivo suportados pelo Apache Spark no Azure Databricks. Para obter uma lista completa, consulte Opções de formato de dados.

Os exemplos a seguir demonstram o carregamento de JSON para criar uma tabela:

Python

@dp.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
  "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)

Observação

A função read_files SQL é comum a todos os ambientes SQL no Azure Databricks. É o padrão recomendado para acesso direto a arquivos usando SQL em pipelines. Para obter mais informações, consulte as opções e.

Carregar dados de uma fonte de dados personalizada do Python

As fontes de dados personalizadas do Python permitem carregar dados em formatos personalizados. Você pode escrever código para ler e gravar em uma fonte de dados externa específica, ou aproveitar o código Python existente em seus sistemas existentes para ler dados de seus próprios sistemas internos. Para obter mais detalhes sobre o desenvolvimento de fontes de dados Python, consulte Fontes de dados personalizadas do PySpark.

Para usar uma fonte de dados personalizada do Python para carregar dados em um pipeline, registre-a com um nome de formato, como my_custom_datasource, e leia a partir dela:

from pyspark import pipelines as dp

# Assume `my_custom_datasource` is a custom Python custom data
# source that supports both batch and streaming reads, and has
# been registered using `spark.dataSource.register`.

# This creates a materialized view
@dp.table(name = "read_from_batch")
def read_from_batch():
    return spark.read.format("my_custom_datasource").load()

# This creates a streaming table
@dp.table(name = "read_from_streaming")
def read_from_streaming():
    return spark.readStream.format("my_custom_datasource").load()

Configurar uma tabela de streaming para ignorar alterações em uma tabela de streaming de origem

Observação

  • O sinalizador skipChangeCommits funciona apenas com spark.readStream usando a função option(). Não é possível usar esse sinalizador em uma função dp.read_stream().
  • Não é possível usar o skipChangeCommits sinalizador quando a tabela de streaming de origem é definida como o destino de uma função create_auto_cdc_flow( ).

Por padrão, as tabelas de streaming exigem fontes somente de adição. Quando uma tabela de streaming utiliza outra tabela de streaming como fonte e a tabela de streaming de origem necessita de atualizações ou exclusões, como no caso do tratamento do "direito ao esquecimento" do GDPR, o sinalizador skipChangeCommits pode ser configurado ao ler a tabela de streaming de origem para ignorar essas alterações. Para obter mais informações sobre este indicador, consulte Ignorar atualizações e eliminações.

@dp.table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("A")

Acesse, com segurança, credenciais de armazenamento usando segredos num pipeline

Você pode usar o Azure Databricks segredos para armazenar credenciais, como chaves de acesso ou senhas. Para configurar o segredo no seu pipeline, use uma propriedade Spark nas definições de configuração do cluster do pipeline. Consulte Configurar computação clássica para pipelines.

O exemplo a seguir usa um segredo para armazenar uma chave de acesso necessária para ler dados de entrada de uma conta de armazenamento do Azure Data Lake Storage (ADLS) usando Auto Loader. Você pode usar esse mesmo método para configurar qualquer segredo exigido pelo seu pipeline, por exemplo, chaves da AWS para acessar o S3 ou a senha para um metastore do Apache Hive.

Para saber mais sobre como trabalhar com o Armazenamento do Azure Data Lake, veja Conecte-se ao Armazenamento do Azure Data Lake e ao Armazenamento de Blob.

Observação

Você deve adicionar o prefixo spark.hadoop. à chave de configuração spark_conf que define o valor secreto.

{
  "id": "43246596-a63f-11ec-b909-0242ac120002",
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5,
        "mode": "ENHANCED"
      }
    }
  ],
  "development": true,
  "continuous": false,
  "libraries": [
    {
      "notebook": {
        "path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
      }
    }
  ],
  "name": ":re[LDP] quickstart using ADLS2"
}

Substituir

  • <storage-account-name> com o nome da conta de armazenamento ADLS.
  • <scope-name> com o nome do escopo secreto do Azure Databricks.
  • <secret-name> com o nome da chave que contém a chave de acesso da conta de armazenamento do Azure.
from pyspark import pipelines as dp

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dp.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Substituir

  • <container-name> com o nome do contêiner da conta de armazenamento do Azure que armazena os dados de entrada.
  • <storage-account-name> com o nome da conta de armazenamento ADLS.
  • <path-to-input-dataset> com o caminho para o conjunto de dados de entrada.