rocketmq學習筆記 二 官方例項
阿新 • • 發佈:2019-01-07
rocketmq實現分散式事務,是指訊息是有狀態的, 目前的3.5.8中沒有,當時在@李戰輝大神之前的某個分支中有簡單的實現
本人建議,如何解決分散式事務呢? 就是別有這種業務啊。別劃分99%去解決1%可能出現的問題。當然公司發展的階段不同,目標也不會相同。
A呼叫B,A先broker發了一條訊息, A本地開啟事務,執行完本地方法,先broker傳送確認訊息。
B從broker中訂閱訊息,來執行訊息。
0.註冊查詢本地事務Listener
/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.alibaba.rocketmq.example.transaction; import com.alibaba.rocketmq.client.producer.LocalTransactionState; import com.alibaba.rocketmq.client.producer.TransactionCheckListener; import com.alibaba.rocketmq.common.message.MessageExt; import java.util.concurrent.atomic.AtomicInteger; public class TransactionCheckListenerImpl implements TransactionCheckListener { private AtomicInteger transactionIndex = new AtomicInteger(0); /** * 主動檢查機制 */ @Override public LocalTransactionState checkLocalTransactionState(MessageExt msg) { System.out.println("server checking TrMsg " + msg.toString()); int value = transactionIndex.getAndIncrement(); if ((value % 6) == 0) { throw new RuntimeException("Could not find db"); } else if ((value % 5) == 0) { return LocalTransactionState.ROLLBACK_MESSAGE; } else if ((value % 4) == 0) { return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.UNKNOW; } }
1.實現本地事務邏輯LocalTransactionExecuter
/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.alibaba.rocketmq.example.transaction; import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter; import com.alibaba.rocketmq.client.producer.LocalTransactionState; import com.alibaba.rocketmq.common.message.Message; import java.util.concurrent.atomic.AtomicInteger; public class TransactionExecuterImpl implements LocalTransactionExecuter { private AtomicInteger transactionIndex = new AtomicInteger(1); /** * 執行本地業務邏輯 */ @Override public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) { int value = transactionIndex.getAndIncrement(); if (value == 0) { throw new RuntimeException("Could not find db"); } else if ((value % 5) == 0) { return LocalTransactionState.ROLLBACK_MESSAGE; } else if ((value % 4) == 0) { return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.UNKNOW; } }
2.檢視呼叫方式
/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.alibaba.rocketmq.example.transaction; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.client.producer.TransactionCheckListener; import com.alibaba.rocketmq.client.producer.TransactionMQProducer; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.remoting.common.RemotingHelper; import java.io.UnsupportedEncodingException; public class TransactionProducer { public static void main(String[] args) throws MQClientException, InterruptedException { TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name"); producer.setCheckThreadPoolMinSize(2); producer.setCheckThreadPoolMaxSize(2); producer.setCheckRequestHoldMax(2000); producer.setTransactionCheckListener(transactionCheckListener); producer.start(); String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"}; TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl(); for (int i = 0; i < 100; i++) { try { Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null); System.out.println(sendResult); Thread.sleep(10); } catch (MQClientException e) { e.printStackTrace(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } for (int i = 0; i < 100000; i++) { Thread.sleep(1000); } producer.shutdown(); } }
3.檢視rocketmq實現邏輯
public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg)
throws MQClientException {
if (null == tranExecuter) {
throw new MQClientException("tranExecutor is null", null);
}
Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
try {
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}