死磕hyperledger fabric源码|Peer节点启动
文章及代码:https://github.com/blockchainGuide/
分支:v1.1.0
启动流程概述 入口:peer/main.go:
main
()函数负责初始化peer
主命令对象,注册子命令与初始化环境配置,解析用户输入子命令start
并启动Peer节点,包括如下流程步骤:
定义、注册命令与初始化基本配置。基于Cobra
组件定义peer主命令对象mainCmd
,并通过Viper
组件调用InitConfig
()函数,从本地core.yaml
配置文件、环境变量、命令行选项等读取与解析peer
命令的相关配置。同时,初始化主命令mainCmd
的标志位选项 version
、logging-level
等,然后在主命令mainCmd
上注册version、node、chaincode、channel
等子命令,设置最大可用CPU
核数与日志后端;
初始化本地MSP
组件。通过Viper
组件获取MSP
组件的配置文件路径mspMgrConfigDir
、BCCSP
配置项bccspConfig
、MSP名称ID即localMSPID、MSP
组件类型localMSPType
等,基于这4个参数构造本地MSP
配置对象,接着创建默认的bccspmsp
结构对象作为本地MSP
组件,并解析MSP
配置对象与初始化本地MSP
组件;
执行mainCmd.Execute()方法启动Peer
节点
接下来将会分别对这几个关键部分进行细说。
定义、注册命令与初始化基本配置 定义主命令 代码分析 如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 var mainCmd = &cobra.Command{ Use: "peer" , PersistentPreRunE: func (cmd *cobra.Command, args []string ) error { var loggingSpec string if viper.GetString("logging_level" ) != "" { loggingSpec = viper.GetString("logging_level" ) } else { loggingSpec = viper.GetString("logging.level" ) } flogging.InitFromSpec(loggingSpec) return nil }, Run: func (cmd *cobra.Command, args []string ) { if versionFlag { fmt.Print(version.GetInfo()) } else { cmd.HelpFunc()(cmd, args) } },
注册子命令 将几类子命令注册到主命令上,种类如下:
channel通道子命令:用于创建应用通道、获取区块、Peer节点加入应用通道、获取节点所加入的应用通道列表、更新应用通道配置、签名配置交易文件、获取指定的应用通道信息等,包括create、fetch、join、list、update、signconfigtx、getinfo
等子命令;
chaincode链码子命令:用于安装链码、实例化(部署)链码、调用链码、打包链码、查询链码、签名链码包、升级链码、获取通道链码列表等,包括install、instantiate、invoke、package、query、signpackage、upgrade、list
等子命令;
node节点子命令:用于管理节点服务进程与查询服务状态,包括start、status
等子命令;
logging日志子命令:用于获取、设置与恢复日志级别功能,包括getlevel、setlevel、 revertlevels
等子命令;
version版本子命令:用于打印Fabric
中的Peer
节点服务器版本信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 viper.SetEnvPrefix(cmdRoot) viper.AutomaticEnv() replacer := strings.NewReplacer("." , "_" ) viper.SetEnvKeyReplacer(replacer) mainFlags := mainCmd.PersistentFlags() mainFlags.BoolVarP(&versionFlag, "version" , "v" , false , "Display current version of fabric peer server" ) mainFlags.String("logging-level" , "" , "Default logging level and overrides, see core.yaml for full syntax" ) viper.BindPFlag("logging_level" , mainFlags.Lookup("logging-level" )) mainCmd.AddCommand(version.Cmd()) mainCmd.AddCommand(node.Cmd()) mainCmd.AddCommand(chaincode.Cmd(nil )) mainCmd.AddCommand(clilogging.Cmd(nil )) mainCmd.AddCommand(channel.Cmd(nil )) err := common.InitConfig(cmdRoot) ... runtime.GOMAXPROCS(viper.GetInt("peer.gomaxprocs" )) flogging.InitBackend(flogging.SetFormat(viper.GetString("logging.format" )), logOutput)
初始化本地MSP组件 MSP
组件是管理本地成员身份的重要安全模块,封装了根CA
证书、本地签名者实体等.
1 2 3 4 5 6 7 8 9 10 var mspMgrConfigDir = config.GetPath("peer.mspConfigPath" ) var mspID = viper.GetString("peer.localMspId" ) var mspType = viper.GetString("peer.localMspType" ) if mspType == "" { mspType = msp.ProviderTypeToString(msp.FABRIC) } err = common.InitCrypto(mspMgrConfigDir, mspID, mspType)
执行主命令 函数如下:
1 if mainCmd.Execute() != nil {}
接下来进入到Execute()
函数中继续分析: /vendor/github.com/spf13/cobra/command.go/ExecuteC()
1 func (c *Command) ExecuteC() (cmd *Command, err error ) {}
通过Cobra
组件调用主命令Execute
()方法,执行peer node start
命令启动Peer
节点。其中,Cobra
组件解析完用户输入的命令行选项之后,依次执行节点启动命令nodeStartCmd对象中定义的所有相关的执行方法,并按照cobra.Command
命令中定义的如下顺序来执行
PersistentPreRunE()/PersistentPreRun();
PreRunE()/PreRun();
RunE()/Run();
PostRunE()/PostRun();
PersistentPostRunE()/PersistentPostRun();
到这里为止节点命令开启执行,因为这部分主要是讲的节点启动,所以下面集中将节点启动命令执行的运行流程。
节点启动命令执行 节点启动的命令可以根据以下代码路径查找:
1 mainCmd.AddCommand(node.Cmd())
1 2 3 4 5 func Cmd () *cobra.Command { nodeCmd.AddCommand(startCmd()) nodeCmd.AddCommand(statusCmd()) return nodeCmd }
这里只讨论节点启动命令
1 2 3 4 func startCmd () *cobra.Command {... return nodeStartCmd }
1 2 3 4 5 var nodeStartCmd = &cobra.Command{... return serve(args) }, }
正式进入到serve函数讨论:
初始化资源 ①:获取本地MSP组件类型并检查MSP组件类型
目前,Hyperledger Fabric
支持FABRIC
类型和IDEMIX
类型两种MSP
组件,默认采用基于BCCSP
组件构建的FABRIC
类型MSP
组件.
1 2 3 4 mspType := mgmt.GetLocalMSP().GetType() if mspType != msp.FABRIC { panic ("Unsupported msp type " + msp.ProviderTypeToString(mspType)) }
②:初始化资源访问策略提供者
1 aclmgmt.RegisterACLProvider(nil )
③:初始化本地账本管理器
1 ledgermgmt.Initialize(peer.ConfigTxProcessors)
core/ledger/ledgermgmt/ledger_mgmt.go/initialize
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 func initialize (customTxProcessors customtx.Processors) { logger.Info("Initializing ledger mgmt" ) lock.Lock() defer lock.Unlock() initialized = true openedLedgers = make (map [string ]ledger.PeerLedger) customtx.Initialize(customTxProcessors) cceventmgmt.Initialize() provider, err := kvledger.NewProvider() if err != nil { panic (fmt.Errorf("Error in instantiating ledger provider: %s" , err)) } provider.Initialize(kvLedgerStateListeners) ledgerProvider = provider logger.Info("ledger mgmt initialized" ) }
正本提供者有以下几种:
账本ID数据库(idStore类型):提供存储账本ID(即链ID)与创世区块键值对的LevelDB
数据库;
账本数据存储对象提供者(ledgerstorage.Provider类型):创建账本数据存储对象,负责管理区块数据文件、隐私数据库、区块索引数据库等;
历史数据库提供者(HistoryDBProvider类型):创建历史数据库,存储每个状态数据的历史信息;
状态数据库提供者(CommonStorageDBProvider类型):创建状态数据库(LevelDB
或CouchDB
类型),存储世界状态(world state
),包括有效交易的公有数据与隐私数据。
④:初始化服务器参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 if chaincodeDevMode { viper.Set("chaincode.mode" , chaincode.DevModeUserRunsChaincode) } if err := peer.CacheConfiguration(); err != nil { return err } peerEndpoint, err := peer.GetPeerEndpoint() ... var peerHost string peerHost, _, err = net.SplitHostPort(peerEndpoint.Address)
创建GRPC服务器 ①:创建gRPC服务器
serve()
函数创建了至少 3 个gRPC
服务器(独立端口),用于注册Peer
节点功能服务器,如下所示:
序号
端口
功能服务器
说明
服务接口
1
7051
DeliverEvents事件服务器
处理区块请求消息
Deliver()
7051
Admin服务器
获取节点状态、维护日志等
GetStatus()
7051
Endorser背书服务器
提供背书服务
ProcessProposal()
7051
Gossip消息服务器
组织内节点分发数据与同步状态等
GossipStream()
2
7052
chaincodeSupport链码支持服务器
提供Peer节点链码支持服务
Register()
3
7053
EventHub事件服务器
提供订阅事件服务(1.3.0废弃)
Chat()
1 2 3 serverConfig, err := peer.GetServerConfig() ... peerServer, err := peer.CreatePeerServer(listenAddr, serverConfig)
②:创建EventHub事件服务器
1 2 3 4 5 6 7 8 9 10 11 if serverConfig.SecOpts.UseTLS { ... cs := comm.GetCredentialSupport() cs.ServerRootCAs = serverConfig.SecOpts.ServerRootCAs clientCert, err := peer.GetClientCertificate() comm.GetCredentialSupport().SetClientCertificate(clientCert) } ehubGrpcServer, err := createEventHubServer(serverConfig)
③:创建DeliverEvents事件服务器
serve()
函数检查如果开启了双向的TLS
安全认证,则设置mutualTLS
标志位为true
,并定义获取资源策略检查器即policyCheckerProvider
()函数。该函数将直接调用全局变量aclProvider
对象的CheckACL
()方法,检查签名消息在通道(channelID)上是否满足指定资源的访问控制权限策略。
接着,serve
()函数调用peer.NewDeliverEventsServer()
函数,基于mutualTLS
、policy-CheckerProvider
等参数创建DeliverEvents
事件服务器abServer
,提供Deliver()
与DeliverFiltered
()服务接口,分别用于处理请求正常区块与过滤区块的消息。
然后调用pb.RegisterDeliverServer()
方法,将DeliverEvents
事件服务器abServer
注册到默认的gRPC
服务器上(7051端口),以提供本地事件服务。
1 2 3 4 5 6 7 8 9 10 11 mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert policyCheckerProvider := func (resourceName string ) deliver.PolicyChecker { return func (env *cb.Envelope, channelID string ) error { return aclmgmt.GetACLProvider().CheckACL(resourceName, channelID, env) } } abServer := peer.NewDeliverEventsServer(mutualTLS, policyCheckerProvider, &peer.DeliverSupportManager{}) pb.RegisterDeliverServer(peerServer.Server(), abServer)
④:创建ChaincodeSupport链码支持服务器
1 2 3 4 5 6 7 8 9 ccSrv, ccEndpoint, err := createChaincodeServer(ca, peerHost) if err != nil { logger.Panicf("Failed to create chaincode server: %s" , err) } registerChaincodeSupport(ccSrv, ccEndpoint, ca) go ccSrv.Start()
⑤:创建Admin管理服务器与Endorser背书服务器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 pb.RegisterAdminServer(peerServer.Server(), core.NewAdminServer()) privDataDist := func (channel string , txID string , privateData *rwset.TxPvtReadWriteSet) error { return service.GetGossipService().DistributePrivateData(channel, txID, privateData) } serverEndorser := endorser.NewEndorserServer(privDataDist, &endorser.SupportImpl{}) libConf := library.Config{} if err = viperutil.EnhancedExactUnmarshalKey("peer.handlers" , &libConf); err != nil { return errors.WithMessage(err, "could not load YAML config" ) } authFilters := library.InitRegistry(libConf).Lookup(library.Auth).([]authHandler.Filter) auth := authHandler.ChainFilters(serverEndorser, authFilters...) pb.RegisterEndorserServer(peerServer.Server(), auth)
⑥:创建Gossip消息服务器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 bootstrap := viper.GetStringSlice("peer.gossip.bootstrap" ) serializedIdentity, err := mgmt.GetLocalSigningIdentityOrPanic().Serialize() ... messageCryptoService := peergossip.NewMCS( peer.NewChannelPolicyManagerGetter(), localmsp.NewSigner(), mgmt.NewDeserializersManager()) secAdv := peergossip.NewSecurityAdvisor(mgmt.NewDeserializersManager()) secureDialOpts := func () []grpc.DialOption { var dialOpts []grpc.DialOption dialOpts = append (dialOpts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(comm.MaxRecvMsgSize()), grpc.MaxCallSendMsgSize(comm.MaxSendMsgSize()))) kaOpts := comm.DefaultKeepaliveOptions() ... dialOpts = append (dialOpts, comm.ClientKeepaliveOptions(kaOpts)...) if comm.TLSEnabled() { dialOpts = append (dialOpts, grpc.WithTransportCredentials(comm.GetCredentialSupport().GetPeerCredentials())) } else { dialOpts = append (dialOpts, grpc.WithInsecure()) } return dialOpts } var certs *common2.TLSCertificatesif peerServer.TLSEnabled() { serverCert := peerServer.ServerCertificate() clientCert, err := peer.GetClientCertificate() if err != nil { return errors.Wrap(err, "failed obtaining client certificates" ) } certs = &common2.TLSCertificates{} certs.TLSServerCert.Store(&serverCert) certs.TLSClientCert.Store(&clientCert) } err = service.InitGossipService(serializedIdentity, peerEndpoint.Address, peerServer.Server(), certs, messageCryptoService, secAdv, secureDialOpts, bootstrap...)
部署系统链码与初始化现存通道的链结构 ①:部署系统链码
②:初始化现存通道上的链结构
1 2 3 peer.Initialize(func (cid string ) { })
core/peer/peer.go/Initialize
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 func Initialize (init func (string ) ) { nWorkers := viper.GetInt("peer.validatorPoolSize" ) if nWorkers <= 0 { nWorkers = runtime.NumCPU() } validationWorkersSemaphore = semaphore.NewWeighted(int64 (nWorkers)) chainInitializer = init var cb *common.Block var ledger ledger.PeerLedger ledgermgmt.Initialize(ConfigTxProcessors) ledgerIds, err := ledgermgmt.GetLedgerIDs() if err != nil { panic (fmt.Errorf("Error in initializing ledgermgmt: %s" , err)) } for _, cid := range ledgerIds { peerLogger.Infof("Loading chain %s" , cid) if ledger, err = ledgermgmt.OpenLedger(cid); err != nil { peerLogger.Warningf("Failed to load ledger %s(%s)" , cid, err) peerLogger.Debugf("Error while loading ledger %s with message %s. We continue to the next ledger rather than abort." , cid, err) continue } if cb, err = getCurrConfigBlockFromLedger(ledger); err != nil { peerLogger.Warningf("Failed to find config block on ledger %s(%s)" , cid, err) peerLogger.Debugf("Error while looking for config block on ledger %s with message %s. We continue to the next ledger rather than abort." , cid, err) continue } if err = createChain(cid, ledger, cb); err != nil { peerLogger.Warningf("Failed to load chain %s(%s)" , cid, err) peerLogger.Debugf("Error reloading chain %s with message %s. We continue to the next chain rather than abort." , cid, err) continue } InitChain(cid) } }
启动gRPC服务器与profile服务器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 serve := make (chan error ) sigs := make (chan os.Signal, 1 ) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) go func () { sig := <-sigs logger.Debugf("sig: %s" , sig) serve <- nil }() go func () { var grpcErr error if grpcErr = peerServer.Start(); grpcErr != nil { grpcErr = fmt.Errorf("grpc server exited with error: %s" , grpcErr) } else { logger.Info("peer server exited" ) } serve <- grpcErr }() if err := writePid(config.GetPath("peer.fileSystemPath" )+"/peer.pid" , os.Getpid()); err != nil { return err } if ehubGrpcServer != nil { go ehubGrpcServer.Start() } if viper.GetBool("peer.profile.enabled" ) { go func () { profileListenAddress := viper.GetString("peer.profile.listenAddress" ) logger.Infof("Starting profiling server with listenAddress = %s" , profileListenAddress) if profileErr := http.ListenAndServe(profileListenAddress, nil ); profileErr != nil { logger.Errorf("Error starting profiler: %s" , profileErr) } }() }
至此,Peer节点及其功能服务器启动完毕.
参考
https://github.com/blockchainGuide/