Thrift之雙向通訊
阿新 • • 發佈:2019-01-08
在實際應用中,卻經常會有客戶端建立連線後,等待服務端資料的長連線模式,也可以稱為雙向連線。
一、雙連線,服務端與客戶端都開ThriftServer
如果網路環境可控,可以讓服務端與客戶端互相訪問,你可以給服務端與客戶端,兩者都開一個ThriftServer,也就是兩者互為服務端與客戶端。這樣就可以簡單實現互相訪問,比如:
客戶端: <-------------------> 服務端:
ThriftClient --------------> ThriftService
ThriftService <------------- ThriftClient
二、單連線,利用ProcessorFactory中TConnectionInfo的transport定時向客戶端傳送訊息,讓thrift保持長連線不立即關閉。
thrift是rpc結構的通訊框架,rpc結構預設是 【客戶端請求 -> 服務端迴應 -> 連線斷開】 的這種短連線形式,因此rpc預設是沒有服務端回撥功能,自然也沒有長連線。
如果要保持連線不關閉且被動接收到對方的資料,需要指定雙方連線的service必須為oneway,服務端定時向客戶端傳送資訊(利用客戶端傳送資料到服務端時連線成功時產生的transport,需客戶端也建立服務Processor),同時客戶端實時檢測transport的狀態,以便出現與服務端連線斷開的情況出現。具體流程:
1、雙向連線的service必須為oneway,否則會因為recv函式丟擲remote close異常。
2、客戶端重用建立client的protocol,開執行緒使用processor.Process(protocol,protocol)監聽服務端回調發送過來的訊息。
3、服務端Processor的建立,使用ProcessorFactory建立Processor,通過getProcessor函式中transport作為向客戶端傳送訊息的client的transport而建立一個Processor。
客戶端程式碼
http://www.cnblogs.com/xiaosuiba/p/4122459.html
http://blog.csdn.net/qq_27989757/article/details/50761051
一、雙連線,服務端與客戶端都開ThriftServer
如果網路環境可控,可以讓服務端與客戶端互相訪問,你可以給服務端與客戶端,兩者都開一個ThriftServer,也就是兩者互為服務端與客戶端。這樣就可以簡單實現互相訪問,比如:
客戶端: <-------------------> 服務端:
ThriftClient --------------> ThriftService
ThriftService <------------- ThriftClient
二、單連線,利用ProcessorFactory中TConnectionInfo的transport定時向客戶端傳送訊息,讓thrift保持長連線不立即關閉。
thrift是rpc結構的通訊框架,rpc結構預設是 【客戶端請求 -> 服務端迴應 -> 連線斷開】 的這種短連線形式,因此rpc預設是沒有服務端回撥功能,自然也沒有長連線。
如果要保持連線不關閉且被動接收到對方的資料,需要指定雙方連線的service必須為oneway,服務端定時向客戶端傳送資訊(利用客戶端傳送資料到服務端時連線成功時產生的transport,需客戶端也建立服務Processor),同時客戶端實時檢測transport的狀態,以便出現與服務端連線斷開的情況出現。具體流程:
1、雙向連線的service必須為oneway,否則會因為recv函式丟擲remote close異常。
2、客戶端重用建立client的protocol,開執行緒使用processor.Process(protocol,protocol)監聽服務端回調發送過來的訊息。
3、服務端Processor的建立,使用ProcessorFactory建立Processor,通過getProcessor函式中transport作為向客戶端傳送訊息的client的transport而建立一個Processor。
java例項
定義test.thrift
namespace java com.zychen.thrift
service ClientHandshakeService{
oneway void HandShake();
}
service ServerCallbackService{
oneway void Push(1: string msg);
}
生成介面程式碼
把thrift-0.9.3.exe和test.thrift檔案放在同一個目錄。
進入DOS命令執行:thrift-0.9.3.exe --gen java test.thrift
生成檔案gen-java/ com/zychen/thrift/Test.java
ClientHandshakeServiceHandler.java
ProcessorFactoryImpl.javapackage com.zychen.thrift; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.transport.TTransport; public class ClientHandshakeServiceHandler implements ClientHandshakeService.Iface { public ClientHandshakeServiceHandler(TTransport trans){ client = new ServerCallbackService.Client(new TBinaryProtocol(trans)); } @Override public void HandShake() throws TException { System.out.println("HandShake\n"); StartThread(); } //開始執行緒 public void StartThread(){ if(threadCallback == null){ stopThread = false; threadCallback = new Thread(new CallbackThread()); threadCallback.start(); } } //停止執行緒 public void StopThread(){ stopThread = true; if(threadCallback != null){ try { threadCallback.join(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } threadCallback = null; } } @Override protected void finalize() throws Throwable { // TODO Auto-generated method stub StopThread(); super.finalize(); } protected ServerCallbackService.Client client; protected boolean stopThread = false; protected Thread threadCallback = null; class CallbackThread implements Runnable { public void run() { while(true){ if(stopThread){ break; } try { client.Push("aaaaaaa"); Thread.sleep(50); } catch (TException | InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); return; } } } }; }
package com.zychen.thrift;
import org.apache.thrift.TProcessor;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.transport.TTransport;
import com.zychen.thrift.ClientHandshakeService.Processor;
public class ProcessorFactoryImpl extends TProcessorFactory {
public ProcessorFactoryImpl(TProcessor processor) {
super(processor);
// TODO Auto-generated constructor stub
}
@Override
public TProcessor getProcessor(TTransport trans) {
// TODO Auto-generated method stub
//return super.getProcessor(trans);
return new ClientHandshakeService.Processor(new ClientHandshakeServiceHandler(trans));
}
}
ServerTest.javapackage com.zychen.thrift;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.server.TThreadPoolServer.Args;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransportException;
import com.zychen.thrift.ClientHandshakeService.Processor;
public class ServerTest {
/**
* @param args
*/
public static void main(String[] args) {
TServerSocket tServerSocket;
try {
tServerSocket = new TServerSocket(9999);
TThreadPoolServer.Args targs = new TThreadPoolServer.Args(tServerSocket);
TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();
//獲取processFactory
TProcessorFactory tProcessorFactory = new ProcessorFactoryImpl(null);
targs.protocolFactory(factory);
targs.processorFactory(tProcessorFactory);
TThreadPoolServer tThreadPoolServer = new TThreadPoolServer(targs);
System.out.println("start server...");
tThreadPoolServer.serve();
} catch (TTransportException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
客戶端程式碼
ServerCallbackServiceImpl.java
package com.zychen.thrift;
import java.io.IOException;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
public class ServerCallbackServiceImpl implements ServerCallbackService.Iface{
public ServerCallbackServiceImpl(TSocket socket){
this.socket = socket;
}
@Override
public void Push(String msg) throws TException {
// TODO Auto-generated method stub
String str = String.format("receive msg %d: %s", nMsgCount++, msg);
System.out.println(str);
}
public void process(){
processor = new ServerCallbackService.Processor<ServerCallbackService.Iface>(this);
TBinaryProtocol protocol = new TBinaryProtocol(socket);
while (true)
{
try
{
//TProcessor,負責呼叫使用者定義的服務介面,從一個介面讀入資料,寫入一個輸出介面
while (processor.process(protocol, protocol)){
//阻塞式方法,不需要內容
System.out.println("走阻塞式方法");
//關閉socket
//socket.close();
}
//connection lost, return
return;
}catch (TException e){
System.out.println("連線已斷開...");
e.printStackTrace();
return;
}
}
}
protected int nMsgCount = 0;
protected TSocket socket;
protected TProcessor processor;
}
ClientTest.javapackage com.zychen.thrift;
import java.io.IOException;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
import com.zychen.thrift.ServerCallbackService.Iface;
public class ClientTest {
/**
* @param args
*/
public static void main(String[] args) {
// TODO Auto-generated method stub
TSocket tSocket = new TSocket("localhost",9999);
ClientHandshakeService.Client client = new ClientHandshakeService.Client(new TBinaryProtocol(tSocket));
try {
tSocket.open();
runMethod(tSocket);
//向服務端傳送訊息
for (int i = 0; i < 100; ++i){
client.HandShake();
Thread.sleep(50);
}
System.in.read();
tSocket.close();
} catch (TTransportException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (TException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public static void runMethod(final TSocket tSocket){
Thread thread = new Thread(new Runnable(){
ServerCallbackServiceImpl serverCallbackServiceImpl = new ServerCallbackServiceImpl(tSocket);
@Override
public void run() {
// TODO Auto-generated method stub
serverCallbackServiceImpl.process();
}
});
thread.start();
};
}
完整程式碼:下載參考資料:http://www.cnblogs.com/xiaosuiba/p/4122459.html
http://blog.csdn.net/qq_27989757/article/details/50761051