public class ProducerRecord<K, V> {
private final String topic;//必填项
private final Integer partition;
private final Headers headers;
private final K key;
private final V value;//必填项
private final Long timestamp;
}
key用来计算分区号,确定发送到指定分区
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
try {
producer.send(record).get();
} catch (Exception e | InterruptedException e) {
e.printStackTrace();
}
如果需要Future.get()的返回值,可以采用以下方式。返回值中包含消息的主题、分区号、偏移量、时间戳等元数据信息。
try {
Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get();
System.out.println(metadata.topic() + "-" +
metadata.partition() + "-" + metadata.offset());
} catch (Exception e | InterruptedException e) {
e.printStackTrace();
}
由于send()方法返回的是Future对象,可以使用java并发的相关方法丰富实现。
同步发送可以配置客户端参数的重试次数,如果超过重试次数,则必须处理异常。
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.println(metadata.topic() + "-" +
metadata.partition() + "-" + metadata.offset());
}
}
})
kafka消息发送保证分区有序的情况下,回调函数也能保证有序。
producer.close();
public void close(long timeout, TimeUnit timeUnit); //等待超时时间之后,强行退出。
close()方法会阻塞等待所有的消息都发送完成再关闭。
DefaultPartitioner implements Partitioner {
public int partition(...); //计算分区号
public void close(); //关闭分区器时释放相应资源,空方法
}
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DemoPartitinoer.class.getName());
自定义拦截器
(1)实现ProducerInterceptor<String, String>接口,重写onsend()方法,重写onAcknowledgement()方法
(2)配置客户端的配置参数:ProducerConfig.INTECERPTOR_CLASSES_CONFIG
可以指定多个,类名中间以“,”连接,多个拦截器时,某一个失败,会从上一个成功的开始接着执行下一个拦截器
主线程:KafkaProducer-->拦截器(非必须,一般不)-->序列号器(必须)-->分区器(根据key可选)-->消息累加器-->Sender线程
Sender线程:Sender-->创建Request-->提交给Selector发送-->未收到响应的请求根据节点分别放在节点对应的InFlightRequests中。
Sender线程,先将分区信息转换为节点信息,做应用逻辑层到网络IO层的转换,然后将消息转换成满足kafka协议的Request。
可以通过比较InFlightRequests的大小的配置参数max.in.flight.requests.per.connection与在InFlightRequests中的消息多少,来判断各个节点的负载情况。
元数据(主题分区的数量、leader副本所在节点的地址、端口等信息)的更新由Sender线程,选择负载最小的节点进行发送MetaDataRequest获取。默认每5分钟更新一次。
主线程也需要使用Sender更新过的元数据。
end :)
原文:https://www.cnblogs.com/suyeSean/p/11241900.html