Coverage Report

Created: 2026-07-01 22:44

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/channel/adaptive_random_bucket_state.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/channel/adaptive_random_bucket_state.h"
19
20
#include <glog/logging.h>
21
22
#include <utility>
23
24
namespace doris {
25
26
// Records the tablet rotation state of one partition for this load.
//
// @param partition_id      partition to register; a negative id is ignored.
// @param tablets           tablet ids of the partition; an empty list is ignored.
// @param bucket_seqs       one entry per tablet; must have the same length as `tablets`.
// @param start_tablet_idx  index into `tablets` of the tablet to start with.
// @return OK when the partition was registered, when there was nothing to register,
//         or when an identical registration already exists; InternalError when the
//         arguments are malformed or disagree with an earlier registration.
Status AdaptiveRandomBucketState::init_partition(int64_t partition_id,
                                                 const std::vector<int64_t>& tablets,
                                                 const std::vector<int32_t>& bucket_seqs,
                                                 int32_t start_tablet_idx) {
    // Nothing to track: report success without touching any shared state.
    const bool nothing_to_register = partition_id < 0 || tablets.empty();
    if (nothing_to_register) {
        return Status::OK();
    }

    const auto tablet_count = tablets.size();
    if (bucket_seqs.size() != tablet_count) {
        return Status::InternalError(
                "invalid adaptive random bucket tablet sequence, load_id={}, partition_id={}, "
                "tablet_count={}, bucket_seq_count={}",
                print_id(_load_id), partition_id, tablet_count, bucket_seqs.size());
    }

    const bool start_in_range =
            start_tablet_idx >= 0 && static_cast<size_t>(start_tablet_idx) < tablet_count;
    if (!start_in_range) {
        return Status::InternalError(
                "invalid adaptive random bucket start tablet index, load_id={}, partition_id={}, "
                "start_tablet_idx={}, tablet_count={}",
                print_id(_load_id), partition_id, start_tablet_idx, tablet_count);
    }

    // Everything below reads or writes _partition_states, so it runs under the mutex.
    std::lock_guard<std::mutex> guard(_mutex);

    if (auto found = _partition_states.find(partition_id); found != _partition_states.end()) {
        // Repeated registration: accepted only when it matches the stored one exactly.
        const auto& known = found->second;
        const bool same_registration = known.tablets == tablets &&
                                       known.bucket_seqs == bucket_seqs &&
                                       known.initial_tablet_pos == start_tablet_idx;
        if (same_registration) {
            return Status::OK();
        }
        return Status::InternalError(
                "inconsistent adaptive random bucket partition init, load_id={}, "
                "partition_id={}, existing_tablet_count={}, new_tablet_count={}, "
                "existing_bucket_seq_count={}, new_bucket_seq_count={}, "
                "existing_start_idx={}, new_start_idx={}",
                print_id(_load_id), partition_id, known.tablets.size(), tablets.size(),
                known.bucket_seqs.size(), bucket_seqs.size(), known.initial_tablet_pos,
                start_tablet_idx);
    }

    // First registration: the current position starts at the requested index.
    PartitionState fresh;
    fresh.partition_id = partition_id;
    fresh.initial_tablet_pos = start_tablet_idx;
    fresh.tablet_pos = start_tablet_idx;
    fresh.tablets = tablets;
    fresh.bucket_seqs = bucket_seqs;
    fresh.current_tablet_id = fresh.tablets[fresh.tablet_pos];

    // The key was just confirmed absent under the lock, so this always inserts.
    const auto inserted = _partition_states.emplace(partition_id, std::move(fresh));
    LOG(INFO) << "FIND_TABLET_RANDOM_BUCKET: load_id=" << _load_id << ", partition=" << partition_id
              << ", local tablet count=" << tablets.size()
              << ", start tablet=" << inserted.first->second.current_tablet_id;
    return Status::OK();
}
77
78
11
int64_t AdaptiveRandomBucketState::current_tablet(int64_t partition_id) {
79
11
    std::lock_guard<std::mutex> lock(_mutex);
80
11
    auto it = _partition_states.find(partition_id);
81
11
    if (it == _partition_states.end()) {
82
2
        return -1;
83
2
    }
84
9
    return it->second.current_tablet_id;
85
11
}
86
87
4
void AdaptiveRandomBucketState::rotate_by_tablet(int64_t partition_id, int64_t tablet_id) {
88
4
    std::lock_guard<std::mutex> lock(_mutex);
89
4
    auto state_it = _partition_states.find(partition_id);
90
4
    if (state_it == _partition_states.end()) {
91
0
        return;
92
0
    }
93
4
    auto& state = state_it->second;
94
4
    if (state.current_tablet_id != tablet_id) {
95
1
        return;
96
1
    }
97
3
    int32_t next_pos = (state.tablet_pos + 1) % static_cast<int32_t>(state.tablets.size());
98
    LOG(INFO) << "FIND_TABLET_RANDOM_BUCKET: load_id=" << _load_id
99
3
              << ", partition=" << state.partition_id << " rotate tablet "
100
3
              << state.current_tablet_id << " -> " << state.tablets[next_pos]
101
3
              << " after tablet=" << tablet_id << " memtable flushed";
102
3
    state.tablet_pos = next_pos;
103
3
    state.current_tablet_id = state.tablets[next_pos];
104
3
}
105
106
} // namespace doris