38人参与 • 2024-07-28 • Java
高吞吐量:kafka能够处理大规模的数据流,并支持高吞吐量的消息传输。
持久性:kafka将消息持久化到磁盘上,保证了消息不会因为系统故障而丢失。
分布式:kafka是一个分布式系统,可以在多个节点上运行,具有良好的可扩展性和容错性。
支持多种协议:kafka支持多种协议,如tcp、http、udp等,可以与不同的系统进行集成。
灵活的消费模式:kafka支持多种消费模式,如拉取和推送,可以根据需要选择合适的消费模式。
可配置性强:kafka的配置参数非常丰富,可以根据需要进行灵活配置。
社区支持:kafka作为apache旗下的开源项目,拥有庞大的用户基础和活跃的社区支持,方便用户得到及时的技术支持。
添加依赖:在pom.xml文件中添加kafka的依赖,包括spring-kafka和kafka-clients。确保版本与你的项目兼容。
创建生产者:创建一个kafka生产者类,实现producer接口,并使用kafkatemplate发送消息。
配置生产者:在spring boot的配置文件中配置kafka生产者的相关参数,例如bootstrap服务器地址、kafka主题等。
发送消息:在需要发送消息的地方,注入kafka生产者,并使用其发送消息到指定的kafka主题。
创建消费者:创建一个kafka消费者类,实现consumer接口,并使用kafkatemplate订阅指定的kafka主题。
配置消费者:在spring boot的配置文件中配置kafka消费者的相关参数,例如group id、auto offset reset等。
接收消息:在需要接收消息的地方,注入kafka消费者,并使用其接收消息。
处理消息:对接收到的消息进行处理,例如保存到数据库或进行其他业务逻辑处理。
pom中填了依赖
<dependency>
<groupid>org.springframework.kafka</groupid>
<artifactid>spring-kafka</artifactid>
<version>2.8.1</version>
</dependency>
<dependency>
<groupid>org.apache.kafka</groupid>
<artifactid>kafka-clients</artifactid>
<version>2.8.1</version>
</dependency>
创建生产者:创建一个kafka生产者类,实现producer接口,并使用kafkatemplate发送消息。
import org.apache.kafka.clients.producer.*;
import org.springframework.beans.factory.annotation.value;
import org.springframework.kafka.core.kafkatemplate;
import org.springframework.stereotype.component;
@component
public class kafkaproducer {
@value("${kafka.bootstrap}")
private string bootstrapservers;
@value("${kafka.topic}")
private string topic;
private kafkatemplate<string, string> kafkatemplate;
public kafkaproducer(kafkatemplate<string, string> kafkatemplate) {
this.kafkatemplate = kafkatemplate;
}
public void sendmessage(string message) {
producer<string, string> producer = new kafkaproducer<>(bootstrapservers, new stringserializer(), new stringserializer());
try {
producer.send(new producerrecord<>(topic, message));
} catch (exception e) {
e.printstacktrace();
} finally {
producer.close();
}
}
}
配置生产者:在spring boot的配置文件中配置kafka生产者的相关参数,例如bootstrap服务器地址、kafka主题等。
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.kafka.core.defaultkafkaproducerfactory;
import org.springframework.kafka.core.kafkatemplate;
import org.springframework.kafka.core.producerfactory;
import org.springframework.kafka.core.defaultkafkaconsumerfactory;
import org.springframework.kafka.core.consumerfactory;
import org.springframework.kafka.core.consumerconfig;
import org.springframework.kafka.listener.concurrentmessagelistenercontainer;
import org.springframework.kafka.listener.messagelistener;
import org.springframework.context.annotation.propertysource;
import java.util.*;
import org.springframework.beans.factory.*;
import org.springframework.*;
import org.springframework.*;expression.*;value; @value("${kafka}") properties kafkaprops = new properties(); @bean public kafkatemplate<string, string> kafkatemplate(producerfactory<string, string> pf){ kafkatemplate<string, string> template = new kafkatemplate<>(pf); template .setmessageconverter(new stringjsonmessageconverter()); template .setsendtimeout(duration .ofseconds(30)); return template ; } @bean public producerfactory<string, string> producerfactory(){ defaultkafkaproducerfactory<string, string> factory = new defaultkafkaproducerfactory<>(kafkaprops); factory .setbootstrapservers(bootstrapservers); factory .setkeyserializer(new stringserializer()); factory .setvalueserializer(new stringserializer()); return factory ; } @bean public consumerfactory<string, string> consumerfactory(){ defaultkafkaconsumerfactory<string, string> factory = new defaultkafkaconsumerfactory<>(consumerconfigprops); factory .setbootstrapservers(bootstrapservers); factory .setkeydeserializer(new stringdeserializer()); factory .setvaluedeserializer(new stringdeserializer()); return factory ; } @bean public concurrentmessagelistenercontainer<string, string> container(consumerfactory<string, string> consumerfactory, messagelistener listener){ concurrentmessagelistenercontainer<string, string> container = new concurrentmessagelistenercontainer<>(consumerfactory); container .setmessagelistener(listener); container .setconcurrency(3); return container ; } @bean public messagelistener
消费者
import org.apache.kafka.clients.consumer.*;
import org.springframework.kafka.core.kafkatemplate;
import org.springframework.stereotype.component;
@component
public class kafkaconsumer {
@value("${kafka.bootstrap}")
private string bootstrapservers;
@value("${kafka.group}")
private string groupid;
@value("${kafka.topic}")
private string topic;
private kafkatemplate<string, string> kafkatemplate;
public kafkaconsumer(kafkatemplate<string, string> kafkatemplate) {
this.kafkatemplate = kafkatemplate;
}
public void consume() {
consumer<string, string> consumer = new kafkaconsumer<>(consumerconfigs());
consumer.subscribe(collections.singletonlist(topic));
while (true) {
consumerrecords<string, string> records = consumer.poll(duration.ofmillis(100));
for (consumerrecord<string, string> record : records) {
system.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
private properties consumerconfigs() {
properties props = new properties();
props.put(consumerconfig.bootstrap_servers_config, bootstrapservers);
props.put(consumerconfig.group_id_config, groupid);
props.put(consumerconfig.key_deserializer_class_config, "org.apache.kafka.common.serialization.stringdeserializer");
props.put(consumerconfig.value_deserializer_class_config, "org.apache.kafka.common.serialization.stringdeserializer");
return props;
}
}
kafka和rocketmq都是开源的消息队列系统,它们具有许多相似之处,但在一些关键方面也存在差异。以下是它们在数据可靠性、性能、消息传递方式等方面的比较:
您想发表意见!!点此发布评论
版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。
发表评论