ZMQ原始碼分析(七) --程序內通訊
之前兩節分析了zmq的tcp通訊流程,除了tcp之外,zmq還支援許多其他的通訊模式,比如inproc,ipc,pgm,epgm,tipc等。這一節接著分析inpro,即程序內通訊。
和tcp通訊相比,程序內通訊要簡單許多,因為不涉及到遠端連線的認證以及資料的編碼和解碼,只是簡單的在兩個socket_base_t之間連線一個pipe,通過pipe線上程間傳遞資料即可。
inproc通訊也兩個套接字分別呼叫bind和connect進行連線,但是同樣對順序沒有要求,下面分別看一下bind和connect對inproc的實現,首先是connect方法:
int zmq::socket_base_t::connect (const char *addr_)
{
if (unlikely (ctx_terminated)) {
errno = ETERM;
return -1;
}
// Process pending commands, if any.
int rc = process_commands (0, false);
if (unlikely (rc != 0))
return -1;
// Parse addr_ string.
std::string protocol;
std::string address;
if (parse_uri (addr_, protocol, address) || check_protocol (protocol))
return -1;
if (protocol == "inproc") {
// TODO: inproc connect is specific with respect to creating pipes
// as there's no 'reconnect' functionality implemented. Once that
// is in place we should follow generic pipe creation algorithm.
// Find the peer endpoint.
endpoint_t peer = find_endpoint (addr_);
// The total HWM for an inproc connection should be the sum of
// the binder's HWM and the connector's HWM.
int sndhwm = 0;
if (peer.socket == NULL)
sndhwm = options.sndhwm;
else if (options.sndhwm != 0 && peer.options.rcvhwm != 0)
sndhwm = options.sndhwm + peer.options.rcvhwm;
int rcvhwm = 0;
if (peer.socket == NULL)
rcvhwm = options.rcvhwm;
else
if (options.rcvhwm != 0 && peer.options.sndhwm != 0)
rcvhwm = options.rcvhwm + peer.options.sndhwm;
// Create a bi-directional pipe to connect the peers.
object_t *parents [2] = {this, peer.socket == NULL ? this : peer.socket};
pipe_t *new_pipes [2] = {NULL, NULL};
bool conflate = options.conflate &&
(options.type == ZMQ_DEALER ||
options.type == ZMQ_PULL ||
options.type == ZMQ_PUSH ||
options.type == ZMQ_PUB ||
options.type == ZMQ_SUB);
int hwms [2] = {conflate? -1 : sndhwm, conflate? -1 : rcvhwm};
bool conflates [2] = {conflate, conflate};
int rc = pipepair (parents, new_pipes, hwms, conflates);
errno_assert (rc == 0);
// Attach local end of the pipe to this socket object.
attach_pipe (new_pipes [0]);
if (!peer.socket) {
// The peer doesn't exist yet so we don't know whether
// to send the identity message or not. To resolve this,
// we always send our identity and drop it later if
// the peer doesn't expect it.
msg_t id;
rc = id.init_size (options.identity_size);
errno_assert (rc == 0);
memcpy (id.data (), options.identity, options.identity_size);
id.set_flags (msg_t::identity);
bool written = new_pipes [0]->write (&id);
zmq_assert (written);
new_pipes [0]->flush ();
const endpoint_t endpoint = {this, options};
pend_connection (std::string (addr_), endpoint, new_pipes);
}
else {
// If required, send the identity of the local socket to the peer.
if (peer.options.recv_identity) {
msg_t id;
rc = id.init_size (options.identity_size);
errno_assert (rc == 0);
memcpy (id.data (), options.identity, options.identity_size);
id.set_flags (msg_t::identity);
bool written = new_pipes [0]->write (&id);
zmq_assert (written);
new_pipes [0]->flush ();
}
// If required, send the identity of the peer to the local socket.
if (options.recv_identity) {
msg_t id;
rc = id.init_size (peer.options.identity_size);
errno_assert (rc == 0);
memcpy (id.data (), peer.options.identity, peer.options.identity_size);
id.set_flags (msg_t::identity);
bool written = new_pipes [1]->write (&id);
zmq_assert (written);
new_pipes [1]->flush ();
}
// Attach remote end of the pipe to the peer socket. Note that peer's
// seqnum was incremented in find_endpoint function. We don't need it
// increased here.
send_bind (peer.socket, new_pipes [1], false);
}
// Save last endpoint URI
last_endpoint.assign (addr_);
// remember inproc connections for disconnect
inprocs.insert (inprocs_t::value_type (std::string (addr_), new_pipes [0]));
return 0;
}
}
connect方法首先判斷需要連線的地址是否已經繫結過,之後建立pipepair,把其中的一條attach在自身,如果需要連線socket_base_t已經存在,則向它傳送繫結命令。這期間還會根據是否需要identity資訊來決定是否傳送identity訊息。如果需要連線的socket_base_t不存在,則connect方法呼叫pend_connection:
void zmq::ctx_t::pend_connection (const std::string &addr_,
const endpoint_t &endpoint_, pipe_t **pipes_)
{
const pending_connection_t pending_connection =
{endpoint_, pipes_ [0], pipes_ [1]};
endpoints_sync.lock ();
endpoints_t::iterator it = endpoints.find (addr_);
if (it == endpoints.end ()) {
// Still no bind.
endpoint_.socket->inc_seqnum ();
pending_connections.insert (pending_connections_t::value_type (addr_, pending_connection));
}
else
// Bind has happened in the mean time, connect directly
connect_inproc_sockets (it->second.socket, it->second.options, pending_connection, connect_side);
endpoints_sync.unlock ();
}
pend_connection呼叫endpoints_sync鎖,之後會在判斷需要連線的地址是否剛剛新增進來(其他執行緒的操作),如果有馬上呼叫connect_inproc_sockets,如果沒有則在pending_connections註冊這一條connect操作。
void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
options_t& bind_options, const pending_connection_t &pending_connection_, side side_)
{
bind_socket_->inc_seqnum();
pending_connection_.bind_pipe->set_tid (bind_socket_->get_tid ());
if (!bind_options.recv_identity) {
msg_t msg;
const bool ok = pending_connection_.bind_pipe->read (&msg);
zmq_assert (ok);
const int rc = msg.close ();
errno_assert (rc == 0);
}
int sndhwm = 0;
if (pending_connection_.endpoint.options.sndhwm != 0 && bind_options.rcvhwm != 0)
sndhwm = pending_connection_.endpoint.options.sndhwm + bind_options.rcvhwm;
int rcvhwm = 0;
if (pending_connection_.endpoint.options.rcvhwm != 0 && bind_options.sndhwm != 0)
rcvhwm = pending_connection_.endpoint.options.rcvhwm + bind_options.sndhwm;
bool conflate = pending_connection_.endpoint.options.conflate &&
(pending_connection_.endpoint.options.type == ZMQ_DEALER ||
pending_connection_.endpoint.options.type == ZMQ_PULL ||
pending_connection_.endpoint.options.type == ZMQ_PUSH ||
pending_connection_.endpoint.options.type == ZMQ_PUB ||
pending_connection_.endpoint.options.type == ZMQ_SUB);
int hwms [2] = {conflate? -1 : sndhwm, conflate? -1 : rcvhwm};
pending_connection_.connect_pipe->set_hwms(hwms [1], hwms [0]);
pending_connection_.bind_pipe->set_hwms(hwms [0], hwms [1]);
if (side_ == bind_side) {
command_t cmd;
cmd.type = command_t::bind;
cmd.args.bind.pipe = pending_connection_.bind_pipe;
bind_socket_->process_command (cmd);
bind_socket_->send_inproc_connected (pending_connection_.endpoint.socket);
}
else
pending_connection_.connect_pipe->send_bind (bind_socket_, pending_connection_.bind_pipe, false);
if (pending_connection_.endpoint.options.recv_identity) {
msg_t id;
int rc = id.init_size (bind_options.identity_size);
errno_assert (rc == 0);
memcpy (id.data (), bind_options.identity, bind_options.identity_size);
id.set_flags (msg_t::identity);
bool written = pending_connection_.bind_pipe->write (&id);
zmq_assert (written);
pending_connection_.bind_pipe->flush ();
}
}
connect_inproc_sockets操作主要是把pipe和socket_bast_t進行繫結。這裡需要注意identity,connect中如果判斷需要連線的socket_base_t不存在也會發送一條identity。所以bind socket建立之後如果不需要identity,要先讀出這條髒資料處理掉。
接下來看bind操作:
int zmq::socket_base_t::bind (const char *addr_)
{
if (unlikely (ctx_terminated)) {
errno = ETERM;
return -1;
}
// Process pending commands, if any.
int rc = process_commands (0, false);
if (unlikely (rc != 0))
return -1;
// Parse addr_ string.
std::string protocol;
std::string address;
if (parse_uri (addr_, protocol, address) || check_protocol (protocol))
return -1;
if (protocol == "inproc") {
const endpoint_t endpoint = { this, options };
const int rc = register_endpoint (addr_, endpoint);
if (rc == 0) {
connect_pending (addr_, this);
last_endpoint.assign (addr_);
}
return rc;
}
}
bind操作首先呼叫register_endpoint:
int zmq::ctx_t::register_endpoint (const char *addr_,
const endpoint_t &endpoint_)
{
endpoints_sync.lock ();
const bool inserted = endpoints.insert (
endpoints_t::value_type (std::string (addr_), endpoint_)).second;
endpoints_sync.unlock ();
if (!inserted) {
errno = EADDRINUSE;
return -1;
}
return 0;
}
該方法比較簡單,用endpoints儲存所有繫結的地址。socket_base_t之後呼叫connect_pending:
void zmq::ctx_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_)
{
endpoints_sync.lock ();
std::pair<pending_connections_t::iterator, pending_connections_t::iterator> pending = pending_connections.equal_range(addr_);
for (pending_connections_t::iterator p = pending.first; p != pending.second; ++p)
connect_inproc_sockets(bind_socket_, endpoints[addr_].options, p->second, bind_side);
pending_connections.erase(pending.first, pending.second);
endpoints_sync.unlock ();
}
connect_pending檢查是否之前是否有socket_base_t請求連線剛剛bind的地址,如果有,分別呼叫connect_inproc_sockets進行連線,該方法上面已經分析過,注意該方法的最後一個引數是判斷呼叫該方法的執行緒和bind_socket是否在一個執行緒內,已決定command是需要傳送到郵箱中還是直接呼叫對應的處理方法。
連線建立好之後,執行緒間就可以通過socket_base_t互相通訊了。
之前說過,除了inproc外,zmq還提供了很多其他的通訊模式:
ipc:主要用於程序間通訊
pgm/epgm :多路廣播
tipc:基於tipc協議的通訊
這幾種通訊模式用到的比較少,這裡不做詳細分析(其實我也沒細看這幾種的實現方式)。