消息队列和发布订阅

编程语言集成了发布订阅

很多编程语言框架里都提供了发布订阅的组件,或者叫事件处理机制,而spring框架对这个功能也有支持,主要使用EventListener实现订阅,使用ApplicationEventPublisher使用发布。这种系统集成的我们先叫它“集成组件”

与语言无关的消息队列

事实上,发布订阅真的与开发语言没有什么关系,所以出现了另一种产品,消息中间件,或者叫消息队列,它是以发布订阅模式为理论基础的,同时很多消息队列产品又有自己的特色,这种独立的消息队列我们为rabbitmq为例子。

共同点

  1. 代码解耦,发布者与订阅者可以互不关心
  2. 异步处理,集成组件有的是同步的,需要加@Async注解
  3. 消息安全

不同点

  1. rabbitmq实现的是多服务之间的发布与订阅
  2. 集成组件实现的是一个服务内部的发布与订阅
  3. rabbitmq是异步的,集成组件可以是异步,也可以是同步
  4. rabbitmq可以有广播,点对点等模式,而集成组件只有广播模式

基于以上的介绍,主要帮助大家理解和认识,在什么时候用什么类型的工具。

实例

  • 集成组件的发布订阅

订阅

@Getter
@Builder(toBuilder = true)
@NoArgsConstructor
@AllArgsConstructor
public class CreateBookEvent {
  private String address;
  private String title;
}

@Component
public class EmailEventListener {
  @EventListener
  @Async
  public void handleEvent(CreateBookEvent event) throws Exception {
    System.out.println("email消息:建立图书:" + event.getTitle());
  }
}

发布

@Autowired
  private ApplicationEventPublisher applicationEventPublisher;
  public void publish(){
      applicationEventPublisher.publishEvent(CreateBookEvent.builder().address("system").title("新建图书").build());
}
  • rabbitmq的发布订阅

订阅

@Slf4j
@Component
public class DistributorSubscriber {
  public static final String WORK_QUEUE = "fx.activity.total";
  public static final String EXCHANGE = "fx.exchange";
  @Autowired
  DistributorActivityTotalRepository distributorActivityTotalRepository;
  @Autowired
  ObjectMapper objectMapper;

  @Bean
  public TopicExchange phoneTotalExchange() {
    return new TopicExchange(EXCHANGE);
  }

  @Bean
  public Queue phoneTotalQueue() {
    return new Queue(WORK_QUEUE);
  }

  @Bean
  public Binding bindSignQueue() {
    return BindingBuilder.bind(phoneTotalQueue()).to(phoneTotalExchange()).with(WORK_QUEUE);
  }
   @RabbitListener(queues = WORK_QUEUE)
  public void phoneTotalQueueListener(String data) {
    try {
      logger.debug("fx.activity.total:{}", data);
      DistributorActivityTotal entity =
          objectMapper.readValue(data, DistributorActivityTotal.class);
      distributorActivityTotalRepository.incUpdate(entity);
    } catch (Exception ex) {
      logger.error("fx.activity.total.error", ex);
    }
  }

发布

  @Autowired
  private RabbitTemplate rabbitTemplate;

   public void modifySalesperson(SalesPersonDTO salesPersonDTO) {
    try {
      rabbitTemplate.convertAndSend(
          "EXCHANGE",
          "MQName",
          objectMapper.writeValueAsString(salesPersonDTO)
      );
      logger.debug("Enter {},message:{}", "modifySalesperson", salesPersonDTO.toString());

    } catch (Exception ex) {
      logger.error("MQ.modifySalesperson.error", ex);
    }
  }
(0)

相关推荐

  • 还不知道异步队列?点进来看!

    你不知道的分布式异步队列 关于异步队列你解多少? 它被誉为大数据高并发的终极解决方案. 来先给你介绍几种: RabbitQM:是爱立信的产品,基于erlang语言(函数式编程大数据 scala语言) ...

  • 消息队列全面了解(二)

    一.再谈消息队列的应用场景 1.异步处理:例如短信通知.终端状态推送.App推送.用户注册等 2.数据同部:业务数据推送同步 3.重试补偿:记账失败重试 4.系统解耦:通讯上下行.终端异常监控.分布式 ...

  • 04消息队列zmq的发布者-订阅者的计算π的简单程序。

    # 小知识:计算π的其中一个方法是,随机的向一个边长为n的正方形中撒豆子.# 然后看这些豆子是否在以n为半径的四分之一圆内,正方形面积:n*n,四分之一圆的面积:π*n*n/4# 因此落在四分之一圆内 ...

  • 微信怎样取消“订阅号”的消息推送?订阅号总是推送消息,太烦人

    相信很多人都有这种烦心事--微信总是向我们推送订阅号消息,可是我们根本就不需要这些东西,实在是太烦人了.就有一些网友咨询我,问我"怎样关闭微信订阅号的消息推送".其实,方法还是非常 ...

  • 微信怎样取消“订阅号”消息推送?订阅号总是推送消息,太烦人了

    微信怎样取消“订阅号”消息推送?订阅号总是推送消息,太烦人了

  • 消息队列在RTOS的应用

    传说互联网应用有两大利器,一个是缓存,另一个就是消息队列. 一直相对消息队列做一下梳理,希望早日另有成文. 一叶知秋,实际上消息队列在嵌入式系统中同样有着广泛的应用. 近来致力于IoT和智能硬件,现学 ...

  • 手把手教姐姐写消息队列

    前言 这周姐姐入职了新公司,老板想探探他的底,看了一眼他的简历,呦呵,精通kafka,这小姑娘有两下子,既然这样,那你写一个消息队列吧.因为要用go语言写,这可给姐姐愁坏了.赶紧来求助我,我这么坚贞不 ...

  • C#后台异步消息队列实现

    简介 基于生产者消费者模式,我们可以开发出线程安全的异步消息队列. 知识储备 什么是生产者消费者模式? 为了方便理解,我们暂时将它理解为垃圾的产生到结束的过程. 简单来说,多住户产生垃圾(生产者)将垃 ...

  • Dapr微服务应用开发系列5:发布订阅构建块

    dotNET跨平台 1周前 以下文章来源于dotNET开发经验谈 ,作者朱永光 dotNET开发经验谈朱永光关于.NET开发各方面的技术分享,有时候也会精选整理.翻译和转载社区里对开发人员.架构师.团 ...

  • RabbitMQ消息队列之Windows下安装和部署(一)

    参考文档: https://jingyan.baidu.com/article/ed15cb1bb5c3411be369819d.html https://blog.csdn.net/hzw19920 ...

  • RabbitMQ 消息队列中 VirtualHost介绍 与权限管理 | IT工程师的生活足迹

    一.VirtualHost 像mysql服务有数据库的概念并且可以设置用户对库和表等对象的操作权限,RabbitMQ也有类似的权限管理. 在RabbitMQ中可以虚拟消息服务器 VirtualHost ...