A ingestão de streaming é útil para carregar dados quando você precisa de baixa latência entre a ingestão e a consulta. Considere o uso da ingestão de streaming nos seguintes cenários:
- É necessária uma latência inferior a um segundo.
- Para otimizar o processamento operacional de muitas tabelas onde o fluxo de dados em cada tabela é relativamente pequeno (alguns registros por segundo), mas o volume geral de ingestão de dados é alto (milhares de registros por segundo).
Se o fluxo de dados em cada tabela for alto (mais de 4 GB por hora), considere usar a ingestão em lote.
Para saber mais sobre os diferentes métodos de ingestão, consulte Visão geral da ingestão de dados.
Escolha o tipo apropriado de ingestão em streaming
São suportados dois tipos de ingestão de streaming:
| Tipo de ingestão |
Descrição |
| Conexão de dados |
As conexões de dados do Hub de Eventos, do Hub IoT e da Grade de Eventos podem usar a ingestão de streaming, desde que esteja habilitada no nível do cluster. A decisão de usar a ingestão de streaming é feita de acordo com a política de ingestão de streaming configurada na tabela de destino. Para obter informações sobre como gerenciar conexões de dados, consulte Hub de Eventos, Hub IoT e Grade de Eventos. |
|
Ingestão personalizada |
A ingestão personalizada requer que você escreva um aplicativo que use uma das bibliotecas de cliente do Azure Data Explorer. Use as informações neste tópico para configurar a ingestão personalizada. Você pode também achar útil o aplicativo de exemplo de ingestão de streaming C#. |
Use a tabela a seguir para ajudá-lo a escolher o tipo de ingestão apropriado para seu ambiente:
| Critério |
Conexão de dados |
Ingestão Personalizada |
| Atraso de dados entre o início da ingestão e os dados disponíveis para consulta |
Maior atraso |
Menor atraso |
| Despesas gerais de desenvolvimento |
Configuração rápida e fácil, sem sobrecarga de desenvolvimento |
Alta sobrecarga de desenvolvimento para criar um aplicativo ingerir os dados, manipular erros e garantir a consistência dos dados |
Observação
Você pode gerenciar o processo para habilitar e desabilitar a ingestão de streaming em seu cluster usando o portal do Azure ou programaticamente em C#. Se você estiver usando C# para seu aplicativo personalizado, você pode achar mais conveniente usando a abordagem programática.
Pré-requisitos
Os principais contribuintes que podem impactar a ingestão de streaming são:
-
Tamanho da VM e do cluster: o desempenho e a capacidade de ingestão de streaming são dimensionados com tamanhos de VM e cluster aumentados. O número de solicitações de ingestão simultâneas é limitado a seis por núcleo. Por exemplo, para 16 SKUs principais, como D14 e L16, a carga máxima suportada é de 96 solicitações de ingestão simultâneas. Para duas SKUs principais, como D11, a carga máxima suportada é de 12 solicitações de ingestão simultâneas.
-
Limite de tamanho de dados: o limite de tamanho de dados para uma solicitação de ingestão de streaming é de 4 MB. Isso inclui todos os dados criados no contexto de políticas de atualização durante o processo de ingestão.
-
Atualizações de esquema: as atualizações de esquema, como a criação e modificação de tabelas e mapeamentos de ingestão, podem levar até cinco minutos para o serviço de ingestão de streaming. Para obter mais informações, consulte Ingestão de streaming e alterações de esquema.
-
Capacidade SSD: Permitir a ingestão de dados em streaming num cluster, mesmo quando os dados não são ingeridos por streaming, utiliza parte do disco SSD local das máquinas do cluster para dados de ingestão em streaming e reduz o armazenamento disponível para cache ativo.
Ativar a ingestão de streaming no seu cluster
Antes de poder usar a ingestão de streaming, você deve habilitar o recurso em seu cluster e definir uma política de ingestão de streaming. Você pode habilitar o recurso ao criar o cluster ou adicioná-lo a um cluster existente.
Advertência
Analise as limitações antes de ativar a ingestão de streaming.
Habilitar a ingestão de streaming ao criar um novo cluster
Você pode habilitar a ingestão de streaming ao criar um novo cluster usando o portal do Azure ou programaticamente em C#.
Para habilitar a ingestão de streaming durante a criação de um novo cluster do Azure Data Explorer, execute o seguinte código:
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
string location = "<location>";
string skuName = "<skuName>";
string tier = "<tier>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var cluster = new Cluster(location, new AzureSku(skuName, tier), enableStreamingIngest:true);
await kustoManagementClient.Clusters.CreateOrUpdateAsync(resourceGroupName, clusterName, cluster);
}
}
}
Habilitar a ingestão de streaming em um cluster existente
Se você tiver um cluster existente, poderá habilitar a ingestão de streaming usando o portal do Azure ou programaticamente em C#.
No portal do Azure, vá para o cluster do Azure Data Explorer.
Em Configurações, selecione Configurações.
No painel Configurações , selecione Ativado para ativar a ingestão de streaming.
Selecione Guardar.
Você pode habilitar a ingestão de streaming ao criar um novo cluster do Azure Data Explorer.
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var clusterUpdateParameters = new ClusterUpdate(enableStreamingIngest: true);
await kustoManagementClient.Clusters.UpdateAsync(resourceGroupName, clusterName, clusterUpdateParameters);
}
}
}
Criar uma tabela de destino e definir a política
Crie uma tabela para receber os dados de ingestão de streaming e defina sua política relacionada usando o portal do Azure ou programaticamente em C#.
No portal do Azure, navegue até o cluster.
Selecione Consulta.
Para criar a tabela que receberá os dados por ingestão de streaming, copie o seguinte comando no painel Consulta e selecione Executar.
.create table TestTable (TimeStamp: datetime, Name: string, Metric: int, Source:string)
Copie um dos seguintes comandos para o painel Consulta e selecione Executar. Isso define a política de ingestão de streaming na tabela que você criou ou no banco de dados que contém a tabela.
Sugestão
Uma política definida no nível do banco de dados se aplica a todas as tabelas existentes e futuras no banco de dados. Quando você habilita a política no nível do banco de dados, não há necessidade de habilitá-la por tabela.
Para definir a política na tabela que você criou, use:
.alter table TestTable policy streamingingestion enable
Para definir a política no banco de dados que contém a tabela que você criou, use:
.alter database StreamingTestDb policy streamingingestion enable
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tableName = "<tableName>";
var tableSchema = new TableSchema(
tableName,
new ColumnSchema[]
{
new("TimeStamp", "System.DateTime"),
new("Name", "System.String"),
new("Metric", "System.int"),
new("Source", "System.String"),
});
var tableCreateCommand = CslCommandGenerator.GenerateTableCreateCommand(tableSchema);
var tablePolicyAlterCommand = CslCommandGenerator.GenerateTableAlterStreamingIngestionPolicyCommand(tableName, isEnabled: true);
await client.ExecuteControlCommandAsync(tableCreateCommand);
await client.ExecuteControlCommandAsync(tablePolicyAlterCommand);
}
}
Criar um aplicativo de ingestão de streaming para ingerir dados para seu cluster
Crie seu aplicativo para ingerir dados para seu cluster usando seu idioma preferido.
using System.IO;
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Ingest; // Requires Package Microsoft.Azure.Kusto.Ingest
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateStreamingIngestClient(connectionStringBuilder);
// Ingest from a compressed file
var fileStream = File.Open("MyFile.gz", FileMode.Open);
// Initialize client properties
var ingestionProperties = new KustoIngestionProperties(databaseName: "<databaseName>", tableName: "<tableName>");
// Create source options
var sourceOptions = new StreamSourceOptions { CompressionType = DataSourceCompressionType.GZip, };
// Ingest from stream
await client.IngestFromStreamAsync(fileStream, ingestionProperties, sourceOptions);
}
}
from azure.kusto.data import KustoConnectionStringBuilder, DataFormat
from azure.kusto.ingest import IngestionProperties, KustoStreamingIngestClient
clusterPath = "https://<clusterName>.<region>.kusto.windows.net"
appId = "<appId>"
appKey = "<appKey>"
appTenant = "<appTenant>"
dbName = "<dbName>"
tableName = "<tableName>"
csb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
clusterPath,
appId,
appKey,
appTenant
)
client = KustoStreamingIngestClient(csb)
ingestionProperties = IngestionProperties(
database=dbName,
table=tableName,
data_format=DataFormat.CSV
)
# Ingest from file
# Automatically detects gz format
client.ingest_from_file("MyFile.gz", ingestion_properties=ingestionProperties)
// Load modules using ES6 import statements:
import { DataFormat, IngestionProperties, StreamingIngestClient } from "azure-kusto-ingest";
import { KustoConnectionStringBuilder } from "azure-kusto-data";
// For earlier version, load modules using require statements:
// const IngestionProperties = require("azure-kusto-ingest").IngestionProperties;
// const KustoConnectionStringBuilder = require("azure-kusto-data").KustoConnectionStringBuilder;
// const {DataFormat} = require("azure-kusto-ingest").IngestionPropertiesEnums;
// const StreamingIngestClient = require("azure-kusto-ingest").StreamingIngestClient;
const clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
const appId = "<appId>";
const appKey = "<appKey>";
const appTenant = "<appTenant>";
const dbName = "<dbName>";
const tableName = "<tableName>";
const mappingName = "<mappingName>"; // Required for JSON formatted files
const ingestionProperties = new IngestionProperties({
database: dbName, // Your database
table: tableName, // Your table
format: DataFormat.JSON,
ingestionMappingReference: mappingName
});
// Initialize client with engine endpoint
const client = new StreamingIngestClient(
KustoConnectionStringBuilder.withAadApplicationKeyAuthentication(
clusterPath,
appId,
appKey,
appTenant
),
ingestionProperties
);
// Automatically detects gz format
await client.ingestFromFile("MyFile.gz", ingestionProperties);
import (
"context"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go//azure/data-explorer/kusto/ingest"
"github.com/Azure/go-autorest/autorest/azure/auth"
)
func ingest() {
clusterPath := "https://<clusterName>.kusto.windows.net"
appId := "<appId>"
appKey := "<appKey>"
appTenant := "<appTenant>"
dbName := "<dbName>"
tableName := "<tableName>"
mappingName := "<mappingName>" // Optional, can be nil
// Creates a Kusto Authorizer using your client identity, secret, and tenant identity.
// You may also uses other forms of authorization, see GoDoc > Authorization type.
// auth package is: "github.com/Azure/go-autorest/autorest/azure/auth"
authorizer := kusto.Authorization{
Config: auth.NewClientCredentialsConfig(appId, appKey, appTenant),
}
// Create a client
client, err := kusto.New(clusterPath, authorizer)
if err != nil {
panic("add error handling")
}
// Create an ingestion instance
// Pass the client, the name of the database, and the name of table you wish to ingest into.
in, err := ingest.New(client, dbName, tableName)
if err != nil {
panic("add error handling")
}
// Go currently only supports streaming from a byte array with a maximum size of 4 MB.
jsonEncodedData := []byte("{\"a\": 1, \"b\": 10}\n{\"a\": 2, \"b\": 20}")
// Ingestion from a stream commits blocks of fully formed data encodes (JSON, AVRO, ...) into Kusto:
if err := in.Stream(context.Background(), jsonEncodedData, ingest.JSON, mappingName); err != nil {
panic("add error handling")
}
}
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.FileInputStream;
import java.io.InputStream;
public class FileIngestion {
public static void main(String[] args) throws Exception {
String clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
String appId = "<appId>";
String appKey = "<appKey>";
String appTenant = "<appTenant>";
String dbName = "<dbName>";
String tableName = "<tableName>";
// Build connection string and initialize
ConnectionStringBuilder csb =
ConnectionStringBuilder.createWithAadApplicationCredentials(
clusterPath,
appId,
appKey,
appTenant
);
// Initialize client and its properties
IngestClient client = IngestClientFactory.createClient(csb);
IngestionProperties ingestionProperties =
new IngestionProperties(
dbName,
tableName
);
// Ingest from a compressed file
// Create Source info
InputStream zipInputStream = new FileInputStream("MyFile.gz");
StreamSourceInfo zipStreamSourceInfo = new StreamSourceInfo(zipInputStream);
// If the data is compressed
zipStreamSourceInfo.setCompressionType(CompressionType.gz);
// Ingest from stream
OperationStatus status = client.ingestFromStream(zipStreamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
}
}
Desativar a ingestão de streaming no cluster
Advertência
Desativar a ingestão de streaming pode levar algumas horas.
Antes de desabilitar a ingestão de streaming em seu cluster do Azure Data Explorer, remova a política de ingestão de streaming de todas as tabelas e bancos de dados relevantes. A remoção da política de ingestão de streaming aciona a reorganização de dados dentro do cluster do Azure Data Explorer. Os dados de ingestão de fluxo contínuo são movidos do armazenamento inicial para o armazenamento permanente na base de dados colunar (extensões ou fragmentos). Esse processo pode levar entre alguns segundos a algumas horas, dependendo da quantidade de dados no armazenamento inicial.
Abandonar a política de ingestão de streaming
Você pode descartar a política de ingestão de streaming usando o portal do Azure ou programaticamente em C#.
No portal do Azure, vá para o cluster do Azure Data Explorer e selecione Consulta.
Para soltar a política de ingestão de streaming da tabela, copie o seguinte comando no painel Consulta e selecione Executar.
.delete table TestTable policy streamingingestion
Em Configurações, selecione Configurações.
No painel Configurações , selecione Desativado para desativar a ingestão de streaming.
Selecione Guardar.
Para remover a política de ingestão de streaming da tabela, execute o seguinte código:
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tablePolicyDropCommand = CslCommandGenerator.GenerateTableStreamingIngestionPolicyDropCommand("<dbName>", "<tableName>");
await client.ExecuteControlCommandAsync(tablePolicyDropCommand);
}
}
Para desativar a ingestão de streaming no cluster, execute o seguinte código:
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var clusterUpdateParameters = new ClusterUpdate(enableStreamingIngest: false);
await kustoManagementClient.Clusters.UpdateAsync(resourceGroupName, clusterName, clusterUpdateParameters);
}
}
}
Limitações
-
Os mapeamentos de dados devem ser pré-criados para uso na ingestão de streaming. As solicitações individuais de ingestão de streaming não acomodam mapeamentos de dados embutidos.
-
As tags de extensão não podem ser definidas nos dados de ingestão de streaming.
-
Política de atualização. A política de atualização pode fazer referência apenas aos dados recém-ingeridos na tabela de origem e não a quaisquer outros dados ou tabelas no banco de dados.
- Se a ingestão de streaming estiver habilitada em um cluster usado como líder para bancos de dados de seguidores, a ingestão de streaming também deverá ser habilitada nos clusters a seguir para seguir os dados de ingestão de streaming. O mesmo se aplica se os dados do cluster forem compartilhados por meio do Compartilhamento de Dados.
Próximos passos