
7. Curator Recipes: the Blocking Queue SimpleDistributedQueue

Introduction

Java provides BlockingQueue for blocking queues within a single JVM. Curator offers a comparable blocking queue for distributed environments: SimpleDistributedQueue.

Official documentation: http://curator.apache.org/curator-recipes/simple-distributed-queue.html

Javadoc: http://curator.apache.org/apidocs/org/apache/curator/framework/recipes/queue/SimpleDistributedQueue.html

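Note: although ZooKeeper can be used to implement a queue, the official documentation advises against using ZooKeeper as a queue. See the official site for details.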

Code example

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.queue.SimpleDistributedQueue;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class SimpleQueueDemo {

    private static CuratorFramework client = CuratorFrameworkFactory.newClient(
            "localhost:2181", new ExponentialBackoffRetry(3000, 2));
    private static String path = "/queue/path001";

    public static void main(String[] args) throws InterruptedException {
        client.start();
        System.out.println("started");
        SimpleDistributedQueue queue = new SimpleDistributedQueue(client, path);

        // Producer thread: sleeps for 3 seconds, then offers one element
        // through a second queue instance bound to the same path.
        new Thread(() -> {
            try {
                System.out.println("sleeping");
                Thread.sleep(3000);
                System.out.println("sleep end");
                new SimpleDistributedQueue(client, path).offer("lay".getBytes("utf-8"));
                System.out.println("offered");
            } catch (Exception e) {
                System.out.println("exception");
                e.printStackTrace();
            }
        }).start();

        // Consumer (main thread): take() blocks until an element is available.
        System.out.println("polling");
        String data = null;
        try {
            data = new String(queue.take());
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("data=" + data);
        client.close();
    }
}

Output

started
polling
sleeping
sleep end
offered
data=lay

The main thread blocks in take() until data has been offered.
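
For comparison, the same produce-after-a-delay pattern can be sketched with the single-JVM BlockingQueue mentioned in the introduction. This is only an illustrative analog that uses java.util.concurrent and no ZooKeeper at all. The class name LocalBlockingQueueDemo and the helper takeAfterDelayedOffer are hypothetical names introduced here, not part of Curator.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical single-JVM analog of SimpleQueueDemo; not part of Curator.
public class LocalBlockingQueueDemo {

    // Starts a producer that offers `payload` after `delayMs` milliseconds,
    // then blocks in take() on the calling thread until the element arrives.
    public static String takeAfterDelayedOffer(String payload, long delayMs)
            throws InterruptedException {
        BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();

        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(delayMs);
                queue.offer(payload.getBytes(StandardCharsets.UTF_8));
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up producing.
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        byte[] data = queue.take(); // blocks until the producer has offered
        producer.join();
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("data=" + takeAfterDelayedOffer("lay", 3000));
    }
}
```

Both versions have the same shape: one thread offers bytes and another blocks in take(). The difference is that SimpleDistributedQueue keeps its elements under a ZooKeeper path, so the producer and consumer can live in different processes.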