Cassandra's AntiEntropy service uses Merkle trees to detect inconsistencies in data between replicas. A Merkle tree is a hash tree in which the leaves contain hashes of individual data blocks and parent nodes contain hashes of their respective children. It provides an efficient way to find differences between the data blocks stored on replicas and reduces the amount of data that must be transferred to compare them.

Cassandra's implementation of the Merkle tree (org.apache.cassandra.utils.MerkleTree) uses a perfect binary tree in which each leaf contains the hash of a row value and each parent node contains the hash of its left and right children. In a perfect binary tree, all the leaves are at the same depth. A perfect binary tree of depth **h** contains **2^h** leaves. In other words, if a range contains **n** tokens, the Merkle tree representing it has a depth of **log2(n)**.

When the nodetool repair command is executed, the target node specified with the **-h** option coordinates the repair of each column family in each keyspace. The repair coordinator node requests a Merkle tree from each replica for a specific token range in order to compare them. Each replica builds a Merkle tree by scanning the data stored locally in the requested token range. The repair coordinator node compares the Merkle trees, finds all the sub ranges that differ between the replicas, and repairs the data in those ranges.

A replica node builds a Merkle tree for each column family to represent the hashes of rows in a given token range. A token range can contain up to **2^127** tokens when RandomPartitioner is used. Representing every token individually would require a Merkle tree of depth 127 with **2^127** leaves. Cassandra instead builds a compact Merkle tree of depth 15 to reduce the memory needed to store the tree and to minimize the amount of data transferred when the tree is sent to another node. It expands the tree until the given token range is split into **32768** (2^15) sub ranges. In the compact tree, each leaf represents the hash of all rows in its sub range. Regardless of their size and split, two Merkle trees can be compared if they have the same hash depth.

For example, the token range (0, 256] contains 256 sub ranges (0, 1], (1, 2] ... (255, 256], each containing a single token. A perfect binary tree of depth 8 is required to store all 256 sub range hashes at its leaves. A compact tree of depth 3 for the same range contains only 8 leaves, representing the hashes of sub ranges (0, 32], (32, 64] ... (224, 256], each containing 32 tokens. Each leaf hash in the compact tree is the combined hash of all the nodes that would sit under it in the perfect binary tree of depth 8.
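The arithmetic behind the (0, 256] example can be checked with a small sketch. The class and method names below (`CompactTreeSketch`, `leafCount`, `tokensPerLeaf`, `leafIndex`) are hypothetical and not part of Cassandra. The sketch assumes a non-wrapping range whose width divides evenly by the number of leaves.

```java
import java.math.BigInteger;

// Illustrative helper, not Cassandra code: depth/leaf arithmetic for a compact tree.
class CompactTreeSketch {
    // A perfect binary tree of the given depth has 2^depth leaves.
    static BigInteger leafCount(int depth) {
        return BigInteger.ONE.shiftLeft(depth);
    }

    // Number of tokens covered by each leaf when (left, right] is split evenly.
    static BigInteger tokensPerLeaf(BigInteger left, BigInteger right, int depth) {
        return right.subtract(left).divide(leafCount(depth));
    }

    // Zero-based index of the leaf whose sub range contains the token.
    // Ranges are left-exclusive, so token t falls into leaf (t - left - 1) / width.
    static int leafIndex(BigInteger left, BigInteger right, int depth, BigInteger token) {
        BigInteger width = tokensPerLeaf(left, right, depth);
        return token.subtract(left).subtract(BigInteger.ONE).divide(width).intValue();
    }

    public static void main(String[] args) {
        BigInteger left = BigInteger.ZERO;
        BigInteger right = BigInteger.valueOf(256);
        System.out.println(leafCount(8));                   // 256 leaves, one token each
        System.out.println(leafCount(3));                   // 8 leaves in the compact tree
        System.out.println(tokensPerLeaf(left, right, 3));  // 32 tokens per leaf
        System.out.println(leafIndex(left, right, 3, BigInteger.valueOf(32))); // 0, i.e. (0, 32]
        System.out.println(leafCount(15));                  // 32768 sub ranges at depth 15
    }
}
```

Because the ranges are closed on the right, token 32 belongs to the first leaf (0, 32] and token 33 to the second leaf (32, 64].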

**Building Merkle tree with RandomPartitioner**

RandomPartitioner distributes keys uniformly, so the Merkle tree is constructed recursively by splitting the given token range into two equal sub ranges until the maximum number of sub ranges is reached. A root node is added with the given token range (left, right], and the range is split into two halves at the token at its midpoint. A left child node is added with the range (left, midpoint] and a right child node is added with the range (midpoint, right]. The process is repeated until the required number of leaves (sub ranges) has been added to the tree.
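The recursive midpoint split can be illustrated with a minimal, standalone sketch. It is not the Cassandra implementation: `RangeSplitSketch` and its methods are hypothetical names, the midpoint is a plain arithmetic mean, and ranges that wrap around the ring are not handled.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

// Illustrative helper, not Cassandra code: split (left, right] into 2^depth sub ranges.
class RangeSplitSketch {
    // Recursively split (left, right] at its midpoint; at depth 0 the range becomes a leaf.
    static void split(BigInteger left, BigInteger right, int depth, List<BigInteger[]> leaves) {
        if (depth == 0) {
            leaves.add(new BigInteger[] { left, right });
            return;
        }
        BigInteger midpoint = left.add(right).shiftRight(1);
        split(left, midpoint, depth - 1, leaves);   // left child covers (left, midpoint]
        split(midpoint, right, depth - 1, leaves);  // right child covers (midpoint, right]
    }

    // Convenience wrapper returning the leaf sub ranges in token order.
    static List<BigInteger[]> leaves(long left, long right, int depth) {
        List<BigInteger[]> out = new ArrayList<>();
        split(BigInteger.valueOf(left), BigInteger.valueOf(right), depth, out);
        return out;
    }

    public static void main(String[] args) {
        // Prints the 8 sub ranges (0, 32], (32, 64] ... (224, 256].
        for (BigInteger[] leaf : leaves(0, 256, 3)) {
            System.out.println("(" + leaf[0] + ", " + leaf[1] + "]");
        }
    }
}
```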

Next, row hashes are added to the Merkle tree in sorted order. Each row's hash is the MD5 digest of the row value, which includes the row's column count, column names and column values, but not the row key or row size. Hashes of deleted rows (tombstones) are also added to the tree, and these include the delete timestamps. Row hashes are added to the Merkle tree leaves based on their tokens. If a leaf's sub range contains multiple rows, its hash is computed by combining the hashes of all rows covered by its range using the XOR operation. The hashes of non-leaf nodes are computed by XORing the hashes of their respective children.
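A minimal sketch of the two operations described above, hashing a row with MD5 and combining hashes with XOR, might look like the following. `XorHashSketch` is a hypothetical name, and a plain string stands in for the serialized row value. The `xor` helper treats an empty hash as "no rows", which is an assumption made to match the sample output shown later in this post.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Illustrative helper, not Cassandra code: MD5 row hashes combined with XOR.
class XorHashSketch {
    // MD5 digest of a stand-in row value (a real row would be serialized columns).
    static byte[] md5(String rowValue) {
        try {
            return MessageDigest.getInstance("MD5")
                                .digest(rowValue.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    // XOR the shorter hash into a copy of the longer one, so an empty hash changes nothing.
    static byte[] xor(byte[] a, byte[] b) {
        byte[] shorter = a.length <= b.length ? a : b;
        byte[] longer = a.length <= b.length ? b : a;
        byte[] out = longer.clone();
        for (int i = 0; i < shorter.length; i++) {
            out[i] = (byte) (out[i] ^ shorter[i]);
        }
        return out;
    }

    public static void main(String[] args) {
        // Two one-byte row hashes in the same leaf: 0x05 XOR 0x02 = 0x07.
        byte[] leaf = xor(new byte[] { 0x05 }, new byte[] { 0x02 });
        System.out.printf("%02x%n", leaf[0]); // 07
        // A 16-byte MD5 row hash combined with an empty hash is unchanged.
        System.out.println(xor(md5("row-a"), new byte[0]).length); // 16
    }
}
```

XOR is commutative and associative, so the combined hash of a leaf does not depend on how its rows are grouped.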

**Comparing Merkle trees**

Two Merkle trees can be compared if both of them cover the same token range, regardless of their size. The trees are compared recursively, starting at the root hash. If the root hashes match, all the data blocks in the tree's token range are consistent between the replicas. If the root hashes disagree, the left child hashes are compared, followed by the right child hashes. The comparison proceeds until all the token ranges that differ between the two trees have been found.
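The recursive comparison can be sketched without the Cassandra classes. In the hypothetical `TreeDiffSketch` below, each tree is reduced to an array of one-byte leaf hashes, with 0 standing in for an empty hash, and a node's hash is the XOR of the leaves beneath it. The leaf values are taken from the sample trees printed at the end of this post.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative helper, not Cassandra code: recursive comparison of two XOR hash trees.
class TreeDiffSketch {
    // Hash of the node covering leaves [from, to): XOR of the leaf hashes under it.
    static int nodeHash(int[] leaves, int from, int to) {
        int hash = 0;
        for (int i = from; i < to; i++) {
            hash ^= leaves[i];
        }
        return hash;
    }

    // Descend only into subtrees whose hashes disagree; record differing leaf ranges.
    static void diff(int[] a, int[] b, int from, int to, int leafWidth, List<String> out) {
        if (nodeHash(a, from, to) == nodeHash(b, from, to)) {
            return; // hashes match, so this sub range is treated as consistent
        }
        if (to - from == 1) {
            out.add("(" + from * leafWidth + "," + to * leafWidth + "]");
            return;
        }
        int mid = (from + to) >>> 1;
        diff(a, b, from, mid, leafWidth, out); // left child first
        diff(a, b, mid, to, leafWidth, out);   // then right child
    }

    static List<String> difference(int[] a, int[] b, int leafWidth) {
        List<String> out = new ArrayList<>();
        diff(a, b, 0, a.length, leafWidth, out);
        return out;
    }

    public static void main(String[] args) {
        // Leaf hashes for the 8 sub ranges of (0, 256]; 0 means no rows in the sub range.
        int[] tree1 = { 0x09, 0, 0, 0, 0x0c, 0x07, 0, 0 };
        int[] tree2 = { 0, 0, 0x03, 0, 0x0c, 0x07, 0, 0 };
        System.out.println(difference(tree1, tree2, 32)); // [(0,32], (64,96]]
    }
}
```

The right half of both trees hashes to 0x0b, so the comparison never descends into (128, 256].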

**Sample Java code to test Merkle tree**

```java
import java.util.*;

import org.apache.cassandra.dht.*;
import org.apache.cassandra.utils.*;
import org.apache.cassandra.utils.MerkleTree.RowHash;
import org.apache.cassandra.utils.MerkleTree.TreeRange;
import org.apache.cassandra.utils.MerkleTree.TreeRangeIterator;

import static org.apache.cassandra.utils.MerkleTree.RECOMMENDED_DEPTH;

public class MerkleTreeTest {

    public static void addRows(MerkleTree tree, Map<BigIntegerToken, byte[]> rows) {
        MerkleTree.RowHash EMPTY_ROW = new MerkleTree.RowHash(null, new byte[0]);

        // Get the sub range iterator which iterates over sorted sub ranges
        TreeRangeIterator ranges = tree.invalids();
        TreeRange range = ranges.next();

        for (BigIntegerToken token : rows.keySet()) {
            while (!range.contains(token)) {
                // add the empty hash, and move to the next range
                range.addHash(EMPTY_ROW);
                range = ranges.next();
            }
            range.addHash(new RowHash(token, rows.get(token)));
        }

        if (range != null) {
            range.addHash(EMPTY_ROW);
        }

        while (ranges.hasNext()) {
            range = ranges.next();
            range.addHash(EMPTY_ROW);
        }
    }

    public static void main(final String[] args) {
        // Sorted map of hash values of rows
        Map<BigIntegerToken, byte[]> rows1 = new TreeMap<BigIntegerToken, byte[]>();
        rows1.put(new BigIntegerToken("5"), new byte[] { (byte)0x09 });
        rows1.put(new BigIntegerToken("135"), new byte[] { (byte)0x0c });
        rows1.put(new BigIntegerToken("170"), new byte[] { (byte)0x05 });
        rows1.put(new BigIntegerToken("185"), new byte[] { (byte)0x02 });

        Map<BigIntegerToken, byte[]> rows2 = new TreeMap<BigIntegerToken, byte[]>();
        rows2.put(new BigIntegerToken("90"), new byte[] { (byte)0x03 });
        rows2.put(new BigIntegerToken("135"), new byte[] { (byte)0x0c });
        rows2.put(new BigIntegerToken("170"), new byte[] { (byte)0x05 });
        rows2.put(new BigIntegerToken("185"), new byte[] { (byte)0x02 });

        // Create Merkle tree of depth 3 for the token range (0 - 256]
        IPartitioner partitioner = new RandomPartitioner();
        Range fullRange = new Range(new BigIntegerToken("0"), new BigIntegerToken("256"), partitioner);

        MerkleTree tree1 = new MerkleTree(partitioner, fullRange, RECOMMENDED_DEPTH, 8);
        tree1.init();

        MerkleTree tree2 = new MerkleTree(partitioner, fullRange, RECOMMENDED_DEPTH, 8);
        tree2.init();

        // Add row hashes
        addRows(tree1, rows1);
        addRows(tree2, rows2);

        // Calculate hashes of non leaf nodes
        List<TreeRange> diff = MerkleTree.difference(tree1, tree2);

        System.out.println("tree1: " + tree1);
        System.out.println("tree2: " + tree2);
        System.out.println("difference: " + diff);
    }
}
```

**Output:**

```
tree1: #<MerkleTree root=#<Inner 128 hash=[02] children=[#<Inner 64 hash=[09] children=[#<Inner 32 hash=[09] children=[#<Leaf [09]> #<Leaf []>]> #<Inner 96 hash=[] children=[#<Leaf []> #<Leaf []>]>]> #<Inner 192 hash=[0b] children=[#<Inner 160 hash=[0b] children=[#<Leaf [0c]> #<Leaf [07]>]> #<Inner 224 hash=[] children=[#<Leaf []> #<Leaf []>]>]>]>>

tree2: #<MerkleTree root=#<Inner 128 hash=[08] children=[#<Inner 64 hash=[03] children=[#<Inner 32 hash=[] children=[#<Leaf []> #<Leaf []>]> #<Inner 96 hash=[03] children=[#<Leaf [03]> #<Leaf []>]>]> #<Inner 192 hash=[0b] children=[#<Inner 160 hash=[0b] children=[#<Leaf [0c]> #<Leaf [07]>]> #<Inner 224 hash=[] children=[#<Leaf []> #<Leaf []>]>]>]>>

difference: [#<TreeRange (0,32] depth=3>, #<TreeRange (64,96] depth=3>]
```

**References:**

http://en.wikipedia.org/wiki/Merkle_tree

http://wiki.apache.org/cassandra/AntiEntropy

https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/utils/MerkleTree.java

Very useful! Do you know whether there is a C++ implementation of Apache's Merkle trees?

The Java code is part of the Cassandra source. There are some open source implementations on GitHub in various languages. I haven't tried any of them so far.

Hi, can you provide some C code for a Merkle hash tree implementation?

If a distributed mechanism is well designed, it will scale with the number of nodes. Cassandra is one of the best examples of such a system. It scales almost linearly, with regard to performance, when we add new nodes. This means that Cassandra can handle a behemoth of data without wincing.