首页 > 其他 > 详细

Flink使用POJO实现分组和汇总

时间:2021-05-23 09:24:52      阅读:25      评论:0      收藏:0      [点我收藏+]

 

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FraudDetection {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<MyTransaction> myTransactions = env.socketTextStream("localhost", 9999).map(new MapFunction<String, MyTransaction>() {

            MyTransaction myTransaction = null;

            @Override
            public MyTransaction map(String value) throws Exception {
                String[] split = value.split(",");
                myTransaction = new MyTransaction(split[0].trim(), Long.valueOf(split[1].trim()));
                return myTransaction;
            }
        });

        myTransactions
                //使用新的API
                .keyBy(new KeySelector<MyTransaction, String>() {
            @Override
            public String getKey(MyTransaction value) throws Exception {
                return value.getAccounId();
            }
        },TypeInformation.of(String.class))
                //或者也可以使用旧API
//                .keyBy("accounId")

                .sum("amount").print();

        env.execute();
//        DataStream<MyTransaction> transactions = env.addSource(new TransactionSource()).name("transaction");
    }

    public static class MyTransaction {

        private String accounId;
        private long amount;

        public MyTransaction() {
        }

        public MyTransaction(String accounId, long amount) {
            this.accounId = accounId;
            this.amount = amount;
        }

        public String getAccounId() {
            return accounId;
        }

        public void setAccounId(String accounId) {
            this.accounId = accounId;
        }

        public long getAmount() {
            return amount;
        }

        public void setAmount(long amount) {
            this.amount = amount;
        }

        @Override
        public String toString() {
            return "{\"accounId\":\"" + accounId + "\","  +
                    "\"amount\":" + amount + "}";
        }
    }


}

 

官网介绍的使用可转化为流的类型

Flink使用POJO实现分组和汇总

原文:https://www.cnblogs.com/yoyowin/p/14800178.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!