1. 程式人生 > >RPC遠端呼叫

RPC遠端呼叫

客戶端通過定義介面,通過動態代理的方式向服務端請求執行結果,服務端通過解析字串,來解析所要呼叫介面、方法、引數,並將介面實現的結果通過Socket的方式返回給客戶端。

1.    服務端將介面通過map的方式對映成方法名和介面類,並啟動socket連線。

package server;

public interface ServerCenter {
	
	public void start();
	
	public void stop();
	
	public void register(Class service , Class serviceImpl);

}
package serverImpl;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import server.ServerCenter;

public class ServerCenterImpl implements ServerCenter {

	private final int port = 12000;
	@SuppressWarnings("rawtypes")
	private static ConcurrentHashMap<String, Class> register = new ConcurrentHashMap<>();
	// 連線池:連線池中存在多個連線物件,每個物件都可以處理一個客戶請求
	private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
	// 是否啟動執行緒
	private static boolean isRunning = false;

	@SuppressWarnings("resource")
	@Override
	public void start() {
		ServerSocket server = null;
		try {
			server = new ServerSocket();
			server.bind(new InetSocketAddress(port));
		} catch (IOException e1) {
			// TODO Auto-generated catch block
			e1.printStackTrace();
		}

		isRunning = true;
		System.out.println("開啟服務執行緒...");
		while (true) {
			Socket socket = null;
			try {
				socket = server.accept();
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			executor.execute(new ServiceTask(socket));
		}

	}

	// 關停請求執行緒
	@Override
	public void stop() {
		// 如果有執行緒執行,則關閉
		if (isRunning) {
			executor.shutdown();
		}
	}

	// 註冊服務
	@Override
	public void register(Class service, Class serviceImpl) {
		register.put(service.getName(), serviceImpl);
	}

	private static class ServiceTask implements Runnable {

		private Socket socket;

		public ServiceTask(Socket socket) {
			this.socket = socket;
		}

		@Override
		public void run() {
			ObjectInputStream input = null;
			ObjectOutputStream output = null;
			try {
				// 等待客戶端連線
				input = new ObjectInputStream(socket.getInputStream());

				// 序列流需要順序接收解析
				String serviceName = input.readUTF();
				String methodName = input.readUTF();
				@SuppressWarnings("rawtypes")
				Class[] parametersTypes = (Class[]) input.readObject();
				Object[] args = (Object[]) input.readObject();

				// 根據client請求找到具體介面
				Class<?> serviceClass = register.get(serviceName);
				// 根據方法名和引數型別獲取方法
				Method method = serviceClass.getMethod(methodName, parametersTypes);
				// 執行方法
				Object res = method.invoke(serviceClass.newInstance(), args);

				// 返回結果
				output = new ObjectOutputStream(socket.getOutputStream());
				output.writeObject(res);

			} catch (Exception e) {
				e.printStackTrace();
			} finally {
				if (input != null) {
					try {
						input.close();
					} catch (IOException e) {
						e.printStackTrace();
					}
				}
				if (output != null) {
					try {
						output.close();
					} catch (IOException e) {
						e.printStackTrace();
					}
				}
			}
		}
	}
}

 2. 客戶端與服務端建立連線,通過JDK動態代理方式將請求的介面名、方法名、引數型別、引數列表傳送給服務端。服務端解析這些欄位,呼叫服務端的介面實現,將結果返回給客戶端。

package client;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;

public class Client {
	
	@SuppressWarnings("unchecked")
	public static <T> T getRemoteProxyRes(Class<?> serviceInterface,InetSocketAddress addr){
		
        /**
         * 1. 類載入器 classLoader: 載入需要代理的類(HelloServic介面)
         * 2. 需要代理物件,具有哪些功能 介面
         * 3. 
         */
		return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[] {serviceInterface} , new InvocationHandler() {
			
			/*
			 * (non-Javadoc)
			 * @see java.lang.reflect.InvocationHandler#invoke(java.lang.Object, java.lang.reflect.Method, java.lang.Object[])
			 * proxy: 代理物件
			 * method: 哪個方向
			 * args :引數列表
			 */
			@Override
			public Object invoke(Object proxy , Method method, Object[] args)
					throws Throwable {
				ObjectOutputStream output = null;
				ObjectInputStream input = null;
				@SuppressWarnings("resource")
				Socket socket = new Socket();
				socket.connect(addr);
				
				try {		
				// 傳送資訊
				output = new ObjectOutputStream(socket.getOutputStream());
				// 介面名
				output.writeUTF(serviceInterface.getName());
				// 方法名
				output.writeUTF(method.getName());
				// 引數型別
				output.writeObject(method.getParameterTypes());
				// 引數列表
				output.writeObject(args);
				
				// 等待伺服器處理...
				
				// 讀取伺服器返回的結果
				input = new ObjectInputStream(socket.getInputStream());
				return input.readObject();
				
				} catch (Exception e) {
					e.printStackTrace();
					return null;
				} finally {
					if (input != null){
						try {
							input.close();
						} catch (IOException e) {
							e.printStackTrace();
						}
					}
					
					if (output != null){
						try {
							output.close();
						} catch (IOException e) {
							e.printStackTrace();
						}
					}
				}
			}
		});
	}
}