Apache Ignite 改裝(一) -- 服務異步化支持
使用Apache Ignite的Service grid作為微服務開發框架, 通常是如下定義和實現Service的:
服務接口:
public interface MyService {
public String sayHello(String to);
}
本文將實現如下樣式的Service,使其異步化:
異步化的服務接口:
public interface MyServiceAsync {
public CompletionStage<String> sayHello(String to);
}
當前ignite對上邊這樣的異步的service方法並沒有remote支持。當調用端與服務部署再同一節點時,ignite會發起一個本地方法調用,這樣是沒有問題的,但是當服務部署端與調用端在不同節點時,ignite通過發起一個distributed task,將調用通過消息方式發布到服務部署節點,由於服務實現是異步的,通常來說,會返回一個未完成狀態的CompletionStage,後續當真正complete的時候,調用端的CompletionStage並不會被notify,即調用端永遠無法得到真正的調用結果。
org/apache/ignite/internal/processors/service/GridServiceProxy.java
... // line 192 if(CompletionStage.class.isAssignableFrom(mtd.getReturnType())) { CompletableFuture<Object> cs = new CompletableFuture<>(); //call async and notify completion stage ctx.closure().callAsyncNoFailover( GridClosureCallMode.BROADCAST, new ServiceProxyCallable(mtd.getName(), name, mtd.getParameterTypes(), args), Collections.singleton(node), false, waitTimeout, true).listen(f -> { if(f.error() != null) { cs.completeExceptionally(f.error()); }else if(f.isCancelled()) { cs.cancel(false); } if(f.isDone()) { try { Object result = f.get(); if(result != null && IgniteException.class.isAssignableFrom(result.getClass())) { cs.completeExceptionally((IgniteException)result); }else { cs.complete(f.get()); } } catch (IgniteCheckedException e) { cs.completeExceptionally(e); } } }); return cs; } ...
這段代碼做了如下的事情:檢測當服務方法返回值是一個CompletionStage的時候,則創建一個CompletableFuture作為代理對象返回給調用端。隨後監聽服務的遠程調用的結果,並且用這個結果來更新這個CompletableFuture。到這裏,調用端的service proxy的改造就完成了。接下來,我們還需要改造服務節點這一端:
org/apache/ignite/internal/processors/job/GridJobWorker.java(line 618起的這個finally塊),改造前:
finally {
// Finish here only if not held by this thread.
if (!HOLD.get())
finishJob(res, ex, sndRes);
else
// Make sure flag is not set for current thread.
// This may happen in case of nested internal task call with continuation.
HOLD.set(false);
ctx.job().currentTaskSession(null);
if (reqTopVer != null)
GridQueryProcessor.setRequestAffinityTopologyVersion(null);
}
}
改造後:
finally {
if(res != null && CompletionStage.class.isAssignableFrom(res.getClass())) {
final boolean sendResult = sndRes;
final IgniteException igException = ex;
@SuppressWarnings("unchecked")
CompletionStage<Object> cs = (CompletionStage<Object>)res;
cs.exceptionally(t->{
return new IgniteException(t);
}).thenAccept(r->{
if (!HOLD.get()) {
IgniteException e = igException;
finishJob(r, e, sendResult);
} else
// Make sure flag is not set for current thread.
// This may happen in case of nested internal task call with continuation.
HOLD.set(false);
ctx.job().currentTaskSession(null);
if (reqTopVer != null)
GridQueryProcessor.setRequestAffinityTopologyVersion(null);
});
} else {
// Finish here only if not held by this thread.
if (!HOLD.get())
finishJob(res, ex, sndRes);
else
// Make sure flag is not set for current thread.
// This may happen in case of nested internal task call with continuation.
HOLD.set(false);
ctx.job().currentTaskSession(null);
if (reqTopVer != null)
GridQueryProcessor.setRequestAffinityTopologyVersion(null);
}
}
這裏做的事情是:當在服務部署節點上拿到執行結果的時候,如果發現服務返回結果是一個CompletionStage,那麽處理這個CompletionStage的exceptionally和thenAccept, 把結果發送給remote的調用端。
就這樣,通過簡單的改裝,我們使ignite有了處理異步服務方法調用的能力。下邊我們實現一個服務來看看改裝結果:
服務定義與實現:
import java.util.concurrent.CompletionStage;
import org.apache.ignite.services.Service;
public interface MyService extends Service {
public CompletionStage<String> sayHelloAsync(String to);
public String sayHelloSync(String to);
}
import java.util.concurrent.CompletionStage;
public class MyServiceImpl implements MyService {
private ScheduledExecutorService es;
@Override public void init(ServiceContext ctx) throws Exception {
es = Executors.newSingleThreadScheduledExecutor();
}
@Override public CompletionStage<String> sayHelloAsync(String to){
CompletableFuture<String> ret = new CompletableFuture<>();
//return "async hello $to" after 3 secs
es.schedule(()->ret.complete("async hello " + to), 3, TimeUnit.SECONDS);
return ret;
}
@Override public String sayHelloSync(String to){
return "sync hello " + to;
}
...
}
然後將服務部署在Service grid中:
...
ServiceConfiguration sConf = new ServiceConfiguration();
sConf.setName("myservice.version.1");
sConf.setService(new MyServiceImpl());
sConf.setMaxPerNodeCount(2);
sConf.setTotalCount(4);
ignite.services().deploy(sConf);
...
然後啟動一個客戶端節點進行服務調用:
MyService service = ignite.services().serviceProxy("myservice.version.1", MyService.class, false);
//test async service
service.sayHelloAsync("nathan").thenAccept(r->{
System.out.println(r);
});
//test sync service
System.out.println(service.sayHelloSync("nathan"));
...
輸出結果:
sync hello nathan
async hello nathan
可以看到先輸出了sync的結果,大約3秒後輸出async的結果。
Apache Ignite 改裝(一) -- 服務異步化支持