程式人生 > .NET 中 RabbitMQ 生產者/消費者

.NET 中 RabbitMQ 生產者/消費者

 

#region 伺服器配置資訊是配置再webconfig中
// RabbitMQ connection settings, read from the application's <appSettings> section
// (the region title says they live in web.config).
// AppSettings[...] yields null for a key that is not configured, so the trailing
// .ToString() would throw while these static fields are being initialized.

// Broker host name, from the "RabbitMQHostName" setting.
private static string _HostName = System.Configuration.ConfigurationManager.AppSettings["RabbitMQHostName"].ToString();
// Broker login user, from the "RabbitMQUserName" setting.
private static string _UserName = System.Configuration.ConfigurationManager.AppSettings["RabbitMQUserName"].ToString();
// Broker login password, from the "RabbitMQPassword" setting.
private static string _Pass = System.Configuration.ConfigurationManager.AppSettings["RabbitMQPassword"].ToString();
// Name of the exchange used for publishing and for queue binding, from the "RabbitMQExchange" setting.
private static string EXCHANGE_NAME = System.Configuration.ConfigurationManager.AppSettings["RabbitMQExchange"].ToString();
#endregion

/// <summary>
/// Producer: serializes a balance request to XML and publishes it to RabbitMQ.
/// </summary>
/// <param name="BalanceAction">Request model; it is serialized with ModelSerializer and sent as the UTF-8 message body.</param>
/// <returns>
/// A result whose Code is StandardStatus.success when the publish and the interface-log
/// write both complete without throwing; otherwise StandardStatus.fail.
/// </returns>
/// <remarks>
/// A new connection and channel are created and disposed on every call.
/// Failures are reported through <c>SendInfo</c> and the interface log. If that reporting
/// throws inside the channel scope, the outer handler reports once more; if it throws
/// there as well, the exception escapes to the caller.
/// </remarks>
private StandardResult<string> AccountBalance(BalanceAction BalanceAction)
{
    const string routingKey = "querybalancereq.key";
    const string queueName = "queue.fclouds.querybalancereq";
    StandardResult<string> result = new StandardResult<string>();
    try
    {
        var factory = new ConnectionFactory
        {
            HostName = _HostName,
            UserName = _UserName,
            Password = _Pass,
            // Ask the client library to re-establish the connection automatically.
            AutomaticRecoveryEnabled = true
        };

        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // Arguments: durable = true, exclusive = false, autoDelete = false, no extra arguments.
            // Re-declaring a queue that already exists with the same settings does not create a duplicate.
            channel.QueueDeclare(queueName, true, false, false, null);
            try
            {
                // Created before serialization so the record date reflects the start of the attempt.
                InterfaceLog log = CreateAccountBalanceLog();

                string message = ModelSerializer.SerializerToString<BalanceAction>(BalanceAction, true, "", "");
                var body = Encoding.UTF8.GetBytes(message);
                var properties = channel.CreateBasicProperties();
                properties.ContentType = "text/plain";
                // Mark the message itself as persistent, in addition to the durable queue.
                properties.Persistent = true;

                channel.BasicPublish(EXCHANGE_NAME, routingKey, properties, body);

                result.Code = StandardStatus.success;
                result.Msg = "操作成功";
                log.RequestContent = message;
                log.StatusCode = LogStatus.Success;
                // NOTE(review): if this write throws, the result is flipped to "fail" even though
                // the message was already published — confirm callers do not retry on that basis.
                _projectAPIService.AddInterfaceLog(log);
            }
            catch (Exception e)
            {
                RecordAccountBalanceFailure(result, e);
            }
        }
    }
    catch (Exception ex)
    {
        // Reached when connecting, opening the channel or declaring the queue fails,
        // or when the failure reporting above throws.
        RecordAccountBalanceFailure(result, ex);
    }
    return result;
}

/// <summary>
/// Creates an interface-log entry pre-filled with the fields shared by every AccountBalance log record.
/// </summary>
private InterfaceLog CreateAccountBalanceLog()
{
    InterfaceLog log = new InterfaceLog();
    log.BatchNumber = "";
    log.InterfaceCode = InterfaceCodeEnum.ZPINACCOUNTBALANCE.ToString();
    log.RecordDate = DateTime.Now;
    log.ID = Generator.GenerateGuid();
    return log;
}

/// <summary>
/// Marks <paramref name="result"/> as failed, then alerts on and persists a failure log for <paramref name="error"/>.
/// </summary>
private void RecordAccountBalanceFailure(StandardResult<string> result, Exception error)
{
    result.Code = StandardStatus.fail;
    result.Msg = "操作失敗";

    InterfaceLog log = CreateAccountBalanceLog();
    log.ErrorMessage = error.Message;
    log.StatusCode = LogStatus.Fail;

    // NOTE(review): the alert title mentions the transaction-detail query although this is the
    // balance query — looks like a copy-paste leftover; left unchanged because it is runtime text.
    SendInfo("交易明細查詢時推送至MQ時異常", log);
    _projectAPIService.AddInterfaceLog(log);
}

/// <summary>
/// Consumer: takes at most one account-balance response message from RabbitMQ,
/// validates it and writes an interface log entry.
/// </summary>
/// <remarks>
/// Each call opens its own connection and channel, waits up to 2000 ms for a delivery and
/// then disposes both. The message is acknowledged after validation, including when
/// validation reports an empty or incomplete payload. If processing throws, the failure is
/// logged and the message is NOT acknowledged (presumably the broker redelivers it later —
/// confirm this is the intended handling for malformed payloads).
/// </remarks>
public void AccountBalanceResponse()
{
    string str = string.Empty;

    var factory = new ConnectionFactory();
    factory.HostName = _HostName;
    factory.UserName = _UserName;
    factory.Password = _Pass;
    // Ask the client library to re-establish the connection automatically.
    factory.AutomaticRecoveryEnabled = true;

    // NOTE(review): the key says "queryhistoryres" while the queue is "querybalanceres" —
    // confirm this binding is intentional and not copied from the history-query consumer.
    const string routingKey = "queryhistoryres.key";
    const string queueName = "queue.fclouds.querybalanceres";
    try
    {
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // Arguments: durable = true, exclusive = false, autoDelete = false, no extra arguments.
            channel.QueueDeclare(queueName, true, false, false, null);

            // Route messages published to the exchange with routingKey into this queue.
            channel.QueueBind(queueName, EXCHANGE_NAME, routingKey);

            // Prefetch count 1: the broker hands over one unacknowledged message at a time.
            channel.BasicQos(0, 1, false);

            var consumer = new QueueingBasicConsumer(channel);

            // Second argument false: deliveries must be acknowledged explicitly by this code.
            channel.BasicConsume(queueName, false, consumer);

            try
            {
                BasicDeliverEventArgs ea;
                bool received = consumer.Queue.Dequeue(2000, out ea);
                if (received && ea != null)
                {
                    str = Encoding.UTF8.GetString(ea.Body);
                    ProcessBalanceResponse(str);
                    channel.BasicAck(ea.DeliveryTag, false);
                }
            }
            catch (Exception ex)
            {
                // Reported while the channel is still open; the delivery stays unacknowledged.
                ReportBalanceResponseFailure(ex.Message, str);
            }
        }
    }
    catch (Exception ex)
    {
        // Connection/channel setup failures, or a failure of the reporting above.
        ReportBalanceResponseFailure(ex.Message, str);
    }
}

/// <summary>
/// Deserializes the response XML and logs either the validation problem or a success entry.
/// Exceptions thrown by deserialization propagate to the caller.
/// </summary>
/// <param name="payload">Raw message text as received from the queue.</param>
private void ProcessBalanceResponse(string payload)
{
    TransactionBalance target = new TransactionBalance();
    var parsed = ModelSerializer.DeserializerWithXmlString<TransactionBalance>(target, payload);

    // Find the first missing level of the response, outermost first.
    string problem = null;
    if (parsed == null)
    {
        problem = "賬戶餘額查詢響應報文為空";
    }
    else if (parsed.TransactionBody == null)
    {
        problem = "響應報文TransactionBody節點為空";
    }
    else if (parsed.TransactionBody.response == null)
    {
        problem = "響應報文response節點為空";
    }

    if (problem != null)
    {
        ReportBalanceResponseFailure(problem, payload);
        return;
    }

    try
    {
        // Business handling of the response goes here (left empty in the original).

        AddLog(InterfaceCodeEnum.ZPINACCOUNTBCPONSE.ToString(), true, "", "", payload);
    }
    catch (Exception ex)
    {
        ReportBalanceResponseFailure(ex.Message, payload);
    }
}

/// <summary>
/// Writes a failed interface-log entry for the balance response and sends the alert for it.
/// </summary>
/// <param name="errorMessage">Text stored as the log's error message.</param>
/// <param name="payload">Message content received so far; empty when nothing was read.</param>
private void ReportBalanceResponseFailure(string errorMessage, string payload)
{
    var logInfo = AddLog(InterfaceCodeEnum.ZPINACCOUNTBCPONSE.ToString(), false, errorMessage, "", payload);
    SendInfo("賬戶餘額查詢響應時", logInfo);
}