在同一集群上运行多个结构化流处理查询

许多客户在同一Azure Databricks群集上运行多个结构化流式处理查询。 尽管支持此模式,Databricks 建议限制每个群集的查询数,以避免缩放问题和性能瓶颈。 在无服务器计算中,Azure Databricks自动管理缩放,因此会为你处理这些注意事项。 如果您使用的是经典计算资源,并且可以控制驱动程序和执行器的资源规格,本页将介绍需要重点关注的关键瓶颈以及解决这些瓶颈的方法。

注释

Databricks 建议将 Lakeflow Spark 声明性管道用于新的流式处理工作负载,从而自动管理基础结构复杂性。 请参阅 Lakeflow Spark 声明式管道

何时在同一群集上使用多个查询

在同一群集上运行多个流式处理查询可降低基础结构成本,尤其是在有许多不需要专用计算的小流时。 关键的权衡在于故障共享:如果集群发生故障,其上的所有流都会失败。 对于任务关键型管道,共享故障模式通常不能接受。

对于混合关键流和非关键流的工作负荷,Databricks 建议执行以下操作:

  • 根据每个流的业务影响为每个流分配优先级。
  • 将任务关键型流放置在专用群集上,即使成本更高。
  • 共同定位低优先级流以共享计算并降低成本。

驱动器选型

驱动程序是共享资源。 多个查询共享相同的 CPU、内存、DAG 调度器、任务调度器以及驱动端 UDF 执行(例如,foreachBatch)。 运行多个并发流时,请观察超出标准 CPU 和内存预配的这些特定瓶颈:

  • 自动加载程序开销:如果流使用自动加载程序,文件发现和目录列表会增加驱动程序压力。
  • OS 级资源限制(打开文件):在单个驱动程序上同时运行大量基于文件的流(例如 FileStreamSource 或自动加载程序),可能会耗尽用户级打开的文件描述符限制,这可能导致随机流故障。
  • 侦听器总线反压:大量并发流式处理查询可能会导致单个 Spark 会话总线 StreamingQueryListener 出现反压。 所有事件(包括 onQueryIdle)都发送到此单一总线,大型事件积压工作可能会严重延迟异步 onQueryProgress 处理程序并影响群集稳定性。
  • 昂贵的驱动程序操作:避免在驱动程序上调用 collect() 或其他昂贵的 DataFrame 操作,除非绝对必要,以避免具体化大型结果集并导致内存不足(OOM)错误。

驱动程序争用疑难解答

如果您因 OOM 或争用问题而遇到驱动程序崩溃:

  1. 监视 Spark UI 中的驱动程序指标。 如果发现 CPU、内存或磁盘使用率较高,请在群集计算设置中调整驱动程序规格。
  2. 如果问题仍然存在,请验证代码是否未在驱动程序上运行内存密集型操作或 UDF。
  3. 如果无法进一步纵向缩放驱动程序,Databricks 强烈建议跨多个群集拆分作业,以绕过这些共享节点缩放瓶颈。

执行器容量规划

在同一群集上运行多个查询时,所有查询都共享执行程序上的任务槽。 一个查询中的阶段可能会占用可用槽位,从而导致其他查询出现延迟和饥饿。 Spark 使用任务槽和可用核心之间的 1:1 映射。 如果需要并发运行查询,请确保有足够的核心可用。

通常,执行程序可能会执行比驱动程序节点更多的内存密集型操作。 如果需要处理应用程序负载,请优化执行程序 JVM 和堆外内存分配参数。 确保在 CPU、内存和磁盘空间方面适当调整执行程序节点的大小,并根据需要垂直缩放。 如果无法进行垂直缩放,请考虑向群集添加其他工作器节点。

注释

其中一些更改可能需要重启群集才能生效。

使用调度程序池

可以配置调度池,以便在从同一源代码运行多个流式查询时,将计算能力分配给查询。

默认情况下,笔记本中启动的所有查询都在同一个公平调度池中运行。 Apache Spark 作业是由笔记本中所有流式查询的触发器生成的,它们会按照“先进先出 (FIFO)”的顺序一个接一个地运行。 这可能会导致查询中不必要的延迟,因为它们无法有效地共享群集资源。

调度池允许您声明哪些结构化流处理查询共享计算资源。

以下示例将query1分配到专用池,而query2query3共享一个计划程序池。

:::note 无服务器兼容性

Databricks 建议退出 spark.sparkContext ,因为它与 Databricks 无服务器计算体系结构不兼容。 请直接使用 spark (SparkSession)。 计划程序池是经典计算概念;在无服务器时,Databricks 会自动管理缩放和资源分配。

:::

# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")

# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")

# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")

注释

本地属性配置必须位于你启动流式处理查询时所在的笔记本单元中。

有关公平计划程序池的详细信息,请参阅 Apache Spark 公平计划程序文档

有状态查询注意事项

对于在同一群集上运行的有状态查询,请记住以下几点:

  • 使用 RocksDB 作为状态存储提供程序 ,以避免 OOM 问题和 GC 暂停。 RocksDB 是 Databricks Runtime 17.3 及更高版本中的默认状态存储提供程序。 请参阅 在 Azure Databricks 上配置 RocksDB 状态存储
  • 根据应用程序的需求调整 Shuffle 分区。 对于有状态阶段,Spark 按 Shuffle 分区的数量成比例地调度任务。
  • 按每个节点限制 RocksDB 内存占用,以避免因堆外内存使用而导致 OOM 错误。 这在 Databricks Runtime 17.3 及更高版本中自动处理,但需要在早期版本中手动配置。 请参阅 Cap RocksDB 内存使用情况
  • 避免在同一执行程序节点上打包过多分区。 状态存储上的维护操作(包括快照上传和清理)按节点运行。 将过多分区分配给单个执行器节点,可能会因可用的完整快照较少而导致维护任务得不到足够资源,并延长恢复时间。