Merge源码解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// Read-only bundle of the arguments passed to a full merge.
// All fields are non-owning references/pointers: the objects they refer to
// must stay alive for as long as this struct is used.
struct MergeOperationInput {
  // Key whose value is being merged.
  const Slice& key;
  // Value currently stored for `key`; nullptr means no value exists.
  const Slice* existing_value;
  // Operands that the merge must apply.
  const std::vector<Slice>& operand_list;
  // Logger the client may use to report errors raised during the merge.
  Logger* logger;

  // Binds every field to the caller-supplied objects (no copies are made).
  explicit MergeOperationInput(const Slice& _key,
                               const Slice* _existing_value,
                               const std::vector<Slice>& _operand_list,
                               Logger* _logger)
      : key(_key),
        existing_value(_existing_value),
        operand_list(_operand_list),
        logger(_logger) {}
};

// Output slots a merge implementation writes its result into.
// Both fields are references to storage owned by the caller.
struct MergeOperationOutput {
  // Destination for a freshly built merge result.
  std::string& new_value;
  // Alternative to new_value: when the result equals one of the existing
  // operands (or existing_value), the client may point this at it instead.
  Slice& existing_operand;

  // Binds the output slots to caller-owned storage.
  explicit MergeOperationOutput(std::string& _new_value,
                                Slice& _existing_operand)
      : new_value(_new_value),
        existing_operand(_existing_operand) {}
};

问题：首先要弄清楚这两个参数类的用途，以及其中的成员是在哪一步定义和初始化的。

sortlist.cc

sortlist.cc 中 merge 的实现（FullMergeV2）。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
bool SortList::FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const {
std::vector<int> left;
for (Slice slice : merge_in.operand_list) {
std::vector<int> right;
MakeVector(right, slice);
left = Merge(left, right);
}
for (int i = 0; i < static_cast<int>(left.size()) - 1; i++) {
merge_out->new_value.append(std::to_string(left[i])).append(",");
}
merge_out->new_value.append(std::to_string(left.back()));
return true;
}

db_range_del_test.cc

MockMergeOperator：用于测试的模拟 merge 操作符。

1
2
3
4
5
6
7
8
9
10
11
12
13
class MockMergeOperator : public MergeOperator {
// Mock non-associative operator. Non-associativity is expressed by lack of
// implementation for any `PartialMerge*` functions.
public:
bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const override {
assert(merge_out != nullptr);
merge_out->new_value = merge_in.operand_list.back().ToString();
return true;
}

const char* Name() const override { return "MockMergeOperator"; }
};

new_value 被赋值为 operand_list 尾元素的副本（ToString() 会生成一个新的 std::string，并非指向该元素）。

c.cc

c.cc 中 C API 对 merge 操作符（以函数指针回调）的封装也值得借鉴。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
struct rocksdb_mergeoperator_t : public MergeOperator {
void* state_;
void (*destructor_)(void*);
const char* (*name_)(void*);
char* (*full_merge_)(
void*,
const char* key, size_t key_length,
const char* existing_value, size_t existing_value_length,
const char* const* operands_list, const size_t* operands_list_length,
int num_operands,
unsigned char* success, size_t* new_value_length);
char* (*partial_merge_)(void*, const char* key, size_t key_length,
const char* const* operands_list,
const size_t* operands_list_length, int num_operands,
unsigned char* success, size_t* new_value_length);
void (*delete_value_)(
void*,
const char* value, size_t value_length);

~rocksdb_mergeoperator_t() override { (*destructor_)(state_); }

const char* Name() const override { return (*name_)(state_); }

bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const override {
size_t n = merge_in.operand_list.size();
std::vector<const char*> operand_pointers(n);
std::vector<size_t> operand_sizes(n);
for (size_t i = 0; i < n; i++) {
Slice operand(merge_in.operand_list[i]);
operand_pointers[i] = operand.data();
operand_sizes[i] = operand.size();
}

const char* existing_value_data = nullptr;
size_t existing_value_len = 0;
if (merge_in.existing_value != nullptr) {
existing_value_data = merge_in.existing_value->data();
existing_value_len = merge_in.existing_value->size();
}

unsigned char success;
size_t new_value_len;
char* tmp_new_value = (*full_merge_)(
state_, merge_in.key.data(), merge_in.key.size(), existing_value_data,
existing_value_len, &operand_pointers[0], &operand_sizes[0],
static_cast<int>(n), &success, &new_value_len);
merge_out->new_value.assign(tmp_new_value, new_value_len);

if (delete_value_ != nullptr) {
(*delete_value_)(state_, tmp_new_value, new_value_len);
} else {
free(tmp_new_value);
}

return success;
}

bool PartialMergeMulti(const Slice& key,
const std::deque<Slice>& operand_list,
std::string* new_value,
Logger* /*logger*/) const override {
size_t operand_count = operand_list.size();
std::vector<const char*> operand_pointers(operand_count);
std::vector<size_t> operand_sizes(operand_count);
for (size_t i = 0; i < operand_count; ++i) {
Slice operand(operand_list[i]);
operand_pointers[i] = operand.data();
operand_sizes[i] = operand.size();
}

unsigned char success;
size_t new_value_len;
char* tmp_new_value = (*partial_merge_)(
state_, key.data(), key.size(), &operand_pointers[0], &operand_sizes[0],
static_cast<int>(operand_count), &success, &new_value_len);
new_value->assign(tmp_new_value, new_value_len);

if (delete_value_ != nullptr) {
(*delete_value_)(state_, tmp_new_value, new_value_len);
} else {
free(tmp_new_value);
}

return success;
}
};

最后是 list 合并操作符（ListMergeOperator）的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
class ListMergeOperator : public rocksdb::MergeOperator {
private:
size_t max_val;
public:
ListMergeOperator(size_t max_val): max_val(max_val) {
}

virtual ~ListMergeOperator() {}

bool FullMergeV2(const MergeOperationInput &merge_in, MergeOperationOutput *merge_out) const {
merge_out->new_value.clear();

// Compute the space needed for the final result.
size_t numBytes = 0;
for (auto &s : merge_in.operand_list) {
numBytes += s.size() + rocksdb::VarintLength(s.size());

#ifndef NDEBUG
if (merge_in.existing_value) {
listdb::log_debug("FullMergeV2, exits %s, %s, total bytes %d", merge_in.existing_value->data(),
s.data(), numBytes);
} else {
listdb::log_debug("FullMergeV2, no exits, %s, total bytes %d", s.data(), numBytes);
}
#endif
}

if (merge_in.existing_value) {
merge_out->new_value.reserve(numBytes + merge_in.existing_value->size());
merge_out->new_value.append(merge_in.existing_value->data(), merge_in.existing_value->size());
} else {
merge_out->new_value.reserve(numBytes);
}

for (auto &s: merge_in.operand_list) {
// encode list item as: size + data
rocksdb::PutVarint32(&merge_out->new_value, (uint32_t) s.size()); // put size
merge_out->new_value.append(s.data_, s.size());
}

if (max_val > 0 && merge_out->new_value.size() > max_val) { // 超过了大小限制, 去掉前面的
size_t to_ignore = merge_out->new_value.size() - max_val;

auto p = merge_out->new_value.data(), start = merge_out->new_value.data(), end = merge_out->new_value.data() + merge_out->new_value.size();
while (p < end) {
uint32_t size = 0;
auto t = rocksdb::GetVarint32Ptr(p, p + 5, &size);
t += size;
if (t - start > to_ignore) break;
p = t;
}

listdb::log_debug("FullMergeV2, trim %d -> %d/%d", merge_out->new_value.size(), to_ignore, merge_out->new_value.size() - (p - start));
if(p != start)
merge_out->new_value = merge_out->new_value.substr(p - start);
}

#ifndef NDEBUG
if (merge_in.existing_value) {
listdb::log_debug("FullMergeV2, exits %s, get %s %d/%d", merge_in.existing_value->data(),
merge_out->new_value.data(), numBytes, merge_out->new_value.size());
} else {
listdb::log_debug("FullMergeV2, no exits, get %s %d/%d", merge_out->new_value.data(), numBytes,
merge_out->new_value.size());
}
#endif

return true;
}

bool PartialMerge(const rocksdb::Slice &key, const rocksdb::Slice &left_operand,
const rocksdb::Slice &right_operand, std::string *new_value,
rocksdb::Logger *logger) const {
return false;
}

const char *Name() const {
return "list-merge";
}
};