0%

死磕以太坊源码分析之MPT树-下

死磕以太坊源码分析之MPT树-下

文章以及资料请查看:https://github.com/blockchainGuide/

上篇主要介绍了以太坊中的MPT树的原理,这篇主要会对MPT树涉及的源码进行拆解分析。trie模块主要有以下几个文件:

1
2
3
4
5
6
7
8
9
|-encoding.go 主要讲编码之间的转换
|-hasher.go 实现了从某个结点开始计算子树的哈希的功能
|-node.go 定义了一个Trie树中所有结点的类型和解析的代码
|-sync.go 实现了SyncTrie对象的定义和所有方法
|-iterator.go 定义了所有枚举相关接口和实现
|-secure_trie.go 实现了SecureTrie对象
|-proof.go 为key构造一个merkle证明
|-trie.go Trie树的增删改查
|-database.go 对内存中的trie树节点进行引用计数

实现概览

encoding.go

这个主要是讲三种编码(KEYBYTES encodingHEX encodingCOMPACT encoding)的实现与转换,trie中全程都需要用到这些,该文件中主要实现了如下功能:

  1. hex编码转换为Compact编码:hexToCompact()
  2. Compact编码转换为hex编码:compactToHex()
  3. keybytes编码转换为Hex编码:keybytesToHex()
  4. hex编码转换为keybytes编码:hexToKeybytes()
  5. 获取两个字节数组的公共前缀的长度:prefixLen()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func hexToCompact(hex []byte) []byte {
terminator := byte(0)
if hasTerm(hex) { //检查是否有结尾为0x10 => 16
terminator = 1 //有结束标记16说明是叶子节点
hex = hex[:len(hex)-1] //去除尾部标记
}
buf := make([]byte, len(hex)/2+1) // 字节数组

buf[0] = terminator << 5 // 标志byte为00000000或者00100000
//如果长度为奇数,添加奇数位标志1,并把第一个nibble字节放入buf[0]的低四位
if len(hex)&1 == 1 {
buf[0] |= 1 << 4 // 奇数标志 00110000
buf[0] |= hex[0] // 第一个nibble包含在第一个字节中 0011xxxx
hex = hex[1:]
}
//将两个nibble字节合并成一个字节
decodeNibbles(hex, buf[1:])
return buf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//compact编码转化为Hex编码
func compactToHex(compact []byte) []byte {
base := keybytesToHex(compact)
base = base[:len(base)-1]
// apply terminator flag
// base[0]包括四种情况
// 00000000 扩展节点偶数位
// 00000001 扩展节点奇数位
// 00000010 叶子节点偶数位
// 00000011 叶子节点奇数位

// apply terminator flag
if base[0] >= 2 {
//如果是叶子节点,末尾添加Hex标志位16
base = append(base, 16)
}
// apply odd flag
//如果是偶数位,chop等于2,否则等于1
chop := 2 - base[0]&1
return base[chop:]
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//compact编码转化为Hex编码
func compactToHex(compact []byte) []byte {
base := keybytesToHex(compact)
base = base[:len(base)-1]
// apply terminator flag
// base[0]包括四种情况
// 00000000 扩展节点偶数位
// 00000001 扩展节点奇数位
// 00000010 叶子节点偶数位
// 00000011 叶子节点奇数位

// apply terminator flag
if base[0] >= 2 {
//如果是叶子节点,末尾添加Hex标志位16
base = append(base, 16)
}
// apply odd flag
//如果是偶数位,chop等于2,否则等于1
chop := 2 - base[0]&1
return base[chop:]
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 将十六进制的bibbles转成key bytes,这只能用于偶数长度的key
func hexToKeybytes(hex []byte) []byte {
if hasTerm(hex) {
hex = hex[:len(hex)-1]
}
if len(hex)&1 != 0 {
panic("can't convert hex key of odd length")
}
key := make([]byte, (len(hex)+1)/2)
decodeNibbles(hex, key)
return key
}


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 返回a和b的公共前缀的长度
func prefixLen(a, b []byte) int {
var i, length = 0, len(a)
if len(b) < length {
length = len(b)
}
for ; i < length; i++ {
if a[i] != b[i] {
break
}
}
return i
}


node.go

四种节点

node 接口分四种实现: fullNode,shortNode,valueNode,hashNode,其中只有 fullNode 和 shortNode 可以带有子节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
type (
fullNode struct {
Children [17]node // 分支节点
flags nodeFlag
}
shortNode struct { //扩展节点
Key []byte
Val node //可能指向叶子节点,也可能指向分支节点。
flags nodeFlag
}
hashNode []byte
valueNode []byte // 叶子节点值,但是该叶子节点最终还是会包装在shortNode中
)

trie.go

Trie对象实现了MPT树的所有功能,包括(key, value)对的增删改查、计算默克尔哈希,以及将整个树写入数据库中。

iterator.go

nodeIterator提供了遍历树内部所有结点的功能。其结构如下:此结构体是在trie.go定义的

1
2
3
4
5
type nodeIterator struct {
trie.NodeIterator
t *odrTrie
err error
}

里面包含了一个接口NodeIterator,它的实现则是由iterator.go来提供的,其方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
func (it *nodeIterator) Next(descend bool) bool 
func (it *nodeIterator) Hash() common.Hash
func (it *nodeIterator) Parent() common.Hash
func (it *nodeIterator) Leaf() bool
func (it *nodeIterator) LeafKey() []byte
func (it *nodeIterator) LeafBlob() []byte
func (it *nodeIterator) LeafProof() [][]byte
func (it *nodeIterator) Path() []byte {}
func (it *nodeIterator) seek(prefix []byte) error
func (it *nodeIterator) peek(descend bool) (*nodeIteratorState, *int, []byte, error)
func (it *nodeIterator) nextChild(parent *nodeIteratorState, ancestor common.Hash) (*nodeIteratorState, []byte, bool)
func (it *nodeIterator) push(state *nodeIteratorState, parentIndex *int, path []byte)
func (it *nodeIterator) pop()

NodeIterator的核心是Next方法,每调用一次这个方法,NodeIterator对象代表的当前节点就会更新至下一个节点,当所有结点遍历结束,Next方法返回false

生成NodeIterator结口的方法有以下3种:

①:Trie.NodeIterator(start []byte)

通过start参数指定从哪个路径开始遍历,如果为nil则从头到尾按顺序遍历。

②:NewDifferenceIterator(a, b NodeIterator)

当调用NewDifferenceIterator(a, b NodeIterator)后,生成的NodeIterator只遍历存在于 b 但不存在于 a 中的结点。

③:NewUnionIterator(iters []NodeIterator)

当调用NewUnionIterator(its []NodeIterator)后,生成的NodeIterator遍历的结点是所有传入的结点的合集。

database.go

Databasetrie模块对真正数据库的缓存层,其目的是对缓存的节点进行引用计数,从而实现区块的修剪功能。主要方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func NewDatabase(diskdb ethdb.KeyValueStore) *Database
func NewDatabaseWithCache(diskdb ethdb.KeyValueStore, cache int) *Database
func (db *Database) DiskDB() ethdb.KeyValueReader
func (db *Database) InsertBlob(hash common.Hash, blob []byte)
func (db *Database) insert(hash common.Hash, blob []byte, node node)
func (db *Database) insertPreimage(hash common.Hash, preimage []byte)
func (db *Database) node(hash common.Hash) node
func (db *Database) Node(hash common.Hash) ([]byte, error)
func (db *Database) preimage(hash common.Hash) ([]byte, error)
func (db *Database) secureKey(key []byte) []byte
func (db *Database) Nodes() []common.Hash
func (db *Database) Reference(child common.Hash, parent common.Hash)
func (db *Database) Dereference(root common.Hash)
func (db *Database) dereference(child common.Hash, parent common.Hash)
func (db *Database) Cap(limit common.StorageSize) error
func (db *Database) Commit(node common.Hash, report bool) error

security_trie.go

可以理解为加密了的trie的实现,ecurity_trie包装了一下trie树, 所有的key都转换成keccak256算法计算的hash值。同时在数据库里面存储hash值对应的原始的key
但是官方在代码里也注释了,这个代码不稳定,除了测试用例,别的地方并没有使用该代码。

proof.go

  • Prove():根据给定的key,在trie中,将满足key中最大长度前缀的路径上的节点都加入到proofDb(队列中每个元素满足:未编码的hash以及对应rlp编码后的节点)
  • VerifyProof():验证proffDb中是否存在满足输入的hash,和对应key的节点,如果满足,则返回rlp解码后的该节点。

实现细节

Trie对象的增删改查

①:Trie树的初始化

如果root不为空,就通过resolveHash来加载整个Trie树,如果为空,就新建一个Trie树。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func New(root common.Hash, db *Database) (*Trie, error) {
if db == nil {
panic("trie.New called without a database")
}
trie := &Trie{
db: db,
}
if root != (common.Hash{}) && root != emptyRoot {
rootnode, err := trie.resolveHash(root[:], nil)
if err != nil {
return nil, err
}
trie.root = rootnode
}
return trie, nil
}

②:Trie树的插入

首先Trie树的插入是个递归调用的过程,它会从根开始找,一直找到合适的位置插入。

1
func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error)

参数说明:

  • n: 当前要插入的节点
  • prefix: 当前已经处理完的key(节点共有的前缀)
  • key: 未处理完的部分key,完整的key = prefix + key
  • value:需要插入的值

返回值说明:

  • bool : 操作是否改变了Trie树(dirty)
  • Node :插入完成后的子树的根节点

接下来就是分别对shortNodefullNodehashNodenil 几种情况进行说明。

2.1:节点为nil

空树直接返回shortNode, 此时整颗树的根就含有了一个shortNode节点。

1
2
case nil:
return true, &shortNode{key, value, t.newFlag()}, nil

2.2 :节点为shortNode

  • 首先计算公共前缀,如果公共前缀就等于key,那么说明这两个key是一样的,如果value也一样的(dirty == false),那么返回错误。

  • 如果没有错误就更新shortNode的值然后返回

  • 如果公共前缀不完全匹配,那么就需要把公共前缀提取出来形成一个独立的节点(扩展节点),扩展节点后面连接一个branch节点,branch节点后面看情况连接两个short节点。

  • 首先构建一个branch节点(branch := &fullNode{flags: t.newFlag()}),然后再branch节点的Children位置调用t.insert插入剩下的两个short节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
matchlen := prefixLen(key, n.Key)
if matchlen == len(n.Key) {
dirty, nn, err := t.insert(n.Val, append(prefix, key[:matchlen]...), key[matchlen:], value)
if !dirty || err != nil {
return false, n, err
}
return true, &shortNode{n.Key, nn, t.newFlag()}, nil
}
branch := &fullNode{flags: t.newFlag()}
var err error
_, branch.Children[n.Key[matchlen]], err = t.insert(nil, append(prefix, n.Key[:matchlen+1]...), n.Key[matchlen+1:], n.Val)
if err != nil {
return false, nil, err
}
_, branch.Children[key[matchlen]], err = t.insert(nil, append(prefix, key[:matchlen+1]...), key[matchlen+1:], value)
if err != nil {
return false, nil, err
}
if matchlen == 0 {
return true, branch, nil
}
return true, &shortNode{key[:matchlen], branch, t.newFlag()}, nil

2.3: 节点为fullNode

节点是fullNode(也就是分支节点),那么直接往对应的孩子节点调用insert方法,然后把对应的孩子节点指向新生成的节点。

1
2
3
4
5
6
7
8
dirty, nn, err := t.insert(n.Children[key[0]], append(prefix, key[0]), key[1:], value)
if !dirty || err != nil {
return false, n, err
}
n = n.copy()
n.flags = t.newFlag()
n.Children[key[0]] = nn
return true, n, nil

2.4: 节点为hashnode

暂时还在数据库中的节点,先调用 t.resolveHash(n, prefix)来加载到内存,然后调用insert方法来插入。

1
2
3
4
5
6
7
8
9
rn, err := t.resolveHash(n, prefix)
if err != nil {
return false, nil, err
}
dirty, nn, err := t.insert(rn, prefix, key, value)
if !dirty || err != nil {
return false, rn, err
}
return true, nn, nil

③:Trie树查询值

其实就是根据输入的hash,找到对应的叶子节点的数据。主要看TryGet方法。

参数:

  • origNode:当前查找的起始node位置
  • key:输入要查找的数据的hash
  • pos:当前hash匹配到第几位
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func (t *Trie) tryGet(origNode node, key []byte, pos int) (value []byte, newnode node, didResolve bool, err error) {
switch n := (origNode).(type) {
case nil: //表示当前trie是空树
return nil, nil, false, nil
case valueNode: ////这就是我们要查找的叶子节点对应的数据
return n, n, false, nil
case *shortNode: ////在叶子节点或者扩展节点匹配
if len(key)-pos < len(n.Key) || !bytes.Equal(n.Key, key[pos:pos+len(n.Key)]) {
return nil, n, false, nil
}
value, newnode, didResolve, err = t.tryGet(n.Val, key, pos+len(n.Key))
if err == nil && didResolve {
n = n.copy()
n.Val = newnode
}
return value, n, didResolve, err
case *fullNode://在分支节点匹配
value, newnode, didResolve, err = t.tryGet(n.Children[key[pos]], key, pos+1)
if err == nil && didResolve {
n = n.copy()
n.Children[key[pos]] = newnode
}
return value, n, didResolve, err
case hashNode: //说明当前节点是轻节点,需要从db中获取
child, err := t.resolveHash(n, key[:pos])
if err != nil {
return nil, n, true, err
}
value, newnode, _, err := t.tryGet(child, key, pos)
return value, newnode, true, err
...
}

didResolve用于判断trie树是否会发生变化,tryGet()只是用来获取数据的,当hashNodedb中获取该node值后需要更新现有的trie,didResolve就会发生变化。其他就是基本的递归查找树操作。

④:Trie树更新值

更新值,其实就是调用insert方法进行操作。

到此Trie树的增删改查就讲解的差不多了。

将节点写入到Trie的内存数据库

如果要把节点写入到内存数据库,需要序列化,可以先去了解下以太坊的Rlp编码。这部分工作由trie.Commit()完成,当trie.Commit(nil),会执行序列化和缓存等操作,序列化之后是使用的Compact Encoding进行编码,从而达到节省空间的目的。

1
2
3
4
5
6
7
8
9
10
11
func (t *Trie) Commit(onleaf LeafCallback) (root common.Hash, err error) {
if t.db == nil {
panic("commit called on trie with nil database")
}
hash, cached, err := t.hashRoot(t.db, onleaf)
if err != nil {
return common.Hash{}, err
}
t.root = cached
return common.BytesToHash(hash.(hashNode)), nil
}

上述代码大概讲了这些:

  • 每次执行Commit(),该trie的cachegen就会加 1
  • Commit()方法返回的是trie.root所指向的nodehash(未编码)
  • 其中的hashRoot()方法目的是返回trie.root所指向的node的hash以及每个节点都带有各自hash的trie树的root
1
2
3
4
5
6
7
8
9
//为每个node生成一个hash
func (t *Trie) hashRoot(db *Database, onleaf LeafCallback) (node, node, error) {
if t.root == nil {
return hashNode(emptyRoot.Bytes()), nil, nil
}
h := newHasher(onleaf)
defer returnHasherToPool(h)
return h.hash(t.root, db, true) //为每个节点生成一个未编码的hash
}

hashRoot的核心方法就是 h.hash,它返回了头节点的hash以及每个子节点都带有hash的头节点(Trie.root指向它),大致做了以下几件事:

①:如果我们不存储节点,而只是哈希,则从缓存中获取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
if hash, dirty := n.cache(); hash != nil {
if db == nil {
return hash, n, nil
}
if !dirty {
switch n.(type) {
case *fullNode, *shortNode:
return hash, hash, nil
default:
return hash, n, nil
}
}
}

②:递归调用h.hashChildren,求出所有的子节点的hash值,再把原有的子节点替换成现在子节点的hash

2.1:如果节点是shortNode

首先把collapsed.Key从Hex Encoding 替换成 Compact Encoding, 然后递归调用hash方法计算子节点的hashcache,从而把子节点替换成了子节点的hash

1
2
3
4
5
6
7
8
9
10
11
collapsed, cached := n.copy(), n.copy()
collapsed.Key = hexToCompact(n.Key)
cached.Key = common.CopyBytes(n.Key)

if _, ok := n.Val.(valueNode); !ok {
collapsed.Val, cached.Val, err = h.hash(n.Val, db, false)
if err != nil {
return original, original, err
}
}
return collapsed, cached, nil

2.2:节点是fullNode

遍历每个子节点,把子节点替换成子节点的Hash值,否则的化这个节点没有children。直接返回。

1
2
3
4
5
6
7
8
9
10
11
12
collapsed, cached := n.copy(), n.copy()

for i := 0; i < 16; i++ {
if n.Children[i] != nil {
collapsed.Children[i], cached.Children[i], err = h.hash(n.Children[i], db, false)
if err != nil {
return original, original, err
}
}
}
cached.Children[16] = n.Children[16]
return collapsed, cached, nil

③:存储节点n的哈希值,如果我们指定了存储层,它会写对应的键/值对

store()方法主要就做了两件事:

  • rlp序列化collapsed节点并将其插入db磁盘中
  • 生成当前节点的hash
  • 将节点哈希插入db

3.1:空数据或者hashNode,则不处理

1
2
3
if _, isHash := n.(hashNode); n == nil || isHash {
return n, nil
}

3.2:生成节点的RLP编码

1
2
3
4
5
6
7
8
9
10
11
12
h.tmp.Reset()                                 // 缓存初始化
if err := rlp.Encode(&h.tmp, n); err != nil { //将当前node序列化
panic("encode error: " + err.Error())
}
if len(h.tmp) < 32 && !force {
return n, nil // Nodes smaller than 32 bytes are stored inside their parent 编码后的node长度小于32,若force为true,则可确保所有节点都被编码
}
//长度过大的,则都将被新计算出来的hash取代
hash, _ := n.cache() //取出当前节点的hash
if hash == nil {
hash = h.makeHashNode(h.tmp) //生成哈希node
}

3.3:将Trie节点合并到中间内存缓存中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
hash := common.BytesToHash(hash)
db.lock.Lock()
db.insert(hash, h.tmp, n)
db.lock.Unlock()
// Track external references from account->storage trie
//跟踪帐户->存储Trie中的外部引用
if h.onleaf != nil {
switch n := n.(type) {
case *shortNode:
if child, ok := n.Val.(valueNode); ok { //指向的是分支节点
h.onleaf(child, hash) //用于统计当前节点的信息,比如当前节点有几个子节点,当前有效的节点数
}
case *fullNode:
for i := 0; i < 16; i++ {
if child, ok := n.Children[i].(valueNode); ok {
h.onleaf(child, hash)
}
}
}
}

到此为止将节点写入到Trie的内存数据库就已经完成了。

如果觉得文章不错可以关注公众号:区块链技术栈,详细的所有以太坊源码分析文章内容以及代码资料都在其中。

Trie树缓存机制

Trie树的结构里面有两个参数, 一个是cachegen,一个是cachelimit。这两个参数就是cache控制的参数。 Trie树每一次调用Commit方法,会导致当前的cachegen增加1。

1
2
3
4
5
func (t *Trie) Commit(onleaf LeafCallback) (root common.Hash, err error) {
...
t.cachegen++
...
}

然后在Trie树插入的时候,会把当前的cachegen存放到节点中。

1
2
3
4
5
6
7
8
func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error) {
....
return true, &shortNode{n.Key, nn, t.newFlag()}, nil
}
func (t *Trie) newFlag() nodeFlag {
return nodeFlag{dirty: true, gen: t.cachegen}
}

如果 trie.cachegen - node.cachegen > cachelimit,就可以把节点从内存里面拿掉。 也就是说节点经过几次Commit,都没有修改,那么就把节点从内存里面干掉。 只要trie路径上新增或者删除一个节点,整个路径的节点都需要重新实例化,也就是节点中的nodeFlag被初始化了。都需要重新更新到db磁盘。

拿掉节点过程在 hasher.hash方法中, 这个方法是在commit的时候调用。如果方法的canUnload方法调用返回真,那么就拿掉节点,如果只返回了hash节点,而没有返回node节点,这样节点就没有引用,不久就会被gc清除掉。 节点被拿掉之后,会用一个hashNode节点来表示这个节点以及其子节点。 如果后续需要使用,可以通过方法把这个节点加载到内存里面来。

1
2
3
4
5
6
func (h *hasher) hash(n node, db *Database, force bool) (node, node, error) {
....
// 从缓存中卸载节点。它的所有子节点将具有较低或相等的缓存世代号码。
cacheUnloadCounter.Inc(1)
...
}

参考&总结

这部分重要的内容也就上面讲述的,主要集中在Trie上面,如果有不对的地方,可以及时指正哦。

https://mindcarver.cn/about/

https://github.com/blockchainGuide/blockchainguide