死磕以太坊源码分析之MPT树-下
文章以及资料请查看:https://github.com/blockchainGuide/
上篇 主要介绍了以太坊中的MPT树的原理,这篇主要会对MPT树涉及的源码进行拆解分析。trie
模块主要有以下几个文件:
1 2 3 4 5 6 7 8 9 |-encoding.go 主要讲编码之间的转换 |-hasher.go 实现了从某个结点开始计算子树的哈希的功能 |-node.go 定义了一个Trie树中所有结点的类型和解析的代码 |-sync.go 实现了SyncTrie对象的定义和所有方法 |-iterator.go 定义了所有枚举相关接口和实现 |-secure_trie.go 实现了SecureTrie对象 |-proof.go 为key构造一个merkle证明 |-trie.go Trie树的增删改查 |-database.go 对内存中的trie树节点进行引用计数
实现概览 encoding.go 这个主要是讲三种编码(KEYBYTES encoding
、HEX encoding
、COMPACT encoding
)的实现与转换,trie
中全程都需要用到这些,该文件中主要实现了如下功能:
hex编码转换为Compact编码:hexToCompact()
Compact编码转换为hex编码:compactToHex()
keybytes编码转换为Hex编码:keybytesToHex()
hex编码转换为keybytes编码:hexToKeybytes()
获取两个字节数组的公共前缀的长度:prefixLen()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func hexToCompact (hex []byte ) []byte { terminator := byte (0 ) if hasTerm(hex) { terminator = 1 hex = hex[:len (hex)-1 ] } buf := make ([]byte , len (hex)/2 +1 ) buf[0 ] = terminator << 5 if len (hex)&1 == 1 { buf[0 ] |= 1 << 4 buf[0 ] |= hex[0 ] hex = hex[1 :] } decodeNibbles(hex, buf[1 :]) return buf
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 func compactToHex (compact []byte ) []byte { base := keybytesToHex(compact) base = base[:len (base)-1 ] if base[0 ] >= 2 { base = append (base, 16 ) } chop := 2 - base[0 ]&1 return base[chop:] }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 func compactToHex (compact []byte ) []byte { base := keybytesToHex(compact) base = base[:len (base)-1 ] if base[0 ] >= 2 { base = append (base, 16 ) } chop := 2 - base[0 ]&1 return base[chop:] }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 func hexToKeybytes (hex []byte ) []byte { if hasTerm(hex) { hex = hex[:len (hex)-1 ] } if len (hex)&1 != 0 { panic ("can't convert hex key of odd length" ) } key := make ([]byte , (len (hex)+1 )/2 ) decodeNibbles(hex, key) return key }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 func prefixLen (a, b []byte ) int { var i, length = 0 , len (a) if len (b) < length { length = len (b) } for ; i < length; i++ { if a[i] != b[i] { break } } return i }
node.go 四种节点 node 接口分四种实现: fullNode,shortNode,valueNode,hashNode,其中只有 fullNode 和 shortNode 可以带有子节点。
1 2 3 4 5 6 7 8 9 10 11 12 13 type ( fullNode struct { Children [17 ]node flags nodeFlag } shortNode struct { Key []byte Val node flags nodeFlag } hashNode []byte valueNode []byte )
trie.go Trie对象实现了MPT树的所有功能,包括(key, value)对的增删改查、计算默克尔哈希,以及将整个树写入数据库中。
iterator.go nodeIterator
提供了遍历树内部所有结点的功能。其结构如下:此结构体是在trie.go
定义的
1 2 3 4 5 type nodeIterator struct { trie.NodeIterator t *odrTrie err error }
里面包含了一个接口NodeIterator
,它的实现则是由iterator.go
来提供的,其方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 func (it *nodeIterator) Next(descend bool ) bool func (it *nodeIterator) Hash() common.Hash func (it *nodeIterator) Parent() common.Hash func (it *nodeIterator) Leaf() bool func (it *nodeIterator) LeafKey() []byte func (it *nodeIterator) LeafBlob() []byte func (it *nodeIterator) LeafProof() [][]byte func (it *nodeIterator) Path() []byte {}func (it *nodeIterator) seek(prefix []byte ) error func (it *nodeIterator) peek(descend bool ) (*nodeIteratorState, *int , []byte , error ) func (it *nodeIterator) nextChild(parent *nodeIteratorState, ancestor common.Hash) (*nodeIteratorState, []byte , bool ) func (it *nodeIterator) push(state *nodeIteratorState, parentIndex *int , path []byte ) func (it *nodeIterator) pop()
NodeIterator
的核心是Next
方法,每调用一次这个方法,NodeIterator对象代表的当前节点就会更新至下一个节点,当所有结点遍历结束,Next
方法返回false
。
生成NodeIterator结口的方法有以下3种:
①:Trie.NodeIterator(start []byte)
通过start
参数指定从哪个路径开始遍历,如果为nil
则从头到尾按顺序遍历。
②:NewDifferenceIterator(a, b NodeIterator)
当调用NewDifferenceIterator(a, b NodeIterator)
后,生成的NodeIterator
只遍历存在于 b 但不存在于 a 中的结点。
③:NewUnionIterator(iters []NodeIterator)
当调用NewUnionIterator(its []NodeIterator)
后,生成的NodeIterator
遍历的结点是所有传入的结点的合集。
database.go Database
是trie
模块对真正数据库的缓存层,其目的是对缓存的节点进行引用计数,从而实现区块的修剪功能。主要方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func NewDatabase (diskdb ethdb.KeyValueStore) *Databasefunc NewDatabaseWithCache (diskdb ethdb.KeyValueStore, cache int ) *Database func (db *Database) DiskDB() ethdb.KeyValueReaderfunc (db *Database) InsertBlob(hash common.Hash, blob []byte )func (db *Database) insert(hash common.Hash, blob []byte , node node)func (db *Database) insertPreimage(hash common.Hash, preimage []byte )func (db *Database) node(hash common.Hash) nodefunc (db *Database) Node(hash common.Hash) ([]byte , error )func (db *Database) preimage(hash common.Hash) ([]byte , error )func (db *Database) secureKey(key []byte ) []byte func (db *Database) Nodes() []common.Hashfunc (db *Database) Reference(child common.Hash, parent common.Hash)func (db *Database) Dereference(root common.Hash)func (db *Database) dereference(child common.Hash, parent common.Hash)func (db *Database) Cap(limit common.StorageSize) error func (db *Database) Commit(node common.Hash, report bool ) error
security_trie.go 可以理解为加密了的trie
的实现,ecurity_trie
包装了一下trie
树, 所有的key
都转换成keccak256
算法计算的hash
值。同时在数据库里面存储hash
值对应的原始的key
。 但是官方在代码里也注释了,这个代码不稳定,除了测试用例,别的地方并没有使用该代码。
proof.go
Prove():根据给定的key
,在trie
中,将满足key
中最大长度前缀的路径上的节点都加入到proofDb
(队列中每个元素满足:未编码的hash以及对应rlp
编码后的节点)
VerifyProof():验证proffDb
中是否存在满足输入的hash
,和对应key的节点,如果满足,则返回rlp
解码后的该节点。
实现细节 Trie对象的增删改查 ①:Trie树的初始化
如果root
不为空,就通过resolveHash
来加载整个Trie
树,如果为空,就新建一个Trie
树。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func New (root common.Hash, db *Database) (*Trie, error ) { if db == nil { panic ("trie.New called without a database" ) } trie := &Trie{ db: db, } if root != (common.Hash{}) && root != emptyRoot { rootnode, err := trie.resolveHash(root[:], nil ) if err != nil { return nil , err } trie.root = rootnode } return trie, nil }
②:Trie树的插入
首先Trie树的插入是个递归调用的过程,它会从根开始找,一直找到合适的位置插入。
1 func (t *Trie) insert(n node, prefix, key []byte , value node) (bool , node, error )
参数说明:
n: 当前要插入的节点
prefix: 当前已经处理完的key (节点共有的前缀)
key: 未处理完的部分key ,完整的key = prefix + key
value:需要插入的值
返回值说明:
bool : 操作是否改变了Trie 树(dirty )
Node :插入完成后的子树的根节点
接下来就是分别对shortNode
、fullNode
、hashNode
、nil
几种情况进行说明。
2.1:节点为nil
空树直接返回shortNode
, 此时整颗树的根就含有了一个shortNode
节点。
1 2 case nil : return true , &shortNode{key, value, t.newFlag()}, nil
2.2 :节点为shortNode
首先计算公共前缀,如果公共前缀就等于key
,那么说明这两个key
是一样的,如果value
也一样的(dirty == false
),那么返回错误。
如果没有错误就更新shortNode
的值然后返回
如果公共前缀不完全匹配,那么就需要把公共前缀提取出来形成一个独立的节点(扩展节点),扩展节点后面连接一个branch
节点,branch
节点后面看情况连接两个short
节点。
首先构建一个branch节点(branch := &fullNode{flags: t.newFlag()}),然后再branch节点的Children位置调用t.insert插入剩下的两个short节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 matchlen := prefixLen(key, n.Key) if matchlen == len (n.Key) { dirty, nn, err := t.insert(n.Val, append (prefix, key[:matchlen]...), key[matchlen:], value) if !dirty || err != nil { return false , n, err } return true , &shortNode{n.Key, nn, t.newFlag()}, nil } branch := &fullNode{flags: t.newFlag()} var err error _, branch.Children[n.Key[matchlen]], err = t.insert(nil , append (prefix, n.Key[:matchlen+1 ]...), n.Key[matchlen+1 :], n.Val) if err != nil { return false , nil , err } _, branch.Children[key[matchlen]], err = t.insert(nil , append (prefix, key[:matchlen+1 ]...), key[matchlen+1 :], value) if err != nil { return false , nil , err } if matchlen == 0 { return true , branch, nil } return true , &shortNode{key[:matchlen], branch, t.newFlag()}, nil
2.3: 节点为fullNode
节点是fullNode
(也就是分支节点),那么直接往对应的孩子节点调用insert
方法,然后把对应的孩子节点指向新生成的节点。
1 2 3 4 5 6 7 8 dirty, nn, err := t.insert(n.Children[key[0 ]], append (prefix, key[0 ]), key[1 :], value) if !dirty || err != nil { return false , n, err } n = n.copy () n.flags = t.newFlag() n.Children[key[0 ]] = nn return true , n, nil
2.4: 节点为hashnode
暂时还在数据库中的节点,先调用 t.resolveHash(n, prefix)
来加载到内存,然后调用insert
方法来插入。
1 2 3 4 5 6 7 8 9 rn, err := t.resolveHash(n, prefix) if err != nil { return false , nil , err } dirty, nn, err := t.insert(rn, prefix, key, value) if !dirty || err != nil { return false , rn, err } return true , nn, nil
③:Trie树查询值
其实就是根据输入的hash
,找到对应的叶子节点的数据。主要看TryGet
方法。
参数:
origNode
:当前查找的起始node 位置
key
:输入要查找的数据的hash
pos
:当前hash 匹配到第几位
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 func (t *Trie) tryGet(origNode node, key []byte , pos int ) (value []byte , newnode node, didResolve bool , err error ) { switch n := (origNode).(type ) { case nil : return nil , nil , false , nil case valueNode: return n, n, false , nil case *shortNode: if len (key)-pos < len (n.Key) || !bytes.Equal(n.Key, key[pos:pos+len (n.Key)]) { return nil , n, false , nil } value, newnode, didResolve, err = t.tryGet(n.Val, key, pos+len (n.Key)) if err == nil && didResolve { n = n.copy () n.Val = newnode } return value, n, didResolve, err case *fullNode: value, newnode, didResolve, err = t.tryGet(n.Children[key[pos]], key, pos+1 ) if err == nil && didResolve { n = n.copy () n.Children[key[pos]] = newnode } return value, n, didResolve, err case hashNode: child, err := t.resolveHash(n, key[:pos]) if err != nil { return nil , n, true , err } value, newnode, _, err := t.tryGet(child, key, pos) return value, newnode, true , err ... }
didResolve
用于判断trie
树是否会发生变化,tryGet()
只是用来获取数据的,当hashNode
去db
中获取该node
值后需要更新现有的trie,didResolve
就会发生变化。其他就是基本的递归查找树操作。
④:Trie树更新值
更新值,其实就是调用insert方法进行操作。
到此Trie树的增删改查就讲解的差不多了。
将节点写入到Trie的内存数据库 如果要把节点写入到内存数据库,需要序列化,可以先去了解下以太坊的Rlp编码。这部分工作由trie.Commit()
完成,当trie.Commit(nil)
,会执行序列化和缓存等操作,序列化之后是使用的Compact Encoding
进行编码,从而达到节省空间的目的。
1 2 3 4 5 6 7 8 9 10 11 func (t *Trie) Commit(onleaf LeafCallback) (root common.Hash, err error ) { if t.db == nil { panic ("commit called on trie with nil database" ) } hash, cached, err := t.hashRoot(t.db, onleaf) if err != nil { return common.Hash{}, err } t.root = cached return common.BytesToHash(hash.(hashNode)), nil }
上述代码大概讲了这些:
每次执行Commit()
,该trie的cachegen
就会加 1
Commit()
方法返回的是trie.root
所指向的node
的hash
(未编码)
其中的hashRoot()
方法目的是返回trie.root所指向的node的hash
以及每个节点都带有各自hash的trie树的root
。
1 2 3 4 5 6 7 8 9 func (t *Trie) hashRoot(db *Database, onleaf LeafCallback) (node, node, error ) { if t.root == nil { return hashNode(emptyRoot.Bytes()), nil , nil } h := newHasher(onleaf) defer returnHasherToPool(h) return h.hash(t.root, db, true ) }
而hashRoot
的核心方法就是 h.hash
,它返回了头节点的hash
以及每个子节点都带有hash
的头节点(Trie.root指向它),大致做了以下几件事:
①:如果我们不存储节点,而只是哈希,则从缓存中获取数据
1 2 3 4 5 6 7 8 9 10 11 12 13 if hash, dirty := n.cache(); hash != nil { if db == nil { return hash, n, nil } if !dirty { switch n.(type ) { case *fullNode, *shortNode: return hash, hash, nil default : return hash, n, nil } } }
②:递归调用h.hashChildren
,求出所有的子节点的hash
值,再把原有的子节点替换成现在子节点的hash
值
2.1:如果节点是shortNode
首先把collapsed.Key从Hex Encoding
替换成 Compact Encoding
, 然后递归调用hash
方法计算子节点的hash
和cache
,从而把子节点替换成了子节点的hash
值
1 2 3 4 5 6 7 8 9 10 11 collapsed, cached := n.copy (), n.copy () collapsed.Key = hexToCompact(n.Key) cached.Key = common.CopyBytes(n.Key) if _, ok := n.Val.(valueNode); !ok { collapsed.Val, cached.Val, err = h.hash(n.Val, db, false ) if err != nil { return original, original, err } } return collapsed, cached, nil
2.2:节点是fullNode
遍历每个子节点,把子节点替换成子节点的Hash
值,否则的化这个节点没有children
。直接返回。
1 2 3 4 5 6 7 8 9 10 11 12 collapsed, cached := n.copy (), n.copy () for i := 0 ; i < 16 ; i++ { if n.Children[i] != nil { collapsed.Children[i], cached.Children[i], err = h.hash(n.Children[i], db, false ) if err != nil { return original, original, err } } } cached.Children[16 ] = n.Children[16 ] return collapsed, cached, nil
③:存储节点n的哈希值,如果我们指定了存储层,它会写对应的键/值对
store()方法主要就做了两件事:
rlp
序列化collapsed
节点并将其插入db磁盘中
生成当前节点的hash
将节点哈希插入db
3.1:空数据或者hashNode,则不处理
1 2 3 if _, isHash := n.(hashNode); n == nil || isHash { return n, nil }
3.2:生成节点的RLP编码
1 2 3 4 5 6 7 8 9 10 11 12 h.tmp.Reset() if err := rlp.Encode(&h.tmp, n); err != nil { panic ("encode error: " + err.Error()) } if len (h.tmp) < 32 && !force { return n, nil } hash, _ := n.cache() if hash == nil { hash = h.makeHashNode(h.tmp) }
3.3:将Trie节点合并到中间内存缓存中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 hash := common.BytesToHash(hash) db.lock.Lock() db.insert(hash, h.tmp, n) db.lock.Unlock() if h.onleaf != nil { switch n := n.(type ) { case *shortNode: if child, ok := n.Val.(valueNode); ok { h.onleaf(child, hash) } case *fullNode: for i := 0 ; i < 16 ; i++ { if child, ok := n.Children[i].(valueNode); ok { h.onleaf(child, hash) } } } }
到此为止将节点写入到Trie
的内存数据库就已经完成了。
如果觉得文章不错可以关注公众号:区块链技术栈 ,详细的所有以太坊源码分析文章内容以及代码资料都在其中。
Trie树缓存机制 Trie
树的结构里面有两个参数, 一个是cachegen
,一个是cachelimit
。这两个参数就是cache
控制的参数。 Trie
树每一次调用Commit
方法,会导致当前的cachegen
增加1。
1 2 3 4 5 func (t *Trie) Commit(onleaf LeafCallback) (root common.Hash, err error ) { ... t.cachegen++ ... }
然后在Trie
树插入的时候,会把当前的cachegen
存放到节点中。
1 2 3 4 5 6 7 8 func (t *Trie) insert(n node, prefix, key []byte , value node) (bool , node, error ) { .... return true , &shortNode{n.Key, nn, t.newFlag()}, nil } func (t *Trie) newFlag() nodeFlag { return nodeFlag{dirty: true , gen: t.cachegen} }
如果 trie.cachegen - node.cachegen > cachelimit
,就可以把节点从内存里面拿掉。 也就是说节点经过几次Commit
,都没有修改,那么就把节点从内存里面干掉。 只要trie
路径上新增或者删除一个节点,整个路径的节点都需要重新实例化,也就是节点中的nodeFlag
被初始化了。都需要重新更新到db
磁盘。
拿掉节点过程在 hasher.hash
方法中, 这个方法是在commit
的时候调用。如果方法的canUnload
方法调用返回真,那么就拿掉节点,如果只返回了hash
节点,而没有返回node
节点,这样节点就没有引用,不久就会被gc清除掉。 节点被拿掉之后,会用一个hashNode
节点来表示这个节点以及其子节点。 如果后续需要使用,可以通过方法把这个节点加载到内存里面来。
1 2 3 4 5 6 func (h *hasher) hash(n node, db *Database, force bool ) (node, node, error ) { .... cacheUnloadCounter.Inc(1 ) ... }
参考&总结 这部分重要的内容也就上面讲述的,主要集中在Trie
上面,如果有不对的地方,可以及时指正哦。
https://mindcarver.cn/about/
https://github.com/blockchainGuide/blockchainguide