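Hyperledger Fabric BYFN: Advanced Configuration
1. byfn.sh down removes all containers and images, along with the generated configuration files and certificates. In a real deployment we need to be able to stop and resume the blockchain network. How can that be done?
byfn.sh down calls networkDown, which destroys the whole network and the generated configuration. byfn.sh restart keeps the generated artifacts (certificates, genesis block, channel transactions), but the ledgers are still removed because docker-compose down is run with --volumes.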
# Tear down running network
function networkDown() {
  # stop org3 containers also in addition to org1 and org2, in case we were running sample to add org3
  docker-compose -f $COMPOSE_FILE -f $COMPOSE_FILE_COUCH -f $COMPOSE_FILE_ORG3 down --volumes --remove-orphans

  # Don't remove the generated artifacts -- note, the ledgers are always removed
  if [ "$MODE" != "restart" ]; then
    # Bring down the network, deleting the volumes
    #Delete any ledger backups
    docker run -v $PWD:/tmp/first-network --rm hyperledger/fabric-tools:$IMAGETAG rm -Rf /tmp/first-network/ledgers-backup
    #Cleanup the chaincode containers
    clearContainers
    #Cleanup images
    removeUnwantedImages
    # remove orderer block and other channel configuration transactions and certs
    rm -rf channel-artifacts/*.block channel-artifacts/*.tx crypto-config ./org3-artifacts/crypto-config/ channel-artifacts/org3.json
    # remove the docker-compose yaml file that was customized to the example
    rm -f docker-compose-e2e.yaml
  fi
}

# Generate the needed certificates, the genesis block and start the network.
function networkUp() {
  checkPrereqs
  # generate artifacts if they don't exist
  if [ ! -d "crypto-config" ]; then
    generateCerts
    replacePrivateKey
    generateChannelArtifacts
  fi
  if [ "${IF_COUCHDB}" == "couchdb" ]; then
    IMAGE_TAG=$IMAGETAG docker-compose -f $COMPOSE_FILE -f $COMPOSE_FILE_COUCH up -d 2>&1
  else
    IMAGE_TAG=$IMAGETAG docker-compose -f $COMPOSE_FILE up -d 2>&1
  fi
  if [ $? -ne 0 ]; then
    echo "ERROR !!!! Unable to start network"
    exit 1
  fi
  # now run the end to end script
  docker exec cli scripts/script.sh $CHANNEL_NAME $CLI_DELAY $LANGUAGE $CLI_TIMEOUT $VERBOSE
  if [ $? -ne 0 ]; then
    echo "ERROR !!!! Test failed"
    exit 1
  fi
}
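Note
docker-compose down stops and removes the containers and networks. Volumes are removed only with --volumes, and images only with --rmi.
To only stop the services, it is better to use docker-compose stop.
docker-compose up creates and starts the containers; here it is used to start the network.
We can also look at the fabric-samples/fabcar example, which reuses start.sh and stop.sh from fabric-samples/basic-network so the network can be stopped and restarted.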
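As a rough illustration of the stop-versus-down distinction, here is a minimal sketch of two wrapper functions. The function names, the COMPOSE_CMD override, and the default compose file name docker-compose-cli.yaml are assumptions for illustration, not part of byfn.sh.

```shell
#!/usr/bin/env bash
# Hypothetical helpers; COMPOSE_CMD and COMPOSE_FILE are assumptions and can be overridden.
COMPOSE_CMD="${COMPOSE_CMD:-docker-compose}"
COMPOSE_FILE="${COMPOSE_FILE:-docker-compose-cli.yaml}"

# Stop the services without removing containers, networks, or volumes.
network_stop() {
  $COMPOSE_CMD -f "$COMPOSE_FILE" stop
}

# Start the previously stopped containers again.
network_resume() {
  $COMPOSE_CMD -f "$COMPOSE_FILE" start
}
```

Calling network_stop and later network_resume leaves the existing containers in place, unlike networkDown, which removes them together with their volumes.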
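2. With both byfn.sh and fabcar, even after the network is restarted the channel has to be created again, the peers have to rejoin it, and the chaincode has to be reinstalled. With many nodes this is tedious and slow. Can the channel, peers, chaincode, and state DB be restored automatically on restart?
Yes, but persistence has to be configured for the orderer, the peers, and so on.
Take docker-compose.yaml in fabric-samples/basic-network as an example. Under services, the volume mappings whose container path contains production are the persistence mappings; together with the CouchDB data directory, they are the /mnt/hyperledger/... entries below.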
orderer.example.com:
  container_name: orderer.example.com
  image: hyperledger/fabric-orderer
  environment:
    - ORDERER_GENERAL_LOGLEVEL=info
    - ORDERER_GENERAL_LISTENADDRESS=0.0.0.0
    - ORDERER_GENERAL_GENESISMETHOD=file
    - ORDERER_GENERAL_GENESISFILE=/etc/hyperledger/configtx/genesis.block
    - ORDERER_GENERAL_LOCALMSPID=OrdererMSP
    - ORDERER_GENERAL_LOCALMSPDIR=/etc/hyperledger/msp/orderer/msp
  working_dir: /opt/gopath/src/github.com/hyperledger/fabric/orderer
  command: orderer
  ports:
    - 7050:7050
  volumes:
    - ./config/:/etc/hyperledger/configtx
    - ./crypto-config/ordererOrganizations/example.com/orderers/orderer.example.com/:/etc/hyperledger/msp/orderer
    - ./crypto-config/peerOrganizations/org1.example.com/peers/peer0.org1.example.com/:/etc/hyperledger/msp/peerOrg1
    # persistent orderer ledger data
    - /mnt/hyperledger/orderer:/var/hyperledger/production/orderer
  networks:
    - basic

peer0.org1.example.com:
  container_name: peer0.org1.example.com
  image: hyperledger/fabric-peer
  environment:
    - CORE_VM_ENDPOINT=unix:///host/var/run/docker.sock
    - CORE_PEER_ID=peer0.org1.example.com
    - CORE_LOGGING_PEER=info
    - CORE_CHAINCODE_LOGGING_LEVEL=info
    - CORE_PEER_LOCALMSPID=Org1MSP
    - CORE_PEER_MSPCONFIGPATH=/etc/hyperledger/msp/peer/
    - CORE_PEER_ADDRESS=peer0.org1.example.com:7051
    # # the following setting starts chaincode containers on the same
    # # bridge network as the peers
    # # https://docs.docker.com/compose/networking/
    - CORE_VM_DOCKER_HOSTCONFIG_NETWORKMODE=${COMPOSE_PROJECT_NAME}_basic
    - CORE_LEDGER_STATE_STATEDATABASE=CouchDB
    - CORE_LEDGER_STATE_COUCHDBCONFIG_COUCHDBADDRESS=couchdb:5984
    # The CORE_LEDGER_STATE_COUCHDBCONFIG_USERNAME and CORE_LEDGER_STATE_COUCHDBCONFIG_PASSWORD
    # provide the credentials for ledger to connect to CouchDB. The username and password must
    # match the username and password set for the associated CouchDB.
    - CORE_LEDGER_STATE_COUCHDBCONFIG_USERNAME=
    - CORE_LEDGER_STATE_COUCHDBCONFIG_PASSWORD=
  working_dir: /opt/gopath/src/github.com/hyperledger/fabric
  command: peer node start
  # command: peer node start --peer-chaincodedev=true
  ports:
    - 7051:7051
    - 7053:7053
  volumes:
    - /var/run/:/host/var/run/
    - ./crypto-config/peerOrganizations/org1.example.com/peers/peer0.org1.example.com/msp:/etc/hyperledger/msp/peer
    - ./crypto-config/peerOrganizations/org1.example.com/users:/etc/hyperledger/msp/users
    - ./config:/etc/hyperledger/configtx
    # persistent peer ledger data
    - /mnt/hyperledger/org1/peer0:/var/hyperledger/production
  depends_on:
    - orderer.example.com
    - couchdb
  networks:
    - basic

couchdb:
  container_name: couchdb
  image: hyperledger/fabric-couchdb
  # Populate the COUCHDB_USER and COUCHDB_PASSWORD to set an admin user and password
  # for CouchDB. This will prevent CouchDB from operating in an "Admin Party" mode.
  environment:
    - COUCHDB_USER=
    - COUCHDB_PASSWORD=
  ports:
    - 5984:5984
  networks:
    - basic
  volumes:
    # persistent state database data
    - /mnt/hyperledger/couchdb:/opt/couchdb/data
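The host side of these mappings can be prepared ahead of time. The following is a minimal sketch, assuming the /mnt/hyperledger layout used in the fragment above; the helper name prepare_ledger_dirs is hypothetical. Docker normally creates missing bind-mount directories itself, so this is only useful when you want to control ownership or permissions beforehand.

```shell
#!/usr/bin/env bash
# Hypothetical helper: create the host directories used by the bind mounts above.
# The default base path matches /mnt/hyperledger from the compose fragment.
prepare_ledger_dirs() {
  base="${1:-/mnt/hyperledger}"
  mkdir -p "$base/orderer" "$base/org1/peer0" "$base/couchdb"
}
```

For example, prepare_ledger_dirs /mnt/hyperledger would be run once on the host before the first docker-compose up.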
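3. By default a peer uses LevelDB as its state DB, which offers only limited key-value queries, while CouchDB supports rich queries. How is it configured?
byfn.sh up -c mychannel -s couchdb additionally loads the configuration file docker-compose-couch.yaml: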
services:
  couchdb0:
    container_name: couchdb0
    image: hyperledger/fabric-couchdb
    # Populate the COUCHDB_USER and COUCHDB_PASSWORD to set an admin user and password
    # for CouchDB. This will prevent CouchDB from operating in an "Admin Party" mode.
    environment:
      - COUCHDB_USER=
      - COUCHDB_PASSWORD=
    # Comment/Uncomment the port mapping if you want to hide/expose the CouchDB service,
    # for example map it to utilize Fauxton User Interface in dev environments.
    ports:
      - "5984:5984"
    networks:
      - byfn

  peer0.org1.example.com:
    environment:
      - CORE_LEDGER_STATE_STATEDATABASE=CouchDB
      - CORE_LEDGER_STATE_COUCHDBCONFIG_COUCHDBADDRESS=couchdb0:5984
      # The CORE_LEDGER_STATE_COUCHDBCONFIG_USERNAME and CORE_LEDGER_STATE_COUCHDBCONFIG_PASSWORD
      # provide the credentials for ledger to connect to CouchDB. The username and password must
      # match the username and password set for the associated CouchDB.
      - CORE_LEDGER_STATE_COUCHDBCONFIG_USERNAME=
      - CORE_LEDGER_STATE_COUCHDBCONFIG_PASSWORD=
    depends_on:
      - couchdb0
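On the peer side the main settings are the CouchDB address and credentials; on the CouchDB side it is mainly the port mapping. CouchDB also supports indexes on selected fields, which we will look at in more depth when studying chaincode.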
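To get a feel for what a rich query looks like, the exposed port 5984 can be queried directly in a development environment. This is a minimal sketch using CouchDB's _find endpoint with a Mango selector; the helper names, the COUCH_URL override, the database name, and the field name are all assumptions for illustration.

```shell
#!/usr/bin/env bash
# Build a Mango selector that matches documents whose field equals a string value.
# Values are inserted verbatim, so they must not contain double quotes or backslashes.
mango_selector() {
  printf '{"selector":{"%s":"%s"}}' "$1" "$2"
}

# POST the selector to CouchDB's _find endpoint.
# COUCH_URL and the database name passed in are assumptions for illustration.
couch_find() {
  db="$1"; field="$2"; value="$3"
  curl -s -X POST "${COUCH_URL:-http://localhost:5984}/$db/_find" \
    -H 'Content-Type: application/json' \
    -d "$(mango_selector "$field" "$value")"
}
```

A call might look like couch_find mychannel_mycc owner tom, where mychannel_mycc is only a guess at the state database name and owner is a hypothetical field; GET /_all_dbs or the Fauxton UI shows the real database names. This reads CouchDB directly and bypasses the peer, so it is suitable for inspection only.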
4. We configured two ports on the peer. What are they for?
See fabric-samples/first-network/base/docker-compose-base.yaml:
peer0.org1.example.com:
  container_name: peer0.org1.example.com
  extends:
    file: peer-base.yaml
    service: peer-base
  environment:
    - CORE_PEER_ID=peer0.org1.example.com
    - CORE_PEER_ADDRESS=peer0.org1.example.com:7051
    - CORE_PEER_GOSSIP_BOOTSTRAP=peer1.org1.example.com:7051
    - CORE_PEER_GOSSIP_EXTERNALENDPOINT=peer0.org1.example.com:7051
    - CORE_PEER_LOCALMSPID=Org1MSP
  volumes:
    - /var/run/:/host/var/run/
    - ../crypto-config/peerOrganizations/org1.example.com/peers/peer0.org1.example.com/msp:/etc/hyperledger/fabric/msp
    - ../crypto-config/peerOrganizations/org1.example.com/peers/peer0.org1.example.com/tls:/etc/hyperledger/fabric/tls
    - peer0.org1.example.com:/var/hyperledger/production
  ports:
    - 7051:7051
    - 7053:7053
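7051 is the peer's gRPC service port, which client applications normally connect to.
7053 is the event port (Peer Event).
Before Fabric 1.1 this port served the Event Hub, the peer's message center: whenever a new block was appended to the peer's copy of the ledger, every client application subscribed to those events was notified. Take invoke.js from fabcar as an example: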
console.log(util.format(
    'Successfully sent Proposal and received ProposalResponse: Status - %s, message - "%s"',
    proposalResponses[0].response.status, proposalResponses[0].response.message));

// build up the request for the orderer to have the transaction committed
var request = {
    proposalResponses: proposalResponses,
    proposal: proposal
};

// set the transaction listener and set a timeout of 30 sec
// if the transaction did not get committed within the timeout period,
// report a TIMEOUT status
var transaction_id_string = tx_id.getTransactionID(); // Get the transaction ID string to be used by the event processing
var promises = [];

var sendPromise = channel.sendTransaction(request);
promises.push(sendPromise); // we want the send transaction first, so that we know where to check status

// get an eventhub once the fabric client has a user assigned. The user
// is required because the event registration must be signed
let event_hub = fabric_client.newEventHub();
event_hub.setPeerAddr('grpc://localhost:7053');

// using resolve the promise so that result status may be processed
// under the then clause rather than having the catch clause process
// the status
let txPromise = new Promise((resolve, reject) => {
    let handle = setTimeout(() => {
        event_hub.disconnect();
        resolve({event_status : 'TIMEOUT'}); // we could use reject(new Error('Transaction did not complete within 30 seconds'));
    }, 30000); // 30 seconds, matching the comment above
    event_hub.connect();
    event_hub.registerTxEvent(transaction_id_string, (tx, code) => {
        // this is the callback for transaction event status
        // first some clean up of event listener
        clearTimeout(handle);
        event_hub.unregisterTxEvent(transaction_id_string);
        event_hub.disconnect();

        // now let the application know what happened
        var return_status = {event_status : code, tx_id : transaction_id_string};
        if (code !== 'VALID') {
            console.error('The transaction was invalid, code = ' + code);
            resolve(return_status); // we could use reject(new Error('Problem with the transaction, event status ::'+code));
        } else {
            console.log('The transaction has been committed on peer ' + event_hub._ep._endpoint.addr);
            resolve(return_status);
        }
    }, (err) => {
        // this is the callback if something goes wrong with the event registration or processing
        reject(new Error('There was a problem with the eventhub ::'+err));
    });
});
promises.push(txPromise);

return Promise.all(promises);
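From Fabric 1.1 onward, peer events were redesigned completely: events are no longer subscribed to per peer but per channel. According to the official documentation this gives finer-grained access control over peer data and more reliable delivery (that is how the docs put it; I am still a little unsure about it). Two services are provided: Deliver, which sends the full content of each block committed to the ledger, and DeliverFiltered, which sends filtered blocks, a reduced form of each block that keeps the notifications small.
This is a bit obscure, so I looked at the Java SDK code: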
/**
 * This code tests the replay feature of the new peer event services.
 * Instead of the default of starting the eventing peer to retrieve the newest block it sets it
 * to retrieve starting from the start parameter. Also checks with block and filterblock replays.
 * Depends on end2end and end2endAndBackagain having fully run to have the blocks needed to work with.
 *
 * @param client
 * @param replayTestChannel
 * @param start
 * @param stop
 * @param useFilteredBlocks
 * @throws InvalidArgumentException
 */
private void testPeerServiceEventingReplay(HFClient client, Channel replayTestChannel, final long start, final long stop,
                                           final boolean useFilteredBlocks) throws InvalidArgumentException {
    if (testConfig.isRunningAgainstFabric10()) {
        return; // not supported for v1.0
    }

    assertFalse(replayTestChannel.isInitialized()); //not yet initialized
    assertFalse(replayTestChannel.isShutdown()); // not yet shutdown.

    //Remove all peers just have one ledger peer and one eventing peer.
    List<Peer> savedPeers = new ArrayList<>(replayTestChannel.getPeers());
    for (Peer peer : savedPeers) {
        replayTestChannel.removePeer(peer);
    }
    assertTrue(savedPeers.size() > 1); //need at least two
    Peer eventingPeer = savedPeers.remove(0);
    eventingPeer = client.newPeer(eventingPeer.getName(), eventingPeer.getUrl(), eventingPeer.getProperties());
    Peer ledgerPeer = savedPeers.remove(0);
    ledgerPeer = client.newPeer(ledgerPeer.getName(), ledgerPeer.getUrl(), ledgerPeer.getProperties());

    assertTrue(replayTestChannel.getPeers().isEmpty()); // no more peers.
    assertTrue(replayTestChannel.getPeers(EnumSet.of(PeerRole.CHAINCODE_QUERY, PeerRole.ENDORSING_PEER)).isEmpty()); // just checking :)
    assertTrue(replayTestChannel.getPeers(EnumSet.of(PeerRole.LEDGER_QUERY)).isEmpty()); // just checking

    assertNotNull(client.getChannel(replayTestChannel.getName())); // should be known by client.

    final PeerOptions eventingPeerOptions = createPeerOptions().setPeerRoles(EnumSet.of(PeerRole.EVENT_SOURCE));
    if (useFilteredBlocks) {
        eventingPeerOptions.registerEventsForFilteredBlocks();
    }

    if (-1L == stop) { //the height of the blockchain
        replayTestChannel.addPeer(eventingPeer, eventingPeerOptions.startEvents(start)); // Eventing peer starts getting blocks from block number start
    } else {
        replayTestChannel.addPeer(eventingPeer, eventingPeerOptions
                .startEvents(start).stopEvents(stop)); // Eventing peer gets blocks from start up to stop
    }
    //add a ledger peer
    replayTestChannel.addPeer(ledgerPeer, createPeerOptions().setPeerRoles(EnumSet.of(PeerRole.LEDGER_QUERY)));

    CompletableFuture<Long> done = new CompletableFuture<>(); // future to set when done.
    // some variable used by the block listener being set up.
    final AtomicLong bcount = new AtomicLong(0);
    final AtomicLong stopValue = new AtomicLong(stop == -1L ? Long.MAX_VALUE : stop);
    final Channel finalChannel = replayTestChannel;
    final Map<Long, BlockEvent> blockEvents = Collections.synchronizedMap(new HashMap<>(100));

    final String blockListenerHandle = replayTestChannel.registerBlockListener(blockEvent -> { // register a block listener
        try {
            final long blockNumber = blockEvent.getBlockNumber();
            BlockEvent seen = blockEvents.put(blockNumber, blockEvent);
            assertNull(format("Block number %d seen twice", blockNumber), seen);

            assertTrue(format("Wrong type of block seen block number %d. expected filtered block %b but got %b",
                    blockNumber, useFilteredBlocks, blockEvent.isFiltered()),
                    useFilteredBlocks ? blockEvent.isFiltered() : !blockEvent.isFiltered());
            final long count = bcount.getAndIncrement(); //count starts with 0 not 1 !

            //out("Block count: %d, block number: %d received from peer: %s", count, blockNumber, blockEvent.getPeer().getName());

            if (count == 0 && stop == -1L) {
                final BlockchainInfo blockchainInfo = finalChannel.queryBlockchainInfo();

                long lh = blockchainInfo.getHeight();
                stopValue.set(lh - 1L); // blocks 0L 9L are on chain height 10 .. stop on 9
                // out("height: %d", lh);
                if (bcount.get() + start > stopValue.longValue()) { // test with latest count.
                    done.complete(bcount.get()); // report back latest count.
                }
            } else {
                if (bcount.longValue() + start > stopValue.longValue()) {
                    done.complete(count);
                }
            }
        } catch (AssertionError | Exception e) {
            e.printStackTrace();
            done.completeExceptionally(e);
        }
    });

    try {
        replayTestChannel.initialize(); // start it all up.
        done.get(30, TimeUnit.SECONDS); // give a timeout here.
        Thread.sleep(1000); // sleep a little to see if more blocks trickle in .. they should not
        replayTestChannel.unregisterBlockListener(blockListenerHandle);

        final long expectNumber = stopValue.longValue() - start + 1L; // Start 2 and stop is 3 expect 2

        assertEquals(format("Didn't get number we expected %d but got %d block events. Start: %d, end: %d, height: %d",
                expectNumber, blockEvents.size(), start, stop, stopValue.longValue()), expectNumber, blockEvents.size());

        for (long i = stopValue.longValue(); i >= start; i--) { //make sure all are there.
            final BlockEvent blockEvent = blockEvents.get(i);
            assertNotNull(format("Missing block event for block number %d. Start= %d", i, start), blockEvent);
        }

        //light weight test just see if we get reasonable values for traversing the block. Test just whats common between
        // Block and FilteredBlock.
        int transactionEventCounts = 0;
        int chaincodeEventsCounts = 0;

        for (long i = stopValue.longValue(); i >= start; i--) {
            final BlockEvent blockEvent = blockEvents.get(i);
            // out("blockwalker %b, start: %d, stop: %d, i: %d, block %d", useFilteredBlocks, start, stopValue.longValue(), i, blockEvent.getBlockNumber());
            assertEquals(useFilteredBlocks, blockEvent.isFiltered()); // check again

            if (useFilteredBlocks) {
                assertNull(blockEvent.getBlock()); // should not have raw block event.
                assertNotNull(blockEvent.getFilteredBlock()); // should have raw filtered block.
            } else {
                assertNotNull(blockEvent.getBlock()); // should have raw block event.
                assertNull(blockEvent.getFilteredBlock()); // should not have raw filtered block.
            }

            assertEquals(replayTestChannel.getName(), blockEvent.getChannelId());

            for (BlockInfo.EnvelopeInfo envelopeInfo : blockEvent.getEnvelopeInfos()) {
                if (envelopeInfo.getType() == TRANSACTION_ENVELOPE) {
                    BlockInfo.TransactionEnvelopeInfo transactionEnvelopeInfo = (BlockInfo.TransactionEnvelopeInfo) envelopeInfo;
                    assertTrue(envelopeInfo.isValid()); // only have valid blocks.
                    assertEquals(envelopeInfo.getValidationCode(), 0);

                    ++transactionEventCounts;
                    for (BlockInfo.TransactionEnvelopeInfo.TransactionActionInfo ta : transactionEnvelopeInfo.getTransactionActionInfos()) {
                        // out("\nTA:", ta + "\n\n");
                        ChaincodeEvent event = ta.getEvent();
                        if (event != null) {
                            assertNotNull(event.getChaincodeId());
                            assertNotNull(event.getEventName());
                            chaincodeEventsCounts++;
                        }
                    }
                } else {
                    assertEquals("Only non transaction block should be block 0.", blockEvent.getBlockNumber(), 0);
                }
            }
        }

        assertTrue(transactionEventCounts > 0);
        if (expectNumber > 4) { // this should be enough blocks with CC events.
            assertTrue(chaincodeEventsCounts > 0);
        }

        replayTestChannel.shutdown(true); //all done.
    } catch (Exception e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
}
Now combine this with the channel initialization code:
boolean everyOther = false;
for (String peerName : sampleOrg.getPeerNames()) {
    String peerLocation = sampleOrg.getPeerLocation(peerName);
    Properties peerProperties = testConfig.getPeerProperties(peerName);
    Peer peer = client.newPeer(peerName, peerLocation, peerProperties);

    final PeerOptions peerEventingOptions = // we have two peers on one use block on other use filtered
            everyOther ?
                    createPeerOptions().registerEventsForBlocks().setPeerRoles(EnumSet.of(PeerRole.ENDORSING_PEER, PeerRole.LEDGER_QUERY, PeerRole.CHAINCODE_QUERY, PeerRole.EVENT_SOURCE)) :
                    createPeerOptions().registerEventsForFilteredBlocks().setPeerRoles(EnumSet.of(PeerRole.ENDORSING_PEER, PeerRole.LEDGER_QUERY, PeerRole.CHAINCODE_QUERY, PeerRole.EVENT_SOURCE));

    newChannel.addPeer(peer, IS_FABRIC_V10 ?
            createPeerOptions().setPeerRoles(EnumSet.of(PeerRole.ENDORSING_PEER, PeerRole.LEDGER_QUERY, PeerRole.CHAINCODE_QUERY)) : peerEventingOptions);
    everyOther = !everyOther;
}
In the Fabric SDK, queries, updates, and most other operations use org.hyperledger.fabric.sdk.Channel as the entry point, for example:
Collection<ProposalResponse> queryProposals = channel.queryByChaincode(queryByChaincodeRequest, channel.getPeers());
Collection<ProposalResponse> transactionPropResp = channel.sendTransactionProposal(transactionProposalRequest, channel.getPeers());
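The Event Hub listens to a single peer, and a single node may be unstable, whereas a channel spans multiple peers, so notifications through the channel are likely to be more reliable. Port 7053 is presumably dedicated to the pre-1.1 Event Hub. How the callbacks are actually delivered from 1.1 onward still requires reading the source code; what is written here is only a conservative guess.
5. Most examples configure the orderer in SOLO mode. How is Kafka configured?
To be honest, using Kafka well means weighing quite a few parameters. Since the orderer cluster is at the heart of the consensus implementation, we will discuss Kafka-based consensus and its configuration in a dedicated post later.
For now, refer to the official documentation: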
https://hyperledger-fabric.readthedocs.io/en/release-1.2/kafka.html?highlight=kafka