1. 程式人生 > 實用技巧 >Java如何處理延遲任務過程解析

Java如何處理延遲任務過程解析

1、利用延遲佇列

延時佇列,第一他是個佇列,所以具有對列功能第二就是延時,這就是延時對列,功能也就是將任務放在該延時對列中,只有到了延時時刻才能從該延時對列中獲取任務否則獲取不到……

應用場景比較多,比如延時1分鐘發簡訊,延時1分鐘再次執行等,下面先看看延時佇列demo之後再看延時佇列在專案中的使用:

簡單的延時佇列要有三部分:第一實現了Delayed介面的訊息體、第二消費訊息的消費者、第三存放訊息的延時佇列,那下面就來看看延時佇列demo。

一、訊息體

package com.delqueue; 
 
import java.util.concurrent.Delayed; 
import java.util.concurrent.TimeUnit; 
 
/** 
 * 訊息體定義 實現Delayed介面就是實現兩個方法即compareTo 和 getDelay最重要的就是getDelay方法,這個方法用來判斷是否到期…… */ 
public class Message implements Delayed { 
  private int id; 
  private String body; // 訊息內容 
  private long excuteTime;// 延遲時長,這個是必須的屬性因為要按照這個判斷延時時長。 
 
  public int getId() { 
    return id; 
  } 
 
  public String getBody() { 
    return body; 
  } 
 
  public long getExcuteTime() { 
    return excuteTime; 
  } 
 
  public Message(int id, String body, long delayTime) { 
    this.id = id; 
    this.body = body; 
    this.excuteTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime(); 
  } 
 
  // 自定義實現比較方法返回 1 0 -1三個引數 
  @Override 
  public int compareTo(Delayed delayed) { 
    Message msg = (Message) delayed; 
    return Integer.valueOf(this.id) > Integer.valueOf(msg.id) ? 1 
        : (Integer.valueOf(this.id) < Integer.valueOf(msg.id) ? -1 : 0); 
  } 
 
  // 延遲任務是否到時就是按照這個方法判斷如果返回的是負數則說明到期否則還沒到期 
  @Override 
  public long getDelay(TimeUnit unit) { 
    return unit.convert(this.excuteTime - System.nanoTime(), TimeUnit.NANOSECONDS); 
  } 
}

二、訊息消費者

package com.delqueue; 
 
import java.util.concurrent.DelayQueue; 
 
public class Consumer implements Runnable { 
  // 延時佇列 ,消費者從其中獲取訊息進行消費 
  private DelayQueue<Message> queue; 
 
  public Consumer(DelayQueue<Message> queue) { 
    this.queue = queue; 
  } 
 
  @Override 
  public void run() { 
    while (true) { 
      try { 
        Message take = queue.take(); 
        System.out.println("消費訊息id:" + take.getId() + " 訊息體:" + take.getBody()); 
      } catch (InterruptedException e) { 
        e.printStackTrace(); 
      } 
    } 
  } 
}

三、延時佇列

package com.delqueue; 
 
import java.util.concurrent.DelayQueue; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
 
public class DelayQueueTest { 
   public static void main(String[] args) {  
      // 建立延時佇列  
      DelayQueue<Message> queue = new DelayQueue<Message>();  
      // 新增延時訊息,m1 延時3s  
      Message m1 = new Message(1, "world", 3000);  
      // 新增延時訊息,m2 延時10s  
      Message m2 = new Message(2, "hello", 10000);  
      //將延時訊息放到延時佇列中 
      queue.offer(m2);  
      queue.offer(m1);  
      // 啟動消費執行緒 消費新增到延時佇列中的訊息,前提是任務到了延期時間  
      ExecutorService exec = Executors.newFixedThreadPool(1); 
      exec.execute(new Consumer(queue)); 
      exec.shutdown(); 
    }  
}

將訊息體放入延遲佇列中,在啟動消費者執行緒去消費延遲佇列中的訊息,如果延遲佇列中的訊息到了延遲時間則可以從中取出訊息否則無法取出訊息也就無法消費。

這就是延遲佇列demo,下面我們來說說在真實環境下的使用。

使用場景描述:

在打車軟體中對訂單進行派單的流程,當有訂單的時候給該訂單篩選司機,然後給當訂單繫結司機,但是有時運氣沒那麼好,訂單進來後第一次沒有篩選到合適的司機,但我們也不能就此結束派單,而是將該訂單的資訊放到延時佇列中過個2秒鐘在進行一次,其實這個2秒鐘就是一個延遲,所以這裡我們就可以使用延時佇列來實現……

下面看看簡單的流程圖:

下面來看看具體程式碼實現:

在專案中有如下幾個類:第一 、任務類 第二、按照任務類組裝的訊息體類 第三、延遲佇列管理類

任務類即執行篩選司機、綁單、push訊息的任務類

package com.test.delayqueue; 
/** 
 * 具體執行相關業務的業務類 
 * @author whd 
 * @date 2017年9月25日 上午12:49:32 
 */ 
public class DelayOrderWorker implements Runnable { 
 
  @Override 
  public void run() { 
    // TODO Auto-generated method stub 
    //相關業務邏輯處理 
    System.out.println(Thread.currentThread().getName()+" do something ……"); 
  } 
}

訊息體類,在延時佇列中這個實現了Delayed介面的訊息類是比不可少的,實現介面時有一個getDelay(TimeUnit unit)方法,這個方法就是判斷是否到期的

這裡定義的是一個泛型類,所以可以將我們上面的任務類作為其中的task,這樣就將任務類分裝成了一個訊息體

package com.test.delayqueue; 
 
import java.util.concurrent.Delayed; 
import java.util.concurrent.TimeUnit; 
 
/** 
 * 延時佇列中的訊息體將任務封裝為訊息體 
 * 
 * @author whd 
 * @date 2017年9月25日 上午12:48:30 
 * @param <T> 
 */ 
public class DelayOrderTask<T extends Runnable> implements Delayed { 
  private final long time; 
  private final T task; // 任務類,也就是之前定義的任務類 
 
  /** 
   * @param timeout 
   *      超時時間(秒) 
   * @param task 
   *      任務 
   */ 
  public DelayOrderTask(long timeout, T task) { 
    this.time = System.nanoTime() + timeout; 
    this.task = task; 
  } 
 
  @Override 
  public int compareTo(Delayed o) { 
    // TODO Auto-generated method stub 
    DelayOrderTask other = (DelayOrderTask) o; 
    long diff = time - other.time; 
    if (diff > 0) { 
      return 1; 
    } else if (diff < 0) { 
      return -1; 
    } else { 
      return 0; 
    } 
  } 
 
  @Override 
  public long getDelay(TimeUnit unit) { 
    // TODO Auto-generated method stub 
    return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS); 
  } 
 
  @Override 
  public int hashCode() { 
    return task.hashCode(); 
  } 
 
  public T getTask() { 
    return task; 
  } 
}

延時佇列管理類,這個類主要就是將任務類封裝成訊息並並新增到延時佇列中,以及輪詢延時佇列從中取出到時的訊息體,在獲取任務類放到執行緒池中執行任務

package com.test.delayqueue; 
 
import java.util.Map; 
import java.util.concurrent.DelayQueue; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.atomic.AtomicLong; 
 
/** 
 * 延時佇列管理類,用來新增任務、執行任務 
 * 
 * @author whd 
 * @date 2017年9月25日 上午12:44:59 
 */ 
public class DelayOrderQueueManager { 
  private final static int DEFAULT_THREAD_NUM = 5; 
  private static int thread_num = DEFAULT_THREAD_NUM; 
  // 固定大小執行緒池 
  private ExecutorService executor; 
  // 守護執行緒 
  private Thread daemonThread; 
  // 延時佇列 
  private DelayQueue<DelayOrderTask<?>> delayQueue; 
  private static final AtomicLong atomic = new AtomicLong(0); 
  private final long n = 1; 
  private static DelayOrderQueueManager instance = new DelayOrderQueueManager(); 
 
  private DelayOrderQueueManager() { 
    executor = Executors.newFixedThreadPool(thread_num); 
    delayQueue = new DelayQueue<>(); 
    init(); 
  } 
 
  public static DelayOrderQueueManager getInstance() { 
    return instance; 
  } 
 
  /** 
   * 初始化 
   */ 
  public void init() { 
    daemonThread = new Thread(() -> { 
      execute(); 
    }); 
    daemonThread.setName("DelayQueueMonitor"); 
    daemonThread.start(); 
  } 
 
  private void execute() { 
    while (true) { 
      Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces(); 
      System.out.println("當前存活執行緒數量:" + map.size()); 
      int taskNum = delayQueue.size(); 
      System.out.println("當前延時任務數量:" + taskNum); 
      try { 
        // 從延時佇列中獲取任務 
        DelayOrderTask<?> delayOrderTask = delayQueue.take(); 
        if (delayOrderTask != null) { 
          Runnable task = delayOrderTask.getTask(); 
          if (null == task) { 
            continue; 
          } 
          // 提交到執行緒池執行task 
          executor.execute(task); 
        } 
      } catch (Exception e) { 
        e.printStackTrace(); 
      } 
    } 
  } 
 
  /** 
   * 新增任務 
   * 
   * @param task 
   * @param time 
   *      延時時間 
   * @param unit 
   *      時間單位 
   */ 
  public void put(Runnable task, long time, TimeUnit unit) { 
    // 獲取延時時間 
    long timeout = TimeUnit.NANOSECONDS.convert(time, unit); 
    // 將任務封裝成實現Delayed介面的訊息體 
    DelayOrderTask<?> delayOrder = new DelayOrderTask<>(timeout, task); 
    // 將訊息體放到延時佇列中 
    delayQueue.put(delayOrder); 
  } 
 
  /** 
   * 刪除任務 
   * 
   * @param task 
   * @return 
   */ 
  public boolean removeTask(DelayOrderTask task) { 
 
    return delayQueue.remove(task); 
  } 
}

測試類

package com.delqueue; 
 
import java.util.concurrent.TimeUnit; 
 
import com.test.delayqueue.DelayOrderQueueManager; 
import com.test.delayqueue.DelayOrderWorker; 
 
public class Test { 
  public static void main(String[] args) { 
    DelayOrderWorker work1 = new DelayOrderWorker();// 任務1 
    DelayOrderWorker work2 = new DelayOrderWorker();// 任務2 
    DelayOrderWorker work3 = new DelayOrderWorker();// 任務3 
    // 延遲佇列管理類,將任務轉化訊息體並將訊息體放入延遲對列中等待執行 
    DelayOrderQueueManager manager = DelayOrderQueueManager.getInstance(); 
    manager.put(work1, 3000, TimeUnit.MILLISECONDS); 
    manager.put(work2, 6000, TimeUnit.MILLISECONDS); 
    manager.put(work3, 9000, TimeUnit.MILLISECONDS); 
  } 
 
}

OK 這就是專案中的具體使用情況,當然具體內容被忽略,整體框架就是這樣,還有這裡使用java的延時佇列但是這種方式是有問題的如果如果down機則會出現任務丟失,所以也可以考慮使用mq、redis來實現

2、mq實現延遲訊息

在rabbitmq 3.5.7及以上的版本提供了一個外掛(rabbitmq-delayed-message-exchange)來實現延遲佇列功能。同時外掛依賴Erlang/OPT 18.0及以上。

外掛原始碼地址:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

外掛下載地址:

https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange

安裝:

進入外掛安裝目錄

{rabbitmq-server}/plugins/(可以檢視一下當前已存在的外掛)

下載外掛

rabbitmq_delayed_message_exchange

wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez

(如果下載的檔名稱不規則就手動重新命名一下如:

rabbitmq_delayed_message_exchange-0.0.1.ez)

啟用外掛

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

關閉外掛

rabbitmq-plugins disable rabbitmq_delayed_message_exchange

外掛使用

通過宣告一個x-delayed-message型別的exchange來使用delayed-messaging特性
x-delayed-message是外掛提供的型別,並不是rabbitmq本身的,傳送訊息的時候通過在header新增”x-delay”引數來控制訊息的延時時間

直接在maven工程的pom.xml檔案中加入

接下來在 application.properties 檔案中加入redis配置:

也很簡單,程式碼如下:

實現訊息傳送

x-delay

在這裡我設定的延遲時間是3秒。

訊息消費者

直接在main方法裡執行Spring Boot程式,Spring Boot會自動解析 MessageReceiver 類的。

接下來只需要用Junit執行一下發送訊息的介面即可。

執行完後,可以看到如下資訊:

訊息傳送時間:2018-05-03 12:44:53
3秒鐘後,Spring Boot控制檯會輸出:
訊息接收時間:2018-05-03 12:44:56
接收到的訊息:hello i am delay msg

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援碼農教程。