首页 > Windows开发 > 详细

Flink Window那些事——ProcessWindowFunction/ProcessAllWindowFunction

时间:2020-03-29 14:32:14      阅读:806      评论:0      收藏:0      [点我收藏+]

全量聚合: 窗口需要维护全部原始数据,窗口触发进行全量聚合。

ProcessWindowFunction获得一个包含窗口所有元素的可迭代器,以及一个具有时间和状态信息访问权的上下文对象,这使得它比其他窗口函数提供更大的灵活性。这是以性能和资源消耗为代价的,因为元素不能增量地聚合,而是需要在内部缓冲,直到认为窗口可以处理为止。

WindowFunction的升级版,可以跟ReduceFunction/AggregateFunction/FoldFunction结合使用(推荐用法)

package com.lynch.stream.window;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

/**
 * 测试ProcessWinFunction
 *
 * @author dajiangtai
 * @create 2019-06-11-18:37
 */
public class TestProcessWinFunctionOnWindow {

    public static void main(String[] args) throws Exception{
        //获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //读取数据
        DataStream<Tuple3<String,String,Long>> input = env.fromElements(ENGLISH);

        //求各班级英语成绩平均分
        DataStream<Double> avgScore = input.keyBy(0)
                .countWindow(2)
                .process(new MyProcessWindowFunction());
        avgScore.print();
        env.execute("TestProcessWinFunctionOnWindow");

    }


    public static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple3<String,String,Long>,Double, Tuple, GlobalWindow>{
        
        //iterable 输入流中的元素类型集合
        @Override
        public void process(Tuple tuple, Context context, Iterable<Tuple3<String, String, Long>> iterable, Collector<Double> out) throws Exception {
            long sum = 0;
            long count = 0;
            for (Tuple3<String,String,Long> in :iterable){
                sum+=in.f2;
                count++;
            }
            out.collect((double)(sum/count));
        }
    }

    public static final Tuple3[] ENGLISH = new Tuple3[]{
            Tuple3.of("class1","张三",100L),
            Tuple3.of("class1","李四",78L),
            Tuple3.of("class1","王五",99L),
            Tuple3.of("class2","赵六",81L),
            Tuple3.of("class2","小七",59L),
            Tuple3.of("class2","小八",97L),
    };
}

 

Flink Window那些事——ProcessWindowFunction/ProcessAllWindowFunction

原文:https://www.cnblogs.com/linjiqin/p/12591729.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!