本文探讨如何为有状态流式处理选择适当的输出模式。 只有包含聚合的有状态流才需要输出模式配置。
联接仅支持追加输出模式,输出模式不会影响重复数据删除。 任意有状态运算符 mapGroupsWithState 和 flatMapGroupsWithState 使用自己的自定义逻辑发出记录,因此流的输出模式不会影响其行为。
对于无状态流式处理,所有输出模式的行为都相同。
若要正确配置输出模式,必须了解有状态流式处理、水印和触发器。 请参阅以下文章:
什么是输出模式?
结构化流式处理查询的输出模式决定了查询的操作符在每次触发期间发出哪些记录。 可发出下面三种类型的记录:
- 将来处理时不会更改的记录。
- 自上次触发器以来已更改的记录。
- 状态表中的所有记录。
知晓要发出哪些类型的记录对于有状态运算符很重要,因为有状态运算符生成的特定行可能会因触发器而异。 例如,当流式聚合运算符接收到一个特定窗口的多行数据时,该窗口的聚合值可能在多次触发中发生变化。
对于无状态运算符,记录类型之间的区别不会影响运算符的行为。 无状态运算符在触发器期间发出的记录始终是在该触发器期间处理的源记录。
可用输出模式
有三种输出模式告知运算符在特定触发器期间发出哪些记录:
| 输出模式 | 说明 |
|---|---|
| 追加模式(默认) | 默认情况下,流式处理查询在追加模式下运行。 在此模式下,操作符仅发出在未来的触发中不会变化的行。 有状态运算符使用水印来确定何时发生这种情况。 |
| 更新模式 | 在更新模式下,运算符会发出所有在触发期间发生更改的行,即使这些发出的记录可能会在后续触发器中再次更改。 |
| 完整模式 | 完整模式仅适用于流式处理聚合。 在完整模式下,运算符生成的所有结果行都会在下游发出。 |
生产注意事项
对于许多有状态流式处理操作,必须在追加模式和更新模式之间进行选择。 以下部分概述了可能会影响你做出决定的注意事项。
注意
完整模式有一些应用场景,但随着数据扩展,可能会表现不佳。 Databricks 建议使用具体化视图获取与完整模式关联的语义保证,以及针对许多有状态操作的增量处理。 请参阅 具体化视图。
应用语义
应用语义描述了下游应用程序如何使用流数据。
如果下游服务需要对每个下游写入执行单个操作,则在大多数情况下使用追加模式。 例如,如果有下游通知服务为写入接收器的每个新记录发送通知,则追加模式可确保每个记录只写入一次。 每次状态信息更改时,更新模式都会写入记录,这将导致大量更新。
如果下游服务需要新的结果,更新模式可确保接收器尽可能保持最新状态。 示例包括实时读取特征的机器学习模型或跟踪实时聚合的分析仪表板。
运算符和接收器兼容性
结构化流式处理不支持 Apache Spark 中提供的所有操作,并且某些流式处理操作在所有输出模式下都不受支持。 有关运算符限制的详细信息,请参阅 OSS 流式处理文档。
并非所有接收器都支持所有输出模式。 Kafka 支持所有输出模式。 支持所有 Unity 目录托管表的 Delta Lake 支持追加和完整模式,但不支持更新模式。 有关与 Delta Lake 接收器的更新模式类似的行为,请参阅流式处理中的合并。
有关接收器兼容性的详细信息,请参阅 OSS 流式处理文档。
延迟和成本
输出模式会影响写入记录之前必须花费的时间,写入数据的频率和写入量可能会影响与流式处理管道相关的成本。
追加模式迫使有状态运算符在有状态结果被最终确定后才发出结果,而这个过程至少需要等同于水印延迟的时间。 在追加输出模式下 1 hour 水印延迟意味着记录在下游发出之前至少有 1 小时的延迟。
更新模式会导致每个触发器对每个聚合值执行一次写入。 如果你的接收器对每个记录的每次写入单独收费,那么如果记录在水印延迟期结束之前多次更新,此操作可能成本高昂。
配置示例
以下代码示例演示如何配置将更新流式传输到 Unity Catalog 表的输出模式:
Python
# Append output mode (default)
(df.writeStream
.toTable("target_table")
)
# Append output mode (same as default behavior)
(df.writeStream
.outputMode("append")
.toTable("target_table")
)
# Update output mode
(df.writeStream
.outputMode("update")
.toTable("target_table")
)
# Complete output mode
(df.writeStream
.outputMode("complete")
.toTable("target_table")
)
Scala(编程语言)
// Append output mode (default)
df.writeStream
.toTable("target_table")
// Append output mode (same as default behavior)
df.writeStream
.outputMode("append")
.toTable("target_table")
// Update output mode
df.writeStream
.outputMode("update")
.toTable("target_table")
// Complete output mode
df.writeStream
.outputMode("complete")
.toTable("target_table")
有关 PySpark DataStreamWriter.outputMode 或 Scala DataStreamWriter.outputMode,请参阅 OSS 文档。
有状态流式处理和输出模式示例
以下示例旨在帮助你推理输出模式如何与有状态流式处理的水印交互。
请考虑一个流式聚合,该聚合会计算商店每小时产生的总收入,且存在 15 分钟的水印延迟。 第一个微批处理以下记录:
- 下午 2:40,15 美元
- 下午2:30 10美元
- 下午 3:10 30 美元
此时,引擎的水印为 2:55pm,因为它从看到的最大时间 (3:10pm) 中减去 15 分钟(延迟)。 流式处理聚合运算符的状态如下:
-
[2pm, 3pm]:25 美元 -
[3pm, 4pm]:30 美元
下表概述了每个输出模式下会发生什么情况:
| 输出模式 | 结果和原因 |
|---|---|
| 追加 | 流式处理聚合运算符不会在下游输出任何内容。 这是因为这两个窗口可能会随着后续触发器出现新值而更改:下午 2:55 的水印表示下午 2:55 之后的记录可能仍会到达,并且这些记录可能位于 [2pm, 3pm] 窗口或 [3pm, 4pm] 窗口范围内。 |
| 更新 | 运算符发出这两条记录,因为它们都收到了更新。 |
| 完成 | 运算符输出所有记录。 |
现在,假设流又接收了一条记录:
- 下午3:20 20美元
水印更新为下午 3:05,因为系统从下午 3:20 中减去了 15 分钟。 此时,流式处理聚合运算符的状态如下:
-
[2pm, 3pm]:25 美元 -
[3pm, 4pm]:50 美元
下表概述了每个输出模式下会发生什么情况:
| 输出模式 | 结果和原因 |
|---|---|
| 追加 | 流式处理聚合运算符观察到下午 3:05 的水印大于 [2pm, 3pm] 窗口的末尾。 根据水印的定义,该时窗无法再更改,因此它发出 [2pm, 3pm] 时窗。 |
| 更新 | 流式处理聚合运算符发出 [3pm, 4pm] 时窗口,因为状态值已从“30 美元”更改为“50 美元”。 |
| 完成 | 运算符输出所有记录。 |
下面总结了有状态运算符在每个追加模式下的行为方式:
- 在追加模式下,在水印延迟后写入记录一次。
- 在更新模式下,写入自上一个触发器以来已更改的记录。
- 在完整模式下,写入由有状态运算符生成的所有记录。