如何通過Java中的物件值來同步塊
問題
有時,我們需要通過變數的值來同步程式碼塊。
為了解這個問題,我們將考慮一個簡單的銀行應用程式,它對客戶的每次轉賬進行以下操作:
- 通過此外部Web服務轉移評估現金返還金額(
CashBackService
) - 在資料庫中執行匯款(
AccountService
) - 更新現金返還評估系統中的資料(
CashBackService
)
匯款操作如下:
public void withdrawMoney(UUID userId,int amountOfMoney){
synchronized(userId){
結果 result = externalCashBackService。evaluateCashBack(userId,amountOfMoney);
accountService。轉移(使用者id,amountOfMoney + 結果。getCashBackAmount());
externalCashBackService。cashBackComplete(使用者id,結果。getCashBackAmount());
}
}
應用程式的基本元件如下圖所示:
我試圖儘可能清楚地做出一個例子。支付服務中的資金轉移取決於其他兩項服務:
- 第一個是
CashBackService
與REST協議下的另一個(外部)Web應用程式互動。而且,為了計算實際的現金返還,我們需要與此應用程式同步事務。這是因為下一筆現金返還金額可能取決於使用者付款總額。 - 第二個是
AccountService
與內部資料庫通訊並存儲與其使用者帳戶相關的資料。在此服務中,我們可以使用JPA事務在資料庫中將某些操作作為原子操作。
在現實生活中,我強烈建議重構這樣的系統,以避免這種情況,如果可能的話。但在我們的例子中,想象一下我們別無選擇。
我們來看看這個應用程式的草案程式碼:
@服務
公共 類 PaymentService {
@Autowired
private ExternalCashBackService externalCashBackService ;
@Autowired
私人 AccountService 帳戶服務 ;
public void withdrawMoney(UUID userId,int amountOfMoney){
synchronized(userId){
結果 result = externalCashBackService。evaluateCashBack(userId,amountOfMoney);
accountService。轉移(使用者id,amountOfMoney + 結果。getCashBackAmount());
externalCashBackService。cashBackComplete(使用者id,結果。getCashBackAmount());
}
}
}
@服務
公共 類 ExternalCashBackService {
@Autowired
私人 RestTemplate restTemplate ;
public Result evaluateCashBack(UUID userId,int amountOfMoney){
return sendRestRequest(“evaluate”,userId,amountOfMoney);
}
public Result cashBackComplete(UUID userId,int cashBackAmount){
return sendRestRequest(“complete”,userId,cashBackAmount);
}
private Result sendRestRequest(String action,UUID userId,int value){
URI externalCashBackSystemUrl =
URI。create(“http://cash-back-system.org/api/” + action);
HttpHeaders headers = new HttpHeaders();
標題。集(“接受”,的MediaType。APPLICATION_JSON_VALUE);
RequestDto requestDto = new RequestDto(userId,value);
HttpEntity <?> request = new HttpEntity <>(requestDto,headers);
ResponseDto responseDto = restTemplate。exchange(externalCashBackSystemUrl,
HttpMethod。GET,
要求,
ResponseDto。課程)
。getBody();
返回 新的 結果(responseDto。的getStatus(),responseDto。的getValue());
}
}
@服務
公共 類 AccountService {
@Autowired
private AccountRepository accountRepository ;
@Transactional(隔離 = REPEATABLE_READ)
public void transfer(UUID userId,int amountOfMoney){
帳戶 account = accountRepository。getOne(userId);
帳戶。的setBalance(帳戶。所以getBalance()- amountOfMoney);
accountRepository。儲存(帳戶);
}
}
但是,您可以擁有多個具有相同值的物件(userId
在此示例中),但同步適用於物件的例項而不是其值。
下面的程式碼不能很好地工作。因為它不正確同步; 靜態工廠方法UUID.fromString(..)
在每次呼叫時都會生成UUID類的新例項,即使您傳遞了相等的字串引數。
因此,我們得到UUID
了相同鍵的不同例項。如果我們從多個執行緒執行此程式碼,那麼我們很有可能遇到同步問題:
public void threadA(){
paymentService。方法withdrawMoney(UUID。fromString(“ea051187-bb4b-4b07-9150-700000000000” ),1000);
}
public void threadB(){
paymentService。方法withdrawMoney(UUID。fromString(“ea051187-bb4b-4b07-9150-700000000000” ),5000);
}
在這種情況下,您需要為equals物件獲取相同的引用以在其上進行同步。
解決這個問題的錯誤方法
同步方法
你可以移動synchronized
一個方法:
public synchronized void withdrawMoney(UUID userId,int amountOfMoney){
..
}
該解決方案效能不佳。您將阻止絕對所有使用者的資金轉賬。如果您需要使用相同的金鑰同步不同類中的不同操作,則此解決方案根本不會對您有所幫助。
字串實習生
為了確保包含使用者ID的類的例項在所有同步塊中都是相同的,我們可以將它序列化為String並使用它String.intern()
來獲取equals字串的相同連結。
String.intern
使用全域性池來儲存被攔截的字串。當您在字串上請求實習生時,如果此類字串存在,則從此池中獲取引用,否則此字串將放入池中。
您可以String.intern
在The Java Language Specification - 3.10.5 String Literals或有關String.intern的Oracle Java文件中找到更多詳細資訊。
public void withdrawMoney(UUID userId,int amountOfMoney){
同步(使用者id。的toString()。實習生()){
..
}
}
使用實習生不是一個好習慣,因為使用GC很難清理字串池。並且,您的應用程式可以通過主動使用來消耗太多資源 String.intern
。
此外,外部程式碼有可能在與應用程式相同的字串例項上同步。這可能導致死鎖。
一般來說,實習生的使用最好留給JDK的內部庫; Aleksey Shipilev有關於這個概念的好文章。
我們如何才能正確解決這個問題?
建立自己的同步原語
我們需要實現描述下一個圖的行為:
首先,我們需要建立一個新的同步原語 - 自定義互斥鎖。這將由變數的值起作用,而不是由物件的引用起作用。
它會像一個“命名的互斥體,” 但有點寬,與使用任何物品的價值鑑定,而不僅僅是一個字串的值的能力。您可以找到同步原語的示例,以便通過其他語言(C ++,C#)中的名稱進行鎖定。現在,我們將用Java解決這個問題。
解決方案看起來像這樣:
public void withdrawMoney(UUID userId,int amountOfMoney){
同步(XMutex。的(使用者id)){
..
}
}
為了確保獲得相同的變數值相同的互斥鎖,我們將建立互斥鎖工廠。
public void withdrawMoney(UUID userId,int amountOfMoney){
同步(XMutexFactory。得到(使用者id)){
..
}
}
public void purchase(UUID userId,int amountOfMoney,VendorDescription 供應商){
同步(XMutexFactory。得到(使用者id)){
..
}
}
為了使用相等的鍵在每個請求上返回相同的互斥鎖例項,我們需要儲存建立的互斥鎖。如果我們將這些互斥鎖儲存在簡單中HashMap
,那麼當新鍵出現時,地圖的大小將會增加。我們沒有工具來評估互斥鎖在任何地方都沒有使用的時間。
在這種情況下,我們可以使用它WeakReference
來儲存地圖中互斥鎖的引用,就在它使用時。為了實現這種行為,我們可以使用WeakHashMap
資料結構。幾個月前我寫了一篇關於這類參考文章的文章; 你可以在這裡更詳細地考慮它:Java中的Soft,Weak,Phantom References
我們的互斥工廠將以此為基礎WeakHashMap
。互斥鎖工廠會建立一個新的互斥鎖,如果value(key)
找不到 互斥鎖的話 HashMap
。然後,將建立的互斥鎖新增到 HashMap
。使用它WeakHashMap
允許我們HashMap
在存在任何對它的任何引用的同時儲存互斥 。並且,HashMap
當釋放所有對它的引用時,互斥鎖將自動從中刪除 。
我們需要使用同步版本WeakHashMap
; 讓我們看看文件中描述的內容:
此類未同步。可以構造同步的WeakHashMap
使用Collections.synchronizedMap方法。
這很難過,不久之後,我們會仔細研究一下原因。但是現在,讓我們考慮一下實現的例子,這是官方文件提出的(我的意思是使用 Collections.synchronizedMap
):
public final Map < XMutex < KeyT >,WeakReference < XMutex < KeyT >>> weakHashMap =
收藏。synchronizedMap(new WeakHashMap < XMutex < KeyT >,
WeakReference < XMutex < KeyT >>>());
public XMutex < KeyT > getMutex(KeyT key){
validateKey(key);
return getExist(key)
。orElseGet(()- > saveNewReference(key));
}
private 可選< XMutex < KeyT >> getExist(KeyT key){
return 可選。ofNullable(WeakHashMap中,得到(XMutex。的(關鍵)))
。map(WeakReference :: get);
}
private XMutex < KeyT > saveNewReference(KeyT key){
XMutex < KeyT > 互斥鎖 = XMutex。的(鍵);
WeakReference < XMutex < KeyT >> res = weakHashMap。put(互斥,新的 WeakReference <>(互斥));
如果(RES != 空 && 資源。獲得()!= 空){
返回 資源。get();
}
返回 互斥 ;
}
效能怎麼樣?
如果我們檢視程式碼Collections.synchronizedMap
,那麼我們會在全域性互斥上找到很多同步,這是與SynchronizedMap
例項配對建立的 。
SynchronizedMap(Map < K,V > m){
這個。m = 物體。requireNonNull(m);
互斥 = 這個 ;
}
並且所有其他方法 SynchronizedMap
都在互斥鎖上同步:
public int size(){
synchronized(互斥){ return m。size();}
}
public boolean containsKey(Object key){
synchronized(互斥){ return m。containsKey(key);}
}
public V get(Object key){
synchronized(互斥){ return m。得到(關鍵);}
}
public V put(K 鍵,V 值){
synchronized(互斥){ return m。put(key,value);}
}
public V remove(Object key){
synchronized(互斥){ return m。刪除(鍵);}
}
...
此解決方案沒有最佳效能。所有這些同步都會導致我們使用互斥鎖工廠對每個操作進行永久鎖定。
將WeakReference作為鍵的ConcurrentHashMap
我們需要看一下使用的ConcurrentHashMap
。它具有比Collections.synchronizedMap更好的效能。但我們有一個問題 - ConcurrentHashMap
不允許使用弱引用。這意味著垃圾收集器無法刪除未使用的互斥鎖。
我找到了兩種方法來解決這個問題:
- 首先是建立我自己的
ConcurrentMap
實現。這是正確的決定,但需要很長時間。 - 第二個是使用
ConcurrentReferenceHashMap
Spring Framework 中的實現。這是一個很好的實現,但它有一些細微差別。我們將在下面考慮它們。
讓我們改變 XMutexFactory
實現來使用ConcurrentReferenceHashMap
:
公共 類 XMutexFactory < KeyT > {
/ **
*使用預設設定建立互斥鎖工廠
* /
public XMutexFactory(){
這個。map = new ConcurrentReferenceHashMap <>(DEFAULT_INITIAL_CAPACITY,
DEFAULT_LOAD_FACTOR,
DEFAULT_CONCURRENCY_LEVEL,
DEFAULT_REFERENCE_TYPE);
}
/ **
*通過鍵建立並返回互斥鎖。
*如果此鍵的互斥鎖已存在於弱對映中,
*然後返回互斥鎖的相同引用。
* /
public XMutex < KeyT > getMutex(KeyT key){
歸還 這個。地圖。compute(key,(k,v)- >(v == null)? new XMutex <>(k):v);
}
}
這很酷!
程式碼少,但效能比以前更多。我們試著檢查一下這個解決方案的效能。
建立一個簡單的基準
為了選擇實現,我做了一個小基準測試。
Map
測試中涉及三種實現 :
-
Collections.synchronizedMap
基於WeakHashMap
-
ConcurrentHashMap
-
ConcurrentReferenceHashMap
我使用 ConcurrentHashMap
in基準測試來比較測量。此實現不適合在互斥鎖的工廠中使用,因為它不支援使用弱引用或軟引用。
所有基準測試都是使用JMH庫編寫的。
# 執行 完成。總 時間:00:04:39
基準 模式 Cnt 評分 誤差 單位
ConcurrentMap。ConcurrentHashMap的 thrpt 5 0,015 ? 0,004 OPS / 納秒
ConcurrentMap。ConcurrentReferenceHashMap thrpt 5 0,008 ? 0,001 OPS / 納秒
ConcurrentMap。SynchronizedMap thrpt 5 0,005 ? 0,001 OPS / 納秒
ConcurrentMap。ConcurrentHashMap的 avgt 5 565,515 ? 23,638 納秒/ 運算
ConcurrentMap。ConcurrentReferenceHashMap avgt 5 1098,939 ? 28,828 納秒/ 運算
ConcurrentMap。SynchronizedMap avgt 5 1503,593 ? 150,552 納秒/ 運算
ConcurrentMap。ConcurrentHashMap的 樣品 301796 663,330 ? 11,708 納秒/ 運算
ConcurrentMap。ConcurrentReferenceHashMap 樣品 180062 1110,882 ? 6,928 納秒/ 運算
ConcurrentMap。SynchronizedMap 樣品 136290 1465,543 ? 5,150 納秒/ 運算
ConcurrentMap。的ConcurrentHashMap SS 5 336419,150 ? 617549,053 納秒/ 運算
ConcurrentMap。ConcurrentReferenceHashMap SS 5 922844,750 ? 468380,489 納秒/ 運算
ConcurrentMap。SynchronizedMap SS 5 1199159,700 ? 4339391,394 納秒/ 運算
在這個微基準測試中,我建立了一個情況,當幾個執行緒計算地圖中的值。您可以在Concurrent Map基準測試中更詳細地考慮此基準測試的原始碼
把它放在圖表上:
因此,ConcurrentReferenceHashMap
在這種情況下使用它是 正確的。
XSync庫入門
我將此程式碼打包到XSync庫中,您可以將其用作變數值同步的現成解決方案。
為此,您需要新增下一個依賴項:
< 依賴>
< groupId > com.antkorwin </ groupId >
< artifactId > xsync </ artifactId >
< version > 1.1 </ version >
</ dependency >
然後,您可以建立XSync類的例項,以便在需要的型別上進行同步。對於Spring Framework,您可以將它們作為bean:
@豆
public XSync < UUID > xSync(){
返回 新的 XSync <>();
}
現在,您可以使用它:
@Autowired
私有 XSync < UUID > xSync ;
public void withdrawMoney(UUID userId,int amountOfMoney){
xSync。execute(userId,()- > {
結果 result = externalPolicySystem。validateTransfer(userId,amountOfMoney,WITHDRAW);
accountService。轉移(userId,amountOfMoney,WITHDRAW);
});
}
public void purchase(UUID userId,int amountOfMoney,VendorDescription 供應商){
xSync。execute(userId,()- > {
..
});
}
併發測試
為了確保這段程式碼執行良好,我寫了幾個併發測試。
有一個這樣的測試的例子:
public void testSyncBySingleKeyInConcurrency(){
//安排
XSync < UUID > xsync = new XSync <>();
String id = UUID。randomUUID()。toString();
NonAtomicInt var = new NonAtomicInt(0);
//這裡有一個魔力:
//我們建立了一個並行流並嘗試增加
//每個流中的相同非原子整數變數
IntStream。範圍(0,THREAD_CNT)
。盒裝()
。並行()
。的forEach(Ĵ - > XSYNC。執行(UUID。fromString(ID),VAR :: 增量));
//斷言
等待()。atMost(5,TIMEUNIT。SECONDS)
。直到(var :: getValue,equalTo(THREAD_CNT));
斷言。assertThat(VAR。的getValue())。isEqualTo(THREAD_CNT);
}
/ **
*執行不是執行緒安全的整數變數:
* /
@Getter
@AllArgsConstructor
私有 類 NonAtomicInt {
私有 int 值 ;
public int increment(){
返回 值++ ;
}
}
讓我們看看這個測試的結果: