Coverage Report

Created: 2026-04-14 20:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/vrow_distribution.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
// IWYU pragma: no_include <bits/chrono.h>
21
#include <fmt/format.h>
#include <gen_cpp/FrontendService.h>
#include <gen_cpp/FrontendService_types.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <glog/logging.h>

#include <algorithm>
#include <cstdint>
#include <functional>
#include <memory>
#include <set>
#include <sstream>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

#include "common/status.h"
#include "core/block/block.h"
#include "core/custom_allocator.h"
#include "exec/sink/vtablet_block_convertor.h"
#include "exec/sink/vtablet_finder.h"
#include "exprs/vexpr_context.h"
#include "exprs/vexpr_fwd.h"
#include "runtime/runtime_profile.h"
#include "runtime/runtime_state.h"
#include "storage/tablet_info.h"
43
44
namespace doris {
45
46
class IndexChannel;
47
class VNodeChannel;
48
49
// <row_idx, partition_id, tablet_id>
50
class RowPartTabletIds {
51
public:
52
    DorisVector<uint32_t> row_ids;
53
    DorisVector<int64_t> partition_ids;
54
    DorisVector<int64_t> tablet_ids;
55
56
0
    std::string debug_string() const {
57
0
        std::string value;
58
0
        value.reserve(row_ids.size() * 15);
59
0
        for (int i = 0; i < row_ids.size(); i++) {
60
0
            value.append(fmt::format("[{}, {}, {}]", row_ids[i], partition_ids[i], tablet_ids[i]));
61
0
        }
62
0
        return value;
63
0
    }
64
};
65
66
// void* for caller
67
using CreatePartitionCallback = Status (*)(void*, TCreatePartitionResult*);
68
69
class VRowDistribution {
70
public:
71
    // only used to pass parameters for VRowDistribution
72
    struct VRowDistributionContext {
73
        RuntimeState* state = nullptr;
74
        OlapTableBlockConvertor* block_convertor = nullptr;
75
        OlapTabletFinder* tablet_finder = nullptr;
76
        VOlapTablePartitionParam* vpartition = nullptr;
77
        RuntimeProfile::Counter* add_partition_request_timer = nullptr;
78
        int64_t txn_id = -1;
79
        ObjectPool* pool = nullptr;
80
        OlapTableLocationParam* location = nullptr;
81
        const VExprContextSPtrs* vec_output_expr_ctxs = nullptr;
82
        std::shared_ptr<OlapTableSchemaParam> schema;
83
        void* caller = nullptr;
84
        bool write_single_replica = false;
85
        CreatePartitionCallback create_partition_callback;
86
    };
87
    friend class VTabletWriter;
88
    friend class VTabletWriterV2;
89
90
21
    VRowDistribution() = default;
91
21
    virtual ~VRowDistribution() = default;
92
93
8
    void init(VRowDistributionContext ctx) {
94
8
        _state = ctx.state;
95
8
        _batch_size = std::max(_state->batch_size(), 8192);
96
8
        _block_convertor = ctx.block_convertor;
97
8
        _tablet_finder = ctx.tablet_finder;
98
8
        _vpartition = ctx.vpartition;
99
8
        _add_partition_request_timer = ctx.add_partition_request_timer;
100
8
        _txn_id = ctx.txn_id;
101
8
        _pool = ctx.pool;
102
8
        _location = ctx.location;
103
8
        _vec_output_expr_ctxs = ctx.vec_output_expr_ctxs;
104
8
        _schema = ctx.schema;
105
8
        _caller = ctx.caller;
106
8
        _write_single_replica = ctx.write_single_replica;
107
8
        _create_partition_callback = ctx.create_partition_callback;
108
8
    }
109
110
0
    void output_profile_info(RuntimeProfile* profile) {
111
0
        if (!_add_partition_request_times.empty()) {
112
0
            std::stringstream ss;
113
0
            ss << "[";
114
0
            for (size_t i = 0; i < _add_partition_request_times.size(); ++i) {
115
0
                if (i > 0) {
116
0
                    ss << ", ";
117
0
                }
118
0
                ss << PrettyPrinter::print(_add_partition_request_times[i], TUnit::TIME_NS);
119
0
            }
120
0
            ss << "]";
121
0
            profile->add_info_string("AddPartitionRequestTimeList", ss.str());
122
0
        }
123
0
    }
124
125
8
    Status open(RowDescriptor* output_row_desc) {
126
8
        if (_vpartition->is_auto_partition()) {
127
2
            auto [part_ctxs, part_funcs] = _get_partition_function();
128
2
            for (auto part_ctx : part_ctxs) {
129
2
                RETURN_IF_ERROR(part_ctx->prepare(_state, *output_row_desc));
130
2
                RETURN_IF_ERROR(part_ctx->open(_state));
131
2
            }
132
2
        }
133
8
        for (const auto& index : _schema->indexes()) {
134
8
            auto& where_clause = index->where_clause;
135
8
            if (where_clause != nullptr) {
136
0
                RETURN_IF_ERROR(where_clause->prepare(_state, *output_row_desc));
137
0
                RETURN_IF_ERROR(where_clause->open(_state));
138
0
            }
139
8
        }
140
8
        return Status::OK();
141
8
    }
142
143
    // auto partition
144
    // mv where clause
145
    // v1 needs index->node->row_ids - tabletids
146
    // v2 needs index,tablet->rowids
147
    Status generate_rows_distribution(Block& input_block, std::shared_ptr<Block>& block,
148
                                      std::vector<RowPartTabletIds>& row_part_tablet_ids,
149
                                      int64_t& rows_stat_val);
150
    // have 2 ways remind to deal batching block:
151
    // 1. in row_distribution, _batching_rows reaches the threshold, this class set _deal_batched = true.
152
    // 2. in caller, after last block and before close, set _deal_batched = true.
153
3
    bool need_deal_batching() const { return _deal_batched && _batching_rows > 0; }
154
    // create partitions when need for auto-partition table using #_partitions_need_create.
155
    Status automatic_create_partition();
156
    void clear_batching_stats();
157
3
    const std::vector<bool>& get_skipped() const { return _skip; } // skipped in last round
158
159
    // for auto partition
160
    std::unique_ptr<MutableBlock> _batching_block; // same structure with input_block
161
    bool _deal_batched = false; // If true, send batched block before any block's append.
162
163
private:
164
    std::pair<VExprContextSPtrs, VExprSPtrs> _get_partition_function();
165
166
    Status _save_missing_values(const Block& input_block,
167
                                std::vector<std::vector<std::string>>& col_strs, int col_size,
168
                                Block* block, const std::vector<uint32_t>& filter,
169
                                const std::vector<const NullMap*>& col_null_maps);
170
171
    void _get_tablet_ids(Block* block, int32_t index_idx, std::vector<int64_t>& tablet_ids);
172
173
    void _filter_block_by_skip(Block* block, RowPartTabletIds& row_part_tablet_id);
174
175
    Status _filter_block_by_skip_and_where_clause(Block* block,
176
                                                  const VExprContextSPtr& where_clause,
177
                                                  RowPartTabletIds& row_part_tablet_id);
178
179
    Status _filter_block(Block* block, std::vector<RowPartTabletIds>& row_part_tablet_ids);
180
181
    Status _generate_rows_distribution_for_auto_partition(
182
            const Block& input_block, Block* block, const std::vector<uint16_t>& partition_col_idx,
183
            bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
184
            int64_t& rows_stat_val);
185
    // the whole process to deal missing rows. will call _save_missing_values
186
    Status _deal_missing_map(const Block& input_block, Block* block,
187
                             const std::vector<uint16_t>& partition_cols_idx,
188
                             int64_t& rows_stat_val);
189
190
    Status _generate_rows_distribution_for_non_auto_partition(
191
            Block* block, bool has_filtered_rows,
192
            std::vector<RowPartTabletIds>& row_part_tablet_ids);
193
194
    Status _generate_rows_distribution_for_auto_overwrite(
195
            const Block& input_block, Block* block, const std::vector<uint16_t>& partition_cols_idx,
196
            bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
197
            int64_t& rows_stat_val);
198
    Status _replace_overwriting_partition();
199
200
    void _reset_row_part_tablet_ids(std::vector<RowPartTabletIds>& row_part_tablet_ids,
201
                                    int64_t rows);
202
    void _reset_find_tablets(int64_t rows);
203
204
    struct NullableStringListHash {
205
6
        std::size_t _hash(const TNullableStringLiteral& arg) const {
206
6
            if (arg.is_null) {
207
0
                return 0;
208
0
            }
209
6
            return std::hash<std::string>()(arg.value);
210
6
        }
211
6
        std::size_t operator()(const std::vector<TNullableStringLiteral>& arg) const {
212
6
            std::size_t result = 0;
213
6
            for (const auto& v : arg) {
214
6
                result = (result << 1) ^ _hash(v);
215
6
            }
216
6
            return result;
217
6
        }
218
    };
219
220
    RuntimeState* _state = nullptr;
221
    int _batch_size = 0;
222
223
    // for auto partitions
224
    std::vector<std::vector<TNullableStringLiteral>> _partitions_need_create;
225
    size_t _batching_rows = 0, _batching_bytes = 0;
226
    std::unordered_set<std::vector<TNullableStringLiteral>, NullableStringListHash> _deduper;
227
228
    OlapTableBlockConvertor* _block_convertor = nullptr;
229
    OlapTabletFinder* _tablet_finder = nullptr;
230
    VOlapTablePartitionParam* _vpartition = nullptr;
231
    RuntimeProfile::Counter* _add_partition_request_timer = nullptr;
232
    int64_t _txn_id = -1;
233
    ObjectPool* _pool = nullptr;
234
    OlapTableLocationParam* _location = nullptr;
235
236
    // Record each auto-partition request time for detailed profiling
237
    std::vector<int64_t> _add_partition_request_times;
238
    // int64_t _number_output_rows = 0;
239
    const VExprContextSPtrs* _vec_output_expr_ctxs = nullptr;
240
    // generally it's writer's on_partitions_created
241
    CreatePartitionCallback _create_partition_callback = nullptr;
242
    void* _caller = nullptr;
243
    std::shared_ptr<OlapTableSchemaParam> _schema;
244
    bool _write_single_replica = false;
245
246
    // reuse for find_tablet. save partitions found by find_tablets
247
    std::vector<VOlapTablePartition*> _partitions;
248
    std::vector<bool> _skip;
249
    std::vector<uint32_t> _tablet_indexes;
250
    std::vector<int64_t> _tablet_ids;
251
    std::vector<uint32_t> _missing_map; // indice of missing values in partition_col
252
    // for auto detect overwrite partition
253
    std::set<int64_t> _new_partition_ids; // if contains, not to replace it again.
254
};
255
256
} // namespace doris