1. 程式人生 > 其它 >(三)多執行緒下的生產者消費者

(三)多執行緒下的生產者消費者

本節列一下所需的bean和工具類

1. 請求介面對應的實體類

package com.joker.result;

import java.io.Serializable;

/**
 * @author Joker
 * @since 2021/12/05
 */
public class Result implements Serializable {
	String data;
	Long errCode;
	String requestId;
	String errMsg;
	String apiLog;

	'''
	getter
	setter
	'''
}

2. 請求介面對應的實體類

package com.joker.result;

/**
 * @author Joker
 * @since 2021/12/05
 */
public class Data {

	private Long totalNum;
	private Long pageSize;
	private Long pageNum;
	private String rows;

	'''
	getter
	setter
	'''
}

3. 單產單消佇列

package com.joker.queue;

import org.jctools.queues.SpscArrayQueue;

/**
 * 一個可以克隆的單產單消佇列
 *
 * @author Joker
 * @since 2021/12/05
 */
public class SpscQueue<T> extends SpscArrayQueue<T> implements Cloneable {

	public SpscQueue(int capacity) {
		super(capacity);
	}

	@SuppressWarnings({"unchecked", "rawtypes"})
	@Override
	public SpscQueue<T> clone() {
		try {
			return (SpscQueue) super.clone();
		} catch (CloneNotSupportedException var2) {
			throw new AssertionError();
		}
	}
}

4. 當前正在執行的任務

package com.joker.task;

import com.joker.factory.TaskFactory;

import javax.validation.constraints.NotNull;

/**
 * 當前正在執行的任務(建造者模式)
 *
 * @author Joker
 * @since 2021/12/05
 */
public class CurrentTask implements Cloneable {
	private final String uri;
	private final TaskFactory taskFactory;
	private final long totalNum;
	private final long totalPage;
	private final int pageSize;

	public CurrentTask(String uri, TaskFactory taskFactory, long totalNum, long totalPage, int pageSize) {
		this.uri = uri;
		this.taskFactory = taskFactory;
		this.totalNum = totalNum;
		this.totalPage = totalPage;
		this.pageSize = pageSize;
	}

	CurrentTask(CurrentTask.CurrentTaskBuilder builder) {
		this.uri = builder.uri;
		this.taskFactory = builder.taskFactory;
		this.totalNum = builder.totalNum;
		this.totalPage = builder.totalPage;
		this.pageSize = builder.pageSize;
	}

	'''
	getter
	'''

	@Override
	public CurrentTask clone() {
		try {
			return (CurrentTask) super.clone();
		} catch (CloneNotSupportedException var2) {
			throw new AssertionError();
		}
	}

	public static class CurrentTaskBuilder {
		private String uri;
		private TaskFactory taskFactory;
		private long totalNum;
		private long totalPage;
		private int pageSize;

		public CurrentTask.CurrentTaskBuilder setUri(@NotNull String uri) {
			this.uri = uri;
			return this;
		}

		public CurrentTask.CurrentTaskBuilder setTaskFactory(TaskFactory taskFactory) {
			this.taskFactory = taskFactory;
			return this;
		}

		public CurrentTask.CurrentTaskBuilder setTotalNum(long totalNum) {
			this.totalNum = totalNum;
			return this;
		}

		public CurrentTask.CurrentTaskBuilder setTotalPage(long totalPage) {
			this.totalPage = totalPage;
			return this;
		}

		public CurrentTask.CurrentTaskBuilder setPageSize(int pageSize) {
			this.pageSize = pageSize;
			return this;
		}

		public CurrentTask build() {
			this.checkNotNull();
			return new CurrentTask(this);
		}

		void checkNotNull() {
			if (this.uri == null || this.taskFactory == null || this.totalNum == 0L || this.totalPage == 0L || this.pageSize == 0) {
				throw new RuntimeException("Something can not be empty");
			}
		}
	}
}

5. 當前正在執行的任務(工廠模式)

package com.joker.task;

import com.joker.factory.TaskFactory;

/**
 * 當前正在執行的任務(工廠模式)
 *
 * @author Joker
 * @since 2021/12/05
 */
public class QueueTask {
	private final TaskFactory factory;
	private final String[] parameters;

	public QueueTask(TaskFactory factory, String[] parameters) {
		this.factory = factory;
		this.parameters = parameters;
	}

	public TaskFactory getFactory() {
		return this.factory;
	}

	public String[] getParameters() {
		return this.parameters;
	}
}

6. 以下工具類大部分來自RxJava,其中有精簡部分

①. CompositeException

package com.joker.exception;

import reactor.util.annotation.NonNull;

import java.io.PrintStream;
import java.util.*;

/**
 * RxJava
 *
 * @author Joker
 * @since 2021/12/05
 */
public final class CompositeException extends RuntimeException {

	private static final long serialVersionUID = 3026362227162912146L;

	private final List<Throwable> exceptions;
	private final String message;

	/**
	 * Constructs a CompositeException with the given array of Throwables as the
	 * list of suppressed exceptions.
	 *
	 * @param errors the Throwables to have as initially suppressed exceptions
	 * @throws IllegalArgumentException if <code>errors</code> is empty.
	 */
	public CompositeException(@NonNull Iterable<? extends Throwable> errors) {
		Set<Throwable> deDupedExceptions = new LinkedHashSet<>();
		for (Throwable ex : errors) {
			if (ex != null) {
				deDupedExceptions.add(ex);
			} else {
				deDupedExceptions.add(new NullPointerException("Throwable was null!"));
			}
		}
		if (deDupedExceptions.isEmpty()) {
			throw new IllegalArgumentException("errors is empty");
		}
		List<Throwable> localExceptions = new ArrayList<>(deDupedExceptions);
		this.exceptions = Collections.unmodifiableList(localExceptions);
		this.message = exceptions.size() + " exceptions occurred. ";
	}

	/**
	 * Retrieves the list of exceptions that make up the {@code CompositeException}.
	 *
	 * @return the exceptions that make up the {@code CompositeException}, as a {@link List} of {@link Throwable}s
	 */
	@NonNull
	public List<Throwable> getExceptions() {
		return exceptions;
	}

	@Override
	@NonNull
	public String getMessage() {
		return message;
	}

	/**
	 * All of the following {@code printStackTrace} functionality is derived from JDK {@link Throwable}
	 * {@code printStackTrace}. In particular, the {@code PrintStreamOrWriter} abstraction is copied wholesale.
	 * <p>
	 * Changes from the official JDK implementation:<ul>
	 * <li>no infinite loop detection</li>
	 * <li>smaller critical section holding {@link PrintStream} lock</li>
	 * <li>explicit knowledge about the exceptions {@link List} that this loops through</li>
	 * </ul>
	 */
	@Override
	public void printStackTrace() {
		printStackTrace(System.err);
	}


	/**
	 * Returns the number of suppressed exceptions.
	 *
	 * @return the number of suppressed exceptions
	 */
	public int size() {
		return exceptions.size();
	}
}

②. ExceptionHelper

package com.joker.exception;

/**
 * RxJava
 *
 * @author Joker
 * @since 2021/12/05
 */
public final class ExceptionHelper {

	/**
	 * Utility class.
	 */
	private ExceptionHelper() {
		throw new IllegalStateException("No instances!");
	}

	/**
	 * If the provided Throwable is an Error this method
	 * throws it, otherwise returns a RuntimeException wrapping the error
	 * if that error is a checked exception.
	 *
	 * @param error the error to wrap or throw
	 * @return the (wrapped) error
	 */
	public static RuntimeException wrapOrThrow(Throwable error) {
		if (error instanceof Error) {
			throw (Error) error;
		}
		if (error instanceof RuntimeException) {
			return (RuntimeException) error;
		}
		return new RuntimeException(error);
	}
}

③. Exceptions

package com.joker.exception;

import reactor.util.annotation.NonNull;

/**
 * RxJava
 *
 * @author Joker
 * @since 2021/12/05
 */
public final class Exceptions {

    /** Utility class. */
    private Exceptions() {
        throw new IllegalStateException("No instances!");
    }

    /**
     * Throws a particular {@code Throwable} only if it belongs to a set of "fatal" error varieties. These
     * varieties are as follows:
     * <ul>
     * <li>{@code VirtualMachineError}</li>
     * <li>{@code ThreadDeath}</li>
     * <li>{@code LinkageError}</li>
     * </ul>
     * This can be useful if you are writing an operator that calls user-supplied code, and you want to
     * notify subscribers of errors encountered in that code by CALLING their {@code onError} methods, but only
     * if the errors are not so catastrophic that such a call would be futile, in which case you simply want to
     * rethrow the error.
     *
     * @param t
     *         the {@code Throwable} to test and perhaps throw
     * @see <a href="https://github.com/ReactiveX/RxJava/issues/748#issuecomment-32471495">RxJava: StackOverflowError is swallowed (Issue #748)</a>
     */
    public static void throwIfFatal(@NonNull Throwable t) {
        // values here derived from https://github.com/ReactiveX/RxJava/issues/748#issuecomment-32471495
        if (t instanceof VirtualMachineError) {
            throw (VirtualMachineError) t;
        } else if (t instanceof ThreadDeath) {
            throw (ThreadDeath) t;
        } else if (t instanceof LinkageError) {
            throw (LinkageError) t;
        }
    }
}

④. ObjectHelper

package com.joker.exception;

/**
 * RxJava
 *
 * @author Joker
 * @since 2021/12/05
 */
public final class ObjectHelper {

	/**
	 * Utility class.
	 */
	private ObjectHelper() {
		throw new IllegalStateException("No instances!");
	}

	/**
	 * Verifies if the object is not null and returns it or throws a NullPointerException
	 * with the given message.
	 *
	 * @param <T>     the value type
	 * @param object  the object to verify
	 * @param message the message to use with the NullPointerException
	 * @throws NullPointerException if object is null
	 */
	public static <T> void requireNonNull(T object, String message) {
		if (object == null) {
			throw new NullPointerException(message);
		}
	}
}

⑤. FieldUtil

package com.joker.utils;

import org.springframework.lang.Nullable;

import java.math.BigDecimal;
import java.sql.Date;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.UUID;

/**
 * @author Joker
 * @since 2021/12/05
 */
@SuppressWarnings("unused")
public class FieldUtil {

	public static String getRandomId() {
		return UUID.randomUUID().toString().replace("-", "");
	}

	public static String now() {
		return DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now());
	}

	public static String WrapText(@Nullable String str) {
		return hasText(str) ? "'" + str + "'" : null;
	}

	public static String getText(@Nullable String str) {
		return hasText(str) ? str : null;
	}

	public static String WrapNumber(@Nullable String str) {
		return hasText(str) ? str : null;
	}

	public static BigDecimal getBigDecimal(@Nullable String str) {
		return hasText(str) ? new BigDecimal(str.trim()) : null;
	}

	public static String WrapStringDate(@Nullable String str) {
		return !hasText(str) ? null : "'" + trimAllWhitespace(str) + "'";
	}

	public static Date WrapSqlDate(@Nullable String str) {
		if (!hasText(str)) {
			return null;
		} else {
			try {
				return new Date((new SimpleDateFormat("yyyy-MM-dd")).parse(trimAllWhitespace(str)).getTime());
			} catch (Exception var2) {
				System.out.println("str 空:" + str);
				return null;
			}
		}
	}

	public static Date WrapMsSqlDate(@Nullable String str) {
		if (!hasText(str)) {
			return null;
		} else {
			try {
				return new Date((new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")).parse(trimAllWhitespace(str)).getTime());
			} catch (Exception var2) {
				System.out.println("str 空:" + str);
				return null;
			}
		}
	}

	public static boolean hasText(@Nullable String str) {
		return str != null && !str.isEmpty() && containsText(str);
	}

	private static boolean containsText(CharSequence str) {
		int strLen = str.length();

		for (int i = 0; i < strLen; ++i) {
			if (!Character.isWhitespace(str.charAt(i))) {
				return true;
			}
		}

		return false;
	}

	public static boolean hasLength(@Nullable String str) {
		return str != null && !str.isEmpty();
	}

	public static String trimAllWhitespace(String str) {
		if (!hasLength(str)) {
			return str;
		} else {
			int len = str.length();
			StringBuilder sb = new StringBuilder(str.length());

			for (int i = 0; i < len; ++i) {
				char c = str.charAt(i);
				if (!Character.isWhitespace(c)) {
					sb.append(c);
				}
			}

			return sb.toString();
		}
	}

	public static String getRealSql(String pre, String... param) {
		if (param == null) {
			return pre;
		} else {
			for (String s : param) {
				pre = pre.replaceFirst("\\?", s == null ? "null" : s);
			}
			return pre;
		}
	}

	public static void main(String[] args) {
		String aaa = "aa?aaa";
		System.out.println(aaa.replaceFirst("b", "c"));
		System.out.println(aaa.replaceFirst("\\?", "d"));
	}
}

⑥. OpenHashSet

package com.joker.utils;

import rx.functions.Action1;
import rx.internal.util.unsafe.Pow2;

import java.util.Arrays;

/**
 * RxJava
 * A simple open hash set with add, remove and clear capabilities only.
 * <p>Doesn't support nor checks for {@code null}s.
 *
 * @param <T> the element type
 */
public final class OpenHashSet<T> {
	final float loadFactor;
	int mask;
	int size;
	int maxSize;
	T[] keys;
	private static final int INT_PHI = 0x9E3779B9;

	public OpenHashSet() {
		this(16, 0.75f);
	}

	/**
	 * Creates an OpenHashSet with the initial capacity and load factor of 0.75f.
	 *
	 * @param capacity the initial capacity
	 */
	public OpenHashSet(int capacity) {
		this(capacity, 0.75f);
	}

	@SuppressWarnings("unchecked")
	public OpenHashSet(int capacity, float loadFactor) {
		this.loadFactor = loadFactor;
		int c = Pow2.roundToPowerOfTwo(capacity);
		this.mask = c - 1;
		this.maxSize = (int) (loadFactor * c);
		this.keys = (T[]) new Object[c];
	}

	public boolean add(T value) {
		final T[] a = keys;
		final int m = mask;

		int pos = mix(value.hashCode()) & m;
		T curr = a[pos];
		if (curr != null) {
			if (curr.equals(value)) {
				return false;
			}
			for (; ; ) {
				pos = (pos + 1) & m;
				curr = a[pos];
				if (curr == null) {
					break;
				}
				if (curr.equals(value)) {
					return false;
				}
			}
		}
		a[pos] = value;
		if (++size >= maxSize) {
			rehash();
		}
		return true;
	}

	public boolean remove(T value) {
		T[] a = keys;
		int m = mask;
		int pos = mix(value.hashCode()) & m;
		T curr = a[pos];
		if (curr == null) {
			return false;
		}
		if (curr.equals(value)) {
			return removeEntry(pos, a, m);
		}
		for (; ; ) {
			pos = (pos + 1) & m;
			curr = a[pos];
			if (curr == null) {
				return false;
			}
			if (curr.equals(value)) {
				return removeEntry(pos, a, m);
			}
		}
	}

	boolean removeEntry(int pos, T[] a, int m) {
		size--;

		int last;
		int slot;
		T curr;
		for (; ; ) {
			last = pos;
			pos = (pos + 1) & m;
			for (; ; ) {
				curr = a[pos];
				if (curr == null) {
					a[last] = null;
					return true;
				}
				slot = mix(curr.hashCode()) & m;

				if (last <= pos ? last >= slot || slot > pos : last >= slot && slot > pos) {
					break;
				}

				pos = (pos + 1) & m;
			}
			a[last] = curr;
		}
	}

	public void clear(Action1<? super T> clearAction) {
		if (size == 0) {
			return;
		}
		T[] a = keys;
		int len = a.length;
		for (int i = 0; i < len; i++) {
			T e = a[i];
			if (e != null) {
				clearAction.call(e);
			}
		}
		Arrays.fill(a, null);
		size = 0;
	}

	@SuppressWarnings("unchecked")
	public void terminate() {
		size = 0;
		keys = (T[]) new Object[0];
	}

	@SuppressWarnings("unchecked")
	void rehash() {
		T[] a = keys;
		int i = a.length;
		int newCap = i << 1;
		int m = newCap - 1;

		T[] b = (T[]) new Object[newCap];


		for (int j = size; j-- != 0; ) {
			while (a[--i] == null) {
			} // NOPMD
			int pos = mix(a[i].hashCode()) & m;
			if (b[pos] != null) {
				for (; ; ) {
					pos = (pos + 1) & m;
					if (b[pos] == null) {
						break;
					}
				}
			}
			b[pos] = a[i];
		}

		this.mask = m;
		this.maxSize = (int) (newCap * loadFactor);
		this.keys = b;
	}

	static int mix(int x) {
		final int h = x * INT_PHI;
		return h ^ (h >>> 16);
	}

	public boolean isEmpty() {
		return size == 0;
	}

	/**
	 * Returns the raw array of values of this set, watch out for null entries.
	 *
	 * @return the raw array of values of this set
	 */
	public T[] values() {
		return keys; // NOPMD
	}

	public Object[] keys() {
		return keys; // NOPMD
	}

	public int size() {
		return size;
	}
}

⑦. SQL

package com.joker.utils;

/**
 * @author Joker
 * @since 2021/12/05
 */
public class SQL {

    public static final String INSERT_LEGAL_CONTRACT = "insert into tb_legal_contract_info_qqqqqq (            lg_id, contract_type, contract_name, contract_id, contractor_id,            contractor_name, contract_amount, contract_company, win_time,            effective_time, ecp_code, old_contract_id, undertaker_orgname, undertaker_deptname,            contract_state_code, is_caigou, caigou_type, erpcode, ht_sum_adjust, ht_sum_chang, inputuser, inputtime)            values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
    public static final String INSERT_LEGAL_CONTRACT_C = "insert into tb_legal_contract_info_qqqqqq_c (            lg_id, contract_type, contract_name, contract_id, contractor_id,            contractor_name, contract_amount, contract_company, win_time,            effective_time, ecp_code, old_contract_id, undertaker_orgname, undertaker_deptname,            contract_state_code, is_caigou, caigou_type, erpcode, ht_sum_adjust, ht_sum_chang, inputuser, inputtime)            values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
}

本節完畢