be/src/exec/sink/vrow_distribution.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | // IWYU pragma: no_include <bits/chrono.h> |
21 | | #include <fmt/format.h> |
22 | | #include <gen_cpp/FrontendService.h> |
23 | | #include <gen_cpp/FrontendService_types.h> |
24 | | #include <gen_cpp/PaloInternalService_types.h> |
25 | | #include <glog/logging.h> |
26 | | |
27 | | #include <cstdint> |
28 | | #include <functional> |
29 | | #include <string> |
30 | | #include <unordered_set> |
31 | | #include <vector> |
32 | | |
33 | | #include "common/status.h" |
34 | | #include "core/block/block.h" |
35 | | #include "core/custom_allocator.h" |
36 | | #include "exec/sink/vtablet_block_convertor.h" |
37 | | #include "exec/sink/vtablet_finder.h" |
38 | | #include "exprs/vexpr_context.h" |
39 | | #include "exprs/vexpr_fwd.h" |
40 | | #include "runtime/runtime_profile.h" |
41 | | #include "runtime/runtime_state.h" |
42 | | #include "storage/tablet_info.h" |
43 | | |
44 | | namespace doris { |
45 | | #include "common/compile_check_begin.h" |
46 | | |
// Forward declarations only. Neither type is referenced by any declaration
// visible in this header.
class IndexChannel;
class VNodeChannel;
49 | | |
50 | | // <row_idx, partition_id, tablet_id> |
51 | | class RowPartTabletIds { |
52 | | public: |
53 | | DorisVector<uint32_t> row_ids; |
54 | | DorisVector<int64_t> partition_ids; |
55 | | DorisVector<int64_t> tablet_ids; |
56 | | |
57 | 0 | std::string debug_string() const { |
58 | 0 | std::string value; |
59 | 0 | value.reserve(row_ids.size() * 15); |
60 | 0 | for (int i = 0; i < row_ids.size(); i++) { |
61 | 0 | value.append(fmt::format("[{}, {}, {}]", row_ids[i], partition_ids[i], tablet_ids[i])); |
62 | 0 | } |
63 | 0 | return value; |
64 | 0 | } |
65 | | }; |
66 | | |
// Function-pointer type of the create-partition callback held by VRowDistribution.
// The void* argument is an opaque pointer for the caller (see
// VRowDistributionContext::caller); the second argument points at a
// TCreatePartitionResult.
// NOTE(review): the invocation site is not in this header - presumably it is called
// once a create-partition request has returned; confirm in the .cpp.
using CreatePartitionCallback = Status (*)(void*, TCreatePartitionResult*);
69 | | |
70 | | class VRowDistribution { |
71 | | public: |
72 | | // only used to pass parameters for VRowDistribution |
73 | | struct VRowDistributionContext { |
74 | | RuntimeState* state = nullptr; |
75 | | OlapTableBlockConvertor* block_convertor = nullptr; |
76 | | OlapTabletFinder* tablet_finder = nullptr; |
77 | | VOlapTablePartitionParam* vpartition = nullptr; |
78 | | RuntimeProfile::Counter* add_partition_request_timer = nullptr; |
79 | | int64_t txn_id = -1; |
80 | | ObjectPool* pool = nullptr; |
81 | | OlapTableLocationParam* location = nullptr; |
82 | | const VExprContextSPtrs* vec_output_expr_ctxs = nullptr; |
83 | | std::shared_ptr<OlapTableSchemaParam> schema; |
84 | | void* caller = nullptr; |
85 | | bool write_single_replica = false; |
86 | | CreatePartitionCallback create_partition_callback; |
87 | | }; |
88 | | friend class VTabletWriter; |
89 | | friend class VTabletWriterV2; |
90 | | |
91 | 74.6k | VRowDistribution() = default; |
92 | 74.8k | virtual ~VRowDistribution() = default; |
93 | | |
94 | 74.7k | void init(VRowDistributionContext ctx) { |
95 | 74.7k | _state = ctx.state; |
96 | 74.7k | _batch_size = std::max(_state->batch_size(), 8192); |
97 | 74.7k | _block_convertor = ctx.block_convertor; |
98 | 74.7k | _tablet_finder = ctx.tablet_finder; |
99 | 74.7k | _vpartition = ctx.vpartition; |
100 | 74.7k | _add_partition_request_timer = ctx.add_partition_request_timer; |
101 | 74.7k | _txn_id = ctx.txn_id; |
102 | 74.7k | _pool = ctx.pool; |
103 | 74.7k | _location = ctx.location; |
104 | 74.7k | _vec_output_expr_ctxs = ctx.vec_output_expr_ctxs; |
105 | 74.7k | _schema = ctx.schema; |
106 | 74.7k | _caller = ctx.caller; |
107 | 74.7k | _write_single_replica = ctx.write_single_replica; |
108 | 74.7k | _create_partition_callback = ctx.create_partition_callback; |
109 | 74.7k | } |
110 | | |
111 | 2 | void output_profile_info(RuntimeProfile* profile) { |
112 | 2 | if (!_add_partition_request_times.empty()) { |
113 | 0 | std::stringstream ss; |
114 | 0 | ss << "["; |
115 | 0 | for (size_t i = 0; i < _add_partition_request_times.size(); ++i) { |
116 | 0 | if (i > 0) { |
117 | 0 | ss << ", "; |
118 | 0 | } |
119 | 0 | ss << PrettyPrinter::print(_add_partition_request_times[i], TUnit::TIME_NS); |
120 | 0 | } |
121 | 0 | ss << "]"; |
122 | 0 | profile->add_info_string("AddPartitionRequestTimeList", ss.str()); |
123 | 0 | } |
124 | 2 | } |
125 | | |
126 | 74.6k | Status open(RowDescriptor* output_row_desc) { |
127 | 74.6k | if (_vpartition->is_auto_partition()) { |
128 | 300 | auto [part_ctxs, part_funcs] = _get_partition_function(); |
129 | 312 | for (auto part_ctx : part_ctxs) { |
130 | 312 | RETURN_IF_ERROR(part_ctx->prepare(_state, *output_row_desc)); |
131 | 312 | RETURN_IF_ERROR(part_ctx->open(_state)); |
132 | 312 | } |
133 | 300 | } |
134 | 75.7k | for (const auto& index : _schema->indexes()) { |
135 | 75.7k | auto& where_clause = index->where_clause; |
136 | 75.7k | if (where_clause != nullptr) { |
137 | 33 | RETURN_IF_ERROR(where_clause->prepare(_state, *output_row_desc)); |
138 | 33 | RETURN_IF_ERROR(where_clause->open(_state)); |
139 | 33 | } |
140 | 75.7k | } |
141 | 74.6k | return Status::OK(); |
142 | 74.6k | } |
143 | | |
144 | | // auto partition |
145 | | // mv where clause |
146 | | // v1 needs index->node->row_ids - tabletids |
147 | | // v2 needs index,tablet->rowids |
148 | | Status generate_rows_distribution(Block& input_block, std::shared_ptr<Block>& block, |
149 | | std::vector<RowPartTabletIds>& row_part_tablet_ids, |
150 | | int64_t& rows_stat_val); |
151 | | // have 2 ways remind to deal batching block: |
152 | | // 1. in row_distribution, _batching_rows reaches the threshold, this class set _deal_batched = true. |
153 | | // 2. in caller, after last block and before close, set _deal_batched = true. |
154 | 118k | bool need_deal_batching() const { return _deal_batched && _batching_rows > 0; } |
155 | | // create partitions when need for auto-partition table using #_partitions_need_create. |
156 | | Status automatic_create_partition(); |
157 | | void clear_batching_stats(); |
158 | 1.47k | const std::vector<bool>& get_skipped() const { return _skip; } // skipped in last round |
159 | | |
160 | | // for auto partition |
161 | | std::unique_ptr<MutableBlock> _batching_block; // same structure with input_block |
162 | | bool _deal_batched = false; // If true, send batched block before any block's append. |
163 | | |
164 | | private: |
165 | | std::pair<VExprContextSPtrs, VExprSPtrs> _get_partition_function(); |
166 | | |
167 | | Status _save_missing_values(const Block& input_block, |
168 | | std::vector<std::vector<std::string>>& col_strs, int col_size, |
169 | | Block* block, const std::vector<uint32_t>& filter, |
170 | | const std::vector<const NullMap*>& col_null_maps); |
171 | | |
172 | | void _get_tablet_ids(Block* block, int32_t index_idx, std::vector<int64_t>& tablet_ids); |
173 | | |
174 | | void _filter_block_by_skip(Block* block, RowPartTabletIds& row_part_tablet_id); |
175 | | |
176 | | Status _filter_block_by_skip_and_where_clause(Block* block, |
177 | | const VExprContextSPtr& where_clause, |
178 | | RowPartTabletIds& row_part_tablet_id); |
179 | | |
180 | | Status _filter_block(Block* block, std::vector<RowPartTabletIds>& row_part_tablet_ids); |
181 | | |
182 | | Status _generate_rows_distribution_for_auto_partition( |
183 | | const Block& input_block, Block* block, const std::vector<uint16_t>& partition_col_idx, |
184 | | bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids, |
185 | | int64_t& rows_stat_val); |
186 | | // the whole process to deal missing rows. will call _save_missing_values |
187 | | Status _deal_missing_map(const Block& input_block, Block* block, |
188 | | const std::vector<uint16_t>& partition_cols_idx, |
189 | | int64_t& rows_stat_val); |
190 | | |
191 | | Status _generate_rows_distribution_for_non_auto_partition( |
192 | | Block* block, bool has_filtered_rows, |
193 | | std::vector<RowPartTabletIds>& row_part_tablet_ids); |
194 | | |
195 | | Status _generate_rows_distribution_for_auto_overwrite( |
196 | | const Block& input_block, Block* block, const std::vector<uint16_t>& partition_cols_idx, |
197 | | bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids, |
198 | | int64_t& rows_stat_val); |
199 | | Status _replace_overwriting_partition(); |
200 | | |
201 | | void _reset_row_part_tablet_ids(std::vector<RowPartTabletIds>& row_part_tablet_ids, |
202 | | int64_t rows); |
203 | | void _reset_find_tablets(int64_t rows); |
204 | | |
205 | | struct NullableStringListHash { |
206 | 18.3k | std::size_t _hash(const TNullableStringLiteral& arg) const { |
207 | 18.3k | if (arg.is_null) { |
208 | 16 | return 0; |
209 | 16 | } |
210 | 18.3k | return std::hash<std::string>()(arg.value); |
211 | 18.3k | } |
212 | 18.2k | std::size_t operator()(const std::vector<TNullableStringLiteral>& arg) const { |
213 | 18.2k | std::size_t result = 0; |
214 | 18.3k | for (const auto& v : arg) { |
215 | 18.3k | result = (result << 1) ^ _hash(v); |
216 | 18.3k | } |
217 | 18.2k | return result; |
218 | 18.2k | } |
219 | | }; |
220 | | |
221 | | RuntimeState* _state = nullptr; |
222 | | int _batch_size = 0; |
223 | | |
224 | | // for auto partitions |
225 | | std::vector<std::vector<TNullableStringLiteral>> _partitions_need_create; |
226 | | size_t _batching_rows = 0, _batching_bytes = 0; |
227 | | std::unordered_set<std::vector<TNullableStringLiteral>, NullableStringListHash> _deduper; |
228 | | |
229 | | OlapTableBlockConvertor* _block_convertor = nullptr; |
230 | | OlapTabletFinder* _tablet_finder = nullptr; |
231 | | VOlapTablePartitionParam* _vpartition = nullptr; |
232 | | RuntimeProfile::Counter* _add_partition_request_timer = nullptr; |
233 | | int64_t _txn_id = -1; |
234 | | ObjectPool* _pool = nullptr; |
235 | | OlapTableLocationParam* _location = nullptr; |
236 | | |
237 | | // Record each auto-partition request time for detailed profiling |
238 | | std::vector<int64_t> _add_partition_request_times; |
239 | | // int64_t _number_output_rows = 0; |
240 | | const VExprContextSPtrs* _vec_output_expr_ctxs = nullptr; |
241 | | // generally it's writer's on_partitions_created |
242 | | CreatePartitionCallback _create_partition_callback = nullptr; |
243 | | void* _caller = nullptr; |
244 | | std::shared_ptr<OlapTableSchemaParam> _schema; |
245 | | bool _write_single_replica = false; |
246 | | |
247 | | // reuse for find_tablet. save partitions found by find_tablets |
248 | | std::vector<VOlapTablePartition*> _partitions; |
249 | | std::vector<bool> _skip; |
250 | | std::vector<uint32_t> _tablet_indexes; |
251 | | std::vector<int64_t> _tablet_ids; |
252 | | std::vector<uint32_t> _missing_map; // indice of missing values in partition_col |
253 | | // for auto detect overwrite partition |
254 | | std::set<int64_t> _new_partition_ids; // if contains, not to replace it again. |
255 | | }; |
256 | | |
257 | | } // namespace doris |
258 | | |
259 | | #include "common/compile_check_end.h" |