Generally speaking, a Flink program consists of the following parts (a minimal code sketch follows the list):
- Obtain an execution environment
- Load/create the initial data (add input sources, e.g. connectors)
- Specify transformations on this data (apply operators to build the logical computation)
- Specify where to put the results of your computations (add sinks)
- Trigger the program execution (kick off the DAG)
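The sketch below strings these five steps together in Scala. It is only an outline: the in-memory source, uppercase map, and print sink are placeholders, not the Kafka pipeline this post builds.

import org.apache.flink.streaming.api.scala._

object SkeletonJob {
  def main(args: Array[String]): Unit = {
    // 1. Obtain an execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. Load/create the initial data (placeholder in-memory source)
    val source = env.fromElements("a", "b", "c")

    // 3. Specify transformations on this data
    val upper = source.map(_.toUpperCase)

    // 4. Specify where to put the results (stdout sink for illustration)
    upper.print()

    // 5. Trigger the program execution; nothing runs until execute() is called
    env.execute("skeleton-job")
  }
}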
This post uses the Kafka connector as an example to walk through building a new Flink program.
Suppose our data lives in Kafka in the following format:
{"user_id": 123, "uuid": "d16def06-8fd1-426c-8ffc-0cc4a9d6eef0", "info": {"age": 23, "monthly_income": 10000, "sex": 1, "consumption_amount": 20, "consumption_time": 1617070481}}
We need the following POJOs to represent it (Lombok here is only to keep the code short; please do not use Lombok in real flink-scala programs):
import com.fasterxml.jackson.annotation.JsonValue;

/**
 * @author : Lee
 * @since : 3/30/21, Tue
 **/
public enum Sex {
    /**
     * Female
     */
    FEMALE(0),
    /**
     * Male
     */
    MALE(1);

    private final Integer code;

    Sex(Integer code) {
        this.code = code;
    }

    @JsonValue
    public Integer getCode() {
        return code;
    }
}
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;

import java.time.Instant;

/**
 * @author : Lee
 * @since : 3/30/21, Tue
 **/
@Data
public class Info {
    private Integer age;
    @JsonProperty("monthly_income")
    private Float monthlyIncome;
    private Sex sex;
    @JsonProperty("consumption_amount")
    private Float consumptionAmount;
    @JsonProperty("consumption_time")
    private Instant consumptionTime;
}
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;

/**
 * @author : Lee
 * @since : 3/30/21, Tue
 **/
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class UserRecord {
    @JsonProperty("user_id")
    private Long userId;
    private Info info;
}
Since Kafka stores records as raw byte streams, we need to define our own deserialization schema:
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.example.entity.UserRecord;

import java.io.IOException;

/**
 * @author : Lee
 * @since : 3/30/21, Tue
 **/
public class UserRecordDeserializationSchema implements DeserializationSchema<UserRecord> {

    static ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());

    @Override
    public UserRecord deserialize(byte[] message) throws IOException {
        return objectMapper.readValue(message, UserRecord.class);
    }

    @Override
    public boolean isEndOfStream(UserRecord nextElement) {
        return false;
    }

    @Override
    public TypeInformation<UserRecord> getProducedType() {
        return TypeInformation.of(UserRecord.class);
    }
}
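To sanity-check the schema (and the Jackson mapping behind it) against the sample record shown earlier, a quick standalone test such as the following can be used. This is only a sketch: DeserializationCheck is a hypothetical harness, assumed to live in the same package as UserRecordDeserializationSchema and the POJOs.

import java.nio.charset.StandardCharsets

object DeserializationCheck {
  def main(args: Array[String]): Unit = {
    val json =
      """{"user_id": 123, "uuid": "d16def06-8fd1-426c-8ffc-0cc4a9d6eef0",
        | "info": {"age": 23, "monthly_income": 10000, "sex": 1,
        | "consumption_amount": 20, "consumption_time": 1617070481}}""".stripMargin

    val record = new UserRecordDeserializationSchema()
      .deserialize(json.getBytes(StandardCharsets.UTF_8))

    // Expected: userId = 123, sex = MALE (matched via the @JsonValue code),
    // "uuid" dropped because UserRecord ignores unknown properties, and
    // consumption_time read into an Instant by the JavaTimeModule.
    println(record)
  }
}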
Next, create the Kafka consumer:
val kafkaConsumer = new FlinkKafkaConsumer[UserRecord](kafkaTopicName, new UserRecordDeserializationSchema, kafkaConsumerProperties)
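The kafkaTopicName and kafkaConsumerProperties referenced above are not defined in the original snippet; a minimal sketch with placeholder values might look like this. bootstrap.servers and group.id are the two properties the Flink Kafka consumer requires.

import java.util.Properties
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

// Placeholder topic name; substitute your own
val kafkaTopicName = "user-records"

// Minimal consumer configuration with placeholder values
val kafkaConsumerProperties = new Properties()
kafkaConsumerProperties.setProperty("bootstrap.servers", "localhost:9092")
kafkaConsumerProperties.setProperty("group.id", "user-record-consumer")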
Then attach a watermark strategy to the consumer:
kafkaConsumer.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[UserRecord](Duration.ofSeconds(5)))
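As written, the bounded-out-of-orderness strategy uses the timestamps carried in the Kafka records themselves. To derive event time from the payload instead, the strategy can be combined with a timestamp assigner; a sketch, assuming consumptionTime is the event-time field:

import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

kafkaConsumer.assignTimestampsAndWatermarks(
  WatermarkStrategy
    .forBoundedOutOfOrderness[UserRecord](Duration.ofSeconds(5))
    // Extract event time (epoch millis) from the record payload
    .withTimestampAssigner(new SerializableTimestampAssigner[UserRecord] {
      override def extractTimestamp(element: UserRecord, recordTimestamp: Long): Long =
        element.getInfo.getConsumptionTime.toEpochMilli
    })
)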
val stream = env.addSource(kafkaConsumer)
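The original post stops here, but per step 5 of the skeleton nothing runs until execution is triggered. A placeholder filter and print sink complete the pipeline; the threshold and job name below are illustrative only.

// Assumes import org.apache.flink.streaming.api.scala._ is in scope,
// so the implicit TypeInformation for UserRecord is available
stream
  .filter(_.getInfo.getConsumptionAmount > 10f)  // placeholder business logic
  .print()                                       // stdout sink for illustration

// Trigger the DAG execution
env.execute("user-record-job")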
Original post: https://www.cnblogs.com/leerocks/p/14596219.html