alpakka-kafka(2)-consumer

alpakka-kafka-consumer的功能描述很简单:向kafka订阅某些topic然后把读到的消息传给akka-streams做业务处理。在kafka-consumer的实现细节上,为了达到高可用、高吞吐的目的,topic又可用划分出多个分区partition。分区是分布在kafka集群节点broker上的。由于一个topic可能有多个partition,对应topic就会有多个consumer,形成一个consumer组,共用统一的groupid。一个partition只能对应一个consumer、而一个consumer负责从多个partition甚至多个topic读取消息。kafka会根据实际情况将某个partition分配给某个consumer,即partition-assignment。所以一般来说我们会把topic订阅与consumer-group挂钩。这个可以在典型的ConsumerSettings证实:

val system = ActorSystem("kafka-sys")  val config = system.settings.config.getConfig("akka.kafka.consumer")  val bootstrapServers = "localhost:9092"  val consumerSettings =    ConsumerSettings(config, new StringDeserializer, new ByteArrayDeserializer)      .withBootstrapServers(bootstrapServers)      .withGroupId("group1")      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

我们先用一个简单的consumer plainSource试试把前一篇示范中producer写入kafka的消息读出来:

import akka.actor.ActorSystemimport akka.kafka._import akka.kafka.scaladsl._import akka.stream.{RestartSettings, SystemMaterializer}import akka.stream.scaladsl.{Keep, RestartSource, Sink}import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}import scala.concurrent._import scala.concurrent.duration._object plain_source extends App {  val system = ActorSystem("kafka-sys")  val config = system.settings.config.getConfig("akka.kafka.consumer")  implicit val mat = SystemMaterializer(system).materializer  implicit val ec: ExecutionContext = mat.executionContext  val bootstrapServers = "localhost:9092"  val consumerSettings =    ConsumerSettings(config, new StringDeserializer, new StringDeserializer)      .withBootstrapServers(bootstrapServers)      .withGroupId("group1")      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")  val subscription = Subscriptions.topics("greatings")  Consumer.plainSource(consumerSettings, subscription)    .runWith(Sink.foreach(msg => println(msg.value())))  scala.io.StdIn.readLine()  system.terminate()}

以上我们没有对读出的消息做任何的业务处理,直接显示出来。注意每次都会从头完整读出,因为设置了 .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"),也就是kafka-consumer的auto.offset.reset = "earliest" 。那么如果需要用读出的数据进行业务处理的话,每次开始运行应用时都会重复从头执行这些业务。所以需要某种机制来标注已经读取的消息,也就是需要记住当前读取位置offset。

Consumer.plainSource输入ConsumerRecord类型:

public ConsumerRecord(String topic,                          int partition,                          long offset,                          K key,                          V value) {        this(topic, partition, offset, NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE,                NULL_CHECKSUM, NULL_SIZE, NULL_SIZE, key, value);    }

这个ConsumerRecord类型里包括了offset,用户可以自行commit这个位置参数,也就是说用户可以选择把这个offset存储在kafka或者其它的数据库里。说到commit-offset,offset管理机制在kafka-consumer业务应用中应该属于关键技术。kafka-consumer方面的业务流程可以简述为:从kafka读出业务指令,执行指令并更新业务状态,然后再从kafka里读出下一批指令。为了实现业务状态的准确性,无论错过一些指令或者重复执行一些指令都是不能容忍的。所以,必须准确的标记每次从kafka读取数据后的指针位置,commit-offset。但是,如果读出数据后即刻commit-offset,那么在执行业务指令时如果系统发生异常,那么下次再从标注的位置开始读取数据时就会越过一批业务指令。这种情况称为at-most-once,即可能会执行一次,但绝不会重复。另一方面:如果在成功改变业务状态后再commit-offset,那么,一旦执行业务指令时发生异常而无法进行commit-offset,下次读取的位置将使用前一次的标注位置,就会出现重复改变业务状态的情况,这种情况称为at-least-once,即一定会执行业务指令,但可能出现重复更新情况。如果涉及资金、库存等业务,两者皆不可接受,只能采用exactly-once保证一次这种模式了。不过也有很多业务要求没那么严格,比如某个网站统计点击量,只需个约莫数,无论at-least-once,at-most-once都可以接受。

kafka-consumer-offset是一个Long类型的值,可以存放在kafka内部或者外部的数据库里。如果选择在kafka内部存储offset, kafka配置里可以设定按时间间隔自动进行位置标注,自动把当前offset存入kafka里。当我们在上面例子的ConsumerSettings里设置自动commit后,多次重新运行就不会出现重复数据的情况了:

val consumerSettings =    ConsumerSettings(config, new StringDeserializer, new StringDeserializer)      .withBootstrapServers(bootstrapServers)      .withGroupId("group1")      .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")        //自动commit      .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")   //自动commit间隔      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

alpakka-kafka提供了Committer类型,是akka-streams的Sink或Flow组件,负责把offset写入kafka。如果用Committer的Sink或Flow就可以按用户的需要控制commit-offset的发生时间。如下面这段示范代码:

val committerSettings = CommitterSettings(system)  val control: DrainingControl[Done] =    Consumer      .committableSource(consumerSettings, Subscriptions.topics("greatings"))      .mapAsync(10) { msg =>        BusinessLogic.runBusiness(msg.record.key, msg.record.value)          .map(_ => msg.committableOffset)      }      .toMat(Committer.sink(committerSettings))(DrainingControl.apply)      .run()control.drainAndShutdown();  scala.io.StdIn.readLine()  system.terminate()}object BusinessLogic {  def runBusiness(key: String, value: String): Future[Done] = Future.successful(Done)}

上面这个例子里BusinessLogic.runBusiess()模拟一段业务处理代码,也就是说完成了业务处理之后就用Committer.sink进行了commit-offset。这是一种at-least-once模式,因为runBusiness可能会发生异常失败,所以有可能出现重复运算的情况。Consumer.committableSource输出CommittableMessage:

def committableSource[K, V](settings: ConsumerSettings[K, V],                              subscription: Subscription): Source[CommittableMessage[K, V], Control] =    Source.fromGraph(new CommittableSource[K, V](settings, subscription))  final case class CommittableMessage[K, V](      record: ConsumerRecord[K, V],      committableOffset: CommittableOffset  )  @DoNotInherit sealed trait CommittableOffset extends Committable {    def partitionOffset: PartitionOffset  }

Committer.sink接受输入Committable类型并将之写入kafka,上游的CommittableOffset 继承了 Committable。另外,这个DrainingControl类型结合了Control类型和akka-streams终结信号可以有效控制整个consumer-streams安全终结。

alpakka-kafka还有一个atMostOnceSource。这个Source组件每读一条数据就会立即自动commit-offset:

def atMostOnceSource[K, V](settings: ConsumerSettings[K, V],                             subscription: Subscription): Source[ConsumerRecord[K, V], Control] =    committableSource[K, V](settings, subscription).mapAsync(1) { m =>      m.committableOffset.commitInternal().map(_ => m.record)(ExecutionContexts.sameThreadExecutionContext)    }

可以看出来,atMostOnceSource在输出ConsumerRecord之前就进行了commit-offset。atMostOnceSource的一个具体使用示范如下:

import scala.collection.immutable  val control: DrainingControl[immutable.Seq[Done]] =    Consumer      .atMostOnceSource(consumerSettings, Subscriptions.topics("greatings"))      .mapAsync(1)(record => BussinessLogic.runBusiness(record.key, record.value()))      .toMat(Sink.seq)(DrainingControl.apply)      .run()  control.drainAndShutdown();  scala.io.StdIn.readLine()  system.terminate()

所以,使用atMostOnceSource后是不需要任何Committer来进行commit-offset的了。值得注意的是atMostOnceSource是对每一条数据进行位置标注的,所以运行效率必然会受到影响,如果要求不是那么严格的话还是启动自动commit比较合适。

对于任何类型的交易业务系统来说,无论at-least-once或at-most-once都是不可接受的,只有exactly-once才妥当。实现exactly-once的其中一个方法是把offset与业务数据存放在同一个外部数据库中。如果在外部数据库通过事务处理机制(transaction-processing)把业务状态更新与commit-offset放在一个事务单元中同进同退就能实现exactly-once模式了。下面这段是官方文档给出的一个示范:

val db = new mongoldb  val control = db.loadOffset().map { fromOffset =>    Consumer      .plainSource(        consumerSettings,        Subscriptions.assignmentWithOffset(          new TopicPartition(topic, /* partition = */ 0) -> fromOffset        )      )      .mapAsync(1)(db.businessLogicAndStoreOffset)      .toMat(Sink.seq)(DrainingControl.apply)      .run()  }class mongoldb {  def businessLogicAndStoreOffset(record: ConsumerRecord[String, String]): Future[Done] = // ...  def loadOffset(): Future[Long] = // ...}

在上面这段代码里:db.loadOffset()从mongodb里取出上一次读取位置,返回Future[Long],然后用Subscriptions.assignmentWithOffset把这个offset放在一个tuple (TopicPartition,Long)里。TopicPartition定义如下:

public TopicPartition(String topic, int partition) {        this.partition = partition;        this.topic = topic;    }

这样Consumer.plainSource就可以从offset开始读取数据了。plainSource输出ConsumerRecord类型:

public ConsumerRecord(String topic,                          int partition,                          long offset,                          K key,                          V value) {        this(topic, partition, offset, NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE,                NULL_CHECKSUM, NULL_SIZE, NULL_SIZE, key, value);    }

这里面除业务指令value外还提供了当前offset。这些已经足够在businessLogicAndStoreOffset()里运算一个单独的business+offset事务了(transaction)。

(0)

相关推荐

  • Kafka到底会不会丢消息

    目录1.kafka是什么一种高吞吐量的分布式.发布订阅消息系统,它可以处理消费者规模的,网站中的所有动作流数据,具有高性能.持久化.多副本备份.横向扩展能力--以时间复杂度为 O(1) 的方式提供消息 ...

  • Kafka 如果丢了消息,怎么处理的?

    Kafka存在丢消息的问题,消息丢失会发生在Broker,Producer和Consumer三种. Broker Broker丢失消息是由于Kafka本身的原因造成的,kafka为了得到更高的性能和吞 ...

  • Kafka如果丢了消息,怎么处理的?

    Kafka存在丢消息的问题,消息丢失会发生在Broker,Producer和Consumer三种.Java面试宝典PDF完整版 Broker Broker丢失消息是由于Kafka本身的原因造成的,ka ...

  • 30分钟带你了解「消息中间件」Kafka、RocketMQ

    消息中间件的应用场景 主流 MQ 框架及对比 说明 Kafka 优点 Kafka 缺点 RocketMQ Pulsar 发展趋势 各公司发展 Kafka Kafka 是什么? Kafka 术语 Kaf ...

  • Kafka 实现延迟队列、死信队列、重试队列

    Kafka中实现延迟队列 在发送延时消息的时候并不是先投递到要发送的真实主题(real_topic)中,而是先投递到一些 Kafka 内部的主题(delay_topic)中,这些内部主题对用户不可见, ...

  • kafka 异步双活方案 mirror maker2 深度解析

    mirror maker2背景 通常情况下,我们都是使用一套kafka集群处理业务.但有些情况需要使用另一套kafka集群来进行数据同步和备份.在kafka早先版本的时候,kafka针对这种场景就有推 ...

  • 掌握了它,Go 对接 Kafka 就不是问题了

    Go语言中文网 昨天 最近看到了个重磅消息,即将发布的 Kafka 2.8 版本会移除 ZooKeeper,这可是 Kafka 在架构上的重大升级.让一向"重量级"的 Kafka ...

  • kafka介绍和使用

    1.1.       主要功能 根据官网的介绍,ApacheKafka®是一个分布式流媒体平台,它主要有3种功能: 1:It lets you publish and subscribe to str ...

  • Kafka如何做到1秒处理1500万条消息?

    " 一位软件工程师将通过本文向您呈现 Apache Kafka 在大型应用中的 20 项最佳实践. Apache Kafka 是一款流行的分布式数据流平台,它已经广泛地被诸如 New Rel ...