
- 1. pom dependencies
- 2. application.properties configuration file
- 3. Consumer configuration auto-loading
- 4. Offset tool
- 5. Test
1. pom dependencies
If this is a Spring Boot project, the versions can be omitted and matching versions will be resolved automatically, as in the sketch below.
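For example, a minimal sketch assuming the project inherits spring-boot-starter-parent (or imports the Spring Boot BOM), which manages the kafka-clients version:
<!-- version omitted: resolved by Spring Boot's dependency management -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>
With explicitly pinned versions, the dependencies used here are: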
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.6.0</version>
</dependency>
2. application.properties configuration file
server.port=2333
#----------------------------------- Offset -----------------------------------
jmw.kafka.offset.bootstrap.servers=ip:9092
jmw.kafka.offset.zookeeper.connect=ip:2181
jmw.kafka.offset.topic=xxx
jmw.kafka.offset.group.id=xxx
jmw.kafka.offset.enable.auto.commit=false
offset.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
offset.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
3. Consumer configuration auto-loading
package cn.com.kaf.configuration;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Reads the offset-related settings from application.properties and exposes
 * them through static fields once the Spring context has initialized the bean.
 */
@Component
public class OffsetApplicationProperties implements InitializingBean {

    @Value("${jmw.kafka.offset.bootstrap.servers}")
    private String serverHostPort;

    @Value("${jmw.kafka.offset.zookeeper.connect}")
    private String zookeeperConnectHostPort;

    @Value("${jmw.kafka.offset.topic}")
    private String offsetTopic;

    @Value("${jmw.kafka.offset.group.id}")
    private String groupId;

    @Value("${jmw.kafka.offset.enable.auto.commit}")
    private String enableAutoCommit;

    @Value("${offset.key.deserializer}")
    private String keyDeserializer;

    @Value("${offset.value.deserializer}")
    private String valueDeserializer;

    public static String KAFKA_OFFSET_SERVER_HOST_PORT;
    public static String KAFKA_OFFSET_ZOOKEEPER_CONNECT;
    public static String KAFKA_OFFSET_TOPIC;
    public static String KAFKA_OFFSET_GROUP_ID;
    public static String KAFKA_OFFSET_ENABLE_AUTO_COMMIT;
    public static String KAFKA_KEY_DESERIALIZER;
    public static String KAFKA_VALUE_DESERIALIZER;

    // Copy the injected values into the static fields once injection completes.
    @Override
    public void afterPropertiesSet() throws Exception {
        KAFKA_OFFSET_SERVER_HOST_PORT = serverHostPort;
        KAFKA_OFFSET_ZOOKEEPER_CONNECT = zookeeperConnectHostPort;
        KAFKA_OFFSET_TOPIC = offsetTopic;
        KAFKA_OFFSET_GROUP_ID = groupId;
        KAFKA_OFFSET_ENABLE_AUTO_COMMIT = enableAutoCommit;
        KAFKA_KEY_DESERIALIZER = keyDeserializer;
        KAFKA_VALUE_DESERIALIZER = valueDeserializer;
    }
}
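Once the context has started, afterPropertiesSet() has run and any class can read the configuration through the static fields without injection. A minimal usage sketch (the values depend on your application.properties):
// Safe only after the Spring context has finished starting;
// before that the static fields are still null.
String servers = OffsetApplicationProperties.KAFKA_OFFSET_SERVER_HOST_PORT;
String topics  = OffsetApplicationProperties.KAFKA_OFFSET_TOPIC;   // e.g. "topicA,topicB"
System.out.println("Seek tool will connect to " + servers + " for " + topics);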
4. Offset tool
package cn.com.kaf.seek;

import cn.com.kaf.configuration.OffsetApplicationProperties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.*;

@Component
public class SeekOffset extends OffsetApplicationProperties {

    /**
     * Rewinds every partition of the configured topics so that consumption
     * resumes {@code target} records before the current end offset.
     */
    public static void singleCase(int target) {
        KafkaConsumer<String, String> consumer = createConsumer();
        String[] topics = KAFKA_OFFSET_TOPIC.split(",");
        for (String topic : topics) {
            consumer.subscribe(Collections.singletonList(topic));
            // The initial poll triggers partition assignment, which seek() requires.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
            System.err.println("Records returned by the initial poll: " + records.count());
            System.err.println("Requested rewind distance from the end: " + target);

            List<PartitionInfo> partitions = consumer.partitionsFor(topic);
            System.err.println("Topic " + topic + " has " + partitions.size() + " partitions");

            List<TopicPartition> topicPartitions = new ArrayList<>();
            for (PartitionInfo pt : partitions) {
                topicPartitions.add(new TopicPartition(topic, pt.partition()));
            }

            Map<TopicPartition, Long> endMap = consumer.endOffsets(topicPartitions);
            Map<TopicPartition, Long> beginMap = consumer.beginningOffsets(topicPartitions);

            for (TopicPartition tp : topicPartitions) {
                long endOffset = endMap.get(tp);
                long beginOffset = beginMap.get(tp);
                long aimOffset = endOffset - target;
                System.err.println("Partition " + tp + " holds " + (endOffset - beginOffset) + " records");
                if (aimOffset > 0 && aimOffset >= beginOffset) {
                    consumer.seek(tp, aimOffset);
                    System.err.println("Offset moved: " + tp + "|" + aimOffset);
                } else {
                    // The target precedes the earliest retained record: fall back to the beginning.
                    consumer.seek(tp, beginOffset);
                    System.err.println("Target out of range, seeking to beginning: " + tp + "|" + aimOffset + "|" + beginOffset + "|" + endOffset);
                }
            }
            // Persist the new positions so the group resumes from them.
            consumer.commitSync();
        }
        consumer.close();
    }

    // Build the consumer lazily so the static fields set in afterPropertiesSet()
    // are populated before they are read.
    private static KafkaConsumer<String, String> createConsumer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", KAFKA_OFFSET_SERVER_HOST_PORT);
        properties.put("enable.auto.commit", KAFKA_OFFSET_ENABLE_AUTO_COMMIT);
        // zookeeper.connect is ignored by the modern Kafka consumer; kept from the original config.
        properties.put("zookeeper.connect", KAFKA_OFFSET_ZOOKEEPER_CONNECT);
        properties.put("key.deserializer", KAFKA_KEY_DESERIALIZER);
        properties.put("value.deserializer", KAFKA_VALUE_DESERIALIZER);
        properties.put("group.id", KAFKA_OFFSET_GROUP_ID);
        return new KafkaConsumer<>(properties);
    }
}
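The same rewind technique also works without Spring. Below is a minimal standalone sketch; the broker address localhost:9092, group demo-group, and topic demo-topic are placeholders, and assign() is used instead of subscribe() so no initial poll is needed before seeking:
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.*;

public class RewindSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("group.id", "demo-group");                // placeholder group
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            String topic = "demo-topic";                    // placeholder topic
            List<TopicPartition> parts = new ArrayList<>();
            consumer.partitionsFor(topic).forEach(p -> parts.add(new TopicPartition(topic, p.partition())));
            consumer.assign(parts);                         // manual assignment: no rebalance, seek works immediately
            Map<TopicPartition, Long> end = consumer.endOffsets(parts);
            Map<TopicPartition, Long> begin = consumer.beginningOffsets(parts);
            long target = 2;                                // rewind distance, as in singleCase(2)
            for (TopicPartition tp : parts) {
                long aim = Math.max(end.get(tp) - target, begin.get(tp)); // clamp to earliest retained offset
                consumer.seek(tp, aim);
            }
            consumer.commitSync();                          // persist the new positions for the group
        }
    }
}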
5. Test
import cn.com.kaf.DemoApplication;
import cn.com.kaf.seek.SeekOffset;
import org.junit.Before;
import org.junit.Test;
import org.springframework.boot.SpringApplication;

public class Te {

    @Before
    public void stBefore() {
        // Start the Spring context so OffsetApplicationProperties fills its static fields.
        SpringApplication.run(DemoApplication.class, new String[0]);
    }

    @Test
    public void setOffset() {
        // Rewind each partition to 2 records before its end offset.
        SeekOffset.singleCase(2);
    }
}
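To check that the seek actually took effect, one option is to poll again with a fresh consumer in the same group; each partition should then yield roughly target records before reaching its end. A sketch of such a check (same placeholder settings as the properties file; imports as in the classes above plus java.time.Duration):
@Test
public void verifySeek() {
    SeekOffset.singleCase(2);
    Properties props = new Properties();
    props.put("bootstrap.servers", OffsetApplicationProperties.KAFKA_OFFSET_SERVER_HOST_PORT);
    props.put("group.id", OffsetApplicationProperties.KAFKA_OFFSET_GROUP_ID);
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        // Subscribe to the first configured topic; the committed (seeked) offsets apply to this group.
        consumer.subscribe(Collections.singletonList(OffsetApplicationProperties.KAFKA_OFFSET_TOPIC.split(",")[0]));
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
        System.out.println("Records visible after the rewind: " + records.count());
    }
}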