Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Inferir e evoluir o esquema usando
Importante
Esta funcionalidade está em Pré-visualização Pública.
Este artigo descreve como inferir e evoluir o esquema de blobs JSON com a from_json função SQL em Lakeflow Spark Declarative Pipelines.
Visão geral
A from_json função SQL analisa uma coluna de cadeia de caracteres JSON e retorna um valor struct. Quando usado fora de um pipeline, você deve fornecer explicitamente o esquema do valor retornado usando o schema argumento. Quando usado com Lakeflow Spark Declarative Pipelines, você pode habilitar a inferência e evolução de esquema, que gerencia automaticamente o esquema do valor retornado. Esse recurso simplifica a configuração inicial (especialmente quando o esquema é desconhecido) e as operações contínuas quando o esquema muda com freqüência. Ele permite o processamento contínuo de blobs JSON arbitrários de fontes de dados de streaming, como Auto Loader, Kafka ou Kinesis.
Especificamente, quando usado em um pipeline, a inferência de esquema e a evolução para a from_json função SQL podem:
- Detetar novos campos em registros JSON de entrada (incluindo objetos JSON aninhados)
- Inferir os tipos de campo e mapeá-los para os tipos de dados apropriados do Spark
- Evolua automaticamente o esquema para acomodar novos campos
- Manipular automaticamente os dados que não estão em conformidade com o esquema atual
Sintaxe: inferir e evoluir automaticamente o esquema
Para habilitar a inferência de esquema com from_json em um pipeline, defina o esquema como NULL e especifique a schemaLocationKey opção. Isso permite inferir e acompanhar o esquema.
SQL
from_json(jsonStr, NULL, map("schemaLocationKey", "<uniqueKey>” [, otherOptions]))
Python
from_json(jsonStr, None, {"schemaLocationKey": "<uniqueKey>”[, otherOptions]})
Uma consulta pode ter várias from_json expressões, mas cada expressão deve ter um schemaLocationKey único. O schemaLocationKey também deve ser único por pipeline.
SQL
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) parsedX,
from_json(value, NULL, map('schemaLocationKey', 'keyY')) parsedY,
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
Python
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "text")
.load("/databricks-datasets/nyctaxi/sample/json/")
.select(
col("value"),
from_json(col("value"), None, {"schemaLocationKey": "keyX"}).alias("parsedX"),
from_json(col("value"), None, {"schemaLocationKey": "keyY"}).alias("parsedY"))
)
Sintaxe: Esquema fixo
Se quiser impor um esquema específico, você pode usar a sintaxe a seguir from_json para analisar a cadeia de caracteres JSON usando esse esquema:
from_json(jsonStr, schema, [, options])
Essa sintaxe pode ser usada em qualquer ambiente do Azure Databricks, incluindo Lakeflow Spark Declarative Pipelines. Mais informações estão disponíveis aqui.
Inferência de esquema
from_json infere o esquema do primeiro lote de colunas de dados JSON e indexa-o internamente pelo seu schemaLocationKey (obrigatório).
Se a cadeia de caracteres JSON for um único objeto (por exemplo, {"id": 123, "name": "John"}), from_json inferirá um esquema do tipo STRUCT e adicionará um rescuedDataColumn à lista de campos.
STRUCT<id LONG, name STRING, _rescued_data STRING>
No entanto, se a string JSON tiver um array de nível de topo (como ["id": 123, "name": "John"]), então from_json encapsula o ARRAY em um STRUCT. Essa abordagem permite resgatar dados que são incompatíveis com o esquema inferido. Você tem a opção de explodir os valores da matriz em linhas separadas a jusante.
STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>
Sobrepor inferência de esquema usando diretrizes de esquema
Opcionalmente, você pode fornecer schemaHints para influenciar como from_json infere o tipo de uma coluna. Isso é útil quando você sabe que uma coluna é de um tipo de dados específico ou se deseja escolher um tipo de dados mais geral (por exemplo, um duplo em vez de um inteiro). Você pode fornecer uma quantidade arbitrária de sugestões sobre tipos de dados de coluna usando a sintaxe de especificação de esquema SQL. A semântica das dicas de esquema é a mesma das dicas de esquema do Carregador Automático. Por exemplo:
SELECT
-- The JSON `{"a": 1}` will treat `a` as a BIGINT
from_json(data, NULL, map('schemaLocationKey', 'w', 'schemaHints', '')),
-- The JSON `{"a": 1}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'x', 'schemaHints', 'a STRING')),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a MAP<STRING, BIGINT>
from_json(data, NULL, map('schemaLocationKey', 'y', 'schemaHints', 'a MAP<STRING, BIGINT'>)),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'z', 'schemaHints', 'a STRING')),
FROM STREAM READ_FILES(...)
Quando a cadeia de caracteres JSON contém uma ARRAY de nível superior, ela é encapsulada em um STRUCT. Nesses casos, as dicas de esquema são aplicadas ao esquema ARRAY em vez do STRUCT encapsulado. Por exemplo, considere uma cadeia de caracteres JSON com uma matriz de nível superior, como:
[{"id": 123, "name": "John"}]
O esquema ARRAY inferido é encapsulado em um STRUCT:
STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>
Para alterar o tipo de dados de id, especifique a dica de id esquema como STRING. Para adicionar uma nova coluna do tipo DOUBLE, especifique element.new_col DOUBLE. Devido a essas dicas, o esquema para a matriz JSON de nível superior se torna:
struct<value array<id STRING, name STRING, new_col DOUBLE>, _rescued_data STRING>
Evolua o esquema usando schemaEvolutionMode
from_json Deteta a adição de novas colunas à medida que processa seus dados. Quando from_json deteta um novo campo, ele atualiza o esquema inferido com o esquema mais recente mesclando novas colunas no final do esquema. Os tipos de dados das colunas existentes permanecem inalterados. Após a atualização do esquema, o pipeline é reiniciado automaticamente com o esquema atualizado.
from_json suporta os seguintes modos para evolução de esquema, que você define usando a configuração opcional schemaEvolutionMode . Estes modos são consistentes com o Auto Loader.
schemaEvolutionMode |
Comportamento ao ler uma nova coluna |
|---|---|
addNewColumns (padrão) |
A transmissão falha. Novas colunas são adicionadas ao esquema. As colunas existentes não evoluem os tipos de dados. |
rescue |
O esquema nunca é evoluído e o fluxo não falha devido a alterações de esquema. Todas as novas colunas são registradas na coluna de dados resgatados. |
failOnNewColumns |
A transmissão falha. O fluxo não é reiniciado a menos que os schemaHints dados sejam atualizados ou os dados ofensivos sejam removidos. |
none |
Não evolui o esquema, novas colunas são ignoradas e os dados não são resgatados, a menos que a rescuedDataColumn opção seja definida. O fluxo não falha devido a alterações de esquema. |
Por exemplo:
SELECT
-- If a new column appears, the pipeline will automatically add it to the schema:
from_json(a, NULL, map('schemaLocationKey', 'w', 'schemaEvolutionMode', 'addNewColumns')),
-- If a new column appears, the pipeline will add it to the rescued data column:
from_json(b, NULL, map('schemaLocationKey', 'x', 'schemaEvolutionMode', 'rescue')),
-- If a new column appears, the pipeline will ignore it:
from_json(c, NULL, map('schemaLocationKey', 'y', 'schemaEvolutionMode', 'none')),
-- If a new column appears, the pipeline will fail:
from_json(d, NULL, map('schemaLocationKey', 'z', 'schemaEvolutionMode', 'failOnNewColumns')),
FROM STREAM READ_FILES(...)
Coluna de dados resgatados
Uma coluna de dados resgatados é adicionada automaticamente ao seu esquema como _rescued_data. Você pode renomear a coluna definindo a rescuedDataColumn opção. Por exemplo:
from_json(jsonStr, None, {"schemaLocationKey": "keyX", "rescuedDataColumn": "my_rescued_data"})
Quando você opta por usar a coluna de dados resgatados, todas as colunas que não correspondem ao esquema inferido são resgatadas em vez de descartadas. Isso pode acontecer devido a uma incompatibilidade de tipo de dados, uma coluna ausente no esquema ou uma diferença na capitalização do nome da coluna.
Lidar com registros corrompidos
Para armazenar registros malformados e que não podem ser analisados, adicione uma _corrupt_record coluna definindo dicas de esquema, como no exemplo a seguir:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL,
map('schemaLocationKey', 'nycTaxi',
'schemaHints', '_corrupt_record STRING',
'columnNameOfCorruptRecord', '_corrupt_record')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
Para renomear a coluna de registro corrompido, defina a columnNameOfCorruptRecord opção.
O analisador JSON suporta três modos para lidar com registros corrompidos:
| Mode | Description |
|---|---|
PERMISSIVE |
Para registros corrompidos, coloca a cadeia de caracteres malformada em um campo configurado por columnNameOfCorruptRecord e define campos malformados como null. Para manter registos corrompidos, podes definir um campo de tipo string nomeado columnNameOfCorruptRecord num esquema definido pelo utilizador. Se um esquema não tiver o campo, os registros corrompidos serão descartados durante a análise. Ao inferir um esquema, o analisador adiciona implicitamente um columnNameOfCorruptRecord campo no esquema de saída. |
DROPMALFORMED |
Ignora registros corrompidos. Quando utiliza o modo DROPMALFORMED com rescuedDataColumn, as incompatibilidades de tipo de dados não fazem com que os registos sejam descartados. Somente registros corrompidos são descartados, como JSON incompleto ou malformado. |
FAILFAST |
Lança uma exceção quando o analisador encontra registros corrompidos. Quando se utiliza o modo FAILFAST com rescuedDataColumn, as discrepâncias de tipo de dados não produzem um erro. Somente registros corrompidos geram erros, como JSON incompleto ou malformado. |
Consulte um campo na saída de from_json
from_json Infere o esquema durante a execução do pipeline. Se uma consulta a jusante se referir a um from_json campo antes de a função ter sido executada from_json com êxito pelo menos uma vez, o campo não será resolvido e a consulta será ignorada. No exemplo a seguir, a análise para a consulta de tabela prateada será ignorada até que a from_json função na consulta bronze tenha executado e inferido o esquema.
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
CREATE STREAMING TABLE silver AS
SELECT jsonCol.VendorID, jsonCol.total_amount
FROM bronze
Se a from_json função e os campos que ela infere forem referidos na mesma consulta, a análise pode falhar como no exemplo a seguir:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0
Você pode corrigir isso movendo a referência para o from_json campo em uma consulta a jusante (como o exemplo de bronze/prata acima). Como alternativa, você pode especificar schemaHints que contêm os campos referidos from_json . Por exemplo:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi', 'schemaHints', 'total_amount DOUBLE')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0
Exemplos: Inferir e evoluir automaticamente o esquema
Esta seção fornece código de exemplo para habilitar a inferência e evolução automática de esquema usando from_json nos pipelines declarativos do Lakeflow Spark.
Criar uma tabela de streaming a partir do armazenamento de objetos na nuvem
O exemplo a seguir usa a sintaxe read_files para criar uma tabela de streaming a partir do armazenamento de objetos em nuvem.
SQL
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
Python
@dp.table(comment="from_json autoloader example")
def bronze():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "text")
.load("/databricks-datasets/nyctaxi/sample/json/")
.select(from_json(col("value"), None, {"schemaLocationKey": "nycTaxi"}).alias("jsonCol"))
)
Criar uma tabela de streaming a partir de Kafka
O exemplo a seguir usa a sintaxe read_kafka para criar uma tabela de streaming do Kafka.
SQL
CREATE STREAMING TABLE bronze AS
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) jsonCol,
FROM READ_KAFKA(
bootstrapSevers => '<server:ip>',
subscribe => 'events',
"startingOffsets", "latest"
)
Python
@dp.table(comment="from_json kafka example")
def bronze():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.select(col(“value”), from_json(col(“value”), None, {"schemaLocationKey": "keyX"}).alias("jsonCol"))
)
Exemplos: esquema fixo
Consulte, por exemplo, código que utiliza from_json com um esquema fixo na função from_json.
FAQs
Esta seção responde a perguntas freqüentes sobre inferência de esquema e suporte à evolução na from_json função.
Qual é a diferença entre from_json e parse_json?
A parse_json função retorna um VARIANT valor da cadeia de caracteres JSON.
VARIANT fornece uma maneira flexível e eficiente de armazenar dados semi-estruturados. Isso contorna a inferência e a evolução do esquema, eliminando completamente os tipos estritos. No entanto, se você quiser impor um esquema em tempo de gravação (por exemplo, porque você tem um esquema relativamente rígido), from_json pode ser uma opção melhor.
A tabela a seguir descreve as diferenças entre from_json e parse_json:
| Função | Casos de uso | Disponibilidade |
|---|---|---|
from_json |
A evolução do esquema com from_json preserva o esquema. Isso é útil quando:
|
Disponível com inferência de esquema e evolução somente em Lakeflow Spark Declarative Pipelines |
parse_json |
VARIANT é particularmente adequado para armazenar dados que não precisam ser esquematizados. Por exemplo:
|
Disponível com e sem Lakeflow Spark Declarative Pipelines |
Posso usar from_json inferência de esquema e sintaxe de evolução fora dos pipelines declarativos do Lakeflow Spark?
Não, não se pode usar from_json a sintaxe de inferência de esquemas e evolução fora dos Lakeflow Spark Declarative Pipelines.
Como faço para acessar o esquema inferido por from_json?
Exiba o esquema da tabela de streaming de destino.
Posso passar from_json um esquema e também fazer evolução?
Não, você não pode passar from_json um esquema e também fazer evolução. No entanto, você pode fornecer dicas de esquema para substituir alguns ou todos os campos inferidos pelo from_json.
O que acontece com o esquema se a tabela for totalmente atualizada?
Os localizações do esquema associadas à tabela são removidas, e o esquema é re-inferido do zero.