
Getting Started with Flink: Event Time Processing


Introduction

Generally speaking, a Flink program consists of the following parts:

  1. Obtain an execution environment - set up the runtime environment
  2. Load/create the initial data - add input data (connectors, etc.)
  3. Specify transformations on this data - apply operators to the input to build the processing logic
  4. Specify where to put the results of your computations - add outputs (sinks)
  5. Trigger the program execution - kick off execution of the DAG

This article uses the Kafka connector as an example to show how to build a new Flink program.
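Before filling in the details, here is a minimal Scala sketch of how those five steps usually fit together (the topic name, broker address, group id and the map/print logic are placeholders, not part of the original example):

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object JobSkeleton {
  def main(args: Array[String]): Unit = {
    // 1. Obtain an execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. Load/create the initial data (here: a Kafka connector; broker and topic are placeholders)
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("group.id", "demo-group")
    val source = env.addSource(new FlinkKafkaConsumer[String]("demo-topic", new SimpleStringSchema, props))

    // 3. Specify transformations on this data
    val lengths = source.map(_.length)

    // 4. Specify where to put the results of the computation (sink)
    lengths.print()

    // 5. Trigger the program execution (the DAG is built and run here)
    env.execute("job-skeleton")
  }
}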

Implementing the First Flink Program

Adding a Kafka Connector

Suppose our data lives in Kafka in the following format:

{"user_id": 123, "uuid": "d16def06-8fd1-426c-8ffc-0cc4a9d6eef0", "info": {"age": 23, "monthly_income": 10000, "sex": 1, "consumption_amount": 20, "consumption_time": 1617070481}}

We represent it with the following POJOs (Lombok is used here only to keep the code short; do not use Lombok in a real flink-scala program):

import com.fasterxml.jackson.annotation.JsonValue;

/**
 * @author : Lee
 * @since : 3/30/21, Tue
 **/
public enum Sex {
    /**
     * Female
     */
    FEMALE(0),
    /**
     * Male
     */
    MALE(1);

    private final Integer code;

    Sex(Integer code) {
        this.code = code;
    }

    @JsonValue
    public Integer getCode() {
        return code;
    }
}
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;

import java.time.Instant;

/**
 * @author : Lee
 * @since : 3/30/21, Tue
 **/
@Data
public class Info {
    private Integer age;
    @JsonProperty("monthly_income")
    private Float monthlyIncome;
    private Sex sex;
    @JsonProperty("consumption_amount")
    private Float consumptionAmount;
    @JsonProperty("consumption_time")
    private Instant consumptionTime;
}
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;

/**
 * @author : Lee
 * @since : 3/30/21, Tue
 **/

@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class UserRecord {
    @JsonProperty("user_id")
    private Long userId;
    private Info info;
}

Since Kafka stores records as raw byte streams, we need to define our own deserialization schema.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.example.entity.UserRecord;

import java.io.IOException;

/**
 * @author : Lee
 * @since : 3/30/21, Tue
 **/
public class UserRecordDeserializationSchema implements DeserializationSchema<UserRecord> {

    // One shared mapper; JavaTimeModule lets Jackson map the epoch-second consumption_time into an Instant.
    private static final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());

    @Override
    public UserRecord deserialize(byte[] message) throws IOException {
        return objectMapper.readValue(message, UserRecord.class);
    }

    @Override
    public boolean isEndOfStream(UserRecord nextElement) {
        return false;
    }

    @Override
    public TypeInformation<UserRecord> getProducedType() {
        return TypeInformation.of(UserRecord.class);
    }
}

Then create the Kafka consumer:

val kafkaConsumer = new FlinkKafkaConsumer[UserRecord](kafkaTopicName, new UserRecordDeserializationSchema, kafkaConsumerProperties)
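The `kafkaTopicName` and `kafkaConsumerProperties` used above are not shown in the original snippet; as a rough sketch, they could be defined like this (the topic, broker list and group id are assumed values, adjust them to your environment):

import java.util.Properties

// Hypothetical values for the sake of a runnable example.
val kafkaTopicName = "user_records"
val kafkaConsumerProperties = new Properties()
kafkaConsumerProperties.setProperty("bootstrap.servers", "localhost:9092")
kafkaConsumerProperties.setProperty("group.id", "flink-eventtime-demo")
kafkaConsumerProperties.setProperty("auto.offset.reset", "latest")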

Add a watermark strategy to the Kafka consumer:

kafkaConsumer.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[UserRecord](Duration.ofSeconds(5)))
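As written, the bounded-out-of-orderness strategy uses the timestamps already attached to the Kafka records. If event time should instead come from the `consumption_time` field of the payload, a timestamp assigner can be attached; the following is a sketch that assumes the Lombok-generated `getInfo`/`getConsumptionTime` getters:

import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

// Sketch: take the event timestamp (in milliseconds) from consumption_time
// instead of the Kafka record timestamp.
val watermarkStrategy = WatermarkStrategy
  .forBoundedOutOfOrderness[UserRecord](Duration.ofSeconds(5))
  .withTimestampAssigner(new SerializableTimestampAssigner[UserRecord] {
    override def extractTimestamp(record: UserRecord, recordTimestamp: Long): Long =
      record.getInfo.getConsumptionTime.toEpochMilli
  })

kafkaConsumer.assignTimestampsAndWatermarks(watermarkStrategy)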

Register the consumer with the env:

val stream = env.addSource(kafkaConsumer)
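With the watermarked source registered, `stream` can now be processed on event time. The following is only a sketch of what steps 3-5 might look like on top of `stream` and `env` (the keying, window size, aggregation and job name are assumptions, not from the original post):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Sketch: sum consumption_amount per user over 1-minute event-time windows,
// print the result, then trigger the job.
stream
  .map(r => (r.getUserId.longValue, r.getInfo.getConsumptionAmount.doubleValue))
  .keyBy(_._1)
  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
  .reduce((a, b) => (a._1, a._2 + b._2))
  .print()

// Trigger execution of the DAG.
env.execute("eventtime-process-demo")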

 


Original post: https://www.cnblogs.com/leerocks/p/14596219.html
