it编程 > 编程语言 > Java

logstash同步数据从kafka到es集群

30人参与 2024-07-28 Java

背景:需求是这样的,原始文件是txt文件(每天300个文件),最终想要的结果是每天将txt中的数据加载到es中,开始的想法是通过logstash加载数据到es中,但是对logstash不太熟悉,不知道怎么讲程序弄成读取一个txt文件到es中以后,就将这个txt原始文件备份并且删除掉,然后就想到了通过一个中间件来做,python读取成功一个txt文件,并且加载到kafka中以后,就将这个txt文件备份然后删除掉原始文件。

第一步:向kafka中添加数据,我用python连接kafka集群,向其中加载数据
# -*- coding: utf-8 -*-

import json
import json
import msgpack
from loguru import logger
from kafka import kafkaproducer
from kafka.errors import kafkaerror

def kfk_produce_1():
    """
        发送 json 格式数据
    :return:
    """
    producer = kafkaproducer(
        bootstrap_servers='192.168.85.109:9092',
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    #logstash-topic-one
    #producer.send('python_test_topic', {'key': 'value'})
    producer.send('logstash-topic-one', {'name': 'value'})

kfk_produce_1()
执行完的结果,来界面工具上看,显示这样,说明数据已经加载进来了

在这里插入图片描述

第二步:配置logstash,将kafka中的数据加载到es集群中
编写的logstash.conf配置如下;
input{
      kafka{
        bootstrap_servers => "192.168.85.109:9092"
        client_id => "consumer_id"
        group_id => "consumer_group"
        auto_offset_reset => "latest"
        consumer_threads => 1
        decorate_events => true
        topics => ["logstash-topic-one","logstash-topic-two"]
      }
}
output {

  if [@metadata][kafka][topic] == "logstash-topic-one" {
        elasticsearch {
          hosts => "http://192.168.85.109:9200"
          index => "kafka-one-data"
          timeout => 300
        }
    }

  if [@metadata][kafka][topic] == "logstash-topic-two" {
        elasticsearch {
          hosts => "http://192.168.85.109:9200"
          index => "kafka-two-data"
          timeout => 300
        }
    }
  stdout {}
}
第三步:执行logstash,通过kibana查看数据是否在es集群中,展示如下,则说明配置是正确的

在这里插入图片描述

在这里插入图片描述

问题1:现在发现,name字段是在message下面,如果是多个字段的话,不方便查询,想着怎么讲字段从message中弄出来,修改的配置如下,增加一段这样的代码就ok了

type => "json"
        codec => json {
            charset => "utf-8"
        }
完整的配置文件logstash.conf代码如下;
input{
      kafka{
        bootstrap_servers => "192.168.85.109:9092"
        client_id => "consumer_id"
        group_id => "consumer_group"
        auto_offset_reset => "latest"
        consumer_threads => 1
        decorate_events => true
        topics => ["logstash-topic-one","logstash-topic-two"]
        type => "json"
        codec => json {
            charset => "utf-8"
        }
      }
}
output {

  if [@metadata][kafka][topic] == "logstash-topic-one" {
        elasticsearch {
          hosts => "http://192.168.85.109:9200"
          index => "kafka-one-data"
          timeout => 300
        }
    }

  if [@metadata][kafka][topic] == "logstash-topic-two" {
        elasticsearch {
          hosts => "http://192.168.85.109:9200"
          index => "kafka-two-data"
          timeout => 300
        }
    }
  stdout {}
}
然后我又造了一个多字段的场景如下;

在这里插入图片描述

我先去logstash中查看日志如下,字段已经分离出来了
{
          "name" => "value",
      "@version" => "1",
          "type" => "json",
    "@timestamp" => 2023-05-17t06:13:48.825z
}
{
      "@version" => "1",
          "type" => "json",
    "@timestamp" => 2023-05-17t06:20:57.729z,
          "name" => "令狐冲",
           "age" => "30",
        "height" => "180cm"
}
去kibana中去查询,显示如下,测试成功喽,😄

在这里插入图片描述
在这里插入图片描述

问题2:在查询结果中发现,有些字段是没有用的,看看怎么去掉?

在配置文件中增加一个过滤器就可以解决了
filter { mutate {
                 remove_field => ["@version","@timestamp","type"] # 删除字段
                 }
 }
然后再去kibana中去查看,就发现这会儿的字段格式非常好看了,😄

在这里插入图片描述

文档后续再继续完善,有好的建议或者问题可以留言交流,😄
(0)

您想发表意见!!点此发布评论

推荐阅读

黑马头条 Kafka

07-28

如何在SpringCloud中使用Kafka Streams实现实时数据处理

07-28

SpringBoot——集成Kafka详解

07-28

Kafka Rebanlace次数过高问题

07-28

(ROOT)KAFKA详解

07-28

kafka实现负载均衡的原理,OpenKruise v0(1)

07-28

猜你喜欢

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论