Compartilhar via


Ingerir dados usando a biblioteca python do Azure Data Explorer

Neste artigo, você ingerirá dados usando a biblioteca Python do Azure Data Explorer. O Azure Data Explorer é um serviço de exploração de dados rápido e altamente escalonável para dados telemétricos e de log. O Azure Data Explorer fornece duas bibliotecas de clientes para Python: uma biblioteca de ingestão e uma biblioteca de dados. Essas bibliotecas permitem ingerir ou carregar dados em um cluster e consultar dados do código.

Primeiro, crie uma tabela e um mapeamento de dados em um cluster. Depois, você enfileira a ingestão no cluster e valida os resultados.

Pré-requisitos

Instalar as bibliotecas de dados e ingestão

Instale azure-kusto-data e azure-kusto-ingest.

pip install azure-kusto-data
pip install azure-kusto-ingest

Adicionar instruções de importação e constantes

Importar classes a partir de azure-kusto-data.

from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table

Para autenticar um aplicativo, o Azure Data Explorer usa sua ID de locatário do Microsoft Entra. Para localizar sua ID de locatário, use a URL a seguir, substituindo seu domínio por YourDomain.

https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/

Por exemplo, se o domínio for contoso.com, a URL será: https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Clique nesta URL para ver os resultados; a primeira linha é a seguinte.

"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

A ID do locatário nesse caso é aaaabbbb-0000-cccc-1111-dddd2222eeee. Defina os valores para AAD_TENANT_ID, KUSTO_URI, KUSTO_INGEST_URI e KUSTO_DATABASE antes de executar esse código.

AAD_TENANT_ID = "<TenantId>"
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_DATABASE = "<DatabaseName>"

Agora construa a cadeia de conexão. O exemplo a seguir usa a autenticação de dispositivo para acessar o cluster. Você também pode usar a autenticação de identidade gerenciada , o certificado do aplicativo Microsoft Entra, a chave de aplicativo do Microsoft Entra e o usuário e a senha do Microsoft Entra.

Você cria a tabela de destino e o mapeamento em uma etapa posterior.

KCSB_INGEST = KustoConnectionStringBuilder.with_interactive_login(
    KUSTO_INGEST_URI)

KCSB_DATA = KustoConnectionStringBuilder.with_interactive_login(
    KUSTO_URI)

DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"

Definir informações do arquivo de origem

Importe classes adicionais e defina constantes para o arquivo de fonte de dados. Este exemplo usa um arquivo de exemplo hospedado no Armazenamento de Blobs do Azure. O conjunto de dados de exemplo StormEvents contém dados relacionados ao clima dos Centros Nacionais de Informações Ambientais.

from azure.kusto.data import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod

CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamples"
SAS_TOKEN = ""  # If relevant add SAS token
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321    # in bytes

BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + \
    CONTAINER + "/" + FILE_PATH + SAS_TOKEN

Criar uma tabela em seu cluster

Crie uma tabela que corresponda ao esquema dos dados no arquivo StormEvents.csv. Quando esse código é executado, ele retorna uma mensagem como a seguinte mensagem: Para entrar, use um navegador da Web para abrir a página https://microsoft.com/devicelogin e insira o código F3W4VWZDM para autenticar. Siga as etapas para entrar e, em seguida, retorne para executar o próximo bloco de código. Os blocos de código subsequentes que fazem uma conexão exigem que você entre novamente.

KUSTO_CLIENT = KustoClient(KCSB_DATA)
CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

Definir mapeamento de ingestão

Mapeie os dados CSV de entrada para os nomes de coluna e os tipos de dados usados ao criar a tabela. Isso mapeia campos de dados de origem para colunas de tabela de destino

CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

Enfileirar uma mensagem para ingestão

Enfileira uma mensagem para extrair dados do armazenamento de blobs e ingerir esses dados no Azure Data Explorer.

INGESTION_CLIENT = QueuedIngestClient(KCSB_INGEST)

# All ingestion properties are documented here: https://v4.hkg1.meaqua.org/azure/kusto/management/data-ingest#ingestion-properties
INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, data_format=DataFormat.CSV,
                                           ingestion_mapping_reference=DESTINATION_TABLE_COLUMN_MAPPING, additional_properties={'ignoreFirstRecord': 'true'})
# FILE_SIZE is the raw size of the data in bytes
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
    BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)

print('Done queuing up ingestion with Azure Data Explorer')

Consultar dados que foram ingeridos na tabela

Aguarde de cinco a dez minutos para que o agendamento da ingestão enfileirada se conclua e os dados sejam carregados no Azure Data Explorer. Em seguida, execute o código a seguir para obter a contagem de registros na tabela StormEvents.

QUERY = "StormEvents | count"

RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, QUERY)

dataframe_from_result_table(RESPONSE.primary_results[0])

Executar consultas de solução de problemas

https://dataexplorer.azure.com Entre e conecte-se ao cluster. Execute o comando a seguir no banco de dados para ver se houve falhas de ingestão nas últimas quatro horas. Substitua o nome do banco de dados antes de executar.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Execute o comando a seguir para exibir o status de todas as operações de ingestão nas últimas quatro horas. Substitua o nome do banco de dados antes de executar.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Table == "StormEvents" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Limpar os recursos

Se você planeja seguir nossos outros artigos, mantenha os recursos criados. Caso contrário, execute o seguinte comando em seu banco de dados para limpar a tabela StormEvents.

.drop table StormEvents

Próxima etapa