38人参与 • 2024-07-28 • Java
kafka存取原理与实现分析,打破面试难关
防止消息丢失与消息重复——kafka可靠性分析及优化实践
继上一次教大家手把手安装kafka后,今天我们直接来到入门实操教程,也就是使用springboot该怎么对接和使用kafka。当然,在一开始我们也会比较细致的介绍一下kafka本身。那么话不多说,马上开始今天的学习吧
我们先来看看比较正式的介绍:kafka是一种流处理平台,由linkedin公司创建,现在是apache下的开源项目。kafka通过发布/订阅机制实现消息的异步传输和处理。它具有高吞吐量、低延迟、可伸缩性和可靠性等优点,使其成为了流处理和实时数据管道的首选解决方案
介绍其实是比较清晰的,如果你是第一次接触“流处理”概念,我们也可以做一点解释,流处理指的是对连续、实时产生的数据流进行实时处理、计算和分析的过程。
如果你没有现成的spring boot项目,那么我们可以使用idea自带的spring initializr 来创建一个spring-boot的项目
此时我们可以直接选择使用apache kafka,另外项目还可以加个spring web准备让前台调用
如果你不是像上述一样新建的项目,那你也可以选择在已有的spring boot应用程序中使用kafka,那么你需要在pom.xml文件中添加以下依赖:
<dependency>
<groupid>org.springframework.kafka</groupid>
<artifactid>spring-kafka</artifactid>
<version>2.8.11</version>
</dependency>
在application.properties文件中添加以下配置:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test_group
这里我们指定了kafka服务器的地址和端口,并配置了消费者组的id,关于消费者组的概念,其实就是某一些消费者具备相同的功能,因此会把他们设为同一个消费者组,这样他们就不会重复消费同一条消息了。更具体地原理,我们会在之后地篇章中介绍。
在kafka中,生产者是发送消息的应用程序或服务。在spring boot中,我们可以使用kafkatemplate类来创建kafka生产者
package com.zhanfu.kafkademo.service;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.kafka.core.kafkatemplate;
import org.springframework.stereotype.service;
@service
public class kafkaservice {
@autowired
private kafkatemplate<string, string> kafkatemplate;
public void sendmessage(string message) {
kafkatemplate.send("test_topic", message);
}
}
这里我们使用@autowired注解来自动注入kafkatemplate,并使用send方法将消息发送到名为“test_topic”的kafka主题中。
在kafka中,消费者是接收并处理订阅主题消息的应用程序或服务。在spring boot中,我们可以使用@kafkalistener注解来创建kafka消费者。
package com.zhanfu.kafkademo.listener;
import org.springframework.kafka.annotation.kafkalistener;
import org.springframework.stereotype.component;
@component
public class kafkalis {
@kafkalistener(topics = "test_topic", groupid = "test_group")
public void receivemessage(string message) {
system.out.println("received message: " + message);
}
}
现在我们已经完成了spring boot和kafka的整合。我们可以启动spring boot应用程序,然后发送消息并消费它,以测试我们的应用程序是否正确地与kafka集成。
package com.zhanfu.kafkademo.controller;
import com.zhanfu.kafkademo.service.kafkaservice;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.web.bind.annotation.getmapping;
import org.springframework.web.bind.annotation.pathvariable;
import org.springframework.web.bind.annotation.restcontroller;
@restcontroller
public class messagecontroller {
@autowired
private kafkaservice kafkaservice;
@getmapping("/send/{message}")
public string sendmessage(@pathvariable string message) {
kafkaservice.sendmessage(message);
return "message sent successfully";
}
}
在这个例子中,我们使用@autowired注解来自动注入kafkaproducer,并通过发送消息的方法来调用sendmessage方法。最终项目整体框架如图:
首先自然是启动 kafka ,怎么启动可参考 ,然后是启动我们的spring boot项目
然后在浏览器中输入
http://127.0.0.1:8080/send/hello
最后检查我们的项目日志:
可以看到,整个发送和接收的流程都走通了
不难看出,在springboot中,使用kafka的关键在于 kafkatemplate
, 它是 spring 提供的 kafka 生产者模版,用于向 kafka 集群发送消息。并且把 kafka 的生产者客户端封装成了一个 spring bean,提供更加方便易用的 api。
它有三个主要属性:
producerfactory
:生产者工厂类,用于创建 kafkaproducer 实例。defaulttopic
:默认主题名称,如果在发送消息时没有指定主题名称,则使用该默认主题。messageconverter
:消息转换器,用于将消息对象转换为 kafka producerrecord它的主要方法:
除了上述方法外,kafkatemplate 还提供了其他方法,如 senddefault()、sendoffsetstotransaction() 等,可以根据实际需要进行选择和使用。
需要注意的是,在使用 kafkatemplate 发送消息时应该注意消息的序列化方式、主题和分区的选择以及错误处理等问题,以保证消息的可靠性和正确性。
当然,很多同学可能还注意到一个细节,我们在上面的demo中,我们直接将其 @autowired进我们的代码中,这是怎么做到的呢?换句话说,这个 kafkatemplate
为什么自己就会被spring 容器管理的呢?其实这得益于springboot中对kafka有了很多自动配置的内容。如下:
如上图,相信对spring boot熟悉的同学看到 conditionalonclass
、 conditionalonmissingbean
应该就明白了。其实spring boot 早就贴心的为我们预留了这些自动配置,只要我们引入了 spring-kafka 包,使得项目中出现了 kafkatemplate 类,那么它就能被自动配置并存入spring 容器内
今天我们通过一个demo讲解了在springboot中如何对接kafka,也介绍了下关键类 kafkatemplate ,得益于spring boot 的自动配置,开发者要做的配置内容其实并不多,使用也主要是依赖其提供的api,相对简单,相信大家很容易也都学会了,那么在后面的过程中,我们将继续学习其使用,并且会着重讲解 kafka 的原理与结构
您想发表意见!!点此发布评论
版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。
发表评论