ActiveMQ

              ActiveMQ消息中间件

1:point-point 点对点模式 (队列模式)一条消息只能被一个订阅者消费,只要订阅此消息即可参与平均消费该队列下的空闲消息。

2:topic 主题模式 消费者需提前订阅某消息。在提供者发送到消息服务器之后,基于长连接推送给所有订阅此消息的消费者。一条消息可以被多个消费者消费

ActiveMQ最大优点是支持jms 但是吞吐量没有kafka高,稳定性没有RabbitMQ高  综合性能一般

基于Spring整合ActiveMQ

common.xml配置bean指定jms连接工厂,mq连接工厂 以及消息模式 这里是公共配置,即生产方消费方都需要引用

<context:annotation-config/>

<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://127.0.0.1:61616"></property>
</bean>

<!--Spring JMS 提供连接池-->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <property name="targetConnectionFactory" ref="targetConnectionFactory"></property>
</bean>

<!-- 队列目的地, 点对点模式 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="queue"/>
</bean>

<!-- 主题模式 -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg value="topic"/>
</bean>

producer.xml需指定消息模板设置目标地、连接工厂

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory"></property>
    <property name="defaultDestination" ref="topicDestination"></property>
    <!-- <property name="defaultDestination" ref="queueDestination"></property> -->
</bean>

consumer.xml需配置消息监听容器、消息监听器、使用连接工厂、目标地

<!-- 本地消息监听器 -->
<bean id="consumerContainerListener" class="com.chuan.jms.cosumer.ConsumerContainerListener"/>

<!-- 消息监听容器 -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="topicDestination"/>
    <!-- <property name="destination" ref="queueDestination"/> -->
    <property name="messageListener" ref="consumerContainerListener"/>
</bean>

这里使用注解方式指定目标地

public class ProductorServiceImpl implements ProductorService {

    @Autowired
    JmsTemplate jmsTemplate;

    //@Resource(name = "queueDestination")
    @Resource(name = "topicDestination")   //主题模式
    Destination destination;

    @Override
    public void sendMessage(final String str) {
        jmsTemplate.send(new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage textMessage = session.createTextMessage(str);
                System.out.println("发送成功" + textMessage.getText());
                return textMessage;
            }
        });
    }
}

编写监听类

public class ConsumerContainerListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println("接收到消息:" + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

producer测试Class

public static void main(String[] args) {
    ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("productor.xml");
    context.getBean("productorServiceImpl");
    ProductorServiceImpl productorServiceImpl = context.getBean("productorServiceImpl", ProductorServiceImpl.class);
    for (int i = 0; i < 100; i++) {
        productorServiceImpl.sendMessage("TEST:" + i);
    }
    //context.close(); //清除容器
}

打开本地MQ后台服务http://127.0.0.1:8161  可以看到消息发送成功  启动消费类打印消费日志

最后上pom文件

<properties>
    <spring.version>4.2.5.RELEASE</spring.version>
</properties>
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>${spring.version}</version>
    </dependency>

    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>${spring.version}</version>
    </dependency>

    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>${spring.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-core</artifactId>
        <version>5.7.0</version>
        <exclusions>
            <exclusion>
                <artifactId>spring-context</artifactId>
                <groupId>org.springframework</groupId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.10.0</version>
    </dependency>
</dependencies>

发表评论

电子邮件地址不会被公开。 必填项已用*标注