Compartilhar via


Funções definidas pelo usuário no Databricks Connect para Scala

Observação

Este artigo aborda o Databricks Connect para Databricks Runtime 14.1 e superior.

O Databricks Connect para Scala dá suporte à execução de UDFs (funções definidas pelo usuário) em clusters do Databricks em seu ambiente de desenvolvimento local.

Esta página descreve como executar funções definidas pelo usuário com o Databricks Connect para Scala.

Para obter a versão do Python deste artigo, consulte funções definidas pelo usuário no Databricks Connect para Python.

Carregar classe compilada e JARs

Para que as UDFs funcionem, as classes compiladas e os JARs devem ser carregados no cluster usando a addCompiledArtifacts() API.

Observação

O Scala usado pelo cliente deve corresponder à versão do Scala no cluster do Azure Databricks. Para verificar a versão do Scala do cluster, consulte a seção "Ambiente do Sistema" para a versão do Databricks Runtime do cluster nas notas de versão do Databricks Runtime e compatibilidade.

O programa Scala a seguir configura um UDF simples que coloca valores quadrados em uma coluna.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val spark = getSession()

    val squared = udf((x: Long) => x * x)

    spark.range(3)
      .withColumn("squared", squared(col("id")))
      .select("squared")
      .show()

    }
  }

  def getSession(): SparkSession = {
    if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
      // On a Databricks cluster — reuse the active session
      SparkSession.active
    } else {
      // Locally with Databricks Connect — upload local JARs and classes
      DatabricksSession
        .builder()
        .addCompiledArtifacts(
          Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI
        )
        .getOrCreate()
    }
  }
}

Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI aponta para o mesmo local que a saída compilada do projeto (por exemplo, destino/classes ou o JAR compilado). Todas as classes compiladas são carregadas no Databricks, não apenas Main.

target/scala-2.13/classes/
├── com/
│   ├── examples/
│   │   ├── Main.class
│   │   └── MyUdfs.class
│   └── utils/
│       └── Helper.class

Quando a sessão do Spark já estiver inicializada, outras classes compiladas e JARs poderão ser carregadas usando a spark.addArtifact() API.

Observação

Ao carregar JARs, todos os JARs de dependência transitiva devem ser incluídos no upload. As APIs não executam nenhuma detecção automática de dependências transitivas.

UDFs com dependências de terceiros

Se você adicionou uma dependência do Maven build.sbt que é usada em um UDF, mas não está disponível no cluster do Databricks, por exemplo:

// In build.sbt
libraryDependencies += "org.apache.commons" % "commons-text" % "1.10.0"
// In your code
import org.apache.commons.text.StringEscapeUtils

// ClassNotFoundException thrown during UDF execution of this function on the server side
val escapeUdf = udf((text: String) => {
  StringEscapeUtils.escapeHtml4(text)
})

Use spark.addArtifact() com ivy:// para baixar dependências do Maven.

  1. Adicionar a oro biblioteca ao seu build.sbt arquivo

    libraryDependencies ++= Seq(
      "org.apache.commons" % "commons-text" % "1.10.0" % Provided,
      "oro" % "oro" % "2.0.8"  // Required for ivy:// to work
    )
    
  2. Adicione o artefato depois de criar a sessão com a addArtifact() API:

    def getSession(): SparkSession = {
      if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
        SparkSession.active
      } else {
        val spark = DatabricksSession.builder()
          .addCompiledArtifacts(Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI)
          .getOrCreate()
    
        // Convert Maven coordinates to ivy:// format
        // From: "org.apache.commons" % "commons-text" % "1.10.0"
        // To:   ivy://org.apache.commons:commons-text:1.10.0
        spark.addArtifact("ivy://org.apache.commons:commons-text:1.10.0")
    
        spark
      }
    }
    

APIs do conjunto de dados tipado

As APIs de conjunto de dados tipada permitem executar transformações como map(), filter(), mapPartitions() e agregações em conjuntos de dados resultantes. Carregar a classe compilada e os JARs no cluster usando a addCompiledArtifacts() API também se aplica a eles, portanto, seu código deve se comportar de forma diferente dependendo de onde ele é executado:

  • Desenvolvimento local com o Databricks Connect: carregar artefatos no cluster remoto.
  • Implantado no Databricks em execução no cluster: não é necessário carregar nada porque as classes já estão lá.

O aplicativo Scala a seguir usa a map() API para modificar um número em uma coluna de resultado para uma cadeia de caracteres prefixada.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    spark.range(3).map(f => s"row-$f").show()
  }
}

Dependências JAR externas

Se você estiver usando uma biblioteca privada ou de terceiros que não esteja no cluster:

import com.mycompany.privatelib.DataProcessor

// ClassNotFoundException thrown during UDF execution of this function on the server side
val myUdf = udf((data: String) => {
  DataProcessor.process(data)
})

Carregue JARs externos da pasta lib/ ao criar a sessão:

def getSession(): SparkSession = {
  if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
    SparkSession.active
  } else {
    val builder = DatabricksSession.builder()
      .addCompiledArtifacts(Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI)

     // Add all JARs from lib/ folder
     val libFolder = new java.io.File("lib")
     builder.addCompiledArtifacts(libFolder.toURI)


   builder.getOrCreate()
  }
}

Isso carrega automaticamente todos os JARs em seu diretório/lib para o Databricks ao executar localmente.

Projetos com vários módulos

Em um projeto SBT de vários módulos, getClass.getProtectionDomain.getCodeSource.getLocation.toURI retorna apenas a localização do módulo atual. Se o UDF usar classes de outros módulos, você obterá ClassNotFoundException.

my-project/
├── module-a/  (main application)
├── module-b/  (utilities - module-a depends on this)

Use getClass de uma classe em cada módulo para obter todas as localizações e carregá-los separadamente.

// In module-a/src/main/scala/Main.scala
import com.company.moduleb.DataProcessor  // From module-b

def getSession(): SparkSession = {
  if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
    SparkSession.active
  } else {
    // Get location using a class FROM module-a
    val moduleALocation = Main.getClass
      .getProtectionDomain.getCodeSource.getLocation.toURI

    // Get location using a class FROM module-b
    val moduleBLocation = DataProcessor.getClass
      .getProtectionDomain.getCodeSource.getLocation.toURI

    DatabricksSession.builder()
      .addCompiledArtifacts(moduleALocation)  // Upload module-a
      .addCompiledArtifacts(moduleBLocation)  // Upload module-b
      .getOrCreate()
  }
}