// Set up the batch execution environment (legacy DataSet API).
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// In-memory source: three lines of space-separated words.
DataSource<String> dataSource = env.fromElements("flink spark hadoop", "hadoop spark", "flink flink");
// Tokenize each line, emit (word, 1) pairs, then aggregate the counts per word.
AggregateOperator<Tuple2<String, Integer>> result = dataSource
    .flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String line, Collector<String> out) throws Exception {
            String[] tokens = line.split(" ");
            for (String token : tokens) {
                out.collect(token);
            }
        }
    })
    .map(new MapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> map(String word) throws Exception {
            return new Tuple2<>(word, 1);
        }
    })
    .groupBy(0) // group on the word (tuple field 0)
    .sum(1);    // sum the per-word counts (tuple field 1)
// For DataSet jobs, print() both triggers execution and writes the result to stdout.
result.print();
// Set up the streaming execution environment.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// AUTOMATIC lets Flink pick batch or streaming execution based on the sources.
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//env.setRuntimeMode(RuntimeExecutionMode.BATCH);
//env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
// Source: read text lines from a local socket (e.g. started with `nc -lk 9999`).
DataStreamSource<String> lines = env.socketTextStream("localhost", 9999);
// Tokenize each line, emit (word, 1) pairs, key by the word, and keep a running sum.
SingleOutputStreamOperator<Tuple2<String, Integer>> result = lines
    .flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String line, Collector<String> out) throws Exception {
            String[] tokens = line.split(" ");
            for (String token : tokens) {
                out.collect(token);
            }
        }
    })
    .map(new MapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> map(String word) throws Exception {
            return new Tuple2<>(word, 1);
        }
    })
    .keyBy(pair -> pair.f0) // partition the stream by the word
    .sum(1);                // running count per word (tuple field 1)
// Sink: print each updated count to stdout.
result.print();
// Streaming jobs only start when execute() is called.
env.execute();
// Set up the streaming execution environment.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// Bounded in-memory source: a single line of space-separated words.
DataStreamSource<String> dataStreamSource = env.fromElements("abc abc abc");
// Same word-count pipeline as the anonymous-class version, written with lambdas.
// Lambdas lose their generic types to erasure, so the output types must be
// declared explicitly via returns()/TypeInformation hints.
SingleOutputStreamOperator<Tuple2<String, Integer>> result = dataStreamSource
    .flatMap((String line, Collector<String> out) -> {
        for (String word : line.split(" ")) {
            out.collect(word);
        }
    }).returns(Types.STRING) // output type hint for the erased lambda
    .map((String word) ->
            Tuple2.of(word, 1), TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}
    ))
    .keyBy(pair -> pair.f0) // partition the stream by the word
    .sum(1);                // running count per word (tuple field 1)
// Sink: print each updated count to stdout.
result.print();
// Streaming jobs only start when execute() is called.
env.execute();
// Source (原文): https://www.cnblogs.com/yangshibiao/p/14907795.html