Compartilhar via


Fazer a interação com o Azure Cosmos DB usando o Apache Spark 2 no Link do Azure Synapse

Importante

O Link do Synapse para Cosmos DB não tem mais suporte para novos projetos. Não use esse recurso.

Use o Espelhamento do Azure Cosmos DB para Microsoft Fabric, que agora está em Disponibilidade Geral. O espelhamento fornece os mesmos benefícios de ETL zero e é totalmente integrado ao Microsoft Fabric. Saiba mais na Visão Geral do Espelhamento do Cosmos DB.

Observação

Para o Link do Azure Synapse do Azure Cosmos DB usando o Spark 3, confira este artigo: Link do Azure Synapse para Azure Cosmos DB no Spark 3

Neste artigo, você aprenderá a interagir com o Azure Cosmos DB usando o Synapse Apache Spark 2. Com seu suporte completo para Scala, Python, SparkSQL e C#, o Synapse Apache Spark é fundamental para análise, engenharia de dados, ciência de dados e cenários de exploração de dados no Link do Azure Synapse para Azure Cosmos DB.

Os seguintes recursos têm suporte ao interagir com o Azure Cosmos DB:

  • O Synapse Apache Spark permite analisar dados em seus contêineres do Azure Cosmos DB que são habilitados com o Link do Azure Synapse quase em tempo real sem afetar o desempenho das cargas de trabalho transacionais. As duas seguintes opções estão disponíveis para consultar o armazenamento de análise do Azure Cosmos DB do Spark:
    • Carregar para o DataFrame do Spark
    • Criar tabela do Spark
  • O Synapse Apache Spark também permite ingerir dados no Azure Cosmos DB. É importante observar que os dados sempre são ingeridos em contêineres do Azure Cosmos DB por meio do armazenamento transacional. Quando o Link do Azure Synapse está habilitado, todas as novas inserções, atualizações e exclusões são sincronizadas automaticamente com o repositório analítico.
  • O Apache Spark do Synapse também dá suporte ao streaming estruturado do Spark com o Azure Cosmos DB como uma origem e um coletor.

As seções a seguir orientam você pela sintaxe das funcionalidades acima. Você também pode conferir o módulo Learn sobre como consultar o Azure Cosmos DB com o Apache Spark para o Azure Synapse Analytics. Os gestos no workspace do Azure Synapse Analytics são projetados para proporcionar uma experiência fácil de usar para começar. Os gestos ficam visíveis quando você clica com o botão direito do mouse em um contêiner do Azure Cosmos DB na guia Dados do workspace do Azure Synapse. Com gestos, você pode gerar código rapidamente e ajustá-lo às suas necessidades. Os gestos também são perfeitos para descobrir dados com um único clique.

Importante

Você deve estar ciente sobre algumas restrições no esquema analítico que poderão gerar um comportamento inesperado em operações de carregamento de dados. Por exemplo, somente as primeiras 1.000 propriedades do esquema transacional estão disponíveis no esquema analítico, as propriedades com espaços não estão disponíveis etc. Se você estiver tendo alguns resultados inesperados, verifique as restrições de esquema do repositório analítico para obter mais detalhes.

Consultar o armazenamento de análise do Azure Cosmos DB

Antes de aprender sobre as duas opções possíveis para consultar o armazenamento de análise do Azure Cosmos DB, carregar para o Spark DataFrame e criar a tabela do Spark, vale a pena explorar as diferenças na experiência para que você possa escolher a opção adequada às suas necessidades.

A diferença na experiência é relativa a se as alterações de dados subjacentes no contêiner de Azure Cosmos DB devem ser refletidas automaticamente na análise realizada no Spark. Quando um DataFrame do Spark é registrado ou uma tabela do Spark é criada no armazenamento de análise de um contêiner, os metadados relativos ao instantâneo atual dos dados no armazenamento de análise são buscados para o Spark para uma aplicação eficiente da análise subsequente. É importante observar que, como o Spark segue uma política de avaliação lenta, a menos que uma ação seja invocada no DataFrame do Spark ou uma consulta SparkSQL seja executada na tabela Spark, os dados reais não são buscados no repositório analítico do contêiner subjacente.

No caso de carregamento para o DataFrame do Spark, os metadados buscados são armazenados em cache durante o tempo de vida da sessão do Spark. Portanto, as ações seguintes invocadas no DataFrame são avaliadas no instantâneo do armazenamento de análise no momento da criação do DataFrame.

Por outro lado, no caso de criar uma tabela spark, os metadados do estado do repositório analítico não são armazenados em cache no Spark e são recarregados em cada execução de consulta SparkSQL na tabela Spark.

Portanto, você pode escolher entre carregar para o DataFrame do Spark e criar uma tabela do Spark dependendo de se deseja que sua análise do Spark seja avaliada em um instantâneo fixo do armazenamento de análise ou no instantâneo mais recente do armazenamento de análise, respectivamente.

Se suas consultas analíticas têm filtros usados com frequência, você tem a opção de particionar com base nesses campos para melhorar o desempenho da consulta. Para disparar o particionamento no armazenamento analítico, você pode executar periodicamente o trabalho de particionamento de um notebook do Azure Synapse Spark. Esse armazenamento particionado aponta para a ADLS Gen2 de armazenamento primário que está vinculada ao workspace do Azure Synapse. Para saber mais, confira os artigos Introdução ao particionamento personalizado e Como configurar o particionamento personalizado.

Observação

Para consultar contas do Azure Cosmos DB for MongoDB, saiba mais sobre a representação de esquema de fidelidade completa no armazenamento de análise e os nomes de propriedade estendida a serem usados.

Observação

Observe que todas as options nos comandos abaixo diferenciam maiúsculas de minúsculas. Por exemplo, você deve usar Gateway, enquanto gateway retornará um erro.

Carregar para o DataFrame do Spark

Neste exemplo, você criará um DataFrame do Spark que aponta para o armazenamento de análise do Azure Cosmos DB. Em seguida, você pode executar outra análise invocando ações do Spark no DataFrame. Esta operação não afeta o repositório transacional.

A sintaxe no Python seria a seguinte:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

df = spark.read.format("cosmos.olap")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .load()

A sintaxe equivalente no Scala seria a seguinte:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    load()

Criar tabela do Spark

Neste exemplo, você criará uma tabela do Spark que aponta o armazenamento de análise do Azure Cosmos DB. Em seguida, você pode executar outra análise invocando consultas SparkSQL na tabela. Essa operação não afeta o armazenamento transacional nem gera nenhuma movimentação de dados. Se você decidir excluir essa tabela do Spark, o contêiner subjacente do Azure Cosmos DB e o repositório analítico correspondente não serão afetados.

Esse cenário é conveniente para reutilizar tabelas do Spark por meio de ferramentas de terceiros e fornecer acessibilidade aos dados subjacentes para o tempo de execução.

A sintaxe para criar uma tabela do Spark é a seguinte:

%%sql
-- To select a preferred list of regions in a multi-region Azure Cosmos DB account, add spark.cosmos.preferredRegions '<Region1>,<Region2>' in the config options

create table call_center using cosmos.olap options (
    spark.synapse.linkedService '<enter linked service name>',
    spark.cosmos.container '<enter container name>'
)

Observação

Se você tiver cenários em que o esquema do contêiner do Azure Cosmos DB subjacente mude ao longo do tempo e quiser que o esquema atualizado se reflita automaticamente nas consultas na tabela do Spark, poderá conseguir isso definindo a opção spark.cosmos.autoSchemaMerge como true nas opções da tabela do Spark.

Gravar o DataFrame do Spark no contêiner do Azure Cosmos DB

Neste exemplo, você escreverá um DataFrame do Spark em um contêiner do Azure Cosmos DB. Esta operação afetará o desempenho de cargas de trabalho transacionais e consumirá as unidades de solicitação provisionadas no contêiner do Azure Cosmos DB ou no banco de dados compartilhado.

A sintaxe no Python seria a seguinte:

# Write a Spark DataFrame into an Azure Cosmos DB container
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

YOURDATAFRAME.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.write.upsertEnabled", "true")\
    .mode('append')\
    .save()

A sintaxe equivalente no Scala seria a seguinte:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

import org.apache.spark.sql.SaveMode

df.write.format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>"). 
    option("spark.cosmos.write.upsertEnabled", "true").
    mode(SaveMode.Overwrite).
    save()

Carregar DataFrame de streaming do contêiner

Nesse gesto, você usará o recurso de streaming do Spark para carregar dados de um contêiner para um dataframe. Os dados serão armazenados na conta principal do data lake (e do sistema de arquivos) que você conectou ao workspace.

Observação

Se você estiver procurando fazer referência a bibliotecas externas no Apache Spark do Synapse, saiba mais aqui. Por exemplo, se você estiver procurando ingerir um DataFrame do Spark em um contêiner do Azure Cosmos DB para MongoDB, poderá usar o conector do MongoDB para Spark.

Carregar DataFrame de streaming do contêiner do Azure Cosmos DB

Neste exemplo, você usará o recurso de streaming estruturado do Spark para carregar dados de um contêiner do Azure Cosmos DB em um DataFrame de streaming do Spark usando a funcionalidade de feed de alterações no Azure Cosmos DB. Os dados do ponto de verificação usados pelo Spark serão armazenados na conta principal do data lake (e sistema de arquivos) que você conectou ao workspace.

Se a pasta /localReadCheckpointFolder não for criada (no exemplo abaixo), ela será criada automaticamente. Esta operação afetará o desempenho de cargas de trabalho transacionais e consumirá as Unidades de Solicitação provisionadas no contêiner do Azure Cosmos DB ou no banco de dados compartilhado.

A sintaxe no Python seria a seguinte:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

dfStream = spark.readStream\
    .format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.changeFeed.readEnabled", "true")\
    .option("spark.cosmos.changeFeed.startFromTheBeginning", "true")\
    .option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder")\
    .option("spark.cosmos.changeFeed.queryName", "streamQuery")\
    .load()

A sintaxe equivalente no Scala seria a seguinte:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val dfStream = spark.readStream.
    format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    option("spark.cosmos.changeFeed.readEnabled", "true").
    option("spark.cosmos.changeFeed.startFromTheBeginning", "true").
    option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder").
    option("spark.cosmos.changeFeed.queryName", "streamQuery").
    load()

Gravar DataFrame de streaming para o contêiner do Azure Cosmos DB

Neste exemplo, você escreverá um DataFrame de streaming em um contêiner do Azure Cosmos DB. Esta operação afetará o desempenho de cargas de trabalho transacionais e consumirá as Unidades de Solicitação provisionadas no contêiner do Azure Cosmos DB ou no banco de dados compartilhado. Se a pasta /localWriteCheckpointFolder não for criada (no exemplo abaixo), ela será criada automaticamente.

A sintaxe no Python seria a seguinte:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

# If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway. 

def writeBatchToCosmos(batchDF, batchId):
  batchDF.persist()
  print("--> BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
  batchDF.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.write.upsertEnabled", "true")\
    .mode('append')\
    .save()
  print("<-- BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
  batchDF.unpersist()

streamQuery = dfStream\
        .writeStream\
        .foreachBatch(writeBatchToCosmos) \
        .option("checkpointLocation", "/localWriteCheckpointFolder")\
        .start()

streamQuery.awaitTermination()

A sintaxe equivalente no Scala seria a seguinte:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

// If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway. 

val query = dfStream.
            writeStream.
            foreachBatch { (batchDF: DataFrame, batchId: Long) =>
              batchDF.persist()
              batchDF.write.format("cosmos.oltp").
                option("spark.synapse.linkedService", "<enter linked service name>").
                option("spark.cosmos.container", "<enter container name>"). 
                option("spark.cosmos.write.upsertEnabled", "true").
                mode(SaveMode.Overwrite).
                save()
              println(s"BatchId: $batchId, Document count: ${batchDF.count()}")
              batchDF.unpersist()
              ()
            }.        
            option("checkpointLocation", "/localWriteCheckpointFolder").
            start()

query.awaitTermination()

Próximas etapas