grpc 在java中的使用 ---2
歡迎回來!
2.請求流介面
(客戶端可以源源不斷的給服務端傳引數,服務端會源源不斷的接受服務端的引數,最後在客戶端完成請求的時候,服務端返回一個結果)
在.proto檔案中新加一個方法,這個方法的引數被 stream 關鍵字修飾
rpc methodRequestStream(stream Request) returns (Result) {}
然後用maven,清理一下快取,重新編譯一下
2.1.服務端
重新編譯之後,實現剛剛新加的方法
@Override public StreamObserver<Request> methodRequestStream(StreamObserver<Result> responseObserver) { return new StreamObserver<Request>() { @Override public void onNext(Request request) { System.out.print("收到了請求 \n"); } @Override public void onError(Throwable throwable) { } @Override public void onCompleted() { Result result = Result.newBuilder().setResult1("result1").setResult2("result2").build(); responseObserver.onNext(result); responseObserver.onCompleted(); } }; }
(友情提示,如果 StreamObserver 的的泛型是Result 我們就叫 返回流觀察者,如果是 Request 就叫請求流觀察者,這樣好描述一些)
這個和普通的有點不一樣,直接返回了一個 請求流觀察者 的介面實現,而且方法的引數還是一個 返回流觀察者 ,好像搞反了一樣,至於為什麼,一會在客戶端那裡 統一說
2.2.客戶端
請求流式非同步呼叫,普通的是同步呼叫,我們在普通的方法裡建立的例項 也是同步的,所以我們要在 JavaGrpcClient 中新加一個 非同步呼叫的方法,新增一個非同步的例項
public <Result> Result runAsync(Functional<TestServiceGrpc.TestServiceStub,Result> functional) { TestServiceGrpc.TestServiceStub testServiceStub = TestServiceGrpc.newStub(channel); return functional.run(testServiceStub); }
TestServiceGrpc.newStub 返回的是一個非同步的例項
再加一個測試
@Test public void contextLoads2() { Request request = Request.newBuilder().setRequest1("test1").setRequest2("test2").build(); StreamObserver<Result> responseObserver = new StreamObserver<Result>() { @Override public void onNext(Result result) { System.out.print("返回了結果 \n"); } @Override public void onError(Throwable throwable) { } @Override public void onCompleted() { } }; StreamObserver<Request> result = javaGrpcClient.runAsync(o -> o.methodRequestStream(responseObserver)); result.onNext(request); result.onNext(request); result.onNext(request); result.onCompleted(); try { Thread.sleep(600000); } catch (Exception ex){} }
這裡我們實現了一個 返回流觀察者
StreamObserver<Result> responseObserver = new StreamObserver<Result>() { @Override public void onNext(Result result) { System.out.print("返回了結果 \n"); } @Override public void onError(Throwable throwable) { } @Override public void onCompleted() { } };
呼叫方法的時候,將我們實現的 返回流觀察者 傳進去,返回給我們一個 請求流觀察者
StreamObserver<Request> result = javaGrpcClient.runAsync(o -> o.methodRequestStream(responseObserver));
其實這裡返回的 請求流觀察者 就是服務端那裡返回給我們的內個實現,服務端那裡 返回流觀察者 是我們實現的 傳給他的
由於是非同步呼叫,最後暫停一下,要不測試跑完,程式結束 開沒開始就結束了
try { Thread.sleep(600000); } catch (Exception ex){}
執行起來看結果
服務端的列印
客戶端的列印
這裡我們傳送了三次引數過去
result.onNext(request); result.onNext(request); result.onNext(request);
就相當於 服務端 那邊返回的 請求流觀察者 被呼叫了 三次 ,所以就列印了三句話
傳送完引數結束請求
result.onCompleted();
服務端那裡的結束請求中呼叫了一次我們傳給他的 返回流觀察者 中的 onNext 方法
所以客戶端就列印了一次
這裡會有人問 這裡不能返回 多個嗎
不能,雖然 這兩個觀察者 看上去一樣 都是 StreamObserver 介面,但是,這個方法只是請求流呼叫,在grpc的內部 最後返回的時候 只返回第一個指定的返回只,不管返回了多少個,在客戶端那邊只會收到 第一個返回的結果
3.響應流介面
(和請求流介面完全相反,請求流是非同步,響應流是同步,請求流是接受多個請求返回一個結果,響應流是接受一個請求返回多個結果)
我們在.proto檔案中再增加一個方法,這回這個方法的返回值被 stream 關鍵字修飾
rpc methodResultStream(Request) returns (stream Result){}
清快取,重新編譯
3.1.服務端
實現剛剛新加的方法
@Override public void methodResultStream(Request request, StreamObserver<Result> responseObserver) { System.out.print("收到了請求 \n"); Result result = Result.newBuilder().setResult1("result1").setResult2("result2").build(); responseObserver.onNext(result); responseObserver.onNext(result); try { Thread.sleep(2000); } catch (Exception ex){} responseObserver.onNext(result); responseObserver.onCompleted(); }
這裡跟普通的差不多,只是我們返回了三次結果
responseObserver.onNext(result); responseObserver.onNext(result); try { Thread.sleep(2000); } catch (Exception ex){} responseObserver.onNext(result);
3.2.客戶端
沒啥好加的了,直接上測試
@Test public void contextLoads3() { Request request = Request.newBuilder().setRequest1("test1").setRequest2("test2").build(); Iterator<Result> result = javaGrpcClient.run(o -> o.methodResultStream(request)); result.forEachRemaining(o -> { System.out.print("返回了結果 \n"); }); System.out.print("結束 \n"); }
返回流請求是同步的,所以要調同步的方法,返回了一個迭代器
Iterator<Result> result = javaGrpcClient.run(o -> o.methodResultStream(request));
迭代器中有服務端的所有返回結果
result.forEachRemaining(o -> { System.out.print("返回了結果 \n"); });
執行結果
服務端結果
客戶端結果
由於是同步呼叫,在forEach中會等待服務端的每一個返回結果
4.雙向流介面
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
歇會,抽根菸!
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
在.proto檔案中再加一個方法
rpc methodDoubleStream(stream Request) returns (stream Result){}
實現她
雙向流的服務端和請求流的沒啥區別,只是在接收到請求的時候沒有立刻結束請求
@Override public StreamObserver<Request> methodDoubleStream(StreamObserver<Result> responseObserver) { return new StreamObserver<Request>() { @Override public void onNext(Request value) { System.out.print("收到了請求 \n"); Result result = Result.newBuilder().setResult1("result1").setResult2("result2").build(); responseObserver.onNext(result); } @Override public void onError(Throwable t) { } @Override public void onCompleted() { responseObserver.onCompleted(); } }; }
客戶端也沒啥區別
@Test public void contextLoads4() { Request request = Request.newBuilder().setRequest1("test1").setRequest2("test2").build(); StreamObserver<Result> responseObserver = new StreamObserver<Result>() { @Override public void onNext(Result result) { System.out.print("返回了結果 \n"); } @Override public void onError(Throwable throwable) { } @Override public void onCompleted() { } }; StreamObserver<Request> result = javaGrpcClient.runAsync(o -> o.methodDoubleStream(responseObserver)); result.onNext(request); result.onNext(request); result.onNext(request); result.onCompleted(); try { Thread.sleep(600000); } catch (Exception ex){} }
雙向流也是非同步的,所以要等待
try { Thread.sleep(600000); } catch (Exception ex){}
服務端結果
客戶端結果
完結!撒花!