前言
读完本文,你将了解到如下知识点:
- kafka 的消费者 和 消费者组
- 如何正确使用kafka consumer
- 常用的 kafka consumer 配置
消费者 和 消费者组
-
什么是消费者?
image
顾名思义,消费者就是从kafka集群消费数据的客户端,如下图,展示了一个消费者从一个topic中消费数据的模型 -
单个消费者模型存在的问题?
如果这个时候 kafka 上游生产的数据很快,超过了这个消费者1
的消费速度,那么就会导致数据堆积,产生一些大家都知道的蛋疼事情了,那么我们只能加强消费者
的消费能力,所以也就有了我们下面来说的消费者组
-
什么是消费者组?
所谓消费者组
,其实就是一组消费者
的集合,当我们看到下面这张图是不是就特别舒服了,我们采用了一个消费组
来消费这个topic
,众人拾柴火焰高,其消费能力那是按倍数递增的,所以这里我们一般来说都是采用消费者组
来消费数据,而不会是单消费者
来消费数据的。这里值得我们注意的是:- 一个
topic
可以被 多个消费者组
消费,但是每个消费者组
消费的数据是 互不干扰 的,也就是说,每个消费组
消费的都是 完整的数据 。 - 一个分区只能被 同一个消费组内 的一个
消费者
消费,而 不能拆给多个消费者 消费
- 一个
-
是不是一个 消费组 的 消费者 越多其消费能力就越强呢?
从图3
我们就很好的可以回答这个问题了,我们可以看到消费者4
是完全没有消费任何的数据的,所以如果你想要加强消费者组
的能力,除了添加消费者,分区的数量也是需要跟着增加的,只有这样他们的并行度才能上的去,消费能力才会强。
-
为了提高 消费组 的 消费能力,我是不是可以随便添加 分区 和 消费者 呢?
答案当然是否定的啦。。。嘿嘿
我们看到图2
,一般来说我们建议消费者
数量 和分区
数量是一致的,当我们的消费能力不够时,就必须通过调整分区的数量来提高并行度,但是,我们应该尽量来避免这种情况发生,比如:
现在我们需要在图2
的基础上增加一个分区4
,那么这个分区4
该由谁来消费呢?这个时候kafka会进行分区再均衡
,来为这个分区分配消费者,分区再均衡
期间该消费组
是不可用的,并且作为一个被消费者
,分区数的改动将影响到每一个消费者组
,所以在创建topic
的时候,我们就应该考虑好分区数,来尽量避免这种情况发生 -
分区分配过程
上面我们提到了为 分区分配消费者,那么我们现在就来看看分配过程是怎么样的。- 确定 群组协调器
每当我们创建一个消费组,kafka 会为我们分配一个 broker 作为该消费组的 coordinator(协调器) - 注册消费者 并选出 leader consumer
当我们的有了 coordinator 之后,消费者将会开始往该 coordinator上进行注册,第一个注册的 消费者将成为该消费组的 leader,后续的 作为 follower, - 当 leader 选出来后,他会从coordinator那里实时获取分区 和 consumer 信息,并根据分区策略给每个consumer 分配 分区,并将分配结果告诉 coordinator。
- follower 消费者将从 coordinator 那里获取到自己相关的分区信息进行消费,对于所有的 follower 消费者而言,他们只知道自己消费的分区,并不知道其他消费者的存在。
- 至此,消费者都知道自己的消费的分区,分区过程结束,当发送分区再均衡的时候,leader 将会重复分配过程
- 确定 群组协调器
消费组与分区重平衡
可以看到,当新的消费者加入消费组,它会消费一个或多个分区,而这些分区之前是由其他消费者负责的;另外,当消费者离开消费组(比如重启、宕机等)时,它所消费的分区会分配给其他分区。这种现象称为重平衡(rebalance)。重平衡是Kafka一个很重要的性质,这个性质保证了高可用和水平扩展。不过也需要注意到,在重平衡期间,所有消费者都不能消费消息,因此会造成整个消费组短暂的不可用。而且,将分区进行重平衡也会导致原来的消费者状态过期,从而导致消费者需要重新更新状态,这段期间也会降低消费性能。后面我们会讨论如何安全的进行重平衡以及如何尽可能避免。
消费者通过定期发送心跳(hearbeat)到一个作为组协调者(group coordinator)的broker来保持在消费组内存活。这个broker不是固定的,每个消费组都可能不同。当消费者拉取消息或者提交时,便会发送心跳。
如果消费者超过一定时间没有发送心跳,那么它的会话(session)就会过期,组协调者会认为该消费者已经宕机,然后触发重平衡。可以看到,从消费者宕机到会话过期是有一定时间的,这段时间内该消费者的分区都不能进行消息消费;通常情况下,我们可以进行优雅关闭,这样消费者会发送离开的消息到组协调者,这样组协调者可以立即进行重平衡而不需要等待会话过期。
在0.10.1版本,Kafka对心跳机制进行了修改,将发送心跳与拉取消息进行分离,这样使得发送心跳的频率不受拉取的频率影响。另外更高版本的Kafka支持配置一个消费者多长时间不拉取消息但仍然保持存活,这个配置可以避免活锁(livelock)。活锁,是指应用没有故障但是由于某些原因不能进一步消费。
创建Kafka消费者
读取Kafka消息只需要创建一个kafkaConsumer,创建过程与KafkaProducer非常相像。我们需要使用四个基本属性,bootstrap.servers、key.deserializer、value.deserializer和group.id。其中,bootstrap.servers与创建KafkaProducer的含义一样;key.deserializer和value.deserializer是用来做反序列化的,也就是将字节数组转换成对象;group.id不是严格必须的,但通常都会指定,这个参数是消费者的消费组。
下面是一个代码样例:
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer",
props.put("value.deserializer",
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props);
订阅主题
创建完消费者后我们便可以订阅主题了,只需要通过调用subscribe()方法即可,这个方法接收一个主题列表,非常简单:
consumer.subscribe(Collections.singletonList("customerCountries"));
这个例子中只订阅了一个customerCountries主题。另外,我们也可以使用正则表达式来匹配多个主题,而且订阅之后如果又有匹配的新主题,那么这个消费组会立即对其进行消费。正则表达式在连接Kafka与其他系统时非常有用。比如订阅所有的测试主题:
consumer.subscribe("test.*");
拉取循环
消费数据的API和处理方式很简单,我们只需要循环不断拉取消息即可。Kafka对外暴露了一个非常简洁的poll方法,其内部实现了协作、分区重平衡、心跳、数据拉取等功能,但使用时这些细节都被隐藏了,我们也不需要关注这些。下面是一个代码样例:
try {
while (true) { //1)
ConsumerRecords<String, String> records = consumer.poll(100); //2)
for (ConsumerRecord<String, String> record : records) //3)
{
log.debug("topic = %s, partition = %s, offset = %d,
customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
int updatedCount = 1;
if (custCountryMap.countainsValue(record.value())) {
updatedCount = custCountryMap.get(record.value()) + 1;
}
custCountryMap.put(record.value(), updatedCount)
JSONObject json = new JSONObject(custCountryMap);
System.out.println(json.toString(4))
}
}
} finally {
consumer.close(); //4
}
其中,代码中标注了几点,说明如下:
1)这个例子使用无限循环消费并处理数据,这也是使用Kafka最多的一个场景,后面我们会讨论如何更好的退出循环并关闭。
2)这是上面代码中最核心的一行代码。我们不断调用poll拉取数据,如果停止拉取,那么Kafka会认为此消费者已经死亡并进行重平衡。参数值是一个超时时间,指明线程如果没有数据时等待多长时间,0表示不等待立即返回。
3)poll()方法返回记录的列表,每条记录包含key/value以及主题、分区、位移信息。
4)主动关闭可以使得Kafka立即进行重平衡而不需要等待会话过期。
另外需要提醒的是,消费者对象不是线程安全的,也就是不能够多个线程同时使用一个消费者对象;而且也不能够一个线程有多个消费者对象。简而言之,一个线程一个消费者,如果需要多个消费者那么请使用多线程来进行一一对应。
单线程版
package com.neuedu;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import
public class Consumer {
public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers",
"hadoop03:9092,hadoop05:9092,hadoop06:9092");// 该地址是集群的子集,用来探测集群。
props.put("group.id", "payment");// cousumer的分组id
"true");// 自动提交offsets
"1000");// 每隔1s,自动提交offsets
props.put("session.timeout.ms", "30000");// Consumer向集群发送自己的心跳,超时则认为Consumer已经死了,kafka会把它的分区分配给其他进程
props.put("key.deserializer",
反序列化器
props.put("value.deserializer",
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("payment"));// 订阅的topic,可以多个
// String topic = "payment";
// TopicPartition partition0 = new TopicPartition(topic, 0);
// TopicPartition partition1 = new TopicPartition(topic, 1);
// consumer.assign(Arrays.asList(partition0));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("赌东道赌东道赌东道赌东道赌东道赌东道 offset = %d, key = %s, value = %s, partition = %s",
record.offset(), record.key(), record.value(),record.partition());
System.out.println();
}
}
}
}
多线程版
package com.neuedu;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class ConsumerRunnable implements Runnable {
// 每个线程维护私有的KafkaConsumer实例
private final KafkaConsumer<String, String> consumer;
public ConsumerRunnable(String brokerList, String groupId, String topic) {
Properties props = new Properties();
props.put("bootstrap.servers", brokerList);
props.put("group.id", groupId);
"true"); //本例使用自动提交位移
"1000");
props.put("session.timeout.ms", "30000");
//分区分配策略
props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RangeAssignor");
props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
props.put("key.deserializer",
props.put("value.deserializer",
this.consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic)); // 本例使用分区副本自动分配策略
}
@Override
public void run() {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(200); // 本例使用200ms作为获取超时时间
for (ConsumerRecord<String, String> record : records) {
// 这里面写处理消息的逻辑,本例中只是简单地打印消息
System.out.println(Thread.currentThread().getName() + " consumed " + record.partition() +
"th message with offset: " + record.offset());
}
}
}
}
package com.neuedu;
import java.util.ArrayList;
import java.util.List;
public class ConsumerGroup {
private List<ConsumerRunnable> consumers;
public ConsumerGroup(int consumerNum, String groupId, String topic, String brokerList) {
consumers = new ArrayList<>(consumerNum);
for (int i = 0; i < consumerNum; ++i) {
ConsumerRunnable consumerThread = new ConsumerRunnable(brokerList, groupId, topic);
consumers.add(consumerThread);
}
}
public void execute() {
for (ConsumerRunnable task : consumers) {
new Thread(task).start();
}
}
}
package com.neuedu;
public class ConsumerMain {
public static void main(String[] args) {
String brokerList = "hadoop03:9092";
String groupId = "testGroup1";
String topic = "payment";
int consumerNum = 2;
ConsumerGroup consumerGroup = new ConsumerGroup(consumerNum, groupId, topic, brokerList);
consumerGroup.execute();
}
}
使用起来还是很简单的,不过如果想要用好 consumer,可能你还需要了解以下这些东西:
- 分区控制策略
- consumer 的一些常用配置
- offset 的控制
ok,那么我们接下来一个个来看吧。。。
分区控制策略
- 手动控制分区
咱们先来说下最简单的手动分区控制,代码如下:
String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));
看起来只是把普通的订阅方式修改成了订阅知道 topic
的分区,其余的还是照常使用,不过这里也需要注意一下的是:
- 一般只作为独立消费者,也就是不能加入消费组,或者说他本身就是作为一个消费组存在,要保证这一点,我们只需要保证其
group id
是唯一的就可以了。 - 对于
topic
的分区变动不敏感,也就是说当topic
新增了分区,分区的数据将会发生改变,但该消费组对此确是不感知的,依然照常运行,所以很多时候需要你手动consumer.partitionsFor()
去查看topic
的分区情况 - 不要和
subscription
混合使用
- 使用
partition.assignment.strategy
进行分区策略配置
props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RangeAssignor");
props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
这里的话 kafka 是自带两种分区策略的,为了方便理解,我们以如下场景为例来进行解释:
已知:
TopicA 有 3 个 partition(分区):A-1,A-2,A-3;
TopicB 有 3 个 partition(分区):B-1,B-2,B-3;
ConsumerA 和 ConsumerB 作为一个消费组 ConsumerGroup 同时消费 TopicA 和 TopicB
-
Range
该方式最大的特点就是会将连续的分区分配给一个消费者,根据示例,我们可以得出如下结论:ConsumerGroup 消费 TopicA 的时候:
ConsumerA 会分配到 A-1,A-2
ConsumerB 会分配到 A-3ConsumerGroup 消费 TopicB 的时候:
ConsumerA 会分配到 B-1,B-2
ConsumerB 会分配到 B-3所以:
ConsumerA 分配到了4个分区: A-1,A-2,B-1,B-2
ConsumerB 分配到了2个分区:A-3,B-3 -
RoundRobin
该方式最大的特点就是会以轮询的方式将分区分配给一个个消费者,根据示例,我们可以得出如下结论:ConsumerGroup 消费 TopicA 的时候:
ConsumerA 分配到 A-1
ConsumerB 分配到 A-2
ConsumerA 分配到 A-3ConsumerGroup 消费 TopicB 的时候,因为上次分配到了 ConsumerA,那么这次轮到 ConsumerB了 所以:
ConsumerB 分配到 B-1
ConsumerA 分配到 B-2
ConsumerB 分配到 B-3所以:
ConsumerA 分配到了4个分区: A-1,A-3,B-2
ConsumerB 分配到了2个分区:A-2,B-1,B-3
从上面我们也是可以看出这两种策略的异同,RoundRobin 相比较 Range 会使得分区分配的更加的均衡,但如果是消费单个 topic ,那么其均衡是差不多的,而 Range 会比 RoundRobin 更具优势一点,至于这个优势,还得看你的具体业务了。
-
自定义的分区策略
上面两种分区策略是 kafka 默认自带的策略,虽然大多数情况下够用了,但是可能针对一些特殊需求,我们也可以定义自己的分区策略- Range分区策略源码
如何自定义呢?最好的方式莫过于看源码是怎么实现的,然后自己依葫芦画瓢来一个,所以我们先来看看 Range分区策略源码,如下,我只做了简单的注释,因为它本身也很简单,重点看下assign
的参数以及返回注释就 ok了
public class RangeAssignor extends AbstractPartitionAssignor{ //省略部分代码。。。。 /** * 根据订阅者 和 分区数量来进行分区 * @param partitionsPerTopic: topic->分区数量 * @param subscriptions: memberId 消费者id -> subscription 消费者信息 * @return: memberId ->list<topic名称 和 分区序号(id)> */ @Override public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, Map<String, Subscription> subscriptions) { //topic -> list<消费者> Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions); //初始化 返回结果 Map<String, List<TopicPartition>> assignment = new HashMap<>(); for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList<TopicPartition>()); for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) { //topic String topic = topicEntry.getKey(); // 消费该topic的 consumer-id List<String> consumersForTopic = topicEntry.getValue(); //topic 的分区数量 Integer numPartitionsForTopic = partitionsPerTopic.get(topic); if (numPartitionsForTopic == null) continue; Collections.sort(consumersForTopic); //平均每个消费者分配的 分区数量 int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size(); //平均之后剩下的 分区数 int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size(); //这里就是将连续分区切开然后分配给每个消费者 List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic); for (int i = 0, n = consumersForTopic.size(); i < n; i++) { int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition); int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1); assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length)); } } return assignment; } }
- 自定义一个 分区策略
这里先缓缓把,太简单把,没什么用,太复杂把,一时也想不出好的场景,如果你有需求,欢迎留言,我们一起来实现
- Range分区策略源码
Consumer 常用配置
-
fetch.min.bytes
该属性指定了消费者从服务器获取记录的最小字节数。broker 在收到消费者的数据请求时,
如果可用的数据量小于fetch.min.bytes 指定的大小,那么它会等到有足够的可用数据时才把它返回给消费者。这样可以降低消费者和 broker 的工作负载,因为它们在主题不是很活跃的时候(或者一天里的低谷时段)就不需要来来回回地处理消息。如果没有很多可用数据,但消费者的 CPU 使用率却很高,那么就需要把该属性的值设得比默认值大。如果消费者的数量比较多,把该属性的值设置得大一点可以降低broker 的工作负载。 -
fetch.max.wait.ms
我们通过 fetch.min.bytes 告诉 Kafka,等到有足够的数据时才把它返回给消费者。而feth.max.wait.ms 则用于指定 broker 的等待时间,默认是 500ms。如果没有足够的数据流入 Kafka,消费者获取最小数据量的要求就得不到满足,最终导致 500ms 的延迟。如果要降低潜在的延迟(为了满足 SLA),可以把该参数值设置得小一些。
如果 fetch.max.wait.ms 被设为 100ms,并且fetch.min.bytes 被设为 1MB,那么 Kafka 在收到消费者的请求后,要么返回 1MB 数据,要么在100ms 后返回所有可用的数据,就看哪个条件先得到满足。 -
max.partition.fetch.bytes
该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值是 1MB,也就是说,KafkaConsumer.poll() 方法从每个分区里返回的记录最多不超过 max.partition.fetch.bytes指定的字节。如果一个主题有 20 个分区和 5 个消费者,那么每个消费者需要至少 4MB 的可用内存来接收记录。在为消费者分配内存时,可以给它们多分配一些,因为如果群组里有消费者发生崩溃,剩下的消费者需要处理更多的分区。
max.partition.fetch.bytes 的值必须比 broker 能够接收的最大消息的字节数(通过 max.message.size 属性配置)大,否则消费者可能无法读取这些消息,导致消费者一直挂起重试。在设置该属性时,另一个需要考虑的因素是消费者处理数据的时间。消费者需要频繁调用poll() 方法来避免会话过期和发生分区再均衡,如果单次调用 poll() 返回的数据太多,消费者需要更多的时间来处理,可能无法及时进行下一个轮询来避免会话过期。
如果出现这种情况,可以把max.partition.fetch.bytes 值改小,或者延长会话过期时间。 -
session.timeout.ms
该属性指定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认是 3s。如果消费者没有在session.timeout.ms 指定的时间内发送心跳给群组协调器,就被认为已经死亡,协调器就会触发再均衡,把它的分区分配给群组里的其他消费者。
该属性与 heartbeat.interval.ms 紧密相关。heartbeat.interval.ms 指定了 poll() 方法向协调器发送心跳的频率,session.timeout.ms则指定了消费者可以多久不发送心跳。
所以,一般需要同时修改这两个属性,heartbeat.interval.ms 必须比 session.timeout.ms 小,一般是session.timeout.ms 的三分之一。如果 session.timeout.ms 是 3s,那么 heartbeat.interval.ms 应该是 1s。把session.timeout.ms 值设得比默认值小,可以更快地检测和恢复崩溃的节点,不过长时间的轮询或垃圾收集可能导致非预期的再均衡。把该属性的值设置得大一些,可以减少意外的再均衡,不过检测节点崩溃需要更长的时间。 -
auto.offset.reset
该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长时间失效,包含偏移量的记录已经过时并被删除)该作何处理。
它的默认值是 latest,意思是说,在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)。另一个值是earliest,意思是说,在偏移量无效的情况下,消费者将从起始位置读取分区的记录。 -
partition.assignment.strategy
(这部分好像重复了 ~~~)
我们知道,分区会被分配给群组里的消费者。PartitionAssignor 根据给定的消费者和主题,决定哪些分区应该被分配给哪个消费者。Kafka 有两个默认的分配策略。
Range
该策略会把主题的若干个连续的分区分配给消费者。假设消费者 C1 和消费者 C2 同时订阅了主题T1 和 主题 T2,并且每个主题有 3 个分区。那么消费者 C1 有可能分配到这两个主题的分区 0 和分区 1,而消费者 C2 分配到这两个主题的分区 2。因为每个主题拥有奇数个分区,而分配是在主题内独立完成的,第一个消费者最后分配到比第二个消费者更多的分区。只要使用了 Range 策略,而且分区数量无法被消费者数量整除,就会出现这种情况。RoundRobin
该策略把主题的所有分区逐个分配给消费者。如果使用 RoundRobin 策略来给消费者 C1 和消费者C2 分配分区,那么消费者 C1 将分到主题 T1 的分区 0 和分区 2 以及主题 T2 的分区 1,消费者 C2 将分配到主题 T1 的分区 1 以及主题 T2 的分区 0 和分区 2。一般来说,如果所有消费者都订阅相同的主题(这种情况很常见),RoundRobin 策略会给所有消费者分配相同数量的分区(或最多就差一个分区)。可以通过设置 partition.assignment.strategy 来选择分区策略。
默认使用的是org.apache.kafka.clients.consumer.RangeAssignor,这个类实现了 Range 策略,不过也可以把它改成 org.apache.kafka.clients.consumer.RoundRobinAssignor。我们还可以使用自定义策略,在这种情况下,partition.assignment.strategy 属性的值就是自定义类的名字。 -
client.id
该属性可以是任意字符串,broker 用它来标识从客户端发送过来的消息,通常被用在日志、度量指标和配额里。 -
max.poll.records
该属性用于控制单次调用 call() 方法能够返回的记录数量,可以帮你控制在轮询里需要处理的数据量。 -
receive.buffer.bytes 和 send.buffer.bytes
socket 在读写数据时用到的 TCP 缓冲区也可以设置大小。如果它们被设为 -1,就使用操作系统的默认值。
如果生产者或消费者与 broker 处于不同的数据中心内,可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽
提交(commit)与位移(offset)
当我们调用poll()时,该方法会返回我们没有消费的消息。当消息从broker返回消费者时,broker并不跟踪这些消息是否被消费者接收到;Kafka让消费者自身来管理消费的位移,并向消费者提供更新位移的接口,这种更新位移方式称为提交(commit)。
在正常情况下,消费者会发送分区的提交信息到Kafka,Kafka进行记录。当消费者宕机或者新消费者加入时,Kafka会进行重平衡,这会导致消费者负责之前并不属于它的分区。重平衡完成后,消费者会重新获取分区的位移,下面来看下两种有意思的情况。
假如一个消费者在重平衡前后都负责某个分区,如果提交位移比之前实际处理的消息位移要小,那么会导致消息重复消费,如下所示:
dup假如在重平衡前某个消费者拉取分区消息,在进行消息处理前提交了位移,但还没完成处理宕机了,然后Kafka进行重平衡,新的消费者负责此分区并读取提交位移,此时会“丢失”消息,如下所示:
miss因此,提交位移的方式会对应用有比较大的影响,下面来看下不同的提交方式。
自动提交
需要注意到,这种方式可能会导致消息重复消费。假如,某个消费者poll消息后,应用正在处理消息,在3秒后Kafka进行了重平衡,那么由于没有更新位移导致重平衡后这部分消息重复消费。
提交当前位移
为了避免消息丢失,我们应当在完成业务逻辑后才提交位移。而如果在处理消息时发生了重平衡,那么只有当前poll的消息会重复消费。下面是一个自动提交的代码样例:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
try {
} catch (CommitFailedException e) {
log.error("commit failed", e)
}
}
上面代码poll消息,并进行简单的打印(在实际中有更多的处理),最后完成处理后进行了位移提交。
异步提交
手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。另外一个解决办法是,使用异步提交的API。以下为使用异步提交的方式,应用发了一个提交请求然后立即返回:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.printf("topic = %s, partition = %s,
offset = %d, customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
}
但是异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖。举个例子,假如我们发起了一个异步提交commitA,此时的提交位移为2000,随后又发起了一个异步提交commitB且位移为3000;commitA提交失败但commitB提交成功,此时commitA进行重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消费。
因此,基于这种性质,一般情况下对于异步提交,我们可能会通过回调的方式记录提交结果:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s,
offset = %d, customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
OffsetCommitCallback() {
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (e != null)
log.error("Commit failed for offsets {}", offsets, e);
}
});
}
而如果想进行重试同时又保证提交顺序的话,一种简单的办法是使用单调递增的序号。每次发起异步提交时增加此序号,并且将此时的序号作为参数传给回调方法;当消息提交失败回调时,检查参数中的序号值与全局的序号值,如果相等那么可以进行重试提交,否则放弃(因为已经有更新的位移提交了)。
混合同步提交与异步提交
正常情况下,偶然的提交失败并不是什么大问题,因为后续的提交成功就可以了。但是在某些情况下(例如程序退出、重平衡),我们希望最后的提交成功,因此一种非常普遍的方式是混合异步提交和同步提交,如下所示:
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = %d,
customer = %s, country = %s\n",
record.topic(), record.partition(),
record.offset(), record.key(), record.value());
}
}
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
try {
} finally {
consumer.close();
}
}
在正常处理流程中,我们使用异步提交来提高性能,但最后使用同步提交来保证位移提交成功。
提交特定位移
commitSync()和commitAsync()会提交上一次poll()的最大位移,但如果poll()返回了批量消息,而且消息数量非常多,我们可能会希望在处理这些批量消息过程中提交位移,以免重平衡导致从头开始消费和处理。幸运的是,commitSync()和commitAsync()允许我们指定特定的位移参数,参数为一个分区与位移的map。由于一个消费者可能会消费多个分区,所以这种方式会增加一定的代码复杂度,如下所示:
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int count = 0;
....
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));
if (count % 1000 == 0)
null);
count++;
} }
代码中在处理poll()消息的过程中,不断保存分区与位移的关系,每处理1000条消息就会异步提交(也可以使用同步提交)。
重平衡监听器(Rebalance Listener)
变更分区命令
kafka-topics.sh --zookeeper hadoop03:2181 --alter --topic test --partitions 6
在分区重平衡前,如果消费者知道它即将不再负责某个分区,那么它可能需要将已经处理过的消息位移进行提交。Kafka的API允许我们在消费者新增分区或者失去分区时进行处理,我们只需要在调用subscribe()方法时传入ConsumerRebalanceListener对象,该对象有两个方法:
- public void onPartitionRevoked(Collection<topicpartition> partitions):此方法会在消费者停止消费消费后,在重平衡开始前调用。</topicpartition>
- public void onPartitionAssigned(Collection<topicpartition> partitions):此方法在分区分配给消费者后,在消费者开始读取消息前调用。</topicpartition>
下面来看一个onPartitionRevoked9)的例子,该例子在消费者失去某个分区时提交位移(以便其他消费者可以接着消费消息并处理):
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
private class HandleRebalance implements ConsumerRebalanceListener {
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Lost partitions in rebalance.
Committing current
offsets:" + currentOffsets);
}
}
try {
consumer.subscribe(topics, new HandleRebalance());
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));
}
null);
}
} catch (WakeupException e) {
// ignore, we're closing
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
try {
} finally {
consumer.close();
System.out.println("Closed consumer and we are done");
}
}
代码中实现了onPartitionsRevoked()方法,当消费者失去某个分区时,会提交已经处理的消息位移(而不是poll()的最大位移)。上面代码会提交所有的分区位移,而不仅仅是失去分区的位移,但这种做法没什么坏处。
从指定位移开始消费
在此之前,我们使用poll()来从最后的提交位移开始消费,但我们也可以从一个指定的位移开始消费。
如果想从分区开始端重新开始消费,那么可以使用seekToBeginning(TopicPartition tp);如果想从分区的最末端消费最新的消息,那么可以使用seekToEnd(TopicPartition tp)。而且,Kafka还支持我们从指定位移开始消费。从指定位移开始消费的应用场景有很多,其中最典型的一个是:位移存在其他系统(例如数据库)中,并且以其他系统的位移为准。
考虑这么个场景:我们从Kafka中读取消费,然后进行处理,最后把结果写入数据库;我们既不想丢失消息,也不想数据库中存在重复的消息数据。对于这样的场景,我们可能会按如下逻辑处理:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset());
processRecord(record);
storeRecordInDB(record);
}
}
这个逻辑似乎没什么问题,但是要注意到这么个事实,在持久化到数据库成功后,提交位移到Kafka可能会失败,那么这可能会导致消息会重复处理。对于这种情况,我们可以优化方案,将持久化到数据库与提交位移实现为原子性操作,也就是要么同时成功,要么同时失败。但这个是不可能的,因此我们可以在保存记录到数据库的同时,也保存位移,然后在消费者开始消费时使用数据库的位移开始消费。这个方案是可行的,我们只需要通过seek()来指定分区位移开始消费即可。下面是一个改进的样例代码:
public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
//在消费者负责的分区被回收前提交数据库事务,保存消费的记录和位移
commitDBTransaction();
}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
//在开始消费前,从数据库中获取分区的位移,并使用seek()来指定开始消费的位移
for(TopicPartition partition: partitions)
consumer.seek(partition, getOffsetFromDB(partition));
}
}
consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer));
//在subscribe()之后poll一次,并从数据库中获取分区的位移,使用seek()来指定开始消费的位移
consumer.poll(0);
for (TopicPartition partition: consumer.assignment())
consumer.seek(partition, getOffsetFromDB(partition));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
processRecord(record);
//保存记录结果
storeRecordInDB(record);
//保存位移
storeOffsetInDB(record.topic(), record.partition(), record.offset());
}
//提交数据库事务,保存消费的记录以及位移
commitDBTransaction();
}
具体逻辑见代码注释,此处不再赘述。另外注意的是,seek()只是指定了poll()拉取的开始位移,这并不影响在Kafka中保存的提交位移(当然我们可以在seek和poll之后提交位移覆盖)。
package com.neuedu;
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import
public class Consumer {
private static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
static int count = 0;
static KafkaConsumer<String, String> consumer;
private class HandleRebalance implements ConsumerRebalanceListener {
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Lost partitions in current offsets:" + currentOffsets);
}
}
public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers",
"hadoop03:9092,hadoop05:9092,hadoop06:9092");// 该地址是集群的子集,用来探测集群。
props.put("group.id", "payment");// cousumer的分组id
"false");// 自动提交offsets
"1000");// 每隔1s,自动提交offsets
props.put("session.timeout.ms", "30000");// Consumer向集群发送自己的心跳,超时则认为Consumer已经死了,kafka会把它的分区分配给其他进程
props.put("key.deserializer",
反序列化器
props.put("value.deserializer",
consumer = new KafkaConsumer<>(props);
// consumer.subscribe(Arrays.asList("payment"));// 订阅的topic,可以多个
String topic = "payment";
TopicPartition partition0 = new TopicPartition(topic, 0);
// TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0));
Collection<TopicPartition> partitions = Arrays.asList(partition0);
consumer.seekToBeginning(partitions);
// consumer.seek(partition0,495);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("赌东道赌东道赌东道赌东道赌东道赌东道 offset = %d, key = %s, value = %s, partition = %s",
record.offset(), record.key(), record.value(),record.partition());
System.out.println();
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));
if (count % 1 == 0)
null);
count++;
}
}
}
}