it编程 > 编程语言 > Asp.net

Kafka整合WebFlux实践

19人参与 2026-03-06 Asp.net

kafka整合webflux

1、引入依赖

<dependency>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-webflux</artifactid>
</dependency>
<dependency>
    <groupid>io.projectreactor.kafka</groupid>
    <artifactid>reactor-kafka</artifactid>
    <version>1.1.0.release</version>
</dependency>

2、代码示例

@component
public class kafkaservice {

    private static final objectmapper object_mapper = new objectmapper();

    private kafkasender<string, string> kafkasender;
    private kafkareceiver<string, string> kafkareceiver;

    @postconstruct
    public void init() {
        final map<string, object> producerprops = new hashmap<>();
        producerprops.put(producerconfig.key_serializer_class_config, integerserializer.class);
        producerprops.put(producerconfig.value_serializer_class_config, stringserializer.class);
        producerprops.put(producerconfig.bootstrap_servers_config, "localhost:9092");
        final senderoptions<string, string> produceroptions = senderoptions.create(producerprops);
        this.kafkasender = kafkasender.create(produceroptions);

        final map<string, object> consumerprops = new hashmap<>();
        consumerprops.put(consumerconfig.key_deserializer_class_config, integerdeserializer.class);
        consumerprops.put(consumerconfig.value_deserializer_class_config, stringdeserializer.class);
        consumerprops.put(consumerconfig.client_id_config, "payment-validator-1");
        consumerprops.put(consumerconfig.group_id_config, "payment-validator");
        consumerprops.put(consumerconfig.bootstrap_servers_config, "localhost:9092");
        receiveroptions<string, string> consumeroptions = receiveroptions.<string, string>create(consumerprops)
                .subscription(collections.singleton("demo"))
                .addassignlistener(partitions -> system.out.println("onpartitionsassigned " + partitions))
                .addrevokelistener(partitions -> system.out.println("onpartitionsrevoked " + partitions));
        kafkareceiver<string, string> kafkareceiver = kafkareceiver.create(consumeroptions);
        kafkareceiver.receive().doonnext(r -> {
            system.out.println(r.value());
            r.receiveroffset().acknowledge();
        }).subscribe();
        this.kafkareceiver = kafkareceiver;
    }

    public mono< ?> send() {
        senderrecord<string, string, object> senderrecord = senderrecord.create(new producerrecord<>("demo", value()), 1);
        return kafkasender.send(mono.just(senderrecord)).next();
    }

    private string value() {
        map<string, string> map = new hashmap<>();
        map.put("name", uuid.randomuuid().tostring());
        try {
            return object_mapper.writevalueasstring(map);
        } catch (jsonprocessingexception e) {
            return "{}";
        }
    }
}

3、其它

server:
  port: 8888

spring:
  jackson:
    serialization:
      fail_on_empty_beans: false

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。

(0)

您想发表意见!!点此发布评论

推荐阅读

C#代码实现生成带Logo的二维码

03-06

C#使用Spire.Doc for .NET实现Word首页面页眉页脚不同设置指南

03-06

C#实现将RTF文档高效转换为PDF格式的两种方法详解

03-06

C#借助Spire.Doc for .NET实现调整Word文档页眉和页脚的高度

03-06

C#中空值校验应用场景分析

03-09

基于C#实现的多线程文件上传下载工具

03-09

猜你喜欢

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论