
ZooKeeper Use Cases: Leader Election

Leader election, also called master election, is one of the most classic ZooKeeper use cases.

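In a distributed environment, the same business application runs on several machines. Some business logic, such as time-consuming computations or network I/O processing, only needs to be executed by one machine in the cluster, and the other machines can share its result. This greatly reduces duplicated work and improves performance. Leader election is the main problem to solve in this scenario.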

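ZooKeeper needs to elect a Leader among all the services (think of them as servers) and let this Leader manage the cluster. The other servers in the cluster then become the Leader's Followers. In addition, when the Leader fails, ZooKeeper must be able to elect the next Leader from the Followers quickly. This is the ZooKeeper-based Leader mechanism. Below we briefly describe how to implement Leader Election with ZooKeeper.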

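The core idea of the implementation is as follows. First create a persistent parent node, for example "/election". It must not be EPHEMERAL, because ephemeral nodes cannot have children. Then every server creates a node of type SEQUENCE|EPHEMERAL under it, for example "/election/n_". With the SEQUENCE flag, ZooKeeper automatically appends to each node a sequence number larger than any previously assigned one. The server that created the node with the smallest sequence number becomes the Leader.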
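The selection rule itself needs nothing from ZooKeeper beyond the list of /election's children. The Java sketch below uses a hypothetical helper class, not part of any ZooKeeper API. It assumes child names such as "n_0000000003", meaning the "n_" prefix followed by the zero-padded counter ZooKeeper appends, and picks the smallest one. Because getChildren() returns children in no guaranteed order, the helper compares sequence numbers explicitly instead of relying on list order. The nodes themselves would be created with a call such as zooKeeper.create(root + "/n_", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL), as the recipe's makeOffer() does.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical helper for illustration; not a ZooKeeper API.
final class LeaderPicker {
    private static final String PREFIX = "n_";

    // Parses the counter ZooKeeper appended, e.g. "n_0000000007" -> 7.
    static int sequenceOf(String childName) {
        return Integer.parseInt(childName.substring(PREFIX.length()));
    }

    // The child with the smallest sequence number belongs to the leader.
    // Children without the expected prefix are ignored.
    static Optional<String> leaderOf(List<String> children) {
        return children.stream()
                .filter(c -> c.startsWith(PREFIX))
                .min(Comparator.comparingInt(LeaderPicker::sequenceOf));
    }
}
```

For children ["n_0000000012", "n_0000000003", "n_0000000007"], leaderOf returns "n_0000000003".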

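In practice, we also need to ensure that the system can quickly choose the next server as Leader when the Leader fails. A simple approach is to have every Follower watch the Leader's node. When the Leader fails, its ephemeral node is deleted automatically, and this fires the watch of every server that is watching the Leader. Those servers learn of the Leader's failure and start the next round of Leader election. However, this approach causes the "herd effect": every Follower is woken up and queries ZooKeeper at the same moment, even though only one of them can become Leader. The effect is especially noticeable when the cluster has many servers and network latency is high.

To avoid the herd effect, the ZooKeeper approach works as follows. Each Follower sets a watch only on one node: the node with the largest sequence number among the nodes whose sequence numbers are smaller than its own. A Follower runs the Leader election procedure again only when that watch fires. If its own node is then the smallest, it becomes the next Leader. In the common case, where the watched node belonged to the Leader, that is exactly what happens. This makes each election very fast, because a round involves essentially only a single Follower.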
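Deciding which node to watch is a small piece of pure logic. The sketch below uses a hypothetical helper that is not taken from the recipe. It sorts the children by sequence number and returns the immediate predecessor of our own node. It returns null when our node is the smallest, which means we are the Leader. In a real client, the returned path would then be passed to zooKeeper.exists(path, watcher).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical helper for illustration; not a ZooKeeper API.
final class WatchTarget {
    // "n_0000000004" -> 4; sequence numbers may have gaps.
    static int seq(String child) {
        return Integer.parseInt(child.substring("n_".length()));
    }

    /**
     * Returns the child to watch: the one with the largest sequence number
     * that is still smaller than ours. Returns null if ours is the smallest,
     * i.e. we are the Leader and watch nobody.
     */
    static String predecessorOf(String me, List<String> children) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparingInt(WatchTarget::seq));
        int idx = sorted.indexOf(me);
        if (idx < 0) {
            throw new IllegalArgumentException(me + " is not among the children");
        }
        return idx == 0 ? null : sorted.get(idx - 1);
    }
}
```

Take offers n_0000000001, n_0000000004 and n_0000000009. The owner of n_0000000009 watches n_0000000004, not the Leader, so a Leader failure wakes up only the owner of n_0000000004.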

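Summary: in cluster management, when all clients try to create the same node, only one of them succeeds. A slight variation is to let every request succeed but impose a creation order. One possible result in ZK is then: /currentMaster/{sessionId}-1, /currentMaster/{sessionId}-2, /currentMaster/{sessionId}-3 ….. Each time, the machine with the smallest sequence number is chosen as Master. If that machine goes down, the node it created disappears immediately because it is ephemeral, and the machine that now has the smallest number becomes Master.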
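The failover behaviour can be simulated without a ZooKeeper server. The class below is a hypothetical in-memory stand-in for /currentMaster:

- register() mimics an EPHEMERAL_SEQUENTIAL create: every caller succeeds and receives an increasing number.
- sessionClosed() mimics the ephemeral node disappearing when its owner's session ends.
- currentMaster() returns the owner of the smallest number.

This is only a model of the behaviour described above, not ZooKeeper code.

```java
import java.util.TreeMap;

// Hypothetical in-memory model of /currentMaster; not a ZooKeeper API.
final class FakeMasterRegistry {
    // sequence number -> session id, kept sorted by sequence number
    private final TreeMap<Integer, String> nodes = new TreeMap<>();
    private int nextSeq = 1;

    // Like an EPHEMERAL_SEQUENTIAL create: all callers succeed, in order.
    synchronized int register(String sessionId) {
        int seq = nextSeq++;
        nodes.put(seq, sessionId);
        return seq;
    }

    // Like session expiry: the owner's ephemeral node vanishes.
    synchronized void sessionClosed(String sessionId) {
        nodes.values().removeIf(sessionId::equals);
    }

    // The smallest surviving sequence number is the Master.
    synchronized String currentMaster() {
        return nodes.isEmpty() ? null : nodes.firstEntry().getValue();
    }
}
```

Registering A, B and C makes A the Master. After A's session closes, B becomes Master without any explicit hand-over. If C then leaves, B keeps the role.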

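In a search system, having every machine in the cluster build a full index would be time-consuming and would not guarantee that the index data stays consistent across machines. Instead, the cluster's Master builds the full index and then synchronizes it to the other machines. As a disaster-recovery measure for Master election, the master can also be designated manually at any time. When the application cannot obtain master information from ZK, it can fetch the master from another source, for example over HTTP.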
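The manual-override fallback boils down to "prefer the elected master, otherwise ask the override source". In this sketch both lookups are hypothetical Supplier callbacks, and neither is a real library API:

- The first would wrap a ZooKeeper read, turning a checked KeeperException into a runtime exception or an empty result.
- The second would wrap an HTTP call to wherever operators publish the manually designated master.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical resolver; both suppliers are placeholders, not real APIs.
final class MasterResolver {
    private final Supplier<Optional<String>> fromZooKeeper;
    private final Supplier<Optional<String>> manualOverride;

    MasterResolver(Supplier<Optional<String>> fromZooKeeper,
                   Supplier<Optional<String>> manualOverride) {
        this.fromZooKeeper = fromZooKeeper;
        this.manualOverride = manualOverride;
    }

    // Prefer the elected master; fall back to the manually designated one
    // when ZK yields nothing or the lookup throws (e.g. ZK unreachable).
    Optional<String> resolve() {
        Optional<String> elected;
        try {
            elected = fromZooKeeper.get();
        } catch (RuntimeException e) {
            elected = Optional.empty();
        }
        return elected.isPresent() ? elected : manualOverride.get();
    }
}
```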

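HBase also uses ZooKeeper to elect the HMaster dynamically. HBase stores information in ZK such as the address of the ROOT table and the address of the HMaster. Each HRegionServer registers itself in ZooKeeper as an ephemeral node, so the HMaster can always tell which HRegionServers are alive. If the HMaster fails, a new HMaster is elected to take over, which removes the HMaster as a single point of failure.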

The following example comes from: zookeeper-3.4.*/recipes

I. Election state transitions

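1. EventType.START

2. makeOffer(): OFFER_START, OFFER_COMPLETE; prepares for the vote by creating this process's leader offer

3. determineElectionStatus(): DETERMINE_START, DETERMINE_COMPLETE; runs the vote, i.e. decides where this offer ranks

Outcome of the vote: becomeLeader() or becomeReady()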
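When nothing fails, the order in which a listener receives events can be read directly off the call sequence: start(), then makeOffer(), then determineElectionStatus(), then becomeLeader() or becomeReady().

The sketch below writes that order out. It makes two assumptions:

- It uses a local enum that copies a subset of the recipe's EventType constant names, so it compiles on its own.
- For a non-leader, it assumes the predecessor still exists when becomeReady() checks it. Otherwise determineElectionStatus() runs again and DETERMINE_START repeats.

```java
import java.util.ArrayList;
import java.util.List;

// Local mirror of a subset of LeaderElectionSupport.EventType (same names),
// so this sketch compiles without the recipe on the classpath.
enum ElectionEvent {
    START, OFFER_START, OFFER_COMPLETE, DETERMINE_START, DETERMINE_COMPLETE,
    ELECTED_START, ELECTED_COMPLETE, READY_START, READY_COMPLETE
}

final class ElectionTrace {
    /**
     * Events a listener receives after start() when nothing fails.
     * position is the index of our offer after sorting by sequence number.
     */
    static List<ElectionEvent> expected(int position) {
        if (position < 0) {
            throw new IllegalArgumentException("position must be >= 0");
        }
        List<ElectionEvent> events = new ArrayList<>();
        events.add(ElectionEvent.START);              // start()
        events.add(ElectionEvent.OFFER_START);        // makeOffer()
        events.add(ElectionEvent.OFFER_COMPLETE);
        events.add(ElectionEvent.DETERMINE_START);    // determineElectionStatus()
        events.add(ElectionEvent.DETERMINE_COMPLETE);
        if (position == 0) {
            events.add(ElectionEvent.ELECTED_START);  // becomeLeader()
            events.add(ElectionEvent.ELECTED_COMPLETE);
        } else {
            events.add(ElectionEvent.READY_START);    // becomeReady()
            events.add(ElectionEvent.READY_COMPLETE); // predecessor still alive
        }
        return events;
    }
}
```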

II. The becomeReady method is the most important part

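1) Stat stat = zooKeeper.exists(neighborLeaderOffer.getNodePath(), this); sets a watch on the neighbor's (predecessor's) offer node. See the process() method, which is the Watcher callback.

2) If stat is null, the neighbor's node vanished between getChildren() and exists(), so determineElectionStatus() runs again.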
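The recursion between becomeReady() and determineElectionStatus() can also be written as a loop. In this hypothetical sketch, listChildren stands in for zooKeeper.getChildren(rootNodeName, false), and existsAndWatch stands in for zooKeeper.exists(path, this) != null. Both are plain functional interfaces, so the logic can be exercised without a server. This is a restatement for illustration, not the recipe's code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical iterative restatement of determineElectionStatus()/becomeReady().
final class ReadyLoop {
    static int seq(String child) {
        return Integer.parseInt(child.substring("n_".length()));
    }

    /** Returns null if we became leader, otherwise the node we now watch. */
    static String determine(String me,
                            Supplier<List<String>> listChildren,
                            Predicate<String> existsAndWatch) {
        while (true) {
            List<String> sorted = new ArrayList<>(listChildren.get());
            sorted.sort(Comparator.comparingInt(ReadyLoop::seq));
            int idx = sorted.indexOf(me);
            if (idx < 0) {
                throw new IllegalStateException("own offer missing: " + me);
            }
            if (idx == 0) {
                return null;                      // becomeLeader()
            }
            String neighbor = sorted.get(idx - 1);
            if (existsAndWatch.test(neighbor)) {
                return neighbor;                  // READY: watching neighbor
            }
            // stat == null: neighbor vanished after getChildren(); list again.
        }
    }
}
```

If the predecessor disappears between the two calls, the loop lists the children again. This corresponds to the stat == null branch in step 2).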

III. Implementing the LeaderElectionAware interface

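An implementation of onElectionEvent(EventType) should take different actions depending on the event type. For example, during makeOffer it could send messages to each client to prepare for the vote. How those messages are sent is up to you to implement (this is my personal view).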
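One possible listener starts leader-only work when ELECTED_COMPLETE arrives. It stops that work on READY_COMPLETE, FAILED or STOP_START.

The enum and interface at the top of the block are local stand-ins. They copy the constant names of LeaderElectionSupport.EventType and the single method of LeaderElectionAware, so the block compiles on its own. With the recipe on the classpath, you would implement LeaderElectionAware directly.

The two Runnable callbacks are hypothetical placeholders for application work, such as building the full index in the search example.

```java
// Stand-in for LeaderElectionSupport.EventType (same constant names).
enum EventType {
    START, OFFER_START, OFFER_COMPLETE, DETERMINE_START, DETERMINE_COMPLETE,
    ELECTED_START, ELECTED_COMPLETE, READY_START, READY_COMPLETE, FAILED,
    STOP_START, STOP_COMPLETE
}

// Stand-in for the recipe's LeaderElectionAware interface.
interface LeaderElectionAware {
    void onElectionEvent(EventType eventType);
}

// Hypothetical listener: runs leader-only work only while elected.
final class RoleListener implements LeaderElectionAware {
    private final Runnable startLeaderWork; // e.g. start building the full index
    private final Runnable stopLeaderWork;  // e.g. cancel that work
    private boolean leader;

    RoleListener(Runnable startLeaderWork, Runnable stopLeaderWork) {
        this.startLeaderWork = startLeaderWork;
        this.stopLeaderWork = stopLeaderWork;
    }

    @Override
    public synchronized void onElectionEvent(EventType eventType) {
        switch (eventType) {
            case ELECTED_COMPLETE:
                if (!leader) {              // ignore duplicate notifications
                    leader = true;
                    startLeaderWork.run();
                }
                break;
            case READY_COMPLETE:
            case FAILED:
            case STOP_START:
                if (leader) {               // not (or no longer) the leader
                    leader = false;
                    stopLeaderWork.run();
                }
                break;
            default:
                break;                      // other transitions need no action
        }
    }

    synchronized boolean isLeader() {
        return leader;
    }
}
```

Keep the callbacks short or hand them off to another thread. The recipe dispatches events synchronously while holding the listener set's lock, and on ZooKeeper's event thread when a watch fires.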

package org.apache.zookeeper.recipes.leader;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * <p>
 * A leader election support library implementing the ZooKeeper election recipe.
 * </p>
 * <p>
 * This support library is meant to simplify the construction of an exclusive
 * leader system on top of Apache ZooKeeper. Any application that can become the
 * leader (usually a process that provides a service, exclusively) would
 * configure an instance of this class with their hostname, at least one
 * listener (an implementation of {@link LeaderElectionAware}), and either an
 * instance of {@link ZooKeeper} or the proper connection information. Once
 * configured, invoking {@link #start()} will cause the client to connect to
 * ZooKeeper and create a leader offer. The library then determines if it has
 * been elected the leader using the algorithm described below. The client
 * application can follow all state transitions via the listener callback.
 * </p>
 * <p>
 * Leader election algorithm
 * </p>
 * <p>
 * The library starts in a START state. Through each state transition, a state
 * start and a state complete event are sent to all listeners. When
 * {@link #start()} is called, a leader offer is created in ZooKeeper. A leader
 * offer is an ephemeral sequential node that indicates a process that can act
 * as a leader for this service. A read of all leader offers is then performed.
 * The offer with the lowest sequence number is said to be the leader. The
 * process elected leader will transition to the leader state. All other
 * processes will transition to a ready state. Internally, the library creates a
 * ZooKeeper watch on the leader offer with the sequence ID of N - 1 (where N is
 * the process's sequence ID). If that offer disappears due to a process
 * failure, the watching process will run through the election determination
 * process again to see if it should become the leader. Note that sequence IDs
 * may not be contiguous due to failed processes. A process may revoke its offer
 * to be the leader at any time by calling {@link #stop()}.
 * </p>
 * <p>
 * Guarantees (not) Made and Caveats
 * </p>
 * <p>
 * <ul>
 * <li>It is possible for a (poorly implemented) process to create a leader
 * offer, get the lowest sequence ID, but have something terrible occur where it
 * maintains its connection to ZK (and thus its ephemeral leader offer node) but
 * doesn't actually provide the service in question. It is up to the user to
 * ensure any failure to become the leader - and whatever that means in the
 * context of the user's application - results in a revocation of its leader
 * offer (i.e. that {@link #stop()} is called).</li>
 * <li>It is possible for ZK timeouts and retries to play a role in service
 * liveness. In other words, if process A has the lowest sequence ID but
 * requires a few attempts to read the other leader offers' sequence IDs,
 * election can seem slow. Users should apply timeouts during the determination
 * process if they need to hit a specific SLA.</li>
 * <li>The library makes a "best effort" to detect catastrophic failures of the
 * process. It is possible that an unforeseen event results in (for instance) an
 * unchecked exception that propagates past normal error handling code. This
 * normally doesn't matter as the same exception would almost certainly destroy
 * the entire process and thus the connection to ZK and the leader offer
 * resulting in another round of leader determination.</li>
 * </ul>
 * </p>
 */
public class LeaderElectionSupport implements Watcher {

  private static final Logger logger = LoggerFactory
      .getLogger(LeaderElectionSupport.class);

  private ZooKeeper zooKeeper;

  private State state;
  private Set<LeaderElectionAware> listeners;

  private String rootNodeName;
  private LeaderOffer leaderOffer;
  private String hostName;

  public LeaderElectionSupport() {
    state = State.STOP;
    listeners = Collections.synchronizedSet(new HashSet<LeaderElectionAware>());
  }

  /**
   * <p>
   * Start the election process. This method will create a leader offer,
   * determine its status, and either become the leader or become ready. A
   * {@link ZooKeeper} instance and a host name must be configured first (see
   * {@link #setZooKeeper(ZooKeeper)} and {@link #setHostName(String)});
   * otherwise an {@link IllegalStateException} is thrown.
   * </p>
   * <p>
   * Any (anticipated) failures result in a failed event being sent to all
   * listeners.
   * </p>
   */
  public synchronized void start() {
    state = State.START;
    dispatchEvent(EventType.START);

    logger.info("Starting leader election support");

    if (zooKeeper == null) {
      throw new IllegalStateException(
          "No instance of zookeeper provided. Hint: use setZooKeeper()");
    }

    if (hostName == null) {
      throw new IllegalStateException(
          "No hostname provided. Hint: use setHostName()");
    }

    try {
      makeOffer();
      determineElectionStatus();
    } catch (KeeperException e) {
      becomeFailed(e);
      return;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the caller's interrupt status
      becomeFailed(e);
      return;
    }
  }

  /**
   * Stops all election services, revokes any outstanding leader offers, and
   * disconnects from ZooKeeper.
   */
  public synchronized void stop() {
    state = State.STOP;
    dispatchEvent(EventType.STOP_START);

    logger.info("Stopping leader election support");

    if (leaderOffer != null) {
      try {
        zooKeeper.delete(leaderOffer.getNodePath(), -1);
        logger.info("Removed leader offer {}", leaderOffer.getNodePath());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the caller's interrupt status
        becomeFailed(e);
      } catch (KeeperException e) {
        becomeFailed(e);
      }
    }

    dispatchEvent(EventType.STOP_COMPLETE);
  }

  private void makeOffer() throws KeeperException, InterruptedException {
    state = State.OFFER;
    dispatchEvent(EventType.OFFER_START);

    leaderOffer = new LeaderOffer();

    leaderOffer.setHostName(hostName);
    leaderOffer.setNodePath(zooKeeper.create(rootNodeName + "/" + "n_",
        hostName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
        CreateMode.EPHEMERAL_SEQUENTIAL));

    logger.debug("Created leader offer {}", leaderOffer);

    dispatchEvent(EventType.OFFER_COMPLETE);
  }

  private void determineElectionStatus() throws KeeperException,
      InterruptedException {

    state = State.DETERMINE;
    dispatchEvent(EventType.DETERMINE_START);

    String[] components = leaderOffer.getNodePath().split("/");

    leaderOffer.setId(Integer.valueOf(components[components.length - 1]
        .substring("n_".length())));

    List<LeaderOffer> leaderOffers = toLeaderOffers(zooKeeper.getChildren(
        rootNodeName, false));

    /*
     * For each leader offer, find out where we fit in. If we're first, we
     * become the leader. If we're not elected the leader, attempt to stat the
     * offer just less than us. If they exist, watch for their failure, but if
     * they don't, become the leader.
     */
    for (int i = 0; i < leaderOffers.size(); i++) {
      LeaderOffer leaderOffer = leaderOffers.get(i);

      if (leaderOffer.getId().equals(this.leaderOffer.getId())) {
        logger.debug("There are {} leader offers. I am {} in line.",
            leaderOffers.size(), i);

        dispatchEvent(EventType.DETERMINE_COMPLETE);

        if (i == 0) {
          becomeLeader();
        } else {
          becomeReady(leaderOffers.get(i - 1));
        }

        /* Once we've figured out where we are, we're done. */
        break;
      }
    }
  }

  private void becomeReady(LeaderOffer neighborLeaderOffer)
      throws KeeperException, InterruptedException {
    dispatchEvent(EventType.READY_START);

    logger.info("{} not elected leader. Watching node:{}",
        leaderOffer.getNodePath(), neighborLeaderOffer.getNodePath());

    /*
     * Make sure to pass an explicit Watcher because we could be sharing this
     * zooKeeper instance with someone else.
     */
    Stat stat = zooKeeper.exists(neighborLeaderOffer.getNodePath(), this);

    if (stat != null) {
      logger.debug(
          "We're behind {} in line and they're alive. Keeping an eye on them.",
          neighborLeaderOffer.getNodePath());
      state = State.READY;
      dispatchEvent(EventType.READY_COMPLETE);
    } else {
      /*
       * If the stat fails, the node has gone missing between the call to
       * getChildren() and exists(). We need to try and become the leader.
       */
      logger
          .info(
              "We were behind {} but it looks like they died. Back to determination.",
              neighborLeaderOffer.getNodePath());
      determineElectionStatus();
    }

  }

  private void becomeLeader() {
    state = State.ELECTED;
    dispatchEvent(EventType.ELECTED_START);

    logger.info("Becoming leader with node:{}", leaderOffer.getNodePath());

    dispatchEvent(EventType.ELECTED_COMPLETE);
  }

  private void becomeFailed(Exception e) {
    logger.error("Failed in state {} - Exception:{}", state, e);

    state = State.FAILED;
    dispatchEvent(EventType.FAILED);
  }

  /**
   * Fetch the (user supplied) hostname of the current leader. Note that by the
   * time this method returns, state could have changed so do not depend on this
   * to be strongly consistent. This method has to read all leader offers from
   * ZooKeeper to determine who the leader is (i.e. there is no caching) so
   * consider the performance implications of frequent invocation. If there are
   * no leader offers this method returns null.
   * 
   * @return hostname of the current leader
   * @throws KeeperException
   * @throws InterruptedException
   */
  public String getLeaderHostName() throws KeeperException,
      InterruptedException {

    List<LeaderOffer> leaderOffers = toLeaderOffers(zooKeeper.getChildren(
        rootNodeName, false));

    if (leaderOffers.size() > 0) {
      return leaderOffers.get(0).getHostName();
    }

    return null;
  }

  private List<LeaderOffer> toLeaderOffers(List<String> strings)
      throws KeeperException, InterruptedException {

    List<LeaderOffer> leaderOffers = new ArrayList<LeaderOffer>(strings.size());

    /*
     * Turn each child of rootNodeName into a leader offer. This is a tuple of
     * the sequence number and the node name.
     */
    for (String offer : strings) {
      String hostName = new String(zooKeeper.getData(
          rootNodeName + "/" + offer, false, null));

      leaderOffers.add(new LeaderOffer(Integer.valueOf(offer.substring("n_"
          .length())), rootNodeName + "/" + offer, hostName));
    }

    /*
     * We sort leader offers by sequence number (which may not be zero-based or
     * contiguous) and keep their paths handy for setting watches.
     */
    Collections.sort(leaderOffers, new LeaderOffer.IdComparator());

    return leaderOffers;
  }

  @Override
  public void process(WatchedEvent event) {
    if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
      if (!event.getPath().equals(leaderOffer.getNodePath())
          && state != State.STOP) {
        logger.debug(
            "Node {} deleted. Need to run through the election process.",
            event.getPath());
        try {
          determineElectionStatus();
        } catch (KeeperException e) {
          becomeFailed(e);
        } catch (InterruptedException e) {
          becomeFailed(e);
        }
      }
    }
  }

  private void dispatchEvent(EventType eventType) {
    logger.debug("Dispatching event:{}", eventType);

    synchronized (listeners) {
      if (listeners.size() > 0) {
        for (LeaderElectionAware observer : listeners) {
          observer.onElectionEvent(eventType);
        }
      }
    }
  }

  /**
   * Adds {@code listener} to the list of listeners who will receive events.
   * 
   * @param listener
   */
  public void addListener(LeaderElectionAware listener) {
    listeners.add(listener);
  }

  /**
   * Remove {@code listener} from the list of listeners who receive events.
   * 
   * @param listener
   */
  public void removeListener(LeaderElectionAware listener) {
    listeners.remove(listener);
  }

  @Override
  public String toString() {
    return "{ state:" + state + " leaderOffer:" + leaderOffer + " zooKeeper:"
        + zooKeeper + " hostName:" + hostName + " listeners:" + listeners
        + " }";
  }

  /**
   * <p>
   * Gets the ZooKeeper root node to use for this service.
   * </p>
   * <p>
   * For instance, a root node of {@code /mycompany/myservice} would be the
   * parent of all leader offers for this service. Obviously all processes that
   * wish to contend for leader status need to use the same root node. Note: We
   * assume this node already exists.
   * </p>
   * 
   * @return a znode path
   */
  public String getRootNodeName() {
    return rootNodeName;
  }

  /**
   * <p>
   * Sets the ZooKeeper root node to use for this service.
   * </p>
   * <p>
   * For instance, a root node of {@code /mycompany/myservice} would be the
   * parent of all leader offers for this service. Obviously all processes that
   * wish to contend for leader status need to use the same root node. Note: We
   * assume this node already exists.
   * </p>
   */
  public void setRootNodeName(String rootNodeName) {
    this.rootNodeName = rootNodeName;
  }

  /**
   * The {@link ZooKeeper} instance to use for all operations. It must be set
   * (see {@link #setZooKeeper(ZooKeeper)}) before {@link #start()} is called.
   */
  public ZooKeeper getZooKeeper() {
    return zooKeeper;
  }

  public void setZooKeeper(ZooKeeper zooKeeper) {
    this.zooKeeper = zooKeeper;
  }

  /**
   * The hostname of this process. Mostly used as a convenience for logging and
   * to respond to {@link #getLeaderHostName()} requests.
   */
  public String getHostName() {
    return hostName;
  }

  public void setHostName(String hostName) {
    this.hostName = hostName;
  }

  /**
   * The type of event.
   */
  public static enum EventType {
    START, OFFER_START, OFFER_COMPLETE, DETERMINE_START, DETERMINE_COMPLETE, ELECTED_START, ELECTED_COMPLETE, READY_START, READY_COMPLETE, FAILED, STOP_START, STOP_COMPLETE,
  }

  /**
   * The internal state of the election support service.
   */
  public static enum State {
    START, OFFER, DETERMINE, ELECTED, READY, FAILED, STOP
  }
}

package org.apache.zookeeper.recipes.leader;

import java.util.Comparator;

/**
 * A leader offer is a numeric id / path pair. The id is the sequential node id
 * assigned by ZooKeeper, whereas the path is the absolute path to the ZNode.
 */
public class LeaderOffer {

  private Integer id;
  private String nodePath;
  private String hostName;

  public LeaderOffer() {
    // Default constructor
  }

  public LeaderOffer(Integer id, String nodePath, String hostName) {
    this.id = id;
    this.nodePath = nodePath;
    this.hostName = hostName;
  }

  @Override
  public String toString() {
    return "{ id:" + id + " nodePath:" + nodePath + " hostName:" + hostName
        + " }";
  }

  public Integer getId() {
    return id;
  }

  public void setId(Integer id) {
    this.id = id;
  }

  public String getNodePath() {
    return nodePath;
  }

  public void setNodePath(String nodePath) {
    this.nodePath = nodePath;
  }

  public String getHostName() {
    return hostName;
  }

  public void setHostName(String hostName) {
    this.hostName = hostName;
  }

  /**
   * Compare two instances of {@link LeaderOffer} using only the {@code id}
   * member.
   */
  public static class IdComparator implements Comparator<LeaderOffer> {

    @Override
    public int compare(LeaderOffer o1, LeaderOffer o2) {
      return o1.getId().compareTo(o2.getId());
    }

  }

}
package org.apache.zookeeper.recipes.leader;

import org.apache.zookeeper.recipes.leader.LeaderElectionSupport.EventType;

/**
 * An interface to be implemented by clients that want to receive election
 * events.
 */
public interface LeaderElectionAware {

  /**
   * Called during each state transition. Currently, low-level events are provided
   * at the beginning and end of each state. For instance, START may be followed
   * by OFFER_START, OFFER_COMPLETE, DETERMINE_START, DETERMINE_COMPLETE, and so
   * on.
   * 
   * @param eventType
   */
  public void onElectionEvent(EventType eventType);

}