??Kafka 服务端通过Kafka.scala的主函数main方法启动。KafkaServerStartable类提供读取配置文件、启动/停止服务的方法。而启动/停止服务最终调用的是KafkaServer的startup/shutdown方法。
Acceptor,即启动 NIO Socket。num.network.threads个接收器到请求通道RequestChannel的处理器缓存ConcurrentHashMap,key 为递增编号,value 为处理器Processor。Acceptor执行CountDownLatch.await等待通知启动。Acceptor到ConcurrentHashMap,key 为EndPoint,value 为Acceptor。KafkaApis。num.io.threads个请求处理器线程KafkaRequestHandler。ArrayBlockingQueue获取请求,调用KafkaApis.handle方法,进行集中处理请求。CountDownLatch.countDown通知唤醒Acceptor线程。
NIO.select轮询。SocketChannel加入缓存队列ConcurrentLinkedQueueSocketChannel,绑定到KafkaChannel。ArrayBlockingQueue
def run() {
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT) // 注册接收事件
startupComplete() // 通知 Acceptor 线程
var currentProcessor = 0
while (isRunning) {
val ready = nioSelector.select(500) // 轮询事件
if (ready > 0) {
val keys = nioSelector.selectedKeys()
val iter = keys.iterator()
while (iter.hasNext && isRunning) {
val key = iter.next
iter.remove()
if (key.isAcceptable) { // 有可接受事件
val processor = synchronized {
currentProcessor = currentProcessor % processors.size
processors(currentProcessor) // 缓存 Processor
}
accept(key, processor) // 将 SocketChannel 缓存到队列
}
}
}
}
}
override def run() {
startupComplete() // CountDownLatch.countDown 唤醒 Acceptor 线程。
while (isRunning) {
configureNewConnections() // 从缓存队列取出 SocketChannel,绑定到 KafkaChannel
processNewResponses() // 处理返回客户端的响应
poll() // Kafka.Selector 轮询读取/写入事件
processCompletedReceives() // 处理客户端的请求,放到阻塞队列
processCompletedSends() // 处理返回客户端响应后的回调
processDisconnected() // 断开连接后的处理
}
}
def run() {
while (!stopped) {
val startSelectTime = time.nanoseconds
// 从阻塞队列拉取请求
val req = requestChannel.receiveRequest(300)
req match {
case request: RequestChannel.Request =>
try {
apis.handle(request) // 调用`KafkaApis.handle`方法,进行集中处理请求。
}
}
}
}
??参考客户端源码分析。
原文:https://www.cnblogs.com/bigshark/p/11204428.html