NetMQ Push-Pull messaging pattern + multithreading + serialization
阿新 • Published: 2018-12-20
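I have been looking into NetMQ recently, with the idea of using it in a distributed crawler. NetMQ is an open-source library that wraps sockets in a message-queue abstraction. It is the .NET port of ZeroMQ, which itself is written in C++. Published performance tests reportedly show it beating almost every other MQ by a wide margin (MSMQ, RabbitMQ, and so on). It does have one weakness: messages are not persisted. You can implement persistence yourself, of course, but this post is only about performance, so persistence is not needed here.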
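The example below is adapted from the sample on the NetMQ website. It has three parts: the Ventilator, which distributes messages; the Worker, which processes them; and the Sink, which receives the results the Workers send back. The time-consuming computation is handed to the Workers, so starting several copies of Worker.exe speeds up processing. The end goal for the Workers is distributed computing: deploy them on multiple PCs and let those machines do the work (in a distributed crawler, each Worker corresponds to one crawler).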
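To make the data flow concrete before the NetMQ code, here is a minimal in-process analogue of the Ventilator / Worker / Sink arrangement. It does not use NetMQ at all: two `BlockingCollection<int>` queues stand in for the two TCP connections, and the `PipelineSketch` class, its `Run` method, and the "multiply by two" work step are all made up for illustration. Workers here compete for items from a shared queue, which is similar in spirit to, but not the same as, how a PUSH socket spreads messages over its connected peers.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class PipelineSketch
{
    // Runs taskCount jobs through workerCount workers and returns the sorted results.
    public static int[] Run(int taskCount, int workerCount)
    {
        var tasks = new BlockingCollection<int>();    // stands in for Ventilator -> Worker (port 5557)
        var results = new BlockingCollection<int>();  // stands in for Worker -> Sink (port 5558)

        // Workers: pull a task, "process" it, push the result downstream.
        var workers = Enumerable.Range(0, workerCount)
            .Select(_ => Task.Run(() =>
            {
                foreach (var id in tasks.GetConsumingEnumerable())
                {
                    results.Add(id * 2); // placeholder for the real work
                }
            }))
            .ToArray();

        // Ventilator: hand out the tasks, then signal that no more are coming.
        for (var i = 0; i < taskCount; i++)
        {
            tasks.Add(i);
        }
        tasks.CompleteAdding();

        Task.WaitAll(workers);
        results.CompleteAdding();

        // Sink: collect everything the workers produced.
        return results.GetConsumingEnumerable().OrderBy(x => x).ToArray();
    }

    public static void Main()
    {
        var collected = Run(taskCount: 100, workerCount: 4);
        Console.WriteLine("Sink received {0} results", collected.Length);
    }
}
```

Raising `workerCount` is the in-process counterpart of starting more Worker.exe instances: the Ventilator and Sink code stay the same, and only the number of consumers changes.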
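Without further ado, here is the code. (I originally planned to use protobuf-net as the serialization format, but it kept throwing an error in a multithreaded environment. I have not worked out the cause yet, so that part is commented out.)
First, define the objects that will be sent in the messages.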
using System;
using ProtoBuf;

namespace Model
{
    // Message payload; attributed for both BinaryFormatter and protobuf-net.
    [Serializable]
    [ProtoContract]
    public class Person
    {
        [ProtoMember(1)] public int Id { get; set; }
        [ProtoMember(2)] public string Name { get; set; }
        [ProtoMember(3)] public DateTime BirthDay { get; set; }
        [ProtoMember(4)] public Address Address { get; set; }
    }
}
using System;
using ProtoBuf;

namespace Model
{
    // Nested payload type referenced by Person.Address.
    [Serializable]
    [ProtoContract]
    public class Address
    {
        [ProtoMember(1)] public string Line1 { get; set; }
        [ProtoMember(2)] public string Line2 { get; set; }
    }
}
Next is the message sender (the Ventilator).
using System;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;
using System.Threading.Tasks;
using Model;
using NetMQ;
using ProtoBuf;
using ProtoBuf.Meta;

namespace Ventilator
{
    sealed class Ventilator
    {
        public void Run()
        {
            Task.Run(() =>
            {
                using (var ctx = NetMQContext.Create())
                using (var sender = ctx.CreatePushSocket())
                using (var sink = ctx.CreatePushSocket())
                {
                    sender.Bind("tcp://*:5557");
                    sink.Connect("tcp://localhost:5558");

                    // The first message, "0", tells the Sink that the batch is starting.
                    sink.Send("0");
                    Console.WriteLine("Sending tasks to workers");
                    RuntimeTypeModel.Default.MetadataTimeoutMilliseconds = 300000;

                    // Send 10000 tasks; each task is one serialized Person.
                    for (int taskNumber = 0; taskNumber < 10000; taskNumber++)
                    {
                        Console.WriteLine("Workload : {0}", taskNumber);
                        var person = new Person
                        {
                            Id = taskNumber,
                            Name = "First",
                            BirthDay = DateTime.Parse("1981-11-15"),
                            Address = new Address { Line1 = "Line1", Line2 = "Line2" }
                        };
                        using (var sm = new MemoryStream())
                        {
                            // protobuf-net version, disabled because of the multithreading error:
                            //Serializer.PrepareSerializer<Person>();
                            //Serializer.Serialize(sm, person);
                            //sender.Send(sm.ToArray());

                            // Binary serialization instead.
                            var binaryFormatter = new BinaryFormatter();
                            binaryFormatter.Serialize(sm, person);
                            sender.Send(sm.ToArray());
                        }
                    }
                }
            });
        }
    }
}
using System;

namespace Ventilator
{
    public class Program
    {
        public static void Main(string[] args)
        {
            // Task Ventilator
            // Binds PUSH socket to tcp://*:5557
            // Sends batch of tasks to workers via that socket
            Console.WriteLine("====== VENTILATOR ======");
            Console.WriteLine("Press enter when workers are ready");
            Console.ReadLine();

            // The first message is "0" and signals the start of the batch;
            // see Program.cs in the Sink project for where this is used.
            Console.WriteLine("Sending start of batch to Sink");
            var ventilator = new Ventilator();
            ventilator.Run();

            Console.WriteLine("Press Enter to quit");
            Console.ReadLine();
        }
    }
}
The message processor (the Worker)
using System;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;
using System.Threading.Tasks;
using Model;
using NetMQ;
using ProtoBuf;

namespace Worker
{
    sealed class Worker
    {
        public void Run()
        {
            Task.Run(() =>
            {
                using (NetMQContext ctx = NetMQContext.Create())
                // socket to receive messages on
                using (var receiver = ctx.CreatePullSocket())
                // socket to send messages on
                using (var sender = ctx.CreatePushSocket())
                {
                    receiver.Connect("tcp://localhost:5557");
                    sender.Connect("tcp://localhost:5558");

                    // Process tasks forever. Each message from the Ventilator is a
                    // serialized Person; real work would do something more meaningful
                    // than printing it.
                    while (true)
                    {
                        var receivedBytes = receiver.Receive();
                        using (var sm = new MemoryStream(receivedBytes))
                        {
                            // protobuf-net throws this error when used from multiple threads:
                            /* Timeout while inspecting metadata; this may indicate a deadlock.
                               This can often be avoided by preparing necessary serializers during
                               application initialization, rather than allowing multiple threads
                               to perform the initial metadata inspection; please also see the
                               LockContended event */
                            //var person = Serializer.Deserialize<Person>(sm);

                            // Use binary serialization instead. A direct cast fails loudly on an
                            // unexpected payload instead of yielding null.
                            var binaryFormatter = new BinaryFormatter();
                            var person = (Person)binaryFormatter.Deserialize(sm);
                            Console.WriteLine("Person {Id:" + person.Id + ",Name:" + person.Name
                                + ",BirthDay:" + person.BirthDay
                                + ",Address:{Line1:" + person.Address.Line1
                                + ",Line2:" + person.Address.Line2 + "}}");
                            Console.WriteLine("Sending to Sink:" + person.Id);
                            sender.Send(person.Id + "");
                        }
                    }
                }
            });
        }
    }
}
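The exception text quoted in the Worker suggests preparing serializers during application initialization rather than letting many threads perform the first metadata inspection at once. A minimal sketch of that idea follows, assuming the protobuf-net package is referenced. `Sample` is a hypothetical stand-in for `Model.Person`, and `WarmUpDemo` and `RoundTrip` are names invented here. This has not been tried against the setup in this post, so treat it as something to experiment with, not a confirmed fix.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;
using ProtoBuf;

[ProtoContract]
public class Sample // hypothetical stand-in for Model.Person
{
    [ProtoMember(1)] public int Id { get; set; }
    [ProtoMember(2)] public string Name { get; set; }
}

public static class WarmUpDemo
{
    // Serializes and deserializes one value through protobuf-net.
    public static Sample RoundTrip(Sample value)
    {
        using (var ms = new MemoryStream())
        {
            Serializer.Serialize(ms, value);
            ms.Position = 0;
            return Serializer.Deserialize<Sample>(ms);
        }
    }

    public static void Main()
    {
        // Build the serializer once, on a single thread, before any worker starts.
        Serializer.PrepareSerializer<Sample>();

        // Only now fan out to many threads, as Worker.exe does.
        Parallel.For(0, 50, i =>
        {
            var copy = RoundTrip(new Sample { Id = i, Name = "First" });
            if (copy.Id != i)
            {
                throw new InvalidOperationException("round trip failed");
            }
        });
        Console.WriteLine("done");
    }
}
```

In the Worker project the equivalent step would be a single `Serializer.PrepareSerializer<Person>()` call at the top of `Main`, before any `Worker.Run()` is invoked, instead of calling it inside the per-message loop.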
using System;
using System.Linq;
using System.Threading.Tasks;

namespace Worker
{
    public class Program
    {
        public static void Main(string[] args)
        {
            // Task Worker
            // Connects PULL socket to tcp://localhost:5557
            // Collects workload from the Ventilator via that socket
            // Connects PUSH socket to tcp://localhost:5558
            // Sends results to the Sink via that socket
            Console.WriteLine("====== WORKER ======");

            // Multithreading via Tasks:
            //foreach (Worker client in Enumerable.Range(0, 1000).Select(
            //    x => new Worker()))
            //{
            //    client.Run();
            //}

            // Multithreading via Parallel.Invoke, bounded by the number of cores:
            var actList = Enumerable.Range(0, 50)
                .Select(x => new Worker())
                .Select(client => (Action)(client.Run))
                .ToList();
            var paraOption = new ParallelOptions
            {
                MaxDegreeOfParallelism = Environment.ProcessorCount
            };
            Parallel.Invoke(paraOption, actList.ToArray());

            Console.ReadLine();
        }
    }
}
The Sink, which receives the processing results
using System;
using System.Diagnostics;
using NetMQ;

namespace Sink
{
    public class Program
    {
        public static void Main(string[] args)
        {
            // Task Sink
            // Binds PULL socket to tcp://localhost:5558
            // Collects results from workers via that socket
            Console.WriteLine("====== SINK ======");

            using (NetMQContext ctx = NetMQContext.Create())
            // socket to receive messages on
            using (var receiver = ctx.CreatePullSocket())
            {
                receiver.Bind("tcp://localhost:5558");

                // Wait for start of batch (sent by the Ventilator).
                var startOfBatchTrigger = receiver.ReceiveString();
                Console.WriteLine("Seen start of batch");

                // Start our clock now.
                Stopwatch watch = new Stopwatch();
                watch.Start();

                // Expect exactly as many results as the Ventilator sends tasks.
                for (int taskNumber = 0; taskNumber < 10000; taskNumber++)
                {
                    var workerDoneTrigger = receiver.ReceiveString();
                    Console.WriteLine(workerDoneTrigger);
                }
                watch.Stop();

                // Calculate and report duration of batch.
                Console.WriteLine();
                Console.WriteLine("Total elapsed time {0} msec", watch.ElapsedMilliseconds);
                Console.ReadLine();
            }
        }
    }
}
One more reminder: you can start several copies of Worker.exe to increase throughput.
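If starting the workers by hand gets tedious, a small launcher can do it. The sketch below is only an illustration: `WorkerLauncher`, `BuildStartInfos`, the default path `Worker.exe`, and the default count of four are all assumptions, not part of the original sample.

```csharp
using System;
using System.Diagnostics;
using System.Linq;

public static class WorkerLauncher
{
    // Builds one start-info per worker process; nothing is launched here.
    public static ProcessStartInfo[] BuildStartInfos(string workerExePath, int count)
    {
        return Enumerable.Range(0, count)
            .Select(_ => new ProcessStartInfo(workerExePath) { UseShellExecute = true })
            .ToArray();
    }

    public static void Main(string[] args)
    {
        // Hypothetical defaults: Worker.exe next to the launcher, four copies.
        var path = args.Length > 0 ? args[0] : "Worker.exe";
        var count = args.Length > 1 ? int.Parse(args[1]) : 4;

        foreach (var info in BuildStartInfos(path, count))
        {
            Process.Start(info);
        }
        Console.WriteLine("Started {0} worker processes", count);
    }
}
```

Start the workers before pressing Enter in the Ventilator, so that every worker is already connected when the tasks begin to flow.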