30人参与 • 2024-07-28 • Java
# -*- coding: utf-8 -*-
import json

import msgpack
from kafka import KafkaProducer
from kafka.errors import KafkaError
from loguru import logger
def kfk_produce_1():
    """Send one JSON-encoded message to the ``logstash-topic-one`` topic.

    Creates a producer for the broker at ``192.168.85.109:9092`` whose value
    serializer dumps each value with ``json.dumps`` and encodes it as UTF-8,
    sends ``{'name': 'value'}``, then flushes and closes the producer.

    :return: None
    """
    producer = KafkaProducer(
        bootstrap_servers='192.168.85.109:9092',
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    )
    try:
        producer.send('logstash-topic-one', {'name': 'value'})
        # send() only queues the record for a background sender; block until
        # it has actually been delivered so a short-lived script cannot exit
        # with the message still buffered.
        producer.flush()
    finally:
        # Always release the producer's network resources, even if the send
        # or flush raised.
        producer.close()


kfk_produce_1()
# Kafka input: subscribe to both demo topics on the broker at 192.168.85.109:9092.
input {
  kafka {
    bootstrap_servers => "192.168.85.109:9092"
    client_id         => "consumer_id"
    group_id          => "consumer_group"
    auto_offset_reset => "latest"
    consumer_threads  => 1
    # The output section routes on [@metadata][kafka][topic]; event decoration
    # has to be enabled for that metadata to be attached to events.
    decorate_events   => true
    topics            => ["logstash-topic-one", "logstash-topic-two"]
  }
}
# Route each event to an Elasticsearch index chosen by the Kafka topic it was
# read from, and additionally print every event to stdout.
output {
  # Events from logstash-topic-one -> index kafka-one-data
  if [@metadata][kafka][topic] == "logstash-topic-one" {
    elasticsearch {
      hosts   => ["http://192.168.85.109:9200"]
      index   => "kafka-one-data"
      timeout => 300
    }
  }

  # Events from logstash-topic-two -> index kafka-two-data
  if [@metadata][kafka][topic] == "logstash-topic-two" {
    elasticsearch {
      hosts   => ["http://192.168.85.109:9200"]
      index   => "kafka-two-data"
      timeout => 300
    }
  }

  stdout {}
}
# Fragment meant to be placed inside the kafka { ... } input block: it sets the
# event "type" field to "json" and decodes each message with the json codec
# using the UTF-8 charset.
type => "json"
codec => json {
charset => "utf-8"
}
# Kafka input with JSON decoding: reads both demo topics and parses every
# message payload as JSON so its keys become event fields.
input {
  kafka {
    bootstrap_servers => "192.168.85.109:9092"
    client_id         => "consumer_id"
    group_id          => "consumer_group"
    auto_offset_reset => "latest"
    consumer_threads  => 1
    # Needed so [@metadata][kafka][topic] is available to the output conditionals.
    decorate_events   => true
    topics            => ["logstash-topic-one", "logstash-topic-two"]
    # Adds a "type" field with the value "json" to each event.
    type              => "json"
    # Decode the message body as UTF-8 JSON instead of leaving it as plain text.
    codec => json {
      charset => "utf-8"
    }
  }
}
# Per-topic fan-out: one Elasticsearch index per source Kafka topic, with a
# copy of every event echoed to stdout.
output {
  # logstash-topic-one is stored in kafka-one-data
  if [@metadata][kafka][topic] == "logstash-topic-one" {
    elasticsearch {
      hosts   => ["http://192.168.85.109:9200"]
      index   => "kafka-one-data"
      timeout => 300
    }
  }

  # logstash-topic-two is stored in kafka-two-data
  if [@metadata][kafka][topic] == "logstash-topic-two" {
    elasticsearch {
      hosts   => ["http://192.168.85.109:9200"]
      index   => "kafka-two-data"
      timeout => 300
    }
  }

  stdout {}
}
{
"name" => "value",
"@version" => "1",
"type" => "json",
"@timestamp" => 2023-05-17T06:13:48.825Z
}
{
"@version" => "1",
"type" => "json",
"@timestamp" => 2023-05-17T06:20:57.729Z,
"name" => "令狐冲",
"age" => "30",
"height" => "180cm"
}
# Strip the @version, @timestamp and type fields from every event so that only
# the fields carried by the message itself remain.
filter {
  mutate {
    # Delete fields
    remove_field => ["@version", "@timestamp", "type"]
  }
}
您想发表意见!!点此发布评论
版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。
发表评论