7. Curator Recipes: The Blocking Queue SimpleDistributedQueue
阿新 • Published: 2019-01-14
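Introduction
For a single JVM, Java provides the BlockingQueue interface for blocking queues. Curator offers an analogous blocking queue for distributed scenarios: SimpleDistributedQueue.
Official documentation: http://curator.apache.org/curator-recipes/simple-distributed-queue.html
Javadoc: http://curator.apache.org/apidocs/org/apache/curator/framework/recipes/queue/SimpleDistributedQueue.html
Note: although ZooKeeper can be used to implement a queue, the official documentation does not recommend using ZooKeeper as a queue. See the official site for details.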
Code example
import java.nio.charset.StandardCharsets;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.queue.SimpleDistributedQueue;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class SimpleQueueDemo {

    // Retry policy: base sleep time of 3000 ms, at most 2 retries
    private static CuratorFramework client = CuratorFrameworkFactory.newClient(
            "localhost:2181", new ExponentialBackoffRetry(3000, 2));
    private static String path = "/queue/path001";

    public static void main(String[] args) throws InterruptedException {
        client.start();
        System.out.println("started");
        SimpleDistributedQueue queue = new SimpleDistributedQueue(client, path);

        // Producer thread: sleep for 3 seconds, then offer one element on the same path
        new Thread(() -> {
            try {
                System.out.println("sleeping");
                Thread.sleep(3000);
                System.out.println("sleep end");
                new SimpleDistributedQueue(client, path).offer("lay".getBytes(StandardCharsets.UTF_8));
                System.out.println("offered");
            } catch (Exception e) {
                System.out.println("exception");
                e.printStackTrace();
            }
        }).start();

        // Consumer (main thread): take() blocks until an element is available
        System.out.println("polling");
        String data = null;
        try {
            data = new String(queue.take(), StandardCharsets.UTF_8);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("data=" + data);
        client.close();
    }
}
Output
started
polling
sleeping
sleep end
offered
data=lay
The main thread blocks in take() until the producer thread has offered data.
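For comparison, the same hand-off can be sketched on a single JVM with java.util.concurrent.BlockingQueue, the local counterpart mentioned in the introduction. This is only an illustrative sketch: the class name LocalBlockingQueueDemo and the helper handOff are hypothetical and not part of the original post, and LinkedBlockingQueue is just one of the JDK's BlockingQueue implementations.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class LocalBlockingQueueDemo {

    // Hypothetical helper: a producer thread offers `payload` after `delayMs`,
    // while the calling thread blocks in take() until the element arrives.
    static String handOff(String payload, long delayMs) throws InterruptedException {
        BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();

        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(delayMs);
                queue.offer(payload.getBytes(StandardCharsets.UTF_8));
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up on offering
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        byte[] bytes = queue.take(); // blocks until the producer has offered
        producer.join();
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws InterruptedException {
        // Mirrors the distributed example: offer "lay" after 3 seconds
        System.out.println("data=" + handOff("lay", 3000));
    }
}
```

Running main prints data=lay after roughly three seconds. The difference from SimpleDistributedQueue is that this queue lives in one process's memory, whereas the Curator recipe stores its elements in ZooKeeper under the given path, so producers and consumers can run in different processes.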