Spring 对Apache Kafka的支持与集成

1. 引言

Apache Kafka 是一个分布式的、容错的流处理系统。在本文中,我们将介绍Spring对Apache Kafka的支持,以及原生Kafka Java客户端Api 所提供的抽象级别。

Spring Kafka 通过 @KafkaListener 注解,带来了一个简单而典型的 Spring 模板编程模型,它还带有一个 KafkaTemplate 和消息驱动的 POJO 。

2. 安装和设置

要下载和安装Kafka,请参考官方指南。然后还需要在 pom.xml 文件中添加 spring-kafka:

<dependency>    <groupId>org.springframework.kafka</groupId>    <artifactId>spring-kafka</artifactId>    <version>2.3.7.RELEASE</version></dependency>

新建一个 Spring Boot 示例应用程序,以默认配置启动。

3. 配置 Topics

以前我们使用命令行工具在 Kafka 中创建 topic,例如:

$ bin/kafka-topics.sh --create   --zookeeper localhost:2181   --replication-factor 1 --partitions 1   --topic mytopic

但是随着 AdminClient 在Kafka中的引入,我们现在可以通过编程来创建 Topic

如下代码,添加 KafkAdmin bean 到 Spring中,它将自动为 NewTopic 类的所有 bean 添加 topic

@Configurationpublic class KafkaTopicConfig {        @Value(value = "${kafka.bootstrapAddress}")    private String bootstrapAddress;     @Bean    public KafkaAdmin kafkaAdmin() {        Map<String, Object> configs = new HashMap<>();        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);        return new KafkaAdmin(configs);    }        @Bean    public NewTopic topic1() {         return new NewTopic("developlee", 1, (short) 1);    }}

4. 消息生成

要创建消息,首先需要配置 ProducerFactory ,并设置创建 Kafka Producer 实例的策略,然后使用 KafkaTemplateKafkaTemplate 包装了 Producer 实例,并提供向 Kafka Topic 发送消息的简便方法。

在整个应用程序上下文中使用单个实例将提供更高的性能。因此推荐使用一个 Producer 实例。该实例是线程安全的,所以 KakfaTemplate 实例也是线程安全的,

4.1. Producer 配置

@Configurationpublic class KafkaProducerConfig {     @Bean    public ProducerFactory<String, String> producerFactory() {        Map<String, Object> configProps = new HashMap<>();        configProps.put(          ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,           bootstrapAddress);        configProps.put(          ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,           StringSerializer.class);        configProps.put(          ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,           StringSerializer.class);        return new DefaultKafkaProducerFactory<>(configProps);    }     @Bean    public KafkaTemplate<String, String> kafkaTemplate() {        return new KafkaTemplate<>(producerFactory());    }}

4.2. 消息发布

我们使用 KafkaTemplate 来发布消息:

@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String msg) {    kafkaTemplate.send(topicName, msg);}

send API 返回 ListenableFuture 对象。如果我们想阻塞发送线程并获得关于发送消息的结果,我们可以调用ListenableFuture 对象的 get API。线程将会等待结果,但它会降低生产者的速度。

Kafka是一个快速流处理平台。因此,最好异步处理结果,这样后续消息就无需等待前一条消息的结果。我们可以通过回调来实现:

public void sendMessage(String message) {                ListenableFuture<SendResult<String, String>> future =       kafkaTemplate.send(topicName, message);    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {         @Override        public void onSuccess(SendResult<String, String> result) {            System.out.println("Sent message=[" + message +               "] with offset=[" + result.getRecordMetadata().offset() + "]");        }        @Override        public void onFailure(Throwable ex) {            System.out.println("Unable to send message=["               + message + "] due to : " + ex.getMessage());        }    });}

5. 消息消费

5.1. 消费者配置

对于消费消息,我们需要配置一个 ConsumerFactory 和一个 KafkaListenerContainerFactory

一旦这些bean在Spring Bean工厂中可用,就可以使用 @KafkaListener 注解配置基于POJO的消费者。

配置类上需要添加 @EnableKafka 注解,以便能够检测Spring管理的bean上的 @KafkaListener 注解:

@EnableKafka@Configurationpublic class KafkaConsumerConfig {     @Bean    public ConsumerFactory<String, String> consumerFactory() {        Map<String, Object> props = new HashMap<>();        props.put(          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,           bootstrapAddress);        props.put(          ConsumerConfig.GROUP_ID_CONFIG,           groupId);        props.put(          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,           StringDeserializer.class);        props.put(          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,           StringDeserializer.class);        return new DefaultKafkaConsumerFactory<>(props);    }     @Bean    public ConcurrentKafkaListenerContainerFactory<String, String>       kafkaListenerContainerFactory() {           ConcurrentKafkaListenerContainerFactory<String, String> factory =          new ConcurrentKafkaListenerContainerFactory<>();        factory.setConsumerFactory(consumerFactory());        return factory;    }}

5.2. 消息消费

@KafkaListener(topics = "topicName", groupId = "foo")public void listenGroupFoo(String message) {    System.out.println("Received Message in group foo: " + message);}

可以为一个 topic 实现多个 listener,每个topic 都有不同的组Id。此外,一个消费者可以监听来自不同 topic 的消息:

@KafkaListener(topics = "topic1, topic2", groupId = "foo")

Spring 还支持使用 listener 中的 @Header 注解检索一个或多个消息标题:

@KafkaListener(topics = "topicName")public void listenWithHeaders(  @Payload String message,   @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {      System.out.println(        "Received Message: " + message"        + "from partition: " + partition);}

5.3. 消费来自特定分区的消息

注意到,我们只使用一个分区创建了 topic “developlee”。但是,对于具有多个分区的主题,@KafkaListener 可以显式订阅具有初始偏移量 topic 的特定分区:

@KafkaListener(  topicPartitions = @TopicPartition(topic = "topicName",  partitionOffsets = {    @PartitionOffset(partition = "0", initialOffset = "0"),     @PartitionOffset(partition = "3", initialOffset = "0")}),  containerFactory = "partitionsKafkaListenerContainerFactory")public void listenToPartition(  @Payload String message,   @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {      System.out.println(        "Received Message: " + message"        + "from partition: " + partition);}

由于 initialOffset 已被发送到该 listener 中的分区0,因此每次初始化该 listener 时,将重新使用以前从分区0和分区3消耗的所有消息。如果不需要设置偏移量,我们可以使用 @TopicPartition 注解的 partitions 属性只设置没有偏移量的分区:

@KafkaListener(topicPartitions   = @TopicPartition(topic = "topicName", partitions = { "0", "1" }))

5.4. 为Listener添加消息过滤器

通过添加自定义过滤器,可以将 listener 配置为使用特定类型的消息。这可以通过将 RecordFilterStrategy 设置为 KafkaListenerContainerFactory 来完成:

@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String>  filterKafkaListenerContainerFactory() {     ConcurrentKafkaListenerContainerFactory<String, String> factory =      new ConcurrentKafkaListenerContainerFactory<>();    factory.setConsumerFactory(consumerFactory());    factory.setRecordFilterStrategy(      record -> record.value().contains("World"));    return factory;}

然后可以将 listener 配置为使用此容器工厂:

@KafkaListener(  topics = "topicName",   containerFactory = "filterKafkaListenerContainerFactory")public void listenWithFilter(String message) {    System.out.println("Received Message in filtered listener: " + message);}

在这个 listener 中,所有与过滤器匹配的消息都将被丢弃

6. 自定义消息转换器

到目前为止,我们只讨论了字符串作为消息发送和接收的对象。但是,我们也可以发送和接收定制的Java对象。这需要在 ProducerFactory 中配置适当的序列化器,并在 ConsumerFactory 中配置反序列化器。

让我们看一个简单的bean,并将以消息的形式发送它:

public class Greeting {     private String msg;    private String name;     // standard getters, setters and constructor}

6.1. 生产自定义消息

在本例中,我们将使用 JsonSerializer。我们看看 ProducerFactoryKafkaTemplate 的代码:

@Beanpublic ProducerFactory<String, Greeting> greetingProducerFactory() {    // ...    configProps.put(      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,       JsonSerializer.class);    return new DefaultKafkaProducerFactory<>(configProps);} @Beanpublic KafkaTemplate<String, Greeting> greetingKafkaTemplate() {    return new KafkaTemplate<>(greetingProducerFactory());}

新的 KafkaTemplate 可用于发送 Greeting 消息:

kafkaTemplate.send(topicName, new Greeting("Hello", "World"));

6.2. 消费自定义消息

同样,我们修改 ConsumerFactoryKafkaListenerContainerFactory 来正确反序列化 Greeting 消息:

@Beanpublic ConsumerFactory<String, Greeting> greetingConsumerFactory() {    // ...    return new DefaultKafkaConsumerFactory<>(      props,      new StringDeserializer(),       new JsonDeserializer<>(Greeting.class));} @Beanpublic ConcurrentKafkaListenerContainerFactory<String, Greeting>   greetingKafkaListenerContainerFactory() {     ConcurrentKafkaListenerContainerFactory<String, Greeting> factory =      new ConcurrentKafkaListenerContainerFactory<>();    factory.setConsumerFactory(greetingConsumerFactory());    return factory;}

spring-kafka JSON序列化器和反序列化器使用 Jackson 库,该库是 spring-kafka 项目的可选maven依赖项。我们也把它加到 pom.xml 文件:

<dependency>    <groupId>com.fasterxml.jackson.core</groupId>    <artifactId>jackson-databind</artifactId>    <version>2.9.7</version></dependency>

建议不要使用 Jackson 的最新版本,而是使用 pom.xml 文件 中 spring-kafka 的版本。
最后,我们需要编写一个 listener 来 消费 Greeting 消息:

@KafkaListener(  topics = "topicName",   containerFactory = "greetingKafkaListenerContainerFactory")public void greetingListener(Greeting greeting) {    // process greeting message}

7. 结语

在本文中,我们介绍了Apache Kafka 和 Spring 集成的基础知识,且简要介绍了用于发送和接收消息的类。
本文的完整源代码可以在GitHub上找到. 在执行代码之前,请确保服务器正在运行 Kafka。

(0)

相关推荐

  • 真的,关于 Kafka 入门看这一篇就够了

    ImportNew 前天 以下文章来源于Java极客技术 ,作者cxuan Java极客技术java教程 java开发 java入门 java学习 java小课 java进阶 java知识 java技 ...

  • Kafka系列1:Kafka概况

    Kafka是当前分布式系统中最流行的消息中间件之一,凭借着其高吞吐量的设计,在日志收集系统和消息系统的应用场景中深得开发者喜爱.本篇就聊聊Kafka相关的一些知识点.主要包括以下内容: Kafka简介 ...

  • asp .net core发布订阅kafka

    Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性: 通过O的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能. 高吞吐量:即使是非常普通的硬件Ka ...

  • java开发之开源平台Kafka知识总结分享

    kafka的基本体系结构 一个完整的kafka消息中间件应该包含如下几个节点: 生产者:生产消息的节点 消费者:消费消息的节点 broker:接收生产者发送消息存储的节点 zookeeper:管理维护 ...

  • 消息中间件

    摘要:消息存储对于每一款消息队列都非常重要,那么Kafka在这方面是如何来设计做到高效的呢?Kafka这款分布式消息队列使用文件系统和操作系统的页缓存(page cache)分别存储和缓存消息,摒弃了 ...

  • 安卓系统即将进入宝马-从2020年中开始支持无线集成

    宝马宣布从2020年中开始,所有安装了BMW-OS 7.0系统的车型将与Android Auto系统进行无线集成. 宝马的智能手机用户最近比较开心.几天前宝马宣布,用户今后可以免费使用Apple Ca ...

  • 在规模上使用Apache Kafka的20个最佳实践

    Apache Kafka是一种广受欢迎的分布式流媒体平台,New Relic,Uber和Square等数千家公司使用它来构建可扩展,高吞吐量,可靠的实时流媒体系统.例如,New Relic的生产Kaf ...

  • Atlas(元数据管理)从扫盲到和Hive、HBase、Kafka、Flink等集成

    先对数据分个类 企业数据管理的内容及范畴通常包括交易数据.主数据以及元数据. (1)交易数据:用于纪录业务事件,如客户的订单,投诉记录,客服申请等,它往往用于描述在某一个时间点上业务系统发生的行为. ...

  • Apache Kafka 概述

    在大数据中,使用了大量的数据. 关于数据,我们有两个主要挑战.第一个挑战是如何收集大量的数据,第二个挑战是分析收集的数据. 为了克服这些挑战,您必须需要一个消息系统. Kafka专为分布式高吞吐量系统 ...

  • spring+springmvc+kafka分布式消息中间件集成方案

    Honghu的消息服务平台已经抛弃了之前的ActiveMQ,改用高吞吐量比较大的Kafka分布式消息中间件方案:kafka消息平台使用spring+kafka的集成方案,详情如下:1. 使用最高版本2 ...

  • 浙江省转升办发布支持“四有”企业“长高长壮”政策集成清单

    为支持"有意愿.有市场.有技术.有前景"工业企业就地"长高长壮",提高亩产效益和土地利用率,浙江省工业转型升级领导小组办公室近日全面梳理和系统集成有关政策,形成 ...

  • 腾讯宣布开源 RoP:Apache Pulsar 支持原生 RocketMQ 协议

    RocketMQ 用户可以无缝迁移到 Apache Pulsar 了.自此,Apache Pulsar 补齐了兼容主流消息队列协议的能力. 我们很高兴地宣布腾讯云中间件开源 RoP!RoP 将 Roc ...

  • Spring Boot 极简集成 Shiro

    程序员涨点薪吧 今天1. 前言Apache Shiro是一个功能强大且易于使用的Java安全框架,提供了认证,授权,加密,和会话管理. Shiro有三大核心组件:Subject:即当前用户,在权限管理 ...

  • Spring Boot 集成 JUnit5,更优雅单元测试!

    为什么使用JUnit5 JUnit4被广泛使用,但是许多场景下使用起来语法较为繁琐,JUnit5中支持lambda表达式,语法简单且代码不冗余. JUnit5易扩展,包容性强,可以接入其他的测试引擎. ...