The Kafka consumer client reads messages from a Kafka cluster and processes them.
A Kafka consumer can either manually assign itself to specific partitions of a topic, or subscribe to a topic via the subscribe method and have partitions assigned automatically. Once a consumer is bound to a partition, it connects to that partition's leader, issues fetch requests, and processes the messages it receives.
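A minimal sketch of the two binding styles, assuming a hypothetical broker at localhost:9092, a hypothetical topic "my-topic", and a made-up group id; the poll(long) signature matches the source walked through below:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AssignVsSubscribe {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
        props.put("group.id", "demo-group");              // hypothetical group id
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Option 1: manual assignment -- bind directly to partition 0 of "my-topic",
            // bypassing group management and rebalancing.
            consumer.assign(Collections.singletonList(new TopicPartition("my-topic", 0)));

            // Option 2 (instead of assign): subscribe and let the group coordinator
            // distribute partitions among consumers in the same group.
            // consumer.subscribe(Collections.singletonList("my-topic"));

            ConsumerRecords<String, String> records = consumer.poll(1000L);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition=%d offset=%d value=%s%n",
                        record.partition(), record.offset(), record.value());
            }
        }
    }
}
```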
Offset management
In Kafka's consumption model, a partition is consumed by at most one consumer within a group, and the offset is controlled by the consumer, e.g. it can seek forward or backward to an arbitrary offset.
On first connection, when no committed offset exists yet, the auto.offset.reset parameter in the KafkaConsumer configuration decides whether consumption starts from the latest offset (the default) or from the earliest offset.
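A short sketch of this offset control, assuming the same hypothetical broker and topic as above; the offset value 42 is arbitrary and only for illustration:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class OffsetControl {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.put("group.id", "offset-demo");             // hypothetical group id
        // Used only when no committed offset exists for this group:
        // "latest" (default) starts from the end, "earliest" from the beginning.
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0); // hypothetical topic
            consumer.assign(Collections.singletonList(tp));

            // The consumer controls its own position: rewind or skip ahead explicitly.
            consumer.seek(tp, 42L);           // the next poll() fetches from offset 42
            // consumer.seekToBeginning(Collections.singletonList(tp)); // or jump to the start
        }
    }
}
```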
By default, enable.auto.commit is true, meaning the KafkaConsumer client commits offsets periodically. The caveat is that if the ConsumerRecords returned by poll are processed asynchronously, the offsets may be committed before processing has finished; if the process then crashes, messages are lost after restart. There are two solutions: (1) process the records synchronously after poll, so that the auto-commit triggered on the next poll only covers records that have already been processed, which guarantees at-least-once semantics; (2) set enable.auto.commit to false and commit offsets manually via KafkaConsumer.commitSync.
The max.poll.interval.ms parameter sets the maximum time a consumer may take to process the records from one poll (default 300s). If the interval between two polls exceeds this value, the consumer is considered dead and the group rebalances.
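A minimal at-least-once sketch of option 2, assuming a hypothetical topic "orders" and broker at localhost:9092; processRecord stands in for application logic:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualCommitLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // hypothetical broker
        props.put("group.id", "order-processors");         // hypothetical group id
        props.put("enable.auto.commit", "false");          // disable periodic auto-commit
        props.put("max.poll.interval.ms", "300000");       // default 300s between polls
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("orders"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000L);
                for (ConsumerRecord<String, String> record : records) {
                    processRecord(record); // synchronous processing before committing
                }
                // Commit only after the whole batch is processed: a crash before this
                // line replays the batch on restart (at-least-once).
                consumer.commitSync();
            }
        }
    }

    private static void processRecord(ConsumerRecord<String, String> record) {
        System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
    }
}
```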
Consumer threading
There are several ways to handle multi-threaded consumption:
- One thread per consumer: the advantages are that per-partition ordering is preserved and the implementation is simple; the drawback is that concurrency is limited by the number of partitions.
- Decouple consumption from processing: the consumer hands each fetched record off to another thread or a thread pool. This improves concurrency, but ordering across threads is no longer guaranteed, the consumer may commit offsets before the pool has finished processing, and a slow pool can let the in-memory queue back up (see the sketch after this list).
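A minimal sketch of the decoupled pattern, assuming a hypothetical topic "events" and broker; it waits for the submitted batch before committing so that offsets are never committed ahead of processing, and the per-batch wait also keeps the backlog bounded:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class DecoupledProcessing {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.put("group.id", "event-workers");           // hypothetical group id
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        ExecutorService pool = Executors.newFixedThreadPool(8); // worker pool for processing
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("events"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000L);
                List<Future<?>> inFlight = new ArrayList<>();
                for (ConsumerRecord<String, String> record : records) {
                    // Records from the same partition may now run on different threads,
                    // so per-partition ordering is no longer guaranteed.
                    inFlight.add(pool.submit(() -> handle(record)));
                }
                for (Future<?> f : inFlight) {
                    f.get(); // wait for the whole batch; keeps the backlog bounded
                }
                // Only commit once the pool has finished this batch, otherwise a crash
                // could skip records whose offsets were already committed.
                consumer.commitSync();
            }
        } finally {
            pool.shutdown();
        }
    }

    private static void handle(ConsumerRecord<String, String> record) {
        System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
    }
}
```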
KafkaConsumer.subscribe
Subscribe to one or more topics:
subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
When the consumer relies on the Kafka cluster to manage group membership, the ConsumerRebalanceListener is invoked whenever a consumer rebalance happens. A rebalance is triggered when the consumers or the consumption relationships change, for example:
- a consumer process dies
- a new consumer process joins the group
- the number of partitions changes
A common use of this listener is to persist the latest consumed offset of each partition: in void onPartitionsRevoked(java.util.Collection<TopicPartition> partitions), save the current partition/offset pairs to a database; then, after reassignment completes, in void onPartitionsAssigned(java.util.Collection<TopicPartition> partitions), read the previously saved positions back from the database and resume consumption from them via seek.
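A minimal sketch of that pattern; the OffsetStore interface is a hypothetical stand-in for the database, and the topic name in the registration comment is made up:

```java
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
    private final KafkaConsumer<String, String> consumer;
    private final OffsetStore store; // hypothetical DAO that reads/writes offsets in a database

    public SaveOffsetsOnRebalance(KafkaConsumer<String, String> consumer, OffsetStore store) {
        this.consumer = consumer;
        this.store = store;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Called before the partitions are taken away: persist the current positions.
        for (TopicPartition tp : partitions) {
            store.save(tp, consumer.position(tp));
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Called after reassignment completes: restore the saved positions and continue.
        for (TopicPartition tp : partitions) {
            consumer.seek(tp, store.load(tp));
        }
    }

    interface OffsetStore {                 // hypothetical storage abstraction
        void save(TopicPartition tp, long offset);
        long load(TopicPartition tp);
    }
}

// Registration (topic name is hypothetical):
// consumer.subscribe(Collections.singletonList("events"),
//         new SaveOffsetsOnRebalance(consumer, store));
```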
KafkaConsumer.poll
```java
public ConsumerRecords<K, V> poll(long timeout) {
    // KafkaConsumer is not thread-safe
    acquireAndEnsureOpen();
    try {
        if (timeout < 0)
            throw new IllegalArgumentException("Timeout must not be negative");

        if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");

        // poll for new data until the timeout expires
        long start = time.milliseconds();
        long remaining = timeout;
        do {
            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
            if (!records.isEmpty()) {
                // before returning the fetched records, we can send off the next round of fetches
                // and avoid block waiting for their responses to enable pipelining while the user
                // is handling the fetched records.
                //
                // NOTE: since the consumed position has already been updated, we must not allow
                // wakeups or any other errors to be triggered prior to returning the fetched records.
                if (fetcher.sendFetches() > 0 || client.hasPendingRequests())
                    client.pollNoWakeup();

                if (this.interceptors == null)
                    return new ConsumerRecords<>(records);
                else
                    return this.interceptors.onConsume(new ConsumerRecords<>(records));
            }

            long elapsed = time.milliseconds() - start;
            remaining = timeout - elapsed;
        } while (remaining > 0);

        return ConsumerRecords.empty();
    } finally {
        release();
    }
}
```
pollOnce handling
```java
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
    client.maybeTriggerWakeup();

    // one round of coordinator poll; depending on auto.commit.interval.ms this may auto-commit offsets
    coordinator.poll(time.milliseconds(), timeout);

    // fetch positions if we have partitions we're subscribed to that we
    // don't know the offset for
    if (!subscriptions.hasAllFetchPositions())
        updateFetchPositions(this.subscriptions.missingFetchPositions());

    // if data is available already, return it immediately
    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
    if (!records.isEmpty())
        return records;

    // send any new fetches (won't resend pending fetches)
    fetcher.sendFetches();

    long now = time.milliseconds();
    long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);

    // wait for the fetch responses
    client.poll(pollTimeout, now, new PollCondition() {
        @Override
        public boolean shouldBlock() {
            // since a fetch might be completed by the background thread, we need this poll condition
            // to ensure that we do not block unnecessarily in poll()
            return !fetcher.hasCompletedFetches();
        }
    });

    // after the long poll, we should check whether the group needs to rebalance
    // prior to returning data so that the group can stabilize faster
    if (coordinator.needRejoin())
        return Collections.emptyMap();

    // return the fetched records
    return fetcher.fetchedRecords();
}
```