Partilhar via


Azure Data Explorer Connector para Apache Spark

O Apache Spark é um mecanismo de análise unificado para processamento de dados em grande escala. O Azure Data Explorer é um serviço de análise de dados rápido e totalmente gerenciado para análise em tempo real de grandes volumes de dados.

O conector Kusto para Spark é um projeto de código aberto que pode ser executado em qualquer cluster Spark. Ele implementa a fonte de dados e o coletor de dados para mover dados entre clusters do Azure Data Explorer e Spark. Usando o Azure Data Explorer e o Apache Spark, você pode criar aplicativos rápidos e escaláveis direcionados a cenários orientados por dados. Por exemplo, aprendizado de máquina (ML), Extract-Transform-Load (ETL) e Log Analytics. Com o conector, o Azure Data Explorer torna-se um armazenamento de dados válido para operações padrão de fonte e destino do Spark, como write, read, e writeStream.

Você pode gravar no Azure Data Explorer por meio da ingestão em fila ou da ingestão de streaming. A leitura a partir do Azure Data Explorer suporta a redução de colunas e a aplicação de predicados de filtragem, o que contribui para a filtragem dos dados dentro do Azure Data Explorer, reduzindo o volume de dados transferidos.

Observação

Para obter informações sobre como trabalhar com o conector Synapse Spark para Azure Data Explorer, consulte Conectar-se ao Azure Data Explorer usando o Apache Spark for Azure Synapse Analytics.

Este artigo descreve como instalar e configurar o conector do Azure Data Explorer Spark e mover dados entre clusters do Azure Data Explorer e Apache Spark.

Observação

Embora alguns dos exemplos neste artigo se refiram a um cluster do Azure Databricks Spark, o conector do Azure Data Explorer Spark não tem dependências diretas do Databricks ou de qualquer outra distribuição do Spark.

Pré-requisitos

Sugestão

As versões do Spark 2.3.x também são suportadas, mas talvez seja necessário alterar algumas dependências no pom.xml.

Como construir o conector Spark

A partir da versão 2.3.0, introduzimos novos Ids de artefato substituindo spark-kusto-connector: kusto-spark_3.0_2.12 visando Spark 3.x e Scala 2.12.

Observação

As versões anteriores à 2.5.1 não funcionam mais para ingestão em uma tabela existente, atualize para uma versão posterior. Esta etapa é opcional. Se você estiver usando bibliotecas pré-criadas, por exemplo, Maven, consulte Configuração do cluster Spark.

Pré-requisitos de compilação

  1. Consulte esta fonte para criar o Spark Connector.

  2. Para aplicativos Scala/Java usando definições de projeto Maven, vincule seu aplicativo ao artefato mais recente. Encontre o artefato mais recente no Maven Central.

    For more information, see [https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12](https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12).
    
    
  3. Se você não estiver usando bibliotecas pré-criadas, precisará instalar as bibliotecas listadas nas dependências , incluindo as seguintes bibliotecas Kusto Java SDK . Para encontrar a versão certa para instalar, procure no pom da versão relevante:

    1. Para construir jar e executar todos os testes:

      mvn clean package -DskipTests
      
    2. Para criar jar, execute todos os testes e instale jar em seu repositório Maven local:

      mvn clean install -DskipTests
      

Para obter mais informações, consulte Uso do conector.

Configuração do cluster Spark

Observação

Recomenda-se usar a versão mais recente do conector Kusto Spark ao executar as etapas a seguir.

  1. Configure as seguintes configurações de cluster do Spark, com base no cluster do Azure Databricks Spark 3.0.1 e no Scala 2.12:

    Configurações de cluster Databricks.

  2. Instale a biblioteca mais recente do spark-kusto-connector do Maven.

    Importar bibliotecas. Selecione Spark-Kusto-Connector.

  3. Verifique se todas as bibliotecas necessárias estão instaladas:

    Verifique as bibliotecas instaladas.

  4. Para instalação usando um arquivo JAR, verifique se outras dependências foram instaladas:

    Adicione dependências.

Authentication

O conector Kusto Spark permite que você se autentique com o Microsoft Entra ID usando um dos seguintes métodos:

Autenticação de aplicativos Microsoft Entra

A autenticação do aplicativo Microsoft Entra é o método de autenticação mais simples e comum e é recomendada para o conector Kusto Spark.

  1. Inicie sessão na sua subscrição do Azure através da CLI do Azure. Em seguida, autentique-se no navegador.

    az login
    
  2. Escolha a assinatura para hospedar o principal. Esta etapa é necessária quando você tem várias assinaturas.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Crie o principal de serviço. Neste exemplo, a entidade de serviço é chamada my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Dos dados JSON retornados, copie o appId, passworde tenant para uso futuro.

    {
      "appId": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444"
    }
    

Você criou a sua aplicação e principal de serviço do Microsoft Entra.

O conector Spark usa as seguintes propriedades do aplicativo Entra para autenticação:

Propriedades String de opção Description
KUSTO_AAD_APP_ID kustoAadAppId Identificador do aplicativo Microsoft Entra (cliente).
KUSTO_AAD_AUTHORITY_ID kustoAadAuthorityID Autoridade de autenticação do Microsoft Entra. ID do Microsoft Entra Directory (tenant). Opcional - o padrão é microsoft.com. Para mais informações, consulte Microsoft Entra Authority.
KUSTO_AAD_APP_SECRET kustoAadAppSecret Chave de aplicação Microsoft Entra para o cliente.
KUSTO_ACCESS_TOKEN kustoAccessToken Se já tiver um "accessToken" criado com acesso ao Kusto, pode ser passado para o conector para autenticação.

Observação

As versões mais antigas da API (menos de 2.0.0) têm a seguinte nomenclatura: "kustoAADClientID", "kustoClientAADClientPassword", "kustoAADAuthorityID"

Privilégios Kusto

Conceda os seguintes privilégios no lado kusto com base na operação Spark que você deseja executar.

Operação Spark Privilégios
Ler - Modo Único Leitor
Ler – Forçar Modo Distribuído Leitor
Escrever – Modo em fila com a opção de criação de tabela CreateTableIfNotExist Administrador
Escrever – Modo em fila com a opção de criação da tabela FailIfNotExist Ingestor
Escrever – TransactionalMode Administrador

Para obter mais informações sobre funções principais, consulte Controle de acesso baseado em função. Para gerenciar funções de segurança, consulte Gerenciamento de funções de segurança.

Spark Sink: Escrevendo para Kusto

  1. Configure os parâmetros do coletor:

    val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId")
    val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey")
    
    val appId = KustoSparkTestAppId
    val appKey = KustoSparkTestAppKey
    val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com
    val cluster = "Sparktest.eastus2"
    val database = "TestDb"
    val table = "StringAndIntTable"
    
  2. Gravar Spark DataFrame no cluster Kusto em lote.

    import com.microsoft.kusto.spark.datasink.KustoSinkOptions
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    df.write
      .format("com.microsoft.kusto.spark.datasource")
      .option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
      .option(KustoSinkOptions.KUSTO_DATABASE, database)
      .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark")
      .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId)
      .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey)
      .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
      .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
      .mode(SaveMode.Append)
      .save()  
    

    Ou use a sintaxe simplificada:

    import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    // Optional, for any extra options:
    val conf: Map[String, String] = Map()
    
    val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed
    df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
    
  3. Gravar dados de streaming:

    import org.apache.spark.sql.streaming.Trigger
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.streaming.Trigger
    
    // Set up a checkpoint and disable codeGen. 
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint")
    
    // As an alternative to adding .option by .option, you can provide a map:
    val conf: Map[String, String] = Map(
      KustoSinkOptions.KUSTO_CLUSTER -> cluster,
      KustoSinkOptions.KUSTO_TABLE -> table,
      KustoSinkOptions.KUSTO_DATABASE -> database,
      KustoSourceOptions.KUSTO_ACCESS_TOKEN -> accessToken)
    
    // Write to a Kusto table from a streaming source
    val kustoQ = df
      .writeStream
      .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
      .options(conf)
      .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database
      .start()
    

Spark: leitura de Kusto

  1. Ao ler pequenas quantidades de dados, defina a consulta de dados:

    import com.microsoft.kusto.spark.datasource.KustoSourceOptions
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    import com.microsoft.azure.kusto.data.ClientRequestProperties
    
    val query = s"$table | where (ColB % 1000 == 0) | distinct ColA"
    val conf: Map[String, String] = Map(
          KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
          KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
        )
    
    val df = spark.read.format("com.microsoft.kusto.spark.datasource").
      options(conf).
      option(KustoSourceOptions.KUSTO_QUERY, query).
      option(KustoSourceOptions.KUSTO_DATABASE, database).
      option(KustoSourceOptions.KUSTO_CLUSTER, cluster).
      load()
    
    // Simplified syntax flavor
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val cpr: Option[ClientRequestProperties] = None // Optional
    val df2 = spark.read.kusto(cluster, database, query, conf, cpr)
    display(df2)
    
  2. Opcional: Se você fornecer o armazenamento de blob transitório (e não Kusto), os blobs serão criados sob a responsabilidade do chamador. Isso inclui o provisionamento do armazenamento, a rotação de chaves de acesso e a exclusão de artefatos transitórios. O módulo KustoBlobStorageUtils contém funções auxiliares para excluir blobs com base em coordenadas de conta e contêiner e credenciais de conta, ou uma URL SAS completa com permissões de gravação, leitura e lista. Quando o RDD correspondente não é mais necessário, cada transação armazena artefatos de blob transitórios em um diretório separado. Esse diretório é capturado como parte dos logs de informações de transação de leitura relatados no nó Spark Driver.

    // Use either container/account-key/account name, or container SaS
    val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer")
    val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey")
    val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName")
    // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
    

    No exemplo acima, o Azure Key Vault não é acessado usando a interface do conector; é utilizado um método mais simples que utiliza os segredos do Azure Databricks.

  3. Leia de Kusto.

    • Se você fornecer o armazenamento de blob transitório, leia Kusto da seguinte maneira:

       val conf3 = Map(
            KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
            KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
            KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      
    • Se Kusto fornece o armazenamento de blob transitório, leia Kusto da seguinte maneira:

      val conf3 = Map(
        KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
        KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)