StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Switch windows/timers to event time. Required on Flink <= 1.11, where processing time is the
// default; deprecated since Flink 1.12, where event time is already the default.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
TimestampAssigner
和 WatermarkGenerator,WatermarkStrategy 提供了一部分静态方法,用户也可以进行自定义,自定义调用如下方法public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T>{
/**
* Instantiates a {@link TimestampAssigner} for assigning timestamps according to this
* strategy.
*/
@Override
TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);
/**
* Instantiates a WatermarkGenerator that generates watermarks according to this strategy.
*/
@Override
WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}
// Tolerate up to 20 s of out-of-order events; each event's timestamp is read from tuple field f0.
WatermarkStrategy.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner((tuple, recordTimestamp) -> tuple.f0);
// A WatermarkStrategy can be used in two places in a program: directly on a source, or after a
// non-source operation. The example below does the latter, assigning timestamps and watermarks
// right after a filter. (<watermark strategy> and ... are placeholders.)
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<MyEvent> stream = env.readFile(
        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
        FilePathFilter.createDefaultFilter(), typeInfo);
DataStream<MyEvent> withTimestampsAndWatermarks = stream
        .filter(event -> event.severity() == WARNING)
        .assignTimestampsAndWatermarks(<watermark strategy>);
withTimestampsAndWatermarks
        .keyBy(event -> event.getGroup())
        .timeWindow(Time.seconds(10))
        .reduce((a, b) -> a.add(b))
        .addSink(...);
TimeWindow 中计算窗口起始时间的方法:
/**
 * Returns the start of the window that {@code timestamp} falls into, for windows of length
 * {@code windowSize} whose boundaries are shifted by {@code offset}.
 *
 * <p>The result is the largest {@code start <= timestamp} such that
 * {@code (start - offset)} is a multiple of {@code windowSize}. Unlike the shorter form
 * {@code timestamp - (timestamp - offset + windowSize) % windowSize}, this stays correct when
 * {@code timestamp - offset} is negative, and it never adds {@code windowSize} to the timestamp,
 * so it does not overflow for timestamps close to {@link Long#MAX_VALUE}.
 *
 * @param timestamp the element timestamp (e.g. event time in milliseconds)
 * @param offset the window offset, e.g. for time-zone alignment
 * @param windowSize the window length; must be positive
 * @return the start of the window containing {@code timestamp}
 */
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
    final long remainder = (timestamp - offset) % windowSize;
    // Java's % keeps the sign of the dividend: a negative remainder means the window start lies
    // (remainder + windowSize) below the timestamp rather than remainder below it.
    return remainder < 0 ? timestamp - (remainder + windowSize) : timestamp - remainder;
}
package com.bbx.flink.demo.allow_latenss;
import com.bbx.flink.demo.entity.Temperature;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
/**
 * Demonstrates:
 * <ol>
 *   <li>event-time timestamps and watermarks with a bounded-out-of-orderness strategy, and</li>
 *   <li>a side output that collects data arriving too late for its window.</li>
 * </ol>
 *
 * <p>Notes:
 * <ol>
 *   <li>Which window an element belongs to is decided by its event time.</li>
 *   <li>A watermark only marks that elements up to the marked time are considered to have
 *       arrived.</li>
 *   <li>Window start = timestamp - (timestamp - offset + windowSize) % windowSize (valid for
 *       non-negative {@code timestamp - offset}), where windowSize is the window length,
 *       timestamp is the event time and offset is a time shift (mainly used for time-zone
 *       adjustment).</li>
 * </ol>
 *
 * <p>Worked example for this program: windowSize = 15 s, offset = 0, watermark delay = 2 s.
 * Input lines have the form {@code id,eventTimeMillis,temperature}; the value in parentheses is
 * the number of seconds after the first element:
 * <pre>
 * 1,1607654003161,11   (+0 s)
 * 1,1607654004161,22   (+1 s)
 * 1,1607654008161,19   (+5 s)
 * 1,1607654009161,16   (+6 s)
 * 1,1607654010161,17   (+7 s)
 * 1,1607654011161,17   (+8 s)
 * 1,1607654012161,27   (+9 s)
 * 1,1607654013161,17   (+10 s)
 * </pre>
 * The first element's event time is 1607654003161 ms and offset is 0, so its window starts at
 * 1607654003161 - (1607654003161 - 0 + 15000) % 15000 = 1607653995000. Hence the first window is
 * [1607653995000, 1607654010000), the second is [1607654010000, 1607654025000), and so on.
 *
 * <p>Watermarks are emitted periodically as (max event time seen) - 2000 - 1 ms (Flink's
 * bounded-out-of-orderness generator subtracts one extra millisecond):
 * <pre>
 *                 element                 watermark after it
 * first window    1,1607654003161,11      1607654001160
 *                 1,1607654004161,22      1607654002160
 *                 1,1607654008161,19      1607654006160
 *                 1,1607654009161,16      1607654007160
 * second window   1,1607654010161,17      1607654008160
 *                 1,1607654011161,17      1607654009160
 *                 1,1607654012161,27      1607654010160
 *                 1,1607654013161,17      1607654011160
 * </pre>
 * Because of the 2 s watermark delay, the first window fires once an element with event time
 * >= 1607654012000 arrives, i.e. when 1,1607654012161,27 arrives. With the 5 s allowed lateness,
 * later elements for that window still re-trigger it until the watermark passes its end by 5 s;
 * anything later than that goes to the side output.
 */
@Slf4j
public class SideOutDemo {

    /** Socket source host used when no host argument is given. */
    private static final String DEFAULT_HOST = "114.116.104.74";

    /** Socket source port used when no port argument is given. */
    private static final int DEFAULT_PORT = 10003;

    /**
     * Runs the demo against a text socket.
     *
     * @param args optional {@code [host [port]]} of the text socket to read from; defaults to
     *     {@value #DEFAULT_HOST}:{@value #DEFAULT_PORT}
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        String host = args.length > 0 ? args[0] : DEFAULT_HOST;
        int port = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_PORT;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Switch to event time. Required on Flink <= 1.11 (processing time was the default);
        // deprecated since Flink 1.12, where event time is already the default.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        // The periodic watermark interval has a default; uncomment to override it.
        // env.getConfig().setAutoWatermarkInterval(1L);

        OutputTag<Temperature> sideWindow =
                new OutputTag<>("sideWindow", PojoTypeInfo.of(Temperature.class));

        SingleOutputStreamOperator<Temperature> reduce = env.socketTextStream(host, port)
                // Each line is "id,eventTimeMillis,temperature".
                .map((MapFunction<String, Temperature>) line -> {
                    String[] fields = line.split(",");
                    // trim() tolerates stray whitespace such as the trailing '\r' sent by
                    // telnet/nc, which would otherwise make Long.parseLong throw and fail the job.
                    return new Temperature(
                            fields[0].trim(),
                            Long.parseLong(fields[2].trim()),
                            Long.parseLong(fields[1].trim()));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Temperature>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner((i, timestamp) -> i.getTime()))
                .keyBy(i -> i.getId())
                .timeWindow(Time.seconds(15))
                // Keep each window's state for 5 s of event time past its end so that late data
                // within that period can still update (re-fire) the window.
                .allowedLateness(Time.seconds(5L))
                // Data later than the allowed lateness is sent to this side output instead of
                // being dropped.
                .sideOutputLateData(sideWindow)
                .max("tem");

        reduce.print("~~~~~~~~~");
        reduce.getSideOutput(sideWindow).print("######");
        env.execute();
    }
}
// Tolerate 20 s of out-of-orderness; an input that receives no events for 60 s is marked idle so
// that it does not hold back the overall watermark.
WatermarkStrategy.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withIdleness(Duration.ofSeconds(60));
/**
 * The {@code WatermarkGenerator} generates watermarks either based on events or
 * periodically (in a fixed interval).
 *
 * <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the
 * {@code AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.
 */
@Public
public interface WatermarkGenerator<T> {

    /**
     * Called for every event, allows the watermark generator to examine and remember the
     * event timestamps, or to emit a watermark based on the event itself.
     *
     * <p>Called after each element arrives; typically used to emit punctuated watermarks,
     * i.e. watermarks triggered by events.
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    /**
     * Called periodically, and might emit a new watermark, or not.
     *
     * <p>Typically used to emit periodic watermarks, i.e. watermarks triggered by time.
     *
     * <p>The interval in which this method is called and Watermarks are generated
     * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
     */
    void onPeriodicEmit(WatermarkOutput output);
}
// Set the interval (in milliseconds) at which WatermarkGenerator#onPeriodicEmit is invoked and
// periodic watermarks are generated; 1 ms is a very aggressive value.
env.getConfig().setAutoWatermarkInterval(1L);
**
* This generator generates watermarks assuming that elements arrive out of order,
* but only to a certain degree. The latest elements for a certain timestamp t will arrive
* at most n milliseconds after the earliest elements for timestamp t.
*/
public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {
private final long maxOutOfOrderness = 3500; // 3.5 seconds
private long currentMaxTimestamp;
@Override
public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
//已经接收到元素的最大 timestamp 减去 最大乱序程度 -1 ,此处的 -1 表示不包含这个时间---个人理解
// emit the watermark as current highest timestamp minus the out-of-orderness bound
output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
}
}
/**
 * Emits watermarks that trail the current processing time (the wall clock) by a fixed lag.
 * It assumes elements reach Flink within a bounded delay, so event timestamps are not consulted.
 */
public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> {

    /** How far, in milliseconds, the watermark trails the wall clock (5 seconds). */
    private final long lagMillis = 5000;

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        // Intentionally empty: the watermark is derived from processing time, not from events.
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        final long watermarkTimestamp = System.currentTimeMillis() - lagMillis;
        output.emitWatermark(new Watermark(watermarkTimestamp));
    }
}
/**
 * Emits a punctuated watermark whenever an incoming event carries a watermark marker; the
 * watermark's timestamp is taken from the event itself.
 */
public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        if (!event.hasWatermarkMarker()) {
            return;
        }
        final long markerTimestamp = event.getWatermarkTimestamp();
        output.emitWatermark(new Watermark(markerTimestamp));
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Intentionally empty: watermarks are emitted from onEvent in reaction to marker events.
    }
}
// Built-in strategy for timestamps that only ascend: no out-of-orderness is tolerated.
WatermarkStrategy.forMonotonousTimestamps();
// Built-in strategy that tolerates events arriving up to 10 s out of order.
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(10_000));
原文:https://www.cnblogs.com/sxubo/p/14134729.html