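Built-in watermark generators
1. Ascending timestamps
When timestamps arrive in ascending order, no element can be late, so the extracted event-time timestamp itself can serve as the watermark.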
java
DataStream<MyEvent> stream = ...
DataStream<MyEvent> withTimestampsAndWatermarks =
    stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {
        @Override
        public long extractAscendingTimestamp(MyEvent element) {
            return element.getCreationTime();
        }
    });
scala
val stream: DataStream[MyEvent] = ...
val withTimestampsAndWatermarks = stream.assignAscendingTimestamps(_.getCreationTime)
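To make the ascending rule concrete without a Flink dependency, here is a minimal plain-Java sketch of the state such an assigner keeps. The class name AscendingWatermarkSketch is hypothetical. The detail that the watermark is the latest timestamp minus 1 ms (so that a later element with the same timestamp is not treated as late) is an assumption modeled on Flink 1.10's AscendingTimestampExtractor, not a quote of it.

```java
// Plain-Java sketch (no Flink dependency) of an ascending-timestamp assigner.
// Assumption: watermark = latest timestamp - 1 ms, or Long.MIN_VALUE before any element.
class AscendingWatermarkSketch {
    private long currentTimestamp = Long.MIN_VALUE;

    /** Records an element's timestamp; returns false if it breaks ascending order. */
    boolean onElement(long timestamp) {
        if (timestamp < currentTimestamp) {
            // A real extractor would log or fail here; the sketch only reports it
            // and keeps the previous (larger) timestamp.
            return false;
        }
        currentTimestamp = timestamp; // equal timestamps are allowed
        return true;
    }

    long currentWatermark() {
        return currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1;
    }

    public static void main(String[] args) {
        AscendingWatermarkSketch s = new AscendingWatermarkSketch();
        System.out.println(s.currentWatermark()); // -9223372036854775808 (Long.MIN_VALUE)
        s.onElement(1000);
        s.onElement(2500);
        System.out.println(s.currentWatermark()); // 2499
        System.out.println(s.onElement(2000));    // false: out of order
    }
}
```

If the input is not actually ordered, this strategy silently produces late data, which is why the bounded variant below exists.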
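2. Bounded out-of-orderness
Set an upper bound on lateness. Every event reaches the system after some delay, and these delays vary considerably, so some events are delayed more than others. A simple approach is to assume that no delay exceeds a fixed maximum; the watermark then trails the largest timestamp seen so far by that maximum (10 seconds in the examples below).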
java
DataStream<MyEvent> stream = ...
DataStream<MyEvent> withTimestampsAndWatermarks =
    stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
        @Override
        public long extractTimestamp(MyEvent element) {
            return element.getCreationTime();
        }
    });
scala
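val stream: DataStream[MyEvent] = ...
// BoundedOutOfOrdernessTimestampExtractor is abstract, so extractTimestamp is overridden
// in an anonymous subclass (the extractor takes a single constructor argument)
val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(10)) {
    override def extractTimestamp(element: MyEvent): Long = element.getCreationTime
  })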
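A minimal plain-Java sketch of the bounded out-of-orderness rule, assuming the watermark is simply the largest timestamp seen so far minus the bound. BoundedOutOfOrdernessSketch is a hypothetical name, not Flink's class, and "late" is taken to mean a timestamp at or below the current watermark. With a 10 s bound, an element at 3 000 ms is still on time after one at 12 000 ms has arrived, while one at 1 500 ms is late.

```java
// Plain-Java sketch (no Flink dependency) of bounded out-of-orderness:
// watermark = largest timestamp seen so far - maxOutOfOrderness.
class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
        // Start low enough that currentMaxTimestamp - maxOutOfOrderness cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    void onElement(long timestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
    }

    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness;
    }

    /** Late = timestamp not after the current watermark. */
    boolean isLate(long timestamp) {
        return timestamp <= currentWatermark();
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch s = new BoundedOutOfOrdernessSketch(10_000);
        s.onElement(5_000);
        s.onElement(12_000);
        System.out.println(s.currentWatermark()); // 2000
        System.out.println(s.isLate(3_000));      // false
        System.out.println(s.isLate(1_500));      // true
    }
}
```

The bound is a trade-off: a larger value tolerates more disorder but delays every event-time result by the same amount.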
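Custom watermark generators
1. Periodic watermarks
AssignerWithPeriodicWatermarks assigns timestamps and generates watermarks periodically (the watermark may depend on the stream elements, or be based purely on processing time).
The generation interval (every n milliseconds) is set with ExecutionConfig.setAutoWatermarkInterval(...). At each interval Flink calls the assigner's getCurrentWatermark() method; if the returned watermark is non-null and larger than the previous one, a new watermark is emitted.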
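The emission rule in the paragraph above can be simulated in a few lines of plain Java. This is a sketch under stated assumptions: watermarks are modeled as nullable Long values, the interval timer is replaced by explicit tick() calls, and PeriodicEmissionSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Sketch of the periodic emission rule: on every tick the assigner is polled, and the
// result is forwarded only if it is non-null and strictly larger than the last emitted one.
class PeriodicEmissionSketch {
    private long lastEmitted = Long.MIN_VALUE;
    private final List<Long> emitted = new ArrayList<>();

    /** One simulated auto-watermark interval; the supplier stands in for getCurrentWatermark(). */
    void tick(Supplier<Long> getCurrentWatermark) {
        Long wm = getCurrentWatermark.get();
        if (wm != null && wm > lastEmitted) {
            lastEmitted = wm;
            emitted.add(wm);
        }
    }

    List<Long> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        PeriodicEmissionSketch op = new PeriodicEmissionSketch();
        Long[] polled = {1000L, null, 1000L, 900L, 2000L};
        for (Long wm : polled) {
            op.tick(() -> wm);
        }
        System.out.println(op.emitted()); // [1000, 2000]
    }
}
```

Because stale or regressing values are dropped, an assigner may safely return the same watermark on many consecutive ticks.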
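scala
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// SensorReading (with id and timestamp fields) is assumed to be defined elsewhere
class PeriodicAssigner extends AssignerWithPeriodicWatermarks[SensorReading] {
  // 1 min in ms
  val bound: Long = 60 * 1000
  // the maximum observed timestamp; starts low enough that maxTs - bound cannot underflow
  var maxTs: Long = Long.MinValue + bound

  override def getCurrentWatermark: Watermark = {
    // the watermark trails the largest timestamp seen so far by 1 minute
    new Watermark(maxTs - bound)
  }

  override def extractTimestamp(r: SensorReading, previousTS: Long): Long = {
    // update maximum timestamp
    maxTs = maxTs.max(r.timestamp)
    // return record timestamp
    r.timestamp
  }
}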
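extractTimestamp() is called as each element is processed, by the operator that wraps the assigner (TimestampsAndPeriodicWatermarksOperator in Flink 1.10). It passes the element's existing timestamp as previousTS, or Long.MIN_VALUE if the element has none, and re-emits the element with the returned timestamp:
java
@Override
public void processElement(StreamRecord<T> element) throws Exception {
    final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
            element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
    output.collect(element.replace(element.getValue(), newTimestamp));
}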
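The snippet above depends on Flink internals, so the following plain-Java model isolates just its timestamp logic. SketchRecord is a hypothetical stand-in for StreamRecord (a null timestamp models hasTimestamp() == false), and ToLongBiFunction stands in for the user's extractTimestamp(element, previousTS).

```java
import java.util.function.ToLongBiFunction;

// Model of processElement: previousTS is the record's existing timestamp, or
// Long.MIN_VALUE if it has none; the record is re-emitted with the extracted timestamp.
class ProcessElementSketch {
    static final class SketchRecord<T> {
        final T value;
        final Long timestamp; // null models "no timestamp"

        SketchRecord(T value, Long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }

        boolean hasTimestamp() {
            return timestamp != null;
        }
    }

    static <T> SketchRecord<T> processElement(SketchRecord<T> element,
                                              ToLongBiFunction<T, Long> extractTimestamp) {
        long previous = element.hasTimestamp() ? element.timestamp : Long.MIN_VALUE;
        long newTimestamp = extractTimestamp.applyAsLong(element.value, previous);
        return new SketchRecord<>(element.value, newTimestamp);
    }

    public static void main(String[] args) {
        // Hypothetical extractor: keep an existing timestamp, otherwise fall back to 0.
        ToLongBiFunction<String, Long> keepOrZero = (v, prev) -> prev == Long.MIN_VALUE ? 0L : prev;
        SketchRecord<String> a = new SketchRecord<>("a", null);
        SketchRecord<String> b = new SketchRecord<>("b", 500L);
        System.out.println(processElement(a, keepOrZero).timestamp); // 0
        System.out.println(processElement(b, keepOrZero).timestamp); // 500
    }
}
```

This is why an assigner such as PeriodicAssigner can ignore previousTS entirely: the returned value always replaces whatever timestamp the record carried.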
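2. Punctuated watermarks
AssignerWithPunctuatedWatermarks generates a new watermark whenever an element carries a specific marker. For this interface, Flink first calls extractTimestamp(...) to assign the element a timestamp, and then immediately calls checkAndGetNextWatermark(...) on the same element.
The timestamp assigned in extractTimestamp(...) is passed to checkAndGetNextWatermark(...), which decides whether to generate a watermark. A new watermark is emitted whenever checkAndGetNextWatermark(...) returns a non-null watermark that is larger than the latest previous watermark.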
scala
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// SensorReading (with id and timestamp fields) is assumed to be defined elsewhere
class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[SensorReading] {
  // 1 min in ms
  val bound: Long = 60 * 1000

  override def checkAndGetNextWatermark(r: SensorReading, extractedTS: Long): Watermark = {
    if (r.id == "sensor_1") {
      // emit watermark if reading is from sensor_1
      new Watermark(extractedTS - bound)
    } else {
      // do not emit a watermark
      null
    }
  }

  override def extractTimestamp(r: SensorReading, previousTS: Long): Long = {
    // assign record timestamp
    r.timestamp
  }
}
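The per-element sequence described for punctuated assigners can be modeled in plain Java as follows. This is a sketch, not Flink code: PunctuatedSketch and Reading are hypothetical names, and the rule mirrors the PunctuatedAssigner example (only "sensor_1" readings yield a watermark, one minute behind their timestamp).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Model of the punctuated flow: for each element, extract the timestamp first, then ask
// checkAndGetNextWatermark; a non-null result larger than the last watermark is emitted.
class PunctuatedSketch {
    static final long BOUND = 60 * 1000; // 1 min in ms, as in PunctuatedAssigner

    static final class Reading {
        final String id;
        final long timestamp;

        Reading(String id, long timestamp) {
            this.id = id;
            this.timestamp = timestamp;
        }
    }

    static long extractTimestamp(Reading r) {
        return r.timestamp;
    }

    /** Returns null when the element carries no watermark. */
    static Long checkAndGetNextWatermark(Reading r, long extractedTs) {
        if (r.id.equals("sensor_1")) {
            return extractedTs - BOUND;
        }
        return null;
    }

    static List<Long> run(List<Reading> readings) {
        List<Long> emitted = new ArrayList<>();
        long last = Long.MIN_VALUE;
        for (Reading r : readings) {
            long ts = extractTimestamp(r);             // step 1: assign the timestamp
            Long wm = checkAndGetNextWatermark(r, ts); // step 2: maybe produce a watermark
            if (wm != null && wm > last) {
                last = wm;
                emitted.add(wm);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Reading> in = Arrays.asList(
                new Reading("sensor_1", 100_000),
                new Reading("sensor_2", 200_000),
                new Reading("sensor_1", 90_000),
                new Reading("sensor_1", 180_000));
        System.out.println(run(in)); // [40000, 120000]
    }
}
```

The third reading produces a candidate watermark of 30 000, which is dropped because it is not larger than 40 000. Since a check runs for every element, punctuated assigners that return a watermark too often can flood downstream operators.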
Original: https://www.cnblogs.com/lemos/p/12642793.html