1. 程式人生 > 其它 >Advanced .NET Remoting: 第 9 章 3.在 Remoting 中傳遞額外的執行時資訊

Advanced .NET Remoting: 第 9 章 3.在 Remoting 中傳遞額外的執行時資訊

Advanced .NET Remoting:第 9 章

3.傳遞執行時資訊

前面使用的接收器 ( Sink ) 是 IClientChannelSinks 與 IServerChannelSinks。這意味著它們工作在格式化器已經序列化 IMessage 物件 之後 。而對於 IMessageSink 來說,不同的是,可以直接工作在訊息的內容被格式化 之前 。這意味著,你對 IMessage 內容的任何修改都將被序列化,進而反映到最終的流中。

注意:即使你試圖在 IClienntChannelSink 中修改 IMessage 物件的內容,需要注意的是,這些修改 不會 被傳播到伺服器端,因為序列化的流已經通過 IMessage 物件生成了。

基於該區別,客戶端 IMessageSink 可以用來從客戶端將執行時資訊傳遞到伺服器端。在隨後的示例中,我將向您展示如何將客戶端的執行緒 ( thread ) 的當前優先級別 ( priority ) 傳遞到伺服器端,以便遠端方法可以執行在同樣的優先順序上。

為了從客戶端向伺服器端傳送任意資料,你需要將它加入到 Message 物件的邏輯呼叫上下文 ( logical call context ) 中。通過這種方式,你可以傳遞可序列化物件 ( serializable ) 或者擴充套件了 MarshalByRefObject 的物件。例如,為了在對伺服器上任何方法的呼叫中,傳遞客戶端的執行緒的當前上下文,你可以實現如下的 SyncProcessMessage()

方法:

public IMessage SyncProcessMessage(IMessage msg)
{
   if (msg as IMethodCallMessage != null)
   {
      LogicalCallContext lcc =
         (LogicalCallContext) msg.Properties["__CallContext"];

      lcc.SetData("priority",Thread.CurrentThread.Priority);
      return _nextMsgSink.SyncProcessMessage(msg);
   }
   else
   {
      return _nextMsgSink.SyncProcessMessage(msg);
   }
}

對於 AsyncProcessMessage() 方法也同樣處理。

public IMessageCtrl AsyncProcessMessage(IMessage msg, IMessageSink replySink)
{
   if (msg as IMethodCallMessage != null)
   {
      LogicalCallContext lcc =
         (LogicalCallContext) msg.Properties["__CallContext"];

      lcc.SetData("priority",Thread.CurrentThread.Priority);
      return _nextMsgSink.AsyncProcessMessage(msg,replySink);
   }
   else
   {
      return _nextMsgSink.AsyncProcessMessage(msg,replySink);
   }
}

在伺服器端,你也必須實現一個 IServerChannelSink 來從 IMessage 物件中提取呼叫上下文資訊 ( call context ),然後設定到 Thread.CurrentThread.Priority 上來應用該值。

public ServerProcessing ProcessMessage(IServerChannelSinkStack sinkStack,
   IMessage requestMsg,
   ITransportHeaders requestHeaders,
   Stream requestStream,
   out IMessage responseMsg,
   out ITransportHeaders responseHeaders,
   out Stream responseStream)
{
   LogicalCallContext lcc =
      (LogicalCallContext) requestMsg.Properties["__CallContext"];

   // storing the current priority
   ThreadPriority oldprio = Thread.CurrentThread.Priority;

   // check if the logical call context contains "priority"
   if (lcc != null && lcc.GetData("priority") != null)
   {
      // fetch the priority from the call context
      ThreadPriority priority =
         (ThreadPriority) lcc.GetData("priority");

      Console.WriteLine(" -> Pre-execution priority change {0} to {1}",
         oldprio.ToString(),priority.ToString());

      // set the priority
      Thread.CurrentThread.Priority = priority;
   }

   // push on the stack and pass the call to the next sink
   // the old priority will be used as "state" for the response
   sinkStack.Push(this,oldprio);
   ServerProcessing spres = _next.ProcessMessage (sinkStack,
      requestMsg, requestHeaders, requestStream,
      out responseMsg,out responseHeaders,out responseStream);

   // restore priority if call is not asynchronous
   if (spres != ServerProcessing.Async)
   {
      if (lcc != null && lcc.GetData("priority") != null)
      {
         Console.WriteLine(" -> Post-execution change back to {0}",oldprio);
         Thread.CurrentThread.Priority = oldprio;
      }
   }  
   return spres;
}

用於伺服器端的接收器的接收器提供器非常簡單。它看起來多少與前面的 IServerChannelSink 是相同的。

在客戶端,這種方式有一點不太方便。請記住,你現在是在實現 IMessageSink 而不是 IClientChannelSing。如果尋找 IMessageSinkProvider 的話不會有 任何結果,所以,此時你還是不得不實現一個 IClientChannelSink - 即使實際上該聯結器是 IMessageSink。當檢視隨後部分的 IClientChannelSinkProvider 介面的時候,就會發現這個問題。

IClientChannelSink CreateSink(IChannelSender channel,
   string url,
   object remoteChannelData);

這個介面說明,在任何場景下,CreateSink() 都只會返回 IClientChannelSink,即使你的聯結器只需要實現 IMessageSink。所以現在你不得不擴充套件你的 IMessageSink 同時去實現 IClientChannelSink。你還不得不使用條件判斷,因為 IClientChannelSink 也定義來更多的方法需要實現。這些方法將在這個聯結器作為通道聯結器 ( channel sink ) 的時候而不是作為訊息聯結器 ( 也就是說,在格式化器之後 ) 被呼叫。 你可能不希望你的使用者將該聯結器放置在格式化器 之後 ( 它在這裡並不有效,因為它修改了 IMessage 物件的內容 ),所以你會希望對於這些方法丟擲異常。

完整的客戶端 PriorityEmitterSink 程式碼,會在用於錯誤的順序時丟擲這些異常,如列表 13-12 所示:

using System;
using System.Collections;
using System.IO;
using System.Runtime.Remoting;
using System.Runtime.Remoting.Channels;
using System.Runtime.Remoting.Messaging;
using System.Threading;
namespace PrioritySinks
{
    public class PriorityEmitterSink : BaseChannelObjectWithProperties,
        IClientChannelSink, IMessageSink
    {
        private IMessageSink _nextMsgSink;
        public IMessageCtrl AsyncProcessMessage(IMessage msg, IMessageSink     replySink)
        {
            // only for method calls
            if (msg as IMethodCallMessage != null)
            {
                LogicalCallContext lcc =
                    (LogicalCallContext) msg.Properties["__CallContext"];
                lcc.SetData("priority",Thread.CurrentThread.Priority);
                return _nextMsgSink.AsyncProcessMessage(msg,replySink);
            }
            else
            {
                return _nextMsgSink.AsyncProcessMessage(msg,replySink);
            }
        }

        public IMessage SyncProcessMessage(IMessage msg)
        {
            // only for method calls
            if (msg as IMethodCallMessage != null)
            {
                LogicalCallContext lcc =
                (LogicalCallContext) msg.Properties["__CallContext"];
                lcc.SetData("priority",Thread.CurrentThread.Priority);
                return _nextMsgSink.SyncProcessMessage(msg);
            }
            else
            {
                return _nextMsgSink.SyncProcessMessage(msg);
            }
        }

        public PriorityEmitterSink (object next)
        {
            if (next as IMessageSink != null)
            {
                _nextMsgSink = (IMessageSink) next;
            }
        }

        public IMessageSink NextSink
        {
            get
            {
                return _nextMsgSink;
            }
        }

        public IClientChannelSink NextChannelSink
        {
            get
            {
                throw new RemotingException("Wrong sequence.");
            }
        
        }

        public void AsyncProcessRequest(IClientChannelSinkStack sinkStack,
            IMessage msg,
            ITransportHeaders headers,
            Stream stream)
        {
            throw new RemotingException("Wrong sequence.");
        }

        public void AsyncProcessRequest(IClientChannelSinkStack sinkStack,
            IMessage msg,
            ITransportHeaders headers,
            Stream stream)
        {
            throw new RemotingException("Wrong sequence.");
        }

        public void AsyncProcessResponse(
            IClientResponseChannelSinkStack sinkStack,
            object state,
            ITransportHeaders headers,
            Stream stream)
        {
            throw new RemotingException("Wrong sequence.");
        }

        public System.IO.Stream GetRequestStream(IMessage msg,
            ITransportHeaders headers)
        {
            throw new RemotingException("Wrong sequence.");
        }

        public void ProcessMessage(IMessage msg,
            ITransportHeaders requestHeaders,
            Stream requestStream,
            out ITransportHeaders responseHeaders,
            out Stream responseStream)
        {
            throw new RemotingException("Wrong sequence.");
        }
    }
}

客戶端的 PriorityEmitterSinkProvider 如列表 13-13 所示,實現很直接。只有方法 CreateSink() 比較值得關注。

列表 13-13 客戶端的 PriorityEmitterSinkProvider

using System;
using System.Collections;
using System.Runtime.Remoting.Channels;

namespace PrioritySinks
{
    public class PriorityEmitterSinkProvider: IClientChannelSinkProvider
    {
        private IClientChannelSinkProvider next = null;
        public PriorityEmitterSinkProvider(IDictionary properties,
            ICollection providerData)
        {
            // not needed
        }

        public IClientChannelSink CreateSink(IChannelSender channel,
            string url, object remoteChannelData)
        {
            IClientChannelSink nextsink =
                next.CreateSink(channel,url,remoteChannelData);
            return new PriorityEmitterSink(nextsink);
        }

        public IClientChannelSinkProvider Next
        {
            get { return next; }
            set { next = value; }
        }
    }
}

列表 13-14 是伺服器端的 IServerChannelSink 實現,與客戶端不同,它不是 IMessageSink,所以該實現更為一致。你不需要在這裡實現任何其它介面。

列表 13-14 伺服器端的 PriorityChangerSink

using System;
using System.Collections;
using System.IO;
using System.Runtime.Remoting;
using System.Runtime.Remoting.Messaging ;
using System.Runtime.Remoting.Channels;
using System.Threading;
namespace PrioritySinks
{
    public class PriorityChangerSink : BaseChannelObjectWithProperties,
        IServerChannelSink, IChannelSinkBase
    {
        private IServerChannelSink _next;
        public PriorityChangerSink (IServerChannelSink next)
        {
            _next = next;
        }

        public void AsyncProcessResponse (
            IServerResponseChannelSinkStack sinkStack,
            Object state,
            IMessage msg,
            ITransportHeaders headers,
            Stream stream)
        {
            // restore the priority
            ThreadPriority priority = (ThreadPriority) state;
            Console.WriteLine(" -> Post-execution change back to {0}",priority);
            Thread.CurrentThread.Priority = priority;
        }

        public Stream GetResponseStream (IServerResponseChannelSinkStack sinkStack,
            Object state,
            IMessage msg,
            ITransportHeaders headers )
        {
            return null;
        }

        public ServerProcessing ProcessMessage(IServerChannelSinkStack sinkStack,
            IMessage requestMsg,
            ITransportHeaders requestHeaders,
            Stream requestStream,
            out IMessage responseMsg,
            out ITransportHeaders responseHeaders,
            out Stream responseStream)
        {
            LogicalCallContext lcc =
                (LogicalCallContext) requestMsg.Properties["__CallContext"];
            // storing the current priority
            ThreadPriority oldprio = Thread.CurrentThread.Priority;
            // check if the logical call context contains "priority"
            if (lcc != null && lcc.GetData("priority") != null)
            {
                // fetch the priority from the call context
                ThreadPriority priority =
                    (ThreadPriority) lcc.GetData("priority");
                Console.WriteLine("-> Pre-execution priority change {0} to {1}",
                    oldprio.ToString(),priority.ToString());
                // set the priority
                Thread.CurrentThread.Priority = priority;
            }

            // push on the stack and pass the call to the next sink
            // the old priority will be used as "state" for the response
            sinkStack.Push(this,oldprio);
            ServerProcessing spres = _next.ProcessMessage (sinkStack,
                requestMsg, requestHeaders, requestStream,
                out responseMsg,out responseHeaders,out responseStream);
            
            // restore priority if call is not asynchronous
            if (spres != ServerProcessing.Async)
            {
                if (lcc != null && lcc.GetData("priority") != null)
                {
                    Console.WriteLine("-> Post-execution change back to {0}",oldprio);
                    Thread.CurrentThread.Priority = oldprio;
                }
            }
            return spres;
        }

        public IServerChannelSink NextChannelSink
        {
            get {return _next;}
            set {_next = value;}
        }
    }
}

列表 13-15 是相關的伺服器端聯結器提供器,它實現了介面 IServerChannelSinkProvider

列表 13-15 伺服器端的 PriorityChangerSinkProvider

using System;
using System.Collections;
using System.Runtime.Remoting.Channels;
namespace PrioritySinks
{
    public class PriorityChangerSinkProvider: IServerChannelSinkProvider
    {
        private IServerChannelSinkProvider next = null;
        public PriorityChangerSinkProvider(IDictionary properties,
            ICollection providerData)
        {
            // not needed
        }

        public void GetChannelData (IChannelDataStore channelData)
        {
            // not needed
        }

        public IServerChannelSink CreateSink (IChannelReceiver channel)
        {
            IServerChannelSink nextSink = next.CreateSink(channel);
            return new PriorityChangerSink(nextSink);
        }

        public IServerChannelSinkProvider Next
        {
            get { return next; }
            set { next = value; }
        }
    }
}

為了測試該聯結器的組合,使用如下的 SAO (伺服器端啟用物件) ,它會返回伺服器端的當前執行緒的優先順序:

public class TestSAO: MarshalByRefObject
{
    public String getPriority()
    {
        return System.Threading.Thread.CurrentThread.Priority.ToString();
    }
}

這個 SAO 將被客戶端使用不同的執行緒優先順序呼叫多次。伺服器端使用的配置檔案如下所示:

<configuration>
    <system.runtime.remoting>
        <application>
            <channels>
                <channel ref="http" port="5555">
                    <serverProviders>
                        <formatter ref="soap" />
                        <provider
                            type="PrioritySinks.PriorityChangerSinkProvider, PrioritySinks" />
                    </serverProviders>
                </channel>
            </channels>
            <service>
                <wellknown mode="Singleton"
                    type="Server.TestSAO, Server" objectUri="TestSAO.soap" />
            </service>
        </application>
    </system.runtime.remoting>
</configuration>

客戶端的配置檔案如下所示:

<configuration>
    <system.runtime.remoting>
        <application>
            <channels>
                <channel ref="http">
                    <clientProviders>
                        <provider
                            type="PrioritySinks.PriorityEmitterSinkProvider, PrioritySinks" />
                        <formatter ref="soap" />
                    </clientProviders>
                </channel>
            </channels>
            <client>
                <wellknown type="Server.TestSAO, generated_meta"
                    url="http://localhost:5555/TestSAO.soap" />
            </client>
        </application>
    </system.runtime.remoting>
</configuration>

在用於測試的客戶端,你可以使用 SoapSuds 來抽取元資料。當你執行如列表 13-16 所示的應用程式的時候,將會看到如圖 13-8 所示的輸出。

using System;
using System.Runtime.Remoting;
using Server; // from generated_meta.dll
using System.Threading;
namespace Client
{
    delegate String getPrioAsync();
    class Client
    {
        static void Main(string[] args)
        {
            String filename = "client.exe.config";
            RemotingConfiguration.Configure(filename);
            TestSAO obj = new TestSAO();
            test(obj);
            
            Thread.CurrentThread.Priority = ThreadPriority.Highest;
            test(obj);
            
            Thread.CurrentThread.Priority = ThreadPriority.Lowest;
            test(obj);
            
            Thread.CurrentThread.Priority = ThreadPriority.Normal;
            test(obj);

            Console.ReadLine();
        }

        static void test(TestSAO obj)
        {
            Console.WriteLine("----------------- START TEST CASE ---------------");
            Console.WriteLine(" Local Priority: {0}",
            Thread.CurrentThread.Priority.ToString());
            String priority1 = obj.getPriority();
            Console.WriteLine(" Remote priority: {0}",priority1.ToString());
            Console.WriteLine("----------------- END TEST CASE ---------------");
        }
    }
}

圖 13-8 通過客戶端測試的輸出,展示了聯結器如願工作

參考資料