Data Flow Engine - 处理器¶
ChangeDetector¶
ChangeDetector 可过滤掉与上一条消息具有相同指定 variable 的消息。如果多个设备发送具有同一指定 variable 的消息,则可以定义消息的另一个字段,以此来区分各个设备。第二个字段在 ChangeDetector 的 differentiating-key
属性中定义。
FilterProcessor¶
FilterProcessor 可过滤掉未满足指定表达式的消息。它可用于过滤使用 Spring 表达式语言定义的具有自定义表达式的消息。
MessageSampler¶
MessageSampler 限制在给定时间段内来自同一设备的最大消息数。这可以防止下游流量过载。从某一设备接收第一个消息后,MessageSampler 会抑制来自此设备的任何其它消息,直到给定时间段过去为止。
说明
MessageSampler 的作用基于随时间序列数据发送的时间戳。如果时间戳丢失或无法解析,则无论有效载荷如何,都会抑制流中的消息。
示例¶
MessageSampler 设置为每 2 分钟允许一次消息。以下时间序列数据将到达 MessageSampler:
1.第一条消息通过 MessageSampler:
{
{someKey} : {someValue},
"_timestamp" :"2018.06.07T11:06:00.000Z"
}
2.第二条消息被抑制:
{
{some key} : {some value},
"_timestamp" :"2018.06.07T11:07:30.000Z"
}
在此示例中,根据消息的时间戳,已经过去了 1 分 30 秒。因此,第二条消息不会流入下游。但是,在经过 2 分钟后,将转发下一条消息并重新启动这一循环。
Hysteresis¶
Hysteresis 处理器会在事件发生后引入一小段闲置空间,以防止由于在阈值周围振荡而导致消息泛滥。
例如,有人想知道引擎的 RPM 是否超过阈值,但是他们不希望因出现阈值周围振荡而导致消息泛滥。他们使用 Hysteresis 来抑制消息,直到满足释放条件,例如:RPM 降至另一个阈值以下。
以下为滞后定义示例:
message emitting expression:"MAINMOTOR.rpm > 100"
suppression releasing expression:"MAINMOTOR.rpm < 80"
当输入消息满足 message emitting expression
时,Hysteresis 处理器将发出输出消息。发出此消息后,它会抑制任何其它消息,直到消息满足 suppression releasing expression
。这会将 Hysteresis 处理器设置回其初始状态。当输入消息满足 message emitting expression
时,将发出输出消息,并且应用再次进入抑制状态。
如果未定义 suppression releasing expression
,则应用将转发所有满足 message emitting expression
的传入消息。 message-emitting-expression
和 suppress-releasing-expression
属性可以定义为 Spring 表达式语言表达式。
Marker¶
Marker 可在传入消息转发之前为其添加标签。它通过向消息添加具有已定义键和值的消息头来标记数据。
Sequence¶
Sequence 处理器可过滤掉与指定模式不匹配的并行流消息。Sequence 处理器仅通过时间戳和流来源区分消息。
Sequence 处理器有三种类型的模式:
- 模式 1:来自各个流的消息在定义的时间窗口内到达。消息的顺序是任意的。
- 模式 2:到达的第一条消息来自特定流。后续消息来自其它流,并以任意顺序在定义的时间窗口内到达。
- 模式 3:消息以定义的顺序到达。为每两个连续消息定义最大的时间窗口。
Sequence 处理器提供了两个选项,用以区分来自不同流的消息:
- 标签:如果每个流包含一个 Marker,则 Sequence 处理器可通过消息头中唯一的
header-value
进行过滤。 - 有效载荷:如果消息包含
filter-id
字段,则 Sequence 处理器可以按其值进行过滤。
Sequence 处理器将根据时间戳对到达的消息进行排序。Sequence 处理器存储序列候选项列表。对于将有效启动一种模式的每个到达消息,它将创建一个新条目。如果后续消息是序列的有效扩展,则序列候选项将更新。序列候选项完成更新并因此满足定义的模式后,将在序列消息中输出匹配的消息,并且从候选项列表中移除该序列。
并行流和收集器流¶
Sequence 处理器需要来自不同流的输入数据,这些流并行运行并且可获取和处理不同 assets 的数据。多个并行流结束在同一目标,这使它们有别于传统流。这些流由 SNS 主题进行收集,而不是由 sink 组件收集: SNS 主题可充当以下流的来源,该流包含 Sequence 处理器并以 sink 结束: 有关这些流的定义示例,请参见示例。