Partilhar via


Funções definidas pelo usuário no Databricks Connect for Scala

Observação

Este artigo aborda o Databricks Connect for Databricks Runtime 14.1 e superior.

O Databricks Connect for Scala suporta a execução de funções definidas pelo usuário (UDFs) em clusters Databricks a partir do seu ambiente de desenvolvimento local.

Esta página descreve como executar funções definidas pelo usuário com o Databricks Connect for Scala.

Para a versão Python deste artigo, consulte funções definidas pelo usuário no Databricks Connect for Python.

Carregar classe compilada e JARs

Para que as UDFs funcionem, as classes compiladas e os JARs devem ser carregados no cluster usando a addCompiledArtifacts() API.

Observação

O Scala usado pelo cliente deve corresponder à versão do Scala no cluster do Azure Databricks. Para verificar a versão do Scala do cluster, consulte a seção "Ambiente do sistema" para obter a versão do Databricks Runtime do cluster em Versões e compatibilidade das notas de versão do Databricks Runtime.

O programa Scala a seguir configura um UDF simples que quadra valores em uma coluna.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val spark = getSession()

    val squared = udf((x: Long) => x * x)

    spark.range(3)
      .withColumn("squared", squared(col("id")))
      .select("squared")
      .show()

    }
  }

  def getSession(): SparkSession = {
    if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
      // On a Databricks cluster — reuse the active session
      SparkSession.active
    } else {
      // Locally with Databricks Connect — upload local JARs and classes
      DatabricksSession
        .builder()
        .addCompiledArtifacts(
          Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI
        )
        .getOrCreate()
    }
  }
}

Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI aponta para o mesmo local que a saída compilada do projeto (por exemplo, destino/classes ou o JAR construído). Todas as classes compiladas são enviadas para o Databricks, não apenas Main.

target/scala-2.13/classes/
├── com/
│   ├── examples/
│   │   ├── Main.class
│   │   └── MyUdfs.class
│   └── utils/
│       └── Helper.class

Quando a sessão do Spark já estiver inicializada, outras classes compiladas e JARs poderão ser carregados usando a API spark.addArtifact().

Observação

Ao carregar JARs, é necessário incluir todos os JARs de dependência transitiva para o carregamento. As APIs não executam nenhuma deteção automática de dependências transitivas.

UDFs com dependências de terceiros

Se adicionaste uma dependência de Maven em build.sbt que é usada num UDF, mas não está disponível no cluster Databricks, por exemplo:

// In build.sbt
libraryDependencies += "org.apache.commons" % "commons-text" % "1.10.0"
// In your code
import org.apache.commons.text.StringEscapeUtils

// ClassNotFoundException thrown during UDF execution of this function on the server side
val escapeUdf = udf((text: String) => {
  StringEscapeUtils.escapeHtml4(text)
})

Use spark.addArtifact() com ivy:// para baixar dependências a partir do Maven:

  1. Adicionar a oro biblioteca ao ficheiro build.sbt

    libraryDependencies ++= Seq(
      "org.apache.commons" % "commons-text" % "1.10.0" % Provided,
      "oro" % "oro" % "2.0.8"  // Required for ivy:// to work
    )
    
  2. Adicione o artefato depois de criar a sessão com a addArtifact() API:

    def getSession(): SparkSession = {
      if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
        SparkSession.active
      } else {
        val spark = DatabricksSession.builder()
          .addCompiledArtifacts(Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI)
          .getOrCreate()
    
        // Convert Maven coordinates to ivy:// format
        // From: "org.apache.commons" % "commons-text" % "1.10.0"
        // To:   ivy://org.apache.commons:commons-text:1.10.0
        spark.addArtifact("ivy://org.apache.commons:commons-text:1.10.0")
    
        spark
      }
    }
    

APIs de conjunto de dados tipadas

As APIs de conjunto de dados tipadas permitem executar transformações como map(), filter(), mapPartitions()e agregações nos conjuntos de dados resultantes. O carregamento da classe compilada e dos JARs para o cluster usando a addCompiledArtifacts() API também se aplica a eles, portanto, seu código deve se comportar de forma diferente dependendo de onde ele é executado:

  • Desenvolvimento local com Databricks Connect: Carregue artefatos para o cluster remoto.
  • Implantado em Databricks em execução no cluster: Não há necessidade de carregar nada porque as classes já estão lá.

O aplicativo Scala a seguir usa a API map() para modificar um número em uma coluna de resultados para uma cadeia de caracteres prefixada.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    spark.range(3).map(f => s"row-$f").show()
  }
}

Dependências JAR externas

Se você estiver usando uma biblioteca privada ou de terceiros que não esteja no cluster:

import com.mycompany.privatelib.DataProcessor

// ClassNotFoundException thrown during UDF execution of this function on the server side
val myUdf = udf((data: String) => {
  DataProcessor.process(data)
})

Carregue JARs externos da sua lib/ pasta ao criar a sessão:

def getSession(): SparkSession = {
  if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
    SparkSession.active
  } else {
    val builder = DatabricksSession.builder()
      .addCompiledArtifacts(Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI)

     // Add all JARs from lib/ folder
     val libFolder = new java.io.File("lib")
     builder.addCompiledArtifacts(libFolder.toURI)


   builder.getOrCreate()
  }
}

Isso carrega automaticamente todos os JARs em seu diretório lib/ para o Databricks quando executado localmente.

Projetos com múltiplos módulos

Em um projeto SBT multimódulo, getClass.getProtectionDomain.getCodeSource.getLocation.toURI retorna apenas a localização do módulo atual. Se o UDF usar classes de outros módulos, obterás ClassNotFoundException.

my-project/
├── module-a/  (main application)
├── module-b/  (utilities - module-a depends on this)

Use getClass numa classe em cada módulo para obter todas as suas localizações e carregá-las separadamente.

// In module-a/src/main/scala/Main.scala
import com.company.moduleb.DataProcessor  // From module-b

def getSession(): SparkSession = {
  if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
    SparkSession.active
  } else {
    // Get location using a class FROM module-a
    val moduleALocation = Main.getClass
      .getProtectionDomain.getCodeSource.getLocation.toURI

    // Get location using a class FROM module-b
    val moduleBLocation = DataProcessor.getClass
      .getProtectionDomain.getCodeSource.getLocation.toURI

    DatabricksSession.builder()
      .addCompiledArtifacts(moduleALocation)  // Upload module-a
      .addCompiledArtifacts(moduleBLocation)  // Upload module-b
      .getOrCreate()
  }
}