Coverage Report

Created: 2026-06-21 15:24

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/vrow_distribution.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/sink/vrow_distribution.h"
19
20
#include <gen_cpp/FrontendService.h>
21
#include <gen_cpp/FrontendService_types.h>
22
#include <glog/logging.h>
23
24
#include <cstdint>
25
#include <memory>
26
#include <string>
27
28
#include "common/cast_set.h"
29
#include "common/logging.h"
30
#include "common/status.h"
31
#include "core/assert_cast.h"
32
#include "core/column/column.h"
33
#include "core/column/column_const.h"
34
#include "core/column/column_nullable.h"
35
#include "core/column/column_vector.h"
36
#include "core/data_type/data_type.h"
37
#include "exec/sink/writer/vtablet_writer.h"
38
#include "runtime/exec_env.h"
39
#include "runtime/query_context.h"
40
#include "runtime/runtime_state.h"
41
#include "service/backend_options.h"
42
#include "util/client_cache.h"
43
#include "util/debug_points.h"
44
#include "util/thrift_rpc_helper.h"
45
46
namespace doris {
47
48
43.9k
std::pair<VExprContextSPtrs, VExprSPtrs> VRowDistribution::_get_partition_function() {
49
43.9k
    return {_vpartition->get_part_func_ctx(), _vpartition->get_partition_function()};
50
43.9k
}
51
52
Status VRowDistribution::_save_missing_values(
        const Block& input_block,
        std::vector<std::vector<std::string>>& col_strs, // non-const ref for move
        int col_size, Block* block, const std::vector<uint32_t>& filter,
        const std::vector<const NullMap*>& col_null_maps) {
    // Appends the rows of `input_block` selected by `filter` to the batching block, and records
    // each distinct tuple of partition-key strings in _partitions_need_create (deduplicated via
    // _deduper). `block` is not read here.
    //
    // de-duplication for new partitions but save all rows.
    RETURN_IF_ERROR(
            _batching_block->add_rows(&input_block, filter.data(), filter.data() + filter.size()));

    // col_strs is column-major (col_strs[col][row]) and every column holds one string per
    // filtered row, so the row count is taken from the first column. Guard against an empty
    // column set instead of indexing col_strs[0] unconditionally.
    const size_t num_rows = col_strs.empty() ? 0 : col_strs[0].size();
    std::vector<TNullableStringLiteral> cur_row_values;
    for (size_t row = 0; row < num_rows; ++row) {
        cur_row_values.clear();
        for (int col = 0; col < col_size; ++col) {
            TNullableStringLiteral node;
            // OlapTableBlockConvertor::_validate_data() materializes destination slots so won't be const.
            const auto* null_map = col_null_maps[col]; // null map for this col
            const bool row_is_null = null_map != nullptr && (*null_map)[filter[row]];
            // Only ever switch is_null on; otherwise keep the node's default value.
            node.__set_is_null(row_is_null || node.is_null);
            if (!node.is_null) {
                node.__set_value(col_strs[col][row]);
            }
            // The node is not used after this point, so hand it over instead of copying it.
            cur_row_values.push_back(std::move(node));
        }
        if (!_deduper.contains(cur_row_values)) {
            _deduper.insert(cur_row_values);
            _partitions_need_create.emplace_back(cur_row_values);
        }
    }

    // to avoid too large mem use
    if (_batching_block->rows() > _batch_size) {
        _deal_batched = true;
    }
    // Keep both cached batching stats in step with the batching block. _batching_bytes was
    // previously never refreshed here, so callers that snapshot it as the "before" size saw 0.
    _batching_rows = _batching_block->rows();
    _batching_bytes = _batching_block->bytes();
    VLOG_NOTICE << "pushed some batching lines, now numbers = " << _batching_rows;

    return Status::OK();
}
90
91
173
void VRowDistribution::clear_batching_stats() {
92
173
    _partitions_need_create.clear();
93
173
    _batching_rows = 0;
94
173
    _batching_bytes = 0;
95
173
}
96
97
173
Status VRowDistribution::automatic_create_partition() {
    // Asks the master FE to create the partitions collected in _partitions_need_create. On
    // success, registers them in _vpartition, records their ids in _new_partition_ids and runs
    // _create_partition_callback. Returns the status reported by the FE (or by the injected
    // debug callback).
    MonotonicStopWatch timer;
    if (_state->enable_profile() && _state->profile_level() >= 2) {
        timer.start();
    }

    SCOPED_TIMER(_add_partition_request_timer);
    TCreatePartitionRequest request;
    TCreatePartitionResult result;
    bool injected = false;
    std::string be_endpoint = BackendOptions::get_be_endpoint();
    request.__set_txn_id(_txn_id);
    request.__set_db_id(_vpartition->db_id());
    request.__set_table_id(_vpartition->table_id());
    request.__set_partitionValues(_partitions_need_create);
    request.__set_be_endpoint(be_endpoint);
    request.__set_write_single_replica(_write_single_replica);
    request.__set_load_to_single_tablet(_tablet_finder->is_find_tablet_every_sink());
    request.__set_enable_adaptive_random_bucket(_tablet_finder->is_adaptive_random_bucket());
    if (_state && _state->get_query_ctx()) {
        // Pass query_id to FE so it can determine if this is a multi-instance load by checking Coordinator
        request.__set_query_id(_state->get_query_ctx()->query_id());
    }

    DBUG_EXECUTE_IF("VRowDistribution.automatic_create_partition.inject_result", {
        DBUG_RUN_CALLBACK(&request, &result);
        injected = true;
    });

    VLOG_NOTICE << "automatic partition rpc begin request " << request;
    if (!injected) {
        std::shared_ptr<TNetworkAddress> master_addr;
        if (_vpartition->get_master_address() == nullptr) {
            auto* cluster_info = ExecEnv::GetInstance()->cluster_info();
            if (cluster_info == nullptr) {
                return Status::InternalError("cluster_info is null");
            }
            master_addr = std::make_shared<TNetworkAddress>(cluster_info->master_fe_addr);
        } else {
            master_addr = _vpartition->get_master_address();
        }
        // execution_timeout() is multiplied by 1000 (presumably seconds -> ms). Do the
        // multiplication in 64 bits and clamp, so a very large timeout cannot overflow int,
        // which would be undefined behavior.
        const int64_t timeout_ms = static_cast<int64_t>(_state->execution_timeout()) * 1000;
        const int time_out = timeout_ms > INT32_MAX ? INT32_MAX : static_cast<int>(timeout_ms);
        RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
                master_addr->hostname, master_addr->port,
                [&request, &result](FrontendServiceConnection& client) {
                    client->createPartition(result, request);
                },
                time_out));
    }

    Status status(Status::create(result.status));
    VLOG_NOTICE << "automatic partition rpc end response " << result;
    if (result.status.status_code == TStatusCode::OK) {
        // Add new partitions before incremental open because adaptive random bucket builds
        // sender/receiver routing params from _vpartition.
        RETURN_IF_ERROR(_vpartition->add_partitions(result.partitions));
        for (const auto& part : result.partitions) {
            _new_partition_ids.insert(part.id);
            VLOG_TRACE << "record new id: " << part.id;
        }
        RETURN_IF_ERROR(_create_partition_callback(_caller, &result));
    }

    // Record this request's elapsed time
    if (_state->enable_profile() && _state->profile_level() >= 2) {
        int64_t elapsed_ns = timer.elapsed_time();
        _add_partition_request_times.push_back(elapsed_ns);
    }
    return status;
}
167
168
// for reuse the same create callback of create-partition
169
20
static TCreatePartitionResult cast_as_create_result(const TReplacePartitionResult& arg) {
170
20
    TCreatePartitionResult result;
171
20
    result.status = arg.status;
172
20
    result.nodes = arg.nodes;
173
20
    result.partitions = arg.partitions;
174
20
    result.tablets = arg.tablets;
175
20
    result.slave_tablets = arg.slave_tablets;
176
20
    return result;
177
20
}
178
179
// use _partitions and replace them
180
34
Status VRowDistribution::_replace_overwriting_partition() {
181
34
    SCOPED_TIMER(_add_partition_request_timer); // also for replace_partition
182
34
    TReplacePartitionRequest request;
183
34
    TReplacePartitionResult result;
184
34
    bool injected = false;
185
34
    request.__set_overwrite_group_id(_vpartition->get_overwrite_group_id());
186
34
    request.__set_db_id(_vpartition->db_id());
187
34
    request.__set_table_id(_vpartition->table_id());
188
34
    request.__set_write_single_replica(_write_single_replica);
189
190
    // only request for partitions not recorded for replacement
191
34
    std::set<int64_t> id_deduper;
192
60.0k
    for (const auto* part : _partitions) {
193
60.0k
        if (part != nullptr) {
194
47.7k
            if (_new_partition_ids.contains(part->id)) {
195
                // this is a new partition. dont replace again.
196
39.5k
                VLOG_TRACE << "skip new partition: " << part->id;
197
39.5k
            } else {
198
                // request for replacement
199
8.24k
                id_deduper.insert(part->id);
200
8.24k
            }
201
47.7k
        } else if (_missing_map.empty()) {
202
            // no origin partition. and not allow to create.
203
6
            return Status::InvalidArgument(
204
6
                    "Cannot found origin partitions in auto detect overwriting, stop "
205
6
                    "processing");
206
6
        } // else: part is null and _missing_map is not empty. dealed outside using auto-partition way. nothing to do here.
207
60.0k
    }
208
28
    if (id_deduper.empty()) {
209
8
        return Status::OK(); // no need to request
210
8
    }
211
    // de-duplicate. there's no check in FE
212
20
    std::vector<int64_t> request_part_ids(id_deduper.begin(), id_deduper.end());
213
214
20
    request.__set_partition_ids(request_part_ids);
215
216
20
    std::string be_endpoint = BackendOptions::get_be_endpoint();
217
20
    request.__set_be_endpoint(be_endpoint);
218
20
    request.__set_load_to_single_tablet(_tablet_finder->is_find_tablet_every_sink());
219
20
    request.__set_enable_adaptive_random_bucket(_tablet_finder->is_adaptive_random_bucket());
220
20
    if (_state && _state->get_query_ctx()) {
221
        // Pass query_id to FE so it can determine if this is a multi-instance load by checking Coordinator
222
20
        request.__set_query_id(_state->get_query_ctx()->query_id());
223
20
    }
224
225
20
    DBUG_EXECUTE_IF("VRowDistribution.replace_overwriting_partition.inject_result", {
226
20
        DBUG_RUN_CALLBACK(&request, &result);
227
20
        injected = true;
228
20
    });
229
230
20
    VLOG_NOTICE << "auto detect replace partition request: " << request;
231
20
    if (!injected) {
232
19
        std::shared_ptr<TNetworkAddress> master_addr;
233
19
        if (_vpartition->get_master_address() == nullptr) {
234
19
            auto* cluster_info = ExecEnv::GetInstance()->cluster_info();
235
19
            if (cluster_info == nullptr) {
236
0
                return Status::InternalError("cluster_info is null");
237
0
            }
238
19
            master_addr = std::make_shared<TNetworkAddress>(cluster_info->master_fe_addr);
239
19
        } else {
240
0
            master_addr = _vpartition->get_master_address();
241
0
        }
242
19
        int time_out = _state->execution_timeout() * 1000;
243
19
        RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
244
19
                master_addr->hostname, master_addr->port,
245
19
                [&request, &result](FrontendServiceConnection& client) {
246
19
                    client->replacePartition(result, request);
247
19
                },
248
19
                time_out));
249
19
    }
250
251
20
    Status status(Status::create(result.status));
252
20
    VLOG_NOTICE << "auto detect replace partition result: " << result;
253
20
    if (result.status.status_code == TStatusCode::OK) {
254
        // record new partitions
255
32
        for (const auto& part : result.partitions) {
256
32
            _new_partition_ids.insert(part.id);
257
32
            VLOG_TRACE << "record new id: " << part.id;
258
32
        }
259
        // replace data in _partitions
260
        // Adaptive random bucket builds sender/receiver routing params from _vpartition during
261
        // incremental open, so the replacement must be visible first.
262
20
        RETURN_IF_ERROR(_vpartition->replace_partitions(request_part_ids, result.partitions));
263
        // Reuse the function as the args' structure are same. It adds nodes/locations.
264
20
        auto result_as_create = cast_as_create_result(result);
265
20
        RETURN_IF_ERROR(_create_partition_callback(_caller, &result_as_create));
266
20
    }
267
268
20
    return status;
269
20
}
270
271
void VRowDistribution::_get_tablet_ids(Block* block, int32_t index_idx,
272
39.3k
                                       std::vector<int64_t>& tablet_ids) {
273
39.3k
    tablet_ids.resize(block->rows());
274
33.9M
    for (int row_idx = 0; row_idx < block->rows(); row_idx++) {
275
33.9M
        if (_skip[row_idx]) {
276
26.2k
            continue;
277
26.2k
        }
278
33.8M
        auto& partition = _partitions[row_idx];
279
33.8M
        auto& tablet_index = _tablet_indexes[row_idx];
280
33.8M
        auto& index = partition->indexes[index_idx];
281
282
33.8M
        auto tablet_id = index.tablets[tablet_index];
283
33.8M
        tablet_ids[row_idx] = tablet_id;
284
33.8M
    }
285
39.3k
}
286
287
44.3k
void VRowDistribution::_filter_block_by_skip(Block* block, RowPartTabletIds& row_part_tablet_id) {
    // Appends every row not flagged in _skip to row_part_tablet_id: its row id, its partition id
    // and, unless adaptive random bucketing is on, the tablet id from _tablet_ids.
    auto& row_ids = row_part_tablet_id.row_ids;
    auto& partition_ids = row_part_tablet_id.partition_ids;
    auto& tablet_ids = row_part_tablet_id.tablet_ids;

    // Loop-invariant: evaluated once rather than once per row in this hot loop.
    const bool collect_tablet_ids = !_tablet_finder->is_adaptive_random_bucket();
    // row count of a block should not exceed UINT32_MAX
    const auto rows_uint32 = cast_set<uint32_t>(block->rows());
    for (uint32_t i = 0; i < rows_uint32; i++) {
        if (_skip[i]) {
            continue;
        }
        row_ids.emplace_back(i);
        partition_ids.emplace_back(_partitions[i]->id);
        if (collect_tablet_ids) {
            tablet_ids.emplace_back(_tablet_ids[i]);
        }
    }
}
305
306
Status VRowDistribution::_filter_block_by_skip_and_where_clause(
        Block* block, const VExprContextSPtr& where_clause, RowPartTabletIds& row_part_tablet_id) {
    // Like _filter_block_by_skip(), but a row is kept only if it is not skipped AND the index's
    // where clause evaluates to true for it. The filter column may be nullable, constant, or a
    // plain UInt8 column.
    // TODO
    //SCOPED_RAW_TIMER(&_stat.where_clause_ns);
    ColumnPtr filter_column;
    RETURN_IF_ERROR(where_clause->execute(block, filter_column));

    auto& row_ids = row_part_tablet_id.row_ids;
    auto& partition_ids = row_part_tablet_id.partition_ids;
    auto& tablet_ids = row_part_tablet_id.tablet_ids;
    // Loop-invariant: evaluated once rather than once per kept row.
    const bool collect_tablet_ids = !_tablet_finder->is_adaptive_random_bucket();
    // Records one kept row in every output vector.
    auto append_row = [&](uint32_t i) {
        row_ids.emplace_back(i);
        partition_ids.emplace_back(_partitions[i]->id);
        if (collect_tablet_ids) {
            tablet_ids.emplace_back(_tablet_ids[i]);
        }
    };

    if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) {
        // row count of a block should not exceed UINT32_MAX
        const auto rows_uint32 = cast_set<uint32_t>(block->rows());
        for (uint32_t i = 0; i < rows_uint32; i++) {
            if (nullable_column->get_bool_inline(i) && !_skip[i]) {
                append_row(i);
            }
        }
    } else if (const auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) {
        // A constant false predicate keeps nothing; a constant true one reduces to the skip filter.
        if (!const_column->get_bool(0)) {
            return Status::OK();
        }
        // should we optimize?
        _filter_block_by_skip(block, row_part_tablet_id);
    } else {
        const auto& filter = assert_cast<const ColumnUInt8&>(*filter_column).get_data();
        // row count of a block should not exceed UINT32_MAX
        const auto rows_uint32 = cast_set<uint32_t>(block->rows());
        for (uint32_t i = 0; i < rows_uint32; i++) {
            if (filter[i] != 0 && !_skip[i]) {
                append_row(i);
            }
        }
    }

    return Status::OK();
}
354
355
Status VRowDistribution::_filter_block(Block* block,
                                       std::vector<RowPartTabletIds>& row_part_tablet_ids) {
    // For every index of the target schema, gather the rows that survive _skip (and the index's
    // where clause, when it has one) into row_part_tablet_ids[idx]. Tablet ids are resolved first
    // unless adaptive random bucketing is enabled.
    const auto& indexes = _schema->indexes();
    const auto num_indexes = indexes.size();
    for (int idx = 0; idx < num_indexes; ++idx) {
        if (!_tablet_finder->is_adaptive_random_bucket()) {
            _get_tablet_ids(block, idx, _tablet_ids);
        }
        const auto& where_clause = indexes[idx]->where_clause;
        if (where_clause == nullptr) {
            _filter_block_by_skip(block, row_part_tablet_ids[idx]);
            continue;
        }
        RETURN_IF_ERROR(_filter_block_by_skip_and_where_clause(block, where_clause,
                                                               row_part_tablet_ids[idx]));
    }
    return Status::OK();
}
371
372
Status VRowDistribution::_generate_rows_distribution_for_non_auto_partition(
        Block* block, bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids) {
    // Plain (non-auto) partitioned table: locate each row's partition/tablet, additionally skip
    // rows the block convertor filtered out, then split the surviving rows per index.
    const int num_rows = cast_set<int>(block->rows());

    RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
                                                 _tablet_indexes, _skip));
    if (has_filtered_rows) {
        const auto& filter_map = _block_convertor->filter_map();
        for (int row = 0; row < num_rows; ++row) {
            if (filter_map[row]) {
                _skip[row] = true;
            }
        }
    }
    return _filter_block(block, row_part_tablet_ids);
}
386
387
Status VRowDistribution::_deal_missing_map(const Block& input_block, Block* block,
                                           const std::vector<uint16_t>& partition_cols_idx,
                                           int64_t& rows_stat_val) {
    // for missing partition keys, calc the missing partition and save in _partitions_need_create
    // The rows listed in _missing_map are moved into the batching block, and the rows/bytes they
    // add there are taken back out of this call's load statistics.
    auto [part_ctxs, part_exprs] = _get_partition_function();
    int part_col_num = cast_set<int>(part_exprs.size());
    // the two vectors are in column-first-order
    std::vector<std::vector<std::string>> col_strs;
    std::vector<const NullMap*> col_null_maps;
    col_strs.resize(part_col_num);
    col_null_maps.reserve(part_col_num);

    auto format_options = DataTypeSerDe::get_default_format_options();
    format_options.timezone = &_state->timezone_obj();

    for (int i = 0; i < part_col_num; ++i) {
        auto return_type = part_exprs[i]->data_type();
        // expose the data column. the return type would be nullable
        const auto& [range_left_col, col_const] =
                unpack_if_const(block->get_by_position(partition_cols_idx[i]).column);
        if (range_left_col->is_nullable()) {
            col_null_maps.push_back(&(
                    assert_cast<const ColumnNullable*>(range_left_col.get())->get_null_map_data()));
        } else {
            col_null_maps.push_back(nullptr);
        }
        col_strs[i].reserve(_missing_map.size());
        for (auto row : _missing_map) {
            col_strs[i].push_back(return_type->to_string(
                    *range_left_col, index_check_const(row, col_const), format_options));
        }
    }

    // calc the end value and save them. in the end of sending, we will create partitions for them and deal them.
    // NOTE: must save old batching stats before calling _save_missing_values(),
    // because _save_missing_values() will update _batching_rows internally.
    // The byte snapshot is read from the batching block itself rather than from the cached
    // _batching_bytes, so the delta does not depend on that member being kept in sync.
    const size_t old_bt_rows = _batching_rows;
    const size_t old_bt_bytes = _batching_block->bytes();

    RETURN_IF_ERROR(_save_missing_values(input_block, col_strs, part_col_num, block, _missing_map,
                                         col_null_maps));

    // Compute the deltas explicitly as signed values instead of relying on unsigned wraparound
    // of (old - new) to produce a negative adjustment.
    const auto added_rows = static_cast<int64_t>(_batching_block->rows() - old_bt_rows);
    const auto added_bytes = static_cast<int64_t>(_batching_block->bytes() - old_bt_bytes);
    rows_stat_val -= added_rows;
    _state->update_num_rows_load_total(-added_rows);
    _state->update_num_bytes_load_total(-added_bytes);

    return Status::OK();
}
436
437
Status VRowDistribution::_generate_rows_distribution_for_auto_partition(
        const Block& input_block, Block* block, const std::vector<uint16_t>& partition_cols_idx,
        bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
        int64_t& rows_stat_val) {
    // Auto-partitioned table: rows whose partition already exists are routed as usual. Rows
    // whose partition is missing are reported via _missing_map (and skipped by find_tablets)
    // and then handed to _deal_missing_map() for batching.
    const int num_rows = cast_set<int>(block->rows());
    const std::vector<uint16_t> partition_keys = _vpartition->get_partition_keys();
    const auto& partition_col = block->get_by_position(partition_keys[0]);

    _missing_map.clear();
    _missing_map.reserve(partition_col.column->size());
    RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
                                                 _tablet_indexes, _skip, &_missing_map));

    // Rows the block convertor rejected are dropped too.
    if (has_filtered_rows) {
        const auto& filter_map = _block_convertor->filter_map();
        for (int row = 0; row < num_rows; ++row) {
            if (filter_map[row]) {
                _skip[row] = true;
            }
        }
    }
    RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids));

    if (_missing_map.empty()) {
        return Status::OK();
    }
    // The original input block is what gets saved for the missing rows.
    return _deal_missing_map(input_block, block, partition_cols_idx, rows_stat_val);
}
465
466
Status VRowDistribution::_generate_rows_distribution_for_auto_overwrite(
        const Block& input_block, Block* block, const std::vector<uint16_t>& partition_cols_idx,
        bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
        int64_t& rows_stat_val) {
    int num_rows = cast_set<int>(block->rows());

    // for non-auto-partition situation, goes into two 'else' branch. just find the origin partitions, replace them by rpc,
    //  and find the new partitions to use.
    // for auto-partition's, find and save origins in _partitions and replace them. at meanwhile save the missing values for auto
    //  partition. then we find partition again to get replaced partitions in _partitions. this time _missing_map is ignored cuz
    //  we already saved missing values.
    if (_vpartition->is_auto_partition() &&
        _state->query_options().enable_auto_create_when_overwrite) {
        // allow auto create partition for missing rows.
        std::vector<uint16_t> partition_keys = _vpartition->get_partition_keys();
        // Bind by reference like the auto-partition path does: only the column size is read,
        // so copying the whole ColumnWithTypeAndName is wasted work.
        const auto& partition_col = block->get_by_position(partition_keys[0]);
        _missing_map.clear();
        _missing_map.reserve(partition_col.column->size());

        RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
                                                     _tablet_indexes, _skip, &_missing_map));

        // allow and really need to create during auto-detect-overwriting.
        if (!_missing_map.empty()) {
            RETURN_IF_ERROR(
                    _deal_missing_map(input_block, block, partition_cols_idx, rows_stat_val));
        }
    } else {
        RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
                                                     _tablet_indexes, _skip));
    }
    RETURN_IF_ERROR(_replace_overwriting_partition());

    // regenerate locations for new partitions & tablets
    _reset_find_tablets(num_rows);
    if (_vpartition->is_auto_partition() &&
        _state->query_options().enable_auto_create_when_overwrite) {
        // here _missing_map is just a placeholder
        RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
                                                     _tablet_indexes, _skip, &_missing_map));
        if (VLOG_TRACE_IS_ON) {
            std::string tmp;
            for (auto v : _missing_map) {
                tmp += std::to_string(v).append(", ");
            }
            VLOG_TRACE << "Trace missing map of " << this << ':' << tmp;
        }
    } else {
        RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
                                                     _tablet_indexes, _skip));
    }
    if (has_filtered_rows) {
        for (int i = 0; i < num_rows; i++) {
            _skip[i] = _skip[i] || _block_convertor->filter_map()[i];
        }
    }
    RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids));
    return Status::OK();
}
525
526
void VRowDistribution::_reset_row_part_tablet_ids(
527
43.4k
        std::vector<RowPartTabletIds>& row_part_tablet_ids, int64_t rows) {
528
43.4k
    row_part_tablet_ids.resize(_schema->indexes().size());
529
44.4k
    for (auto& row_part_tablet_id : row_part_tablet_ids) {
530
44.4k
        auto& row_ids = row_part_tablet_id.row_ids;
531
44.4k
        auto& partition_ids = row_part_tablet_id.partition_ids;
532
44.4k
        auto& tablet_ids = row_part_tablet_id.tablet_ids;
533
534
44.4k
        row_ids.clear();
535
44.4k
        partition_ids.clear();
536
44.4k
        tablet_ids.clear();
537
        // This is important for performance.
538
44.4k
        row_ids.reserve(rows);
539
44.4k
        partition_ids.reserve(rows);
540
44.4k
        if (!_tablet_finder->is_adaptive_random_bucket()) {
541
39.4k
            tablet_ids.reserve(rows);
542
39.4k
        }
543
44.4k
    }
544
43.4k
}
545
546
Status VRowDistribution::generate_rows_distribution(
        Block& input_block, std::shared_ptr<Block>& block,
        std::vector<RowPartTabletIds>& row_part_tablet_ids, int64_t& rows_stat_val) {
    // Entry point: validate/convert `input_block` into `block`, compute partition-function
    // columns if the table uses them, then dispatch to the auto-overwrite, auto-partition or
    // plain distribution path.
    const auto input_rows = input_block.rows();
    _reset_row_part_tablet_ids(row_part_tablet_ids, input_rows);

    // The batching block stores values of `input_block`, so validation/conversion is redone here.
    bool has_filtered_rows = false;
    RETURN_IF_ERROR(_block_convertor->validate_and_convert_block(
            _state, &input_block, block, *_vec_output_expr_ctxs, input_rows, has_filtered_rows));

    // Lazily create the (empty, same-structure) batching block that collects rows needing new
    // partitions; those rows are dealt with together at finish.
    if (!_batching_block) [[unlikely]] {
        std::unique_ptr<Block> empty_block = input_block.create_same_struct_block(0);
        _batching_block = MutableBlock::create_unique(std::move(*empty_block));
    }

    _reset_find_tablets(block->rows());

    // if there's projection of partition calc, we need to calc it first.
    auto [part_ctxs, part_funcs] = _get_partition_function();
    std::vector<uint16_t> partition_cols_idx;
    if (_vpartition->is_projection_partition()) {
        // calc the start value of missing partition ranges.
        for (size_t i = 0; i < part_funcs.size(); ++i) {
            int result_idx = -1;
            // we just calc left range here. leave right to FE to avoid dup calc.
            RETURN_IF_ERROR(part_funcs[i]->execute(part_ctxs[i].get(), block.get(), &result_idx));

            VLOG_DEBUG << "Partition-calculated block:\n" << block->dump_data(0, 1);
            DCHECK(result_idx != -1);

            partition_cols_idx.push_back(cast_set<uint16_t>(result_idx));
        }

        // change the column to compare to transformed.
        _vpartition->set_transformed_slots(partition_cols_idx);
    }

    if (_vpartition->is_auto_detect_overwrite() && !_deal_batched) {
        // Auto-detect overwrite. Missing partitions may still be auto-created when the
        // enable_auto_create_when_overwrite query option is set.
        return _generate_rows_distribution_for_auto_overwrite(input_block, block.get(),
                                                              partition_cols_idx, has_filtered_rows,
                                                              row_part_tablet_ids, rows_stat_val);
    }
    if (_vpartition->is_auto_partition() && !_deal_batched) {
        return _generate_rows_distribution_for_auto_partition(input_block, block.get(),
                                                              partition_cols_idx, has_filtered_rows,
                                                              row_part_tablet_ids, rows_stat_val);
    }
    // not auto partition (or re-sending an already batched block)
    return _generate_rows_distribution_for_non_auto_partition(block.get(), has_filtered_rows,
                                                              row_part_tablet_ids);
}
604
605
// reuse vars for find_tablets
606
43.4k
void VRowDistribution::_reset_find_tablets(int64_t rows) {
607
43.4k
    _tablet_finder->filter_bitmap().Reset(rows);
608
43.4k
    _partitions.assign(rows, nullptr);
609
43.4k
    _skip.assign(rows, false);
610
43.4k
    _tablet_indexes.assign(rows, 0);
611
43.4k
}
612
613
} // namespace doris