结构化流式处理的生产注意事项

在 Azure Databricks 上以计划的 Lakeflow 作业的形式运行生产结构化流式处理工作负荷。 请参阅 Lakeflow Jobs

Databricks 建议始终配置以下内容:

  • 从返回结果的笔记本中删除不必要的代码,例如 displaycount
  • 不要使用全用途计算运行结构化流式处理工作负荷。 始终使用作业计算资源将流调度为 Lakeflow 作业。
  • 使用 Continuous 模式调度 Lakeflow 作业。 这指的是Azure Databricks作业计划功能,而不是结构化流式处理trigger 间隔
  • 不要为结构化流作业的计算启用自动缩放。

某些工作负载会受益于以下功能:

Databricks 引入了 Lakeflow Spark 声明性管道,以减少管理结构化流式处理工作负荷的生产基础结构的复杂性。 Databricks 建议对新的结构化流管道使用 Lakeflow Spark 声明式管道。 请参阅 Lakeflow Spark 声明式管道

注意

计算自动缩放在缩减结构化流式处理工作负载的群集大小方面存在限制。 Databricks 建议使用具有增强自动缩放功能的 Lakeflow Spark 声明性管道来处理流式工作负载。 请参阅 使用自动缩放优化 Lakeflow Spark 声明性管道群集利用率

:::note 无服务器计算

在无服务器计算中,仅 Trigger.AvailableNow() 受支持且 Trigger.Once() 受支持。 Databricks 建议使用Trigger.AvailableNow()

对于无服务器计算上的连续流式处理,请在连续模式下使用触发与连续管道模式。

请参阅 流式处理限制

:::

设计流式处理工作负载来应对失败

Databricks 建议始终将流式处理作业配置为在失败时自动重启。 某些功能(包括架构演变)要求结构化流式处理工作负载自动重试。 请参阅如何配置结构化流式处理作业以在失败时重启流式查询

有些操作(例如 foreachBatch)提供至少一次(而不是恰好一次)保证。 对于这些操作,请确保处理管道是幂等的。 请参阅使用 foreachBatch 将内容写入到任意数据接收器

注意

当查询重启时,将会处理在之前运行中计划的微批处理。 如果您的作业由于内存不足错误导致失败,或者您因微批次过大而手动取消作业,则可能需要升级计算资源,以便成功处理微批次。

如果在运行之间更改了配置,这些配置将应用于计划的第一个新批处理。 请参阅在结构化流式处理查询发生更改后恢复

作业重试时

可以将多个任务安排为Azure Databricks作业的一部分。 使用连续触发器配置作业时,无法设置任务之间的依赖项。

可选择使用以下方法之一在单个作业中计划多个流:

  • 多任务:定义一个具有多个任务的作业,这些任务会使用连续触发器运行流式处理工作负载。
  • 多查询:在单个任务的源代码中定义多个流式处理查询。

还可以组合使用这些策略。 下表比较了这些方法。

策略 多个任务 多个查询
如何共享计算? Databricks 建议为每个流式处理任务部署适当大小的计算资源。 可以选择跨任务共享计算。 所有查询共享相同的计算。 可以选择将查询分配给 调度池
如何处理重试? 在作业重试之前,所有任务都必须失败。 如果任何查询失败,任务将会重试。

有关处理多个任务或查询的更多详细信息,请参阅 在同一群集上运行多个结构化流式处理查询

将结构化流式处理作业配置为在失败时重启流式处理查询

Databricks 建议将所有流式工作负载配置为使用连续触发器。 请参阅连续运行作业

默认情况下,连续触发器具有以下行为:

  • 防止作业同时多次运行。
  • 在上一次运行失败时启动新的运行。
  • 使用指数退避进行重试。

Databricks 建议在计划工作流时始终使用作业计算而不是通用计算。 在作业失败并重试时,将会部署新的计算资源。

注意

Databricks 建议不要使用 streamingQuery.awaitTermination()spark.streams.awaitAnyTermination()。 请参阅 何时使用 awaitTermination()

何时使用 awaitTermination()

streamingQuery.awaitTermination()spark.streams.awaitAnyTermination() 阻止当前线程,直到流式查询终止。 是否使用这些函数取决于执行环境。

在 Lakeflow 作业中,请勿使用 streamingQuery.awaitTermination()spark.streams.awaitAnyTermination()。 这些函数并不是必需的,因为作业服务会在流式查询处于活动状态时自动阻止运行完成。 这两个函数都阻止笔记本单元格完成执行,并阻止 Jobs 服务跟踪流式处理查询,这会影响积压指标和作业通知。

在以下情况下使用 awaitTermination()

用例 行为
用于全用途计算的交互式笔记本 awaitTermination() 使单元格保持运行状态,使你能够观察查询状态,并确保笔记本输出中的故障浮出水面。
本地和开发环境 在本地运行 Spark 程序时,当主线程完成时,进程将退出。 调用 awaitTermination() 以使程序保持活动状态,直到流式处理查询完成或失败。
故障蔓延至驱动程序 如果没有 awaitTermination(),那么在非作业上下文中的流式查询失败可能不会传播到调用线程。 查询可能会以无提示方式失败,从而使故障更难检测和诊断。 调用 awaitTermination() 会再次引发驱动程序上的查询异常。