Skip to content

Data Flow Engine - 处理器

ChangeDetector

ChangeDetector 可过滤掉与上一条消息具有相同指定 variable 的消息。如果多个设备发送具有同一指定 variable 的消息,则可以定义消息的另一个字段,以此来区分各个设备。第二个字段在 ChangeDetector 的 differentiating-key 属性中定义。

FilterProcessor

FilterProcessor 可过滤掉未满足指定表达式的消息。它可用于过滤使用 Spring 表达式语言定义的具有自定义表达式的消息。

MessageSampler

MessageSampler 限制在给定时间段内来自同一设备的最大消息数。这可以防止下游流量过载。从某一设备接收第一个消息后,MessageSampler 会抑制来自此设备的任何其它消息,直到给定时间段过去为止。

说明

MessageSampler 的作用基于随时间序列数据发送的时间戳。如果时间戳丢失或无法解析,则无论有效载荷如何,都会抑制流中的消息。

示例

MessageSampler 设置为每 2 分钟允许一次消息。以下时间序列数据将到达 MessageSampler:

1.第一条消息通过 MessageSampler:

 {
 {someKey} : {someValue},
 "_timestamp" :"2018.06.07T11:06:00.000Z"
 }

2.第二条消息被抑制:

 {
 {some key} : {some value},
 "_timestamp" :"2018.06.07T11:07:30.000Z"
 }

在此示例中,根据消息的时间戳,已经过去了 1 分 30 秒。因此,第二条消息不会流入下游。但是,在经过 2 分钟后,将转发下一条消息并重新启动这一循环。

Working principle of the MessageSampler

Hysteresis

Hysteresis 处理器会在事件发生后引入一小段闲置空间,以防止由于在阈值周围振荡而导致消息泛滥。

例如,有人想知道引擎的 RPM 是否超过阈值,但是他们不希望因出现阈值周围振荡而导致消息泛滥。他们使用 Hysteresis 来抑制消息,直到满足释放条件,例如:RPM 降至另一个阈值以下。

以下为滞后定义示例:

message emitting expression:"MAINMOTOR.rpm > 100"
suppression releasing expression:"MAINMOTOR.rpm < 80"

当输入消息满足 message emitting expression 时,Hysteresis 处理器将发出输出消息。发出此消息后,它会抑制任何其它消息,直到消息满足 suppression releasing expression。这会将 Hysteresis 处理器设置回其初始状态。当输入消息满足 message emitting expression 时,将发出输出消息,并且应用再次进入抑制状态。

Working principle of Hysteresis

如果未定义 suppression releasing expression,则应用将转发所有满足 message emitting expression 的传入消息。 message-emitting-expressionsuppress-releasing-expression 属性可以定义为 Spring 表达式语言表达式。

Marker

Marker 可在传入消息转发之前为其添加标签。它通过向消息添加具有已定义键和值的消息头来标记数据。

Sequence

Sequence 处理器可过滤掉与指定模式不匹配的并行流消息。Sequence 处理器仅通过时间戳和流来源区分消息。

Sequence 处理器有三种类型的模式:

  • 模式 1:来自各个流的消息在定义的时间窗口内到达。消息的顺序是任意的。
  • 模式 2:到达的第一条消息来自特定流。后续消息来自其它流,并以任意顺序在定义的时间窗口内到达。
  • 模式 3:消息以定义的顺序到达。为每两个连续消息定义最大的时间窗口。

Schematic diagram of message order patterns

Sequence 处理器提供了两个选项,用以区分来自不同流的消息:

  • 标签:如果每个流包含一个 Marker,则 Sequence 处理器可通过消息头中唯一的 header-value 进行过滤。
  • 有效载荷:如果消息包含 filter-id 字段,则 Sequence 处理器可以按其值进行过滤。

Sequence 处理器将根据时间戳对到达的消息进行排序。Sequence 处理器存储序列候选项列表。对于将有效启动一种模式的每个到达消息,它将创建一个新条目。如果后续消息是序列的有效扩展,则序列候选项将更新。序列候选项完成更新并因此满足定义的模式后,将在序列消息中输出匹配的消息,并且从候选项列表中移除该序列。

并行流和收集器流

Sequence 处理器需要来自不同流的输入数据,这些流并行运行并且可获取和处理不同 assets 的数据。多个并行流结束在同一目标,这使它们有别于传统流。这些流由 SNS 主题进行收集,而不是由 sink 组件收集: Schematic diagram of parallel input streams SNS 主题可充当以下流的来源,该流包含 Sequence 处理器并以 sink 结束: Schematic diagram of a collector stream 有关这些流的定义示例,请参见示例


Last update: June 26, 2019