it编程 > 编程语言 > Java

docker kafka go demo

45人参与 2024-07-28 Java

配置

创建网桥

docker network create app-tier --driver bridge

拉取并启动镜像

docker run -d --name kafka-server --hostname kafka-server \
    --network app-tier \
    -p 9092:9092 \
    -e allow_plaintext_listener=yes \
    -e kafka_cfg_advertised_listeners=plaintext://192.168.66.1:9092 \
    -e kafka_cfg_node_id=0 \
    -e kafka_cfg_process_roles=controller,broker \
    -e kafka_cfg_listeners=plaintext://:9092,controller://:9093 \
    -e kafka_cfg_listener_security_protocol_map=controller:plaintext,plaintext:plaintext \
    -e kafka_cfg_controller_quorum_voters=0@kafka-server:9093 \
    -e kafka_cfg_controller_listener_names=controller \
    bitnami/kafka:latest

kafka_cfg_advertised_listeners=plaintext://192.168.66.1:9092,高亮位置为自己的服务器ip

创建一个first分区

docker run -it --rm --network app-tier bitnami/kafka:latest kafka-topics.sh --topic first --create --bootstrap-server kafka-server:9092

查看一下分区

docker run -it --rm --network app-tier bitnami/kafka:latest kafka-topics.sh --list --bootstrap-server kafka-server:9092

go生产与消费kafka中的消息

package main

import (
	"context"
	"fmt"
	"github.com/segmentio/kafka-go"
	"log"
	"os"
	"os/signal"
)

func prod() {
	// 设置 kafka 代理地址
	brokerlist := []string{"192.168.66.1:9092"}

	// 创建一个 kafka 生产者
	producer := kafka.newwriter(kafka.writerconfig{
		brokers:  brokerlist,
		topic:    "first",
		balancer: &kafka.leastbytes{},
	})

	// 待发送的消息
	message := kafka.message{
		key:   []byte("key"),
		value: []byte("hello, kafka!"),
	}

	// 发送消息
	err := producer.writemessages(context.background(), message)
	if err != nil {
		log.fatal("failed to write messages:", err)
	}

	// 关闭 kafka 生产者
	err = producer.close()
	if err != nil {
		log.fatal("failed to close writer:", err)
	}

	fmt.println("message sent successfully!")
}

func main() {

	go prod()
	// 设置 kafka broker 地址和主题名称
	brokeraddress := "192.168.66.1:9092"
	topic := "first"

	// 创建 kafka 连接
	conn, err := kafka.dialleader(context.background(), "tcp", brokeraddress, topic, 0)
	if err != nil {
		log.fatalf("failed to connect to kafka broker: %s", err)
	}
	defer conn.close()

	// 设置消费者起始偏移量为最新
	//conn.resetoffsets()
	
	// 创建消费者
	consumer := kafka.newreader(kafka.readerconfig{
		brokers:   []string{brokeraddress},
		topic:     topic,
		partition: 0,
		minbytes:  10e3, // 最小字节数
		maxbytes:  10e6, // 最大字节数
	})

	// 创建一个信号通道,用于捕获中断信号
	signals := make(chan os.signal, 1)
	signal.notify(signals, os.interrupt)

	// 开始消费消息
	for {
		select {
		case <-signals:
			log.println("received interrupt signal, shutting down...")
			return
		default:
			// 从 kafka 获取一条消息
			msg, err := consumer.readmessage(context.background())
			if err != nil {
				log.printf("failed to read message: %s", err)
				continue
			}

			// 处理消息
			fmt.printf("received message: %s\n", string(msg.value))
		}
	}
}

上图
在这里插入图片描述

reference
https://hub.docker.com/r/bitnami/kafka

(0)

您想发表意见!!点此发布评论

推荐阅读

Kafka topic消息清理几种方式

07-28

Kafka是什么,以及如何使用SpringBoot对接Kafka

07-28

BAT面试必考Java面试题100+:Kafka+JVM+数据库+分布式(1)

07-28

【Kafka】Windows下安装Kafka(图文记录详细步骤)

07-28

深入探索:Zookeeper+消息队列(kafka)集群

07-28

python kazoo 踩坑记录

07-28

猜你喜欢

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论