Coverage Report

Created: 2026-03-13 05:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/vrow_distribution.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
// IWYU pragma: no_include <bits/chrono.h>
21
#include <fmt/format.h>
22
#include <gen_cpp/FrontendService.h>
23
#include <gen_cpp/FrontendService_types.h>
24
#include <gen_cpp/PaloInternalService_types.h>
25
#include <glog/logging.h>
26
27
#include <cstdint>
28
#include <functional>
29
#include <string>
30
#include <unordered_set>
31
#include <vector>
32
33
#include "common/status.h"
34
#include "core/block/block.h"
35
#include "core/custom_allocator.h"
36
#include "exec/sink/vtablet_block_convertor.h"
37
#include "exec/sink/vtablet_finder.h"
38
#include "exprs/vexpr_context.h"
39
#include "exprs/vexpr_fwd.h"
40
#include "runtime/runtime_profile.h"
41
#include "runtime/runtime_state.h"
42
#include "storage/tablet_info.h"
43
44
namespace doris {
45
#include "common/compile_check_begin.h"
46
47
class IndexChannel;
48
class VNodeChannel;
49
50
// <row_idx, partition_id, tablet_id>
51
class RowPartTabletIds {
52
public:
53
    DorisVector<uint32_t> row_ids;
54
    DorisVector<int64_t> partition_ids;
55
    DorisVector<int64_t> tablet_ids;
56
57
0
    std::string debug_string() const {
58
0
        std::string value;
59
0
        value.reserve(row_ids.size() * 15);
60
0
        for (int i = 0; i < row_ids.size(); i++) {
61
0
            value.append(fmt::format("[{}, {}, {}]", row_ids[i], partition_ids[i], tablet_ids[i]));
62
0
        }
63
0
        return value;
64
0
    }
65
};
66
67
// void* for caller
68
using CreatePartitionCallback = Status (*)(void*, TCreatePartitionResult*);
69
70
class VRowDistribution {
71
public:
72
    // only used to pass parameters for VRowDistribution
73
    struct VRowDistributionContext {
74
        RuntimeState* state = nullptr;
75
        OlapTableBlockConvertor* block_convertor = nullptr;
76
        OlapTabletFinder* tablet_finder = nullptr;
77
        VOlapTablePartitionParam* vpartition = nullptr;
78
        RuntimeProfile::Counter* add_partition_request_timer = nullptr;
79
        int64_t txn_id = -1;
80
        ObjectPool* pool = nullptr;
81
        OlapTableLocationParam* location = nullptr;
82
        const VExprContextSPtrs* vec_output_expr_ctxs = nullptr;
83
        std::shared_ptr<OlapTableSchemaParam> schema;
84
        void* caller = nullptr;
85
        bool write_single_replica = false;
86
        CreatePartitionCallback create_partition_callback;
87
    };
88
    friend class VTabletWriter;
89
    friend class VTabletWriterV2;
90
91
74.6k
    VRowDistribution() = default;
92
74.8k
    virtual ~VRowDistribution() = default;
93
94
74.7k
    // Copies every field of the parameter bundle into the corresponding member.
    // ctx.state is dereferenced to read the batch size, so it must be non-null here.
    void init(VRowDistributionContext ctx) {
        // Lower bound applied to the batch size reported by the runtime state.
        constexpr int kMinBatchSize = 8192;

        // Runtime environment.
        _state = ctx.state;
        _pool = ctx.pool;
        _txn_id = ctx.txn_id;
        _batch_size = std::max(_state->batch_size(), kMinBatchSize);

        // Table layout and output expressions.
        _schema = ctx.schema;
        _location = ctx.location;
        _vpartition = ctx.vpartition;
        _vec_output_expr_ctxs = ctx.vec_output_expr_ctxs;

        // Helpers used while distributing rows.
        _block_convertor = ctx.block_convertor;
        _tablet_finder = ctx.tablet_finder;
        _add_partition_request_timer = ctx.add_partition_request_timer;
        _write_single_replica = ctx.write_single_replica;

        // Partition-creation callback and the opaque pointer handed back to it.
        _caller = ctx.caller;
        _create_partition_callback = ctx.create_partition_callback;
    }
110
111
2
    // Publishes the recorded add-partition request times to the profile as the info string
    // "AddPartitionRequestTimeList", formatted as "[t1, t2, ...]". Does nothing when no
    // request time has been recorded.
    void output_profile_info(RuntimeProfile* profile) {
        if (_add_partition_request_times.empty()) {
            return;
        }

        std::stringstream time_list;
        time_list << "[";
        // Empty before the first element, ", " before every following one.
        const char* separator = "";
        for (int64_t request_time : _add_partition_request_times) {
            time_list << separator << PrettyPrinter::print(request_time, TUnit::TIME_NS);
            separator = ", ";
        }
        time_list << "]";

        profile->add_info_string("AddPartitionRequestTimeList", time_list.str());
    }
125
126
74.6k
    // Prepares and opens, against *output_row_desc, the expression contexts used by this
    // object:
    //  - the partition function contexts, only when _vpartition reports auto partition;
    //  - the where clause of every index in _schema that has one.
    // A failing prepare()/open() is handed to RETURN_IF_ERROR; otherwise Status::OK() is
    // returned. output_row_desc is dereferenced and must not be null.
    Status open(RowDescriptor* output_row_desc) {
        if (_vpartition->is_auto_partition()) {
            auto [part_ctxs, part_funcs] = _get_partition_function();
            // Bind by const reference so each context handle (presumably a shared pointer,
            // per VExprContextSPtrs) is not copied on every iteration.
            for (const auto& part_ctx : part_ctxs) {
                RETURN_IF_ERROR(part_ctx->prepare(_state, *output_row_desc));
                RETURN_IF_ERROR(part_ctx->open(_state));
            }
        }
        for (const auto& index : _schema->indexes()) {
            auto& where_clause = index->where_clause;
            if (where_clause != nullptr) {
                RETURN_IF_ERROR(where_clause->prepare(_state, *output_row_desc));
                RETURN_IF_ERROR(where_clause->open(_state));
            }
        }
        return Status::OK();
    }
143
144
    // auto partition
145
    // mv where clause
146
    // v1 needs index->node->row_ids - tabletids
147
    // v2 needs index,tablet->rowids
148
    Status generate_rows_distribution(Block& input_block, std::shared_ptr<Block>& block,
149
                                      std::vector<RowPartTabletIds>& row_part_tablet_ids,
150
                                      int64_t& rows_stat_val);
151
    // There are 2 ways to signal that the batched block must be dealt with:
152
    // 1. in row_distribution, _batching_rows reaches the threshold, this class set _deal_batched = true.
153
    // 2. in caller, after last block and before close, set _deal_batched = true.
154
118k
    // True only when batched handling has been requested (_deal_batched) and at least one
    // row is currently batched (_batching_rows > 0).
    bool need_deal_batching() const {
        if (!_deal_batched) {
            return false;
        }
        return _batching_rows > 0;
    }
155
    // create partitions when needed for auto-partition tables, using #_partitions_need_create.
156
    Status automatic_create_partition();
157
    void clear_batching_stats();
158
1.47k
    // Read-only access to _skip: what was skipped in the last round.
    const std::vector<bool>& get_skipped() const {
        return _skip;
    }
159
160
    // for auto partition
161
    std::unique_ptr<MutableBlock> _batching_block; // same structure with input_block
162
    bool _deal_batched = false; // If true, send batched block before any block's append.
163
164
private:
165
    std::pair<VExprContextSPtrs, VExprSPtrs> _get_partition_function();
166
167
    Status _save_missing_values(const Block& input_block,
168
                                std::vector<std::vector<std::string>>& col_strs, int col_size,
169
                                Block* block, const std::vector<uint32_t>& filter,
170
                                const std::vector<const NullMap*>& col_null_maps);
171
172
    void _get_tablet_ids(Block* block, int32_t index_idx, std::vector<int64_t>& tablet_ids);
173
174
    void _filter_block_by_skip(Block* block, RowPartTabletIds& row_part_tablet_id);
175
176
    Status _filter_block_by_skip_and_where_clause(Block* block,
177
                                                  const VExprContextSPtr& where_clause,
178
                                                  RowPartTabletIds& row_part_tablet_id);
179
180
    Status _filter_block(Block* block, std::vector<RowPartTabletIds>& row_part_tablet_ids);
181
182
    Status _generate_rows_distribution_for_auto_partition(
183
            const Block& input_block, Block* block, const std::vector<uint16_t>& partition_col_idx,
184
            bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
185
            int64_t& rows_stat_val);
186
    // the whole process of dealing with missing rows. will call _save_missing_values
187
    Status _deal_missing_map(const Block& input_block, Block* block,
188
                             const std::vector<uint16_t>& partition_cols_idx,
189
                             int64_t& rows_stat_val);
190
191
    Status _generate_rows_distribution_for_non_auto_partition(
192
            Block* block, bool has_filtered_rows,
193
            std::vector<RowPartTabletIds>& row_part_tablet_ids);
194
195
    Status _generate_rows_distribution_for_auto_overwrite(
196
            const Block& input_block, Block* block, const std::vector<uint16_t>& partition_cols_idx,
197
            bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
198
            int64_t& rows_stat_val);
199
    Status _replace_overwriting_partition();
200
201
    void _reset_row_part_tablet_ids(std::vector<RowPartTabletIds>& row_part_tablet_ids,
202
                                    int64_t rows);
203
    void _reset_find_tablets(int64_t rows);
204
205
    struct NullableStringListHash {
206
18.3k
        std::size_t _hash(const TNullableStringLiteral& arg) const {
207
18.3k
            if (arg.is_null) {
208
16
                return 0;
209
16
            }
210
18.3k
            return std::hash<std::string>()(arg.value);
211
18.3k
        }
212
18.2k
        std::size_t operator()(const std::vector<TNullableStringLiteral>& arg) const {
213
18.2k
            std::size_t result = 0;
214
18.3k
            for (const auto& v : arg) {
215
18.3k
                result = (result << 1) ^ _hash(v);
216
18.3k
            }
217
18.2k
            return result;
218
18.2k
        }
219
    };
220
221
    RuntimeState* _state = nullptr;
222
    int _batch_size = 0;
223
224
    // for auto partitions
225
    std::vector<std::vector<TNullableStringLiteral>> _partitions_need_create;
226
    size_t _batching_rows = 0, _batching_bytes = 0;
227
    std::unordered_set<std::vector<TNullableStringLiteral>, NullableStringListHash> _deduper;
228
229
    OlapTableBlockConvertor* _block_convertor = nullptr;
230
    OlapTabletFinder* _tablet_finder = nullptr;
231
    VOlapTablePartitionParam* _vpartition = nullptr;
232
    RuntimeProfile::Counter* _add_partition_request_timer = nullptr;
233
    int64_t _txn_id = -1;
234
    ObjectPool* _pool = nullptr;
235
    OlapTableLocationParam* _location = nullptr;
236
237
    // Record each auto-partition request time for detailed profiling
238
    std::vector<int64_t> _add_partition_request_times;
239
    // int64_t _number_output_rows = 0;
240
    const VExprContextSPtrs* _vec_output_expr_ctxs = nullptr;
241
    // generally it's writer's on_partitions_created
242
    CreatePartitionCallback _create_partition_callback = nullptr;
243
    void* _caller = nullptr;
244
    std::shared_ptr<OlapTableSchemaParam> _schema;
245
    bool _write_single_replica = false;
246
247
    // reuse for find_tablet. save partitions found by find_tablets
248
    std::vector<VOlapTablePartition*> _partitions;
249
    std::vector<bool> _skip;
250
    std::vector<uint32_t> _tablet_indexes;
251
    std::vector<int64_t> _tablet_ids;
252
    std::vector<uint32_t> _missing_map; // indices of missing values in partition_col
253
    // for auto detect overwrite partition
254
    std::set<int64_t> _new_partition_ids; // if contains, not to replace it again.
255
};
256
257
} // namespace doris
258
259
#include "common/compile_check_end.h"