Coverage Report

Created: 2026-06-09 14:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/vrow_distribution.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
// IWYU pragma: no_include <bits/chrono.h>
#include <fmt/format.h>
#include <gen_cpp/FrontendService.h>
#include <gen_cpp/FrontendService_types.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <glog/logging.h>

#include <algorithm>
#include <cstdint>
#include <functional>
#include <memory>
#include <set>
#include <sstream>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

#include "common/status.h"
#include "core/block/block.h"
#include "core/custom_allocator.h"
#include "exec/sink/vtablet_block_convertor.h"
#include "exec/sink/vtablet_finder.h"
#include "exprs/vexpr_context.h"
#include "exprs/vexpr_fwd.h"
#include "runtime/runtime_profile.h"
#include "runtime/runtime_state.h"
#include "storage/tablet_info.h"
43
44
namespace doris {
45
46
// Forward declarations only; no definitions are required by this header.
// NOTE(review): neither type is referenced in this header — presumably kept for files that
// include it; confirm before removing.
class VNodeChannel;
class IndexChannel;
48
49
// <row_idx, partition_id, tablet_id>
50
class RowPartTabletIds {
51
public:
52
    DorisVector<uint32_t> row_ids;
53
    DorisVector<int64_t> partition_ids;
54
    DorisVector<int64_t> tablet_ids;
55
56
0
    std::string debug_string() const {
57
0
        std::string value;
58
0
        value.reserve(row_ids.size() * 15);
59
0
        for (int i = 0; i < row_ids.size(); i++) {
60
0
            if (i < tablet_ids.size()) {
61
0
                value.append(
62
0
                        fmt::format("[{}, {}, {}]", row_ids[i], partition_ids[i], tablet_ids[i]));
63
0
            } else {
64
0
                value.append(fmt::format("[{}, {}]", row_ids[i], partition_ids[i]));
65
0
            }
66
0
        }
67
0
        return value;
68
0
    }
69
};
70
71
// void* for caller
72
using CreatePartitionCallback = Status (*)(void*, TCreatePartitionResult*);
73
74
class VRowDistribution {
75
public:
76
    // only used to pass parameters for VRowDistribution
77
    struct VRowDistributionContext {
78
        RuntimeState* state = nullptr;
79
        OlapTableBlockConvertor* block_convertor = nullptr;
80
        OlapTabletFinder* tablet_finder = nullptr;
81
        VOlapTablePartitionParam* vpartition = nullptr;
82
        RuntimeProfile::Counter* add_partition_request_timer = nullptr;
83
        int64_t txn_id = -1;
84
        ObjectPool* pool = nullptr;
85
        OlapTableLocationParam* location = nullptr;
86
        const VExprContextSPtrs* vec_output_expr_ctxs = nullptr;
87
        std::shared_ptr<OlapTableSchemaParam> schema;
88
        void* caller = nullptr;
89
        bool write_single_replica = false;
90
        CreatePartitionCallback create_partition_callback;
91
    };
92
    friend class VTabletWriter;
93
    friend class VTabletWriterV2;
94
95
21
    VRowDistribution() = default;
96
21
    virtual ~VRowDistribution() = default;
97
98
8
    void init(VRowDistributionContext ctx) {
99
8
        _state = ctx.state;
100
8
        _batch_size = std::max(_state->batch_size(), 8192);
101
8
        _block_convertor = ctx.block_convertor;
102
8
        _tablet_finder = ctx.tablet_finder;
103
8
        _vpartition = ctx.vpartition;
104
8
        _add_partition_request_timer = ctx.add_partition_request_timer;
105
8
        _txn_id = ctx.txn_id;
106
8
        _pool = ctx.pool;
107
8
        _location = ctx.location;
108
8
        _vec_output_expr_ctxs = ctx.vec_output_expr_ctxs;
109
8
        _schema = ctx.schema;
110
8
        _caller = ctx.caller;
111
8
        _write_single_replica = ctx.write_single_replica;
112
8
        _create_partition_callback = ctx.create_partition_callback;
113
8
    }
114
115
0
    void output_profile_info(RuntimeProfile* profile) {
116
0
        if (!_add_partition_request_times.empty()) {
117
0
            std::stringstream ss;
118
0
            ss << "[";
119
0
            for (size_t i = 0; i < _add_partition_request_times.size(); ++i) {
120
0
                if (i > 0) {
121
0
                    ss << ", ";
122
0
                }
123
0
                ss << PrettyPrinter::print(_add_partition_request_times[i], TUnit::TIME_NS);
124
0
            }
125
0
            ss << "]";
126
0
            profile->add_info_string("AddPartitionRequestTimeList", ss.str());
127
0
        }
128
0
    }
129
130
8
    Status open(RowDescriptor* output_row_desc) {
131
8
        if (_vpartition->is_auto_partition()) {
132
2
            auto [part_ctxs, part_funcs] = _get_partition_function();
133
2
            for (auto part_ctx : part_ctxs) {
134
2
                RETURN_IF_ERROR(part_ctx->prepare(_state, *output_row_desc));
135
2
                RETURN_IF_ERROR(part_ctx->open(_state));
136
2
            }
137
2
        }
138
8
        for (const auto& index : _schema->indexes()) {
139
8
            auto& where_clause = index->where_clause;
140
8
            if (where_clause != nullptr) {
141
0
                RETURN_IF_ERROR(where_clause->prepare(_state, *output_row_desc));
142
0
                RETURN_IF_ERROR(where_clause->open(_state));
143
0
            }
144
8
        }
145
8
        return Status::OK();
146
8
    }
147
148
    // auto partition
149
    // mv where clause
150
    // v1 needs index->node->row_ids - tabletids
151
    // v2 needs index,tablet->rowids
152
    Status generate_rows_distribution(Block& input_block, std::shared_ptr<Block>& block,
153
                                      std::vector<RowPartTabletIds>& row_part_tablet_ids,
154
                                      int64_t& rows_stat_val);
155
    // have 2 ways remind to deal batching block:
156
    // 1. in row_distribution, _batching_rows reaches the threshold, this class set _deal_batched = true.
157
    // 2. in caller, after last block and before close, set _deal_batched = true.
158
3
    bool need_deal_batching() const { return _deal_batched && _batching_rows > 0; }
159
    // create partitions when need for auto-partition table using #_partitions_need_create.
160
    Status automatic_create_partition();
161
    void clear_batching_stats();
162
3
    const std::vector<bool>& get_skipped() const { return _skip; } // skipped in last round
163
164
    // for auto partition
165
    std::unique_ptr<MutableBlock> _batching_block; // same structure with input_block
166
    bool _deal_batched = false; // If true, send batched block before any block's append.
167
168
private:
169
    std::pair<VExprContextSPtrs, VExprSPtrs> _get_partition_function();
170
171
    Status _save_missing_values(const Block& input_block,
172
                                std::vector<std::vector<std::string>>& col_strs, int col_size,
173
                                Block* block, const std::vector<uint32_t>& filter,
174
                                const std::vector<const NullMap*>& col_null_maps);
175
176
    void _get_tablet_ids(Block* block, int32_t index_idx, std::vector<int64_t>& tablet_ids);
177
178
    void _filter_block_by_skip(Block* block, RowPartTabletIds& row_part_tablet_id);
179
180
    Status _filter_block_by_skip_and_where_clause(Block* block,
181
                                                  const VExprContextSPtr& where_clause,
182
                                                  RowPartTabletIds& row_part_tablet_id);
183
184
    Status _filter_block(Block* block, std::vector<RowPartTabletIds>& row_part_tablet_ids);
185
186
    Status _generate_rows_distribution_for_auto_partition(
187
            const Block& input_block, Block* block, const std::vector<uint16_t>& partition_col_idx,
188
            bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
189
            int64_t& rows_stat_val);
190
    // the whole process to deal missing rows. will call _save_missing_values
191
    Status _deal_missing_map(const Block& input_block, Block* block,
192
                             const std::vector<uint16_t>& partition_cols_idx,
193
                             int64_t& rows_stat_val);
194
195
    Status _generate_rows_distribution_for_non_auto_partition(
196
            Block* block, bool has_filtered_rows,
197
            std::vector<RowPartTabletIds>& row_part_tablet_ids);
198
199
    Status _generate_rows_distribution_for_auto_overwrite(
200
            const Block& input_block, Block* block, const std::vector<uint16_t>& partition_cols_idx,
201
            bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
202
            int64_t& rows_stat_val);
203
    Status _replace_overwriting_partition();
204
205
    void _reset_row_part_tablet_ids(std::vector<RowPartTabletIds>& row_part_tablet_ids,
206
                                    int64_t rows);
207
    void _reset_find_tablets(int64_t rows);
208
209
    struct NullableStringListHash {
210
6
        std::size_t _hash(const TNullableStringLiteral& arg) const {
211
6
            if (arg.is_null) {
212
0
                return 0;
213
0
            }
214
6
            return std::hash<std::string>()(arg.value);
215
6
        }
216
6
        std::size_t operator()(const std::vector<TNullableStringLiteral>& arg) const {
217
6
            std::size_t result = 0;
218
6
            for (const auto& v : arg) {
219
6
                result = (result << 1) ^ _hash(v);
220
6
            }
221
6
            return result;
222
6
        }
223
    };
224
225
    RuntimeState* _state = nullptr;
226
    int _batch_size = 0;
227
228
    // for auto partitions
229
    std::vector<std::vector<TNullableStringLiteral>> _partitions_need_create;
230
    size_t _batching_rows = 0, _batching_bytes = 0;
231
    std::unordered_set<std::vector<TNullableStringLiteral>, NullableStringListHash> _deduper;
232
233
    OlapTableBlockConvertor* _block_convertor = nullptr;
234
    OlapTabletFinder* _tablet_finder = nullptr;
235
    VOlapTablePartitionParam* _vpartition = nullptr;
236
    RuntimeProfile::Counter* _add_partition_request_timer = nullptr;
237
    int64_t _txn_id = -1;
238
    ObjectPool* _pool = nullptr;
239
    OlapTableLocationParam* _location = nullptr;
240
241
    // Record each auto-partition request time for detailed profiling
242
    std::vector<int64_t> _add_partition_request_times;
243
    // int64_t _number_output_rows = 0;
244
    const VExprContextSPtrs* _vec_output_expr_ctxs = nullptr;
245
    // generally it's writer's on_partitions_created
246
    CreatePartitionCallback _create_partition_callback = nullptr;
247
    void* _caller = nullptr;
248
    std::shared_ptr<OlapTableSchemaParam> _schema;
249
    bool _write_single_replica = false;
250
251
    // reuse for find_tablet. save partitions found by find_tablets
252
    std::vector<VOlapTablePartition*> _partitions;
253
    std::vector<bool> _skip;
254
    std::vector<uint32_t> _tablet_indexes;
255
    std::vector<int64_t> _tablet_ids;
256
    std::vector<uint32_t> _missing_map; // indice of missing values in partition_col
257
    // for auto detect overwrite partition
258
    std::set<int64_t> _new_partition_ids; // if contains, not to replace it again.
259
};
260
261
} // namespace doris