Java併發任務處理之Executor執行緒池
阿新 • • 發佈:2018-12-28
乾貨
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Test;
/**
 * Demonstrates three pools created by the {@link Executors} factory methods.
 *
 * <p>Each test assigns {@link #executorService} and submits a batch of trivial
 * tasks; {@link #terminated()} then runs after the test and waits for the pool
 * to drain so the tasks' output is produced before the test method is torn down.
 */
public class ThreadPoolTest {
    private ExecutorService executorService;

    /**
     * Shuts the pool down and blocks until every submitted task has finished.
     *
     * <p>If the waiting thread is interrupted, the remaining tasks are cancelled
     * and the interrupt flag is restored instead of being swallowed.
     */
    @After
    public void terminated() {
        // A test may have failed before it assigned a pool.
        if (executorService == null) {
            return;
        }
        executorService.shutdown();
        try {
            // awaitTermination returns as soon as the pool is done, so there is
            // no fixed one-second polling delay; the loop keeps the original
            // "wait for as long as it takes" semantics.
            boolean finished = false;
            while (!finished) {
                finished = executorService.awaitTermination(1L, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Cached thread pool: idle threads are reused, and a new thread is created
     * when none is available; threads that stay idle are reclaimed.
     */
    @Test
    public void testCachedThreadPool() {
        executorService = Executors.newCachedThreadPool();
        submitPrintTasks(5);
    }

    /**
     * Fixed-size thread pool: at most 10 tasks run concurrently, the rest wait
     * in the pool's queue.
     */
    @Test
    public void testFixedThreadPool() {
        executorService = Executors.newFixedThreadPool(10);
        submitPrintTasks(50);
    }

    /**
     * Single-thread executor: one worker thread runs the tasks sequentially, in
     * the order they were submitted.
     */
    @Test
    public void testSingleThreadExecutor() {
        executorService = Executors.newSingleThreadExecutor();
        submitPrintTasks(50);
    }

    /** Submits {@code taskCount} tasks that each print one line to stdout. */
    private void submitPrintTasks(int taskCount) {
        for (int i = 0; i < taskCount; i++) {
            executorService.execute(() -> System.out.println("Runing..."));
        }
    }
}
應用場景
現有一業務:更新資料庫內的使用者賬戶資訊,需要遍歷10k+個Ethereum地址,並訪問geth客戶端,獲取各地址的餘額。專案內使用QuartzJob定時執行這一業務,程式碼如下:
/**
 * Quartz job that refreshes the stored balance of every non-deleted account.
 *
 * <p>For each account the current balance is fetched through {@code WalletUtil}
 * on a worker thread; accounts whose fetched balance is non-zero and differs
 * from the stored one are collected and written back in a single batch update.
 */
public class BalanceJob extends QuartzJobBean {
    private static final Logger logger = LoggerFactory.getLogger(BalanceJob.class);

    @Autowired
    private AccountMapper accountMapper;

    /**
     * Loads all non-deleted accounts, looks up their balances concurrently and
     * batch-updates the ones that changed.
     *
     * @param jobExecutionContext the Quartz execution context (not used here)
     * @throws JobExecutionException if the job thread is interrupted while
     *     waiting for the balance lookups to finish
     */
    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        logger.info("==>Update balance {}", new Date());
        long start = System.currentTimeMillis();

        AccountExample accountExample = new AccountExample();
        AccountExample.Criteria accountExampleCriteria = accountExample.createCriteria();
        accountExampleCriteria.andIsDeleteEqualTo(false);
        List<Account> accountList = accountMapper.selectByExample(accountExample);
        logger.info("==>record amount {}", accountList.size());

        // Worker threads append concurrently, so a plain ArrayList would be a
        // data race (lost elements or ArrayIndexOutOfBoundsException).
        List<Account> recordList =
                java.util.Collections.synchronizedList(new ArrayList<Account>(accountList.size()));

        // NOTE(review): the return value is discarded; presumably this call
        // initialises the shared web3j client before the workers start — confirm.
        WalletUtil.getWeb3j();

        // NOTE(review): a cached pool has no upper bound on thread count, so
        // one thread per account may be created; consider a bounded pool if the
        // RPC node or the JVM cannot take that load.
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        try {
            for (Account account : accountList) {
                cachedThreadPool.execute(() -> {
                    try {
                        Account record = buildBalanceUpdate(account);
                        if (record != null) {
                            recordList.add(record);
                        }
                    } catch (RuntimeException e) {
                        // Without this the failure would only reach the
                        // thread's uncaught-exception handler.
                        logger.error("==>Update balance task failed", e);
                    }
                });
            }
        } finally {
            cachedThreadPool.shutdown();
        }
        awaitCompletion(cachedThreadPool);

        // All workers have terminated, so reading the list here is safe.
        if (!recordList.isEmpty()) {
            accountMapper.batchUpdateBalanceById(recordList);
        }
        long end = System.currentTimeMillis();
        logger.info("<==Update balance end. took {}ms", end - start);
    }

    /**
     * Fetches the current balance for one account and builds the update record.
     *
     * @return a record holding the account id and the new balance, or
     *     {@code null} when the account must not be updated (missing crypto or
     *     coin data, unknown coin, lookup failure, zero balance, or unchanged
     *     balance)
     */
    private Account buildBalanceUpdate(Account account) {
        Crypto crypto = account.getCrypto();
        Coin coin = account.getCoin();
        if (null == crypto || null == coin) {
            return null;
        }
        String address = crypto.getAddress();
        String contract = coin.getContract();
        String symbol = coin.getSymbol();

        BigDecimal balance;
        try {
            if ("ETH".equalsIgnoreCase(symbol)) {
                balance = WalletUtil.getBalance(address);
            } else if (StringUtils.isNotBlank(contract)) {
                balance = WalletUtil.getBalance(address, contract, coin.getDecimals());
            } else {
                logger.error("==>Coin is not exist : {}", symbol);
                return null;
            }
        } catch (Exception e) {
            logger.error("==>Get balance failed, address {}", address, e);
            return null;
        }

        // A zero balance is never written back (same as before this change).
        // NOTE(review): presumably this avoids overwriting a stored balance
        // when the lookup yields nothing useful — confirm this is intended,
        // since an account emptied to zero is never updated either.
        if (balance == null || balance.signum() == 0) {
            return null;
        }
        BigDecimal balanceOri = account.getBalance();
        if (balanceOri != null && balance.compareTo(balanceOri) == 0) {
            return null;
        }
        return new Account.Builder().id(account.getId()).balance(balance).build();
    }

    /**
     * Blocks until the already shut-down pool has finished all of its tasks.
     *
     * @throws JobExecutionException if the waiting thread is interrupted; the
     *     remaining tasks are cancelled and the interrupt flag is restored
     */
    private void awaitCompletion(ExecutorService pool) throws JobExecutionException {
        try {
            boolean finished = false;
            while (!finished) {
                finished = pool.awaitTermination(1L, java.util.concurrent.TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            throw new JobExecutionException("Interrupted while waiting for balance lookups", e);
        }
    }
}
使用CachedThreadPool處理getBalance方法。優勢:16k條記錄可以在6000ms左右處理完畢;劣勢:這是需求本身“與生俱來”的代價——大量併發請求會增加以太坊RPC客戶端的負荷。