45人参与 • 2024-07-28 • Java
创建网桥
docker network create app-tier --driver bridge
拉取并启动镜像
docker run -d --name kafka-server --hostname kafka-server \
--network app-tier \
-p 9092:9092 \
-e allow_plaintext_listener=yes \
-e kafka_cfg_advertised_listeners=plaintext://192.168.66.1:9092 \
-e kafka_cfg_node_id=0 \
-e kafka_cfg_process_roles=controller,broker \
-e kafka_cfg_listeners=plaintext://:9092,controller://:9093 \
-e kafka_cfg_listener_security_protocol_map=controller:plaintext,plaintext:plaintext \
-e kafka_cfg_controller_quorum_voters=0@kafka-server:9093 \
-e kafka_cfg_controller_listener_names=controller \
bitnami/kafka:latest
kafka_cfg_advertised_listeners=plaintext://192.168.66.1
:9092,高亮位置为自己的服务器ip
创建一个first分区
docker run -it --rm --network app-tier bitnami/kafka:latest kafka-topics.sh --topic first --create --bootstrap-server kafka-server:9092
查看一下分区
docker run -it --rm --network app-tier bitnami/kafka:latest kafka-topics.sh --list --bootstrap-server kafka-server:9092
package main
import (
"context"
"fmt"
"github.com/segmentio/kafka-go"
"log"
"os"
"os/signal"
)
func prod() {
// 设置 kafka 代理地址
brokerlist := []string{"192.168.66.1:9092"}
// 创建一个 kafka 生产者
producer := kafka.newwriter(kafka.writerconfig{
brokers: brokerlist,
topic: "first",
balancer: &kafka.leastbytes{},
})
// 待发送的消息
message := kafka.message{
key: []byte("key"),
value: []byte("hello, kafka!"),
}
// 发送消息
err := producer.writemessages(context.background(), message)
if err != nil {
log.fatal("failed to write messages:", err)
}
// 关闭 kafka 生产者
err = producer.close()
if err != nil {
log.fatal("failed to close writer:", err)
}
fmt.println("message sent successfully!")
}
func main() {
go prod()
// 设置 kafka broker 地址和主题名称
brokeraddress := "192.168.66.1:9092"
topic := "first"
// 创建 kafka 连接
conn, err := kafka.dialleader(context.background(), "tcp", brokeraddress, topic, 0)
if err != nil {
log.fatalf("failed to connect to kafka broker: %s", err)
}
defer conn.close()
// 设置消费者起始偏移量为最新
//conn.resetoffsets()
// 创建消费者
consumer := kafka.newreader(kafka.readerconfig{
brokers: []string{brokeraddress},
topic: topic,
partition: 0,
minbytes: 10e3, // 最小字节数
maxbytes: 10e6, // 最大字节数
})
// 创建一个信号通道,用于捕获中断信号
signals := make(chan os.signal, 1)
signal.notify(signals, os.interrupt)
// 开始消费消息
for {
select {
case <-signals:
log.println("received interrupt signal, shutting down...")
return
default:
// 从 kafka 获取一条消息
msg, err := consumer.readmessage(context.background())
if err != nil {
log.printf("failed to read message: %s", err)
continue
}
// 处理消息
fmt.printf("received message: %s\n", string(msg.value))
}
}
}
上图
reference
https://hub.docker.com/r/bitnami/kafka
您想发表意见!!点此发布评论
版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。
发表评论