1. 程式人生 > 其它 >(四)多執行緒下的生產者消費者

(四)多執行緒下的生產者消費者

本節列一下所有的管理類

1. 生產者消費者儲存分發的類(單例)

package com.joker;

import com.joker.manager.StorageManager;
import com.joker.task.CurrentTask;
import com.joker.task.QueueTask;
import com.joker.utils.FieldUtil;
import org.jctools.queues.MpmcArrayQueue;
import org.jctools.queues.SpmcArrayQueue;
import org.springframework.jdbc.core.JdbcTemplate;

import javax.validation.constraints.NotNull;

/**
 * 任務儲存和分發者
 *
 * @author Joker
 * @since 2021/12/05
 */
public class StorageDispatcher {

	public MpmcArrayQueue<QueueTask> queue = new MpmcArrayQueue<>(4100);
	private JdbcTemplate jdbcTemplate;
	private final SpmcArrayQueue<CurrentTask> task = new SpmcArrayQueue<>(100);

	public void addTask(@NotNull JdbcTemplate jdbcTemplate, @NotNull CurrentTask... tasks) {
		if (tasks == null) {
			throw new RuntimeException("task can not be null!");
		} else {
			this.jdbcTemplate = jdbcTemplate;
			for (CurrentTask tempTask : tasks) {
				this.task.offer(tempTask);
			}
		}
	}

	public void clearTask() {
		this.task.clear();
	}

	public void popTask() {
		CurrentTask past = this.task.poll();
		StorageManager manager = StorageManager.getInstance();
		if (past != null) {
			this.jdbcTemplate.execute("insert into tb_schedule_api_result (id, api, expect, actual, execute_time, wrong_msg) values ('" + FieldUtil.getRandomId() + "', '" + past.getUri() + "', " + past.getTotalNum() + ", " + (past.getTotalNum() - manager.getFailed()) + ", '" + FieldUtil.now() + "', '" + FieldUtil.WrapText(manager.getWrongMsg().toString()) + "')");
		}
		manager.clear();
	}

	public CurrentTask getCurrentTask() {
		CurrentTask current = this.task.peek();
		return current == null ? null : current.clone();
	}

	private StorageDispatcher() {
	}

	public static StorageDispatcher getInstance() {
		return StorageDispatcher.StorageDispatcherHolder.INSTANCE;
	}

	private static class StorageDispatcherHolder {
		private static final StorageDispatcher INSTANCE = new StorageDispatcher();
	}
}

2. 網路請求管理介面

package com.joker.manager;

import reactor.core.Disposable;

/**
 * @author Joker
 * @since 2021/12/05
 */
public interface DisposableContainer {

	/**
	 * Adds a disposable to this container or disposes it if the
	 * container has been disposed.
	 *
	 * @param d the disposable to add, not null
	 * @return true if successful, false if this container has been disposed
	 */
	boolean add(Disposable d);

	/**
	 * Removes and disposes the given disposable if it is part of this
	 * container.
	 *
	 * @param d the disposable to remove and dispose, not null
	 */
	void remove(Disposable d);

	/**
	 * Removes (but does not dispose) the given disposable if it is part of this
	 * container.
	 *
	 * @param d the disposable to remove, not null
	 * @return true if the operation was successful
	 */
	boolean delete(Disposable d);
}

3. 網路請求管理

package com.joker.manager;

import com.joker.exception.CompositeException;
import com.joker.exception.ExceptionHelper;
import com.joker.exception.Exceptions;
import com.joker.exception.ObjectHelper;
import com.joker.utils.OpenHashSet;
import reactor.core.Disposable;
import reactor.util.annotation.NonNull;

import java.util.ArrayList;
import java.util.List;

/**
 * webclient 執行緒管理者
 *
 * @author Joker
 * @since 2021/12/05
 */
public final class CompositeDisposable implements Disposable, DisposableContainer {

	OpenHashSet<Disposable> resources;

	volatile boolean disposed;


	/**
	 * Creates a CompositeDisposables with the given array of initial elements.
	 *
	 * @param disposables the array of Disposables to start with
	 * @throws NullPointerException if {@code disposables} or any of its array items is null
	 */
	public CompositeDisposable(@NonNull Disposable... disposables) {
		ObjectHelper.requireNonNull(disposables, "disposables is null");
		this.resources = new OpenHashSet<>(disposables.length + 1);
		for (Disposable d : disposables) {
			ObjectHelper.requireNonNull(d, "A Disposable in the disposables array is null");
			this.resources.add(d);
		}
	}

	@Override
	public void dispose() {
		if (disposed) {
			return;
		}
		OpenHashSet<Disposable> set;
		synchronized (this) {
			if (disposed) {
				return;
			}
			disposed = true;
			set = resources;
			resources = null;
		}

		dispose(set);
	}

	@Override
	public boolean isDisposed() {
		return disposed;
	}

	/**
	 * Adds a disposable to this container or disposes it if the
	 * container has been disposed.
	 *
	 * @param disposable the disposable to add, not null
	 * @return true if successful, false if this container has been disposed
	 * @throws NullPointerException if {@code disposable} is null
	 */
	@Override
	public boolean add(@NonNull Disposable disposable) {
		ObjectHelper.requireNonNull(disposable, "disposable is null");
		if (!disposed) {
			synchronized (this) {
				if (!disposed) {
					OpenHashSet<Disposable> set = resources;
					if (set == null) {
						set = new OpenHashSet<>();
						resources = set;
					}
					set.add(disposable);
					return true;
				}
			}
		}
		disposable.dispose();
		return false;
	}

	/**
	 * Atomically adds the given array of Disposables to the container or
	 * disposes them all if the container has been disposed.
	 *
	 * @param disposables the array of Disposables
	 * @return true if the operation was successful, false if the container has been disposed
	 * @throws NullPointerException if {@code disposables} or any of its array items is null
	 */
	@SuppressWarnings("unused")
	public boolean addAll(@NonNull Disposable... disposables) {
		ObjectHelper.requireNonNull(disposables, "disposables is null");
		if (!disposed) {
			synchronized (this) {
				if (!disposed) {
					OpenHashSet<Disposable> set = resources;
					if (set == null) {
						set = new OpenHashSet<>(disposables.length + 1);
						resources = set;
					}
					for (Disposable d : disposables) {
						ObjectHelper.requireNonNull(d, "A Disposable in the disposables array is null");
						set.add(d);
					}
					return true;
				}
			}
		}
		for (Disposable d : disposables) {
			d.dispose();
		}
		return false;
	}

	/**
	 * Removes and disposes the given disposable if it is part of this
	 * container.
	 *
	 * @param disposable the disposable to remove and dispose, not null
	 */
	@Override
	public void remove(@NonNull Disposable disposable) {
		if (delete(disposable)) {
			disposable.dispose();
		}
	}

	/**
	 * Removes (but does not dispose) the given disposable if it is part of this
	 * container.
	 *
	 * @param disposable the disposable to remove, not null
	 * @return true if the operation was successful
	 * @throws NullPointerException if {@code disposable} is null
	 */
	@Override
	public boolean delete(@NonNull Disposable disposable) {
		ObjectHelper.requireNonNull(disposable, "disposables is null");
		if (disposed) {
			return false;
		}
		synchronized (this) {
			if (disposed) {
				return false;
			}

			OpenHashSet<Disposable> set = resources;
			if (set == null || !set.remove(disposable)) {
				return false;
			}
		}
		return true;
	}

	/**
	 * Atomically clears the container, then disposes all the previously contained Disposables.
	 */
	@SuppressWarnings("unused")
	public void clear() {
		if (disposed) {
			return;
		}
		OpenHashSet<Disposable> set;
		synchronized (this) {
			if (disposed) {
				return;
			}
			set = resources;
			resources = null;
		}
		dispose(set);
	}

	/**
	 * Returns the number of currently held Disposables.
	 *
	 * @return the number of currently held Disposables
	 */
	public int size() {
		if (disposed) {
			return 0;
		}
		synchronized (this) {
			if (disposed) {
				return 0;
			}
			OpenHashSet<Disposable> set = resources;
			return set != null ? set.size() : 0;
		}
	}

	/**
	 * Dispose the contents of the OpenHashSet by suppressing non-fatal
	 * Throwables till the end.
	 *
	 * @param set the OpenHashSet to dispose elements of
	 */
	void dispose(OpenHashSet<Disposable> set) {
		if (set == null) {
			return;
		}
		List<Throwable> errors = null;
		Object[] array = set.keys();
		for (Object o : array) {
			if (o instanceof Disposable) {
				try {
					((Disposable) o).dispose();
				} catch (Throwable ex) {
					Exceptions.throwIfFatal(ex);
					if (errors == null) {
						errors = new ArrayList<>();
					}
					errors.add(ex);
				}
			}
		}
		if (errors != null) {
			if (errors.size() == 1) {
				throw ExceptionHelper.wrapOrThrow(errors.get(0));
			}
			throw new CompositeException(errors);
		}
	}
}

4. 鎖管理(單例)

package com.joker.manager;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 執行緒安全管理者
 *
 * @author Joker
 * @since 2021/12/05
 */
public final class LockManager {
	private final ReentrantLock lock;
	private final Condition producerCondition;
	private final Condition consumerCondition;

	private LockManager() {
		this.lock = new ReentrantLock();
		this.producerCondition = this.lock.newCondition();
		this.consumerCondition = this.lock.newCondition();
	}

	public static LockManager getInstance() {
		return LockManager.LockManagerHolder.INSTANCE;
	}

	public void wakeUpAll(LockManager.CallBack callBack) {
		this.lock.lock();

		try {
			if (callBack != null) {
				callBack.doSomething();
			}

			this.producerCondition.signalAll();
			this.consumerCondition.signalAll();
		} catch (Exception var6) {
			var6.printStackTrace();
		} finally {
			this.lock.unlock();
		}

	}

	public void wakeUpProducer(LockManager.CallBack callBack) {
		this.lock.lock();

		try {
			if (callBack != null) {
				callBack.doSomething();
			}

			this.producerCondition.signalAll();
		} catch (Exception var6) {
			var6.printStackTrace();
		} finally {
			this.lock.unlock();
		}

	}

	public void wakeUpConsumer() {
		this.lock.lock();
		this.consumerCondition.signalAll();
		this.lock.unlock();
	}

	public void makeProducerWait(LockManager.CallBack callBack) {
		this.lock.lock();
		try {
			if (callBack != null) {
				callBack.doSomething();
			}
			this.producerCondition.await();
		} catch (InterruptedException var6) {
			var6.printStackTrace();
		} finally {
			this.lock.unlock();
		}

	}

	public void wakeUpConsumersAndWait() {
		this.lock.lock();

		try {
			this.consumerCondition.signalAll();
			this.producerCondition.await();
		} catch (InterruptedException var5) {
			var5.printStackTrace();
		} finally {
			this.lock.unlock();
		}

	}

	public void wakeUpConsumersAndWait(LockManager.CallBack callBack) {
		if (callBack != null) {
			callBack.doSomething();
		}

		this.wakeUpConsumersAndWait();
	}

	public void wakeUpProducersAndWait(LockManager.CallBack callBack) {
		this.lock.lock();

		try {
			if (callBack != null) {
				callBack.doSomething();
			}

			this.producerCondition.signalAll();
			this.consumerCondition.await();
		} catch (InterruptedException var6) {
			var6.printStackTrace();
		} finally {
			this.lock.unlock();
		}

	}

	public interface CallBack {
		void doSomething();
	}

	private static class LockManagerHolder {
		private static final LockManager INSTANCE = new LockManager();
	}
}

5. 關鍵資訊儲存管理(單例)

package com.joker.manager;

import com.joker.abs.LifeCycle.ThreadState;
import reactor.core.Disposable;
import reactor.util.annotation.NonNull;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
 * 關鍵資訊儲存
 *
 * @author Joker
 * @since 2021/12/05
 */
public final class StorageManager {

	private volatile boolean shouldSuspend = false;
	AtomicLong currentPage = new AtomicLong(0L);
	AtomicLong fail = new AtomicLong(0L);
	private StringBuffer wrongMsg = new StringBuffer();
	CompositeDisposable compositeDisposable = new CompositeDisposable();
	AtomicInteger producer = new AtomicInteger(0);
	AtomicInteger consumer = new AtomicInteger(0);
	ConcurrentHashMap<String, ThreadState> producers = new ConcurrentHashMap<>(40, 1F, 40);
	ConcurrentHashMap<String, ThreadState> consumers = new ConcurrentHashMap<>(40, 1F, 40);

	public String getState() {
		final long[] count0 = {0};
		final long[] count1 = {0};
		final long[] count2 = {0};
		final long[] count3 = {0};
		final long[] count4 = {0};
		final long[] count5 = {0};
		producers.keySet().forEach(s -> {
			switch (producers.get(s)) {
				case TERMINATE:
					count0[0]++;
					break;
				case INIT:
					count1[0]++;
					break;
				case RUNNING:
					count2[0]++;
					break;
				case WAITING:
					count3[0]++;
					break;
				case FAILED:
					count4[0]++;
					break;
				case COMPLETE:
					count5[0]++;
					break;
			}
		});
		return "TERMINATE:" + count0[0] + "個--INIT:" + count1[0] +
				"個--RUNNING:" + count2[0] + "個--WAITING:" + count3[0] +
				"個--FAILED:" + count4[0] + "個--COMPLETE:" + count5[0] + "個--";
	}

	public synchronized boolean isShouldSuspend() {
		return this.shouldSuspend;
	}

	public synchronized void shouldSuspend() {
		if (!isShouldSuspend()) {
			this.shouldSuspend = true;
		}
	}

	public void shouldNotSuspend() {
		if (isShouldSuspend()) {
			this.shouldSuspend = false;
		}
	}

	public void addDispose(@NonNull Disposable disposable) {
		compositeDisposable.add(disposable);
	}

	public void clearDispose() {
		compositeDisposable.dispose();
		compositeDisposable = new CompositeDisposable();
	}

	public void failIncrementAndGet() {
		this.fail.incrementAndGet();
	}

	public long getFailed() {
		return this.fail.get();
	}

	public int currentProducerIncrementAndGet() {
		return producer.incrementAndGet();
	}

	public int currentConsumerIncrementAndGet() {
		return consumer.incrementAndGet();
	}

	public long currentPageIncrementAndGet() {
		return this.currentPage.incrementAndGet();
	}

	public long getCurrentPage() {
		return this.currentPage.get();
	}

	public void clear() {
		currentPage.set(0L);
		fail.set(0L);
		producer.set(0);
		consumer.set(0);
		wrongMsg = new StringBuffer();
	}

	public StringBuffer getWrongMsg() {
		return this.wrongMsg;
	}

	public void addWrongMsg(String msg) {
		if (msg != null) {
			this.wrongMsg.append(msg);
		}

	}

	public void addProducer(String producer) {
		if (producer != null) {
			this.producers.put(producer, ThreadState.INIT);
		}

	}

	public void addConsumer(String consumer) {
		if (consumer != null) {
			this.consumers.put(consumer, ThreadState.INIT);
		}

	}

	public void setProducer(String key, ThreadState value) {
		this.producers.replace(key, value);
		if (value == ThreadState.WAITING && this.allProducerNotWorking() && this.allConsumerNotWorking()) {
			this.runAllWaitingConsumer();
		}

	}

	public void setConsumer(String key, ThreadState value) {
		this.consumers.replace(key, value);
		if (value == ThreadState.WAITING && this.allConsumerNotWorking() && this.allProducerNotWorking()) {
			this.runAllWaitingProducer();
		}

	}

	public boolean allProducerFinish() {
		return this.producers.entrySet().stream().allMatch((threadThreadStateEntry) ->
				threadThreadStateEntry.getValue() == ThreadState.INIT
						|| threadThreadStateEntry.getValue() == ThreadState.FAILED
						|| threadThreadStateEntry.getValue() == ThreadState.TERMINATE);
	}

	public boolean allProducerNotWorking() {
		return this.producers.entrySet().stream().noneMatch((threadThreadStateEntry) ->
				threadThreadStateEntry.getValue() == ThreadState.RUNNING);
	}

	public boolean allConsumerNotWorking() {
		return this.producers.entrySet().stream().noneMatch((threadThreadStateEntry) ->
				threadThreadStateEntry.getValue() != ThreadState.RUNNING);
	}

	public void runAllWaiting() {
		this.runAllWaitingProducer();
		this.runAllWaitingConsumer();
	}

	public void runAllWaitingProducer() {
		this.producers.entrySet().stream().filter((threadThreadStateEntry) ->
				threadThreadStateEntry.getValue() != ThreadState.WAITING).forEach((threadThreadStateEntry) -> {
			if (threadThreadStateEntry.getValue() != ThreadState.TERMINATE) {
				this.producers.replace(threadThreadStateEntry.getKey(), ThreadState.RUNNING);
			}
		});
	}

	public void runAllWaitingConsumer() {
		this.consumers.entrySet().stream().filter((threadThreadStateEntry) ->
				threadThreadStateEntry.getValue() != ThreadState.WAITING).forEach((threadThreadStateEntry) -> {
			if (threadThreadStateEntry.getValue() != ThreadState.TERMINATE) {
				this.consumers.replace(threadThreadStateEntry.getKey(), ThreadState.RUNNING);
			}
		});
	}

	public boolean onlyCurrentProducerWorking(String currentThread) {
		long count = this.producers.entrySet().stream().filter((threadThreadStateEntry) ->
				threadThreadStateEntry.getValue() == ThreadState.RUNNING).count();
		return count == 1L && this.producers.get(currentThread) == ThreadState.RUNNING;
	}


	private StorageManager() {
	}

	public static StorageManager getInstance() {
		return StorageManager.StorageManagerHolder.INSTANCE;
	}

	private static class StorageManagerHolder {
		private static final StorageManager INSTANCE = new StorageManager();
	}
}

6. 執行緒管理者(單例)

package com.joker.manager;

import com.joker.abs.AbstractQueuedConsumer;
import com.joker.abs.AbstractQueuedProducer;
import com.joker.abs.LifeCycle;

import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 執行緒管理者
 *
 * @author Joker
 * @since 2021/12/05
 */
public final class ThreadManager implements Thread.UncaughtExceptionHandler {

	private ThreadPoolExecutor executorServiceP;
	private ThreadPoolExecutor executorServiceC;
	private Timer timer2 = new Timer();

	public void needStart(Prepare prepare) {
		needProduceStop();
		needConsumerStop();
		int producer = 30;
		int consumer = 5;
		List<AbstractQueuedProducer> producerList = prepare.getProducer(producer);
		List<AbstractQueuedConsumer> consumerList = prepare.getConsumers(consumer);
		executorServiceP = (ThreadPoolExecutor) Executors.newFixedThreadPool(producer);
		executorServiceC = (ThreadPoolExecutor) Executors.newFixedThreadPool(consumer);
		for (AbstractQueuedProducer abstractQueuedProducer : producerList) {
			abstractQueuedProducer.setUncaughtExceptionHandler(ThreadManager.getInstance());
			executorServiceP.execute(abstractQueuedProducer);
		}
		for (AbstractQueuedConsumer abstractQueuedConsumer : consumerList) {
			abstractQueuedConsumer.setUncaughtExceptionHandler(ThreadManager.getInstance());
			executorServiceC.execute(abstractQueuedConsumer);
		}

		timer2.cancel();
		timer2 = new Timer();
		timer2.schedule(new TimerTask() {
			public void run() {
				System.out.println("pool-produce-狀態:" + executorServiceP.isShutdown() + "--活躍數量:" + executorServiceP.getActiveCount() + "--總數量:" + executorServiceP.getTaskCount() + "pool-consumer-狀態:" + executorServiceC.isShutdown() + "--活躍數量:" + executorServiceC.getActiveCount() + "--總數量:" + executorServiceC.getTaskCount());
			}
		}, 0L, 50L);
	}

	public void needProduceStop() {
		if (executorServiceP != null && !executorServiceP.isShutdown()) {
			executorServiceP.shutdown();
			StorageManager.getInstance().clearDispose();
		}
	}

	public ThreadPoolExecutor getExecutorServiceProducer() {
		return executorServiceP;
	}

	public boolean shouldStop() {
		return executorServiceP.isShutdown() && !executorServiceC.isShutdown();
	}

	public void needConsumerStop() {
		if (executorServiceC != null && !executorServiceC.isShutdown()) {
			executorServiceC.shutdown();
		}
	}


	@Override
	public void uncaughtException(Thread t, Throwable e) {
		StorageManager.getInstance().setProducer(t.getName(), LifeCycle.ThreadState.FAILED);
		StorageManager.getInstance().addWrongMsg("thread:" + t.getName() + "---異常:" + e.getMessage());
	}

	private ThreadManager() {
	}

	public static ThreadManager getInstance() {
		return ThreadManager.ThreadManagerHolder.INSTANCE;
	}

	private static class ThreadManagerHolder {
		private final static ThreadManager INSTANCE = new ThreadManager();
	}

	public interface Prepare {
		List<AbstractQueuedProducer> getProducer(int size);

		List<AbstractQueuedConsumer> getConsumers(int size);
	}
}

本節完畢