區塊鏈教程Fabric1.0源代碼分析流言算法Gossip服務端二
阿新 • • 發佈:2018-10-31
same 實例 reg acceptor sprintf acc .info sum adf 區塊鏈教程Fabric1.0源代碼分析流言算法Gossip服務端二
Fabric 1.0源代碼筆記 之 gossip(流言算法) #GossipServer(Gossip服務端)
5.2、commImpl結構體方法
//conn.serviceConnection(),啟動連接服務 func (c *commImpl) GossipStream(stream proto.Gossip_GossipStreamServer) error //return &proto.Empty{} func (c *commImpl) Ping(context.Context, *proto.Empty) (*proto.Empty, error) func (c *commImpl) GetPKIid() common.PKIidType //向指定節點發送消息 func (c *commImpl) Send(msg *proto.SignedGossipMessage, peers ...*RemotePeer) //探測遠程節點是否有響應,_, err = cl.Ping(context.Background(), &proto.Empty{}) func (c *commImpl) Probe(remotePeer *RemotePeer) error //握手驗證遠程節點,_, err = cl.Ping(context.Background(), &proto.Empty{}) func (c *commImpl) Handshake(remotePeer *RemotePeer) (api.PeerIdentityType, error) func (c *commImpl) Accept(acceptor common.MessageAcceptor) <-chan proto.ReceivedMessage func (c *commImpl) PresumedDead() <-chan common.PKIidType func (c *commImpl) CloseConn(peer *RemotePeer) func (c *commImpl) Stop() //創建並啟動gRPC Server,以及註冊GossipServer實例 func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity api.PeerIdentityType, //將GossipServer實例註冊至peerServer func NewCommInstance(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper, func extractRemoteAddress(stream stream) string func readWithTimeout(stream interface{}, timeout time.Duration, address string) (*proto.SignedGossipMessage, error) //創建gRPC Server,grpc.NewServer(serverOpts...) func createGRPCLayer(port int) (*grpc.Server, net.Listener, api.PeerSecureDialOpts, []byte) //創建與服務端連接 func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error) //向指定節點發送消息 func (c *commImpl) sendToEndpoint(peer *RemotePeer, msg *proto.SignedGossipMessage) //return atomic.LoadInt32(&c.stopping) == int32(1) func (c *commImpl) isStopping() bool func (c *commImpl) emptySubscriptions() func (c *commImpl) authenticateRemotePeer(stream stream) (*proto.ConnectionInfo, error) func (c *commImpl) disconnect(pkiID common.PKIidType) func (c *commImpl) createConnectionMsg(pkiID common.PKIidType, certHash []byte, cert api.PeerIdentityType, signer proto.Signer) (*proto.SignedGossipMessage, error) //代碼在gossip/comm/comm_impl.go
5.2.1、func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity api.PeerIdentityType,secureDialOpts api.PeerSecureDialOpts, dialOpts ...grpc.DialOption) (Comm, error)
創建並啟動gRPC Server,以及註冊GossipServer實例
func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity api.PeerIdentityType, ????secureDialOpts api.PeerSecureDialOpts, dialOpts ...grpc.DialOption) (Comm, error) { ????var ll net.Listener ????var s *grpc.Server ????var certHash []byte ????if len(dialOpts) == 0 { ????????//peer.gossip.dialTimeout,gRPC連接撥號的超時 ????????dialOpts = []grpc.DialOption{grpc.WithTimeout(util.GetDurationOrDefault("peer.gossip.dialTimeout", defDialTimeout))} ????} ????if port > 0 { ????????//創建gRPC Server,grpc.NewServer(serverOpts...) ????????s, ll, secureDialOpts, certHash = createGRPCLayer(port) ????} ????commInst := &commImpl{ ????????selfCertHash: certHash, ????????PKIID: idMapper.GetPKIidOfCert(peerIdentity), ????????idMapper: idMapper, ????????logger: util.GetLogger(util.LoggingCommModule, fmt.Sprintf("%d", port)), ????????peerIdentity: peerIdentity, ????????opts: dialOpts, ????????secureDialOpts: secureDialOpts, ????????port: port, ????????lsnr: ll, ????????gSrv: s, ????????msgPublisher: NewChannelDemultiplexer(), ????????lock: &sync.RWMutex{}, ????????deadEndpoints: make(chan common.PKIidType, 100), ????????stopping: int32(0), ????????exitChan: make(chan struct{}, 1), ????????subscriptions: make([]chan proto.ReceivedMessage, 0), ????} ????commInst.connStore = newConnStore(commInst, commInst.logger) ????if port > 0 { ????????commInst.stopWG.Add(1) ????????go func() { ????????????defer commInst.stopWG.Done() ????????????s.Serve(ll) //啟動gRPC Server ????????}() ????????//commInst註冊到gRPC Server ????????proto.RegisterGossipServer(s, commInst) ????} ????return commInst, nil } //代碼在gossip/comm/comm_impl.go
5.2.2、func NewCommInstance(s grpc.Server, cert tls.Certificate, idStore identity.Mapper,peerIdentity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts,dialOpts ...grpc.DialOption) (Comm, error)
將GossipServer實例註冊至peerServer
func NewCommInstance(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper, ????peerIdentity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts, ????dialOpts ...grpc.DialOption) (Comm, error) { ????dialOpts = append(dialOpts, grpc.WithTimeout(util.GetDurationOrDefault("peer.gossip.dialTimeout", defDialTimeout))) ????//構造commImpl ????commInst, err := NewCommInstanceWithServer(-1, idStore, peerIdentity, secureDialOpts, dialOpts...) ????if cert != nil { ????????inst := commInst.(*commImpl) ????????inst.selfCertHash = certHashFromRawCert(cert.Certificate[0]) ????} ????proto.RegisterGossipServer(s, commInst.(*commImpl)) ????return commInst, nil } //代碼在gossip/comm/comm_impl.go
//創建與服務端連接
5.2.3、func (c commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (connection, error)
func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error) {
????var err error
????var cc *grpc.ClientConn
????var stream proto.Gossip_GossipStreamClient
????var pkiID common.PKIidType
????var connInfo *proto.ConnectionInfo
????var dialOpts []grpc.DialOption
????dialOpts = append(dialOpts, c.secureDialOpts()...)
????dialOpts = append(dialOpts, grpc.WithBlock())
????dialOpts = append(dialOpts, c.opts...)
????cc, err = grpc.Dial(endpoint, dialOpts...)
????cl := proto.NewGossipClient(cc)
????if _, err = cl.Ping(context.Background(), &proto.Empty{}); err != nil {
????????cc.Close()
????????return nil, err
????}
????ctx, cf := context.WithCancel(context.Background())
????stream, err = cl.GossipStream(ctx)
????connInfo, err = c.authenticateRemotePeer(stream)
????pkiID = connInfo.ID
????conn := newConnection(cl, cc, stream, nil)
????conn.pkiID = pkiID
????conn.info = connInfo
????conn.logger = c.logger
????conn.cancel = cf
????h := func(m *proto.SignedGossipMessage) {
????????c.msgPublisher.DeMultiplex(&ReceivedMessageImpl{
????????????conn: conn,
????????????lock: conn,
????????????SignedGossipMessage: m,
????????????connInfo: connInfo,
????????})
????}
????conn.handler = h
????return conn, nil
}
//代碼在gossip/comm/comm_impl.go
6、connectionStore和connection結構體及方法
6.1、connection結構體及方法
type connection struct {
????cancel context.CancelFunc
????info *proto.ConnectionInfo
????outBuff chan *msgSending
????logger *logging.Logger // logger
????pkiID common.PKIidType // pkiID of the remote endpoint
????handler handler // function to invoke upon a message reception
????conn *grpc.ClientConn // gRPC connection to remote endpoint
????cl proto.GossipClient // gRPC stub of remote endpoint
????clientStream proto.Gossip_GossipStreamClient // client-side stream to remote endpoint
????serverStream proto.Gossip_GossipStreamServer // server-side stream to remote endpoint
????stopFlag int32 // indicates whether this connection is in process of stopping
????stopChan chan struct{} // a method to stop the server-side gRPC call from a different go-routine
????sync.RWMutex // synchronizes access to shared variables
}
//構造connection
func newConnection(cl proto.GossipClient, c *grpc.ClientConn, cs proto.Gossip_GossipStreamClient, ss proto.Gossip_GossipStreamServer) *connection
//關閉connection
func (conn *connection) close()
//atomic.LoadInt32(&(conn.stopFlag)) == int32(1)
func (conn *connection) toDie() bool
//conn.outBuff <- m,其中m為msgSending{envelope: msg.Envelope,onErr: onErr,}
func (conn *connection) send(msg *proto.SignedGossipMessage, onErr func(error))
//go conn.readFromStream(errChan, msgChan)、go conn.writeToStream(),同時msg := <-msgChan,conn.handler(msg)
func (conn *connection) serviceConnection() error
//循環不間斷從conn.outBuff取數據,然後stream.Send(m.envelope)
func (conn *connection) writeToStream()
//循環不間斷envelope, err := stream.Recv()、msg, err := envelope.ToGossipMessage()、msgChan <- msg
func (conn *connection) readFromStream(errChan chan error, msgChan chan *proto.SignedGossipMessage)
//獲取conn.serverStream
func (conn *connection) getStream() stream
//代碼在gossip/comm/conn.go
6.2、connectionStore結構體及方法
type connectionStore struct {
????logger *logging.Logger // logger
????isClosing bool // whether this connection store is shutting down
????connFactory connFactory // creates a connection to remote peer
????sync.RWMutex // synchronize access to shared variables
????pki2Conn map[string]*connection //connection map, key為pkiID,value為connection
????destinationLocks map[string]*sync.RWMutex //mapping between pkiIDs and locks,
????// used to prevent concurrent connection establishment to the same remote endpoint
}
//構造connectionStore
func newConnStore(connFactory connFactory, logger *logging.Logger) *connectionStore
//從connection map中獲取連接,如無則創建並啟動連接,並寫入connection map中
func (cs *connectionStore) getConnection(peer *RemotePeer) (*connection, error)
//連接數量
func (cs *connectionStore) connNum() int
//關閉指定連接
func (cs *connectionStore) closeConn(peer *RemotePeer)
//關閉所有連接
func (cs *connectionStore) shutdown()
func (cs *connectionStore) onConnected(serverStream proto.Gossip_GossipStreamServer, connInfo *proto.ConnectionInfo) *connection
//註冊連接
func (cs *connectionStore) registerConn(connInfo *proto.ConnectionInfo, serverStream proto.Gossip_GossipStreamServer) *connection
//關閉指定連接
func (cs *connectionStore) closeByPKIid(pkiID common.PKIidType)
//代碼在gossip/comm/conn.go
6.2.1、func (cs connectionStore) getConnection(peer RemotePeer) (*connection, error)
func (cs *connectionStore) getConnection(peer *RemotePeer) (*connection, error) {
????cs.RLock()
????isClosing := cs.isClosing
????cs.RUnlock()
????pkiID := peer.PKIID
????endpoint := peer.Endpoint
????cs.Lock()
????destinationLock, hasConnected := cs.destinationLocks[string(pkiID)]
????if !hasConnected {
????????destinationLock = &sync.RWMutex{}
????????cs.destinationLocks[string(pkiID)] = destinationLock
????}
????cs.Unlock()
????destinationLock.Lock()
????cs.RLock()
????//從connection map中獲取
????conn, exists := cs.pki2Conn[string(pkiID)]
????if exists {
????????cs.RUnlock()
????????destinationLock.Unlock()
????????return conn, nil
????}
????cs.RUnlock()
????//創建連接
????createdConnection, err := cs.connFactory.createConnection(endpoint, pkiID)
????destinationLock.Unlock()
????conn = createdConnection
????cs.pki2Conn[string(createdConnection.pkiID)] = conn
????go conn.serviceConnection() //啟動連接的消息接收處理、以及向對方節點發送消息
????return conn, nil
}
//代碼在gossip/comm/conn.go
7、ChannelDeMultiplexer結構體及方法(多路復用器)
type ChannelDeMultiplexer struct {
????channels []*channel
????lock *sync.RWMutex
????closed int32
}
//構造ChannelDeMultiplexer
func NewChannelDemultiplexer() *ChannelDeMultiplexer
//atomic.LoadInt32(&m.closed) == int32(1)
func (m *ChannelDeMultiplexer) isClosed() bool
//關閉
func (m *ChannelDeMultiplexer) Close()
//添加通道
func (m *ChannelDeMultiplexer) AddChannel(predicate common.MessageAcceptor) chan interface{}
//挨個通道發送消息
func (m *ChannelDeMultiplexer) DeMultiplex(msg interface{})
區塊鏈教程Fabric1.0源代碼分析流言算法Gossip服務端二