be/src/exec/sink/vrow_distribution.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/sink/vrow_distribution.h" |
19 | | |
20 | | #include <gen_cpp/FrontendService.h> |
21 | | #include <gen_cpp/FrontendService_types.h> |
22 | | #include <glog/logging.h> |
23 | | |
24 | | #include <cstdint> |
25 | | #include <memory> |
26 | | #include <string> |
27 | | |
28 | | #include "common/cast_set.h" |
29 | | #include "common/logging.h" |
30 | | #include "common/metrics/doris_metrics.h" |
31 | | #include "common/status.h" |
32 | | #include "core/assert_cast.h" |
33 | | #include "core/column/column.h" |
34 | | #include "core/column/column_const.h" |
35 | | #include "core/column/column_nullable.h" |
36 | | #include "core/column/column_vector.h" |
37 | | #include "core/data_type/data_type.h" |
38 | | #include "exec/sink/writer/vtablet_writer.h" |
39 | | #include "runtime/exec_env.h" |
40 | | #include "runtime/query_context.h" |
41 | | #include "runtime/runtime_state.h" |
42 | | #include "service/backend_options.h" |
43 | | #include "util/client_cache.h" |
44 | | #include "util/debug_points.h" |
45 | | #include "util/thrift_rpc_helper.h" |
46 | | |
47 | | namespace doris { |
48 | | |
49 | 38.5k | std::pair<VExprContextSPtrs, VExprSPtrs> VRowDistribution::_get_partition_function() { |
50 | 38.5k | return {_vpartition->get_part_func_ctx(), _vpartition->get_partition_function()}; |
51 | 38.5k | } |
52 | | |
// Buffer rows whose target partitions do not exist yet, and remember which
// partitions must be created.
//
// All rows selected by `filter` (row indexes into `input_block`) are appended
// to `_batching_block` so they can be replayed once their partitions exist.
// The stringified partition values of each such row are de-duplicated through
// `_deduper` and collected into `_partitions_need_create` for the later
// create-partition RPC.
//
// @param input_block   original (pre-conversion) block the rows come from
// @param col_strs      column-first matrix of stringified partition values,
//                      shaped [col_size][filter.size()] (non-const ref for move)
// @param col_size      number of partition columns (must be >= 1; col_strs[0]
//                      is read unconditionally)
// @param block         converted block; appears unused in this implementation
// @param filter        row indexes (into input_block) missing a partition
// @param col_null_maps per-column null map, or nullptr for non-nullable cols
Status VRowDistribution::_save_missing_values(
        const Block& input_block,
        std::vector<std::vector<std::string>>& col_strs, // non-const ref for move
        int col_size, Block* block, const std::vector<uint32_t>& filter,
        const std::vector<const NullMap*>& col_null_maps) {
    // de-duplication for new partitions but save all rows.
    RETURN_IF_ERROR(
            _batching_block->add_rows(&input_block, filter.data(), filter.data() + filter.size()));
    std::vector<TNullableStringLiteral> cur_row_values;
    for (int row = 0; row < col_strs[0].size(); ++row) {
        cur_row_values.clear();
        for (int col = 0; col < col_size; ++col) {
            TNullableStringLiteral node;
            const auto* null_map = col_null_maps[col]; // null map for this col
            node.__set_is_null((null_map && (*null_map)[filter[row]])
                                       ? true
                                       : node.is_null); // if not, dont change(default false)
            if (!node.is_null) {
                node.__set_value(col_strs[col][row]);
            }
            cur_row_values.push_back(node);
        }
        // only ask FE once per distinct partition-value tuple.
        if (!_deduper.contains(cur_row_values)) {
            _deduper.insert(cur_row_values);
            _partitions_need_create.emplace_back(cur_row_values);
        }
    }

    // to avoid too large mem use
    if (_batching_block->rows() > _batch_size) {
        _deal_batched = true;
    }
    _batching_rows = _batching_block->rows();
    VLOG_NOTICE << "pushed some batching lines, now numbers = " << _batching_rows;

    return Status::OK();
}
90 | | |
91 | 168 | void VRowDistribution::clear_batching_stats() { |
92 | 168 | _partitions_need_create.clear(); |
93 | 168 | _batching_rows = 0; |
94 | 168 | _batching_bytes = 0; |
95 | 168 | } |
96 | | |
// Issue a createPartition RPC to the master FE for all partition-value tuples
// accumulated in `_partitions_need_create` under the current transaction.
// On an OK result: the returned partitions are added to `_vpartition`, their
// ids are remembered in `_new_partition_ids` (so auto-detect overwrite will
// not try to replace them), and `_create_partition_callback` is invoked so
// the caller can incrementally open the new tablets.
// Returns the FE-reported status, or any local RPC/processing failure.
Status VRowDistribution::automatic_create_partition() {
    // only time the request when profiling is detailed enough to consume it.
    MonotonicStopWatch timer;
    if (_state->enable_profile() && _state->profile_level() >= 2) {
        timer.start();
    }

    SCOPED_TIMER(_add_partition_request_timer);
    TCreatePartitionRequest request;
    TCreatePartitionResult result;
    bool injected = false;
    std::string be_endpoint = BackendOptions::get_be_endpoint();
    request.__set_txn_id(_txn_id);
    request.__set_db_id(_vpartition->db_id());
    request.__set_table_id(_vpartition->table_id());
    request.__set_partitionValues(_partitions_need_create);
    request.__set_be_endpoint(be_endpoint);
    request.__set_write_single_replica(_write_single_replica);
    if (_state && _state->get_query_ctx()) {
        // Pass query_id to FE so it can determine if this is a multi-instance load by checking Coordinator
        request.__set_query_id(_state->get_query_ctx()->query_id());
    }

    // test hook: a debug point may fabricate `result` and skip the real RPC.
    DBUG_EXECUTE_IF("VRowDistribution.automatic_create_partition.inject_result", {
        DBUG_RUN_CALLBACK(&request, &result);
        injected = true;
    });

    VLOG_NOTICE << "automatic partition rpc begin request " << request;
    if (!injected) {
        // prefer the master address pinned on the partition descriptor;
        // fall back to the cluster-wide master FE address.
        std::shared_ptr<TNetworkAddress> master_addr;
        if (_vpartition->get_master_address() == nullptr) {
            auto* cluster_info = ExecEnv::GetInstance()->cluster_info();
            if (cluster_info == nullptr) {
                return Status::InternalError("cluster_info is null");
            }
            master_addr = std::make_shared<TNetworkAddress>(cluster_info->master_fe_addr);
        } else {
            master_addr = _vpartition->get_master_address();
        }
        int time_out = _state->execution_timeout() * 1000; // presumably seconds -> ms; confirm
        RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
                master_addr->hostname, master_addr->port,
                [&request, &result](FrontendServiceConnection& client) {
                    client->createPartition(result, request);
                },
                time_out));
    }

    Status status(Status::create(result.status));
    VLOG_NOTICE << "automatic partition rpc end response " << result;
    if (result.status.status_code == TStatusCode::OK) {
        // add new created partitions
        RETURN_IF_ERROR(_vpartition->add_partitions(result.partitions));
        for (const auto& part : result.partitions) {
            _new_partition_ids.insert(part.id);
            VLOG_TRACE << "record new id: " << part.id;
        }
        RETURN_IF_ERROR(_create_partition_callback(_caller, &result));
    }

    // Record this request's elapsed time
    if (_state->enable_profile() && _state->profile_level() >= 2) {
        int64_t elapsed_ns = timer.elapsed_time();
        _add_partition_request_times.push_back(elapsed_ns);
    }
    return status;
}
164 | | |
165 | | // for reuse the same create callback of create-partition |
166 | 20 | static TCreatePartitionResult cast_as_create_result(TReplacePartitionResult& arg) { |
167 | 20 | TCreatePartitionResult result; |
168 | 20 | result.status = arg.status; |
169 | 20 | result.nodes = std::move(arg.nodes); |
170 | 20 | result.partitions = std::move(arg.partitions); |
171 | 20 | result.tablets = std::move(arg.tablets); |
172 | 20 | result.slave_tablets = std::move(arg.slave_tablets); |
173 | 20 | return result; |
174 | 20 | } |
175 | | |
// For INSERT OVERWRITE with auto-detected partitions: ask the master FE to
// replace the origin partitions currently referenced by `_partitions` with
// fresh (empty) replacement partitions, then record and install them.
// Partitions this load already created (present in `_new_partition_ids`) are
// skipped. A null entry in `_partitions` is an error unless `_missing_map` is
// non-empty, in which case those rows are handled by the auto-partition path.
// Returns early with OK if nothing needs replacement.
// use _partitions and replace them
Status VRowDistribution::_replace_overwriting_partition() {
    SCOPED_TIMER(_add_partition_request_timer); // also for replace_partition
    TReplacePartitionRequest request;
    TReplacePartitionResult result;
    bool injected = false;
    request.__set_overwrite_group_id(_vpartition->get_overwrite_group_id());
    request.__set_db_id(_vpartition->db_id());
    request.__set_table_id(_vpartition->table_id());
    request.__set_write_single_replica(_write_single_replica);

    // only request for partitions not recorded for replacement
    std::set<int64_t> id_deduper;
    for (const auto* part : _partitions) {
        if (part != nullptr) {
            if (_new_partition_ids.contains(part->id)) {
                // this is a new partition. dont replace again.
                VLOG_TRACE << "skip new partition: " << part->id;
            } else {
                // request for replacement
                id_deduper.insert(part->id);
            }
        } else if (_missing_map.empty()) {
            // no origin partition. and not allow to create.
            return Status::InvalidArgument(
                    "Cannot found origin partitions in auto detect overwriting, stop "
                    "processing");
        } // else: part is null and _missing_map is not empty. dealed outside using auto-partition way. nothing to do here.
    }
    if (id_deduper.empty()) {
        return Status::OK(); // no need to request
    }
    // de-duplicate. there's no check in FE
    std::vector<int64_t> request_part_ids(id_deduper.begin(), id_deduper.end());

    request.__set_partition_ids(request_part_ids);

    std::string be_endpoint = BackendOptions::get_be_endpoint();
    request.__set_be_endpoint(be_endpoint);
    if (_state && _state->get_query_ctx()) {
        // Pass query_id to FE so it can determine if this is a multi-instance load by checking Coordinator
        request.__set_query_id(_state->get_query_ctx()->query_id());
    }

    // test hook: a debug point may fabricate `result` and skip the real RPC.
    DBUG_EXECUTE_IF("VRowDistribution.replace_overwriting_partition.inject_result", {
        DBUG_RUN_CALLBACK(&request, &result);
        injected = true;
    });

    VLOG_NOTICE << "auto detect replace partition request: " << request;
    if (!injected) {
        // prefer the master address pinned on the partition descriptor;
        // fall back to the cluster-wide master FE address.
        std::shared_ptr<TNetworkAddress> master_addr;
        if (_vpartition->get_master_address() == nullptr) {
            auto* cluster_info = ExecEnv::GetInstance()->cluster_info();
            if (cluster_info == nullptr) {
                return Status::InternalError("cluster_info is null");
            }
            master_addr = std::make_shared<TNetworkAddress>(cluster_info->master_fe_addr);
        } else {
            master_addr = _vpartition->get_master_address();
        }
        int time_out = _state->execution_timeout() * 1000; // presumably seconds -> ms; confirm
        RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
                master_addr->hostname, master_addr->port,
                [&request, &result](FrontendServiceConnection& client) {
                    client->replacePartition(result, request);
                },
                time_out));
    }

    Status status(Status::create(result.status));
    VLOG_NOTICE << "auto detect replace partition result: " << result;
    if (result.status.status_code == TStatusCode::OK) {
        // record new partitions
        for (const auto& part : result.partitions) {
            _new_partition_ids.insert(part.id);
            VLOG_TRACE << "record new id: " << part.id;
        }
        // replace data in _partitions
        RETURN_IF_ERROR(_vpartition->replace_partitions(request_part_ids, result.partitions));
        // reuse the function as the args' structure are same. it add nodes/locations and incremental_open
        auto result_as_create = cast_as_create_result(result);
        RETURN_IF_ERROR(_create_partition_callback(_caller, &result_as_create));
    }

    return status;
}
263 | | |
264 | | void VRowDistribution::_get_tablet_ids(Block* block, int32_t index_idx, |
265 | 39.0k | std::vector<int64_t>& tablet_ids) { |
266 | 39.0k | tablet_ids.resize(block->rows()); |
267 | 35.2M | for (int row_idx = 0; row_idx < block->rows(); row_idx++) { |
268 | 35.1M | if (_skip[row_idx]) { |
269 | 33.8k | continue; |
270 | 33.8k | } |
271 | 35.1M | auto& partition = _partitions[row_idx]; |
272 | 35.1M | auto& tablet_index = _tablet_indexes[row_idx]; |
273 | 35.1M | auto& index = partition->indexes[index_idx]; |
274 | | |
275 | 35.1M | auto tablet_id = index.tablets[tablet_index]; |
276 | 35.1M | tablet_ids[row_idx] = tablet_id; |
277 | 35.1M | } |
278 | 39.0k | } |
279 | | |
280 | 39.0k | void VRowDistribution::_filter_block_by_skip(Block* block, RowPartTabletIds& row_part_tablet_id) { |
281 | 39.0k | auto& row_ids = row_part_tablet_id.row_ids; |
282 | 39.0k | auto& partition_ids = row_part_tablet_id.partition_ids; |
283 | 39.0k | auto& tablet_ids = row_part_tablet_id.tablet_ids; |
284 | | |
285 | 39.0k | auto rows = block->rows(); |
286 | | // row count of a block should not exceed UINT32_MAX |
287 | 39.0k | auto rows_uint32 = cast_set<uint32_t>(rows); |
288 | 35.6M | for (uint32_t i = 0; i < rows_uint32; i++) { |
289 | 35.6M | if (!_skip[i]) { |
290 | 35.5M | row_ids.emplace_back(i); |
291 | 35.5M | partition_ids.emplace_back(_partitions[i]->id); |
292 | 35.5M | tablet_ids.emplace_back(_tablet_ids[i]); |
293 | 35.5M | } |
294 | 35.6M | } |
295 | 39.0k | } |
296 | | |
// Fill `row_part_tablet_id` with the rows that survive BOTH the skip mask
// and the index's where-clause predicate. The predicate is evaluated once on
// the whole block; its result column is handled by type:
//   - nullable:  per-row boolean check via get_bool_inline (presumably NULL
//                counts as false — confirm against ColumnNullable semantics)
//   - constant:  all-or-nothing; constant-false short-circuits to OK
//   - plain:     raw UInt8 filter data, nonzero keeps the row
Status VRowDistribution::_filter_block_by_skip_and_where_clause(
        Block* block, const VExprContextSPtr& where_clause, RowPartTabletIds& row_part_tablet_id) {
    // TODO
    //SCOPED_RAW_TIMER(&_stat.where_clause_ns);
    ColumnPtr filter_column;
    RETURN_IF_ERROR(where_clause->execute(block, filter_column));

    auto& row_ids = row_part_tablet_id.row_ids;
    auto& partition_ids = row_part_tablet_id.partition_ids;
    auto& tablet_ids = row_part_tablet_id.tablet_ids;
    if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) {
        auto rows = block->rows();
        // row count of a block should not exceed UINT32_MAX
        auto rows_uint32 = cast_set<uint32_t>(rows);
        for (uint32_t i = 0; i < rows_uint32; i++) {
            if (nullable_column->get_bool_inline(i) && !_skip[i]) {
                row_ids.emplace_back(i);
                partition_ids.emplace_back(_partitions[i]->id);
                tablet_ids.emplace_back(_tablet_ids[i]);
            }
        }
    } else if (const auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) {
        bool ret = const_column->get_bool(0);
        if (!ret) {
            // constant false: every row is filtered out, nothing to emit.
            return Status::OK();
        }
        // constant true: only the skip mask matters.
        // should we optimize?
        _filter_block_by_skip(block, row_part_tablet_id);
    } else {
        const auto& filter = assert_cast<const ColumnUInt8&>(*filter_column).get_data();
        auto rows = block->rows();
        // row count of a block should not exceed UINT32_MAX
        auto rows_uint32 = cast_set<uint32_t>(rows);
        for (uint32_t i = 0; i < rows_uint32; i++) {
            if (filter[i] != 0 && !_skip[i]) {
                row_ids.emplace_back(i);
                partition_ids.emplace_back(_partitions[i]->id);
                tablet_ids.emplace_back(_tablet_ids[i]);
            }
        }
    }

    return Status::OK();
}
341 | | |
342 | | Status VRowDistribution::_filter_block(Block* block, |
343 | 38.1k | std::vector<RowPartTabletIds>& row_part_tablet_ids) { |
344 | 77.2k | for (int i = 0; i < _schema->indexes().size(); i++) { |
345 | 39.0k | _get_tablet_ids(block, i, _tablet_ids); |
346 | 39.0k | auto& where_clause = _schema->indexes()[i]->where_clause; |
347 | 39.0k | if (where_clause != nullptr) { |
348 | 35 | RETURN_IF_ERROR(_filter_block_by_skip_and_where_clause(block, where_clause, |
349 | 35 | row_part_tablet_ids[i])); |
350 | 39.0k | } else { |
351 | 39.0k | _filter_block_by_skip(block, row_part_tablet_ids[i]); |
352 | 39.0k | } |
353 | 39.0k | } |
354 | 38.1k | return Status::OK(); |
355 | 38.1k | } |
356 | | |
357 | | Status VRowDistribution::_generate_rows_distribution_for_non_auto_partition( |
358 | 37.7k | Block* block, bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids) { |
359 | 37.7k | int num_rows = cast_set<int>(block->rows()); |
360 | | |
361 | 37.7k | RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, |
362 | 37.7k | _tablet_indexes, _skip)); |
363 | 37.7k | if (has_filtered_rows) { |
364 | 534 | for (int i = 0; i < num_rows; i++) { |
365 | 470 | _skip[i] = _skip[i] || _block_convertor->filter_map()[i]; |
366 | 470 | } |
367 | 64 | } |
368 | 37.7k | RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids)); |
369 | 37.7k | return Status::OK(); |
370 | 37.7k | } |
371 | | |
// Handle rows whose partition keys have no partition yet (`_missing_map`).
// Stringifies the partition-key values of those rows (the left bound of each
// missing range), parks the rows in the batching block via
// _save_missing_values(), and rolls the re-buffered rows/bytes out of the
// load statistics so they are not double counted when the batched rows are
// replayed after partition creation.
//
// @param input_block        original block the missing rows come from
// @param block              converted block holding the (possibly projected)
//                           partition columns
// @param partition_cols_idx positions of the partition columns in `block`
// @param rows_stat_val      in/out running row statistic; decremented by the
//                           number of rows newly moved into the batching block
Status VRowDistribution::_deal_missing_map(const Block& input_block, Block* block,
                                           const std::vector<uint16_t>& partition_cols_idx,
                                           int64_t& rows_stat_val) {
    // for missing partition keys, calc the missing partition and save in _partitions_need_create
    auto [part_ctxs, part_exprs] = _get_partition_function();
    int part_col_num = cast_set<int>(part_exprs.size());
    // the two vectors are in column-first-order
    std::vector<std::vector<std::string>> col_strs;
    std::vector<const NullMap*> col_null_maps;
    col_strs.resize(part_col_num);
    col_null_maps.reserve(part_col_num);

    auto format_options = DataTypeSerDe::get_default_format_options();
    format_options.timezone = &_state->timezone_obj();

    for (int i = 0; i < part_col_num; ++i) {
        auto return_type = part_exprs[i]->data_type();
        // expose the data column. the return type would be nullable
        const auto& [range_left_col, col_const] =
                unpack_if_const(block->get_by_position(partition_cols_idx[i]).column);
        if (range_left_col->is_nullable()) {
            col_null_maps.push_back(&(
                    assert_cast<const ColumnNullable*>(range_left_col.get())->get_null_map_data()));
        } else {
            col_null_maps.push_back(nullptr);
        }
        // stringify the partition value of every missing row for this column.
        for (auto row : _missing_map) {
            col_strs[i].push_back(return_type->to_string(
                    *range_left_col, index_check_const(row, col_const), format_options));
        }
    }

    // calc the end value and save them. in the end of sending, we will create partitions for them and deal them.
    // NOTE: must save old batching stats before calling _save_missing_values(),
    // because _save_missing_values() will update _batching_rows internally.
    size_t old_bt_rows = _batching_rows;
    size_t old_bt_bytes = _batching_bytes;

    RETURN_IF_ERROR(_save_missing_values(input_block, col_strs, part_col_num, block, _missing_map,
                                         col_null_maps));

    size_t new_bt_rows = _batching_block->rows();
    size_t new_bt_bytes = _batching_block->bytes();
    rows_stat_val -= new_bt_rows - old_bt_rows;
    // NOTE(review): `old - new` is intended to be negative here (new >= old);
    // the size_t subtraction wraps and relies on conversion to the callee's
    // signed parameter to produce the intended negative delta — confirm the
    // callees take int64_t.
    _state->update_num_rows_load_total(old_bt_rows - new_bt_rows);
    _state->update_num_bytes_load_total(old_bt_bytes - new_bt_bytes);
    DorisMetrics::instance()->load_rows->increment(old_bt_rows - new_bt_rows);
    DorisMetrics::instance()->load_bytes->increment(old_bt_bytes - new_bt_bytes);

    return Status::OK();
}
423 | | |
424 | | Status VRowDistribution::_generate_rows_distribution_for_auto_partition( |
425 | | const Block& input_block, Block* block, const std::vector<uint16_t>& partition_cols_idx, |
426 | | bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids, |
427 | 322 | int64_t& rows_stat_val) { |
428 | 322 | int num_rows = cast_set<int>(block->rows()); |
429 | 322 | std::vector<uint16_t> partition_keys = _vpartition->get_partition_keys(); |
430 | | |
431 | 322 | auto& partition_col = block->get_by_position(partition_keys[0]); |
432 | 322 | _missing_map.clear(); |
433 | 322 | _missing_map.reserve(partition_col.column->size()); |
434 | | |
435 | 322 | RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, |
436 | 322 | _tablet_indexes, _skip, &_missing_map)); |
437 | | |
438 | | // the missing vals for auto partition are also skipped. |
439 | 322 | if (has_filtered_rows) { |
440 | 8 | for (int i = 0; i < num_rows; i++) { |
441 | 4 | _skip[i] = _skip[i] || _block_convertor->filter_map()[i]; |
442 | 4 | } |
443 | 4 | } |
444 | 322 | RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids)); |
445 | | |
446 | 322 | if (!_missing_map.empty()) { |
447 | 169 | RETURN_IF_ERROR(_deal_missing_map(input_block, block, partition_cols_idx, |
448 | 169 | rows_stat_val)); // send input block to save |
449 | 169 | } |
450 | 322 | return Status::OK(); |
451 | 322 | } |
452 | | |
// Distribute rows for INSERT OVERWRITE with auto-detected partitions:
//   1) find the rows' origin partitions (optionally collecting missing keys
//      when auto-create-on-overwrite is enabled and the table is
//      auto-partitioned, batching those rows via _deal_missing_map);
//   2) ask the FE to replace the origin partitions with fresh empty ones;
//   3) reset lookup state and find tablets again so rows now map to the
//      replacement partitions, then emit surviving rows per index.
Status VRowDistribution::_generate_rows_distribution_for_auto_overwrite(
        const Block& input_block, Block* block, const std::vector<uint16_t>& partition_cols_idx,
        bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
        int64_t& rows_stat_val) {
    int num_rows = cast_set<int>(block->rows());

    // for non-auto-partition situation, goes into two 'else' branch. just find the origin partitions, replace them by rpc,
    // and find the new partitions to use.
    // for auto-partition's, find and save origins in _partitions and replace them. at meanwhile save the missing values for auto
    // partition. then we find partition again to get replaced partitions in _partitions. this time _missing_map is ignored cuz
    // we already saved missing values.
    if (_vpartition->is_auto_partition() &&
        _state->query_options().enable_auto_create_when_overwrite) {
        // allow auto create partition for missing rows.
        std::vector<uint16_t> partition_keys = _vpartition->get_partition_keys();
        // NOTE(review): this copies the ColumnWithTypeAndName; a reference (as
        // in _generate_rows_distribution_for_auto_partition) would avoid it.
        auto partition_col = block->get_by_position(partition_keys[0]);
        _missing_map.clear();
        _missing_map.reserve(partition_col.column->size());

        RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
                                                     _tablet_indexes, _skip, &_missing_map));

        // allow and really need to create during auto-detect-overwriting.
        if (!_missing_map.empty()) {
            RETURN_IF_ERROR(
                    _deal_missing_map(input_block, block, partition_cols_idx, rows_stat_val));
        }
    } else {
        RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
                                                     _tablet_indexes, _skip));
    }
    RETURN_IF_ERROR(_replace_overwriting_partition());

    // regenerate locations for new partitions & tablets
    _reset_find_tablets(num_rows);
    if (_vpartition->is_auto_partition() &&
        _state->query_options().enable_auto_create_when_overwrite) {
        // here _missing_map is just a placeholder
        RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
                                                     _tablet_indexes, _skip, &_missing_map));
        if (VLOG_TRACE_IS_ON) {
            std::string tmp;
            for (auto v : _missing_map) {
                tmp += std::to_string(v).append(", ");
            }
            VLOG_TRACE << "Trace missing map of " << this << ':' << tmp;
        }
    } else {
        RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
                                                     _tablet_indexes, _skip));
    }
    // rows dropped by the convertor must also be skipped in the output.
    if (has_filtered_rows) {
        for (int i = 0; i < num_rows; i++) {
            _skip[i] = _skip[i] || _block_convertor->filter_map()[i];
        }
    }
    RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids));
    return Status::OK();
}
512 | | |
513 | | void VRowDistribution::_reset_row_part_tablet_ids( |
514 | 38.1k | std::vector<RowPartTabletIds>& row_part_tablet_ids, int64_t rows) { |
515 | 38.1k | row_part_tablet_ids.resize(_schema->indexes().size()); |
516 | 39.1k | for (auto& row_part_tablet_id : row_part_tablet_ids) { |
517 | 39.1k | auto& row_ids = row_part_tablet_id.row_ids; |
518 | 39.1k | auto& partition_ids = row_part_tablet_id.partition_ids; |
519 | 39.1k | auto& tablet_ids = row_part_tablet_id.tablet_ids; |
520 | | |
521 | 39.1k | row_ids.clear(); |
522 | 39.1k | partition_ids.clear(); |
523 | 39.1k | tablet_ids.clear(); |
524 | | // This is important for performance. |
525 | 39.1k | row_ids.reserve(rows); |
526 | 39.1k | partition_ids.reserve(rows); |
527 | 39.1k | tablet_ids.reserve(rows); |
528 | 39.1k | } |
529 | 38.1k | } |
530 | | |
// Entry point of row distribution: for one input block, decide for every row
// which partition and tablet it belongs to. Steps:
//   1. validate/convert `input_block` into `block` via the block convertor;
//   2. lazily create the batching block (same structure as the input) used to
//      park rows whose partitions must be created first;
//   3. if partitions are defined on expressions (projection partition),
//      evaluate them so `block` carries the transformed partition columns;
//   4. dispatch to the overwrite / auto-partition / plain strategy.
// `row_part_tablet_ids` receives one (row, partition, tablet) set per index;
// `rows_stat_val` is decremented for rows moved into the batching block.
Status VRowDistribution::generate_rows_distribution(
        Block& input_block, std::shared_ptr<Block>& block,
        std::vector<RowPartTabletIds>& row_part_tablet_ids, int64_t& rows_stat_val) {
    auto input_rows = input_block.rows();
    _reset_row_part_tablet_ids(row_part_tablet_ids, input_rows);

    // we store the batching block with value of `input_block`. so just do all of these again.
    bool has_filtered_rows = false;
    RETURN_IF_ERROR(_block_convertor->validate_and_convert_block(
            _state, &input_block, block, *_vec_output_expr_ctxs, input_rows, has_filtered_rows));

    // batching block rows which need new partitions. deal together at finish.
    if (!_batching_block) [[unlikely]] {
        std::unique_ptr<Block> tmp_block = input_block.create_same_struct_block(0);
        _batching_block = MutableBlock::create_unique(std::move(*tmp_block));
    }

    auto num_rows = block->rows();
    _reset_find_tablets(num_rows);

    // if there's projection of partition calc, we need to calc it first.
    auto [part_ctxs, part_funcs] = _get_partition_function();
    std::vector<uint16_t> partition_cols_idx;
    if (_vpartition->is_projection_partition()) {
        // calc the start value of missing partition ranges.
        auto func_size = part_funcs.size();
        // NOTE(review): `int i` vs size_t `func_size` comparison; the number
        // of partition functions is small, so safe in practice.
        for (int i = 0; i < func_size; ++i) {
            int result_idx = -1;
            // we just calc left range here. leave right to FE to avoid dup calc.
            RETURN_IF_ERROR(part_funcs[i]->execute(part_ctxs[i].get(), block.get(), &result_idx));

            VLOG_DEBUG << "Partition-calculated block:\n" << block->dump_data(0, 1);
            DCHECK(result_idx != -1);

            partition_cols_idx.push_back(cast_set<uint16_t>(result_idx));
        }

        // change the column to compare to transformed.
        _vpartition->set_transformed_slots(partition_cols_idx);
    }

    Status st = Status::OK();
    if (_vpartition->is_auto_detect_overwrite() && !_deal_batched) {
        // when overwrite, no auto create partition allowed.
        st = _generate_rows_distribution_for_auto_overwrite(input_block, block.get(),
                                                            partition_cols_idx, has_filtered_rows,
                                                            row_part_tablet_ids, rows_stat_val);
    } else if (_vpartition->is_auto_partition() && !_deal_batched) {
        st = _generate_rows_distribution_for_auto_partition(input_block, block.get(),
                                                            partition_cols_idx, has_filtered_rows,
                                                            row_part_tablet_ids, rows_stat_val);
    } else { // not auto partition
        st = _generate_rows_distribution_for_non_auto_partition(block.get(), has_filtered_rows,
                                                                row_part_tablet_ids);
    }

    return st;
}
589 | | |
590 | | // reuse vars for find_tablets |
591 | 38.1k | void VRowDistribution::_reset_find_tablets(int64_t rows) { |
592 | 38.1k | _tablet_finder->filter_bitmap().Reset(rows); |
593 | 38.1k | _partitions.assign(rows, nullptr); |
594 | 38.1k | _skip.assign(rows, false); |
595 | 38.1k | _tablet_indexes.assign(rows, 0); |
596 | 38.1k | } |
597 | | |
598 | | } // namespace doris |