1. 程式人生 > >curator實現zookeeper的領導選舉

curator實現zookeeper的領導選舉

通過 Curator 來實現 ZooKeeper 的領導選舉,具體請看以下程式碼:
/**
 * A named participant in a leader election.
 *
 * <p>Each instance wraps a {@link LeaderSelector} bound to the given client and
 * path, and registers itself as the selector's listener. When elected, it holds
 * leadership for a random 1-5 seconds and then gives it up by returning from
 * {@link #takeLeadership(CuratorFramework)}.
 */
public class ExampleClient extends LeaderSelectorListenerAdapter implements Closeable {

    /** Label used to identify this participant in console output. */
    private String name;

    /** Selector that runs the election on behalf of this participant. */
    private LeaderSelector leaderSelector;

    /** Number of times this participant has been granted leadership so far. */
    private AtomicInteger leaderCount = new AtomicInteger();

    /**
     * Creates the participant and its selector; the selector is not started here.
     *
     * @param client Curator client the selector operates through
     * @param path   election path passed to the selector
     * @param name   label for this participant in log output
     */
    public ExampleClient(CuratorFramework client, String path, String name) {
        this.name = name;
        this.leaderSelector = new LeaderSelector(client, path, this);
        // Enable auto-requeue on the selector so this participant keeps taking part
        // in later elections (see the Curator LeaderSelector docs for exact semantics).
        this.leaderSelector.autoRequeue();
    }

    /**
     * Invoked when this participant becomes leader. Leadership is held until
     * this method returns.
     *
     * @param client the Curator client (unused here)
     * @throws Exception declared by the listener contract
     */
    public void takeLeadership(CuratorFramework client) throws Exception {
        // Hold leadership for a random whole number of seconds in [1, 5].
        final int holdSeconds = 1 + (int) (Math.random() * 5);

        System.out.println(name + " is now the leader. Waiting " + holdSeconds + " seconds...");
        System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");

        try {
            TimeUnit.SECONDS.sleep(holdSeconds);
        } catch (InterruptedException e) {
            System.err.println(name + " was interrupted.");
            // Restore the interrupt flag so the calling thread can observe it.
            Thread.currentThread().interrupt();
        } finally {
            System.out.println(name + " relinquishing leadership.\n");
        }
    }

    /**
     * Starts the underlying selector so this participant joins the election.
     *
     * @throws IOException declared for callers; not thrown by the code visible here
     */
    public void start() throws IOException {
        leaderSelector.start();
    }

    /**
     * Closes the underlying selector.
     *
     * @throws IOException declared by {@link Closeable}
     */
    public void close() throws IOException {
        leaderSelector.close();
    }

}

程式入口
/**
 * Entry point for the leader-election demo.
 *
 * <p>Creates a batch of Curator clients, each with its own {@link ExampleClient}
 * contending on the same election path, then keeps the process alive until the
 * user presses enter. All selectors and clients are closed on the way out.
 */
public class LeaderSelectorExample {

    /** Election path shared by every participant. */
    private static final String PATH = "/example/cache";

    /** ZooKeeper connect string. */
    private static final String zkStr = "master:2181,worker1:2181,worker2:2181";

    /** Number of contending clients to create. */
    private static final int CLIENT_QTY = 100;

    /** Every Curator client created, kept so it can be closed at shutdown. */
    private static final List<CuratorFramework> clients = Lists.newArrayList();

    /** Every election participant created, kept so it can be closed at shutdown. */
    private static final List<ExampleClient> examples = Lists.newArrayList();

    /**
     * Starts all participants, waits for input on stdin, then shuts everything down.
     *
     * @param args unused
     * @throws Exception if a client or selector fails to start, or reading stdin fails
     */
    public static void main(String[] args) throws Exception {
        try {
            for (int i = 0; i < CLIENT_QTY; ++i) {
                CuratorFramework client =
                        CuratorFrameworkFactory.newClient(zkStr, new ExponentialBackoffRetry(1000, 3));
                // Register before starting so the finally block also closes
                // resources whose start() threw.
                clients.add(client);
                ExampleClient example = new ExampleClient(client, PATH, "Client #" + i);
                examples.add(example);
                client.start();
                example.start();
            }

            System.out.println("Press enter/return to quit\n");
            // Block here until input arrives (or stdin reaches EOF). Without this wait
            // the finally block runs immediately and closes every participant before
            // any election can be observed.
            System.in.read();
        } finally {
            System.out.println("Shutting down...");
            // Close the selectors first, then the clients they run on.
            for (ExampleClient exampleClient : examples) {
                CloseableUtils.closeQuietly(exampleClient);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
        }
    }
}