极速精简 Go 版 Logstash

hxl Go语言中文网 今天

前言

今天来介绍 go-zero 生态的另一个组件 go-stash。这是一个 logstash 的 Go 语言替代版,我们用 go-stash 相比原先的 logstash 节省了2/3的服务器资源。如果你在用 logstash,不妨试试,也可以看看基于 go-zero 实现这样的工具是多么的容易,这个工具作者仅用了两天时间。

整体架构

先从它的配置中,我们来看看设计架构。

Clusters:  - Input:      Kafka:        # Kafka 配置 --> 联动 go-queue    Filters:     # filter action      - Action: drop                  - Action: remove_field      - Action: transfer          Output:      ElasticSearch:        # es 配置 {host, index}

看配置名:kafka 是数据输出端,es 是数据输入端,filter 抽象了数据处理过程。

对,整个 go-stash 就是如 config 配置中显示的,所见即所得。

image.png

启动

从 stash.go 的启动流程大致分为几个部分。因为可以配置多个cluster,那从一个 cluster 分析:

  1. 建立与 es 的连接【传入 es 配置】
  2. 构建 filter processorses 前置处理器,做数据过滤以及处理,可以设置多个】
  3. 完善对 es 中 索引配置,启动 handle ,同时将 filter 加入handle【处理输入输出】
  4. 连接下游的 kafka,将上面创建的 handle 传入,完成 kafka 和 es之间的数据消费和数据写入

MessageHandler

在上面架构图中,中间的 filter 只是从 config 中看到,其实更详细是 MessageHandler 的一部分,做数据过滤和转换,下面来说说这块。

以下代码:https://github.com/tal-tech/go-stash/tree/master/stash/handler/handler.go

type MessageHandler struct { writer  *es.Writer indexer *es.Index filters []filter.FilterFunc}

这个就对应上面说的,filter 只是其中一部分,在结构上 MessageHandler是对接下游 es ,但是没有看到对 kafka 的操作。

别急,从接口设计上 MessageHandler 实现了 go-queue 中 ConsumeHandler接口。

这里,上下游就串联了:

  1. MessageHandler 接管了 es 的操作,负责数据处理到数据写入
  2. 对上实现了 kafka 的 Consume 操作。这样在消费过程中执行 handler的操作,从而写入 es

实际上,Consume() 也是这么处理的:

func (mh *MessageHandler) Consume(_, val string) error { var m map[string]interface{}  // 反序列化从 kafka 中的消息 if err := jsoniter.Unmarshal([]byte(val), &m); err != nil {  return err } // es 写入index配置 index := mh.indexer.GetIndex(m)  // filter 链式处理【因为没有泛型,整个处理都是 `map进map出`】 for _, proc := range mh.filters {  if m = proc(m); m == nil {   return nil  } } bs, err := jsoniter.Marshal(m) if err != nil {  return err } // es 写入 return mh.writer.Write(index, string(bs))}

数据流

说完了数据处理,以及上下游的连接点。但是数据要从 kafka -> es ,数据流出这个动作从 kafka 角度看,应该是由开发者主动 pull data from kafka

那么数据流是怎么动起来?我们回到主程序 https://github.com/tal-tech/go-stash/blob/master/stash/stash.go

其实 启动 整个流程中,其实就是一个组合模式:

func main() { // 解析命令行参数,启动优雅退出 ...  // service 组合模式 group := service.NewServiceGroup() defer group.Stop()

 for _, processor := range c.Clusters {  // 连接es    ...  // filter processors 构建    ...    // 准备es的写入操作 {写入的index, 写入器writer}  handle := handler.NewHandler(writer, indexer)  handle.AddFilters(filters...)  handle.AddFilters(filter.AddUriFieldFilter("url", "uri"))    // 按照配置启动kafka,并将消费操作传入,同时加入组合器  for _, k := range toKqConf(processor.Input.Kafka) {   group.Add(kq.MustNewQueue(k, handle))  } } // 启动这个组合器 group.Start()}

整个数据流,就和这个 group 组合器有关了。

group.Start() |- group.doStart()  |- [service.Start() for service in group.services]

那么说明加入 group 的 service 都是实现 Start()。也就是说 kafka 端的启动逻辑在 Start()

func (q *kafkaQueue) Start() { q.startConsumers() q.startProducers()

 q.producerRoutines.Wait() close(q.channel) q.consumerRoutines.Wait()}
  1. 启动 kafka 消费程序
  2. 启动 kafka 消费拉取端【可能会被名字迷惑,实际上是从 kafka 拉取消息到 q.channel
  3. 消费程序终止,收尾工作

而我们传入 kafka 中的 handler,上文说过其实是 Consume,而这个方法就是在 q.startConsumers() 中执行的:

q.startConsumers() |- [q.consumeOne(key, value) for msg in q.channel]  |- q.handler.Consume(key, value)

这样整个数据流就彻底串起来了:

image.png

总结

作为 go-stash 第一篇文章,本篇从架构和设计上整体介绍 go-stash ,有关性能和为什么我们要开发一个这样的组件,我们下篇文章逐渐揭晓。

https://github.com/tal-tech/go-stash

关于 go-zero 更多的设计和实现文章,可以持续关注。

https://github.com/tal-tech/go-zero

(0)

相关推荐

  • 如何对你的 ELK 生产安全部署?本文交给你~

    如何对你的 ELK 生产安全部署?本文交给你~

  • 高频数据采集请求如何不影响主业务(7)

    上一篇文章讨论了写缓存的架构解决方案,它虽然可以减少数据库写操作的压力,但也存在不足.比如需要长期高频插入数据时,这个解决方案就无法满足,本篇文章我们就围绕这个问题逐步提出解决方案.在架构方案层层展开 ...

  • ELK迁移到splunk

    Elk到Splunk迁移选项 麋鹿的Splunk应用程序(孵化) –连接到Elasticsearch并将数据和仪表板导入Splunk 导入Splunk的仪表板最初搜索Elasticsearch数据节点 ...

  • 圆运动的实战古方总结!(精简领悟版)

    人身疾病,只分内伤病与外感病(温病应属广义伤寒范畴).内伤病,属形质病,身体圆运动仍在,因形质的损灭必待经年累月,故内伤病皆可慢慢调治:外感病,属本气病,初则一气偶偏.继则一气独胜以至身体运动不圆之病 ...

  • 高考古代文化常识10大版块精简图片版,记起来超方便!

    英语园 专注高中英语教学,资源每日更新, 公众号 " 语文 高考古代文化常识精简版 (建议保存图片,方便背诵) 数学学苑 高中数学学习帮手,我们一起学数学 公众号

  • 《辅行诀》精简整理版!

    一.五行体用化图解 陶隐居曰:此图乃<汤液经法>尽要之妙,学者能谙于此,医道毕矣. 经释: 肝德在散.以辛补之,以酸泻之.肝苦急,急食甘以缓之,适其性而衰之也. 心德在耎.以咸补之,苦泻之 ...

  • office 2016精简激活版

    说到办公软件,大家首先想到的肯定是WPS和Office这两款软件,但无一例外,正常情况我们都需要激活才可以永久使用. 类似的软件已经给朋友安利过了,今天给朋友们分享的是office 2016精简激活版 ...

  • 【圆运动的实战古方总结(精简领悟版)】

    人身疾病,只分内伤病与外感病(温病应属广义伤寒范畴).内伤病,属形质病,身体圆运动仍在,因形质的损灭必待经年累月,故内伤病皆可慢慢调治:外感病,属本气病,初则一气偶偏.继则一气独胜以至身体运动不圆之病 ...

  • 精简Command版SqlHelper

    我在写CSharp程序对数据库进行操作时发现Connection对象起到了连接数据库的做用,实际执行SQL语句使用的是Command对象的方法,所以对SqlHelper进行了重写,具体如下: 一.创建 ...

  • 《辅行诀》精简整理版

    一.五行体用化图解           陶隐居曰:此图乃<汤液经法>尽要之妙,学者能谙于此,医道毕矣. 经释: 肝德在散.以辛补之,以酸泻之.肝苦急,急食甘以缓之,适其性而衰之也. 心德在 ...

  • 自制 Win10x64 Pro.iso 精简稳定版系统分享,完全免费

    自制 Win10x64 Pro.iso 精简稳定版系统分享,完全免费

  • 最新版windows10x64/x32 20H2系统 俄罗斯大神精简纯净版来啦

    最新版windows10x64/x32 20H2系统 俄罗斯大神精简纯净版来啦