1. 程式人生 > >Thrift之雙向通訊

Thrift之雙向通訊

Thrift預設採用「請求—迴應」式的短連線,但在實際應用中,經常會有客戶端建立連線後持續等待服務端資料的長連線模式,也可以稱為雙向連線。
一、雙連線,服務端與客戶端都開ThriftServer
如果網路環境可控,可以讓服務端與客戶端互相訪問,你可以給服務端與客戶端,兩者都開一個ThriftServer,也就是兩者互為服務端與客戶端。這樣就可以簡單實現互相訪問,比如:
客戶端: <------------------->  服務端:
ThriftClient --------------> ThriftService
ThriftService <------------- ThriftClient

二、單連線,利用ProcessorFactory中TConnectionInfo的transport定時向客戶端傳送訊息,讓thrift保持長連線不立即關閉。
thrift是rpc結構的通訊框架,rpc結構預設是 【客戶端請求 -> 服務端迴應 -> 連線斷開】 的這種短連線形式,因此rpc預設是沒有服務端回撥功能,自然也沒有長連線。
如果要保持連線不關閉且被動接收到對方的資料,需要指定雙方連線的service必須為oneway,服務端定時向客戶端傳送資訊(利用客戶端成功連線到服務端時產生的transport,客戶端也需要建立服務Processor),同時客戶端實時檢測transport的狀態,以便及時發現與服務端連線斷開的情況。具體流程:
1、雙向連線的service必須為oneway,否則會因為recv函式丟擲remote close異常。
2、客戶端重用建立client的protocol,開執行緒使用processor.process(protocol,protocol)監聽服務端回撥傳送過來的訊息。
3、服務端Processor的建立,使用ProcessorFactory建立Processor,通過getProcessor函式中transport作為向客戶端傳送訊息的client的transport而建立一個Processor。

java例項

定義test.thrift

namespace java com.zychen.thrift
// Service implemented by the server and called by the client.
// The client invokes HandShake() after opening the socket; the server-side
// handler reacts by starting to push messages back over the same transport.
service ClientHandshakeService{
    // Declared oneway: per the article, two-way services on the shared
    // connection would fail in recv with a "remote close" exception.
    oneway void HandShake();
}

// Callback service implemented by the client and called by the server.
// The server builds a client for this service on the transport of the
// accepted connection and uses it to push messages to the connected peer.
service ServerCallbackService{
    // Delivers one text message to the client; oneway, so no reply is sent.
    oneway void Push(1: string msg); 
}

生成介面程式碼

將thrift-0.9.3.exe與test.thrift檔案放在同一個目錄。

進入DOS命令執行:thrift-0.9.3.exe --gen java test.thrift

生成檔案位於gen-java/com/zychen/thrift/目錄下,每個service各生成一個檔案:ClientHandshakeService.java與ServerCallbackService.java

服務端程式碼

ClientHandshakeServiceHandler.java

package com.zychen.thrift;

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TTransport;

/**
 * Server-side handler for {@code ClientHandshakeService}.
 *
 * <p>One handler is created per accepted connection (see
 * {@code ProcessorFactoryImpl.getProcessor}). It wraps the connection's
 * transport in a {@code ServerCallbackService.Client}, so the server can push
 * messages back to the peer over the very socket the peer opened.
 *
 * <p>The first {@link #HandShake()} starts a background thread that pushes a
 * message every {@value #PUSH_INTERVAL_MS} ms until {@link #StopThread()} is
 * called or a push fails.
 */
public class ClientHandshakeServiceHandler implements ClientHandshakeService.Iface {
	/** Pause between two consecutive pushes, in milliseconds. */
	private static final long PUSH_INTERVAL_MS = 50;

	/** Client used to call back into the connected peer. */
	protected ServerCallbackService.Client client;
	/**
	 * Stop request for the push thread. Written by the caller of
	 * {@link #StopThread()} and polled by the push thread, hence volatile:
	 * without it the push thread is not guaranteed to ever see the update.
	 */
	protected volatile boolean stopThread = false;
	/** The running push thread, or {@code null} when none has been started. */
	protected Thread  threadCallback = null;

	/**
	 * @param trans transport of the accepted connection; reused as the
	 *              outgoing channel for callbacks to the client
	 */
	public ClientHandshakeServiceHandler(TTransport trans){
		client = new ServerCallbackService.Client(new TBinaryProtocol(trans));
	}

	/**
	 * Handles the client's handshake by making sure the push thread runs.
	 * Repeated handshakes on the same connection do not start more threads.
	 */
	@Override
	public void HandShake() throws TException {
		 System.out.println("HandShake\n");
		 StartThread();
	}

	/**
	 * Starts the push thread if none has been started yet.
	 * Synchronized so that concurrent start/stop calls cannot create two
	 * threads or lose the reference to a running one.
	 */
	public synchronized void StartThread(){
		if(threadCallback == null){
			stopThread = false;
			threadCallback = new Thread(new CallbackThread());
			threadCallback.start();
		}
	}

	/**
	 * Asks the push thread to stop and waits for it to finish.
	 * If the waiting thread is interrupted, the interrupt flag is restored so
	 * callers further up the stack can still observe it.
	 */
	public synchronized void StopThread(){
		stopThread = true;
		if(threadCallback != null){
			try {
				threadCallback.join();
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				e.printStackTrace();
			}
			threadCallback = null;
		}
	}

	/**
	 * Last-resort cleanup.
	 *
	 * <p>NOTE(review): {@code CallbackThread} is a non-static inner class, so a
	 * running push thread keeps this handler reachable and this method will not
	 * run while the thread is alive. Do not rely on it; call
	 * {@link #StopThread()} explicitly when the connection goes away.
	 */
	@Override
	protected void finalize() throws Throwable {
		StopThread();
		super.finalize();
	}

	/** Push loop: sends a message to the client at a fixed interval. */
	class CallbackThread implements Runnable {
		public void run() {
			while(!stopThread){
				try {
					client.Push("aaaaaaa");
					Thread.sleep(PUSH_INTERVAL_MS);
				} catch (TException e) {
					// The transport is unusable (e.g. the peer disconnected): stop pushing.
					e.printStackTrace();
					return;
				} catch (InterruptedException e) {
					// Restore the flag so the interruption is not lost, then stop.
					Thread.currentThread().interrupt();
					e.printStackTrace();
					return;
				}
			}
		}
	}
}
ProcessorFactoryImpl.java
package com.zychen.thrift;

import org.apache.thrift.TProcessor;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.transport.TTransport;

import com.zychen.thrift.ClientHandshakeService.Processor;

/**
 * Processor factory that builds a dedicated processor and handler for every
 * connection, handing the connection's transport to the handler so the server
 * can call back to that client.
 */
public class ProcessorFactoryImpl extends TProcessorFactory {

	/**
	 * @param processor passed straight to {@link TProcessorFactory}; it is not
	 *                  used by {@link #getProcessor(TTransport)}, which always
	 *                  builds a fresh processor (ServerTest passes {@code null})
	 */
	public ProcessorFactoryImpl(TProcessor processor) {
		super(processor);
	}

	/**
	 * Creates the processor for one connection.
	 *
	 * @param trans transport of the connection being served
	 * @return a new {@code ClientHandshakeService} processor whose handler
	 *         holds {@code trans} for server-to-client callbacks
	 */
	@Override
	public TProcessor getProcessor(TTransport trans) {
		// Explicit type argument instead of the raw Processor type, matching
		// how ServerCallbackService.Processor is instantiated on the client.
		return new ClientHandshakeService.Processor<ClientHandshakeService.Iface>(
				new ClientHandshakeServiceHandler(trans));
	}
}
ServerTest.java
package com.zychen.thrift;

import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.server.TThreadPoolServer.Args;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransportException;

import com.zychen.thrift.ClientHandshakeService.Processor;

/**
 * Entry point of the demo server: listens on port 9999 and serves each
 * connection through {@code ProcessorFactoryImpl}.
 */
public class ServerTest {

	/**
	 * Starts a thread-pool Thrift server using the binary protocol and blocks
	 * in {@code serve()}.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		try {
			TServerSocket listenSocket = new TServerSocket(9999);
			// The factory ignores its constructor argument and builds a
			// processor per connection, so null is passed here.
			TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(listenSocket)
					.protocolFactory(new TBinaryProtocol.Factory())
					.processorFactory(new ProcessorFactoryImpl(null));
			TThreadPoolServer server = new TThreadPoolServer(serverArgs);
			System.out.println("start server...");
			server.serve();
		} catch (TTransportException e) {
			e.printStackTrace();
		}
	}

}

客戶端程式碼

ServerCallbackServiceImpl.java

package com.zychen.thrift;

import java.io.IOException;

import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;

/**
 * Client-side implementation of {@code ServerCallbackService}: receives the
 * messages the server pushes over the socket the client opened.
 */
public class ServerCallbackServiceImpl implements ServerCallbackService.Iface{
	/** Number of messages received so far; also the index printed with the next one. */
	protected int nMsgCount = 0;
	/** The socket shared with the outgoing handshake client. */
	protected TSocket socket;
	/** Processor dispatching incoming calls to this object; set by {@link #process()}. */
	protected TProcessor processor;

	/**
	 * @param socket socket from which pushed messages are read
	 */
	public ServerCallbackServiceImpl(TSocket socket){
		this.socket = socket;
	}

	/**
	 * Prints the received message prefixed with its running index.
	 *
	 * @param msg message text sent by the server
	 */
	@Override
	public void Push(String msg) throws TException {
		int index = nMsgCount;
		nMsgCount = index + 1;
		System.out.println(String.format("receive msg %d: %s", index, msg));
	}

	/**
	 * Reads and dispatches pushed messages on the calling thread until the
	 * processor reports completion or a Thrift exception is thrown.
	 */
	public void process(){
		processor = new ServerCallbackService.Processor<ServerCallbackService.Iface>(this);
		TBinaryProtocol protocol = new TBinaryProtocol(socket);
		try {
			// Each process() call handles one incoming message using the same
			// protocol for input and output.
			boolean handled = processor.process(protocol, protocol);
			while (handled) {
				System.out.println("走阻塞式方法");
				handled = processor.process(protocol, protocol);
			}
		} catch (TException e) {
			System.out.println("連線已斷開...");
			e.printStackTrace();
		}
	}
}
ClientTest.java
package com.zychen.thrift;

import java.io.IOException;

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;

import com.zychen.thrift.ServerCallbackService.Iface;

/**
 * Entry point of the demo client: connects to the server, starts a listener
 * thread for pushed messages, then sends handshakes over the same socket.
 */
public class ClientTest {

	/**
	 * Connects to localhost:9999, sends 100 handshakes at 50 ms intervals,
	 * then waits for input on stdin before disconnecting.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		TSocket tSocket = new TSocket("localhost",9999);
		ClientHandshakeService.Client client = new ClientHandshakeService.Client(new TBinaryProtocol(tSocket));
		try {
			tSocket.open();
			// Listen for server pushes on the same socket in the background.
			runMethod(tSocket);
			// Send the handshake messages to the server.
			for (int i = 0; i < 100; ++i){
				client.HandShake();
				Thread.sleep(50);
			}
			// Keep the connection open until the user presses a key.
			System.in.read();
		} catch (TException | IOException e) {
			// TTransportException is a TException, so it is covered here too.
			e.printStackTrace();
		} catch (InterruptedException e) {
			// Restore the flag so the interruption is not lost.
			Thread.currentThread().interrupt();
			e.printStackTrace();
		} finally {
			// Close on every path, not only on success, so the socket is
			// released even when a call above fails.
			tSocket.close();
		}
	}

	/**
	 * Starts a thread that processes messages pushed by the server.
	 *
	 * @param tSocket the already-opened socket shared with the handshake client
	 */
	public static void runMethod(final TSocket tSocket){
		Thread thread = new Thread(new Runnable(){
			ServerCallbackServiceImpl serverCallbackServiceImpl = new ServerCallbackServiceImpl(tSocket);
			@Override
			public void run() {
				serverCallbackServiceImpl.process();
			}

		});
		thread.start();
	}
}
完整程式碼:下載

參考資料:
http://www.cnblogs.com/xiaosuiba/p/4122459.html
http://blog.csdn.net/qq_27989757/article/details/50761051