首页 > 其他 > 详细

watermark

时间:2019-09-02 19:17:47      阅读:61      评论:0      收藏:0      [点我收藏+]

一、watermark

  flink接收到每条记录都会产生一个waterMark,计算公式为当前接收到的最大eventTime-延迟时间,窗口函数接收到消息满足条件会触发窗口操作,因此若无满足条件消息进入则窗口不会进行计算。

  窗口计算触发条件:1、watermark时间>=window_end_time;2、在[window_start_time,window_end_time)中有数据。

  

  //指定按照eventtime时间处理
   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val stream = env.fromElements((1,"A"),(2,"B"),(3,"B"),(3000,"C")).assignTimestampsAndWatermarks(
//指定间隔时间为1秒
      new BoundedOutOfOrdernessTimestampExtractor[(Int, String)](Time.seconds(1)) {
        //指定字段作为eventtime
        override def extractTimestamp(element: (Int, String)): Long = {
          element._1.toLong
        }
      }
    )
    val result = stream.keyBy(1).window(
//eventtimewindow
      TumblingEventTimeWindows.of(Time.seconds(1))
    ).sum(0)

 

watermark

原文:https://www.cnblogs.com/csyusu/p/11448311.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!