1. 程式人生 > >一個簡單的rpc框架的實現

一個簡單的rpc框架的實現

為了降低開發成本,提升已有系統的利用率,企業往往會構建自己的SOA體系結構,SOA構建的手段有多種,可以通過webservice,restful,rmi,socket等等,筆者通過java socket,來構建一個簡單的SOA體系結構,以下程式碼僅供參考。

1.  需要實現SOA的服務介面

Java程式碼  收藏程式碼
  1. package com.chenkangxian.rpc.impl;  
  2. /** 
  3.  * 
  4.  * @Author: chenkangxian 
  5.  * 
  6.  * @Annotation: 根據key取資料介面 
  7.  * 
  8.  * @Date:2012-5-13 
  9.  * 
  10.  * @Copyright: 2012 chenkangxian, All rights reserved.
     
  11.  * 
  12.  */  
  13. public interface DataService {  
  14.     String getData(String key);  
  15. }  

2.   介面的實現

Java程式碼  收藏程式碼
  1. package com.chenkangxian.rpc.impl;  
  2. /** 
  3.  * 
  4.  * @Author: chenkangxian 
  5.  * 
  6.  * @Annotation: 根據key取資料服務實現 
  7.  * 
  8.  * @Date:2012-5-13 
  9.  * 
  10.  * @Copyright: 2012 chenkangxian, All rights reserved. 
  11.  * 
  12.  */  
  13. public class
     DataServiceImpl implements DataService {  
  14.     public String getData(String key) {  
  15.         return "this is the data when key = " + key ;  
  16.     }  
  17. }  
 

3.  執行代理

Java程式碼  收藏程式碼
  1. /** 
  2.  *  
  3.  */  
  4. package com.chenkangxian.rpc.impl;  
  5. import java.io.ObjectInputStream;  
  6. import java.io.ObjectOutputStream;  
  7. import
     java.lang.reflect.InvocationHandler;  
  8. import java.lang.reflect.Method;  
  9. import java.net.Socket;  
  10. /** 
  11.  * @Author: chenkangxian 
  12.  * 
  13.  * @Annotation: 執行代理 
  14.  * 
  15.  * @Date:2012-5-15 
  16.  * 
  17.  * @Copyright: 2012 chenkangxian, All rights reserved. 
  18.  *  
  19.  */  
  20. public class InvocationProxy implements InvocationHandler{  
  21.     private String host;  
  22.     private int port;  
  23.     public InvocationProxy(String host, int port){  
  24.         this.host = host;  
  25.         this.port = port;  
  26.     }  
  27.     public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {  
  28.         Socket socket = new Socket(host, port);  
  29.         try {  
  30.             ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());  
  31.             try {  
  32.                 output.writeUTF(method.getName());  
  33.                 output.writeObject(method.getParameterTypes());  
  34.                 output.writeObject(arguments);  
  35.                 ObjectInputStream input = new ObjectInputStream(socket.getInputStream());  
  36.                 try {  
  37.                     Object result = input.readObject();  
  38.                     if (result instanceof Throwable) {  
  39.                         throw (Throwable) result;  
  40.                     }  
  41.                     return result;  
  42.                 } finally {  
  43.                     input.close();  
  44.                 }  
  45.             } finally {  
  46.                 output.close();  
  47.             }  
  48.         } finally {  
  49.             socket.close();  
  50.         }  
  51.     }  
  52. }  

4.    服務消費者

Java程式碼  收藏程式碼
  1. package com.chenkangxian.rpc.impl;  
  2. /** 
  3.  * @Author: chenkangxian 
  4.  * 
  5.  * @Annotation: 服務消費者 
  6.  * 
  7.  * @Date:2012-5-13 
  8.  * 
  9.  * @Copyright: 2012 chenkangxian, All rights reserved. 
  10.  * 
  11.  */  
  12. public class RpcConsumer {  
  13.     public static void main(String[] args) throws Exception {  
  14.         DataService service = RpcFramework.refer(DataService.class"127.0.0.1"1234);  
  15.         for (int i = 0; i < Integer.MAX_VALUE; i ++) {  
  16.             String value = service.getData("key_" + i);  
  17.             System.out.println(value);  
  18.             Thread.sleep(1000);  
  19.         }  
  20.     }  
  21. }  

5.    遠端呼叫框架

Java程式碼  收藏程式碼
  1. package com.chenkangxian.rpc.impl;  
  2. import java.lang.reflect.Proxy;  
  3. import java.net.ServerSocket;  
  4. import java.net.Socket;  
  5. /** 
  6.  * 
  7.  * @Author: chenkangxian 
  8.  * 
  9.  * @Annotation: 簡單的遠端呼叫框架實現 
  10.  * 
  11.  * @Date:2012-5-13 
  12.  * 
  13.  * @Copyright: 2012 chenkangxian, All rights reserved. 
  14.  * 
  15.  */  
  16. public class RpcFramework {  
  17.     /** 
  18.      * 暴露服務 
  19.      *  
  20.      * Author: chenkangxian 
  21.      * 
  22.      * Last Modification Time: 2012-5-15 
  23.      * 
  24.      * @param service 服務實現 
  25.      * @param port 服務埠 
  26.      * @throws Exception 
  27.      */  
  28.     public static void export(final Object service, int port) throws Exception {  
  29.         if (service == null)  
  30.             throw new IllegalArgumentException("service instance == null");  
  31.         if (port <= 0 || port > 65535)  
  32.             throw new IllegalArgumentException("Invalid port " + port);  
  33.         System.out.println("Export service " + service.getClass().getName() + " on port " + port);  
  34.         ServerSocket server = new ServerSocket(port);  
  35.         for(;;) {  
  36.             try {  
  37.                 final Socket socket = server.accept();  
  38.                 ThreadPoolHelp.getExecutorInstance().execute(new WorkThread(service, socket));  
  39.             } catch (Exception e) {  
  40.                 e.printStackTrace();  
  41.             }  
  42.         }  
  43.     }  
  44.     /** 
  45.      * 引用服務 
  46.      *  
  47.      * Author: chenkangxian 
  48.      * 
  49.      * Last Modification Time: 2012-5-15 
  50.      * 
  51.      * @param <T> 介面泛型 
  52.      * @param interfaceClass 介面型別 
  53.      * @param host 伺服器主機名 
  54.      * @param port 伺服器埠 
  55.      * @return 遠端服務 
  56.      * @throws Exception 
  57.      */  
  58.     @SuppressWarnings("unchecked")  
  59.     public static <T> T refer(final Class<T> interfaceClass, final String host, final int port) throws Exception {  
  60.         if (interfaceClass == null)  
  61.             throw new IllegalArgumentException("Interface class == null");  
  62.         if (! interfaceClass.isInterface())  
  63.             throw new IllegalArgumentException("The " + interfaceClass.getName() + " must be interface class!");  
  64.         if (host == null || host.length() == 0)  
  65.             throw new IllegalArgumentException("Host == null!");  
  66.         if (port <= 0 || port > 65535)  
  67.             throw new IllegalArgumentException("Invalid port " + port);  
  68.         System.out.println("Get remote service " + interfaceClass.getName() + " from server " + host + ":" + port);  
  69.         return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] {interfaceClass}, new InvocationProxy(host,port));  
  70.     }  
  71. }  

6.    服務提供者

Java程式碼  收藏程式碼
  1. package com.chenkangxian.rpc.impl;  
  2. /** 
  3.  * @Author: chenkangxian 
  4.  * 
  5.  * @Annotation: 服務提供者 
  6.  * 
  7.  * @Date:2012-5-13 
  8.  * 
  9.  * @Copyright: 2012 chenkangxian, All rights reserved. 
  10.  * 
  11.  */  
  12. public class RpcProvider {  
  13.     public static void main(String[] args) throws Exception {  
  14.         DataService service = new DataServiceImpl();  
  15.         RpcFramework.export(service, 1234);  
  16.     }  
  17. }  

7.    執行緒池幫助類

Java程式碼  收藏程式碼
  1. /** 
  2.  *  
  3.  */  
  4. package com.chenkangxian.rpc.impl;  
  5. import java.util.concurrent.ExecutorService;  
  6. import java.util.concurrent.Executors;  
  7. /** 
  8.  * @Author: chenkangxian 
  9.  * 
  10.  * @Annotation:  執行緒池幫助類 
  11.  * 
  12.  * @Date:2012-5-15 
  13.  * 
  14.  * @Copyright: 2012 chenkangxian, All rights reserved. 
  15.  *  
  16.  */  
  17. public class ThreadPoolHelp {  
  18.     private static ExecutorService executor ;  
  19.     static{  
  20.         executor = Executors.newFixedThreadPool(20);  
  21.     }  
  22.     public static ExecutorService getExecutorInstance(){  
  23.         return executor;  
  24.     }  
  25. }  

8.    工作執行緒

Java程式碼  收藏程式碼
  1. /** 
  2.  *  
  3.  */  
  4. package com.chenkangxian.rpc.impl;  
  5. import java.io.ObjectInputStream;  
  6. import java.io.ObjectOutputStream;  
  7. import java.lang.reflect.Method;  
  8. import java.net.Socket;  
  9. /** 
  10.  * 
  11.  * @Author: chenkangxian 
  12.  * 
  13.  * @Annotation: 工作執行緒 
  14.  * 
  15.  * @Date:2012-5-15 
  16.  * 
  17.  * @Copyright: 2012 chenkangxian, All rights reserved. 
  18.  *  
  19.  */  
  20. public class WorkThread implements Runnable {  
  21.     private Object service;  
  22.     private Socket socket;  
  23.     public WorkThread(Object service,Socket socket){  
  24.         this.service = service;  
  25.         this.socket = socket;  
  26.     }  
  27.     @Override  
  28.     public void run() {  
  29.         try {  
  30.             try {  
  31.                 ObjectInputStream input = new ObjectInputStream(socket.getInputStream());  
  32.                 try {  
  33.                     String methodName = input.readUTF();  
  34.                     Class<?>[] parameterTypes = (Class<?>[])input.readObject();  
  35.                     Object[] arguments = (Object[])input.readObject();  
  36.                     ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());  
  37.                     try {  
  38.                         Method method = service.getClass().getMethod(methodName, parameterTypes);  
  39.                         Object result = method.invoke(service, arguments);  
  40.                         output.writeObject(result);  
  41.                     } catch (Throwable t) {  
  42.                         output.writeObject(t);  
  43.                     } finally {  
  44.                         output.close();  
  45.                     }  
  46.                 } finally {  
  47.                     input.close();  
  48.                 }  
  49.             } finally {  
  50.                 socket.close();  
  51.             }  
  52.         } catch (Exception e) {  
  53.             e.printStackTrace();  
  54.         }  
  55.     }  
  56. }