原来代码如下
KafkaSpoutConfig<String, String> kafkaSpoutConfig = KafkaSpoutConfig.builder(kafka_server, "monmetric") // .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup") .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 200).setRecordTranslator(JUST_VALUE_FUNC, new Fields("str")) .setRetry(newRetryService()).setOffsetCommitPeriodMs(10000).setFirstPollOffsetStrategy(LATEST) .setMaxUncommittedOffsets(250).build();
主要问题出在setMaxUncommittedOffsets(250)上,该属性默认值为1000w,其含义为:
它和另外一个参数有关:offset.commit.period.ms,这个参数是控制多久向 Kafka commit 一次。
maxUncommittedOffset = 1000 的执行过程是这样的:
这就导致了kafka消费慢的问题,我把参数改成默认值就能很快消费了。
原文:https://www.cnblogs.com/blue-rain/p/12430128.html