Fluentd-kafka插件用法详解
Fluentd支持从kafka订阅数据,同时支持向kafka发布数据。这两项功能集成在一个插件中:fluent-plugin-kafka,我们在下文中分别称之为输入插件和输出插件。
【安装说明】
通过以下命令安装fluent-plugin-kafka:
td-agent-gem install fluent-plugin-kafka此插件需要Ruby版本不低于2.1,且输入插件要求源kafka版本不低于0.9,输出插件要求目的kafka版本不低于0.8。
如果要使用插件的zookeeper相关参数,需要安装zookeeper gem,可能还需要安装linux开发工具,如ruby-devel、gcc、make等。
【输入插件 - kafka】
插件以“单消费者”模式订阅kafka消息。
单消费者模式是指:每个kafka输入插件独立地订阅kafka消息。
这种模式可以满足极简单的应用场景。其缺点为:
每次只能从一个topic获取消息 如果有多个单消费者进程同时订阅相同的topic,进程之间无法协调如何分配不同的分区 如果多个单消费者进程中某个进程挂掉,其他进程无法从该进程原先订阅位置进行恢复。
<source> @type kafka
brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,.. topics <listening topics(separate with comma',')> format <input text type (text|json|ltsv|msgpack)> :default => json message_key <key (Optional, for text format only, default is message)> add_prefix <tag prefix (Optional)> add_suffix <tag suffix (Optional)>
# Optionally, you can manage topic offset by using zookeeper offset_zookeeper <zookeer node list (<zookeeper1_host>:<zookeeper1_port>,<zookeeper2_host>:<zookeeper2_port>,..)> offset_zk_root_node <offset path in zookeeper> default => '/fluent-plugin-kafka'
# ruby-kafka consumer options max_bytes (integer) :default => nil (Use default of ruby-kafka) max_wait_time (integer) :default => nil (Use default of ruby-kafka) min_bytes (integer) :default => nil (Use default of ruby-kafka)</source>@type:插件类型,取值为kafka brokers:逗号分隔的broker列表,每个broker需要指定ip和端口 topics:逗号分隔的topic列表 format:输入消息的格式,有text、json、ltsv、msgpack等几种 message_key:消息格式为text时,指定文本中message字段的名称 add_prefix:tag增加前缀 add_suffix:tag增加后缀
add_prefix kafka<source> @type kafka
brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,.. format <input text type (text|json|ltsv|msgpack)> <topic> topic <listening topic> partition <listening partition: default=0> offset <listening start offset: default=-1> </topic> <topic> topic <listening topic> partition <listening partition: default=0> offset <listening start offset: default=-1> </topic></source><source> @type kafka_group
brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,.. consumer_group <consumer group name, must set> topics <listening topics(separate with comma',')> format <input text type (text|json|ltsv|msgpack)> :default => json message_key <key (Optional, for text format only, default is message)> add_prefix <tag prefix (Optional)> add_suffix <tag suffix (Optional)> retry_emit_limit <Wait retry_emit_limit x 1s when BuffereQueueLimitError happens. The default is nil and it means waiting until BufferQueueLimitError is resolved> time_source <source for message timestamp (now|kafka|record)> :default => now time_format <string (Optional when use_record_time is used)> # ruby-kafka consumer options max_bytes (integer) :default => 1048576 max_wait_time (integer) :default => nil (Use default of ruby-kafka) min_bytes (integer) :default => nil (Use default of ruby-kafka) offset_commit_interval (integer) :default => nil (Use default of ruby-kafka) offset_commit_threshold (integer) :default => nil (Use default of ruby-kafka) fetcher_max_queue_size (integer) :default => nil (Use default of ruby-kafka) start_from_beginning (bool) :default => true</source>@type:插件类型,取值为kafka_group consumer_group:设定消费者组名称,必选 time_source:指定日志事件中时间戳来源,可取now、kafka和record time_format:当时间源为record时,设置时间格式以提取其中的时间戳 offset_commit_interval:设置offset提交时间间隔,默认10秒 offset_commit_threshold:插件可批量处理消息后再提交一次offset,此参数用于设置批量处理的消息数。默认为0,不采用批量提交机制。 start_from_beginning:true,从头开始消费topic;false,只消费新消息。默认为true。
<match app.**> @type kafka2
brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,.. # Set brokers directly topic_key (string) :default => 'topic' partition_key (string) :default => 'partition' partition_key_key (string) :default => 'partition_key' message_key_key (string) :default => 'message_key' default_topic (string) :default => nil default_partition_key (string) :default => nil default_message_key (string) :default => nil exclude_topic_key (bool) :default => false exclude_partition_key (bool) :default => false exclude_partition (bool) :default => false exclude_message_key (bool) :default => false get_kafka_client_log (bool) :default => false headers (hash) :default => {} headers_from_record (hash) :default => {} use_default_for_unknown_topic (bool) :default => false
<format> @type (json|ltsv|msgpack|attr:<record name>|<formatter name>) :default => json </format>
# Optional. See https://docs.fluentd.org/v/1.0/configuration/inject-section <inject> tag_key tag time_key time </inject>
# See fluentd document for buffer related parameters: https://docs.fluentd.org/v/1.0/configuration/buffer-section # Buffer chunk key should be same with topic_key. If value is not found in the record, default_topic is used. <buffer topic> flush_interval 10s </buffer>
# ruby-kafka producer options idempotent (bool) :default => false sasl_over_ssl (bool) :default => true max_send_retries (integer) :default => 1 required_acks (integer) :default => -1 ack_timeout (integer) :default => nil (Use default of ruby-kafka) compression_codec (string) :default => nil (No compression. Depends on ruby-kafka: https://github.com/zendesk/ruby-kafka#compression)</match>@type:插件类型,取值为kafka2 topic_key:设置目的topic取自日志记录中的哪个字段。 比如:topic_key为日志中的category字段,如果该字段的某个值为app,那么消息会被发布到kafka的名称为app的topic中。 需要注意的是,在插件的缓存配置中也需要设置该参数的取值。 topic_key category<buffer category> # topic_key should be included in buffer chunk key# ...</buffer>如果你设置了topic_key为category,那么在<buffer>配置中也需要以此作为chunk的类型值。 default_topic:默认topic,若未设置topic_key,则topic取此处的值。 <format>:设置输出消息格式,支持json、ltsv或其他输出插件 required_acks:设置每个请求的ack数,可设置1、2这样的小的数字以提高性能。 compression_codec:设置输出消息的压缩方式,支持gzip和snappy。
| default_partition_key | partition_key_key | 消息负载均衡方式 |
| 未设置 | 不存在 | 随机分配分区 |
| 已设置 | 不存在 | 分配到default_partition_key指定的分区 |
| 未设置 | 存在 | 含有partition_key_key字段的消息被分配到该字段指定的分区;其他消息随机分配一个分区 |
| 已设置 | 存在 | 含有partition_key_key字段的消息被分配到该字段指定的分区;其他消息分配到default_partition_key指定的分区 |
通常是由于插件使用的ruby-kafka和kafka集群版本不匹配导致的。
解决办法有两个:
升级kafka集群到最新版本,最新版更快更健壮
降级ruby-kafka或fluent-plugin-kafka以适配当前使用的kafka
我们这里只是简单介绍了一些fluent-plugin-kafka插件的使用规则,后续将会根据通过一些示例来进一步了解其用法。
欢迎持续关注,也欢迎大家收藏、转发、点击【在看】,多多支持。
赞 (0)
