1. 程式人生 > >jms及active(jdk api)的實現

jms及active(jdk api)的實現

nal 例如 入隊 port 消費者 -a service gettext false

一、jms:java message service java消息服務,他不是一種協議,而是一個api,用來服務於消息中間件的api;

二、消息中間件:為了異步可靠的在兩個系統結構之間(泛指兩個,可以多個)進行消息傳輸,令項目服務化,解耦性強。例如:我們有個登錄系統,如果用戶登錄成功返回給用戶成功提示之前會調用積分增加服務,日誌服務等,這樣一旦其中一個服務失效可能就會延遲用戶獲得登陸成功提示的時間,降低用戶體驗,所以我們可以異步將用戶登錄信息傳輸到附加服務中,只返回用戶關心的登錄信息。

三、常用的消息中間件:ActiveMQ、RabbitMQ、Kafka

四、以下為active利用jdk自帶的jms實現,結合spring的請點擊:

結構如下:

技術分享圖片

  pom.xml文件:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zxc.jms</
groupId> <artifactId>jms</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>javax.jms</groupId> <artifactId>javax.jms-api</artifactId> <version>2.0.1</
version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> </dependency> <!-- https://mvnrepository.com/artifact/log4j/log4j --> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.2</version> </dependency> </dependencies> </project>

log4j.properties:(用的是slf4j日誌)如果是idea註意一定要放在resource下,如果用的是eclipse要放在src下

### 設置###
log4j.rootLogger = debug,stdout,D,E

### 輸出信息到控制擡 ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n

### 輸出DEBUG 級別以上的日誌到=E://logs/error.log ###
log4j.appender.D = org.apache.log4j.DailyRollingFileAppender
log4j.appender.D.File = D://logs/log.log
log4j.appender.D.Append = true
log4j.appender.D.Threshold = DEBUG
log4j.appender.D.layout = org.apache.log4j.PatternLayout
log4j.appender.D.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %m%n

### 輸出ERROR 級別以上的日誌到=E://logs/error.log ###
log4j.appender.E = org.apache.log4j.DailyRollingFileAppender
log4j.appender.E.File =D://logs/error.log
log4j.appender.E.Append = true
log4j.appender.E.Threshold = ERROR
log4j.appender.E.layout = org.apache.log4j.PatternLayout
log4j.appender.E.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %m%n

ActiveMQ分為兩種類型:

  第一種隊列模式:點對點模式,一個消息生產者生產的消息存入消息隊列中,然後可以有多個消費者的多個連接,這些連接基本平均分配這些消息。

  第二種訂閱發布模式:一定要訂閱者(就是消費者)先要訂閱隊列,然後發布者發布消息存入隊列,訂閱者全部接受所有隊列消息,不會均分。

第一種的隊列模式:

  appProducer:

package com.zxc.jms.queue;

import javax.jms.*;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class appProducer {

    private static String url = "tcp://localhost:61616";
    private static String queuename = "queue-test";
    private static Logger logger = LoggerFactory.getLogger(appProducer.class);

    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue(queuename);
        MessageProducer producer = session.createProducer(destination);
        for (int i = 1; i <= 10000; i++) {
            TextMessage message = session.createTextMessage("生產者發出第" + i + "消息");
            producer.send(message);
            logger.info("生產者發出的第{}條數據", i);
        }
        connection.close();
    }
}

  

  appConsumer:

  

package com.zxc.jms.queue;

import javax.jms.*;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class appConsumer {
    private static String url = "tcp://localhost:61616";
    private static String queuename = "queue-test";
    private static Logger logger = LoggerFactory.getLogger(appProducer.class);

    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue(queuename);
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                logger.info("接受者接到消息");

            }
        });
    }
}

第二種:訂閱發布模式

  appProducer:

package com.zxc.topic;

import javax.jms.*;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class appProducer {
    private static final String url = "tcp://localhost:61616";
    private static final String topicName = "topic-test";
    private static final Logger logger = LoggerFactory.getLogger(appProducer.class);

    public static void main(String[] args) throws JMSException, InterruptedException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(topicName);
        MessageProducer producer = session.createProducer(topic);
        for (int i = 0; i <= 100; i++) {
            TextMessage message = session.createTextMessage("消息"+i);
            producer.send(message);
            logger.info("生產者廣播第{}條消息",i);
            Thread.sleep(2000);
        }
        connection.close();
    }
}

  appConsumer:

  

package com.zxc.topic;

import javax.jms.*;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class appConsumer {
    private static final String url = "tcp://localhost:61616";
    private static final String topicName = "topic-test";
    private static final Logger logger = LoggerFactory.getLogger(appProducer.class);

    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(topicName);
        MessageConsumer consumer = session.createConsumer(topic);
        consumer.setMessageListener((message -> {
            TextMessage textMessage = (TextMessage) message;
            try {
                logger.info("收到消息:{}",textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }));
    }
}

  

    

jms及active(jdk api)的實現