读取和写入协议缓冲区

协议缓冲区 (protobuf)是 Google 开发的中性二进制序列化格式。 Azure Databricks 用户最常在处理来自 Apache Kafka 等事件流系统的二进制编码记录时遇到这种情况。 Azure Databricks 支持使用 Apache Spark 通过 from_protobufto_protobuf 函数读取和写入 protobuf 数据,这些函数可在二进制 protobuf 与 Spark SQL 结构体类型之间进行转换,适用于流式和批处理工作负载。

先决条件

Protobuf 函数需要 Databricks Runtime 12.2 LTS 及更高版本。

函数语法

使用 from_protobuf 将二进制列强制转换为结构体,并使用 to_protobuf 将结构体列强制转换为二进制。 必须提供由参数标识 descFilePath 的描述符文件或由 options 参数指定的架构注册表。 有关选项的完整列表,请参阅 Protobuf

Python

from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

Scala(编程语言)

// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

选项

使用 options 参数将选项传递给 from_protobufto_protobuf。 有关支持选项的完整列表,请参阅 Protobuf

架构注册表选项

以下选项特定于架构注册表使用情况,常规选项参考中未涵盖这些选项。

选项 必须 Default Description
schema.registry.schema.evolution.mode "restart" 在传入记录中检测到较新的架构 ID 时,如何处理架构更改。 "restart"UnknownFieldException 结束查询;配置作业以在无法获取更改时重新启动。 "none" 忽略架构 ID 更改,并使用原始架构分析较新的记录。
confluent.schema.registry.<option> 使用前缀 "confluent.schema.registry" 传递任何 Confluent 架构注册表客户端选项。 例如,将"confluent.schema.registry.basic.auth.credentials.source"设置为"USER_INFO",并将"confluent.schema.registry.basic.auth.user.info"设置为"<KEY>:<SECRET>",以配置基本身份验证。

Usage

以下示例使用 Wanderbricks 数据集 演示如何使用 to_protobuf() 将 Apache Spark 结构体序列化为二进制 protobuf,并使用 from_protobuf() 反序列化二进制 protobuf 记录。

将 protobuf 与 Confluent 架构注册表配合使用

Azure Databricks 支持使用 Confluent 架构注册表来定义 Protobuf。

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
from pyspark.sql.functions import struct

schema_registry_options = {
  "schema.registry.subject" : "app-events-value",
  "schema.registry.address" : "https://schema-registry:8081/"
}

# Serialize Wanderbricks reviews to binary Protobuf using schema registry
reviews_df = spark.read.table("samples.wanderbricks.reviews")
proto_bytes_df = reviews_df.select(
    to_protobuf(struct("review_id", "rating", "comment"), options=schema_registry_options).alias("proto_bytes")
)

# Deserialize binary Protobuf records back to a struct
reviews_restored_df = proto_bytes_df.select(
    from_protobuf("proto_bytes", options=schema_registry_options).alias("proto_event")
)
display(reviews_restored_df)

Scala(编程语言)

import org.apache.spark.sql.protobuf.functions._
import org.apache.spark.sql.functions.struct
import scala.collection.JavaConverters._

val schemaRegistryOptions = Map(
    "schema.registry.subject" -> "app-events-value",
    "schema.registry.address" -> "https://schema-registry:8081/"
)

// Serialize Wanderbricks reviews to binary Protobuf using schema registry
val reviewsDF = spark.read.table("samples.wanderbricks.reviews")
val protoBytesDF = reviewsDF.select(
    to_protobuf(struct($"review_id", $"rating", $"comment"), options = schemaRegistryOptions.asJava)
        .as("proto_bytes")
)

// Deserialize binary Protobuf records back to a struct
val reviewsRestoredDF = protoBytesDF.select(
    from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
        .as("proto_event")
)
reviewsRestoredDF.show()

向外部 Confluent 架构注册表进行身份验证

若要向外部 Confluent 架构注册表进行身份验证,请更新架构注册表选项以包含身份验证凭据和 API 密钥。

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
  }

Scala(编程语言)

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)

在 Unity Catalog 数据卷中使用信任库和密钥库文件

在 Databricks Runtime 14.3 LTS 及更高版本中,可以使用 Unity Catalog 卷中的信任存储和密钥存储文件向 Confluent 架构注册表进行身份验证。 根据以下示例更新架构注册表选项:

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
    "confluent.schema.registry.ssl.truststore.password" : "<password>",
    "confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
    "confluent.schema.registry.ssl.keystore.password" : "<password>",
    "confluent.schema.registry.ssl.key.password" : "<password>"
  }

Scala(编程语言)

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "<password>",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
      "confluent.schema.registry.ssl.keystore.password" -> "<password>",
      "confluent.schema.registry.ssl.key.password" -> "<password>"
)

将 Protobuf 与描述符文件配合使用

还可以引用可用于计算群集的 protobuf 描述符文件。 请确保你具有读取文件的适当权限,具体取决于其位置。

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
from pyspark.sql.functions import struct

descriptor_file = "/path/to/proto_descriptor.desc"

# Serialize Wanderbricks reviews to binary Protobuf using a descriptor file
reviews_df = spark.read.table("samples.wanderbricks.reviews")
proto_bytes_df = reviews_df.select(
    to_protobuf(struct("review_id", "rating", "comment"), "Review", descriptor_file).alias("proto_bytes")
)

# Deserialize binary Protobuf records back to a struct
reviews_restored_df = proto_bytes_df.select(
    from_protobuf("proto_bytes", "Review", descFilePath=descriptor_file).alias("review")
)
display(reviews_restored_df)

Scala(编程语言)

import org.apache.spark.sql.protobuf.functions._
import org.apache.spark.sql.functions.struct

val descriptorFile = "/path/to/proto_descriptor.desc"

// Serialize Wanderbricks reviews to binary Protobuf using a descriptor file
val reviewsDF = spark.read.table("samples.wanderbricks.reviews")
val protoBytesDF = reviewsDF.select(
    to_protobuf(struct($"review_id", $"rating", $"comment"), "Review", descriptorFile).as("proto_bytes")
)

// Deserialize binary Protobuf records back to a struct
val reviewsRestoredDF = protoBytesDF.select(
    from_protobuf($"proto_bytes", "Review", descFilePath=descriptorFile).as("review")
)
reviewsRestoredDF.show()

其他资源

  • 读取和写入流式处理 Avro 数据:如果流式处理工作负载使用 Avro 序列化而不是 Protobuf,请参阅 Avro 流式处理函数,了解等效 from_avro 函数和 to_avro 函数。