Python決策樹中文亂碼解決
①.訊息傳遞方式介紹:
Activemq支援兩種方式的訊息傳遞:
廣播模式:1-n的方式,是一種釋出訂閱模式,像騰訊新聞那樣,只要我們微信關注了騰訊新聞,那麼每個人都會收到推送的新聞
佇列模式:1-1的方式,只能有一個消費者端消費生產者生產的資料
②.訊息型別介紹:
Activemq提供了兩種訊息型別:持久化和非持久化:
訊息生產者使用持久(persistent)傳遞模式傳送訊息的時候,Producer.send() 方法會被阻塞,直到 broker 傳送一個確認訊息給生產者(ProducerAck),這個確認訊息暗示broker已經成功接收到訊息並把訊息儲存到二級儲存中。這個過程通常稱為同步傳送。速度較慢,資料基本不會丟失.可以持久化到kahaDB(aMq預設採用kahaDB儲存引擎來儲存訊息)或資料庫中
非同步傳送不會在受到 broker 的確認之前一直阻塞 Producer.send 方法,速度較快,不過可能會造成資料的丟失.
訊息簽收方式:
AUTO_ACKNOWLEDGE 自動確認
CLIENT_ACKNOWLEDGE 客戶端手動確認
DUPS_OK_ACKNOWLEDGE 自動批量確認
SESSION_TRANSACTED 事務提交併確認
③.下載可以到apache activeMQ官網下載
④.我這裡下載的是windows 5.10版本的就以此為例做介紹
解壓之後進入bin目錄根據作業系統找到對應的,activemq.bat檔案雙擊執行
activeMQ內建有一個控制檯可以訪問http://localhost:8161/檢視,預設賬戶密碼皆為admin
⑤.客戶端程式碼(java為例):
public class ActiveMqBo { // 建立連線工廠 private ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMqConstant.URL); /** * 獲取一個連線 * @return * @throws JMSException */ public Connection getConnection() throws JMSException { Connection conn; try { conn = factory.createConnection(); } catch (JMSException e) { e.printStackTrace(); throw e; } return conn; } }
public static void main(String[] args) {
ActiveMqBo mq = new ActiveMqBo();
Connection conn = null;
Session session = null;
MessageProducer producer = null;
try {
// 獲取一個連線
conn = mq.getConnection();
conn.start();
// 簽收方式
session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
// 建立一個佇列
Destination destination = session.createQueue("queue-1");
// 獲取一個生產者
producer = session.createProducer(destination);
/*
* 持久化,會通過kahadb把訊息存入到db.log中,直到被消費後進行清除
* 速度較慢
* DeliveryMode.NON_PERSISTENT非持久化
* DeliveryMode.PERSISTENT持久化
*/
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// producer.setTimeToLive(5000);//5秒後過期,這個對點對點模式有效
for (int i = 0; i < 1000; i++) {
MessageDto mes = new MessageDto();
mes.setCode("" + (i + 1));
mes.setMessage("send mes:" + (i + 1));
producer.send(session.createObjectMessage(mes));
System.out.println("send mes : " + mes);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (producer != null) {
producer.close();
}
if (session != null) {
session.close();
}
if (conn != null) {
conn.close();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
日誌部分:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/D:/repository/ch/qos/logback/logback-classic/1.0.13/logback-classic-1.0.13.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/D:/repository/org/apache/activemq/activemq-all/5.10.0/activemq-all-5.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
11:27:42.251 [main] DEBUG o.a.a.transport.WireFormatNegotiator - Sending: WireFormatInfo { version=10, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=127.0.0.1, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
11:27:42.252 [ActiveMQ Transport: tcp:///127.0.0.1:61616@54559] DEBUG o.a.a.transport.WireFormatNegotiator - Received WireFormat: WireFormatInfo { version=10, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
11:27:42.257 [main] DEBUG o.a.a.transport.InactivityMonitor - Using min of local: WireFormatInfo { version=10, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=127.0.0.1, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=10, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
11:27:42.260 [ActiveMQ Transport: tcp:///127.0.0.1:61616@54559] DEBUG o.a.a.transport.WireFormatNegotiator - tcp:///127.0.0.1:61616@54559 before negotiation: OpenWireFormat{version=10, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
11:27:42.261 [ActiveMQ Transport: tcp:///127.0.0.1:61616@54559] DEBUG o.a.a.transport.WireFormatNegotiator - tcp:///127.0.0.1:61616@54559 after negotiation: OpenWireFormat{version=10, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=104857600}
send mes : MessageDto [code=1, message=send mes:1]
send mes : MessageDto [code=2, message=send mes:2]
send mes : MessageDto [code=3, message=send mes:3]
send mes : MessageDto [code=4, message=send mes:4]
send mes : MessageDto [code=5, message=send mes:5]
send mes : MessageDto [code=6, message=send mes:6]
send mes : MessageDto [code=7, message=send mes:7]
send mes : MessageDto [code=8, message=send mes:8]
........省略
send mes : MessageDto [code=996, message=send mes:996]
send mes : MessageDto [code=997, message=send mes:997]
send mes : MessageDto [code=998, message=send mes:998]
send mes : MessageDto [code=999, message=send mes:999]
send mes : MessageDto [code=1000, message=send mes:1000]
11:27:42.490 [main] DEBUG o.a.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@2e0f6c25[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.000 seconds.
11:27:42.490 [ActiveMQ Transport: tcp:///127.0.0.1:61616@54559] DEBUG o.a.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@768508c2[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.000 seconds.
11:27:42.491 [main] DEBUG o.a.a.transport.tcp.TcpTransport - Stopping transport tcp:///127.0.0.1:61616@54559
11:27:42.492 [main] DEBUG o.a.a.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@b30e9f8[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
11:27:42.493 [ActiveMQ Task-1] DEBUG o.a.a.transport.tcp.TcpTransport - Closed socket Socket[addr=/127.0.0.1,port=61616,localport=54559]
11:27:42.493 [main] DEBUG o.a.activemq.util.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@b30e9f8[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
這裡我們再看控制檯,會發現queue1有資料資訊,因為我們是非持久化方式傳送訊息,我們關掉activemq後在重啟會發現資料丟失
,再以持久化的方式測試:
只需要將這裡設定為PERSISTENT即可
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
我們再啟動剛才的程式碼會發現傳送資料的速度很明顯的降低,但是我們關閉activemq後再重啟,重新整理控制檯資料沒有丟失.
⑥.消費端:
public class Consumer {
public static void main(String[] args) {
try {
MessageUtil.getConsumer("queue-1", Boolean.TRUE);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
public class MessageUtil {
private static ActiveMqBo mq = new ActiveMqBo();
private static Connection conn = null;
private static Session session = null;
private static void init(){
try {
// 獲取一個連線
if(conn == null){
conn = mq.getConnection();
}
conn.start();
// 自動提交事務
if(session == null){
/*Session.AUTO_ACKNOWLEDGE 訊息自動簽收
Session.CLIENT_ACKNOWLEDGE 客戶端呼叫acknowledge方法手動簽收
Session.DUPS_OK_ACKNOWLEDGE 不是必須簽收,訊息可能會重複傳送。在第二次重新傳送訊息的時候,訊息
頭的JmsDelivered會被置為true標示當前訊息已經傳送過一次,客戶端需要進行訊息的重複處理控制。*/
session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
*
* @param obj 序列化物件
* @param topic
* @param isQueue
* @throws JMSException
*/
public static void sendObjectMessage(Serializable obj,String topic,boolean isQueue) throws JMSException{
init();
MessageProducer producer = getProducer(getDestination(topic, isQueue));
producer.send(session.createObjectMessage(obj));
destroy(producer);
}
public static void sendTextMessage(String mes,String topic,boolean isQueue) throws JMSException{
init();
MessageProducer producer = getProducer(getDestination(topic, isQueue));
producer.send(session.createTextMessage(mes));
destroy(producer);
}
private static MessageProducer getProducer(Destination destination) throws JMSException{
MessageProducer producer = session.createProducer(destination);
/** PERSISTENT(永續性訊息):
* 這是ActiveMQ的預設傳送模式,此模式保證這些訊息只被傳送一次和成功使用一次。對於這些訊息,可靠性是優先考慮的因素。
* 可靠性的另一個重要方面是確保永續性訊息傳送至目標後,訊息服務在向消費者傳送它們之前不會丟失這些訊息。這意味著在永續性訊息傳送至目標時,
* 訊息服務將其放入永續性資料儲存。如果訊息服務由於某種原因導致失敗,
* 它可以恢復此訊息並將此訊息傳送至相應的消費者。雖然這樣增加了訊息傳送的開銷,但卻增加了可靠性。
* NON_PERSISTENT(非永續性訊息):
* 保證這些訊息最多被傳送一次。對於這些訊息,可靠性並非主要的考慮因素。
* 此模式並不要求永續性的資料儲存,也不保證訊息服務由於某種原因導致失敗後訊息不會丟失。
*
*/
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
return producer;
}
private static Destination getDestination(String topic, boolean isQueue) throws JMSException {
Destination destination = null;
if (isQueue) {
destination = session.createQueue(topic);
} else {
destination = session.createTopic(topic);
}
return destination;
}
public static void getConsumer(String topic, boolean isQueue) throws JMSException{
init();
MessageConsumer consumer = session.createConsumer(getDestination(topic, isQueue));
if(Arrays.asList(ActiveMqConstant.QUEUES).contains(topic) ||
Arrays.asList(ActiveMqConstant.TOPICS).contains(topic) ){
MessageListener listener = ActiveMqConstant.listeners.get(topic);
consumer.setMessageListener(listener);
}
}
private static void destroy(MessageProducer producer) throws JMSException{
if(producer != null){
producer.close();
}
if(session!=null){
session.close();
session = null;
}
if(conn!=null){
conn.close();
conn = null;
}
}
public static void destroy(MessageConsumer consumer) throws JMSException{
if(consumer != null){
consumer.close();
consumer = null;
}
if(session!=null){
session.close();
session = null;
}
if(conn!=null){
conn.close();
conn = null;
}
}
public class ActiveMqConstant {
public static final String USERNAME = "admin", PASSWORD = "admin";
public static final String URL = "tcp://127.0.0.1:61616";
public static final String[] QUEUES = { "queue-1", "queue2", "queue3" };
public static final String[] TOPICS = { "topic1", "topic2", "topic3" };
public static Map<String, MessageListener> listeners = new LinkedHashMap<String, MessageListener>();
static{
init();
}
private static void init(){
//後期可以從xml中配置獲取
listeners.put("queue-1",new ActiveMQMessageListener());
listeners.put("queue2",new ActiveMQMessageListener2());
listeners.put("queue3",new ActiveMQMessageListener3());
listeners.put("topic1",new ActiveMQMessageListener());
listeners.put("topic2",new ActiveMQMessageListener2 ());
listeners.put("topic3",new ActiveMQMessageListener3());
}
private ActiveMqConstant() {
}
}
....省略
receiver:MessageDto [code=402, message=send mes:402]
receiver:MessageDto [code=403, message=send mes:403]
receiver:MessageDto [code=404, message=send mes:404]
...省略
receiver:MessageDto [code=2085, message=send mes:2085]
receiver:MessageDto [code=2086, message=send mes:2086]
receiver:MessageDto [code=2087, message=send mes:2087]
receiver:MessageDto [code=2088, message=send mes:2088]
receiver:MessageDto [code=2089, message=send mes:2089]
13:53:21.896 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check.
13:53:21.899 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
13:53:31.897 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check.
13:53:31.897 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
13:53:41.897 [ActiveMQ InactivityMonitor ReadCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - 30002ms elapsed since last read check.
13:53:41.898 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check.
13:53:41.898 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
13:53:51.899 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check.
13:53:51.899 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
13:54:01.900 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check.
13:54:01.901 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
13:54:11.899 [ActiveMQ InactivityMonitor ReadCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - 30002ms elapsed since last read check.
13:54:11.901 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check.
13:54:11.901 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
13:54:21.902 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check.
13:54:21.902 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
13:54:31.903 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check.
13:54:31.903 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
再次重新整理控制檯看介面變化
⑦.瞭解activemq的目錄結構 ,會發現如下幾個檔案:
db.data,db.redo,db-1.log
在訊息未被消費之前會將資料儲存在db-*.log中, 其中activemq預設每超過32m重新生成一個新的日誌檔案.
db.data:儲存btree索引 ,BTree索引,儲存訊息的引用,並按照message ID排序。 db.redo:用來保證MQ broker未乾淨關閉情況下,用於BTree index的重建。
注意:對於非持久化的資料如果未及時消費,當activemq宕機時,儲存的db-*.log等資訊在下次啟動時全部丟失.
廣播模式不再介紹,跟佇列方式相似,可以自己多開幾個consumer看接收到的內容是否一致