1. 程式人生 > >ActiveMQ環境部署+C#推送和接收消息

ActiveMQ環境部署+C#推送和接收消息

pri active erro 技術 exc using serialize 生產 sage

一、 ActiveMQ環境部署

  1. Jdk:jdk-8u91-windows-i586.exe
  2. ActiveMQ:apache-activemq-5.15.0,選擇win64,啟動服務activemq,要求端口號61616不被占用,啟動服務效果如圖:
技術分享

  1. 如果安裝提示Failed to execute start task,解決方法:停止ICS(運行-->services.msc找到Internet Connection Sharing (ICS)服務,改成手動啟動或禁用)
  2. ActiveMQ類庫:

    (1)Apache.NMS.dll路徑:\Apache.NMS.ActiveMQ-1.7.2-bin\lib\Apache.NMS\net-3.5

    (2)Apache.NMS.ActiveMQ.dll路徑:\Apache.NMS.ActiveMQ-1.7.2-bin\build\net-3.5\debug

安裝完成,訪問地址效果如圖:

技術分享

  1. ActiveMQ後臺管理地址:http://localhost:8161/admin,默認賬號:admin,密碼:admin
技術分享

二、 C#ActiveMQ實現推送接收數據

  1. 添加ActiveMQ類庫Apache.NMS.dll、Apache.NMS.ActiveMQ.dll
技術分享

  1. 定義傳值參數類:ActiveMQModel,命名空間定義:ActiveMQClient。

namespace ActiveMQClient

{

[Serializable]

public class ActiveMQModel

{

/// <summary>

/// guid

/// </summary>

public string guid { get; set; }

/// <summary>

/// 方法名

/// </summary>

public string method { get; set; }

/// <summary>

/// 接口參數(T轉json)

/// </summary>

public string json { get; set; }

}

}

  1. 初始化ActiveMQ,註冊推送事件,定義推送方法。

using ActiveMQClient;

using Apache.NMS;

using Apache.NMS.Util;

using Newtonsoft.Json;

using System;

using System.Collections.Generic;

using System.Linq;

using System.Text;

using System.Threading.Tasks;

namespace ActiveMQ

{

public class ActiveMQHelper

{

private static IConnectionFactory connFac;

private static IConnection connection;

private static ISession session;

private static IDestination destination;

private static IMessageProducer producer;

private static IMessageConsumer consumer;

/// <summary>

/// 初始化ActiveMQ

/// </summary>

public static void initAMQ()

{

string strsendTopicName = "A";//推送方topic名

string strreceiveTopicName = "B";//接受方toptic名

var url = "localhost:61616";//activemq地址

var userid = "oa";//帳戶

var pwd = "oa";//密碼

try

{

connFac = new NMSConnectionFactory(new Uri("activemq:failover:(tcp://" + url + ")")); //new NMSConnectionFactory(new Uri("activemq:failover:(tcp://localhost:61616)"));

//新建連接

connection = connFac.CreateConnection(userid, pwd);//connFac.CreateConnection("oa", "oa");//設置連接要用的用戶名、密碼

//如果你要持久“訂閱”,則需要設置ClientId,這樣程序運行當中被停止,恢復運行時,能拿到沒接收到的消息!

connection.ClientId = "ClientId_" + strsendTopicName;

//connection = connFac.CreateConnection();//如果你是缺省方式啟動Active MQ服務,則不需填用戶名、密碼

//創建Session

session = connection.CreateSession();

//發布/訂閱模式,適合一對多的情況

destination = SessionUtil.GetDestination(session, "topic://" + strreceiveTopicName);

//新建生產者對象

producer = session.CreateProducer(destination);

producer.DeliveryMode = MsgDeliveryMode.Persistent;//ActiveMQ服務器停止工作後,消息不再保留

//新建消費者對象:普通“訂閱”模式

//consumer = session.CreateConsumer(destination);//不需要持久“訂閱”

//新建消費者對象:持久"訂閱"模式:

// 持久“訂閱”後,如果你的程序被停止工作後,恢復運行,

//從第一次持久訂閱開始,沒收到的消息還可以繼續收

consumer = session.CreateDurableConsumer(

session.GetTopic(strsendTopicName)

, connection.ClientId, null, false);

//設置消息接收事件

consumer.Listener += new MessageListener(OnMessage);

//啟動來自Active MQ的消息偵聽

connection.Start();

}

catch (Exception e)

{

SysErrorLog.SaveErrorInfo(e, "初始化ActiveMQ失敗");

}

}

/// <summary>

/// 推送ActiveMQ

/// </summary>

/// <param name="guid"></param>

/// <param name="t"></param>

/// <param name="method"></param>

public static void Send(string guid, object t, string method)

{

if (producer == null)

{

initAMQ();

}

if (session == null)

{

throw new Exception("請初始化ActiveMQ!");

}

if (producer == null)

{

throw new Exception("請初始化ActiveMQ!");

}

var model = new ActiveMQModel();

model.guid = guid;

model.method = method;

model.json = JsonConvert.SerializeObject(t);

var i = session.CreateObjectMessage(model);

producer.Send(i);

}

/// <summary>

/// 接收ActiveMQ消息

/// </summary>

/// <param name="receivedMsg"></param>

protected static void OnMessage(IMessage receivedMsg)

{

if (receivedMsg is IObjectMessage)

{

var message = receivedMsg as IObjectMessage;

if (message.Body is ActiveMQModel)

{

SysErrorLog.SaveErrorInfo("ActiveMQModel=" + JsonConvert.SerializeObject(message.Body));

}

}

}

}

}

三、 C#推送ActiveMQ,以更新機構商品庫存為例:

推送代碼:

var model = new

{

ShopId = ShopId,//門店編碼

proNum = newKuc,//庫存

skuNo = skuno,//sku

};

var guid=Guid.NewGuid().ToString();

var method = "updatestoreproductkuc";

var lst = new List<object>();

lst.Add(model);

ActiveMQHelper.Send(guid, lst, method);

接收報文格式如下

技術分享

ActiveMQ環境部署+C#推送和接收消息