ActiveMQ 入门实战(2)--Java 操作 ActiveMQ

本文主要介绍使用 JMS 1.1 API 来操作 ActiveMQ,文中所使用到的软件版本:Java 1.8.0_191、ActiveMQ "Classic" 5.16.2、ActiveMQ Artemis 2.17.0。

1、Java 操作ActiveMQ "Classic"

使用 JMS 1.1 的 API操作 ActiveMQ "Classic"。

1.1、引入依赖

<dependency>
    <groupId>org.apache.activemqgroupId>
    <artifactId>activemq-allartifactId>
    <version>5.16.2version>dependency>

1.2、发送消息

1.2.1、发送到 Queue

public static void sendToQueue() throws JMSException {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);    //连接池
    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
    pooledConnectionFactory.setConnectionFactory(activeMQConnectionFactory);

    Connection connection = pooledConnectionFactory.createConnection();
    connection.start();
    Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

    Destination destination = session.createQueue("testQueue");
    MessageProducer producer = session.createProducer(destination);    //消息持久化    producer.setDeliveryMode(DeliveryMode.PERSISTENT);    for (int i = 1; i <= 10; i++) {
        TextMessage message = session.createTextMessage("消息" + i);

        producer.send(message);
        System.out.println("已发送的消息:" + message.getText());
    }
    producer.close();
    session.close();
    connection.close();
    pooledConnectionFactory.stop();
}

1.2.2、发送到 Queue(事务)

public static void sendToQueueTransaction() throws JMSException {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    Connection connection = activeMQConnectionFactory.createConnection();
    connection.start();
    Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

    MessageProducer producer = null;    try {
        Destination destination = session.createQueue("testQueue");
        producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);        for (int i = 1; i <= 10; i++) {
            TextMessage message = session.createTextMessage("事务消息" + i);

            producer.send(message);
            System.out.println("已发送的消息:" + message.getText());
        }
        session.commit();
    } catch (JMSException e) {
        session.rollback();
        e.printStackTrace();
    } finally {
        producer.close();
        session.close();
        connection.close();
    }
}

1.2.3、发送到 Topic

public static void sendToTopic() throws JMSException {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    Connection connection = activeMQConnectionFactory.createConnection();
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    Destination destination = session.createTopic("testTopic");
    MessageProducer producer = session.createProducer(destination);
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);    for (int i = 1; i <= 10; i++) {
        TextMessage message = session.createTextMessage("消息" + i);
        producer.send(message);
        System.out.println("已发送的消息:" + message.getText());
    }
    producer.close();
    session.close();
    connection.close();
}

1.2.4、发送到 Topic(事务)

public static void sendToTopicTraction() throws JMSException {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    Connection connection = activeMQConnectionFactory.createConnection();
    connection.start();
    Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

    Destination destination = session.createTopic("testTopic");
    MessageProducer producer = session.createProducer(destination);
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);    try {        for (int i = 1; i <= 10; i++) {
            TextMessage message = session.createTextMessage("事务消息" + i);
            producer.send(message);
            System.out.println("已发送的消息:" + message.getText());
        }
        session.commit();
    } catch (JMSException e) {
        session.rollback();
        e.printStackTrace();
    } finally {
        producer.close();
        session.close();
        connection.close();
    }
}

完整代码:

package com.abc.demo.general.activemq;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.activemq.jms.pool.PooledConnectionFactory;import javax.jms.*;public class Producer {    private static String brokerURL = "tcp://10.40.96.140:61616";    public static void main(String[] args) throws JMSException {
        sendToQueue();//        sendToQueueTransaction();//        sendToTopic();//        sendToTopicTraction();    }    public static void sendToQueue() throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);        //连接池
        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(activeMQConnectionFactory);

        Connection connection = pooledConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

        Destination destination = session.createQueue("testQueue");
        MessageProducer producer = session.createProducer(destination);        //消息持久化        producer.setDeliveryMode(DeliveryMode.PERSISTENT);        for (int i = 1; i <= 10; i++) {
            TextMessage message = session.createTextMessage("消息" + i);

            producer.send(message);
            System.out.println("已发送的消息:" + message.getText());
        }
        producer.close();
        session.close();
        connection.close();
        pooledConnectionFactory.stop();
    }    /**
     * 以事务方式发送消息
     * @throws JMSException     */
    public static void sendToQueueTransaction() throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

        MessageProducer producer = null;        try {
            Destination destination = session.createQueue("testQueue");
            producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);            for (int i = 1; i <= 10; i++) {
                TextMessage message = session.createTextMessage("事务消息" + i);

                producer.send(message);
                System.out.println("已发送的消息:" + message.getText());
            }
            session.commit();
        } catch (JMSException e) {
            session.rollback();
            e.printStackTrace();
        } finally {
            producer.close();
            session.close();
            connection.close();
        }
    }    public static void sendToTopic() throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Destination destination = session.createTopic("testTopic");
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);        for (int i = 1; i <= 10; i++) {
            TextMessage message = session.createTextMessage("消息" + i);
            producer.send(message);
            System.out.println("已发送的消息:" + message.getText());
        }
        producer.close();
        session.close();
        connection.close();
    }    public static void sendToTopicTraction() throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

        Destination destination = session.createTopic("testTopic");
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);        try {            for (int i = 1; i <= 10; i++) {
                TextMessage message = session.createTextMessage("事务消息" + i);
                producer.send(message);
                System.out.println("已发送的消息:" + message.getText());
            }
            session.commit();
        } catch (JMSException e) {
            session.rollback();
            e.printStackTrace();
        } finally {
            producer.close();
            session.close();
            connection.close();
        }
    }
}

Producer.java

1.3、消费者

1.3.1、从 Queue 中消费消息

public static void recevieFromQueue() throws JMSException {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);    //连接池
    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
    pooledConnectionFactory.setConnectionFactory(activeMQConnectionFactory);
    Connection connection = pooledConnectionFactory.createConnection();
    connection.start();
    Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

    Destination destination = session.createQueue("testQueue");

    MessageConsumer consumer = session.createConsumer(destination);
    consumer.setMessageListener(message -> {
        TextMessage textMessage = (TextMessage) message;        try {
            System.out.println("接受到的消息:" + textMessage.getText());
            textMessage.acknowledge();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    });
}

1.3.2、从 Queue 中消费消息(事务)

public static void recevieFromQueueTransction() throws JMSException {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    Connection connection = activeMQConnectionFactory.createConnection();
    connection.start();
    Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

    Destination destination = session.createQueue("testQueue");

    MessageConsumer consumer = session.createConsumer(destination);
    AtomicInteger index = new AtomicInteger();    try {
        consumer.setMessageListener(message -> {
            TextMessage textMessage = (TextMessage) message;            try {
                System.out.println("接受到的消息:" + textMessage.getText());
                index.getAndIncrement();                //每10条提交一次
                if (index.get() % 10 == 0) {
                    session.commit();
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }

        });
    } catch (JMSException e) {
        session.rollback();
        e.printStackTrace();
    }
}

1.3.3、从 Topic 中消费消息

public static void recevieFromTopic() throws JMSException {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    Connection connection = activeMQConnectionFactory.createConnection();
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    Destination destination = session.createTopic("testTopic");

    MessageConsumer consumer = session.createConsumer(destination);
    consumer.setMessageListener(message -> {
        TextMessage textMessage = (TextMessage) message;        try {
            System.out.println("接受到的消息:" + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    });
}

1.3.4、从 Topic 中消费消息(持久化订阅+事务)

对于 Topic,使用MessageConsumer 消费消息,只能消费订阅时间之后的消息;JMS 允许订阅者创建一个可持久化的订阅(TopicSubscriber),这样,即使订阅者宕机恢复后,也能接收宕机时生产者发布的消息。

public static void recevieFromTopicDurable() throws JMSException {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    Connection connection = activeMQConnectionFactory.createConnection();
    connection.setClientID("12345678");
    connection.start();
    Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

    Topic topic = session.createTopic("testTopic");
    TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "test");

    AtomicInteger index = new AtomicInteger();
    topicSubscriber.setMessageListener(message -> {
        TextMessage textMessage = (TextMessage) message;        try {
            System.out.println("接受到的消息:" + textMessage.getText());
            index.getAndIncrement();            //每10条提交一次
            if (index.get() % 10 == 0) {
                session.commit();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    });
}

完整代码:

package com.abc.demo.general.activemq;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.activemq.jms.pool.PooledConnectionFactory;import javax.jms.*;import java.util.concurrent.atomic.AtomicInteger;public class Consumer {    private static String brokerURL = "tcp://10.40.96.140:61616";    public static void main(String[] args) throws JMSException {
        recevieFromQueue();//        recevieFromQueueTransction();//        recevieFromTopic();//        recevieFromTopicDurable();    }    public static void recevieFromQueue() throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);        //连接池
        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(activeMQConnectionFactory);
        Connection connection = pooledConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

        Destination destination = session.createQueue("testQueue");

        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(message -> {
            TextMessage textMessage = (TextMessage) message;            try {
                System.out.println("接受到的消息:" + textMessage.getText());
                textMessage.acknowledge();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
    }    public static void recevieFromQueueTransction() throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

        Destination destination = session.createQueue("testQueue");

        MessageConsumer consumer = session.createConsumer(destination);
        AtomicInteger index = new AtomicInteger();        try {
            consumer.setMessageListener(message -> {
                TextMessage textMessage = (TextMessage) message;                try {
                    System.out.println("接受到的消息:" + textMessage.getText());
                    index.getAndIncrement();                    //每10条提交一次
                    if (index.get() % 10 == 0) {
                        session.commit();
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }

            });
        } catch (JMSException e) {
            session.rollback();
            e.printStackTrace();
        }
    }    public static void recevieFromTopic() throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Destination destination = session.createTopic("testTopic");

        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(message -> {
            TextMessage textMessage = (TextMessage) message;            try {
                System.out.println("接受到的消息:" + textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
    }    public static void recevieFromTopicDurable() throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.setClientID("12345678");
        connection.start();
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

        Topic topic = session.createTopic("testTopic");
        TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "test");

        AtomicInteger index = new AtomicInteger();
        topicSubscriber.setMessageListener(message -> {
            TextMessage textMessage = (TextMessage) message;            try {
                System.out.println("接受到的消息:" + textMessage.getText());
                index.getAndIncrement();                //每10条提交一次
                if (index.get() % 10 == 0) {
                    session.commit();
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
    }
}

Consumer.java

2、Java 操作ActiveMQ Artemis

使用 JMS 2.0 的 API操作 ActiveMQ Artemis。

2.1、引入依赖

<dependency>
    <groupId>org.apache.activemqgroupId>
    <artifactId>artemis-jms-client-allartifactId>
    <version>2.17.0version>dependency>

2.2、发送消息

2.2.1、发送到 Queue

public static void sendToQueue() throws Exception {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    JMSContext context = activeMQConnectionFactory.createContext();
    JMSProducer producer = context.createProducer();

    Destination destination = context.createQueue("testQueue");    //消息持久化    producer.setDeliveryMode(DeliveryMode.PERSISTENT);    //延迟投递
    producer.setDeliveryDelay(1000 * 5);    //异步发送
    producer.setAsync(new CompletionListener() {
        @Override        public void onCompletion(Message message) {
            System.out.println("消息发送完成");
        }

        @Override        public void onException(Message message, Exception exception) {
            exception.printStackTrace();
        }
    });    for (int i = 1; i <= 5; i++) {
        TextMessage message = context.createTextMessage("消息" + i);

        producer.send(destination, message);
        System.out.println("已发送的消息:" + message.getText());
    }
    context.close();
}

2.2.2、发送到 Queue(事务)

public static void sendToQueueTransaction() throws JMSException {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED);    try {
        Destination destination = context.createQueue("testQueue");
        JMSProducer producer = context.createProducer();
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);        for (int i = 1; i <= 10; i++) {
            TextMessage message = context.createTextMessage("事务消息" + i);

            producer.send(destination, message);
            System.out.println("已发送的消息:" + message.getText());
        }
        context.commit();
    } catch (JMSException e) {
        context.rollback();
        e.printStackTrace();
    } finally {
        context.close();
    }
}

2.2.3、发送到 Topic

public static void sendToTopic() throws JMSException {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    JMSContext context = activeMQConnectionFactory.createContext();
    JMSProducer producer = context.createProducer();

    Destination destination = context.createTopic("testTopic");
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);    for (int i = 1; i <= 10; i++) {
        TextMessage message = context.createTextMessage("消息" + i);
        producer.send(destination, message);
        System.out.println("已发送的消息:" + message.getText());
    }
    context.close();
}

2.2.4、发送到 Topic(事务)

public static void sendToTopicTraction() {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED);    try {
        JMSProducer producer = context.createProducer();
        Destination destination = context.createTopic("testTopic");
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);        for (int i = 1; i <= 5; i++) {
            TextMessage message = context.createTextMessage("事务消息" + i);
            producer.send(destination, message);
            System.out.println("已发送的消息:" + message.getText());
        }
        context.commit();
    } catch (JMSException e) {
        context.rollback();
        e.printStackTrace();
    } finally {
        context.close();
    }
}

完整代码:

package com.abc.demo.general.activemq;import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;import javax.jms.*;public class ProducerJms20 {    private static String brokerURL = "tcp://10.40.96.11:61616";    public static void main(String[] args) throws Exception {
        sendToQueue();//        sendToQueueTransaction();//        sendToTopic();//        sendToTopicTraction();    }    public static void sendToQueue() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        JMSContext context = activeMQConnectionFactory.createContext();
        JMSProducer producer = context.createProducer();

        Destination destination = context.createQueue("testQueue");        //消息持久化        producer.setDeliveryMode(DeliveryMode.PERSISTENT);        //延迟投递
        producer.setDeliveryDelay(1000 * 5);        //异步发送
        producer.setAsync(new CompletionListener() {
            @Override            public void onCompletion(Message message) {
                System.out.println("消息发送完成");
            }

            @Override            public void onException(Message message, Exception exception) {
                exception.printStackTrace();
            }
        });        for (int i = 1; i <= 5; i++) {
            TextMessage message = context.createTextMessage("消息" + i);

            producer.send(destination, message);
            System.out.println("已发送的消息:" + message.getText());
        }
        context.close();
    }    /**
     * 以事务方式发送消息
     * @throws JMSException     */
    public static void sendToQueueTransaction() throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED);        try {
            Destination destination = context.createQueue("testQueue");
            JMSProducer producer = context.createProducer();
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);            for (int i = 1; i <= 5; i++) {
                TextMessage message = context.createTextMessage("事务消息" + i);

                producer.send(destination, message);
                System.out.println("已发送的消息:" + message.getText());
            }
            context.commit();
        } catch (JMSException e) {
            context.rollback();
            e.printStackTrace();
        } finally {
            context.close();
        }
    }    public static void sendToTopic() throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        JMSContext context = activeMQConnectionFactory.createContext();
        JMSProducer producer = context.createProducer();

        Destination destination = context.createTopic("testTopic");
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);        for (int i = 1; i <= 5; i++) {
            TextMessage message = context.createTextMessage("消息" + i);
            producer.send(destination, message);
            System.out.println("已发送的消息:" + message.getText());
        }
        context.close();
    }    public static void sendToTopicTraction() {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED);        try {
            JMSProducer producer = context.createProducer();
            Destination destination = context.createTopic("testTopic");
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);            for (int i = 1; i <= 5; i++) {
                TextMessage message = context.createTextMessage("事务消息" + i);
                producer.send(destination, message);
                System.out.println("已发送的消息:" + message.getText());
            }
            context.commit();
        } catch (JMSException e) {
            context.rollback();
            e.printStackTrace();
        } finally {
            context.close();
        }
    }
}

View Code

2.3、消费者

2.3.1、从 Queue 中消费消息

public static void recevieFromQueue() throws Exception {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    JMSContext context = activeMQConnectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE);
    Destination destination = context.createQueue("testQueue");
    JMSConsumer consumer = context.createConsumer(destination);
        consumer.setMessageListener(message -> {        try {
            String msg = message.getBody(String.class);
            System.out.println("接受到的消息:" + msg);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    });    //JMS2.0设置MessageListener是不阻塞线程的,通过该方法阻塞线程    System.in.read();
}

2.3.2、从 Queue 中消费消息(事务)

public static void recevieFromQueueTransction() throws Exception {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED);
    Destination destination = context.createQueue("testQueue");
    JMSConsumer consumer = context.createConsumer(destination);
    AtomicInteger index = new AtomicInteger();    try {
        consumer.setMessageListener(message -> {            try {
                String msg = message.getBody(String.class);
                System.out.println("接受到的消息:" + msg);
                index.getAndIncrement();                //每10条提交一次
                if (index.get() % 10 == 0) {
                    context.commit();
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
    } catch (Exception e) {
        context.rollback();
        e.printStackTrace();
    }
    System.in.read();
}

2.3.3、从 Topic 中消费消息

public static void recevieFromTopic() throws Exception {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    JMSContext context = activeMQConnectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE);
    Topic topic = context.createTopic("testTopic");
    JMSConsumer consumer = context.createConsumer(topic);
    consumer.setMessageListener(message -> {        try {
            String msg = message.getBody(String.class);
            System.out.println("接受到的消息:" + msg);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    });
    System.in.read();
}

2.3.4、从 Topic 中消费消息(持久化订阅+事务)

public static void recevieFromTopicDurable() throws Exception {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED);
    context.setClientID("12345678");

    Topic topic = context.createTopic("testTopic");
    JMSConsumer consumer = context.createDurableConsumer(topic, "test");
    AtomicInteger index = new AtomicInteger();
    consumer.setMessageListener(message -> {        try {
            String msg = message.getBody(String.class);
            System.out.println("接受到的消息:" + msg);
            index.getAndIncrement();            //每5条提交一次
            if (index.get() % 5 == 0) {
                context.commit();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    });
    System.in.read();
}

2.3.5、从 Topic 中消费消息(共享订阅)

public static void recevieFromTopicShare() throws Exception {    //模拟三个消费者
    for (int i = 0; i < 3; i++) {        new Thread(() -> {
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
            JMSContext context = activeMQConnectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE);

            Topic topic = context.createTopic("testTopic");
            JMSConsumer consumer = context.createSharedConsumer(topic, "testShare");

            consumer.setMessageListener(message -> {                try {
                    String msg = message.getBody(String.class);
                    System.out.println(Thread.currentThread() + "-接受到的消息:" + msg);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            });
        }).start();
    }
    System.in.read();
}

2.3.6、从 Topic 中消费消息(共享持久订阅+事务)

public static void recevieFromTopicShareDurable() throws Exception {    //模拟三个消费者
    for (int i = 0; i < 3; i++) {        new Thread(() -> {
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
            JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED);

            Topic topic = context.createTopic("testTopic");
            JMSConsumer consumer = context.createSharedDurableConsumer(topic, "testShare2");
            consumer.setMessageListener(message -> {                try {
                    String msg = message.getBody(String.class);
                    System.out.println(Thread.currentThread() + "-接受到的消息:" + msg);                    //处理完一条就提交                    context.commit();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            });
        }).start();
    }
    System.in.read();
}

完整代码:

package com.abc.demo.general.activemq;import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;import javax.jms.*;import java.util.concurrent.atomic.AtomicInteger;public class ConsumerJms20 {    private static String brokerURL = "tcp://10.40.96.11:61616";    public static void main(String[] args) throws Exception {
        recevieFromQueue();//        recevieFromQueueTransction();//        recevieFromTopic();//        recevieFromTopicDurable();//        recevieFromTopicShare();//        recevieFromTopicShareDurable();    }    public static void recevieFromQueue() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        JMSContext context = activeMQConnectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE);
        Destination destination = context.createQueue("testQueue");
        JMSConsumer consumer = context.createConsumer(destination);
            consumer.setMessageListener(message -> {            try {
                String msg = message.getBody(String.class);
                System.out.println("接受到的消息:" + msg);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });        //JMS2.0设置MessageListener是不阻塞线程的,通过该方法阻塞线程        System.in.read();
    }    public static void recevieFromQueueTransction() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED);
        Destination destination = context.createQueue("testQueue");
        JMSConsumer consumer = context.createConsumer(destination);
        AtomicInteger index = new AtomicInteger();        try {
            consumer.setMessageListener(message -> {                try {
                    String msg = message.getBody(String.class);
                    System.out.println("接受到的消息:" + msg);
                    index.getAndIncrement();                    //每10条提交一次
                    if (index.get() % 10 == 0) {
                        context.commit();
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            });
        } catch (Exception e) {
            context.rollback();
            e.printStackTrace();
        }
        System.in.read();
    }    public static void recevieFromTopic() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        JMSContext context = activeMQConnectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE);
        Topic topic = context.createTopic("testTopic");
        JMSConsumer consumer = context.createConsumer(topic);
        consumer.setMessageListener(message -> {            try {
                String msg = message.getBody(String.class);
                System.out.println("接受到的消息:" + msg);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
        System.in.read();
    }    /**
     * 持久订阅+事务
     * @throws Exception     */
    public static void recevieFromTopicDurable() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED);
        context.setClientID("12345678");

        Topic topic = context.createTopic("testTopic");
        JMSConsumer consumer = context.createDurableConsumer(topic, "test");
        AtomicInteger index = new AtomicInteger();
        consumer.setMessageListener(message -> {            try {
                String msg = message.getBody(String.class);
                System.out.println("接受到的消息:" + msg);
                index.getAndIncrement();                //每5条提交一次
                if (index.get() % 5 == 0) {
                    context.commit();
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
        System.in.read();
    }    /**
     * 共享订阅
     * @throws Exception     */
    public static void recevieFromTopicShare() throws Exception {        //模拟三个消费者
        for (int i = 0; i < 3; i++) {            new Thread(() -> {
                ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
                JMSContext context = activeMQConnectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE);

                Topic topic = context.createTopic("testTopic");
                JMSConsumer consumer = context.createSharedConsumer(topic, "testShare");

                consumer.setMessageListener(message -> {                    try {
                        String msg = message.getBody(String.class);
                        System.out.println(Thread.currentThread() + "-接受到的消息:" + msg);
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                });
            }).start();
        }
        System.in.read();
    }    /**
     * 共享持久订阅+事务
     * @throws Exception     */
    public static void recevieFromTopicShareDurable() throws Exception {        //模拟三个消费者
        for (int i = 0; i < 3; i++) {            new Thread(() -> {
                ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
                JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED);

                Topic topic = context.createTopic("testTopic");
                JMSConsumer consumer = context.createSharedDurableConsumer(topic, "testShare2");
                consumer.setMessageListener(message -> {                    try {
                        String msg = message.getBody(String.class);
                        System.out.println(Thread.currentThread() + "-接受到的消息:" + msg);                        //处理完一条就提交                        context.commit();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                });
            }).start();
        }
        System.in.read();
    }
}
(0)

相关推荐