be/src/exec/sink/tablet_sink_hash_partitioner.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/sink/tablet_sink_hash_partitioner.h" |
19 | | |
20 | | #include <algorithm> |
21 | | #include <memory> |
22 | | #include <utility> |
23 | | |
24 | | #include "exec/operator/operator.h" |
25 | | |
26 | | namespace doris { |
27 | | #include "common/compile_check_begin.h" |
28 | | TabletSinkHashPartitioner::TabletSinkHashPartitioner(uint32_t partition_count, int64_t txn_id, |
29 | | TOlapTableSchemaParam tablet_sink_schema, |
30 | | TOlapTablePartitionParam tablet_sink_partition, |
31 | | TOlapTableLocationParam tablet_sink_location, |
32 | | const TTupleId& tablet_sink_tuple_id, |
33 | | ExchangeSinkLocalState* local_state) |
34 | 2 | : PartitionerBase(partition_count), |
35 | 2 | _txn_id(txn_id), |
36 | 2 | _tablet_sink_schema(std::move(tablet_sink_schema)), |
37 | 2 | _tablet_sink_partition(std::move(tablet_sink_partition)), |
38 | 2 | _tablet_sink_location(std::move(tablet_sink_location)), |
39 | 2 | _tablet_sink_tuple_id(tablet_sink_tuple_id), |
40 | 2 | _local_state(local_state) {} |
41 | | |
42 | 0 | Status TabletSinkHashPartitioner::init(const std::vector<TExpr>& texprs) { |
43 | 0 | return Status::OK(); |
44 | 0 | } |
45 | | |
46 | 0 | Status TabletSinkHashPartitioner::prepare(RuntimeState* state, const RowDescriptor& row_desc) { |
47 | 0 | return Status::OK(); |
48 | 0 | } |
49 | | |
50 | 2 | Status TabletSinkHashPartitioner::open(RuntimeState* state) { |
51 | 2 | _schema = std::make_shared<OlapTableSchemaParam>(); |
52 | 2 | RETURN_IF_ERROR(_schema->init(_tablet_sink_schema)); |
53 | 2 | _vpartition = std::make_unique<VOlapTablePartitionParam>(_schema, _tablet_sink_partition); |
54 | 2 | RETURN_IF_ERROR(_vpartition->init()); |
55 | 2 | auto find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW; |
56 | 2 | _tablet_finder = std::make_unique<OlapTabletFinder>(_vpartition.get(), find_tablet_mode); |
57 | 2 | _tablet_sink_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tablet_sink_tuple_id); |
58 | 2 | _tablet_sink_row_desc = state->obj_pool()->add(new RowDescriptor(_tablet_sink_tuple_desc)); |
59 | 2 | auto& ctxs = _local_state->parent()->cast<ExchangeSinkOperatorX>().tablet_sink_expr_ctxs(); |
60 | 2 | _tablet_sink_expr_ctxs.resize(ctxs.size()); |
61 | 2 | for (size_t i = 0; i < _tablet_sink_expr_ctxs.size(); i++) { |
62 | 0 | RETURN_IF_ERROR(ctxs[i]->clone(state, _tablet_sink_expr_ctxs[i])); |
63 | 0 | } |
64 | | // if _part_type == TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED, we handle the processing of auto_increment column |
65 | | // on exchange node rather than on TabletWriter |
66 | 2 | _block_convertor = std::make_unique<OlapTableBlockConvertor>(_tablet_sink_tuple_desc); |
67 | 2 | _block_convertor->init_autoinc_info(_schema->db_id(), _schema->table_id(), state->batch_size()); |
68 | 2 | _location = state->obj_pool()->add(new OlapTableLocationParam(_tablet_sink_location)); |
69 | 2 | _row_distribution.init( |
70 | 2 | {.state = state, |
71 | 2 | .block_convertor = _block_convertor.get(), |
72 | 2 | .tablet_finder = _tablet_finder.get(), |
73 | 2 | .vpartition = _vpartition.get(), |
74 | 2 | .add_partition_request_timer = _local_state->add_partition_request_timer(), |
75 | 2 | .txn_id = _txn_id, |
76 | 2 | .pool = state->obj_pool(), |
77 | 2 | .location = _location, |
78 | 2 | .vec_output_expr_ctxs = &_tablet_sink_expr_ctxs, |
79 | 2 | .schema = _schema, |
80 | 2 | .caller = (void*)this, |
81 | 2 | .create_partition_callback = &TabletSinkHashPartitioner::empty_callback_function}); |
82 | 2 | RETURN_IF_ERROR(_row_distribution.open(_tablet_sink_row_desc)); |
83 | 2 | return Status::OK(); |
84 | 2 | } |
85 | | |
86 | 2 | Status TabletSinkHashPartitioner::do_partitioning(RuntimeState* state, Block* block) const { |
87 | 2 | _hash_vals.resize(block->rows()); |
88 | 2 | if (block->empty()) { |
89 | 0 | return Status::OK(); |
90 | 0 | } |
91 | | |
92 | | // tablet_id_hash % invalid_val never get invalid_val, so we use invalid_val as sentinel value |
93 | 2 | DCHECK_EQ(invalid_sentinel(), partition_count()); |
94 | 2 | const auto& invalid_val = invalid_sentinel(); |
95 | 2 | std::ranges::fill(_hash_vals, invalid_val); |
96 | | |
97 | 2 | int64_t dummy_stats = 0; // _local_state->rows_input_counter() updated in sink and write. |
98 | 2 | std::shared_ptr<Block> convert_block = std::make_shared<Block>(); |
99 | | |
100 | 2 | RETURN_IF_ERROR(_row_distribution.generate_rows_distribution( |
101 | 2 | *block, convert_block, _row_part_tablet_ids, dummy_stats)); |
102 | 2 | _skipped = _row_distribution.get_skipped(); |
103 | 2 | const auto& row_ids = _row_part_tablet_ids[0].row_ids; |
104 | 2 | const auto& tablet_ids = _row_part_tablet_ids[0].tablet_ids; |
105 | | |
106 | 3 | for (int idx = 0; idx < row_ids.size(); ++idx) { |
107 | 1 | const auto& row = row_ids[idx]; |
108 | 1 | const auto& tablet_id_hash = |
109 | 1 | HashUtil::zlib_crc_hash(&tablet_ids[idx], sizeof(HashValType), 0); |
110 | 1 | _hash_vals[row] = tablet_id_hash % invalid_val; |
111 | 1 | } |
112 | | |
113 | | // _hash_vals[i] == invalid_val => row i is skipped or filtered |
114 | 2 | #ifndef NDEBUG |
115 | 6 | for (size_t i = 0; i < _skipped.size(); ++i) { |
116 | 4 | if (_skipped[i]) { |
117 | 3 | CHECK_EQ(_hash_vals[i], invalid_val); |
118 | 3 | } |
119 | 4 | } |
120 | 2 | CHECK_LE(std::ranges::count_if(_skipped, [](bool v) { return v; }), |
121 | 2 | std::ranges::count_if(_hash_vals, [=](HashValType v) { return v == invalid_val; })); |
122 | 2 | #endif |
123 | | |
124 | 2 | return Status::OK(); |
125 | 2 | } |
126 | | |
127 | 1 | Status TabletSinkHashPartitioner::try_cut_in_line(Block& prior_block) const { |
128 | | // check if we need send batching block first |
129 | 1 | if (_row_distribution.need_deal_batching()) { |
130 | 1 | { |
131 | 1 | SCOPED_TIMER(_local_state->send_new_partition_timer()); |
132 | 1 | RETURN_IF_ERROR(_row_distribution.automatic_create_partition()); |
133 | 1 | } |
134 | | |
135 | 1 | prior_block = _row_distribution._batching_block->to_block(); // Borrow out, for lval ref |
136 | 1 | _row_distribution._batching_block.reset(); // clear. vrow_distribution will re-construct it |
137 | 1 | _row_distribution.clear_batching_stats(); |
138 | 1 | VLOG_DEBUG << "sinking batched block:\n" << prior_block.dump_data(); |
139 | 1 | } |
140 | 1 | return Status::OK(); |
141 | 1 | } |
142 | | |
143 | | Status TabletSinkHashPartitioner::clone(RuntimeState* state, |
144 | 0 | std::unique_ptr<PartitionerBase>& partitioner) { |
145 | 0 | partitioner = std::make_unique<TabletSinkHashPartitioner>( |
146 | 0 | _partition_count, _txn_id, _tablet_sink_schema, _tablet_sink_partition, |
147 | 0 | _tablet_sink_location, _tablet_sink_tuple_id, _local_state); |
148 | 0 | return Status::OK(); |
149 | 0 | } |
150 | | |
151 | 0 | Status TabletSinkHashPartitioner::close(RuntimeState* state) { |
152 | 0 | if (_block_convertor != nullptr && _tablet_finder != nullptr) { |
153 | 0 | state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() + |
154 | 0 | _tablet_finder->num_filtered_rows()); |
155 | 0 | state->update_num_rows_load_unselected( |
156 | 0 | _tablet_finder->num_immutable_partition_filtered_rows()); |
157 | | // sink won't see those filtered rows, we should compensate here |
158 | 0 | state->set_num_rows_load_total(state->num_rows_load_filtered() + |
159 | 0 | state->num_rows_load_unselected()); |
160 | 0 | } |
161 | 0 | return Status::OK(); |
162 | 0 | } |
163 | | } // namespace doris |