死磕hyperledger fabric源码|交易广播
文章及代码:https://github.com/blockchainGuide/
分支:v1.1.0
前言
Hyperledger Fabric
提供了Broadcast(srv ab.AtomicBroadcast_BroadcastServer)
交易广播服务接口,接收客户端提交的签名交易消息请求,交由共识组件链对象对交易进行排序与执行通道管理,按照交易出块规则切割打包,构造新区块并提交账本。同时,通过Deliver()
区块分发服务接口,将区块数据发送给通道组织内发起请求的Leader
主节点,再基于Gossip
消息协议广播到组织内的其他节点上,从而实现广播交易消息的目的。
Broadcast服务消息处理
Orderer
节点启动时已经在本地的gRPC
服务器上注册了Orderer
排序服务器,并创建了Broadcast
服务处理句柄。当客户端调用Broadcast()
服务接口发起服务请求时,Orderer
排序服务器会调用Broadcast()→s.bh.Handle()
方法处理请求,流程如下:
1 | func (s *server) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error { |
1 | func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error { |
主要就是这个Handle
的处理,分析如下:
①:等待接收处理消息
1 | msg, err := srv.Recv() |
②:解析获取通道头部chdr、配置交易消息标志位isConfig、通道链支持对象(通道消息处理器)
1 | chdr, isConfig, processor, err := bh.sm.BroadcastChannelSupport(msg) |
③:检查共识组件链对象是否准备好接收新的交易消息
1 | if err = processor.WaitReady(); err != nil {} |
④:分类处理消息
处理普通消息
4.1 解析获取通道的最新配置序号
1 | configSeq, err := processor.ProcessNormalMsg(msg) |
1 | /orderer/common/msgprocessor/standardchannel.go |
configSeq是最新配置序号,默认初始值为0,新建应用通道后该配置序号自增为1,通过比较该序号就能判断当前通道配置版本是否发生了更新,从而确定当前交易消息是否需要重新过滤与重新排序。
接着就是使用自带的默认通道消息过滤器过滤消息,有以下过滤条件:
- 验证不能为空
- 拒绝过期的签名者身份证书
- 消息最大字节数过滤器(98MB)
- 消息签名验证过滤器
4.2 构造新的普通交易消息并发送到共识组件链对象请求处理
1 | err = processor.Order(msg, configSeq) |
这里我们只关注kafka
的共识组件处理。
首先序列化消息,然后将该消息发送到Kafka
集群的指定分区上请求排序,再转发给Kafka
共识组件链对象请求打包出块。
1 | /orderer/consensus/kafka/chain.go |
我们来看看enqueue方法是如何做的:
1 | func (chain *chainImpl) enqueue(kafkaMsg *ab.KafkaMessage) bool { |
处理通道配置交易消息
4.3 获取配置交易消息与通道的最新配置序号
1 | config, configSeq, err := processor.ProcessConfigUpdateMsg(msg) |
代码位置:/orderer/common/msgprocessor/systemchannel.go/ProcessConfigUpdateMsg,大概做了以下事情:
- 获取消息中的通道ID
- 检查消息中的通道ID与当前通道ID是否一致,一致的话交由标准通道处理器处理
- 创建新应用通道的通道配置实体Bundle结构对象
- 构造新的通道配置更新交易消息(ConfigEnvelope类型),注意将该消息的通道配置序号更新为1
- 创建内层的通道配置交易消息(CONFIG类型)
- 创建外层的配置交易消息(ORDERER_TRANSACTION类型)
- 应用系统通道的消息过滤器
- 返回新的通道配置交易消息与当前系统通道的配置序号
1 | func (s *SystemChannel) ProcessConfigUpdateMsg(envConfigUpdate *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error) { |
4.4 构造新的配置交易消息发送到共识组件链对象请求排序
1 | err = processor.Configure(config, configSeq) |
这里我们依旧只是考虑kafka
共识组件,processor.Configure()
方法实际上是调用chainImpl.configure()
方法,同样构造Kafka
常规消息(KafkaMessageRegular
类型)。其中,Class
消息类别属于KafkaMessageRegular_CONFIG
类型,包含了通道配置交易消息、 通道配置序号configSeq
与初始消息偏移量originalOffset(0)
。接着,调用chain.enqueue()
方法,将其发送到Kafka
集群上指定主题(chainID
)和分区号(0)的分区上,同时,由Kafka
共识组件链对象分区消费者channelConsumer
获取该消息,再交由给Kafka
共识组件链对象请求打包出块。
⑤:发送成功处理状态响应消息
1 | err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS}) |
整个流程图如下:
参考
https://github.com/blockchainGuide/ (文章图片代码资料)
微信公众号:区块链技术栈