自己寫的非同步多執行緒佇列
阿新 • • 發佈:2019-02-03
- 使用 synchronized版本
public abstract class MessageQueue2<T> {
private LinkedList<T> queue = new LinkedList<T>();
public MessageQueue2() {
}
public void start() {
new Thread(new MessageThread()).start();
}
public abstract void takeMessage(T message);
public void pushMessage(T message) {
synchronized (queue) {
// Log.e("queue", "push:" + message.toString());
this.queue.offer(message);
this.queue.notify();
// Log.e("queue", "notify:");
}
}
class MessageThread implements Runnable {
public void run() {
synchronized (queue) {
while (true) {
// Log.e("queue", "while");
if (!queue.isEmpty()) {
T message = queue.poll();
// Log.e("queue", "takemess:"+message.toString());
takeMessage(message);
} else {
try {
// Log.e("queue", "thread:wait");
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
}
- 使用ReentrantLock版本
public abstract class MessageQueue<T> {
protected LinkedList<T> queue = new LinkedList<T>();
protected ReentrantLock lock = new ReentrantLock();
private Condition emptySignal = lock.newCondition();
// private byte[] lock = new byte[0];
private int threadCount = 1;
private boolean disposeFlag = false;
public MessageQueue() {
}
public void setThreadCount(int count) {
this.threadCount = count;
}
protected LinkedList<T> getQueue(){
return this.queue;
}
public void start() {
ConsumeThread thread = new ConsumeThread();
for (int i = 0; i < this.threadCount; i++) {
new Thread(thread).start();
}
}
public abstract void takeMessage(T message);
public void pushMessage(T message) {
if (!disposeFlag) {
try {
this.lock.lock();
// Log.e("queue", "push:" + message.toString());
this.queue.offer(message);
this.emptySignal.signalAll();
} finally {
this.lock.unlock();
// Log.e("queue", "notify:");
}
}
}
public boolean containsMessage(T message) {
boolean flag = false;
try {
this.lock.lock();
flag = this.queue.contains(message);
} finally {
this.lock.unlock();
}
return flag;
}
public void dispose() {
// Log.e("messqueue", "dispose");
try {
this.lock.lock();
this.emptySignal.signalAll();
} finally {
this.lock.unlock();
// Log.e("queue", "notify:");
}
// this.emptySignal.signal();
this.disposeFlag = true;
}
class ConsumeThread implements Runnable {
public void run() {
while (!disposeFlag) {
T message = null;
// Log.e("queue", "while");
try {
lock.lock();
if (!queue.isEmpty()) {
message = queue.poll();
} else {
// Log.e("queue", "thread:wait");
emptySignal.await();
}
} catch (InterruptedException e) {
// Log.e("messagequeue wait", Log.getStackTraceString(e));
} finally {
lock.unlock();
}
if (message != null) {
// Log.e("queue", "takemess:" + message.toString());
takeMessage(message);
}
}
}
}
}
2016-12-22更新版本
使用了blockingqueue封裝
public class MessageFactory {
private BlockingQueue messageQueue = new LinkedBlockingDeque<>();
private static ConcurrentHashMap<Object,MessageAction> cache = new ConcurrentHashMap<>();
private static MessageFactory instance = new MessageFactory();
private boolean stop;
private Logger logger = LoggerFactory.getLogger(this.getClass());
private MessageFactory(){
this.start();
}
public static MessageFactory instance(){
return instance;
}
public<T> void put(T message,MessageAction<T> action){
try {
this.messageQueue.put(message);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.cache.put(message,action);
}
private<T> void start(){
new Thread(() -> {
while (!stop) {
T message = null;
try {
message = (T)messageQueue.take();
//為了防止此時cache.put還沒有完成
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
MessageAction<T> action = cache.get(message);
if (action == null){
continue;
}
action.take(message);
}
this.messageQueue.clear();
}).start();
}
public void dispose(){
this.stop = true;
try {
//通過put來喚醒queue阻塞執行緒,退出迴圈
this.messageQueue.put(" ");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}