(四)多執行緒下的生產者消費者
阿新 • • 發佈:2021-12-06
本節列出所有的管理類。
1. 生產者消費者儲存分發的類(單例)
package com.joker; import com.joker.manager.StorageManager; import com.joker.task.CurrentTask; import com.joker.task.QueueTask; import com.joker.utils.FieldUtil; import org.jctools.queues.MpmcArrayQueue; import org.jctools.queues.SpmcArrayQueue; import org.springframework.jdbc.core.JdbcTemplate; import javax.validation.constraints.NotNull; /** * 任務儲存和分發者 * * @author Joker * @since 2021/12/05 */ public class StorageDispatcher { public MpmcArrayQueue<QueueTask> queue = new MpmcArrayQueue<>(4100); private JdbcTemplate jdbcTemplate; private final SpmcArrayQueue<CurrentTask> task = new SpmcArrayQueue<>(100); public void addTask(@NotNull JdbcTemplate jdbcTemplate, @NotNull CurrentTask... tasks) { if (tasks == null) { throw new RuntimeException("task can not be null!"); } else { this.jdbcTemplate = jdbcTemplate; for (CurrentTask tempTask : tasks) { this.task.offer(tempTask); } } } public void clearTask() { this.task.clear(); } public void popTask() { CurrentTask past = this.task.poll(); StorageManager manager = StorageManager.getInstance(); if (past != null) { this.jdbcTemplate.execute("insert into tb_schedule_api_result (id, api, expect, actual, execute_time, wrong_msg) values ('" + FieldUtil.getRandomId() + "', '" + past.getUri() + "', " + past.getTotalNum() + ", " + (past.getTotalNum() - manager.getFailed()) + ", '" + FieldUtil.now() + "', '" + FieldUtil.WrapText(manager.getWrongMsg().toString()) + "')"); } manager.clear(); } public CurrentTask getCurrentTask() { CurrentTask current = this.task.peek(); return current == null ? null : current.clone(); } private StorageDispatcher() { } public static StorageDispatcher getInstance() { return StorageDispatcher.StorageDispatcherHolder.INSTANCE; } private static class StorageDispatcherHolder { private static final StorageDispatcher INSTANCE = new StorageDispatcher(); } }
2. 網路請求管理介面
package com.joker.manager;

import reactor.core.Disposable;

/**
 * Contract for a holder of {@link Disposable} resources.
 *
 * @author Joker
 * @since 2021/12/05
 */
public interface DisposableContainer {

    /**
     * Registers a disposable with this container; if the container itself is
     * already disposed, the argument is disposed instead.
     *
     * @param d the disposable to register, not null
     * @return {@code true} when it was registered, {@code false} when the container
     *         has already been disposed
     */
    boolean add(Disposable d);

    /**
     * Takes the given disposable out of this container and disposes it, provided
     * the container holds it.
     *
     * @param d the disposable to take out and dispose, not null
     */
    void remove(Disposable d);

    /**
     * Takes the given disposable out of this container without disposing it,
     * provided the container holds it.
     *
     * @param d the disposable to take out, not null
     * @return {@code true} if the operation was successful
     */
    boolean delete(Disposable d);
}
3. 網路請求管理
package com.joker.manager;

import com.joker.exception.CompositeException;
import com.joker.exception.ExceptionHelper;
import com.joker.exception.Exceptions;
import com.joker.exception.ObjectHelper;
import com.joker.utils.OpenHashSet;
import reactor.core.Disposable;
import reactor.util.annotation.NonNull;

import java.util.ArrayList;
import java.util.List;

/**
 * A {@link Disposable} that holds other Disposables and disposes them together
 * (described by the original author as the WebClient thread manager).
 *
 * <p>The {@code disposed} flag is volatile and is re-checked inside
 * {@code synchronized (this)} before the resource set is touched; the contained
 * Disposables themselves are disposed outside the synchronized section.
 *
 * @author Joker
 * @since 2021/12/05
 */
public final class CompositeDisposable implements Disposable, DisposableContainer {

    /** Currently held Disposables; {@code null} after {@link #dispose()} or {@link #clear()}. */
    OpenHashSet<Disposable> resources;

    /** Set once by {@link #dispose()}; never reset. */
    volatile boolean disposed;

    /**
     * Creates a CompositeDisposable with the given array of initial elements.
     *
     * @param disposables the array of Disposables to start with
     * @throws NullPointerException if {@code disposables} or any of its array items is null
     */
    public CompositeDisposable(@NonNull Disposable... disposables) {
        ObjectHelper.requireNonNull(disposables, "disposables is null");
        this.resources = new OpenHashSet<>(disposables.length + 1);
        for (Disposable d : disposables) {
            ObjectHelper.requireNonNull(d, "A Disposable in the disposables array is null");
            this.resources.add(d);
        }
    }

    /**
     * Marks this container disposed and disposes everything it held.
     * Calls after the first one return immediately.
     */
    @Override
    public void dispose() {
        if (disposed) {
            return;
        }
        OpenHashSet<Disposable> set;
        synchronized (this) {
            if (disposed) {
                return;
            }
            disposed = true;
            set = resources;
            resources = null;
        }
        // Dispose outside the monitor so element callbacks do not run while it is held.
        dispose(set);
    }

    /**
     * @return {@code true} once {@link #dispose()} has been called
     */
    @Override
    public boolean isDisposed() {
        return disposed;
    }

    /**
     * Adds a disposable to this container or disposes it if the
     * container has been disposed.
     *
     * @param disposable the disposable to add, not null
     * @return true if successful, false if this container has been disposed
     * @throws NullPointerException if {@code disposable} is null
     */
    @Override
    public boolean add(@NonNull Disposable disposable) {
        ObjectHelper.requireNonNull(disposable, "disposable is null");
        if (!disposed) {
            synchronized (this) {
                if (!disposed) {
                    OpenHashSet<Disposable> set = resources;
                    if (set == null) {
                        // The set is recreated lazily after clear().
                        set = new OpenHashSet<>();
                        resources = set;
                    }
                    set.add(disposable);
                    return true;
                }
            }
        }
        disposable.dispose();
        return false;
    }

    /**
     * Adds the given array of Disposables to the container under a single lock
     * acquisition, or disposes them all if the container has been disposed.
     *
     * @param disposables the array of Disposables
     * @return true if the operation was successful, false if the container has been disposed
     * @throws NullPointerException if {@code disposables} or any of its array items is null
     */
    @SuppressWarnings("unused")
    public boolean addAll(@NonNull Disposable... disposables) {
        ObjectHelper.requireNonNull(disposables, "disposables is null");
        if (!disposed) {
            synchronized (this) {
                if (!disposed) {
                    OpenHashSet<Disposable> set = resources;
                    if (set == null) {
                        set = new OpenHashSet<>(disposables.length + 1);
                        resources = set;
                    }
                    for (Disposable d : disposables) {
                        ObjectHelper.requireNonNull(d, "A Disposable in the disposables array is null");
                        set.add(d);
                    }
                    return true;
                }
            }
        }
        for (Disposable d : disposables) {
            d.dispose();
        }
        return false;
    }

    /**
     * Removes and disposes the given disposable if it is part of this
     * container.
     *
     * @param disposable the disposable to remove and dispose, not null
     */
    @Override
    public void remove(@NonNull Disposable disposable) {
        if (delete(disposable)) {
            disposable.dispose();
        }
    }

    /**
     * Removes (but does not dispose) the given disposable if it is part of this
     * container.
     *
     * @param disposable the disposable to remove, not null
     * @return true if the operation was successful
     * @throws NullPointerException if {@code disposable} is null
     */
    @Override
    public boolean delete(@NonNull Disposable disposable) {
        // Message corrected: the argument is a single "disposable", not an array.
        ObjectHelper.requireNonNull(disposable, "disposable is null");
        if (disposed) {
            return false;
        }
        synchronized (this) {
            if (disposed) {
                return false;
            }
            OpenHashSet<Disposable> set = resources;
            if (set == null || !set.remove(disposable)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Detaches the current contents under the lock, then disposes all the previously
     * contained Disposables. The container itself stays usable.
     */
    @SuppressWarnings("unused")
    public void clear() {
        if (disposed) {
            return;
        }
        OpenHashSet<Disposable> set;
        synchronized (this) {
            if (disposed) {
                return;
            }
            set = resources;
            resources = null;
        }
        dispose(set);
    }

    /**
     * Returns the number of currently held Disposables.
     *
     * @return the number of currently held Disposables; 0 once disposed
     */
    public int size() {
        if (disposed) {
            return 0;
        }
        synchronized (this) {
            if (disposed) {
                return 0;
            }
            OpenHashSet<Disposable> set = resources;
            return set != null ? set.size() : 0;
        }
    }

    /**
     * Dispose the contents of the OpenHashSet by suppressing non-fatal
     * Throwables till the end.
     *
     * <p>A single collected error is rethrown via {@code ExceptionHelper.wrapOrThrow};
     * several are rethrown together as a {@link CompositeException}.
     *
     * @param set the OpenHashSet to dispose elements of; {@code null} is a no-op
     */
    void dispose(OpenHashSet<Disposable> set) {
        if (set == null) {
            return;
        }
        List<Throwable> errors = null;
        Object[] array = set.keys();
        for (Object o : array) {
            // Slots that do not hold a Disposable (e.g. null) are skipped.
            if (o instanceof Disposable) {
                try {
                    ((Disposable) o).dispose();
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    if (errors == null) {
                        errors = new ArrayList<>();
                    }
                    errors.add(ex);
                }
            }
        }
        if (errors != null) {
            if (errors.size() == 1) {
                throw ExceptionHelper.wrapOrThrow(errors.get(0));
            }
            throw new CompositeException(errors);
        }
    }
}
4. 鎖管理(單例)
package com.joker.manager;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Coordinates producer and consumer threads through one {@link ReentrantLock}
 * with two conditions (singleton).
 *
 * <p>Producers wait on {@code producerCondition}, consumers on
 * {@code consumerCondition}. The waiting methods call {@code await()} once and do
 * not re-check any predicate, so callers must tolerate an early return.
 *
 * <p>If a wait is interrupted, the stack trace is printed and the thread's
 * interrupt status is restored so callers can still observe the interruption.
 *
 * @author Joker
 * @since 2021/12/05
 */
public final class LockManager {
    private final ReentrantLock lock;
    private final Condition producerCondition;
    private final Condition consumerCondition;

    private LockManager() {
        this.lock = new ReentrantLock();
        this.producerCondition = this.lock.newCondition();
        this.consumerCondition = this.lock.newCondition();
    }

    /**
     * @return the single shared instance
     */
    public static LockManager getInstance() {
        return LockManager.LockManagerHolder.INSTANCE;
    }

    /**
     * Runs the callback under the lock, then signals every waiting producer and consumer.
     * An exception thrown by the callback is printed and no signal is sent.
     *
     * @param callBack action to run while the lock is held; may be {@code null}
     */
    public void wakeUpAll(LockManager.CallBack callBack) {
        this.lock.lock();
        try {
            if (callBack != null) {
                callBack.doSomething();
            }
            this.producerCondition.signalAll();
            this.consumerCondition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Runs the callback under the lock, then signals every waiting producer.
     * An exception thrown by the callback is printed and no signal is sent.
     *
     * @param callBack action to run while the lock is held; may be {@code null}
     */
    public void wakeUpProducer(LockManager.CallBack callBack) {
        this.lock.lock();
        try {
            if (callBack != null) {
                callBack.doSomething();
            }
            this.producerCondition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            this.lock.unlock();
        }
    }

    /** Signals every waiting consumer. */
    public void wakeUpConsumer() {
        this.lock.lock();
        try {
            this.consumerCondition.signalAll();
        } finally {
            // Always release, matching the other methods of this class.
            this.lock.unlock();
        }
    }

    /**
     * Runs the callback under the lock, then blocks the calling thread on the producer condition.
     *
     * @param callBack action to run while the lock is held, before waiting; may be {@code null}
     */
    public void makeProducerWait(LockManager.CallBack callBack) {
        this.lock.lock();
        try {
            if (callBack != null) {
                callBack.doSomething();
            }
            this.producerCondition.await();
        } catch (InterruptedException e) {
            handleInterrupt(e);
        } finally {
            this.lock.unlock();
        }
    }

    /** Signals every waiting consumer, then blocks the calling thread on the producer condition. */
    public void wakeUpConsumersAndWait() {
        this.lock.lock();
        try {
            this.consumerCondition.signalAll();
            this.producerCondition.await();
        } catch (InterruptedException e) {
            handleInterrupt(e);
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Runs the callback and then behaves like {@link #wakeUpConsumersAndWait()}.
     * Unlike the other overloads in this class, the callback runs before the lock is taken.
     *
     * @param callBack action to run first; may be {@code null}
     */
    public void wakeUpConsumersAndWait(LockManager.CallBack callBack) {
        if (callBack != null) {
            callBack.doSomething();
        }
        this.wakeUpConsumersAndWait();
    }

    /**
     * Runs the callback under the lock, signals every waiting producer, then blocks the
     * calling thread on the consumer condition.
     *
     * @param callBack action to run while the lock is held; may be {@code null}
     */
    public void wakeUpProducersAndWait(LockManager.CallBack callBack) {
        this.lock.lock();
        try {
            if (callBack != null) {
                callBack.doSomething();
            }
            this.producerCondition.signalAll();
            this.consumerCondition.await();
        } catch (InterruptedException e) {
            handleInterrupt(e);
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Reports an interrupted wait and re-asserts the interrupt status, which
     * {@code await()} clears when it throws.
     */
    private static void handleInterrupt(InterruptedException e) {
        e.printStackTrace();
        Thread.currentThread().interrupt();
    }

    /** Action executed by the wake-up / wait methods. */
    public interface CallBack {
        void doSomething();
    }

    /** Holder class so the instance is created on first access to {@link #getInstance()}. */
    private static class LockManagerHolder {
        private static final LockManager INSTANCE = new LockManager();
    }
}
5. 關鍵資訊儲存管理(單例)
package com.joker.manager;
import com.joker.abs.LifeCycle.ThreadState;
import reactor.core.Disposable;
import reactor.util.annotation.NonNull;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Shared run-time bookkeeping (singleton): page and failure counters, the
 * accumulated error text, in-flight Disposables, and the recorded
 * {@link ThreadState} of every producer and consumer thread.
 *
 * @author Joker
 * @since 2021/12/05
 */
public final class StorageManager {
    private volatile boolean shouldSuspend = false;
    AtomicLong currentPage = new AtomicLong(0L);
    AtomicLong fail = new AtomicLong(0L);
    private StringBuffer wrongMsg = new StringBuffer();
    CompositeDisposable compositeDisposable = new CompositeDisposable();
    AtomicInteger producer = new AtomicInteger(0);
    AtomicInteger consumer = new AtomicInteger(0);
    /** Recorded state per producer, keyed by the name passed to {@link #addProducer(String)}. */
    ConcurrentHashMap<String, ThreadState> producers = new ConcurrentHashMap<>(40, 1F, 40);
    /** Recorded state per consumer, keyed by the name passed to {@link #addConsumer(String)}. */
    ConcurrentHashMap<String, ThreadState> consumers = new ConcurrentHashMap<>(40, 1F, 40);

    /**
     * Summarizes how many producers are recorded in each state.
     *
     * @return a one-line report of the producer state counts
     */
    public String getState() {
        long terminated = 0;
        long init = 0;
        long running = 0;
        long waiting = 0;
        long failed = 0;
        long complete = 0;
        // Iterate the values directly: a keySet()+get() pair can observe a removed key
        // and hand null to the switch.
        for (ThreadState state : producers.values()) {
            switch (state) {
                case TERMINATE:
                    terminated++;
                    break;
                case INIT:
                    init++;
                    break;
                case RUNNING:
                    running++;
                    break;
                case WAITING:
                    waiting++;
                    break;
                case FAILED:
                    failed++;
                    break;
                case COMPLETE:
                    complete++;
                    break;
                default:
                    break;
            }
        }
        return "TERMINATE:" + terminated + "個--INIT:" + init +
                "個--RUNNING:" + running + "個--WAITING:" + waiting +
                "個--FAILED:" + failed + "個--COMPLETE:" + complete + "個--";
    }

    /**
     * @return whether the suspend flag is currently set
     */
    public synchronized boolean isShouldSuspend() {
        return this.shouldSuspend;
    }

    /** Sets the suspend flag. */
    public synchronized void shouldSuspend() {
        this.shouldSuspend = true;
    }

    /** Clears the suspend flag. */
    public void shouldNotSuspend() {
        this.shouldSuspend = false;
    }

    /**
     * Registers an in-flight disposable so it can be cancelled by {@link #clearDispose()}.
     *
     * @param disposable the disposable to track
     */
    public void addDispose(@NonNull Disposable disposable) {
        compositeDisposable.add(disposable);
    }

    /** Disposes everything tracked so far and starts over with an empty container. */
    public void clearDispose() {
        compositeDisposable.dispose();
        compositeDisposable = new CompositeDisposable();
    }

    /** Increments the failure counter. */
    public void failIncrementAndGet() {
        this.fail.incrementAndGet();
    }

    /**
     * @return the current failure count
     */
    public long getFailed() {
        return this.fail.get();
    }

    /**
     * @return the producer counter after incrementing it
     */
    public int currentProducerIncrementAndGet() {
        return producer.incrementAndGet();
    }

    /**
     * @return the consumer counter after incrementing it
     */
    public int currentConsumerIncrementAndGet() {
        return consumer.incrementAndGet();
    }

    /**
     * @return the page counter after incrementing it
     */
    public long currentPageIncrementAndGet() {
        return this.currentPage.incrementAndGet();
    }

    /**
     * @return the current page counter
     */
    public long getCurrentPage() {
        return this.currentPage.get();
    }

    /** Resets all counters and discards the accumulated error text. Thread states are kept. */
    public void clear() {
        currentPage.set(0L);
        fail.set(0L);
        producer.set(0);
        consumer.set(0);
        wrongMsg = new StringBuffer();
    }

    /**
     * @return the live buffer of accumulated error text
     */
    public StringBuffer getWrongMsg() {
        return this.wrongMsg;
    }

    /**
     * Appends to the accumulated error text.
     *
     * @param msg text to append; {@code null} is ignored
     */
    public void addWrongMsg(String msg) {
        if (msg != null) {
            this.wrongMsg.append(msg);
        }
    }

    /**
     * Registers a producer in state {@code INIT}.
     *
     * @param producer producer key; {@code null} is ignored
     */
    public void addProducer(String producer) {
        if (producer != null) {
            this.producers.put(producer, ThreadState.INIT);
        }
    }

    /**
     * Registers a consumer in state {@code INIT}.
     *
     * @param consumer consumer key; {@code null} is ignored
     */
    public void addConsumer(String consumer) {
        if (consumer != null) {
            this.consumers.put(consumer, ThreadState.INIT);
        }
    }

    /**
     * Records a new state for an already-registered producer. When the producer goes
     * {@code WAITING} and nothing is {@code RUNNING} on either side, waiting consumers
     * are marked {@code RUNNING}.
     *
     * @param key   producer key
     * @param value new state
     */
    public void setProducer(String key, ThreadState value) {
        this.producers.replace(key, value);
        if (value == ThreadState.WAITING && this.allProducerNotWorking() && this.allConsumerNotWorking()) {
            this.runAllWaitingConsumer();
        }
    }

    /**
     * Records a new state for an already-registered consumer. When the consumer goes
     * {@code WAITING} and nothing is {@code RUNNING} on either side, waiting producers
     * are marked {@code RUNNING}.
     *
     * @param key   consumer key
     * @param value new state
     */
    public void setConsumer(String key, ThreadState value) {
        this.consumers.replace(key, value);
        if (value == ThreadState.WAITING && this.allConsumerNotWorking() && this.allProducerNotWorking()) {
            this.runAllWaitingProducer();
        }
    }

    /**
     * @return {@code true} when every producer is {@code INIT}, {@code FAILED} or {@code TERMINATE}
     */
    public boolean allProducerFinish() {
        // NOTE(review): COMPLETE is not treated as finished while INIT is - confirm this is intended.
        return this.producers.values().stream().allMatch(state ->
                state == ThreadState.INIT
                        || state == ThreadState.FAILED
                        || state == ThreadState.TERMINATE);
    }

    /**
     * @return {@code true} when no producer is recorded as {@code RUNNING}
     */
    public boolean allProducerNotWorking() {
        return this.producers.values().stream().noneMatch(state -> state == ThreadState.RUNNING);
    }

    /**
     * @return {@code true} when no consumer is recorded as {@code RUNNING}
     */
    public boolean allConsumerNotWorking() {
        // Fixed: this previously inspected the producers map with an inverted predicate.
        return this.consumers.values().stream().noneMatch(state -> state == ThreadState.RUNNING);
    }

    /** Marks every waiting producer and consumer as {@code RUNNING}. */
    public void runAllWaiting() {
        this.runAllWaitingProducer();
        this.runAllWaitingConsumer();
    }

    /** Marks every producer currently recorded as {@code WAITING} as {@code RUNNING}. */
    public void runAllWaitingProducer() {
        promoteWaiting(this.producers);
    }

    /** Marks every consumer currently recorded as {@code WAITING} as {@code RUNNING}. */
    public void runAllWaitingConsumer() {
        promoteWaiting(this.consumers);
    }

    /**
     * Moves each WAITING entry of the map to RUNNING. The conditional replace leaves an
     * entry alone if another thread changed it after it was read here.
     */
    private static void promoteWaiting(ConcurrentHashMap<String, ThreadState> states) {
        states.forEach((key, state) -> {
            if (state == ThreadState.WAITING) {
                states.replace(key, ThreadState.WAITING, ThreadState.RUNNING);
            }
        });
    }

    /**
     * @param currentThread producer key of the caller
     * @return {@code true} when exactly one producer is {@code RUNNING} and it is the given one
     */
    public boolean onlyCurrentProducerWorking(String currentThread) {
        long count = this.producers.values().stream()
                .filter(state -> state == ThreadState.RUNNING)
                .count();
        return count == 1L && this.producers.get(currentThread) == ThreadState.RUNNING;
    }

    private StorageManager() {
    }

    /**
     * @return the single shared instance
     */
    public static StorageManager getInstance() {
        return StorageManager.StorageManagerHolder.INSTANCE;
    }

    /** Holder class so the instance is created on first access to {@link #getInstance()}. */
    private static class StorageManagerHolder {
        private static final StorageManager INSTANCE = new StorageManager();
    }
}
6. 執行緒管理者(單例)
package com.joker.manager;
import com.joker.abs.AbstractQueuedConsumer;
import com.joker.abs.AbstractQueuedProducer;
import com.joker.abs.LifeCycle;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * Owns the producer and consumer thread pools (singleton) and prints a periodic
 * status line for both pools.
 *
 * <p>Also serves as the {@link Thread.UncaughtExceptionHandler} installed on every
 * producer and consumer handed to {@link #needStart(Prepare)}.
 *
 * @author Joker
 * @since 2021/12/05
 */
public final class ThreadManager implements Thread.UncaughtExceptionHandler {
    /** Number of producers requested from {@link Prepare} and size of the producer pool. */
    private static final int PRODUCER_COUNT = 30;
    /** Number of consumers requested from {@link Prepare} and size of the consumer pool. */
    private static final int CONSUMER_COUNT = 5;
    /** Interval of the status printout, in milliseconds. */
    private static final long REPORT_PERIOD_MILLIS = 50L;

    private ThreadPoolExecutor executorServiceP;
    private ThreadPoolExecutor executorServiceC;
    private Timer timer2 = new Timer();

    /**
     * Shuts down any running pools, creates fresh ones, submits the producers and
     * consumers supplied by {@code prepare}, and restarts the status timer.
     *
     * @param prepare factory for the producer and consumer tasks
     */
    public void needStart(Prepare prepare) {
        needProduceStop();
        needConsumerStop();
        List<AbstractQueuedProducer> producerList = prepare.getProducer(PRODUCER_COUNT);
        List<AbstractQueuedConsumer> consumerList = prepare.getConsumers(CONSUMER_COUNT);
        executorServiceP = (ThreadPoolExecutor) Executors.newFixedThreadPool(PRODUCER_COUNT);
        executorServiceC = (ThreadPoolExecutor) Executors.newFixedThreadPool(CONSUMER_COUNT);
        for (AbstractQueuedProducer abstractQueuedProducer : producerList) {
            abstractQueuedProducer.setUncaughtExceptionHandler(ThreadManager.getInstance());
            executorServiceP.execute(abstractQueuedProducer);
        }
        for (AbstractQueuedConsumer abstractQueuedConsumer : consumerList) {
            abstractQueuedConsumer.setUncaughtExceptionHandler(ThreadManager.getInstance());
            executorServiceC.execute(abstractQueuedConsumer);
        }
        // Replace the previous timer so only one status printer is active.
        timer2.cancel();
        timer2 = new Timer();
        timer2.schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.println("pool-produce-狀態:" + executorServiceP.isShutdown() + "--活躍數量:" + executorServiceP.getActiveCount() + "--總數量:" + executorServiceP.getTaskCount() + "pool-consumer-狀態:" + executorServiceC.isShutdown() + "--活躍數量:" + executorServiceC.getActiveCount() + "--總數量:" + executorServiceC.getTaskCount());
            }
        }, 0L, REPORT_PERIOD_MILLIS);
    }

    /**
     * Shuts down the producer pool if it is running and disposes the Disposables
     * tracked by {@link StorageManager}.
     */
    public void needProduceStop() {
        if (executorServiceP != null && !executorServiceP.isShutdown()) {
            executorServiceP.shutdown();
            StorageManager.getInstance().clearDispose();
        }
    }

    /**
     * @return the producer pool, or {@code null} before the first {@link #needStart(Prepare)}
     */
    public ThreadPoolExecutor getExecutorServiceProducer() {
        return executorServiceP;
    }

    /**
     * Reports whether the producer pool has been shut down while the consumer pool
     * has not.
     *
     * @return {@code true} in that situation; {@code false} otherwise, including when
     *         the pools have not been created yet
     */
    public boolean shouldStop() {
        // Null-safe: previously this threw NullPointerException before needStart() ran.
        if (executorServiceP == null || executorServiceC == null) {
            return false;
        }
        return executorServiceP.isShutdown() && !executorServiceC.isShutdown();
    }

    /** Shuts down the consumer pool if it is running. */
    public void needConsumerStop() {
        if (executorServiceC != null && !executorServiceC.isShutdown()) {
            executorServiceC.shutdown();
        }
    }

    /**
     * Records the failing thread as a {@code FAILED} producer and appends the
     * exception message to the accumulated error text.
     *
     * @param t the thread that terminated
     * @param e the uncaught exception
     */
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        // NOTE(review): the producer map is updated even though this handler is also
        // installed on consumers - confirm that is intended.
        StorageManager.getInstance().setProducer(t.getName(), LifeCycle.ThreadState.FAILED);
        StorageManager.getInstance().addWrongMsg("thread:" + t.getName() + "---異常:" + e.getMessage());
    }

    private ThreadManager() {
    }

    /**
     * @return the single shared instance
     */
    public static ThreadManager getInstance() {
        return ThreadManager.ThreadManagerHolder.INSTANCE;
    }

    /** Holder class so the instance is created on first access to {@link #getInstance()}. */
    private static class ThreadManagerHolder {
        private final static ThreadManager INSTANCE = new ThreadManager();
    }

    /** Supplies the producer and consumer tasks to run. */
    public interface Prepare {
        /**
         * @param size number of producers requested
         * @return the producers to submit
         */
        List<AbstractQueuedProducer> getProducer(int size);

        /**
         * @param size number of consumers requested
         * @return the consumers to submit
         */
        List<AbstractQueuedConsumer> getConsumers(int size);
    }
}
本節完畢