(三)多執行緒下的生產者消費者
阿新 • • 發佈:2021-12-06
本節列一下所需的bean和工具類
1. 請求介面對應的實體類
package com.joker.result;
import java.io.Serializable;
/**
* @author Joker
* @since 2021/12/05
*/
public class Result implements Serializable {
String data;
Long errCode;
String requestId;
String errMsg;
String apiLog;
'''
getter
setter
'''
}
2. 請求介面對應的實體類
package com.joker.result; /** * @author Joker * @since 2021/12/05 */ public class Data { private Long totalNum; private Long pageSize; private Long pageNum; private String rows; ''' getter setter ''' }
3. 單產單消佇列
package com.joker.queue; import org.jctools.queues.SpscArrayQueue; /** * 一個可以克隆的單產單消佇列 * * @author Joker * @since 2021/12/05 */ public class SpscQueue<T> extends SpscArrayQueue<T> implements Cloneable { public SpscQueue(int capacity) { super(capacity); } @SuppressWarnings({"unchecked", "rawtypes"}) @Override public SpscQueue<T> clone() { try { return (SpscQueue) super.clone(); } catch (CloneNotSupportedException var2) { throw new AssertionError(); } } }
4. 當前正在執行的任務
package com.joker.task; import com.joker.factory.TaskFactory; import javax.validation.constraints.NotNull; /** * 當前正在執行的任務(建造者模式) * * @author Joker * @since 2021/12/05 */ public class CurrentTask implements Cloneable { private final String uri; private final TaskFactory taskFactory; private final long totalNum; private final long totalPage; private final int pageSize; public CurrentTask(String uri, TaskFactory taskFactory, long totalNum, long totalPage, int pageSize) { this.uri = uri; this.taskFactory = taskFactory; this.totalNum = totalNum; this.totalPage = totalPage; this.pageSize = pageSize; } CurrentTask(CurrentTask.CurrentTaskBuilder builder) { this.uri = builder.uri; this.taskFactory = builder.taskFactory; this.totalNum = builder.totalNum; this.totalPage = builder.totalPage; this.pageSize = builder.pageSize; } ''' getter ''' @Override public CurrentTask clone() { try { return (CurrentTask) super.clone(); } catch (CloneNotSupportedException var2) { throw new AssertionError(); } } public static class CurrentTaskBuilder { private String uri; private TaskFactory taskFactory; private long totalNum; private long totalPage; private int pageSize; public CurrentTask.CurrentTaskBuilder setUri(@NotNull String uri) { this.uri = uri; return this; } public CurrentTask.CurrentTaskBuilder setTaskFactory(TaskFactory taskFactory) { this.taskFactory = taskFactory; return this; } public CurrentTask.CurrentTaskBuilder setTotalNum(long totalNum) { this.totalNum = totalNum; return this; } public CurrentTask.CurrentTaskBuilder setTotalPage(long totalPage) { this.totalPage = totalPage; return this; } public CurrentTask.CurrentTaskBuilder setPageSize(int pageSize) { this.pageSize = pageSize; return this; } public CurrentTask build() { this.checkNotNull(); return new CurrentTask(this); } void checkNotNull() { if (this.uri == null || this.taskFactory == null || this.totalNum == 0L || this.totalPage == 0L || this.pageSize == 0) { throw new RuntimeException("Something can not be empty"); } } } }
5. 當前正在執行的任務(工廠模式)
package com.joker.task;
import com.joker.factory.TaskFactory;
/**
* 當前正在執行的任務(工廠模式)
*
* @author Joker
* @since 2021/12/05
*/
public class QueueTask {
private final TaskFactory factory;
private final String[] parameters;
public QueueTask(TaskFactory factory, String[] parameters) {
this.factory = factory;
this.parameters = parameters;
}
public TaskFactory getFactory() {
return this.factory;
}
public String[] getParameters() {
return this.parameters;
}
}
6. 以下工具類大部分來自RxJava,其中有精簡部分
①. CompositeException
package com.joker.exception;
import reactor.util.annotation.NonNull;
import java.io.PrintStream;
import java.util.*;
/**
* RxJava
*
* @author Joker
* @since 2021/12/05
*/
public final class CompositeException extends RuntimeException {
private static final long serialVersionUID = 3026362227162912146L;
private final List<Throwable> exceptions;
private final String message;
/**
* Constructs a CompositeException with the given array of Throwables as the
* list of suppressed exceptions.
*
* @param errors the Throwables to have as initially suppressed exceptions
* @throws IllegalArgumentException if <code>errors</code> is empty.
*/
public CompositeException(@NonNull Iterable<? extends Throwable> errors) {
Set<Throwable> deDupedExceptions = new LinkedHashSet<>();
for (Throwable ex : errors) {
if (ex != null) {
deDupedExceptions.add(ex);
} else {
deDupedExceptions.add(new NullPointerException("Throwable was null!"));
}
}
if (deDupedExceptions.isEmpty()) {
throw new IllegalArgumentException("errors is empty");
}
List<Throwable> localExceptions = new ArrayList<>(deDupedExceptions);
this.exceptions = Collections.unmodifiableList(localExceptions);
this.message = exceptions.size() + " exceptions occurred. ";
}
/**
* Retrieves the list of exceptions that make up the {@code CompositeException}.
*
* @return the exceptions that make up the {@code CompositeException}, as a {@link List} of {@link Throwable}s
*/
@NonNull
public List<Throwable> getExceptions() {
return exceptions;
}
@Override
@NonNull
public String getMessage() {
return message;
}
/**
* All of the following {@code printStackTrace} functionality is derived from JDK {@link Throwable}
* {@code printStackTrace}. In particular, the {@code PrintStreamOrWriter} abstraction is copied wholesale.
* <p>
* Changes from the official JDK implementation:<ul>
* <li>no infinite loop detection</li>
* <li>smaller critical section holding {@link PrintStream} lock</li>
* <li>explicit knowledge about the exceptions {@link List} that this loops through</li>
* </ul>
*/
@Override
public void printStackTrace() {
printStackTrace(System.err);
}
/**
* Returns the number of suppressed exceptions.
*
* @return the number of suppressed exceptions
*/
public int size() {
return exceptions.size();
}
}
②. ExceptionHelper
package com.joker.exception;
/**
* RxJava
*
* @author Joker
* @since 2021/12/05
*/
public final class ExceptionHelper {
/**
* Utility class.
*/
private ExceptionHelper() {
throw new IllegalStateException("No instances!");
}
/**
* If the provided Throwable is an Error this method
* throws it, otherwise returns a RuntimeException wrapping the error
* if that error is a checked exception.
*
* @param error the error to wrap or throw
* @return the (wrapped) error
*/
public static RuntimeException wrapOrThrow(Throwable error) {
if (error instanceof Error) {
throw (Error) error;
}
if (error instanceof RuntimeException) {
return (RuntimeException) error;
}
return new RuntimeException(error);
}
}
③. Exceptions
package com.joker.exception;
import reactor.util.annotation.NonNull;
/**
* RxJava
*
* @author Joker
* @since 2021/12/05
*/
public final class Exceptions {
/** Utility class. */
private Exceptions() {
throw new IllegalStateException("No instances!");
}
/**
* Throws a particular {@code Throwable} only if it belongs to a set of "fatal" error varieties. These
* varieties are as follows:
* <ul>
* <li>{@code VirtualMachineError}</li>
* <li>{@code ThreadDeath}</li>
* <li>{@code LinkageError}</li>
* </ul>
* This can be useful if you are writing an operator that calls user-supplied code, and you want to
* notify subscribers of errors encountered in that code by CALLING their {@code onError} methods, but only
* if the errors are not so catastrophic that such a call would be futile, in which case you simply want to
* rethrow the error.
*
* @param t
* the {@code Throwable} to test and perhaps throw
* @see <a href="https://github.com/ReactiveX/RxJava/issues/748#issuecomment-32471495">RxJava: StackOverflowError is swallowed (Issue #748)</a>
*/
public static void throwIfFatal(@NonNull Throwable t) {
// values here derived from https://github.com/ReactiveX/RxJava/issues/748#issuecomment-32471495
if (t instanceof VirtualMachineError) {
throw (VirtualMachineError) t;
} else if (t instanceof ThreadDeath) {
throw (ThreadDeath) t;
} else if (t instanceof LinkageError) {
throw (LinkageError) t;
}
}
}
④. ObjectHelper
package com.joker.exception;
/**
* RxJava
*
* @author Joker
* @since 2021/12/05
*/
public final class ObjectHelper {
/**
* Utility class.
*/
private ObjectHelper() {
throw new IllegalStateException("No instances!");
}
/**
* Verifies if the object is not null and returns it or throws a NullPointerException
* with the given message.
*
* @param <T> the value type
* @param object the object to verify
* @param message the message to use with the NullPointerException
* @throws NullPointerException if object is null
*/
public static <T> void requireNonNull(T object, String message) {
if (object == null) {
throw new NullPointerException(message);
}
}
}
⑤. FieldUtil
package com.joker.utils;
import org.springframework.lang.Nullable;
import java.math.BigDecimal;
import java.sql.Date;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.UUID;
/**
* @author Joker
* @since 2021/12/05
*/
@SuppressWarnings("unused")
public class FieldUtil {
public static String getRandomId() {
return UUID.randomUUID().toString().replace("-", "");
}
public static String now() {
return DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now());
}
public static String WrapText(@Nullable String str) {
return hasText(str) ? "'" + str + "'" : null;
}
public static String getText(@Nullable String str) {
return hasText(str) ? str : null;
}
public static String WrapNumber(@Nullable String str) {
return hasText(str) ? str : null;
}
public static BigDecimal getBigDecimal(@Nullable String str) {
return hasText(str) ? new BigDecimal(str.trim()) : null;
}
public static String WrapStringDate(@Nullable String str) {
return !hasText(str) ? null : "'" + trimAllWhitespace(str) + "'";
}
public static Date WrapSqlDate(@Nullable String str) {
if (!hasText(str)) {
return null;
} else {
try {
return new Date((new SimpleDateFormat("yyyy-MM-dd")).parse(trimAllWhitespace(str)).getTime());
} catch (Exception var2) {
System.out.println("str 空:" + str);
return null;
}
}
}
public static Date WrapMsSqlDate(@Nullable String str) {
if (!hasText(str)) {
return null;
} else {
try {
return new Date((new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")).parse(trimAllWhitespace(str)).getTime());
} catch (Exception var2) {
System.out.println("str 空:" + str);
return null;
}
}
}
public static boolean hasText(@Nullable String str) {
return str != null && !str.isEmpty() && containsText(str);
}
private static boolean containsText(CharSequence str) {
int strLen = str.length();
for (int i = 0; i < strLen; ++i) {
if (!Character.isWhitespace(str.charAt(i))) {
return true;
}
}
return false;
}
public static boolean hasLength(@Nullable String str) {
return str != null && !str.isEmpty();
}
public static String trimAllWhitespace(String str) {
if (!hasLength(str)) {
return str;
} else {
int len = str.length();
StringBuilder sb = new StringBuilder(str.length());
for (int i = 0; i < len; ++i) {
char c = str.charAt(i);
if (!Character.isWhitespace(c)) {
sb.append(c);
}
}
return sb.toString();
}
}
public static String getRealSql(String pre, String... param) {
if (param == null) {
return pre;
} else {
for (String s : param) {
pre = pre.replaceFirst("\\?", s == null ? "null" : s);
}
return pre;
}
}
public static void main(String[] args) {
String aaa = "aa?aaa";
System.out.println(aaa.replaceFirst("b", "c"));
System.out.println(aaa.replaceFirst("\\?", "d"));
}
}
⑥. OpenHashSet
package com.joker.utils;
import rx.functions.Action1;
import rx.internal.util.unsafe.Pow2;
import java.util.Arrays;
/**
* RxJava
* A simple open hash set with add, remove and clear capabilities only.
* <p>Doesn't support nor checks for {@code null}s.
*
* @param <T> the element type
*/
public final class OpenHashSet<T> {
final float loadFactor;
int mask;
int size;
int maxSize;
T[] keys;
private static final int INT_PHI = 0x9E3779B9;
public OpenHashSet() {
this(16, 0.75f);
}
/**
* Creates an OpenHashSet with the initial capacity and load factor of 0.75f.
*
* @param capacity the initial capacity
*/
public OpenHashSet(int capacity) {
this(capacity, 0.75f);
}
@SuppressWarnings("unchecked")
public OpenHashSet(int capacity, float loadFactor) {
this.loadFactor = loadFactor;
int c = Pow2.roundToPowerOfTwo(capacity);
this.mask = c - 1;
this.maxSize = (int) (loadFactor * c);
this.keys = (T[]) new Object[c];
}
public boolean add(T value) {
final T[] a = keys;
final int m = mask;
int pos = mix(value.hashCode()) & m;
T curr = a[pos];
if (curr != null) {
if (curr.equals(value)) {
return false;
}
for (; ; ) {
pos = (pos + 1) & m;
curr = a[pos];
if (curr == null) {
break;
}
if (curr.equals(value)) {
return false;
}
}
}
a[pos] = value;
if (++size >= maxSize) {
rehash();
}
return true;
}
public boolean remove(T value) {
T[] a = keys;
int m = mask;
int pos = mix(value.hashCode()) & m;
T curr = a[pos];
if (curr == null) {
return false;
}
if (curr.equals(value)) {
return removeEntry(pos, a, m);
}
for (; ; ) {
pos = (pos + 1) & m;
curr = a[pos];
if (curr == null) {
return false;
}
if (curr.equals(value)) {
return removeEntry(pos, a, m);
}
}
}
boolean removeEntry(int pos, T[] a, int m) {
size--;
int last;
int slot;
T curr;
for (; ; ) {
last = pos;
pos = (pos + 1) & m;
for (; ; ) {
curr = a[pos];
if (curr == null) {
a[last] = null;
return true;
}
slot = mix(curr.hashCode()) & m;
if (last <= pos ? last >= slot || slot > pos : last >= slot && slot > pos) {
break;
}
pos = (pos + 1) & m;
}
a[last] = curr;
}
}
public void clear(Action1<? super T> clearAction) {
if (size == 0) {
return;
}
T[] a = keys;
int len = a.length;
for (int i = 0; i < len; i++) {
T e = a[i];
if (e != null) {
clearAction.call(e);
}
}
Arrays.fill(a, null);
size = 0;
}
@SuppressWarnings("unchecked")
public void terminate() {
size = 0;
keys = (T[]) new Object[0];
}
@SuppressWarnings("unchecked")
void rehash() {
T[] a = keys;
int i = a.length;
int newCap = i << 1;
int m = newCap - 1;
T[] b = (T[]) new Object[newCap];
for (int j = size; j-- != 0; ) {
while (a[--i] == null) {
} // NOPMD
int pos = mix(a[i].hashCode()) & m;
if (b[pos] != null) {
for (; ; ) {
pos = (pos + 1) & m;
if (b[pos] == null) {
break;
}
}
}
b[pos] = a[i];
}
this.mask = m;
this.maxSize = (int) (newCap * loadFactor);
this.keys = b;
}
static int mix(int x) {
final int h = x * INT_PHI;
return h ^ (h >>> 16);
}
public boolean isEmpty() {
return size == 0;
}
/**
* Returns the raw array of values of this set, watch out for null entries.
*
* @return the raw array of values of this set
*/
public T[] values() {
return keys; // NOPMD
}
public Object[] keys() {
return keys; // NOPMD
}
public int size() {
return size;
}
}
⑦. SQL
package com.joker.utils;
/**
* @author Joker
* @since 2021/12/05
*/
public class SQL {
public static final String INSERT_LEGAL_CONTRACT = "insert into tb_legal_contract_info_qqqqqq ( lg_id, contract_type, contract_name, contract_id, contractor_id, contractor_name, contract_amount, contract_company, win_time, effective_time, ecp_code, old_contract_id, undertaker_orgname, undertaker_deptname, contract_state_code, is_caigou, caigou_type, erpcode, ht_sum_adjust, ht_sum_chang, inputuser, inputtime) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
public static final String INSERT_LEGAL_CONTRACT_C = "insert into tb_legal_contract_info_qqqqqq_c ( lg_id, contract_type, contract_name, contract_id, contractor_id, contractor_name, contract_amount, contract_company, win_time, effective_time, ecp_code, old_contract_id, undertaker_orgname, undertaker_deptname, contract_state_code, is_caigou, caigou_type, erpcode, ht_sum_adjust, ht_sum_chang, inputuser, inputtime) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
}
本節完畢