1. 程式人生 > >利用python多執行緒實現區塊鏈paxos演算法解決使用者共識

利用python多執行緒實現區塊鏈paxos演算法解決使用者共識

本來是做 Python 開發的,老闆最近讓做區塊鏈,一開始也沒有頭緒,於是從網上找了點視訊,看著照著敲了一下,感覺這是自己寫過最多的程式碼啦。怪我才疏學淺,哈哈哈。

import threading, socket, pickle, queue

class Message(object):
    """A single protocol message passed between leaders, acceptors and clients.

    Only ``command`` is set by the constructor. Other attributes
    (``proposalID``, ``instanceID``, ``to``, ``source``, ``value``) are
    attached by whoever builds or receives the message.
    """

    # Replies sent by acceptors.
    MSG_ACCEPTOR_AGREE = 0
    MSG_ACCEPTOR_ACCEPT = 1
    MSG_ACCEPTOR_REJECT = 2
    MSG_ACCEPTOR_UNACCEPT = 3
    # Requests sent by leaders.
    MSG_ACCEPT = 4
    MSG_PROPOSE = 5
    # Proposal submitted from outside the group.
    MSG_EXT_PROPOSE = 6
    # Leader liveness heartbeat.
    MSG_HEARTBEAT = 7

    def __init__(self, command=None):
        """Create a message carrying *command* (one of the ``MSG_*`` constants)."""
        self.command = command

    def copyAsReply(self, message):
        """Populate this message as a reply to *message*.

        The proposal ID, instance ID and value are copied over, and the
        addressing is reversed: this message goes ``to`` the original
        ``source`` and comes from the original ``to``.
        """
        # Read every addressing field before writing any of them, so the
        # swap is still correct when ``message`` is this very object.
        proposal, instance = message.proposalID, message.instanceID
        sender, recipient = message.source, message.to
        self.proposalID = proposal
        self.instanceID = instance
        self.to = sender
        self.source = recipient
        self.value = message.value


class MessagePump(threading.Thread):
    """Wrap a datagram socket and hand every received message to an owner.

    A helper thread drains the socket into an internal queue; this thread
    pops messages off that queue and calls ``owner.recvMessage`` with each
    one (or with ``None`` when nothing arrived in time).
    """

    class MPHelper(threading.Thread):
        """Pull datagrams off the socket as fast as possible.

        This helper exists so that the socket's receive buffer does not
        fill up. In hindsight, using TCP might have been easier.
        """

        def __init__(self, owner):
            self.owner = owner
            threading.Thread.__init__(self)

        def run(self):
            """Receive, unpickle and enqueue messages until the owner aborts."""
            while not self.owner.abort:
                try:
                    (data, addr) = self.owner.socket.recvfrom(2048)
                except OSError:
                    # Covers socket.timeout (no datagram within the socket
                    # timeout) and other socket errors: loop so the abort
                    # flag is re-checked.
                    continue
                try:
                    # SECURITY: pickle.loads executes arbitrary code from the
                    # payload; only acceptable because peers are trusted
                    # processes on localhost.
                    msg = pickle.loads(data)
                    # Record the sender's port as the message source.
                    msg.source = addr[1]
                except Exception:
                    # Best effort: drop datagrams that cannot be decoded.
                    continue
                self.owner.queue.put(msg)

    def __init__(self, owner, port, timeout=2):
        """Bind a datagram socket on ``localhost:port``.

        Args:
            owner: object whose ``recvMessage(message)`` is called from ``run``.
            port: local port to bind.
            timeout: socket timeout in seconds, also stored as ``self.timeout``.
        """
        self.owner = owner
        threading.Thread.__init__(self)
        self.abort = False
        # Previously hard-coded to 2, ignoring the argument.
        self.timeout = timeout
        self.port = port
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 200000)
        self.socket.bind(("localhost", port))
        self.socket.settimeout(timeout)
        self.queue = queue.Queue()
        self.helper = MessagePump.MPHelper(self)

    def run(self):
        """Start the helper and dispatch messages to the owner until aborted."""
        self.helper.start()
        while not self.abort:
            message = self.waitForMessage()
            # This needs to be blocking, otherwise there's a world of
            # multi-threaded pain awaiting us.
            self.owner.recvMessage(message)

    def waitForMessage(self):
        """Return the next queued message, or ``None`` if none arrives within 3s."""
        try:
            return self.queue.get(True, 3)
        except queue.Empty:
            return None

    def sendMessage(self, message):
        """Pickle *message* and send it to ``localhost`` on port ``message.to``.

        Returns:
            ``True`` once the datagram has been handed to the socket.
        """
        data = pickle.dumps(message)
        address = ("localhost", message.to)
        self.socket.sendto(data, address)
        return True

    def doAbort(self):
        """Ask both this thread and the helper thread to stop looping."""
        self.abort = True


import random


class AdversarialMessagePump(MessagePump):
    """Message pump that randomly delays messages and delivers them in arbitrary order."""

    def __init__(self, owner, port, timeout=2):
        MessagePump.__init__(self, owner, port, timeout)
        # Messages pulled off the queue but not yet handed to the owner.
        self.messages = set()

    def waitForMessage(self):
        """Return a randomly chosen held-back message, or ``None``.

        Waits up to 0.1s for a new message and adds it to the held-back
        set. Then, 95% of the time when the set is non-empty, one held
        message is picked at random, removed and returned.
        """
        try:
            self.messages.add(self.queue.get(True, 0.1))
        except queue.Empty:
            # Nothing new arrived; we may still release an older message.
            pass
        if self.messages and random.random() < 0.95:  # Arbitrary!
            msg = random.choice(list(self.messages))
            self.messages.remove(msg)
            return msg
        return None


class InstanceRecord(object):
    """Bookkeeping for one Paxos instance.

    Records every proposal we have seen or promised for a particular
    instance. Used by both acceptors and leaders.
    """

    def __init__(self):
        # Maps proposal ID -> protocol object handling that proposal.
        self.protocols = {}
        # Highest proposal ID added so far, compared on index 1 first.
        self.highestID = (-1, -1)
        # Value recorded for this instance, if any.
        self.value = None

    def addProtocol(self, protocol):
        """Register *protocol* under its ``proposalID`` and update ``highestID``.

        Proposal IDs are ordered by their second element first, with the
        first element as the tie-breaker.
        """
        self.protocols[protocol.proposalID] = protocol
        new_id = protocol.proposalID
        if (new_id[1], new_id[0]) > (self.highestID[1], self.highestID[0]):
            self.highestID = new_id

    def getProtocol(self, protocolID):
        """Return the protocol registered under *protocolID*.

        Raises:
            KeyError: if no protocol with that ID has been added.
        """
        return self.protocols[protocolID]

    def cleanProtocols(self):
        """Drop every protocol that has reached the accepted state."""
        # Iterate over a snapshot of the keys: deleting from a dict while
        # iterating its live key view raises RuntimeError on Python 3.
        for k in list(self.protocols):
            protocol = self.protocols[k]
            if protocol.state == PaxosLeaderProtocol.STATE_ACCEPTED:
                print("Deleting protocol")
                del self.protocols[k]


class PaxosLeader(object):
    """A Paxos leader: receives external proposals and drives them to acceptance.

    Each leader owns a message pump plus two heartbeat threads that decide
    whether this leader is currently the primary. Only the primary starts
    new proposals.
    """

    def __init__(self, port, leaders=None, acceptors=None):
        """Create a leader listening on *port*.

        Args:
            port: local port for this leader's message pump.
            leaders: ports of the other leaders (defaults to none).
            acceptors: ports of the acceptors (defaults to none).
        """
        self.port = port
        self.leaders = [] if leaders is None else leaders
        self.acceptors = [] if acceptors is None else acceptors
        self.group = self.leaders + self.acceptors
        self.isPrimary = False
        self.proposalCount = 0
        self.msgPump = MessagePump(self, port)
        # Maps instance ID -> InstanceRecord.
        self.instances = {}
        self.hbListener = PaxosLeader.HeartbeatListener(self)
        self.hbSender = PaxosLeader.HeartbeatSender(self)
        self.highestInstance = -1
        self.stopped = True
        # The last time we tried to fill in any gaps.
        self.lasttime = time.time()

    # ------------------------------------------------------
    # These two classes listen for heartbeats from other leaders
    # and, if none appear, tell this leader that it should
    # be the primary.

    class HeartbeatListener(threading.Thread):
        """Watches for heartbeats and promotes or demotes the owning leader."""

        def __init__(self, leader):
            self.leader = leader
            self.queue = queue.Queue()
            self.abort = False
            threading.Thread.__init__(self)

        def newHB(self, message):
            """Queue a received heartbeat message for processing."""
            self.queue.put(message)

        def doAbort(self):
            """Ask the listener loop to stop."""
            self.abort = True

        def run(self):
            while not self.abort:
                try:
                    hb = self.queue.get(True, 2)
                except queue.Empty:
                    # No heartbeat within 2s: take over as primary.
                    self.leader.setPrimary(True)
                    continue
                # Easy way to settle conflicts - if your port number is
                # bigger than mine, you get to be the leader.
                if hb.source > self.leader.port:
                    self.leader.setPrimary(False)

    class HeartbeatSender(threading.Thread):
        """Sends a heartbeat to every other leader once a second while primary."""

        def __init__(self, leader):
            self.leader = leader
            self.abort = False
            threading.Thread.__init__(self)

        def doAbort(self):
            """Ask the sender loop to stop."""
            self.abort = True

        def run(self):
            while not self.abort:
                time.sleep(1)
                if self.leader.isPrimary:
                    msg = Message(Message.MSG_HEARTBEAT)
                    msg.source = self.leader.port
                    for l in self.leader.leaders:
                        msg.to = l
                        self.leader.sendMessage(msg)

    # ------------------------------------------------------
    def sendMessage(self, message):
        """Send *message* through this leader's message pump."""
        self.msgPump.sendMessage(message)

    def start(self):
        """Start the heartbeat threads and the message pump."""
        self.hbSender.start()
        self.hbListener.start()
        self.msgPump.start()
        self.stopped = False

    def stop(self):
        """Signal all threads to stop and mark the leader as stopped."""
        self.hbSender.doAbort()
        self.hbListener.doAbort()
        self.msgPump.doAbort()
        self.stopped = True

    def setPrimary(self, primary):
        """Set whether this leader is the primary, logging only on change."""
        if self.isPrimary != primary:
            # Only print if something's changed
            if primary:
                print("I (%s) am the leader" % self.port)
            else:
                print("I (%s) am NOT the leader" % self.port)
        self.isPrimary = primary

    # ------------------------------------------------------

    def getGroup(self):
        """Return the combined list of leader and acceptor ports."""
        return self.group

    def getLeaders(self):
        """Return the ports of the other leaders."""
        return self.leaders

    def getAcceptors(self):
        """Return the ports of the acceptors."""
        return self.acceptors

    def getQuorumSize(self):
        """Return the number of acceptors that form a majority.

        Uses floor division: with ``/`` Python 3 yields a float (3.5 for
        five acceptors), which silently raises the threshold to four.
        """
        return len(self.getAcceptors()) // 2 + 1

    def getInstanceValue(self, instanceID):
        """Return the value recorded for *instanceID*, or ``None`` if unknown."""
        if instanceID in self.instances:
            return self.instances[instanceID].value
        return None

    def getHistory(self):
        """Return the values of instances ``1..highestInstance`` in order."""
        return [self.getInstanceValue(i) for i in range(1, self.highestInstance + 1)]

    def getNumAccepted(self):
        """Return how many entries of the history have a value."""
        return len([v for v in self.getHistory() if v is not None])

    # ------------------------------------------------------

    def findAndFillGaps(self):
        """Re-propose for every instance below the highest that has no value."""
        # If no message was received, we take the chance to do a little cleanup.
        for i in range(1, self.highestInstance):
            if self.getInstanceValue(i) is None:
                print("Filling in gap", i)
                # This will either eventually commit an already accepted
                # value, or fill in the gap with 0 / a no-op.
                self.newProposal(0, i)
        self.lasttime = time.time()

    def garbageCollect(self):
        """Let every instance record drop its finished protocols."""
        for i in self.instances:
            self.instances[i].cleanProtocols()

    def recvMessage(self, message):
        """Handle one message; the pump calls this periodically even with no message.

        Args:
            message: a received ``Message``, or ``None`` when the pump timed out.

        Returns:
            ``True`` when a message was processed, ``None`` when stopped or idle.
        """
        if self.stopped:
            return
        if message is None:
            # Only run every 15s, otherwise you run the risk of cutting
            # good protocols off in their prime :(
            if self.isPrimary and time.time() - self.lasttime > 15.0:
                self.findAndFillGaps()
                self.garbageCollect()
            return
        if message.command == Message.MSG_HEARTBEAT:
            self.hbListener.newHB(message)
            return True
        if message.command == Message.MSG_EXT_PROPOSE:
            print("External proposal received at", self.port, self.highestInstance)
            if self.isPrimary:
                self.newProposal(message.value)
            # Otherwise ignore proposals received while we are not primary.
            # If we were being kind we would reply with a "leader has
            # changed" message and the new address. However, we might have
            # failed ourselves.
            return True
        if self.isPrimary and message.command != Message.MSG_ACCEPTOR_ACCEPT:
            # Replies for an instance or proposal we no longer track (late
            # delivery, or already garbage-collected) are ignored rather
            # than raising KeyError and killing the pump thread.
            record = self.instances.get(message.instanceID)
            protocol = None if record is None else record.protocols.get(message.proposalID)
            if protocol is not None:
                protocol.doTransition(message)
        # It's possible that, while we still think we're primary, we get an
        # accept message that we are only listening in on. We are interested
        # in hearing all accepts, so we play along by pretending we have the
        # protocol that is being accepted and wait for a quorum as usual.
        if message.command == Message.MSG_ACCEPTOR_ACCEPT:
            if message.instanceID not in self.instances:
                self.instances[message.instanceID] = InstanceRecord()
            record = self.instances[message.instanceID]
            if message.proposalID not in record.protocols:
                protocol = PaxosLeaderProtocol(self)
                # We just massage this protocol into the waiting-for-accept state.
                protocol.state = PaxosLeaderProtocol.STATE_AGREED
                protocol.proposalID = message.proposalID
                protocol.instanceID = message.instanceID
                protocol.value = message.value
                record.addProtocol(protocol)
            else:
                protocol = record.getProtocol(message.proposalID)
            # If we initiated this protocol instance, we fall through to here.
            protocol.doTransition(message)
        return True

    def newProposal(self, value, instance=None):
        """Start a new proposal for *value*.

        Args:
            value: the value to propose.
            instance: instance ID to propose for; when ``None`` the next
                unused instance ID is allocated.
        """
        protocol = PaxosLeaderProtocol(self)
        if instance is None:
            self.highestInstance += 1
            instanceID = self.highestInstance
        else:
            instanceID = instance
        self.proposalCount += 1
        proposal_id = (self.port, self.proposalCount)
        if instanceID in self.instances:
            record = self.instances[instanceID]
        else:
            record = InstanceRecord()
            self.instances[instanceID] = record
        protocol.propose(value, proposal_id, instanceID)
        record.addProtocol(protocol)

    def notifyLeader(self, protocol, message):
        """Called by a protocol when it has finished.

        Returns:
            ``True`` after retrying a rejected proposal, otherwise ``None``.
        """
        if protocol.state == PaxosLeaderProtocol.STATE_ACCEPTED:
            print("Protocol instance %s accepted with value %s" % (message.instanceID, message.value))
            self.instances[message.instanceID].accepted = True
            self.instances[message.instanceID].value = message.value
            self.highestInstance = max(message.instanceID, self.highestInstance)
            return
        if protocol.state == PaxosLeaderProtocol.STATE_REJECTED:
            # Look at the message to find the value, and then retry.
            # Eventually, assuming that the acceptors will accept some value
            # for this instance, the protocol will complete.
            self.proposalCount = max(self.proposalCount, message.highestPID[1])
            self.newProposal(message.value)
            return True
        if protocol.state == PaxosLeaderProtocol.STATE_UNACCEPTED:
            pass


class PaxosLeaderProtocol(object):
    """Leader-side state machine for a single proposal of a single instance."""

    # State variables
    STATE_UNDEFINED = -1
    STATE_PROPOSED = 0
    STATE_AGREED = 1
    STATE_REJECTED = 2
    STATE_ACCEPTED = 3
    STATE_UNACCEPTED = 4

    def __init__(self, leader):
        """Create an idle protocol owned by *leader*."""
        self.leader = leader
        self.state = PaxosLeaderProtocol.STATE_UNDEFINED
        self.proposalID = (-1, -1)
        self.agreecount, self.acceptcount = (0, 0)
        self.rejectcount, self.unacceptcount = (0, 0)
        self.instanceID = -1
        # Highest proposal ID reported by any acceptor that also reported a value.
        self.highestseen = (0, 0)

    def propose(self, value, pID, instanceID):
        """Send a PROPOSE for *value* to every acceptor.

        Args:
            value: the value to propose.
            pID: proposal ID tuple.
            instanceID: instance this proposal belongs to.

        Returns:
            The proposal ID.
        """
        self.proposalID = pID
        self.value = value
        self.instanceID = instanceID
        message = Message(Message.MSG_PROPOSE)
        message.proposalID = pID
        message.instanceID = instanceID
        message.value = value
        for server in self.leader.getAcceptors():
            message.to = server
            self.leader.sendMessage(message)
        self.state = PaxosLeaderProtocol.STATE_PROPOSED
        return self.proposalID

    def doTransition(self, message):
        """Advance the state machine with one acceptor reply.

        We run the protocol like a simple state machine. It's not always
        okay to error on unexpected inputs, however, due to message delays,
        so we silently ignore inputs that we're not expecting.

        Returns:
            ``True`` for an AGREE or REJECT handled while proposing,
            otherwise ``None``.
        """
        if self.state == PaxosLeaderProtocol.STATE_PROPOSED:
            if message.command == Message.MSG_ACCEPTOR_AGREE:
                self.agreecount += 1
                # If the reported value is None we can do what we like.
                # Otherwise we have to take the value attached to the highest
                # proposal seen. This is checked on EVERY agree reply, not
                # only the one that completes the quorum, so a value reported
                # by an earlier responder is not lost. Proposal IDs are
                # ordered by index 1 first, as in InstanceRecord.addProtocol.
                if message.value is not None:
                    seen = message.sequence
                    if (seen[1], seen[0]) > (self.highestseen[1], self.highestseen[0]):
                        self.value = message.value
                        self.highestseen = seen
                if self.agreecount >= self.leader.getQuorumSize():
                    self.state = PaxosLeaderProtocol.STATE_AGREED
                    # Send 'accept' message to group
                    msg = Message(Message.MSG_ACCEPT)
                    msg.copyAsReply(message)
                    msg.value = self.value
                    msg.leaderID = msg.to
                    for s in self.leader.getAcceptors():
                        msg.to = s
                        self.leader.sendMessage(msg)
                    self.leader.notifyLeader(self, message)
                return True
            if message.command == Message.MSG_ACCEPTOR_REJECT:
                self.rejectcount += 1
                if self.rejectcount >= self.leader.getQuorumSize():
                    self.state = PaxosLeaderProtocol.STATE_REJECTED
                    self.leader.notifyLeader(self, message)
                return True
        if self.state == PaxosLeaderProtocol.STATE_AGREED:
            if message.command == Message.MSG_ACCEPTOR_ACCEPT:
                self.acceptcount += 1
                if self.acceptcount >= self.leader.getQuorumSize():
                    self.state = PaxosLeaderProtocol.STATE_ACCEPTED
                    self.leader.notifyLeader(self, message)
            if message.command == Message.MSG_ACCEPTOR_UNACCEPT:
                self.unacceptcount += 1
                if self.unacceptcount >= self.leader.getQuorumSize():
                    self.state = PaxosLeaderProtocol.STATE_UNACCEPTED
                    self.leader.notifyLeader(self, message)


class PaxosAcceptor(object):
    """A Paxos acceptor: answers proposals and records accepted values."""

    def __init__(self, port, leaders):
        """Create an acceptor listening on *port*.

        Args:
            port: local port for this acceptor's message pump.
            leaders: ports of the leaders to notify when a value is accepted.
        """
        self.port = port
        self.leaders = leaders
        # Maps instance ID -> InstanceRecord.
        self.instances = {}
        self.msgPump = MessagePump(self, self.port)
        self.failed = False

    def start(self):
        """Start the message pump."""
        self.msgPump.start()

    def stop(self):
        """Signal the message pump to stop."""
        self.msgPump.doAbort()

    def fail(self):
        """Simulate a failure: incoming messages are ignored until recover()."""
        self.failed = True

    def recover(self):
        """End a simulated failure."""
        self.failed = False

    def sendMessage(self, message):
        """Send *message* through this acceptor's message pump."""
        self.msgPump.sendMessage(message)

    def recvMessage(self, message):
        """Handle one message from the pump (``None`` means nothing arrived)."""
        if message is None:
            return
        if self.failed:
            return  # Failure means ignored and lost messages
        if message.command == Message.MSG_PROPOSE:
            if message.instanceID not in self.instances:
                self.instances[message.instanceID] = InstanceRecord()
            protocol = PaxosAcceptorProtocol(self)
            protocol.recvProposal(message)
            self.instances[message.instanceID].addProtocol(protocol)
            return
        # A follow-up for a proposal we never saw (for example one that was
        # sent while we were failed) is dropped: raising KeyError here would
        # kill the message pump thread.
        record = self.instances.get(message.instanceID)
        protocol = None if record is None else record.protocols.get(message.proposalID)
        if protocol is None:
            return
        protocol.doTransition(message)

    def notifyClient(self, protocol, message):
        """Record the accepted value once *protocol* reaches the accepted state."""
        if protocol.state == PaxosAcceptorProtocol.STATE_PROPOSAL_ACCEPTED:
            self.instances[protocol.instanceID].value = message.value

    def getHighestAgreedProposal(self, instance):
        """Return the ``highestID`` stored in the record for *instance*.

        Raises:
            KeyError: if *instance* has no record.
        """
        return self.instances[instance].highestID

    def getInstanceValue(self, instance):
        """Return the value stored in the record for *instance*.

        Raises:
            KeyError: if *instance* has no record.
        """
        return self.instances[instance].value


class PaxosAcceptorProtocol(object):
    """Acceptor-side state machine for a single proposal of a single instance."""

    # State variables
    STATE_UNDEFINED = -1
    STATE_PROPOSAL_RECEIVED = 0
    STATE_PROPOSAL_REJECTED = 1
    STATE_PROPOSAL_AGREED = 2
    STATE_PROPOSAL_ACCEPTED = 3
    STATE_PROPOSAL_UNACCEPTED = 4

    def __init__(self, client):
        """Create an idle protocol owned by the acceptor *client*."""
        self.client = client
        self.state = PaxosAcceptorProtocol.STATE_UNDEFINED

    def recvProposal(self, message):
        """Process a PROPOSE message and agree to it if it is numbered highest.

        Returns:
            The proposal ID for a PROPOSE message, ``None`` for anything else.
        """
        if message.command != Message.MSG_PROPOSE:
            # error, trying to receive a non-proposal?
            return None
        self.proposalID = message.proposalID
        self.instanceID = message.instanceID
        # What's the highest already agreed proposal for this instance?
        (port, count) = self.client.getHighestAgreedProposal(message.instanceID)
        # Check if this proposal is numbered higher. Proposal IDs are
        # (port, count) tuples, so the count is index 1 and the port is
        # index 0; the indices were previously swapped, which compared a
        # count against a port number and agreed to practically everything.
        new_port, new_count = self.proposalID[0], self.proposalID[1]
        if count < new_count or (count == new_count and port < new_port):
            # Send agreed message back, with highest accepted value (if it exists)
            self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_AGREED
            value = self.client.getInstanceValue(message.instanceID)
            msg = Message(Message.MSG_ACCEPTOR_AGREE)
            msg.copyAsReply(message)
            msg.value = value
            msg.sequence = (port, count)
            self.client.sendMessage(msg)
        else:
            # Too late, we already told someone else we'd do it.
            # NOTE(review): no reject message is actually sent back to the
            # leader here; only the local state is updated.
            self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_REJECTED
        return self.proposalID

    def doTransition(self, message):
        """Handle a follow-up message for this proposal.

        Returns:
            ``True`` when an ACCEPT for an agreed proposal was acknowledged,
            ``False`` when an ACCEPT for a rejected proposal was ignored.

        Raises:
            Exception: for any other state / command combination.
        """
        if self.state == PaxosAcceptorProtocol.STATE_PROPOSAL_AGREED and message.command == Message.MSG_ACCEPT:
            self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_ACCEPTED
            # Could check on the value here, if we don't trust leaders to honour what we tell them
            # send reply to leader acknowledging
            msg = Message(Message.MSG_ACCEPTOR_ACCEPT)
            msg.copyAsReply(message)
            for l in self.client.leaders:
                msg.to = l
                self.client.sendMessage(msg)
            self.notifyClient(message)
            return True
        if self.state == PaxosAcceptorProtocol.STATE_PROPOSAL_REJECTED and message.command == Message.MSG_ACCEPT:
            # The leader broadcasts ACCEPT to every acceptor, including the
            # ones that did not agree; we simply do not take part.
            return False

        raise Exception("Unexpected state / command combination!")

    def notifyClient(self, message):
        """Forward *message* to the owning acceptor's ``notifyClient``."""
        self.client.notifyClient(self, message)


import time

if __name__ == '__main__':
    # Demo: five acceptors (two of them failed) and two leaders on localhost.
    numclients = 5
    clients = [PaxosAcceptor(port, [54321, 54322]) for port in range(64320, 64320 + numclients)]
    acceptor_ports = [c.port for c in clients]
    leader = PaxosLeader(54321, [54322], acceptor_ports)
    leader2 = PaxosLeader(54322, [54321], list(acceptor_ports))
    leader.start()
    leader.setPrimary(True)
    leader2.setPrimary(True)
    leader2.start()
    for c in clients:
        c.start()

    clients[0].fail()
    clients[1].fail()
    #    clients[2].fail( )

    # Send some proposals through to test. The client socket is closed as
    # soon as the proposals have been sent instead of being leaked.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        start = time.time()
        for i in range(1000):
            m = Message(Message.MSG_EXT_PROPOSE)
            m.value = i
            m.to = 54322
            s.sendto(pickle.dumps(m), ("localhost", m.to))

    # Wait until the second leader has recorded 999 accepted values.
    while leader2.getNumAccepted() < 999:
        print("Sleeping for 1s -- accepted:", leader2.getNumAccepted())
        time.sleep(1)
    end = time.time()
    print("Sleeping for 10s")
    time.sleep(10)
    print("Stopping leaders")
    leader.stop()
    leader2.stop()
    print("Stopping clients")
    for c in clients:
        c.stop()
    print("Leader 1 history: ", leader.getHistory())
    print("Leader 2 history: ", leader2.getHistory())
    print(end - start)