Kafka学习笔记

Kafka学习笔记,第1张

Kafka学习笔记 Kafka学习笔记 安装

省略
linux下面的安装就简单的尼玛离谱

zooker.config改一下
开放一下kafka的端口就ok

Kafka运作的方法论

首先要明白什么是消息队列。
信息的传递一定有发送方和接收方
发送方和接收方如果同步接受消息会带来很多不便,比如,发送方和接收方不一定同时有空,如果能有一个中间的人,存也在这个人里面存,出也是这个人出,那就方便很多了。消息中间件应运而生。

消息队列有很多模式
点对点模式

点对点的特点:

    一个producer产生的消息只被一个consumer消费消费者无法感知Queue中是否有消息(所以需要增加额外的线程)

发布订阅模式

3. 一个producer的订阅者可以接受这个producer的消息
4. 但是传递消息的速度不确定,因为不同机器处理消息的速度,能力不同

Kafka模型架构

producer:消息的生产者kafka cluster

Broker:Broker是Kafka的实例,每一个服务器上有一个或者多个Kafka实例,我们姑且认为一台broker对应一台服务器。每一个kafka集群内的broker都有一个不重复的编号。Topic:消息主题,可以理解为消息的分类,Kafka的数据保存在topic中,每一个broker都可以创建多个topicPartition:Topic的分区,每一个topic可以有多个分区,分区的作用是负载,提高Kafka的吞吐量。同在一个topic中的不同分区数据不相同。可以理解为这个时数据库分表。Replication:每一个分区都有一个或者多个副本。副本的作用是主分区故障的时候,还能有机子顶上。这么分析,foller和leaer的关系就显而易见了。
对应关系是
kafka cluster 包含 多个broker ,broker包含多个Topic,Topic包含多个partition.其中,每一个分区对多个Replication,存放在不同机子中,实现高可用。副本的数量绝对不能大于broker的数量,也就是说一个broke中不可能存在两个包含相同内容的ReplicaitionMessage:每一条发送的消息Consumer:消费者,就是消息的发送方,用于消费信息,是消息的出口。Consumer Group:可以将多个消费者组成一个消费者组。同一个消费者组的消费者可以消费同一个topic不同分区的信息。

Kafka的工作流程
生产者发送消息 to Kafka Cluster

    生产者获得集群中learder的信息生产者发送信息给Leaderleader写信息在本地leader发送信息给followerfollower写完,发送ack给leaderleader收到所有follower的ack发送消息给producer

采用分区的目的是为了,方便扩展,提高并发,那么不同分区存放的是不同的内容,服务器是如何将不同的请求分发到不同的partition中去的呢?
7. 人工制定partition
8. 数据如果有key,用hash的手段去得到一个partition
9. 轮询出一个patition。
要知道,在消息队列中,保证数据不丢失是一件很重要的事情。它是通过一个叫做ack的参数来保证的。
0表示producer在往集群中发送数据的时候不需要等到集群返回。
1表示就是leader级别的停等协议,保证leader接收到应答信息,就可以发送下一条。
all 表示所有的leader follower接收到消息,才能继续发送下一条数据。

Kafka是如何保存数据的
像Kafka需要高并发额组件,它会单独开辟一块连续的磁盘空间,顺序写入数据。

Partition结构
一个Partition包含多个Segement
一个Segment包含

    xx.index索引文件xx.loag信息存储文件xx.timeindex索引文件
    存储策略
    无论信息是否被消费,Kafka都会保存所有信息。基于时间,7天删除

消费数据
消息存储在log文件后,消费者就可以进行消费了。首先Kafka采用的是 点对点的模式(???我记得不是发布订阅吗,这个待斟酌)其次,消费者也是主动去kafka集群中找leader拉去信息。每一个消费者都有一个组id,同一个消费者组的可以消费同一个topic下不同分区的数据。简单来说,如果A B同属与一个消费者组,对于Topic test这个主题下的一个patition并不能都读取,只能用一个读。


所以会出现某一个消费者消费多个partition的情况,一般消费者组的consumer数量和partion数量一一对应。不然那一个消费者处理两个partition会可能效率更不上。多个消费者反正也不能两个消费者消费同一个partition

Kafka实践 *** 作
    简单介绍一下server.properties

    broker.id 每一个broker在集群中的唯一表示,要求是正数。当broker.id没有发生变化,则不会影响consumers的消息情况。log.dirs kafka数据存放的地址port broker server的服务端口message.max.byte,消息体的最大大小,单位是字节num.network.threads,broker能处理消息的最大线程数,一般为cpu核数num.io.threads brokers处理磁盘IO的线程数hostname broker的主机地址。如果设置了,就会绑定到这个低智商,若是没有,就会绑定到所有接口上,并将其中之一发送到zookeeper.connect zookeeper集群的地址,可以是多个zookeeper.session.timeout.ms zookeeper最大的超时时间zookeeper.connection.timeouout.mslistener:监听端口

Kafka对于很多节点的功能都是通过zookeeper来完成的

    实 *** 命令

    服务器启动

zkServer.sh start

创建主题

	[root@k8s-master1 kafka_2.13-3.0.0]# bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic heima --partitions 2 --replication-factor 1

kafka-topics.sh这个大类中
用–bootstrap-server localhost:9092来代替 --zookeeper localhost:2081
–creae是定义动作
–topic后面跟了一些信息
名称
parition:设置分区数量
replication-factor,定义副本数量

启动服务器

bin/kafka-server-start.sh config/server.properties
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ice

启动客户端,绑定主题

这是生产者,生产信息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic ice
以一个简单的Java例子来讲解一下kafka

首先要在Maven中配置

        
            org.springframework.kafka
            spring-kafka
        

来一个消费者实例

public class ProducerFastStart {

    private static final String brokerList = "192.168.236.137:9092";

    private static final String topic = "ice";

    public static void main(String[] args) {
        Properties properties = new Properties();

//        properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");

        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        properties.put(ProducerConfig.RETRIES_CONFIG,10);

        //设置值序列化器
//        properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        //设置集群地址

//        properties.put("bootstrap.servers",brokerList);

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);

        KafkaProducer producer = new KafkaProducer(properties);

        //record封装了对象

        ProducerRecord record = new ProducerRecord<>(topic,"kafka-demo","123");

        try {
            Future send = producer.send(record);

            Recordmetadata recordmetadata = send.get();

            System.out.println("topic = "+recordmetadata.topic());
            System.out.println("partition = "+recordmetadata.partition());
            System.out.println("offset = " + recordmetadata.offset());
        }catch (Exception e){
            e.printStackTrace();
        }
        producer.close();
    }
}

首先要指定brokerlist的ip与端口,你要告诉这个生产者去哪儿放消息。指定一个topic,告诉去去哪个主题下放信息。
整一个信息在Java中,是以Message的形式存在,这个Message是以properties为配置工具配置的。

properties.put(ProducerConfig.RETRIES_CONFIG,10);
配置一下如果发送失败,总计重传几次

在使用java在kafka中传送信息的时候,一般都要跟上key和value。
而且 key和value都需要序列化。

KafkaProducer producer = new KafkaProducer(properties);
在这里绑定了properties中的一些信息,实现了Kafka的一个配置

ProducerRecord record = new ProducerRecord<>(topic,“kafka-demo”,“123”);
在这里设置了信息,用key value键值对的形式产生。

consumer的java实现

    private static final String brokerList = "192.168.236.135:9092";

    private static final String topic = "ice";

    private static final String groupId = "group.demo";
    public static void main(String[] args) {
        Properties properties = new Properties();
//        properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

//        properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);

        properties.put("bootstrap.servers",brokerList);
        properties.put("group.id",groupId);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);

        KafkaConsumer consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singleton(topic));

        while (true){
            ConsumerRecords records = consumer.poll(Duration.ofMillis(5000));

            for (ConsumerRecord record:records){
                System.out.println(record.value());
            }
        }

    }

properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

在这里,之前的producer是要序列化,那么对应取出来的信息我们要反序列化,这样才能获取到信息。

properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
在这里,在这里value也是也是需要反序列化的

    properties.put("bootstrap.servers",brokerList);
    properties.put("group.id",groupId);
    properties.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);

不同的是,要设置消费者的组才行。
KafkaConsumer consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singleton(topic));
然后需要让consumer订阅一下topic主题

while (true){
ConsumerRecords records = consumer.poll(Duration.ofMillis(5000));

        for (ConsumerRecord record:records){
            System.out.println(record.value());
        }
    }

设置监听的时间,并且解析监听得到的信息。

浅分析一下producer的写入过程
    客户端写程序。通过KafkaProducer这个类来连接broker,会通过properties制定broker的地址,并且制定kv的序列化类型,topic,partition等信息。通过ProducerRecord来封装要发送的消息。KafkaProducer装载ProducerRecord,并且通过send()函数发送。在这个时候会一个序列化器进入一个分区器。序列化器,序列化信息,分区器,来决定这个信息被放入分区器中的那个分区中。
    注意,分区器和序列化器都是可以由我们自己重写的。
    最后还有一个拦截器,拦截器的作用就像AOP的作用一样,相当于一个增强的功能。
浅分析一下consumer的读取过程

首先来讲解一下位移的概念。首先他和在分区中的位移不是一会儿是,虽然他们的英文都是Offset。消费的位移是指,这个位移记录了Consumer要消费的下一条消息的位移。是下一条消息的位移,不是最新消费消息的位移。

比如,一个分区中有10条信息,位移分别是0-9.某个Consumer应用已经消费了5条信息,那么说明这个Consumer消费了位移为0-4的五条消息,此时Consumer的位移是5,指向了下一条消息的位移。

并且,Consumer需要向Kafka汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为Consumer能够同事消费讴歌分区的数据,所以位移的提交是在分区的粒度上进行的。Consumer需要分配给他每个分区,都进行位移提交。

这个位移的存在是为了保障你不会消费之前已经消费了的数据。而且Kafka对于位移的提交容忍能力非常强,可以分为自动提交和手动提交。自动提交,就是Kafka默默地为你提交位移,作为用户你完全不必 *** 心这些事,而手动提交是指自己提交位移,Kafka Consumer不用管。

自动提交的Kafka使用的大概5s就自动提交一次位移。
但是自动提交会到来一个重复消费的问题,核心问题在于
开启自动提交

props.put("enable.auto.commit","true");
props.put(""auto.commit.interval.ms,"2000")

总体来说还是很简单的

自动提交,Kafka会保证在调用poll方法时,提交上次poll返回的所有消息。从顺序上说,poll方法的逻辑是先提交上一批消息的位移,再处理下一批消息。但是如果发生了Rebalance就会变得不同,具体怎么不同到时候再说。

手动提交很灵活,但是commitSync()调用的时候Consumer会处于一个阻塞状态,知道远端的Broker返回提交结果,这个状态才会结束。就是因为非资源原因导致的系统阻塞降低了效率。
这就来了异步提交。

但是我的项目里用的是同步提交嘛,接受传递来的数据,然后进行切割预处理,然后进行使用。

异步处理,会导致信息丢失,因为这个时候重试是没有意义的,你重试的时候,这个有效信息位都不知道去哪儿了。

最好的方法其实是两个叠加

欢迎分享,转载请注明来源:内存溢出

原文地址:https://www.54852.com/zaji/5718028.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-12-18
下一篇2022-12-18

发表评论

登录后才能评论

评论列表(0条)

    保存