Coverage Report

Created: 2026-06-10 12:01

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/vtablet_finder.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/sink/vtablet_finder.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/Exprs_types.h>
22
#include <gen_cpp/FrontendService_types.h>
23
#include <glog/logging.h>
24
25
#include <algorithm>
26
#include <string>
27
#include <utility>
28
29
#include "common/compiler_util.h" // IWYU pragma: keep
30
#include "common/config.h"
31
#include "common/status.h"
32
#include "core/block/block.h"
33
#include "runtime/runtime_state.h"
34
#include "storage/tablet_info.h"
35
36
namespace doris {
37
38
void AdaptiveRandomBucketState::init_partition(int32_t sender_id, int64_t partition_id,
39
                                               const std::vector<int64_t>& tablets,
40
                                               const std::vector<int32_t>& bucket_seqs,
41
9.29k
                                               int32_t start_tablet_idx) {
42
9.29k
    if (sender_id < 0 || partition_id < 0 || tablets.empty()) {
43
0
        return;
44
0
    }
45
9.29k
    std::lock_guard<std::mutex> lock(_mutex);
46
9.29k
    auto& partition_states = _sender_partition_states[sender_id];
47
9.29k
    if (partition_states.contains(partition_id)) {
48
1
        return;
49
1
    }
50
51
9.29k
    PartitionState state;
52
9.29k
    state.partition_id = partition_id;
53
9.29k
    state.tablets = tablets;
54
9.29k
    state.bucket_seqs = bucket_seqs;
55
9.29k
    if (start_tablet_idx >= 0 && start_tablet_idx < state.tablets.size()) {
56
9.29k
        state.tablet_pos = start_tablet_idx;
57
9.29k
    }
58
9.29k
    state.current_tablet_id = state.tablets[state.tablet_pos];
59
60
9.29k
    partition_states.emplace(partition_id, std::move(state));
61
9.29k
    LOG(INFO) << "FIND_TABLET_RANDOM_BUCKET: load_id=" << _load_id << ", sender_id=" << sender_id
62
9.29k
              << ", partition=" << partition_id << ", local tablet count=" << tablets.size()
63
9.29k
              << ", start tablet=" << partition_states.at(partition_id).current_tablet_id;
64
9.29k
}
65
66
4.35k
int64_t AdaptiveRandomBucketState::current_tablet(int32_t sender_id, int64_t partition_id) {
67
4.35k
    std::lock_guard<std::mutex> lock(_mutex);
68
4.35k
    auto sender_it = _sender_partition_states.find(sender_id);
69
4.35k
    if (sender_it == _sender_partition_states.end()) {
70
1
        return -1;
71
1
    }
72
4.35k
    auto it = sender_it->second.find(partition_id);
73
4.35k
    if (it == sender_it->second.end()) {
74
0
        return -1;
75
0
    }
76
4.35k
    return it->second.current_tablet_id;
77
4.35k
}
78
79
void AdaptiveRandomBucketState::rotate_by_tablet(int32_t sender_id, int64_t partition_id,
80
3
                                                 int64_t tablet_id) {
81
3
    if (!config::enable_adaptive_random_bucket_load_bucket_rotation) {
82
0
        return;
83
0
    }
84
3
    std::lock_guard<std::mutex> lock(_mutex);
85
3
    auto sender_it = _sender_partition_states.find(sender_id);
86
3
    if (sender_it == _sender_partition_states.end()) {
87
0
        return;
88
0
    }
89
3
    auto state_it = sender_it->second.find(partition_id);
90
3
    if (state_it == sender_it->second.end()) {
91
0
        return;
92
0
    }
93
3
    auto& state = state_it->second;
94
3
    if (state.current_tablet_id != tablet_id) {
95
1
        return;
96
1
    }
97
2
    int32_t next_pos = (state.tablet_pos + 1) % static_cast<int32_t>(state.tablets.size());
98
2
    LOG(INFO) << "FIND_TABLET_RANDOM_BUCKET: load_id=" << _load_id << ", sender_id=" << sender_id
99
2
              << ", partition=" << state.partition_id << " rotate tablet "
100
2
              << state.current_tablet_id << " -> " << state.tablets[next_pos]
101
2
              << " after tablet=" << tablet_id << " memtable flushed";
102
2
    state.tablet_pos = next_pos;
103
2
    state.current_tablet_id = state.tablets[next_pos];
104
2
}
105
106
// Resolves the partition of each of the first `rows` rows of `block` and,
// depending on `_find_tablet_mode`, the tablet index inside that partition.
//
// Parameters:
//   state        - used only to record an error message for rows that match no
//                  partition.
//   block        - rows to route.
//   rows         - number of rows to process; `partitions` and `skip` are
//                  indexed by [0, rows) and must be at least that large.
//   partitions   - output: partitions[i] is filled by
//                  _vpartition->find_partition() and is nullptr when row i
//                  matches no partition.
//   tablet_index - output, passed through to _vpartition->find_tablets(); not
//                  touched in FIND_TABLET_RANDOM_BUCKET mode.
//   skip         - output: skip[i] is set to true for every row that is not
//                  qualified (no partition, or immutable partition).
//   miss_rows    - optional; when non-null, indexes of rows without a partition
//                  are appended here instead of being counted as filtered.
//
// Returns Status::OK() on success, an InternalError when a matched partition
// has num_buckets <= 0, or the error returned by append_error_msg_to_file().
Status OlapTabletFinder::find_tablets(RuntimeState* state, Block* block, int rows,
                                      std::vector<VOlapTablePartition*>& partitions,
                                      std::vector<uint32_t>& tablet_index, std::vector<bool>& skip,
                                      std::vector<uint32_t>* miss_rows) {
    for (int index = 0; index < rows; index++) {
        _vpartition->find_partition(block, index, partitions[index]);
    }

    std::vector<uint32_t> qualified_rows;
    qualified_rows.reserve(rows);

    for (int row_index = 0; row_index < rows; row_index++) {
        VOlapTablePartition* partition = partitions[row_index];
        if (partition == nullptr) [[unlikely]] {
            if (miss_rows != nullptr) {          // auto partition table
                miss_rows->push_back(row_index); // already reserve memory outside
                skip[row_index] = true;
                continue;
            }
            // No partition and no miss list: the row is filtered out and reported.
            _num_filtered_rows++;
            _filter_bitmap.Set(row_index, true);
            skip[row_index] = true;
            RETURN_IF_ERROR(state->append_error_msg_to_file(
                    []() -> std::string { return ""; },
                    [&]() -> std::string {
                        return fmt::format("no partition for this tuple. tuple={}",
                                           block->dump_data_json(row_index, 1));
                    }));
            continue;
        }
        if (!partition->is_mutable) [[unlikely]] {
            _num_immutable_partition_filtered_rows++;
            skip[row_index] = true;
            continue;
        }
        if (partition->num_buckets <= 0) [[unlikely]] {
            return Status::InternalError(fmt::format(
                    "num_buckets must be greater than 0, num_buckets={}", partition->num_buckets));
        }

        _partition_ids.emplace(partition->id);

        qualified_rows.push_back(row_index);
    }

    if (_find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_ROW) {
        _vpartition->find_tablets(block, qualified_rows, partitions, tablet_index);
    } else if (_find_tablet_mode == FindTabletMode::FIND_TABLET_RANDOM_BUCKET) {
        // Receiver-side random bucket mode only needs partition ids on sender side.
        // The receiver decides the concrete tablet from its local ordered tablet list.
    } else {
        // FIND_TABLET_EVERY_BATCH / FIND_TABLET_EVERY_SINK
        _vpartition->find_tablets(block, qualified_rows, partitions, tablet_index,
                                  &_partition_to_tablet_map);
        if (_find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_BATCH) {
            // Iterate by reference: the key is a pointer, so the partition is
            // still updated in place without copying each map entry.
            for (const auto& entry : _partition_to_tablet_map) {
                // do round-robin for next batch
                if (entry.first->load_tablet_idx != -1) {
                    entry.first->load_tablet_idx++;
                }
            }
            _partition_to_tablet_map.clear();
        }
    }

    return Status::OK();
}
176
177
} // namespace doris