mongodb source code analysis (15): initialization of replication replset mode
Compared with master/slave mode, replset mode is considerably more complex. The master and slave roles correspond to the primary and secondary concepts here: primary and secondary can switch roles, and when the primary goes offline a secondary is automatically elected as the new primary. There are of course constraints on this, which this article will get to. First, let's look at the collections used by replset mode.
local.oplog.rs: the operation log (oplog) in replset mode; in master/slave mode this role is played by local.oplog.$main.
local.system.replset: the replset configuration, i.e. the information set via rs.initiate, rs.add and so on.
Let's first look at a typical replset configuration.
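A configuration of this kind tags each member with the data center it lives in and defines a custom write-concern mode under settings.getLastErrorModes. A minimal sketch (hostnames and tag values are placeholders):
{
    _id: 'myset',
    members: [
        { _id: 0, host: 'ny1.example.net:27040', tags: { dc: 'ny' } },
        { _id: 1, host: 'sf1.example.net:27040', tags: { dc: 'sf' } },
        { _id: 2, host: 'cloud1.example.net:27040', tags: { dc: 'cloud' } }
    ],
    settings: {
        getLastErrorModes: { veryImportant: { dc: 3 } }
    }
}
The veryImportant mode requires a write to be acknowledged by members covering three distinct values of the dc tag.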
When we write a piece of data, for example:
db.foo.insert({x:1})
db.runCommand({getLastError:1,w:"veryImportant"})
getLastError only returns success once this write has reached the three places specified by veryImportant, e.g. ny, sf and cloud; otherwise it keeps waiting. This guarantees that a piece of data has been written to servers in different locations. Now let's look at another kind of replset configuration.
{_id:'myset', members:[{_id:0,host:'192.168.136.1:27040'},{_id:1,host:'192.168.136.1:27050',votes:0}]}
Here there are only two servers. If the server on port 27050 is shut down, the server on port 27040 is still primary; it does not step down to secondary and stop serving. With the following configuration, however:
{_id:'myset', members:[{_id:0,host:'192.168.136.1:27040'},{_id:1,host:'192.168.136.1:27050'}]}
when 27050 shuts down, 27040 steps down from primary to secondary and the whole replset stops accepting writes. The reason is the votes setting: mongodb's replset requires that twice the total votes of the servers currently online be greater than the total votes of all servers configured in the replset, i.e. 2*online_votes > all_replset_config_votes; only then can the replset operate normally. If votes is not set, it defaults to 1. Now consider the opposite case: when the 27040 server goes offline, the 27050 server cannot become primary and the system stops working.
If instead the set is configured like this from the start:
{_id:'myset', members:[{_id:0,host:'192.168.136.1:27040',votes:0},{_id:1,host:'192.168.136.1:27050'}]}
then the 27040 server becomes primary. If 27040 goes offline, 27050 takes over and becomes the new primary; but if 27050 goes offline, the set becomes unusable, because the online vote count drops to 0. The best way out of this is to add an arbiter. An arbiter only votes and never becomes primary or secondary, but by keeping the vote count up it lets the whole system keep running when some servers go offline, which is why 10gen also recommends:
Deploy an arbiter to ensure that a replica set will have a sufficient number of members to elect a primary. While having replica sets with 2 members is not recommended for production environments, if you have just two members, deploy an arbiter. Also, for any replica set with an even number of members, deploy an arbiter.
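Adding an arbiter from the shell is a one-liner; it can also be declared directly in the config document via arbiterOnly (host below is a placeholder):
rs.addArb("192.168.136.1:27060")
// equivalent member entry in the config document:
// { _id: 2, host: '192.168.136.1:27060', arbiterOnly: true }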
Let's continue with the overall replset flow.
1. During initialization, if the startup parameters do not supply a replset config, the replset keeps trying to load one. The config can come from three sources: first, the data stored locally in the local.system.replset collection; second, the config set by calling rs.initiate; third, a config passed over from another member of the replset through the heartbeat protocol.
2. Once a config is obtained, initialize from it and establish heartbeat connections to the other servers.
3. Start the sync thread. Every replset member starts it, but only secondaries actually sync data from the primary.
4. Start the producer thread, which requests data from the primary; the sync thread takes the operation log from it and replays it locally.
5. At startup, and later from the heartbeat protocol, msgCheckNewState is called to change the server's state, e.g. from secondary to primary or vice versa.
Now for the code, starting from rs.initiate(cfg). Initiation executes the replSetInitiate command, so let's go straight to that command's run():
virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
if( cmdObj["replSetInitiate"].type() != Object ) {//config data comes from the startup command line
string name;
vector<HostAndPort> seeds;
set<HostAndPort> seedSet;
parseReplsetCmdLine(cmdLine._replSet, name, seeds, seedSet); // may throw...
bob b;
b.append("_id", name);
bob members;
members.append("0", BSON( "_id" << 0 << "host" << HostAndPort::me().toString() ));
result.append("me", HostAndPort::me().toString());
for( unsigned i = 0; i < seeds.size(); i++ )
members.append(bob::numStr(i+1), BSON( "_id" << i+1 << "host" << seeds[i].toString()));
b.appendArray("members", members.obj());
configObj = b.obj();
}
else {//config object supplied with the command
configObj = cmdObj["replSetInitiate"].Obj();
}
bool parsed = false;
ReplSetConfig newConfig(configObj);//build the config structure from the config data
parsed = true;
checkMembersUpForConfigChange(newConfig, result, true);//check that the configured servers are reachable
createOplog();//create the oplog collection local.oplog.rs
Lock::GlobalWrite lk;
bo comment = BSON( "msg" << "initiating set");
newConfig.saveConfigLocally(comment);//save the config into local.system.replset
result.append("info", "Config now saved locally. Should come online in about a minute.");
ReplSet::startupStatus = ReplSet::SOON;
ReplSet::startupStatusMsg.set("Received replSetInitiate - should come online shortly.");
return true;
}
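For reference, the shell helper rs.initiate(cfg) is essentially a thin wrapper that sends this command to the admin database, roughly:
// roughly what rs.initiate(cfg) does under the hood
db.adminCommand({ replSetInitiate: cfg })
// called with no argument, the branch at the top of run() builds a
// config from the --replSet command line: ourselves plus any seed hosts
rs.initiate()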
run->ReplSetConfig
ReplSetConfig::ReplSetConfig(BSONObj cfg, bool force) :
_ok(false),_majority(-1)
{
_constructed = false;
clear();
from(cfg);//do the actual parsing: build a MemberCfg per server and parse any settings section
if( force ) {
version += rand() % 100000 + 10000;
}
if( version < 1 )
version = 1;
_ok = true;
_constructed = true;
}
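The force branch above is what a forced reconfiguration goes through: version is bumped by a large random amount so that the forced config outranks anything else in the set. From the shell (cfg stands for your new config document):
rs.reconfig(cfg, { force: true })  // force=true, so version jumps by rand() % 100000 + 10000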
run->checkMembersUpForConfigChange
void checkMembersUpForConfigChange(const ReplSetConfig& cfg, BSONObjBuilder& result, bool initial) {
int failures = 0, allVotes = 0, allowableFailures = 0;
int me = 0;
for( vector<ReplSetConfig::MemberCfg>::const_iterator i = cfg.members.begin(); i != cfg.members.end(); i++ ) {
allVotes += i->votes;//total number of votes in the config
}
allowableFailures = allVotes - (allVotes/2 + 1);//number of votes we can afford to lose
vector<string> down;
for( vector<ReplSetConfig::MemberCfg>::const_iterator i = cfg.members.begin(); i != cfg.members.end(); i++ ) {
// we know we're up
if (i->h.isSelf()) {
continue;
}
BSONObj res;
{
bool ok = false;
{
int theirVersion = -1000;//use the heartbeat protocol to check whether the configured server is reachable
ok = requestHeartbeat(cfg._id, "", i->h.toString(), res, -1, theirVersion, initial/*check if empty*/);
if( theirVersion >= cfg.version ) {
stringstream ss;
ss << "replSet member " << i->h.toString() << " has too new a config version (" << theirVersion << ") to reconfigure";
uasserted(13259, ss.str());
}
}
if( !ok && !res["rs"].trueValue() ) {//unreachable
down.push_back(i->h.toString());
bool allowFailure = false;
failures += i->votes;
if( !initial && failures <= allowableFailures ) {
const Member* m = theReplSet->findById( i->_id );
// it's okay if the down member isn't part of the config,
// we might be adding a new member that isn't up yet
allowFailure = true;
}
if( !allowFailure ) {//at initiation every configured server must be reachable
string msg = string("need all members up to initiate, not ok : ") + i->h.toString();
if( !initial )
msg = string("need most members up to reconfigure, not ok : ") + i->h.toString();
uasserted(13144, msg);
}
}
}
if( initial ) {
bool hasData = res["hasData"].Bool();
uassert(13311, "member " + i->h.toString() + " has data already, cannot initiate set. All members except initiator must be empty.",
!hasData || i->h.isSelf());
}
}
if (down.size() > 0) {
result.append("down", down);
}
}
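A quick worked example of the vote arithmetic above, mirroring the C++ integer division:
// 5 voting members carrying one vote each
var allVotes = 5;
var allowableFailures = allVotes - (Math.floor(allVotes / 2) + 1);  // 5 - (2 + 1) = 2
// so at most 2 voting members may be down during a (non-initial) reconfig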
run->saveConfigLocally
void ReplSetConfig::saveConfigLocally(bo comment) {
checkRsConfig();
{
Lock::GlobalWrite lk; // TODO: does this really need to be a global lock?
Client::Context cx( rsConfigNs );
cx.db()->flushFiles(true);
//theReplSet->lastOpTimeWritten = ??;
//rather than above, do a logOp()? probably
BSONObj o = asBson();//the config as BSON; putSingletonGod below stores it into local.system.replset
Helpers::putSingletonGod(rsConfigNs.c_str(), o, false/*logOp=false; local db so would work regardless...*/);
if( !comment.isEmpty() && (!theReplSet || theReplSet->isPrimary()) )
logOpInitiate(comment);
cx.db()->flushFiles(true);
}
}
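Once saved, the stored config can be checked straight from the shell:
db.getSiblingDB("local").system.replset.findOne()
// or simply
rs.conf()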
At this point the initial configuration is done. Next, the initialization that happens when mongod starts up; the entry point is startReplication in repl.cpp.
void startReplication() {//just like master/slave, startup goes through this function, only the flow differs
/* if we are going to be a replica set, we aren't doing other forms of replication. */
if( !cmdLine._replSet.empty() ) {//--replSet xxx was given; a non-empty value means we start in replSet mode
newRepl();
replSet = true;
ReplSetCmdline *replSetCmdline = new ReplSetCmdline(cmdLine._replSet);//parse the command line, which may be <setname>/<seedhost1>,<seedhost2>, i.e. seed hosts for the replSet given at startup
boost::thread t( boost::bind( &startReplSets, replSetCmdline) );//start a thread to do the replSet initialization
return;
}
}
void startReplSets(ReplSetCmdline *replSetCmdline) {
Client::initThread("rsStart");
replLocalAuth();
(theReplSet = new ReplSet(*replSetCmdline))->go();//the real initialization
cc().shutdown();//shut down this thread's Client
}
ReplSet::ReplSet(ReplSetCmdline& replSetCmdline) : ReplSetImpl(replSetCmdline) {}
ReplSetImpl::ReplSetImpl(ReplSetCmdline& replSetCmdline) :
elect(this),
_forceSyncTarget(0),
_blockSync(false),
_hbmsgTime(0),
_self(0),
_maintenanceMode(0),
mgr( new Manager(this) ),
ghost( new GhostSync(this) ),
_writerPool(replWriterThreadCount),
_prefetcherPool(replPrefetcherThreadCount),
_indexPrefetchConfig(PREFETCH_ALL) {
_cfg = 0;
memset(_hbmsg, 0, sizeof(_hbmsg));
strcpy( _hbmsg , "initial startup" );
lastH = 0;
changeState(MemberState::RS_STARTUP);
loadConfig();//load the replset config; if none is found, keep looping inside until a real config shows up
// Figure out indexPrefetch setting
std::string& prefetch = cmdLine.rsIndexPrefetch;//set via the --replIndexPrefetch startup option: prefetch indexes before applying replicated operations
if (!prefetch.empty()) {
IndexPrefetchConfig prefetchConfig = PREFETCH_ALL;
if (prefetch == "none")
prefetchConfig = PREFETCH_NONE;
else if (prefetch == "_id_only")
prefetchConfig = PREFETCH_ID_ONLY;
else if (prefetch == "all")
prefetchConfig = PREFETCH_ALL;
else
warning() << "unrecognized indexPrefetch setting: " << prefetch << endl;
setIndexPrefetchConfig(prefetchConfig);
}
}
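The prefetch setting accepts none, _id_only and all. On 2.2-era builds it is also exposed as a server parameter of the same name (treat the exact parameter name and values as an assumption to verify against your version):
db.adminCommand({ getParameter: 1, replIndexPrefetch: 1 })
db.adminCommand({ setParameter: 1, replIndexPrefetch: "_id_only" })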
Let's continue with the config loading in loadConfig.
void ReplSetImpl::loadConfig() {
startupStatus = LOADINGCONFIG;
while( 1 ) {
{
vector<ReplSetConfig> configs;
configs.push_back( ReplSetConfig(HostAndPort::me()) );//look the config up in the local local.system.replset collection;
//this may be a previously saved config or the one stored by rs.initiate
for( vector<HostAndPort>::const_iterator i = _seeds->begin(); i != _seeds->end(); i++ )
configs.push_back( ReplSetConfig(*i) );//look the config up on the seed hosts given at startup
{
scoped_lock lck( replSettings.discoveredSeeds_mx );
if( replSettings.discoveredSeeds.size() > 0 ) {//seeds discovered through the heartbeat protocol: a remote server
//of the same replset made itself known, so read the config from it
for (set<string>::iterator i = replSettings.discoveredSeeds.begin();
i != replSettings.discoveredSeeds.end();
i++) {
configs.push_back( ReplSetConfig(HostAndPort(*i)) );
}
}
}
if (!replSettings.reconfig.isEmpty())//a new config coming from a local reconfiguration such as rs.add
configs.push_back(ReplSetConfig(replSettings.reconfig, true));
int nok = 0;
int nempty = 0;
for( vector<ReplSetConfig>::iterator i = configs.begin(); i != configs.end(); i++ ) {
if( i->ok() )//count the usable configs
nok++;
if( i->empty() )
nempty++;
}
if( nok == 0 ) {//no config is usable
if( nempty == (int) configs.size() ) {
startupStatus = EMPTYCONFIG;
static unsigned once;
if( ++once == 1 ) {
log() << "replSet info you may need to run replSetInitiate -- rs.initiate() in the shell -- if that is not already done" << rsLog;
}
}
sleepsecs(10);
continue;
}
if( !_loadConfigFinish(configs) ) {
sleepsecs(20);
continue;
}
}
break;
}
startupStatus = STARTED;
}
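While loadConfig is stuck in the EMPTYCONFIG state, the shell reflects it. On a 2.x member that has not been initiated yet, rs.status() returns something along these lines (output abridged and approximate):
> rs.status()
{
    "startupStatus" : 3,
    "info" : "run rs.initiate(...) if not yet done for the set",
    "errmsg" : "can't get local.system.replset config from self or any seed (EMPTYCONFIG)",
    "ok" : 0
}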
Next, _loadConfigFinish: it picks the config with the highest version among the usable ones and initializes from it.
bool ReplSetImpl::_loadConfigFinish(vector<ReplSetConfig>& cfgs) {
int v = -1;
ReplSetConfig *highest = 0;
int myVersion = -2000;
int n = 0;//pick the config with the highest version; every config change (rs.add, rs.remove, ...) increments version by one
for( vector<ReplSetConfig>::iterator i = cfgs.begin(); i != cfgs.end(); i++ ) {
ReplSetConfig& cfg = *i;
if( ++n == 1 ) myVersion = cfg.version;
if( cfg.ok() && cfg.version > v ) {
highest = &cfg;
v = cfg.version;
}
}
if( !initFromConfig(*highest) )//initialize the replset from this config
return false;
if( highest->version > myVersion && highest->version >= 0 ) {//the winning config is newer than ours
highest->saveConfigLocally(BSONObj());//persist it to local.system.replset
}
return true;
}
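The version field is what makes the "pick the highest" step deterministic, and it is easy to watch from the shell (host below is a placeholder):
rs.conf().version                    // e.g. 1 right after rs.initiate()
rs.add("192.168.136.1:27060")
rs.conf().version                    // incremented by the reconfig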
_loadConfigFinish->initFromConfig: the main flow builds a MemberCfg structure for every configured server and starts the heartbeat protocol towards it.
bool ReplSetImpl::initFromConfig(ReplSetConfig& c, bool reconf/*=false*/) {
lock lk(this);
if( getLastErrorDefault || !c.getLastErrorDefaults.isEmpty() ) {
// see comment in dbcommands.cpp for getlasterrordefault
getLastErrorDefault = new BSONObj( c.getLastErrorDefaults );
}
list<ReplSetConfig::MemberCfg*> newOnes;
// additive short-cuts the new config setup. If we are just adding a
// node/nodes and nothing else is changing, this is additive. If it's
// not a reconfig, we're not adding anything
bool additive = reconf;
{
unsigned nfound = 0;
int me = 0;
for( vector<ReplSetConfig::MemberCfg>::iterator i = c.members.begin(); i != c.members.end(); i++ ) {
ReplSetConfig::MemberCfg& m = *i;
if( m.h.isSelf() ) {
me++;
}
if( reconf ) {//this is a reconfig, compare against the new config
const Member *old = findById(m._id);
if( old ) {
nfound++;
if( old->config() != m ) {//an existing server's config changed, e.g. its votes or priority
additive = false;
}
}
else {
newOnes.push_back(&m);
}
}
}//if our own address is not in the config we enter RS_SHUNNED: close all connections, stop the heartbeat tasks and go back to loading the config
if( me == 0 ) { // we're not in the config -- we must have been removed
if (state().shunned()) {
// already took note of our ejection from the set
// so just sit tight and poll again
return false;
}
_members.orphanAll();
// kill off rsHealthPoll threads (because they Know Too Much about our past)
endOldHealthTasks();
// close sockets to force clients to re-evaluate this member
MessagingPort::closeAllSockets(0);
// take note of our ejection
changeState(MemberState::RS_SHUNNED);
loadConfig(); // redo config from scratch
return false;
}
// if we found different members than the original config, reload everything
if( reconf && config().members.size() != nfound )
additive = false;
}
_cfg = new ReplSetConfig(c);
_name = config()._id;
// this is a shortcut for simple changes
if( additive ) {//the additive reconfig path
for( list<ReplSetConfig::MemberCfg*>::const_iterator i = newOnes.begin(); i != newOnes.end(); i++ ) {
ReplSetConfig::MemberCfg *m = *i;
Member *mi = new Member(m->h, m->_id, m, false);
/** we will indicate that new members are up() initially so that we don't relinquish our
primary state because we can't (transiently) see a majority. they should be up as we
check that new members are up before getting here on reconfig anyway.
*/
mi->get_hbinfo().health = 0.1;
_members.push(mi);//newly added member: start the heartbeat protocol for it
startHealthTaskFor(mi);
}
// if we aren't creating new members, we may have to update the
// groups for the current ones
_cfg->updateMembers(_members);//update the existing members of the replset
return true;
}
// start with no members. if this is a reconfig, drop the old ones.
_members.orphanAll();//we reach this point not only on the initial config but also when some member's config changed,
endOldHealthTasks();//so end all heartbeat tasks
int oldPrimaryId = -1;
{
const Member *p = box.getPrimary();
if( p )
oldPrimaryId = p->id();
}
forgetPrimary();//reset the primary to null; it will be set again below
// not setting _self to 0 as other threads use _self w/o locking
int me = 0;
string members = "";
for( vector<ReplSetConfig::MemberCfg>::const_iterator i = config().members.begin(); i != config().members.end(); i++ ) {
const ReplSetConfig::MemberCfg& m = *i;
Member *mi;
members += ( members == "" ? "" : ", " ) + m.h.toString();
if( m.h.isSelf() ) {//this member is ourselves; if we were primary before the reconfig we set ourselves primary again. On initial setup the primary is not decided here
mi = new Member(m.h, m._id, &m, true);
me++;//note that we found ourselves in the config
setSelfTo(mi);
if( (int)mi->id() == oldPrimaryId )
box.setSelfPrimary(mi);
}
else {
mi = new Member(m.h, m._id, &m, false);
_members.push(mi);
if( (int)mi->id() == oldPrimaryId )
box.setOtherPrimary(mi);
}
}
if( me == 0 ){
log() << "replSet warning did not detect own host in full reconfig, members " << members << " config: " << c << rsLog;
}
else {//start the heartbeat tasks: each member gets a dedicated thread that probes it every 2s
// Do this after we've found ourselves, since _self needs
// to be set before we can start the heartbeat tasks
for( Member *mb = _members.head(); mb; mb=mb->next() ) {
startHealthTaskFor( mb );
}
}
return true;
}
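The additive path above is what a plain rs.add, or an rs.reconfig that only appends members, ends up taking. From the shell (host and _id are placeholders):
var cfg = rs.conf();
cfg.members.push({ _id: 2, host: "192.168.136.1:27060" });
rs.reconfig(cfg);   // only adds a member, so initFromConfig can take the additive shortcut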
_loadConfigFinish->initFromConfig->startHealthTaskFor
void ReplSetImpl::startHealthTaskFor(Member *m) {
ReplSetHealthPollTask *task = new ReplSetHealthPollTask(m->h(), m->hbinfo());
healthTasks.insert(task);
task::repeat(task, 2000);//start a new task that connects to the server m refers to and runs replSetHeartbeat every 2000ms to check whether the remote server is reachable
}
Let's continue with ReplSetHealthPollTask::doWork, the function that actually runs the heartbeat command.
void doWork() {
HeartbeatInfo mem = m;
HeartbeatInfo old = mem;
try {
BSONObj info;
int theirConfigVersion = -10000;//heartbeat: check whether the remote server can be reached
bool ok = _requestHeartbeat(mem, info, theirConfigVersion);
// weight new ping with old pings
// on the first ping, just use the ping value
if (old.ping != 0) {//smooth the ping time against previous pings
mem.ping = (unsigned int)((old.ping * .8) + (mem.ping * .2));
}
if( ok ) {//the remote server is reachable: try to add it to the electable set
up(info, mem);
}
else if (!info["errmsg"].eoo() &&//the heartbeat reports an auth problem: remove it from the electable set,
info["errmsg"].str() == "need to login") {//it can no longer become primary
authIssue(mem);
}
else {//the machine cannot be reached
down(mem, info.getStringField("errmsg"));
}
}
catch(DBException& e) {
down(mem, e.what());
}
catch(...) {
down(mem, "replSet unexpected exception in ReplSetHealthPollTask");
}
m = mem;//update this member's info, including its state: RS_STARTUP, RS_SECONDARY, etc.
theReplSet->mgr->send( boost::bind(&ReplSet::msgUpdateHBInfo, theReplSet, mem) );
static time_t last = 0;
time_t now = time(0);
bool changed = mem.changed(old);
if( changed ) {
if( old.hbstate != mem.hbstate )
log() << "replSet member " << h.toString() << " is now in state " << mem.hbstate.toString() << rsLog;
}
if( changed || now-last>4 ) {//a state check is needed
last = now;
theReplSet->mgr->send( boost::bind(&Manager::msgCheckNewState, theReplSet->mgr) );
}
}
_loadConfigFinish->initFromConfig->startHealthTaskFor->up
void up(const BSONObj& info, HeartbeatInfo& mem) {
HeartbeatInfo::numPings++;
mem.authIssue = false;
if( mem.upSince == 0 ) {
mem.upSince = mem.lastHeartbeat;
}
mem.health = 1.0;
mem.lastHeartbeatMsg = info["hbmsg"].String();
if( info.hasElement("opTime") )
mem.opTime = info["opTime"].Date();
// see if this member is in the electable set
if( info["e"].eoo() ) {
// for backwards compatibility
const Member *member = theReplSet->findById(mem.id());
if (member && member->config().potentiallyHot()) {//not an arbiter and priority is not 0 (default 1); with priority 0 it can never become primary
theReplSet->addToElectable(mem.id());
}
else {
theReplSet->rmFromElectable(mem.id());
}
}
// add this server to the electable set if it is within 10
// seconds of the latest optime we know of
else if( info["e"].trueValue() &&
mem.opTime >= theReplSet->lastOpTimeWritten.getSecs() - 10) {
unsigned lastOp = theReplSet->lastOtherOpTime().getSecs();
if (lastOp > 0 && mem.opTime >= lastOp - 10) {
theReplSet->addToElectable(mem.id());
}
}
else {
theReplSet->rmFromElectable(mem.id());
}
be cfg = info["config"];
if( cfg.ok() ) {//a new config has arrived: update the configuration
// received a new config
boost::function<void()> f =
boost::bind(&Manager::msgReceivedNewConfig, theReplSet->mgr, cfg.Obj().copy());
theReplSet->mgr->send(f);
}
}
void down(HeartbeatInfo& mem, string msg) {
mem.authIssue = false;//the server cannot be reached: mark it RS_DOWN, it is no longer a primary candidate
mem.health = 0.0;
mem.ping = 0;
if( mem.upSince || mem.downSince == 0 ) {
mem.upSince = 0;
mem.downSince = jsTime();
mem.hbstate = MemberState::RS_DOWN;
log() << "replSet info " << h.toString() << " is down (or slow to respond): " << msg << rsLog;
}
mem.lastHeartbeatMsg = msg;
theReplSet->rmFromElectable(mem.id());
}
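The health, ping and state values maintained by up() and down() are exactly what rs.status() reports per member. An abridged example of one member entry (hostname and timings are illustrative):
{
    "_id" : 1,
    "name" : "192.168.136.1:27050",
    "health" : 1,
    "state" : 2,
    "stateStr" : "SECONDARY",
    "lastHeartbeat" : ISODate("2013-01-01T00:00:00Z"),
    "pingMs" : 1
}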
Back to initFromConfig: once it finishes, the ReplSetImpl constructor completes and the object is fully built. Back in startReplSets, execution continues with
(theReplSet = new ReplSet(*replSetCmdline))->go();
which ends up in ReplSetImpl::_go, so let's look at _go.
void ReplSetImpl::_go() {
loadLastOpTimeWritten();//get the time of the last write to local.oplog.rs; on a fresh set the first write was done in saveConfigLocally
changeState(MemberState::RS_STARTUP2);
startThreads();//開啟同步執行緒,讀取操作日誌的執行緒.
newReplUp(); // oplog.cpp: install the new oplog-writing function
}
void ReplSetImpl::startThreads() {
task::fork(mgr);//start the manager service: internally it is a worker thread that receives tasks and executes them; mgr->send below hands it a function to run
mgr->send( boost::bind(&Manager::msgCheckNewState, theReplSet->mgr) );
if (myConfig().arbiterOnly) {//this server only acts as an arbiter
return;
}
boost::thread t(startSyncThread);//besides syncing, this thread also promotes the current server to secondary; at this point during initialization the state is RS_STARTUP2
replset::BackgroundSync* sync = replset::BackgroundSync::get();
boost::thread producer(boost::bind(&replset::BackgroundSync::producerThread, sync));//fetches the data that the sync thread will apply
boost::thread notifier(boost::bind(&replset::BackgroundSync::notifierThread, sync));//used for tags; a later article will cover replset tags in detail
task::fork(ghost);
// member heartbeats are started in ReplSetImpl::initFromConfig
}
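Once these threads are up, replication progress can be watched from the 2.x shell with the usual helpers:
db.printReplicationInfo()        // oplog size and the time span it covers on this member
db.printSlaveReplicationInfo()   // how far each secondary lags behind the primary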
This article stops here; the roles of the individual threads and the state transitions are left for the next one. Summary:
This article analyzed the initialization flow of replication in replset mode. The main work during initialization is to establish the heartbeat protocol between the servers according to the config, make sure they can reach each other, and update each server's state from that connectivity information, in preparation for the next step of electing a primary.
Author: yhjj0108, Yang Hao