Coverage Report

Created: 2026-03-14 20:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/tablet_sink_hash_partitioner.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/sink/tablet_sink_hash_partitioner.h"
19
20
#include <algorithm>
21
#include <memory>
22
#include <utility>
23
24
#include "exec/operator/operator.h"
25
26
namespace doris {
27
#include "common/compile_check_begin.h"
28
/// Stores everything open() later needs; no runtime objects are created here.
///
/// The three thrift descriptors are received by value and moved into the
/// corresponding members, so the caller's copies are not referenced afterwards.
///
/// @param partition_count       forwarded to PartitionerBase.
/// @param txn_id                kept in _txn_id and passed to the row distribution in open().
/// @param tablet_sink_schema    moved into _tablet_sink_schema.
/// @param tablet_sink_partition moved into _tablet_sink_partition.
/// @param tablet_sink_location  moved into _tablet_sink_location.
/// @param tablet_sink_tuple_id  copied into _tablet_sink_tuple_id; resolved to a tuple
///                              descriptor in open().
/// @param local_state           stored as a raw pointer and dereferenced by open() and
///                              try_cut_in_line(); presumably must outlive this object
///                              (TODO confirm against the owner).
TabletSinkHashPartitioner::TabletSinkHashPartitioner(
        uint32_t partition_count,
        int64_t txn_id,
        TOlapTableSchemaParam tablet_sink_schema,
        TOlapTablePartitionParam tablet_sink_partition,
        TOlapTableLocationParam tablet_sink_location,
        const TTupleId& tablet_sink_tuple_id,
        ExchangeSinkLocalState* local_state)
        : PartitionerBase(partition_count),
          _txn_id(txn_id),
          _tablet_sink_schema(std::move(tablet_sink_schema)),
          _tablet_sink_partition(std::move(tablet_sink_partition)),
          _tablet_sink_location(std::move(tablet_sink_location)),
          _tablet_sink_tuple_id(tablet_sink_tuple_id),
          _local_state(local_state) {}
41
42
0
/// No-op: @p texprs is not read and Status::OK() is always returned.
Status TabletSinkHashPartitioner::init([[maybe_unused]] const std::vector<TExpr>& texprs) {
    return Status::OK();
}
45
46
0
/// No-op: neither argument is read and Status::OK() is always returned.
/// All set-up for this partitioner happens in open().
Status TabletSinkHashPartitioner::prepare([[maybe_unused]] RuntimeState* state,
                                          [[maybe_unused]] const RowDescriptor& row_desc) {
    return Status::OK();
}
49
50
2
/// Builds the runtime objects this partitioner works with, from the thrift
/// descriptors captured by the constructor, then opens the row distribution.
///
/// Steps, in order: schema -> partition param -> tablet finder -> tuple/row
/// descriptors -> per-instance clones of the parent's tablet sink expr contexts ->
/// block convertor (with auto-increment info) -> location param -> row distribution.
///
/// @param state runtime state supplying the descriptor table, object pool and batch size.
/// @return the first non-OK status produced by any step, otherwise Status::OK().
Status TabletSinkHashPartitioner::open(RuntimeState* state) {
    // The partition param is constructed from the schema, so the schema comes first.
    _schema = std::make_shared<OlapTableSchemaParam>();
    RETURN_IF_ERROR(_schema->init(_tablet_sink_schema));

    _vpartition = std::make_unique<VOlapTablePartitionParam>(_schema, _tablet_sink_partition);
    RETURN_IF_ERROR(_vpartition->init());

    _tablet_finder = std::make_unique<OlapTabletFinder>(
            _vpartition.get(), OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW);

    // The row descriptor is handed to the state's object pool; presumably the pool owns it
    // from here on (TODO confirm ObjectPool::add semantics).
    _tablet_sink_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tablet_sink_tuple_id);
    _tablet_sink_row_desc = state->obj_pool()->add(new RowDescriptor(_tablet_sink_tuple_desc));

    // Clone every tablet sink expr context of the parent operator into this instance.
    auto& parent_ctxs =
            _local_state->parent()->cast<ExchangeSinkOperatorX>().tablet_sink_expr_ctxs();
    _tablet_sink_expr_ctxs.resize(parent_ctxs.size());
    const auto ctx_count = _tablet_sink_expr_ctxs.size();
    for (size_t pos = 0; pos < ctx_count; ++pos) {
        RETURN_IF_ERROR(parent_ctxs[pos]->clone(state, _tablet_sink_expr_ctxs[pos]));
    }

    // When _part_type == TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED, auto_increment
    // columns are processed here on the exchange node rather than in the TabletWriter.
    _block_convertor = std::make_unique<OlapTableBlockConvertor>(_tablet_sink_tuple_desc);
    _block_convertor->init_autoinc_info(_schema->db_id(), _schema->table_id(),
                                        state->batch_size());

    _location = state->obj_pool()->add(new OlapTableLocationParam(_tablet_sink_location));

    // All pointers below are non-owning views of members set up above.
    _row_distribution.init(
            {.state = state,
             .block_convertor = _block_convertor.get(),
             .tablet_finder = _tablet_finder.get(),
             .vpartition = _vpartition.get(),
             .add_partition_request_timer = _local_state->add_partition_request_timer(),
             .txn_id = _txn_id,
             .pool = state->obj_pool(),
             .location = _location,
             .vec_output_expr_ctxs = &_tablet_sink_expr_ctxs,
             .schema = _schema,
             .caller = static_cast<void*>(this),
             .create_partition_callback = &TabletSinkHashPartitioner::empty_callback_function});
    RETURN_IF_ERROR(_row_distribution.open(_tablet_sink_row_desc));
    return Status::OK();
}
85
86
2
/// Computes, for every row of @p block, the partition it is routed to and stores the
/// result in _hash_vals (one entry per row).
///
/// Rows for which the row distribution yields a tablet get
/// `crc(tablet_id) % invalid_sentinel()`; every other row keeps the sentinel value
/// invalid_sentinel(), which the modulo can never produce. So
/// `_hash_vals[i] == invalid_sentinel()` means row i was skipped or filtered.
///
/// Also refreshes _skipped and _row_part_tablet_ids from the row distribution.
///
/// @param state not read by this function.
/// @param block input rows; an empty block only resizes _hash_vals and returns.
/// @return the error from generate_rows_distribution(), otherwise Status::OK().
Status TabletSinkHashPartitioner::do_partitioning(RuntimeState* state, Block* block) const {
    _hash_vals.resize(block->rows());
    if (block->empty()) {
        return Status::OK();
    }

    // tablet_id_hash % invalid_val can never equal invalid_val, so invalid_val is usable
    // as the "no destination" sentinel.
    DCHECK_EQ(invalid_sentinel(), partition_count());
    const auto& invalid_val = invalid_sentinel();
    std::ranges::fill(_hash_vals, invalid_val);

    int64_t dummy_stats = 0; // _local_state->rows_input_counter() updated in sink and write.
    std::shared_ptr<Block> convert_block = std::make_shared<Block>();

    RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(
            *block, convert_block, _row_part_tablet_ids, dummy_stats));
    _skipped = _row_distribution.get_skipped();

    // Only the first entry is consulted; make the assumptions the loop relies on explicit
    // (debug builds only, release behaviour is unchanged).
    DCHECK(!_row_part_tablet_ids.empty());
    const auto& row_ids = _row_part_tablet_ids[0].row_ids;
    const auto& tablet_ids = _row_part_tablet_ids[0].tablet_ids;
    DCHECK_LE(row_ids.size(), tablet_ids.size());

    // Unsigned index: avoids the signed/unsigned comparison against size().
    for (size_t idx = 0; idx < row_ids.size(); ++idx) {
        const auto& row = row_ids[idx];
        // NOTE(review): only sizeof(HashValType) bytes of the tablet id are hashed. If the
        // tablet id type is wider than HashValType the remaining bytes are ignored —
        // confirm this is intended before changing it, since it determines row routing.
        const auto tablet_id_hash =
                HashUtil::zlib_crc_hash(&tablet_ids[idx], sizeof(HashValType), 0);
        _hash_vals[row] = tablet_id_hash % invalid_val;
    }

    // _hash_vals[i] == invalid_val => row i is skipped or filtered
#ifndef NDEBUG
    for (size_t i = 0; i < _skipped.size(); ++i) {
        if (_skipped[i]) {
            CHECK_EQ(_hash_vals[i], invalid_val);
        }
    }
    // Every skipped row must be among the sentinel rows (filtered rows may add more).
    CHECK_LE(std::ranges::count_if(_skipped, [](bool v) { return v; }),
             std::ranges::count_if(_hash_vals, [=](HashValType v) { return v == invalid_val; }));
#endif

    return Status::OK();
}
126
127
1
/// If the row distribution has a batched block pending, triggers automatic partition
/// creation and hands that batched block back through @p prior_block so it can be sent
/// first. Otherwise @p prior_block is left untouched.
///
/// @param prior_block out-parameter; overwritten only when batching had to be dealt with.
/// @return the error from automatic_create_partition(), otherwise Status::OK().
Status TabletSinkHashPartitioner::try_cut_in_line(Block& prior_block) const {
    // Nothing batched: there is no block that needs to go first.
    if (!_row_distribution.need_deal_batching()) {
        return Status::OK();
    }

    {
        // Scope limits the timer to the partition-creation call only.
        SCOPED_TIMER(_local_state->send_new_partition_timer());
        RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
    }

    // Borrow the batched rows out into the caller's lvalue block.
    prior_block = _row_distribution._batching_block->to_block();
    // Drop the batching block; vrow_distribution will re-construct it.
    _row_distribution._batching_block.reset();
    _row_distribution.clear_batching_stats();
    VLOG_DEBUG << "sinking batched block:\n" << prior_block.dump_data();

    return Status::OK();
}
142
143
/// Creates a new TabletSinkHashPartitioner from this one's constructor arguments
/// (partition count, txn id, the three thrift descriptors, tuple id and the same
/// _local_state pointer) and stores it in @p partitioner.
///
/// Only the constructor runs here: open() is not called on the new object.
///
/// @param state       not read by this function.
/// @param partitioner out-parameter receiving the new instance; any previous pointee is
///                    released by the assignment.
/// @return always Status::OK().
Status TabletSinkHashPartitioner::clone(RuntimeState* state,
                                        std::unique_ptr<PartitionerBase>& partitioner) {
    auto fresh = std::make_unique<TabletSinkHashPartitioner>(
            _partition_count, _txn_id, _tablet_sink_schema, _tablet_sink_partition,
            _tablet_sink_location, _tablet_sink_tuple_id, _local_state);
    partitioner = std::move(fresh);
    return Status::OK();
}
150
151
0
/// Publishes this partitioner's row counters to @p state.
///
/// Does nothing unless both _block_convertor and _tablet_finder exist (they are created
/// in open()). When they do:
///   - filtered rows   += convertor-filtered + tablet-finder-filtered rows,
///   - unselected rows += rows filtered because their partition is immutable,
///   - total rows       = filtered + unselected as currently recorded in @p state.
///
/// @param state runtime state whose load counters are updated.
/// @return always Status::OK().
Status TabletSinkHashPartitioner::close(RuntimeState* state) {
    const bool opened = _block_convertor != nullptr && _tablet_finder != nullptr;
    if (!opened) {
        return Status::OK();
    }

    const auto filtered_rows =
            _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows();
    state->update_num_rows_load_filtered(filtered_rows);

    const auto unselected_rows = _tablet_finder->num_immutable_partition_filtered_rows();
    state->update_num_rows_load_unselected(unselected_rows);

    // The sink never sees the rows filtered here, so account for them in the total.
    state->set_num_rows_load_total(state->num_rows_load_filtered() +
                                   state->num_rows_load_unselected());
    return Status::OK();
}
163
} // namespace doris