Under normal circumstances, records written by a single producer to a single Kafka partition arrive in order.
Example producer configuration:
Properties props = new Properties();
// props.put("bootstrap.servers", "10.4.4.91:9092,10.4.4.92:9092,10.4.4.93:9092");
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all");                 // "acks"
props.put(ProducerConfig.RETRIES_CONFIG, 120000);             // "retries"
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);           // "batch.size"
props.put(ProducerConfig.LINGER_MS_CONFIG, 0);                // "linger.ms"
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);     // "buffer.memory"
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);   // "request.timeout.ms"
props.put(ProducerConfig.CLIENT_ID_CONFIG, "drtest");         // "client.id"
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");    // "compression.type"
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");   // "key.serializer"
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // "value.serializer"
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); // "max.in.flight.requests.per.connection"
Producer<String,String> producer = new KafkaProducer<>(props);
RecordMetadata recordMetadata = producer.send(new ProducerRecord<>("test",
"testV")).get();
...
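Continuing the snippet above (same props and producer), blocking on get() returns a RecordMetadata for every record, which makes the ordering guarantee easy to observe: with a fixed key all records land on the same partition and the returned offsets increase strictly. The topic name and key below are placeholders:

// Send a few records with the same key so they land on the same partition, then print
// the partition/offset pairs; on a single partition the offsets increase monotonically.
for (int i = 0; i < 5; i++) {
    RecordMetadata md = producer.send(
            new ProducerRecord<>("test", "order-key", "msg-" + i)).get();
    System.out.printf("partition=%d offset=%d%n", md.partition(), md.offset());
}
producer.close();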
One of those settings is max.in.flight.requests.per.connection. What does it actually do?
max.in.flight.requests.per.connection defaults to 5.
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");//max.in.flight.requests.per.connection
KafkaProducer maintains an internal send buffer, implemented in the RecordAccumulator class.
When the user calls the producer's send method, the record is written into this in-memory buffer via the accumulator's append method.
public final class RecordAccumulator {
    ...
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
...
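To make this structure concrete, here is a deliberately simplified stand-in (not the real Kafka code; partition keys and record sizes are reduced to plain Strings and ints) for the idea behind the batches map: each partition owns a deque of batches, append fills the newest open batch, and a fresh batch is opened once the current one reaches batch.size:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Toy model of a per-partition accumulator; the "partition" is just a String key here.
public class ToyAccumulator {
    static final int BATCH_SIZE = 16384; // mirrors batch.size

    static class Batch {
        int bytesUsed;
        boolean hasRoomFor(int recordSize) { return bytesUsed + recordSize <= BATCH_SIZE; }
        void add(int recordSize) { bytesUsed += recordSize; }
    }

    private final ConcurrentMap<String, Deque<Batch>> batches = new ConcurrentHashMap<>();

    // Append to the newest batch of the partition, or open a new batch when the current one is full.
    public synchronized void append(String partition, int recordSize) {
        Deque<Batch> deque = batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
        Batch last = deque.peekLast();
        if (last == null || !last.hasRoomFor(recordSize)) {
            last = new Batch();
            deque.addLast(last);
        }
        last.add(recordSize);
    }
}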
When KafkaProducer is initialized:
this.accumulator = new RecordAccumulator(logContext,
config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.totalMemorySize,
this.compressionType,
config.getLong(ProducerConfig.LINGER_MS_CONFIG),
retryBackoffMs,
metrics,
time,
apiVersions,
transactionManager);
}
When KafkaProducer is initialized it also instantiates a Sender, and this is where max.in.flight.requests.per.connection comes into play.
The guaranteeMessageOrder flag marks whether this producer must deliver data to the broker strictly in order.
The Sender class implements the Runnable interface:
public class Sender implements Runnable
....
this.sender = new Sender(logContext,
        client,
        this.metadata,
        this.accumulator,
        maxInflightRequests == 1,   // this condition sets the guaranteeMessageOrder flag
        config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
        acks,
        retries,
        metricsRegistry.senderMetrics,
        Time.SYSTEM,
        this.requestTimeoutMs,
        config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
        this.transactionManager,
        apiVersions);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
....
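So guaranteeMessageOrder is true only when max.in.flight.requests.per.connection is 1. For reference, in the Kafka client source (roughly the 1.x/2.x line; this fragment is not quoted in the original article and details may differ between versions) the flag takes effect by muting every partition that has a batch in flight, so the accumulator will not drain another batch for that partition until the response comes back:

// Sender#sendProducerData (abridged): "batches" is what accumulator.drain(...) returned
// for the ready nodes. When strict ordering is required, every partition that now has a
// batch in flight is muted until its response arrives.
if (guaranteeMessageOrder) {
    for (List<ProducerBatch> batchList : batches.values()) {
        for (ProducerBatch batch : batchList)
            this.accumulator.mutePartition(batch.topicPartition);
    }
}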
When the Sender runs, it reads the data buffered in the accumulator and checks whether each target node is ready to receive; nodes that are not ready are removed from the set of ready nodes for this round.
That readiness check goes through the canSendMore method of the InFlightRequests object:
public boolean canSendMore(String node) {
    Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
    // One of the conditions for being allowed to send more data: the queue size
    // must stay below the configured max.in.flight.requests.per.connection value.
    return queue == null || queue.isEmpty() ||
           (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}
final class InFlightRequests {
    private final int maxInFlightRequestsPerConnection; // this field holds max.in.flight.requests.per.connection
    private final Map<String, Deque<NetworkClient.InFlightRequest>> requests = new HashMap<>();
    /** Thread safe total number of in flight requests. */
    private final AtomicInteger inFlightRequestCount = new AtomicInteger(0);
Removing the nodes that are not ready:
// remove any nodes we aren't ready to send to
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
    Node node = iter.next();
    if (!this.client.ready(node, now)) {
        iter.remove();
        notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
    }
}
Before sending the data, the client constructs an InFlightRequest object and adds it to the requests map inside InFlightRequests.
In NetworkClient's doSend method:
Send send = request.toSend(destination, header);
InFlightRequest inFlightRequest = new InFlightRequest(
        clientRequest,
        header,
        isInternalRequest,
        request,
        send,
        now);
this.inFlightRequests.add(inFlightRequest);
selector.send(send);
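The effect of this per-node queue can be modelled in isolation. The toy class below is not Kafka code; it only demonstrates that with a limit of 1 a second request cannot be enqueued until the first one has been acknowledged and removed:

import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of the per-node in-flight gate controlled by max.in.flight.requests.per.connection.
public class ToyInFlightGate {
    private final int maxInFlightRequestsPerConnection;
    private final Deque<String> inFlight = new ArrayDeque<>(); // requests sent but not yet acknowledged

    ToyInFlightGate(int maxInFlightRequestsPerConnection) {
        this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
    }

    boolean canSendMore() {
        return inFlight.size() < maxInFlightRequestsPerConnection;
    }

    void send(String request) {
        if (!canSendMore())
            throw new IllegalStateException("too many in-flight requests");
        inFlight.addLast(request);
    }

    void onResponse() {
        inFlight.pollFirst(); // the oldest outstanding request is now acknowledged
    }

    public static void main(String[] args) {
        ToyInFlightGate gate = new ToyInFlightGate(1); // max.in.flight.requests.per.connection=1
        gate.send("batch-1");
        System.out.println(gate.canSendMore()); // false: batch-2 must wait for batch-1's response
        gate.onResponse();
        System.out.println(gate.canSendMore()); // true: batch-2 may now go out
    }
}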
To sum up, max.in.flight.requests.per.connection controls how many requests the client may send on one connection before it receives the corresponding responses. Set to 1, it guarantees that messages stay in order even when a failed send is retried; with a value greater than 1, a failure followed by a retry lets later batches overtake the retried one, so the messages end up reordered inside the producer. A very large value also ties up more memory.
To guarantee strictly ordered messages:
1. The producer that generates the ordered messages must pin them to a single topic partition, by one of the following means:
-create the topic with a single partition
-specify the partition explicitly through the Kafka client
-use a custom partitioning class that implements the Partitioner interface
-assign a key (records with the same key go to the same partition)
2. Set max.in.flight.requests.per.connection to 1 (a combined configuration sketch follows this list)
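A minimal configuration sketch that combines both points, assuming a hypothetical topic named orders and a fixed key; the custom Partitioner is only an illustration of the interface and simply routes every record to partition 0:

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Cluster;

public class OrderedProducerExample {

    // Illustrative Partitioner: send every record to partition 0 of the topic.
    public static class SinglePartitionPartitioner implements Partitioner {
        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            return 0;
        }
        @Override public void close() {}
        @Override public void configure(Map<String, ?> configs) {}
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        // The key requirement for keeping order across retries:
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, SinglePartitionPartitioner.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                // One partition plus one in-flight request: the broker sees these in send order.
                producer.send(new ProducerRecord<>("orders", "order-key", "event-" + i)).get();
            }
        }
    }
}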
Impact on throughput
1. Strictly ordered delivery hurts throughput considerably; one mitigation is to merge several messages and submit them as a single message (see the sketch below).
The message size can then be tuned for the actual environment, and the payload can be serialized on the producer side and deserialized on the consumer side to reduce the amount of data transferred.
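As a rough illustration of the merging idea (the delimiter, topic, and key are arbitrary choices made for this sketch):

import java.util.List;
import org.apache.kafka.clients.producer.ProducerRecord;

// Merge several small, already-ordered payloads into one record so that a single
// produce request (and a single acknowledgement round-trip) carries all of them.
public class MessageMerger {
    static final String DELIMITER = "\n";

    public static ProducerRecord<String, String> merge(String topic, String key, List<String> payloads) {
        String combined = String.join(DELIMITER, payloads);
        return new ProducerRecord<>(topic, key, combined);
    }
}

On the consumer side the value would be split on the same delimiter to recover the individual messages.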
Original article: https://www.cnblogs.com/fatFatCat/p/14144140.html