Coverage Report

Created: 2026-03-16 19:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/vrow_distribution.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/sink/vrow_distribution.h"
19
20
#include <gen_cpp/FrontendService.h>
21
#include <gen_cpp/FrontendService_types.h>
22
#include <glog/logging.h>
23
24
#include <algorithm>
#include <cstdint>
#include <limits>
#include <memory>
#include <string>
27
28
#include "common/cast_set.h"
29
#include "common/logging.h"
30
#include "common/metrics/doris_metrics.h"
31
#include "common/status.h"
32
#include "core/assert_cast.h"
33
#include "core/column/column.h"
34
#include "core/column/column_const.h"
35
#include "core/column/column_nullable.h"
36
#include "core/column/column_vector.h"
37
#include "core/data_type/data_type.h"
38
#include "exec/sink/writer/vtablet_writer.h"
39
#include "runtime/exec_env.h"
40
#include "runtime/query_context.h"
41
#include "runtime/runtime_state.h"
42
#include "service/backend_options.h"
43
#include "util/client_cache.h"
44
#include "util/debug_points.h"
45
#include "util/thrift_rpc_helper.h"
46
47
namespace doris {
48
#include "common/compile_check_begin.h"
49
50
13
std::pair<VExprContextSPtrs, VExprSPtrs> VRowDistribution::_get_partition_function() {
51
13
    return {_vpartition->get_part_func_ctx(), _vpartition->get_partition_function()};
52
13
}
53
54
// Buffers the input rows whose partition does not exist yet and queues each distinct tuple of
// partition values once in _partitions_need_create (de-duplicated through _deduper).
//
// @param input_block    block whose rows listed in `filter` are appended to _batching_block.
// @param col_strs       column-major string form of the partition values:
//                       col_strs[col][i] is the value of partition column `col` for row filter[i].
// @param col_size       number of partition columns described by col_strs / col_null_maps.
// @param block          not used by this function.
// @param filter         row indices of the rows that hit a missing partition.
// @param col_null_maps  per partition column, its null map (indexed by row id) or nullptr when
//                       the column is not nullable.
// Side effects: appends to _batching_block, may set _deal_batched once the batch grows past
// _batch_size, and refreshes _batching_rows from _batching_block.
Status VRowDistribution::_save_missing_values(
        const Block& input_block,
        std::vector<std::vector<std::string>>& col_strs, // non-const ref for move
        int col_size, Block* block, const std::vector<uint32_t>& filter,
        const std::vector<const NullMap*>& col_null_maps) {
    // de-duplication for new partitions but save all rows.
    RETURN_IF_ERROR(
            _batching_block->add_rows(&input_block, filter.data(), filter.data() + filter.size()));

    // Every partition column holds exactly one value per missing row, so drive the loop by
    // `filter`; this also avoids touching col_strs[0] when there are no partition columns.
    const size_t missing_rows = filter.size();
    std::vector<TNullableStringLiteral> cur_row_values;
    cur_row_values.reserve(col_size);
    for (size_t row = 0; row < missing_rows; ++row) {
        cur_row_values.clear();
        for (int col = 0; col < col_size; ++col) {
            DCHECK_EQ(col_strs[col].size(), missing_rows);
            TNullableStringLiteral node;
            const auto* null_map = col_null_maps[col]; // null map for this col
            const bool is_null_row = null_map != nullptr && (*null_map)[filter[row]];
            // Only override the field's default when this row is actually null.
            node.__set_is_null(node.is_null || is_null_row);
            if (!node.is_null) {
                node.__set_value(col_strs[col][row]);
            }
            cur_row_values.push_back(std::move(node));
        }
        if (!_deduper.contains(cur_row_values)) {
            _deduper.insert(cur_row_values);
            _partitions_need_create.emplace_back(cur_row_values);
        }
    }

    // to avoid too large mem use
    if (_batching_block->rows() > _batch_size) {
        _deal_batched = true;
    }
    _batching_rows = _batching_block->rows();
    VLOG_NOTICE << "pushed some batching lines, now numbers = " << _batching_rows;

    return Status::OK();
}
91
92
2
void VRowDistribution::clear_batching_stats() {
93
2
    _partitions_need_create.clear();
94
2
    _batching_rows = 0;
95
2
    _batching_bytes = 0;
96
2
}
97
98
2
// Asks the FE master to create the partitions queued in _partitions_need_create.
//
// When the FE answers OK, the returned partitions are added to _vpartition, their ids are
// recorded in _new_partition_ids, and _create_partition_callback(_caller, &result) is invoked.
// The returned Status is the one carried in the FE response. Failures while resolving the
// master, issuing the RPC, adding partitions or running the callback are returned directly.
// With the debug point "VRowDistribution.automatic_create_partition.inject_result" enabled,
// the RPC is skipped and the injected callback fills the result instead.
Status VRowDistribution::automatic_create_partition() {
    // Per-request latency is only collected for detailed profiles (level >= 2).
    const bool record_latency = _state->enable_profile() && _state->profile_level() >= 2;
    MonotonicStopWatch timer;
    if (record_latency) {
        timer.start();
    }

    SCOPED_TIMER(_add_partition_request_timer);
    TCreatePartitionRequest request;
    TCreatePartitionResult result;
    bool injected = false;
    std::string be_endpoint = BackendOptions::get_be_endpoint();
    request.__set_txn_id(_txn_id);
    request.__set_db_id(_vpartition->db_id());
    request.__set_table_id(_vpartition->table_id());
    request.__set_partitionValues(_partitions_need_create);
    request.__set_be_endpoint(be_endpoint);
    request.__set_write_single_replica(_write_single_replica);
    // _state is already dereferenced above, so only the query context needs a null check.
    if (_state->get_query_ctx()) {
        // Pass query_id to FE so it can determine if this is a multi-instance load by checking Coordinator
        request.__set_query_id(_state->get_query_ctx()->query_id());
    }

    DBUG_EXECUTE_IF("VRowDistribution.automatic_create_partition.inject_result", {
        DBUG_RUN_CALLBACK(&request, &result);
        injected = true;
    });

    VLOG_NOTICE << "automatic partition rpc begin request " << request;
    if (!injected) {
        // Prefer the master address carried by the partition descriptor; fall back to the
        // cluster's known FE master.
        std::shared_ptr<TNetworkAddress> master_addr = _vpartition->get_master_address();
        if (master_addr == nullptr) {
            auto* cluster_info = ExecEnv::GetInstance()->cluster_info();
            if (cluster_info == nullptr) {
                return Status::InternalError("cluster_info is null");
            }
            master_addr = std::make_shared<TNetworkAddress>(cluster_info->master_fe_addr);
        }
        // execution_timeout() is in seconds. Widen before scaling to milliseconds so a large
        // timeout cannot overflow int, then clamp to the int range the RPC helper takes.
        const int time_out = static_cast<int>(
                std::min<int64_t>(static_cast<int64_t>(_state->execution_timeout()) * 1000,
                                  std::numeric_limits<int>::max()));
        RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
                master_addr->hostname, master_addr->port,
                [&request, &result](FrontendServiceConnection& client) {
                    client->createPartition(result, request);
                },
                time_out));
    }

    Status status(Status::create(result.status));
    VLOG_NOTICE << "automatic partition rpc end response " << result;
    if (result.status.status_code == TStatusCode::OK) {
        // add new created partitions
        RETURN_IF_ERROR(_vpartition->add_partitions(result.partitions));
        for (const auto& part : result.partitions) {
            _new_partition_ids.insert(part.id);
            VLOG_TRACE << "record new id: " << part.id;
        }
        RETURN_IF_ERROR(_create_partition_callback(_caller, &result));
    }

    // Record this request's elapsed time
    if (record_latency) {
        int64_t elapsed_ns = timer.elapsed_time();
        _add_partition_request_times.push_back(elapsed_ns);
    }
    return status;
}
165
166
// for reuse the same create callback of create-partition
167
1
// Repackages a replace-partition result as a create-partition result so the create callback
// can be reused. `status` is copied; slave_tablets, tablets, partitions and nodes are moved out
// of `arg`, leaving those fields of `arg` in a moved-from state.
static TCreatePartitionResult cast_as_create_result(TReplacePartitionResult& arg) {
    TCreatePartitionResult converted;
    converted.slave_tablets = std::move(arg.slave_tablets);
    converted.tablets = std::move(arg.tablets);
    converted.partitions = std::move(arg.partitions);
    converted.nodes = std::move(arg.nodes);
    converted.status = arg.status;
    return converted;
}
176
177
// use _partitions and replace them
178
2
// Auto-detect overwrite: asks the FE master to replace the existing partitions that the current
// rows landed in (taken from _partitions).
//
// Partitions whose ids are already in _new_partition_ids are not requested again. A row without
// a partition is an error unless _missing_map is non-empty. If nothing needs replacing, returns
// OK without an RPC. When the FE answers OK, the new partition ids are recorded, _vpartition
// swaps the old partitions for the new ones, and _create_partition_callback is reused on the
// converted result. The returned Status is the one carried in the FE response. RPC and
// bookkeeping failures are returned directly.
Status VRowDistribution::_replace_overwriting_partition() {
    SCOPED_TIMER(_add_partition_request_timer); // also for replace_partition
    TReplacePartitionRequest request;
    TReplacePartitionResult result;
    bool injected = false;
    request.__set_overwrite_group_id(_vpartition->get_overwrite_group_id());
    request.__set_db_id(_vpartition->db_id());
    request.__set_table_id(_vpartition->table_id());
    request.__set_write_single_replica(_write_single_replica);

    // only request for partitions not recorded for replacement
    std::set<int64_t> id_deduper;
    for (const auto* part : _partitions) {
        if (part != nullptr) {
            if (_new_partition_ids.contains(part->id)) {
                // this is a new partition. dont replace again.
                VLOG_TRACE << "skip new partition: " << part->id;
            } else {
                // request for replacement
                id_deduper.insert(part->id);
            }
        } else if (_missing_map.empty()) {
            // no origin partition. and not allow to create.
            return Status::InvalidArgument(
                    "Cannot found origin partitions in auto detect overwriting, stop "
                    "processing");
        } // else: part is null and _missing_map is not empty. dealed outside using auto-partition way. nothing to do here.
    }
    if (id_deduper.empty()) {
        return Status::OK(); // no need to request
    }
    // de-duplicate. there's no check in FE
    std::vector<int64_t> request_part_ids(id_deduper.begin(), id_deduper.end());

    request.__set_partition_ids(request_part_ids);

    std::string be_endpoint = BackendOptions::get_be_endpoint();
    request.__set_be_endpoint(be_endpoint);
    if (_state && _state->get_query_ctx()) {
        // Pass query_id to FE so it can determine if this is a multi-instance load by checking Coordinator
        request.__set_query_id(_state->get_query_ctx()->query_id());
    }

    DBUG_EXECUTE_IF("VRowDistribution.replace_overwriting_partition.inject_result", {
        DBUG_RUN_CALLBACK(&request, &result);
        injected = true;
    });

    VLOG_NOTICE << "auto detect replace partition request: " << request;
    if (!injected) {
        // Prefer the master address carried by the partition descriptor; fall back to the
        // cluster's known FE master.
        std::shared_ptr<TNetworkAddress> master_addr = _vpartition->get_master_address();
        if (master_addr == nullptr) {
            auto* cluster_info = ExecEnv::GetInstance()->cluster_info();
            if (cluster_info == nullptr) {
                return Status::InternalError("cluster_info is null");
            }
            master_addr = std::make_shared<TNetworkAddress>(cluster_info->master_fe_addr);
        }
        // execution_timeout() is in seconds. Widen before scaling to milliseconds so a large
        // timeout cannot overflow int, then clamp to the int range the RPC helper takes.
        const int time_out = static_cast<int>(
                std::min<int64_t>(static_cast<int64_t>(_state->execution_timeout()) * 1000,
                                  std::numeric_limits<int>::max()));
        RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
                master_addr->hostname, master_addr->port,
                [&request, &result](FrontendServiceConnection& client) {
                    client->replacePartition(result, request);
                },
                time_out));
    }

    Status status(Status::create(result.status));
    VLOG_NOTICE << "auto detect replace partition result: " << result;
    if (result.status.status_code == TStatusCode::OK) {
        // record new partitions
        for (const auto& part : result.partitions) {
            _new_partition_ids.insert(part.id);
            VLOG_TRACE << "record new id: " << part.id;
        }
        // replace data in _partitions
        RETURN_IF_ERROR(_vpartition->replace_partitions(request_part_ids, result.partitions));
        // reuse the function as the args' structure are same. it add nodes/locations and incremental_open
        auto result_as_create = cast_as_create_result(result);
        RETURN_IF_ERROR(_create_partition_callback(_caller, &result_as_create));
    }

    return status;
}
264
265
void VRowDistribution::_get_tablet_ids(Block* block, int32_t index_idx,
266
9
                                       std::vector<int64_t>& tablet_ids) {
267
9
    tablet_ids.resize(block->rows());
268
26
    for (int row_idx = 0; row_idx < block->rows(); row_idx++) {
269
17
        if (_skip[row_idx]) {
270
6
            continue;
271
6
        }
272
11
        auto& partition = _partitions[row_idx];
273
11
        auto& tablet_index = _tablet_indexes[row_idx];
274
11
        auto& index = partition->indexes[index_idx];
275
276
11
        auto tablet_id = index.tablets[tablet_index];
277
11
        tablet_ids[row_idx] = tablet_id;
278
11
    }
279
9
}
280
281
7
// Appends every row not marked in _skip to row_part_tablet_id, together with its partition id
// and its tablet id from _tablet_ids.
void VRowDistribution::_filter_block_by_skip(Block* block, RowPartTabletIds& row_part_tablet_id) {
    // row count of a block should not exceed UINT32_MAX
    const auto num_rows = cast_set<uint32_t>(block->rows());
    for (uint32_t row = 0; row < num_rows; ++row) {
        if (_skip[row]) {
            continue;
        }
        row_part_tablet_id.row_ids.emplace_back(row);
        row_part_tablet_id.partition_ids.emplace_back(_partitions[row]->id);
        row_part_tablet_id.tablet_ids.emplace_back(_tablet_ids[row]);
    }
}
297
298
// Evaluates `where_clause` on `block` and appends to row_part_tablet_id every row that is not
// marked in _skip and for which the predicate holds. Each appended row also carries its
// partition id and its tablet id from _tablet_ids.
// The predicate result may be nullable (read via get_bool_inline), constant, or a plain UInt8
// column. Columns the predicate appended to `block` are removed again before returning OK.
Status VRowDistribution::_filter_block_by_skip_and_where_clause(
        Block* block, const VExprContextSPtr& where_clause, RowPartTabletIds& row_part_tablet_id) {
    // TODO
    //SCOPED_RAW_TIMER(&_stat.where_clause_ns);
    int result_index = -1;
    size_t column_number = block->columns();
    RETURN_IF_ERROR(where_clause->execute(block, &result_index));

    // Hold our own reference so the filter survives erasing it from the block below.
    auto filter_column = block->get_by_position(result_index).column;

    auto& row_ids = row_part_tablet_id.row_ids;
    auto& partition_ids = row_part_tablet_id.partition_ids;
    auto& tablet_ids = row_part_tablet_id.tablet_ids;
    if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) {
        auto rows = block->rows();
        // row count of a block should not exceed UINT32_MAX
        auto rows_uint32 = cast_set<uint32_t>(rows);
        for (uint32_t i = 0; i < rows_uint32; i++) {
            if (nullable_column->get_bool_inline(i) && !_skip[i]) {
                row_ids.emplace_back(i);
                partition_ids.emplace_back(_partitions[i]->id);
                tablet_ids.emplace_back(_tablet_ids[i]);
            }
        }
    } else if (const auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) {
        // A constant predicate keeps either every non-skipped row or none of them.
        if (const_column->get_bool(0)) {
            _filter_block_by_skip(block, row_part_tablet_id);
        }
    } else {
        const auto& filter = assert_cast<const ColumnUInt8&>(*filter_column).get_data();
        auto rows = block->rows();
        // row count of a block should not exceed UINT32_MAX
        auto rows_uint32 = cast_set<uint32_t>(rows);
        for (uint32_t i = 0; i < rows_uint32; i++) {
            if (filter[i] != 0 && !_skip[i]) {
                row_ids.emplace_back(i);
                partition_ids.emplace_back(_partitions[i]->id);
                tablet_ids.emplace_back(_tablet_ids[i]);
            }
        }
    }

    // Drop the columns appended while evaluating the predicate. This runs on every successful
    // path, including a constant-false predicate. Comparing with `>` also terminates when
    // column_number == 0.
    while (block->columns() > column_number) {
        block->erase(block->columns() - 1);
    }
    return Status::OK();
}
348
349
// Processes each index of the target schema in order. First it resolves every row's tablet for
// that index into _tablet_ids. Then it collects the rows kept for the index into
// row_part_tablet_ids[index]: non-skipped rows, further restricted by the index's where clause
// when one is set.
Status VRowDistribution::_filter_block(Block* block,
                                       std::vector<RowPartTabletIds>& row_part_tablet_ids) {
    int32_t index_idx = 0;
    for (const auto& index_schema : _schema->indexes()) {
        _get_tablet_ids(block, index_idx, _tablet_ids);
        const auto& where_clause = index_schema->where_clause;
        auto& kept_rows = row_part_tablet_ids[index_idx];
        if (where_clause == nullptr) {
            _filter_block_by_skip(block, kept_rows);
        } else {
            RETURN_IF_ERROR(_filter_block_by_skip_and_where_clause(block, where_clause, kept_rows));
        }
        ++index_idx;
    }
    return Status::OK();
}
363
364
// Distribution for tables without auto partitioning. It finds each row's partition and tablet,
// also skips the rows the block convertor rejected (when has_filtered_rows is set), and then
// builds the per-index row lists.
Status VRowDistribution::_generate_rows_distribution_for_non_auto_partition(
        Block* block, bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids) {
    const int num_rows = cast_set<int>(block->rows());

    RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
                                                 _tablet_indexes, _skip));
    if (has_filtered_rows) {
        const auto& rejected = _block_convertor->filter_map();
        for (int row = 0; row < num_rows; ++row) {
            if (rejected[row]) {
                _skip[row] = true;
            }
        }
    }
    return _filter_block(block, row_part_tablet_ids);
}
378
379
// Handles the rows listed in _missing_map, i.e. rows whose partition does not exist yet.
// It renders their partition values (from block columns partition_cols_idx) as strings and
// passes them to _save_missing_values(). That call buffers the input rows in _batching_block
// and queues the distinct value tuples in _partitions_need_create.
// The rows/bytes moved into the batching block are then backed out of rows_stat_val, the
// runtime state's load totals and the load metrics. NOTE(review): presumably because they are
// counted when the batch is re-distributed later; confirm against the caller.
Status VRowDistribution::_deal_missing_map(const Block& input_block, Block* block,
                                           const std::vector<uint16_t>& partition_cols_idx,
                                           int64_t& rows_stat_val) {
    // for missing partition keys, calc the missing partition and save in _partitions_need_create
    auto [part_ctxs, part_exprs] = _get_partition_function();
    int part_col_num = cast_set<int>(part_exprs.size());
    // the two vectors are in column-first-order
    std::vector<std::vector<std::string>> col_strs;
    std::vector<const NullMap*> col_null_maps;
    // Owns the partition columns for as long as col_null_maps points into them.
    std::vector<ColumnPtr> partition_cols;
    col_strs.resize(part_col_num);
    col_null_maps.reserve(part_col_num);
    partition_cols.reserve(part_col_num);

    auto format_options = DataTypeSerDe::get_default_format_options();
    format_options.timezone = &_state->timezone_obj();

    for (int i = 0; i < part_col_num; ++i) {
        auto return_type = part_exprs[i]->data_type();
        // Materialize constant columns. _save_missing_values() indexes the null map by row id,
        // which would read past the single row stored inside a ColumnConst.
        partition_cols.push_back(block->get_by_position(partition_cols_idx[i])
                                         .column->convert_to_full_column_if_const());
        const auto& range_left_col = partition_cols.back();
        // expose the data column. the return type would be nullable
        if (range_left_col->is_nullable()) {
            col_null_maps.push_back(&(
                    assert_cast<const ColumnNullable*>(range_left_col.get())->get_null_map_data()));
        } else {
            col_null_maps.push_back(nullptr);
        }
        for (auto row : _missing_map) {
            col_strs[i].push_back(return_type->to_string(*range_left_col, row, format_options));
        }
    }

    // calc the end value and save them. in the end of sending, we will create partitions for them and deal them.
    // NOTE: must save old batching stats before calling _save_missing_values(),
    // because _save_missing_values() will update _batching_rows internally.
    size_t old_bt_rows = _batching_rows;
    size_t old_bt_bytes = _batching_bytes;

    RETURN_IF_ERROR(_save_missing_values(input_block, col_strs, part_col_num, block, _missing_map,
                                         col_null_maps));

    size_t new_bt_rows = _batching_block->rows();
    size_t new_bt_bytes = _batching_block->bytes();
    rows_stat_val -= new_bt_rows - old_bt_rows;
    _state->update_num_rows_load_total(old_bt_rows - new_bt_rows);
    _state->update_num_bytes_load_total(old_bt_bytes - new_bt_bytes);
    DorisMetrics::instance()->load_rows->increment(old_bt_rows - new_bt_rows);
    DorisMetrics::instance()->load_bytes->increment(old_bt_bytes - new_bt_bytes);
    // _save_missing_values() already refreshed _batching_rows. Keep _batching_bytes in step so
    // the next call computes its byte delta from the current batching size, not from zero.
    _batching_bytes = new_bt_bytes;

    return Status::OK();
}
430
431
// Distribution for auto-partition tables.
// - find_tablets() reports rows without an existing partition in _missing_map; those rows are
//   skipped as well.
// - Rows the block convertor rejected are skipped when has_filtered_rows is set.
// - The remaining rows are distributed now.
// - Rows in _missing_map are handed to _deal_missing_map() to be batched.
Status VRowDistribution::_generate_rows_distribution_for_auto_partition(
        const Block& input_block, Block* block, const std::vector<uint16_t>& partition_cols_idx,
        bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
        int64_t& rows_stat_val) {
    const int num_rows = cast_set<int>(block->rows());
    const std::vector<uint16_t> partition_keys = _vpartition->get_partition_keys();
    const auto& first_partition_col = block->get_by_position(partition_keys[0]);

    _missing_map.clear();
    _missing_map.reserve(first_partition_col.column->size());
    RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
                                                 _tablet_indexes, _skip, &_missing_map));

    if (has_filtered_rows) {
        const auto& rejected = _block_convertor->filter_map();
        for (int row = 0; row < num_rows; ++row) {
            if (rejected[row]) {
                _skip[row] = true;
            }
        }
    }
    RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids));

    if (_missing_map.empty()) {
        return Status::OK();
    }
    // send input block to save
    return _deal_missing_map(input_block, block, partition_cols_idx, rows_stat_val);
}
459
460
// Distribution for auto-detect overwrite.
// - It first locates the partitions the rows currently map to. When the table is
//   auto-partitioned and enable_auto_create_when_overwrite is set, rows without a partition are
//   collected in _missing_map and batched via _deal_missing_map().
// - It then asks the FE to replace the located partitions (_replace_overwriting_partition()).
// - Finally it resets the find_tablets scratch state, locates the rows again against the
//   replaced partitions, and builds the per-index row lists.
Status VRowDistribution::_generate_rows_distribution_for_auto_overwrite(
        const Block& input_block, Block* block, const std::vector<uint16_t>& partition_cols_idx,
        bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
        int64_t& rows_stat_val) {
    int num_rows = cast_set<int>(block->rows());

    // for non-auto-partition situation, goes into two 'else' branch. just find the origin partitions, replace them by rpc,
    //  and find the new partitions to use.
    // for auto-partition's, find and save origins in _partitions and replace them. at meanwhile save the missing values for auto
    //  partition. then we find partition again to get replaced partitions in _partitions. this time _missing_map is ignored cuz
    //  we already saved missing values.
    if (_vpartition->is_auto_partition() &&
        _state->query_options().enable_auto_create_when_overwrite) {
        // allow auto create partition for missing rows.
        std::vector<uint16_t> partition_keys = _vpartition->get_partition_keys();
        // Only the column size is needed; bind by reference instead of copying the column entry.
        const auto& partition_col = block->get_by_position(partition_keys[0]);
        _missing_map.clear();
        _missing_map.reserve(partition_col.column->size());

        RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
                                                     _tablet_indexes, _skip, &_missing_map));

        // allow and really need to create during auto-detect-overwriting.
        if (!_missing_map.empty()) {
            RETURN_IF_ERROR(
                    _deal_missing_map(input_block, block, partition_cols_idx, rows_stat_val));
        }
    } else {
        RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
                                                     _tablet_indexes, _skip));
    }
    RETURN_IF_ERROR(_replace_overwriting_partition());

    // regenerate locations for new partitions & tablets
    _reset_find_tablets(num_rows);
    if (_vpartition->is_auto_partition() &&
        _state->query_options().enable_auto_create_when_overwrite) {
        // here _missing_map is just a placeholder
        RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
                                                     _tablet_indexes, _skip, &_missing_map));
        if (VLOG_TRACE_IS_ON) {
            std::string tmp;
            for (auto v : _missing_map) {
                tmp += std::to_string(v).append(", ");
            }
            VLOG_TRACE << "Trace missing map of " << this << ':' << tmp;
        }
    } else {
        RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
                                                     _tablet_indexes, _skip));
    }
    if (has_filtered_rows) {
        for (int i = 0; i < num_rows; i++) {
            _skip[i] = _skip[i] || _block_convertor->filter_map()[i];
        }
    }
    RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids));
    return Status::OK();
}
519
520
void VRowDistribution::_reset_row_part_tablet_ids(
521
9
        std::vector<RowPartTabletIds>& row_part_tablet_ids, int64_t rows) {
522
9
    row_part_tablet_ids.resize(_schema->indexes().size());
523
9
    for (auto& row_part_tablet_id : row_part_tablet_ids) {
524
9
        auto& row_ids = row_part_tablet_id.row_ids;
525
9
        auto& partition_ids = row_part_tablet_id.partition_ids;
526
9
        auto& tablet_ids = row_part_tablet_id.tablet_ids;
527
528
9
        row_ids.clear();
529
9
        partition_ids.clear();
530
9
        tablet_ids.clear();
531
        // This is important for performance.
532
9
        row_ids.reserve(rows);
533
9
        partition_ids.reserve(rows);
534
9
        tablet_ids.reserve(rows);
535
9
    }
536
9
}
537
538
// Entry point for one input block.
// 1. Validates and converts `input_block` into `block`.
// 2. Lazily creates _batching_block with input_block's structure.
// 3. Evaluates projection partition functions (if the table has them), recording their result
//    column positions.
// 4. Fills row_part_tablet_ids through one of three paths: auto-detect overwrite,
//    auto-partition, or plain. Once _deal_batched is set, the plain path is always used.
Status VRowDistribution::generate_rows_distribution(
        Block& input_block, std::shared_ptr<Block>& block,
        std::vector<RowPartTabletIds>& row_part_tablet_ids, int64_t& rows_stat_val) {
    const auto input_rows = input_block.rows();
    _reset_row_part_tablet_ids(row_part_tablet_ids, input_rows);

    // The batching block stores rows of `input_block`, so everything below is redone for them.
    bool has_filtered_rows = false;
    RETURN_IF_ERROR(_block_convertor->validate_and_convert_block(
            _state, &input_block, block, *_vec_output_expr_ctxs, input_rows, has_filtered_rows));

    // Rows that need new partitions are collected here and dealt with together at finish.
    if (!_batching_block) [[unlikely]] {
        std::unique_ptr<Block> empty_like_input = input_block.create_same_struct_block(0);
        _batching_block = MutableBlock::create_unique(std::move(*empty_like_input));
    }

    _reset_find_tablets(block->rows());

    // Projection partitions must have their partition expressions computed first.
    auto [func_ctxs, funcs] = _get_partition_function();
    std::vector<uint16_t> partition_cols_idx;
    if (_vpartition->is_projection_partition()) {
        // Only the left (start) value of each missing range is computed here; the right one
        // is left to the FE to avoid duplicate work.
        for (size_t func_idx = 0; func_idx < funcs.size(); ++func_idx) {
            int result_idx = -1;
            RETURN_IF_ERROR(funcs[func_idx]->execute(func_ctxs[func_idx].get(), block.get(),
                                                     &result_idx));

            VLOG_DEBUG << "Partition-calculated block:\n" << block->dump_data(0, 1);
            DCHECK(result_idx != -1);

            partition_cols_idx.push_back(cast_set<uint16_t>(result_idx));
        }
        // Compare against the transformed columns from now on.
        _vpartition->set_transformed_slots(partition_cols_idx);
    }

    if (_vpartition->is_auto_detect_overwrite() && !_deal_batched) {
        // when overwrite, no auto create partition allowed.
        return _generate_rows_distribution_for_auto_overwrite(input_block, block.get(),
                                                              partition_cols_idx,
                                                              has_filtered_rows,
                                                              row_part_tablet_ids, rows_stat_val);
    }
    if (_vpartition->is_auto_partition() && !_deal_batched) {
        return _generate_rows_distribution_for_auto_partition(input_block, block.get(),
                                                              partition_cols_idx,
                                                              has_filtered_rows,
                                                              row_part_tablet_ids, rows_stat_val);
    }
    // not auto partition (or already dealing with the batched rows)
    return _generate_rows_distribution_for_non_auto_partition(block.get(), has_filtered_rows,
                                                              row_part_tablet_ids);
}
596
597
// reuse vars for find_tablets
598
11
void VRowDistribution::_reset_find_tablets(int64_t rows) {
599
11
    _tablet_finder->filter_bitmap().Reset(rows);
600
11
    _partitions.assign(rows, nullptr);
601
11
    _skip.assign(rows, false);
602
11
    _tablet_indexes.assign(rows, 0);
603
11
}
604
605
} // namespace doris