
How to send ordered messages with Kafka

Date: 2020-12-16 18:55:10

Under normal conditions, data written by a single producer to a single Kafka partition is ordered.

Example producer configuration:

    Properties props = new Properties();
    // props.put("bootstrap.servers", "10.4.4.91:9092,10.4.4.92:9092,10.4.4.93:9092");
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    props.put(ProducerConfig.ACKS_CONFIG, "all"); // "acks"
    props.put(ProducerConfig.RETRIES_CONFIG, 120000); // "retries"
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // "batch.size"
    props.put(ProducerConfig.LINGER_MS_CONFIG, 0); // "linger.ms"
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // "buffer.memory"
    props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000); // "request.timeout.ms"
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "drtest"); // "client.id"
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); // "compression.type"
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // "key.serializer"
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // "value.serializer"
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); // "max.in.flight.requests.per.connection"
    // Declare the producer once, with type parameters matching the String serializers above
    Producer<String, String> producer = new KafkaProducer<>(props);
    // send() is asynchronous; get() blocks until the broker has acknowledged the record
    RecordMetadata recordMetadata = producer.send(new ProducerRecord<>("test", "testV")).get();
    ...

 

One of these settings is max.in.flight.requests.per.connection. What does it do?

The default value of max.in.flight.requests.per.connection is 5.

props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");//max.in.flight.requests.per.connection

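Internally, KafkaProducer maintains a send buffer, implemented in the RecordAccumulator class.

When the user calls the producer's send method, the record is written into memory through the accumulator's append method.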

 

public final class RecordAccumulator {
    ...
    private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
    ...
}

During KafkaProducer initialization:

this.accumulator = new RecordAccumulator(logContext,
        config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
        this.totalMemorySize,
        this.compressionType,
        config.getLong(ProducerConfig.LINGER_MS_CONFIG),
        retryBackoffMs,
        metrics,
        time,
        apiVersions,
        transactionManager);
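The buffering described above can be sketched with plain JDK collections. This is only an illustrative model, not Kafka's implementation: the class AccumulatorSketch, its methods, the String partition name, and the record-count limit standing in for batch.size are all assumptions made for the example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, simplified stand-in for RecordAccumulator:
// one deque of batches per topic partition.
final class AccumulatorSketch {
    private final ConcurrentMap<String, Deque<List<String>>> batches = new ConcurrentHashMap<>();
    // Assumption: the batch limit is counted in records here, not in bytes like batch.size.
    private final int maxRecordsPerBatch;

    AccumulatorSketch(int maxRecordsPerBatch) {
        if (maxRecordsPerBatch < 1) {
            throw new IllegalArgumentException("maxRecordsPerBatch must be >= 1");
        }
        this.maxRecordsPerBatch = maxRecordsPerBatch;
    }

    // Append to the newest batch of the partition; open a new batch when it is full.
    void append(String topicPartition, String value) {
        Deque<List<String>> deque =
                batches.computeIfAbsent(topicPartition, tp -> new ArrayDeque<>());
        synchronized (deque) {
            List<String> last = deque.peekLast();
            if (last == null || last.size() >= maxRecordsPerBatch) {
                last = new ArrayList<>();
                deque.addLast(last);
            }
            last.add(value);
        }
    }

    // Remove and return the oldest batch of the partition, or null if there is none.
    List<String> drainOldest(String topicPartition) {
        Deque<List<String>> deque = batches.get(topicPartition);
        if (deque == null) {
            return null;
        }
        synchronized (deque) {
            return deque.pollFirst();
        }
    }

    public static void main(String[] args) {
        AccumulatorSketch acc = new AccumulatorSketch(2);
        acc.append("test-0", "a");
        acc.append("test-0", "b");
        acc.append("test-0", "c");
        System.out.println(acc.drainOldest("test-0"));
        System.out.println(acc.drainOldest("test-0"));
    }
}
```

Because batches are appended at the tail and drained from the head, records for one partition leave the buffer in the order in which they were appended.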

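When KafkaProducer is initialized, it instantiates a Sender, and this is where max.in.flight.requests.per.connection is used.

The guaranteeMessageOrder property indicates whether this producer sends data to the broker in order.

The Sender class implements the Runnable interface: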

public class Sender implements Runnable 
....
this.sender = new Sender(logContext,
        client,
        this.metadata,
        this.accumulator,
        maxInflightRequests == 1, // value of the guaranteeMessageOrder property
        config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
        acks,
        retries,
        metricsRegistry.senderMetrics,
        Time.SYSTEM,
        this.requestTimeoutMs,
        config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
        this.transactionManager,
        apiVersions);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
....

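When the Sender runs, it reads the data held in the accumulator and checks whether the client connection to each target node is ready. Nodes that are not ready are removed from the ready set for this round.

Part of that readiness check goes through the canSendMore method of the InFlightRequests object:

public boolean canSendMore(String node) {
    Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
    // One condition for sending more data: the queue size must be smaller than
    // the value configured for max.in.flight.requests.per.connection
    return queue == null || queue.isEmpty() ||
            (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}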

 

final class InFlightRequests {

    private final int maxInFlightRequestsPerConnection; // this is the max.in.flight.requests.per.connection setting
    private final Map<String, Deque<NetworkClient.InFlightRequest>> requests = new HashMap<>();
    /** Thread safe total number of in flight requests. */
    private final AtomicInteger inFlightRequestCount = new AtomicInteger(0);
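
The gating effect of this limit can be reproduced with a small stand-alone model. The sketch below is an assumption-laden simplification: InFlightSketch and its methods are hypothetical names, requests are plain strings, and the extra condition in the real code that the most recent send has completed is left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the per-node in-flight request queue.
final class InFlightSketch {
    private final int maxInFlightPerConnection;
    private final Map<String, Deque<String>> requests = new HashMap<>();

    InFlightSketch(int maxInFlightPerConnection) {
        if (maxInFlightPerConnection < 1) {
            throw new IllegalArgumentException("maxInFlightPerConnection must be >= 1");
        }
        this.maxInFlightPerConnection = maxInFlightPerConnection;
    }

    // A node accepts another request only while its queue is below the limit.
    // Simplification: the "latest send completed" condition is not modeled.
    boolean canSendMore(String node) {
        Deque<String> queue = requests.get(node);
        return queue == null || queue.size() < maxInFlightPerConnection;
    }

    // Record a request as sent but not yet answered (newest at the head).
    void add(String node, String requestId) {
        if (!canSendMore(node)) {
            throw new IllegalStateException("too many in-flight requests for " + node);
        }
        requests.computeIfAbsent(node, n -> new ArrayDeque<>()).addFirst(requestId);
    }

    // A response arrived for the oldest outstanding request (taken from the tail).
    String completeNext(String node) {
        Deque<String> queue = requests.get(node);
        return queue == null ? null : queue.pollLast();
    }

    public static void main(String[] args) {
        InFlightSketch inFlight = new InFlightSketch(1);
        inFlight.add("node-1", "req-1");
        System.out.println(inFlight.canSendMore("node-1"));
        inFlight.completeNext("node-1");
        System.out.println(inFlight.canSendMore("node-1"));
    }
}
```

With a limit of 1, a second request to the same node has to wait until the response to the first has arrived, which is what makes the setting relevant for ordering.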

Removing the nodes that are not ready:

// remove any nodes we aren't ready to send to
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
            }
        }

Before the client sends data, it builds an InFlightRequest object and adds it to the requests map of InFlightRequests.

The doSend method of NetworkClient:

Send send = request.toSend(destination, header);
        InFlightRequest inFlightRequest = new InFlightRequest(
                clientRequest,
                header,
                isInternalRequest,
                request,
                send,
                now);
        this.inFlightRequests.add(inFlightRequest);
        selector.send(send);

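The max.in.flight.requests.per.connection parameter sets the maximum number of requests the client may send on one connection before it has received the server's responses. Setting it to 1 guarantees that messages stay in order even after a failed send is retried. If it is greater than 1 and a failed send is retried, a later batch can be written before the retried one, so messages may end up out of order (unless the idempotent producer, enable.idempotence=true, is used, which preserves ordering with up to 5 in-flight requests). Setting the value too high also uses more memory.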
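
The reordering effect can be shown with a toy simulation. It rests on assumptions stated here and in the comments: a non-idempotent producer, batches sent in rounds that fill the in-flight window, a broker that handles each round in order, and failed batches put back at the front of the queue. RetryOrderSketch and simulate are hypothetical names, not Kafka APIs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy, round-based model of retries with a bounded in-flight window.
final class RetryOrderSketch {

    // Returns the order in which batches 1..batchCount end up in the broker log.
    // Each batch listed in failOnce fails on its first attempt and succeeds on retry.
    static List<Integer> simulate(int batchCount, int maxInFlight, Set<Integer> failOnce) {
        if (maxInFlight < 1) {
            throw new IllegalArgumentException("maxInFlight must be >= 1");
        }
        Deque<Integer> pending = new ArrayDeque<>();
        for (int i = 1; i <= batchCount; i++) {
            pending.addLast(i);
        }
        Set<Integer> remainingFailures = new HashSet<>(failOnce);
        List<Integer> brokerLog = new ArrayList<>();

        while (!pending.isEmpty()) {
            // Fill the in-flight window from the head of the queue.
            List<Integer> inFlight = new ArrayList<>();
            while (inFlight.size() < maxInFlight && !pending.isEmpty()) {
                inFlight.add(pending.pollFirst());
            }
            // The broker handles the window in order; failed batches are not written.
            List<Integer> retries = new ArrayList<>();
            for (int batch : inFlight) {
                if (remainingFailures.remove(batch)) {
                    retries.add(batch);
                } else {
                    brokerLog.add(batch);
                }
            }
            // Failed batches go back to the front of the queue, keeping their relative order.
            for (int i = retries.size() - 1; i >= 0; i--) {
                pending.addFirst(retries.get(i));
            }
        }
        return brokerLog;
    }

    public static void main(String[] args) {
        Set<Integer> failFirst = Collections.singleton(1);
        System.out.println("maxInFlight=1: " + simulate(3, 1, failFirst));
        System.out.println("maxInFlight=2: " + simulate(3, 2, failFirst));
    }
}
```

In this model, a window of 1 yields the log order 1, 2, 3, while a window of 2 yields 2, 1, 3, because batch 2 was already in flight and succeeded before batch 1 was retried.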

 

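To guarantee strict message ordering:

1. The producer client that emits the ordered messages must send them all to the same topic partition, in one of these ways:

  - Configure the topic with a single partition

  - Specify the partition explicitly through the client API that Kafka provides

  - Use a custom partitioning strategy class that implements the Partitioner interface

  - Specify a key

2. Set the max.in.flight.requests.per.connection parameter to 1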
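
The key-based option works because every record with the same key is mapped to the same partition. The sketch below illustrates the idea only. Kafka's default partitioner hashes the serialized key with murmur2, whereas this example uses String.hashCode as a stand-in, and KeyPartitionSketch and partitionFor are hypothetical names.

```java
// Hypothetical illustration of key-to-partition mapping; not Kafka's actual hash function.
final class KeyPartitionSketch {

    // Map a key to a partition index in [0, numPartitions).
    // Assumption: String.hashCode replaces the murmur2 hash used by Kafka's default partitioner.
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions < 1) {
            throw new IllegalArgumentException("numPartitions must be >= 1");
        }
        // Mask the sign bit so the result of the modulo is never negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // All events for the same order id land on the same partition.
        System.out.println(partitionFor("order-42", 6));
        System.out.println(partitionFor("order-42", 6));
    }
}
```

The mapping depends on the partition count, so adding partitions to a topic can move a key to a different partition and break ordering across the change.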

 

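Impact on throughput

1. Ordered sending reduces throughput considerably. One mitigation is to combine several messages into a single message before submitting it.

The message size can be tuned for the actual environment. The data can also be serialized at the producer and deserialized at the consumer to reduce the amount of data transferred.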
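
Combining messages could look like the following sketch, which packs several strings into one payload on the producer side and splits them again on the consumer side. The length-prefixed format and the names MessagePacker, pack and unpack are assumptions chosen for the example; they are not part of Kafka.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: pack several messages into one payload and unpack them again.
final class MessagePacker {

    // Encode each message as "<length>:<content>" so the content may contain any character.
    static String pack(List<String> messages) {
        StringBuilder sb = new StringBuilder();
        for (String message : messages) {
            sb.append(message.length()).append(':').append(message);
        }
        return sb.toString();
    }

    // Decode a payload produced by pack(), preserving the original message order.
    // A malformed payload raises a runtime exception; error handling is left out of this sketch.
    static List<String> unpack(String payload) {
        List<String> messages = new ArrayList<>();
        int pos = 0;
        while (pos < payload.length()) {
            int colon = payload.indexOf(':', pos);
            int length = Integer.parseInt(payload.substring(pos, colon));
            int start = colon + 1;
            messages.add(payload.substring(start, start + length));
            pos = start + length;
        }
        return messages;
    }

    public static void main(String[] args) {
        String payload = pack(Arrays.asList("created", "paid", "shipped"));
        System.out.println(payload);
        System.out.println(unpack(payload));
    }
}
```

The packed payload is then sent as the value of a single record, so the messages inside it keep their relative order regardless of retries.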


 


Original article: https://www.cnblogs.com/fatFatCat/p/14144140.html
