it编程 > 编程语言 > Java

Kafka是什么,以及如何使用SpringBoot对接Kafka

38人参与 2024-07-28 Java

系列文章目录



kafka存取原理与实现分析,打破面试难关
防止消息丢失与消息重复——kafka可靠性分析及优化实践



在这里插入图片描述
继上一次教大家手把手安装kafka后,今天我们直接来到入门实操教程,也就是使用springboot该怎么对接和使用kafka。当然,在一开始我们也会比较细致的介绍一下kafka本身。那么话不多说,马上开始今天的学习吧

一、kafka与流处理

我们先来看看比较正式的介绍:kafka是一种流处理平台,由linkedin公司创建,现在是apache下的开源项目。kafka通过发布/订阅机制实现消息的异步传输和处理。它具有高吞吐量、低延迟、可伸缩性和可靠性等优点,使其成为了流处理和实时数据管道的首选解决方案

介绍其实是比较清晰的,如果你是第一次接触“流处理”概念,我们也可以做一点解释,流处理指的是对连续、实时产生的数据流进行实时处理、计算和分析的过程。

二、spring boot与kafka的整合demo

1. 新建springboot工程

如果你没有现成的spring boot项目,那么我们可以使用idea自带的spring initializr 来创建一个spring-boot的项目

在这里插入图片描述

此时我们可以直接选择使用apache kafka,另外项目还可以加个spring web准备让前台调用

在这里插入图片描述

2. 添加kafka依赖

如果你不是像上述一样新建的项目,那你也可以选择在已有的spring boot应用程序中使用kafka,那么你需要在pom.xml文件中添加以下依赖:

<dependency>
      <groupid>org.springframework.kafka</groupid>
      <artifactid>spring-kafka</artifactid>
      <version>2.8.11</version>
</dependency>

3. 配置kafka

在application.properties文件中添加以下配置:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test_group

这里我们指定了kafka服务器的地址和端口,并配置了消费者组的id,关于消费者组的概念,其实就是某一些消费者具备相同的功能,因此会把他们设为同一个消费者组,这样他们就不会重复消费同一条消息了。更具体地原理,我们会在之后地篇章中介绍。

4. 创建kafka生产者

在kafka中,生产者是发送消息的应用程序或服务。在spring boot中,我们可以使用kafkatemplate类来创建kafka生产者

package com.zhanfu.kafkademo.service;

import org.springframework.beans.factory.annotation.autowired;
import org.springframework.kafka.core.kafkatemplate;
import org.springframework.stereotype.service;

@service
public class kafkaservice {
    
    @autowired
    private kafkatemplate<string, string> kafkatemplate;

    public void sendmessage(string message) {
        kafkatemplate.send("test_topic", message);
    }
}

这里我们使用@autowired注解来自动注入kafkatemplate,并使用send方法将消息发送到名为“test_topic”的kafka主题中。


5. 创建kafka消费者

在kafka中,消费者是接收并处理订阅主题消息的应用程序或服务。在spring boot中,我们可以使用@kafkalistener注解来创建kafka消费者。

package com.zhanfu.kafkademo.listener;

import org.springframework.kafka.annotation.kafkalistener;
import org.springframework.stereotype.component;

@component
public class kafkalis {

    @kafkalistener(topics = "test_topic", groupid = "test_group")
    public void receivemessage(string message) {
        system.out.println("received message: " + message);
    }
}

6. 应用程序入口

现在我们已经完成了spring boot和kafka的整合。我们可以启动spring boot应用程序,然后发送消息并消费它,以测试我们的应用程序是否正确地与kafka集成。

package com.zhanfu.kafkademo.controller;

import com.zhanfu.kafkademo.service.kafkaservice;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.web.bind.annotation.getmapping;
import org.springframework.web.bind.annotation.pathvariable;
import org.springframework.web.bind.annotation.restcontroller;

@restcontroller
public class messagecontroller {

    @autowired
    private kafkaservice kafkaservice;

    @getmapping("/send/{message}")
    public string sendmessage(@pathvariable string message) {
        kafkaservice.sendmessage(message);
        return "message sent successfully";
    }
}

在这个例子中,我们使用@autowired注解来自动注入kafkaproducer,并通过发送消息的方法来调用sendmessage方法。最终项目整体框架如图:

在这里插入图片描述

三、启动与验证

首先自然是启动 kafka ,怎么启动可参考 ,然后是启动我们的spring boot项目

在这里插入图片描述

然后在浏览器中输入

http://127.0.0.1:8080/send/hello

在这里插入图片描述

最后检查我们的项目日志:

在这里插入图片描述

可以看到,整个发送和接收的流程都走通了

四、kafkatemplate 介绍

不难看出,在springboot中,使用kafka的关键在于 kafkatemplate, 它是 spring 提供的 kafka 生产者模版,用于向 kafka 集群发送消息。并且把 kafka 的生产者客户端封装成了一个 spring bean,提供更加方便易用的 api。

它有三个主要属性:

它的主要方法:

除了上述方法外,kafkatemplate 还提供了其他方法,如 senddefault()sendoffsetstotransaction() 等,可以根据实际需要进行选择和使用。

需要注意的是,在使用 kafkatemplate 发送消息时应该注意消息的序列化方式、主题和分区的选择以及错误处理等问题,以保证消息的可靠性和正确性。

当然,很多同学可能还注意到一个细节,我们在上面的demo中,我们直接将其 @autowired进我们的代码中,这是怎么做到的呢?换句话说,这个 kafkatemplate 为什么自己就会被spring 容器管理的呢?其实这得益于springboot中对kafka有了很多自动配置的内容。如下:

在这里插入图片描述
在这里插入图片描述

如上图,相信对spring boot熟悉的同学看到 conditionalonclassconditionalonmissingbean 应该就明白了。其实spring boot 早就贴心的为我们预留了这些自动配置,只要我们引入了 spring-kafka 包,使得项目中出现了 kafkatemplate 类,那么它就能被自动配置并存入spring 容器内

总结

今天我们通过一个demo讲解了在springboot中如何对接kafka,也介绍了下关键类 kafkatemplate ,得益于spring boot 的自动配置,开发者要做的配置内容其实并不多,使用也主要是依赖其提供的api,相对简单,相信大家很容易也都学会了,那么在后面的过程中,我们将继续学习其使用,并且会着重讲解 kafka 的原理与结构

(0)

您想发表意见!!点此发布评论

推荐阅读

【Kafka】Windows下安装Kafka(图文记录详细步骤)

07-28

深入探索:Zookeeper+消息队列(kafka)集群

07-28

docker kafka go demo

07-28

python kazoo 踩坑记录

07-28

Kafka topic消息清理几种方式

07-28

BAT面试必考Java面试题100+:Kafka+JVM+数据库+分布式(1)

07-28

猜你喜欢

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论