.Net Rpc服務調用記錄3
阿新 • • 發佈:2018-11-22
bstr bootstrap class console rbo 完成後 chan oop flags
在服務端定義了IServer 和ServerBase負責服務端的啟動,關閉等;在Server啟動時,需要1.開啟端口檢測 2.註冊服務 3.提供了接口IServerAddInsInitializer註入,在啟動時執行額外邏輯;具體代碼如下:
using GP.RPC.Message; using GP.RPC.Route.Manager; using GP.RPC.Server.SerivceType; using GP.RPC.Server.ServerInitializer; using GP.RPC.Server.ServiceExecutor; using GP.RPC.Server.ServiceRegistion;View Codeusing GP.RPC.Transport.Receiver; using GP.RPC.Transport.Sender; using Microsoft.Extensions.DependencyModel; using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; namespace GP.RPC.Server.Abstract { public abstract class ServerBase : IServer {public IServerAddInsInitializer ServerAddInsInitializer { get; set; } public IServiceRegistion Registion { get; set; } public ServerBase() { Registion = NullServiceRegistion.Instance; } public virtual async Task StartAsync(int port) { awaitStartCoreAsync(port); await Registion.RegisterAsync(port); ServerAddInsInitializer = NullServerAddInsInitializer.Instance; ServerAddInsInitializer.Initialize(); } protected abstract Task StartCoreAsync(int port); public abstract void ShutDown(); public abstract void Print(); } }
通信使用DotNetty完成,因此,服務端實際實現為NettyServer類,具體代碼如下:
using DotNetty.Codecs; using DotNetty.Handlers.Logging; using DotNetty.Transport.Bootstrapping; using DotNetty.Transport.Channels; using DotNetty.Transport.Channels.Sockets; using GP.RPC.IOC; using GP.RPC.Message; using GP.RPC.Server; using GP.RPC.Server.Abstract; using GP.RPC.Server.ServiceExecutor; using GP.RPC.Server.ServiceRegistion; using GP.RPC.Transport.Netty.Codec; using GP.RPC.Transport.Netty.Handler; using GP.RPC.Transport.Netty.Sender; using GP.RPC.Transport.Receiver; using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; namespace GP.RPC.Transport.Netty.Server { public class NettyServer : ServerBase { private ServerBootstrap bootstrap; private MultithreadEventLoopGroup boss; private MultithreadEventLoopGroup worker; private IChannel bootstrapChannel; //private NettyMessageDecoderAdapter decoder = IocManager.Instance.Resolve<NettyMessageDecoderAdapter>(); //private NettyMessageEncoderAdapter encoder = IocManager.Instance.Resolve<NettyMessageEncoderAdapter>(); //private NettyServerHandler handler = new NettyServerHandler(); private IMessageReceiver _receiver; private IServiceExecutor _executor; public NettyServer( IMessageReceiver receiver, IServiceExecutor executor) { _executor = executor; _receiver = receiver; _receiver.Received += Message_Received; //handler.AsyncServerReceiverHook += _receiver.OnReceived; } public override void ShutDown() { Task.WhenAll( bootstrapChannel.CloseAsync(), boss.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)), worker.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)) ); } protected override async Task StartCoreAsync(int port) { boss = new MultithreadEventLoopGroup(1); worker = new MultithreadEventLoopGroup(); bootstrap = new ServerBootstrap(); bootstrap.Group(boss, worker) .Channel<TcpServerSocketChannel>() .Option(ChannelOption.SoBacklog, 100) .Handler(new LoggingHandler(LogLevel.INFO)) .ChildHandler(new ActionChannelInitializer<ISocketChannel>(channel => { IChannelPipeline pipeline = channel.Pipeline; var handler = new NettyServerHandler(); handler.AsyncServerReceiverHook += _receiver.OnReceived; pipeline.AddLast(new LengthFieldPrepender(4)); pipeline.AddLast(new LengthFieldBasedFrameDecoder(int.MaxValue, 0, 4, 0, 4)); pipeline.AddLast(IocManager.Instance.Resolve<NettyMessageEncoderAdapter>(), IocManager.Instance.Resolve<NettyMessageDecoderAdapter>(), handler); })); bootstrapChannel = await bootstrap.BindAsync(port); } private async Task Message_Received(object message) { var context = message as ServerReceiverContext; if (context != null) { var request = context.Message as RequestMessage; if (request != null) { var response = await _executor.ExecuteAsync(request); var sender = new NettySeverMessageSender(context.Context); await sender.SendAsync(null,response); } } } public override void Print() { Console.WriteLine($"Channel id is{this.bootstrapChannel.Id} State is {this.bootstrapChannel.Open} Active is{this.bootstrapChannel.Active}"); } } }View Code
服務端接收到消息後,將消息解碼為RequestMessage,並交給IServiceExecutor進行實際的服務調用;實現代碼如下:
using System; using System.Collections.Generic; using System.Reflection; using System.Text; using System.Threading.Tasks; using GP.RPC.Message; using GP.RPC.Utilities; using Microsoft.Extensions.DependencyModel; using System.Linq; using GP.RPC.Server.SerivceType; namespace GP.RPC.Server.ServiceExecutor { public class SimpleServiceExecutor : IServiceExecutor { private IServiceInstanceFactory _factory; private IServiceTypeManager _typeManager; public SimpleServiceExecutor(IServiceTypeManager typeManager,IServiceInstanceFactory factory) { _factory = factory; _typeManager = typeManager; } public async Task<ResponseMessage> ExecuteAsync(RequestMessage request) { ResponseMessage response = null; try { ContractUtility.CheckNull(request, "RequestMessage"); ContractUtility.CheckEmptyOrNull(request.ServiceType, "ServiceType"); ContractUtility.CheckEmptyOrNull(request.ServiceMethod, "ServiceMethod"); //這裏僅通過名字進行匹配 var type = GetServiceType(request.ServiceType); ContractUtility.Require(type != null, $"Not found matched ServiceType {request.ServiceType}"); var parameterTypes = GetParameterTypes(request.Parameters); var method = GetServiceMethod(type,request.ServiceMethod, parameterTypes); ContractUtility.Require(method != null, $"Not found matched ServiceMethod {request.ServiceMethod} in SerivceType {request.ServiceType}"); var result = await ExecuteCoreAsync(_factory.Create(type), method, method.ReturnType, request.Parameters); response = new ResponseMessage { RequestId = request.Id, Success=true, Result = result }; //Console.WriteLine($"Server:{request.Id}"); } catch (Exception ex) { response =ResponseMessage.FromException(request.Id, ex); } return response; } protected Type GetServiceType(string serviceType) { return _typeManager.GetServiceType(serviceType); } protected Type[] GetParameterTypes(object[] parameters) { if (null == parameters || 0 == parameters.Length) { return Type.EmptyTypes; } return parameters.Select(p => p.GetType()).ToArray(); } protected MethodInfo GetServiceMethod(Type type,string serviceMethod, Type[] parameterTypes) { var method = type.GetMethod(serviceMethod, parameterTypes); if (method == null || method.IsGenericMethod) { return null; } return method; } protected async Task<object> ExecuteCoreAsync(object instance, MethodInfo method,Type returnType , object[] parameters) { var result = method.Invoke(instance, parameters); if (!IsAsync(returnType)) { return result; } if (returnType == typeof(Task)) { await ProcessAsyncTask((Task)result); return null; } return CallProcessAsyncTaskWithResult(returnType.GenericTypeArguments[0], result); } protected bool IsAsync(Type returnType) { return ( returnType == typeof(Task) || (returnType.GetTypeInfo().IsGenericType && returnType.GetGenericTypeDefinition() == typeof(Task<>)) ); } protected async Task ProcessAsyncTask(Task actualTask) { await actualTask; } protected async Task<T> ProcessAsyncTaskWithResult<T>(Task<T> actualTask) { return await actualTask; } protected object CallProcessAsyncTaskWithResult(Type taskReturnType, object actualTaskValue) { return this.GetType() .GetMethod("ProcessAsyncTaskWithResult", BindingFlags.Instance) .MakeGenericMethod(taskReturnType) .Invoke(this, new object[] { actualTaskValue }); } } }View Code
服務調用完成後,生成相應消息ResponseMessage,並交給Server端的MessageSender,進行發送回客戶端。由此服務端完成整個處理。
.Net Rpc服務調用記錄3