Partilhar via


Substituir dados seletivamente com o Delta Lake

O Azure Databricks aproveita a funcionalidade Delta Lake para dar suporte a duas opções distintas para substituições seletivas:

  • A replaceWhere opção substitui atomicamente todos os registros que correspondem a um determinado predicado.
  • Você pode substituir diretórios de dados com base em como as tabelas são particionadas usando substituições de partição dinâmica.

Para a maioria das operações, o Databricks recomenda o uso replaceWhere para especificar quais dados devem ser substituídos.

Important

Se os dados tiverem sido substituídos acidentalmente, você poderá usar restaurar para desfazer a alteração.

Substituição seletiva arbitrária com replaceWhere

Você pode substituir seletivamente apenas os dados que correspondem a uma expressão arbitrária.

Note

SQL requer Databricks Runtime 12.2 LTS ou superior.

O comando a seguir substitui atomicamente os eventos de janeiro na tabela de destino, que é particionada por start_date, pelos dados em replace_data:

Python

(replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .table("events")
)

Scala

replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .table("events")

SQL

INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data

Este código de exemplo grava os dados no , valida que todas as linhas correspondem replace_dataao predicado e executa uma substituição atômica usando overwrite semântica. Se quaisquer valores na operação estiverem fora da restrição, essa operação falhará com um erro por padrão.

Você pode alterar esse comportamento para overwrite valores dentro do intervalo de predicados e insert registros que estão fora do intervalo especificado. Para fazer isso, desative a verificação de restrição definindo spark.databricks.delta.replaceWhere.constraintCheck.enabled como false usando uma das seguintes configurações:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false

Comportamento herdado

Na configuração padrão herdada, replaceWhere substituía dados que correspondiam a um predicado apenas nas colunas de partição. Com esse modelo herdado, o comando a seguir substituiria atomicamente o mês de janeiro na tabela de destino, que é particionada por date, com os dados em df:

Python

(df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .table("people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .table("people10m")

Se você quiser voltar ao comportamento antigo, você pode desativar o spark.databricks.delta.replaceWhere.dataColumns.enabled sinalizador:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false

Substituições de partição dinâmica

A sobrecarga dinâmica de partições atualiza apenas as partições para as quais a gravação introduz novos dados. Ele substitui todos os dados existentes nessas partições e deixa outros inalterados.

O Azure Databricks suporta duas abordagens:

  • REPLACE USING (recomendado) - Funciona em todos os tipos de computação, incluindo armazéns SQL Databricks, computação sem servidor e computação clássica. Não requer a definição de uma configuração de sessão do Spark.
  • partitionOverwriteMode (legado) - Requer computação clássica e configuração de uma configuração de sessão do Spark. Não suportado em Databricks SQL ou computação sem servidor.

As seções abaixo demonstram como usar cada abordagem.

A partição dinâmica substitui com REPLACE USING

O Databricks Runtime 16.3 e acima suporta substituições de partição dinâmica para tabelas particionadas usando REPLACE USING. Esse método permite substituir seletivamente os dados em todos os tipos de computação, sem a necessidade de definir uma configuração de sessão do Spark. REPLACE USING permite um comportamento de substituição atómica independente de computação, que funciona em armazéns de dados SQL do Databricks, em computação sem servidor e em computação clássica.

REPLACE USING apenas substitui as partições alvo dos dados recebidos. Todas as outras partições permanecem inalteradas.

O exemplo a seguir demonstra como se usa a substituição dinâmica de partição com REPLACE USING. Atualmente, você só pode usar SQL, não Python ou Scala. Para obter detalhes, consulte INSERT a referência da linguagem SQL.

INSERT INTO TABLE events
  REPLACE USING (event_id, start_date)
  SELECT * FROM source_data

Tenha em mente as seguintes restrições e comportamentos para substituições de partições dinâmicas:

  • Você deve especificar o conjunto completo das colunas de partição da tabela na USING cláusula.
  • Verifique sempre que os dados escritos abrangem apenas as partições esperadas. Uma única linha na partição errada pode sobrepor involuntariamente toda a partição.

Se você precisar de uma lógica de correspondência mais personalizável do que a suportada REPLACE USING , como tratar NULL valores como iguais, use o complementar REPLACE ON . Consulte INSERT para obter detalhes.

A partição dinâmica substitui por partitionOverwriteMode (legado)

Important

Este recurso está no Public Preview.

O Databricks Runtime 11.3 LTS e superior suporta a substituição dinâmica de partições para tabelas particionadas usando o modo de substituição: INSERT OVERWRITE em SQL ou em uma escrita de DataFrame com df.write.mode("overwrite"). Esse tipo de substituição só está disponível para computação clássica, não para armazéns SQL Databricks ou computação sem servidor.

Configure o modo de substituição dinâmica de partições ajustando a configuração da sessão do Spark de spark.sql.sources.partitionOverwriteMode para dynamic. Como alternativa, você pode definir a DataFrameWriter opção partitionOverwriteMode como dynamic. Se presente, a opção específica da consulta substitui o modo definido na configuração da sessão. O padrão para spark.sql.sources.partitionOverwriteMode é static.

O exemplo a seguir demonstra o uso de partitionOverwriteMode:

SQL

SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;

Python

(df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")

Tenha em mente as seguintes restrições e comportamentos para partitionOverwriteMode:

  • Não é possível definir overwriteSchema como true.
  • Não é possível especificar ambos partitionOverwriteMode e replaceWhere na mesma DataFrameWriter operação.
  • Se você especificar uma replaceWhere condição usando uma DataFrameWriter opção, o Delta Lake aplicará essa condição para controlar quais dados serão substituídos. Esta opção tem precedência sobre a configuração a nível de sessão partitionOverwriteMode.
  • Verifique sempre que os dados escritos abrangem apenas as partições esperadas. Uma única linha na partição errada pode sobrepor involuntariamente toda a partição.