近期项目中有一个数据展示功能,其数据来源于Kafka,项目要求Kafka
每次都消费最新的数据,简要记录下其实现方案。
固定组实现-不生效
将auto.offset.reset
设置为latest
同时采用固定group,此种方式只在第一次读取时有效,后续再次读取时仍然从上次读取的地方开始继续读,不满足使用要求。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
Properties props = new Properties();
props.put("bootstrap.servers", "10.10.2.98:9004");
// 每次将组名动态生成
props.put("group.id", "test-group-1");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "latest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
String topic = "raw_message";
consumer.subscribe(Arrays.asList(topic));
|
动态组实现-生效
将auto.offset.reset
设置为latest
,同时每次消费时将group的名称动态生成,这样即可确保每次读取的都是最新的消息。
其缺点是group的数量会不断增加,偏移量___consumer_offsets 多次保存,且没找到有效的删除group的方法,也没办法做到定期清理,会对性能产生影响,通常只作为测试与实现使用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
Properties props = new Properties();
props.put("bootstrap.servers", "10.10.2.98:9004");
// 每次将组名动态生成
props.put("group.id", "test-group-" + RandomStringUtils.randomAlphabetic(6));
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "latest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
String topic = "raw_message";
consumer.subscribe(Arrays.asList(topic));
|
固定组实现-生效
将auto.offset.reset
设置为latest
的同时,group名称固定不变 ,给对应Consumer调用seekToEnd()方法,此种方式不需要动态切换组,推荐使用此方式。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
Properties props = new Properties();
props.put("bootstrap.servers", "10.10.2.98:9004");
props.put("group.id", "test-group-1");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "latest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
String topic = "raw_message";
TopicPartition topicPartition = new TopicPartition(topic, 0);
List<TopicPartition> topics = Arrays.asList(topicPartition);
consumer.assign(topics);
consumer.seekToEnd(topics);
|