首页 > 其他 > 详细

flink file source

时间:2020-06-22 09:50:31      阅读:94      评论:0      收藏:0      [点我收藏+]
技术分享图片
 1 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 2 
 3 object FlinkDemo04_CommonSource_fromFile {
 4     case class Flight(avgTicketPrice:String, cancelled:String, carrier:String, dest:String, destAirportID:String, origin:String, originAirportID:String)
 5     def main(args: Array[String]): Unit = {
 6         
 7         //1 创建环境
 8         val env = StreamExecutionEnvironment.getExecutionEnvironment
 9         //2 获取DataStream
10         val dStream: DataStream[String] = env.readTextFile("I:\\projectImplement\\dataWareHouse\\test-es\\data\\630data.csv")
11         //3 计算
12         import org.apache.flink.api.scala._
13         dStream.map{
14             line=>
15             val vals = line.split(",")
16             Flight(vals(0),vals(1),vals(2),vals(3),vals(4),vals(5),vals(6))
17         }.print()
18         //4 执行
19         env.execute("stream job")
20     }
21 }
View Code

 

flink file source

原文:https://www.cnblogs.com/xiefeichn/p/13174974.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!