async producer是将producer.type设为async时启用的producer
此时,调用send方法的线程和实际完成消息发送的线程是分开的。
当调用java API中producer的send方法时,最终会调用kafka.producer.Producer的send方法。在kafka.producer.Producer类中,会根据producer.type配置使用不同的方法发送消息。
| 
       1 
      2 
      3 
      4 
      5 
      6 
      7 
      8 
      9 
      10 
      11  | 
    
      def send(messages: KeyedMessage[K,V]*) {    lock synchronized 
{      if 
(hasShutdown.get)        throw 
new ProducerClosedException      recordStats(messages)      sync match {        case 
true => eventHandler.handle(messages)        case 
false => asyncSend(messages)      }    }  } | 
当async时,会使用asyncSend。asyncSend方法会根据“queue.enqueue.timeout.ms”配置选项采用BlockingQueue的put或offer方法把消息放入kafka.producer.Producer持有的一个LinkedBlockingQueue
| 
       1  | 
    
      private 
val queue = new 
LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages) | 
在kafka.producer.Producer构造时,会检查"producer.type“,如果是asnyc,就会开启一个送发线程。
| 
       1 
      2 
      3 
      4 
      5 
      6 
      7 
      8 
      9 
      10 
      11  | 
    
      config.producerType match {  case 
"sync" =>  case 
"async" =>    sync = false    producerSendThread = new 
ProducerSendThread[K,V]("ProducerSendThread-" 
+ config.clientId,                                                     queue,                                                     eventHandler,                                                     config.queueBufferingMaxMs,                                                     config.batchNumMessages,                                                     config.clientId)    producerSendThread.start() | 
现在有了一个队列,一个发送线程 。看来这个ProducerSendThread是来完成大部分发送的工作,而"async"的特性都主要都是由它来实现。
这个线程的run方法实现为:
| 
       1 
      2 
      3 
      4 
      5 
      6 
      7 
      8 
      9  | 
    
      override def run {  try 
{    processEvents  }catch 
{    case 
e: Throwable => error("Error in sending events: ", e)  }finally 
{    shutdownLatch.countDown  }} | 
看来实际工作由processEvents方法来实现喽
| 
       1 
      2 
      3 
      4 
      5 
      6 
      7 
      8 
      9 
      10 
      11 
      12 
      13 
      14 
      15 
      16 
      17 
      18 
      19 
      20 
      21 
      22 
      23 
      24 
      25 
      26 
      27 
      28 
      29 
      30 
      31 
      32 
      33 
      34 
      35 
      36 
      37 
      38 
      39 
      40 
      41 
      42  | 
    
      private 
def processEvents() {  var lastSend = SystemTime.milliseconds //上一次发送的时间,每发送一次会更新  var events = new 
ArrayBuffer[KeyedMessage[K,V]] //一起发送的消息的集合,发送完后也会更新  var full: Boolean = false  
//是否消息的数量已大于指定的batch大小(batch大小指多少消息在一起发送,由"batch.num.messages"确定)  // drain the queue until you get a shutdown command  //构造一个流,它的每个元素为queue.poll(timeout)取出来的值。  //timeout的值是这么计算的:lastSend+queueTime表示下次发送的时间,再减去当前时间,就是最多还能等多长时间,也就是poll阻塞的最长时间  //takeWhile接受的函数参数决定了当item是shutdownCommand时,流就结束了。这个shutdownCommand是shutdown()方法执行时,往队列里发的一个特殊消息  Stream.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS))                    .takeWhile(item => if(item != null) item ne shutdownCommand else 
true).foreach {    currentQueueItem =>                                        //对每一条处理的消息      val elapsed = (SystemTime.milliseconds - lastSend)  //距上次发送已逝去的时间,只记录在debug里,并不会以它作为是否发送的条件      // check if the queue time is reached. This happens when the poll method above returns after a timeout and      // returns a null object      val expired = currentQueueItem == null 
//当poll方法超时,就返回一个null,说明一定已经是时候发送这批消息了。当时间到了,poll(timeout)中timeout为负值时,poll一定返回null      if(currentQueueItem != null) {        trace("Dequeued item for topic %s, partition key: %s, data: %s"            .format(currentQueueItem.topic, currentQueueItem.key, currentQueueItem.message))        events += currentQueueItem //如果当前消息不为空,就附加在发送集合里      }      // check if the batch size is reached      full = events.size >= batchSize //是否当前发送集合的大小已经大于batch size      if(full || expired) {  //如果发送集合有了足够多的消息或者按时间计可以发送了,就发送        if(expired)          debug(elapsed + " ms elapsed. Queue time reached. Sending..")        if(full)          debug("Batch full. Sending..")        // if either queue time has reached or batch size has reached, dispatch to event handler        tryToHandle(events)        lastSend = SystemTime.milliseconds //更新lastSend,将一个新的ArrayBuffer的引用赋给events        events = new 
ArrayBuffer[KeyedMessage[K,V]]      }  }  // send the last batch of events  tryToHandle(events) //当shutdownCommand遇到时,流会终结。此时之前的消息只要不是恰好发送完,就还会有一些在events里,做为最后一批发送。  if(queue.size > 0) //些时producerSendThread已经不再发消息了,但是queue里若还有没发完的,就是一种异常情况    throw 
new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d remaining items in the queue"      .format(queue.size))} | 
看来Scala的Stream帮了不少忙。shutdown方法将一个特殊的shutdownCommand发给queue,也正好使得这个Stream可以用takeWhile方法正确结束。
好吧,搞了这么多,这个ProducerSendThread只有打包的逻辑 ,并没有处理topic、partition、压缩的逻辑,这些逻辑都在另一个类中。明天再来看看这个handler
Kafka 之 async producer (1),布布扣,bubuko.com
原文:http://www.cnblogs.com/devos/p/3629190.html