be/src/exec/sink/vrow_distribution.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/sink/vrow_distribution.h" |
19 | | |
20 | | #include <gen_cpp/FrontendService.h> |
21 | | #include <gen_cpp/FrontendService_types.h> |
22 | | #include <glog/logging.h> |
23 | | |
24 | | #include <cstdint> |
25 | | #include <memory> |
26 | | #include <string> |
27 | | |
28 | | #include "common/cast_set.h" |
29 | | #include "common/logging.h" |
30 | | #include "common/metrics/doris_metrics.h" |
31 | | #include "common/status.h" |
32 | | #include "core/assert_cast.h" |
33 | | #include "core/column/column.h" |
34 | | #include "core/column/column_const.h" |
35 | | #include "core/column/column_nullable.h" |
36 | | #include "core/column/column_vector.h" |
37 | | #include "core/data_type/data_type.h" |
38 | | #include "exec/sink/writer/vtablet_writer.h" |
39 | | #include "runtime/exec_env.h" |
40 | | #include "runtime/query_context.h" |
41 | | #include "runtime/runtime_state.h" |
42 | | #include "service/backend_options.h" |
43 | | #include "util/client_cache.h" |
44 | | #include "util/debug_points.h" |
45 | | #include "util/thrift_rpc_helper.h" |
46 | | |
47 | | namespace doris { |
48 | | |
49 | 13 | std::pair<VExprContextSPtrs, VExprSPtrs> VRowDistribution::_get_partition_function() { |
50 | 13 | return {_vpartition->get_part_func_ctx(), _vpartition->get_partition_function()}; |
51 | 13 | } |
52 | | |
// Buffers rows whose partition key matched no existing partition.
// The rows themselves (selected by `filter`, indexes into `input_block`) are
// appended to _batching_block so they can be re-sent once the partitions
// exist; the distinct partition-key value tuples are de-duplicated through
// _deduper and queued in _partitions_need_create for the FE create RPC.
//
// col_strs:      string form of each partition column in column-first order
//                (col_strs[col][row]); non-const ref so values could be moved.
// col_size:      number of partition columns.
// block:         unused in this function.
// col_null_maps: per-column null map of the source column, or nullptr when
//                the column is not nullable.
Status VRowDistribution::_save_missing_values(
        const Block& input_block,
        std::vector<std::vector<std::string>>& col_strs, // non-const ref for move
        int col_size, Block* block, const std::vector<uint32_t>& filter,
        const std::vector<const NullMap*>& col_null_maps) {
    // de-duplication for new partitions but save all rows.
    RETURN_IF_ERROR(
            _batching_block->add_rows(&input_block, filter.data(), filter.data() + filter.size()));
    std::vector<TNullableStringLiteral> cur_row_values;
    for (int row = 0; row < col_strs[0].size(); ++row) {
        cur_row_values.clear();
        for (int col = 0; col < col_size; ++col) {
            TNullableStringLiteral node;
            const auto* null_map = col_null_maps[col]; // null map for this col
            // __set_is_null is called unconditionally so the thrift __isset
            // flag is always set; the value only flips to true for a null cell.
            node.__set_is_null((null_map && (*null_map)[filter[row]])
                                       ? true
                                       : node.is_null); // if not, dont change(default false)
            if (!node.is_null) {
                node.__set_value(col_strs[col][row]);
            }
            cur_row_values.push_back(node);
        }
        // request creation only once per distinct partition-key tuple
        if (!_deduper.contains(cur_row_values)) {
            _deduper.insert(cur_row_values);
            _partitions_need_create.emplace_back(cur_row_values);
        }
    }

    // to avoid too large mem use
    if (_batching_block->rows() > _batch_size) {
        _deal_batched = true;
    }
    _batching_rows = _batching_block->rows();
    VLOG_NOTICE << "pushed some batching lines, now numbers = " << _batching_rows;

    return Status::OK();
}
90 | | |
91 | 2 | void VRowDistribution::clear_batching_stats() { |
92 | 2 | _partitions_need_create.clear(); |
93 | 2 | _batching_rows = 0; |
94 | 2 | _batching_bytes = 0; |
95 | 2 | } |
96 | | |
// Asks the FE master to create the partitions queued in
// _partitions_need_create for transaction _txn_id. On an OK result the new
// partitions are registered locally (add_partitions), their ids are recorded
// in _new_partition_ids, and _create_partition_callback lets the caller open
// the new tablets incrementally.
// Returns the status the FE reported, or an RPC/internal error.
Status VRowDistribution::automatic_create_partition() {
    // wall-clock timing is only recorded when profiling at level >= 2
    MonotonicStopWatch timer;
    if (_state->enable_profile() && _state->profile_level() >= 2) {
        timer.start();
    }

    SCOPED_TIMER(_add_partition_request_timer);
    TCreatePartitionRequest request;
    TCreatePartitionResult result;
    bool injected = false; // set by the debug point below to skip the real RPC
    std::string be_endpoint = BackendOptions::get_be_endpoint();
    request.__set_txn_id(_txn_id);
    request.__set_db_id(_vpartition->db_id());
    request.__set_table_id(_vpartition->table_id());
    request.__set_partitionValues(_partitions_need_create);
    request.__set_be_endpoint(be_endpoint);
    request.__set_write_single_replica(_write_single_replica);
    if (_state && _state->get_query_ctx()) {
        // Pass query_id to FE so it can determine if this is a multi-instance load by checking Coordinator
        request.__set_query_id(_state->get_query_ctx()->query_id());
    }

    // test hook: fabricate `result` instead of calling the FE
    DBUG_EXECUTE_IF("VRowDistribution.automatic_create_partition.inject_result", {
        DBUG_RUN_CALLBACK(&request, &result);
        injected = true;
    });

    VLOG_NOTICE << "automatic partition rpc begin request " << request;
    if (!injected) {
        // prefer the master address cached on the partition descriptor; fall
        // back to the cluster-wide master FE address
        std::shared_ptr<TNetworkAddress> master_addr;
        if (_vpartition->get_master_address() == nullptr) {
            auto* cluster_info = ExecEnv::GetInstance()->cluster_info();
            if (cluster_info == nullptr) {
                return Status::InternalError("cluster_info is null");
            }
            master_addr = std::make_shared<TNetworkAddress>(cluster_info->master_fe_addr);
        } else {
            master_addr = _vpartition->get_master_address();
        }
        int time_out = _state->execution_timeout() * 1000; // presumably sec -> ms, verify
        RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
                master_addr->hostname, master_addr->port,
                [&request, &result](FrontendServiceConnection& client) {
                    client->createPartition(result, request);
                },
                time_out));
    }

    Status status(Status::create(result.status));
    VLOG_NOTICE << "automatic partition rpc end response " << result;
    if (result.status.status_code == TStatusCode::OK) {
        // add new created partitions
        RETURN_IF_ERROR(_vpartition->add_partitions(result.partitions));
        for (const auto& part : result.partitions) {
            _new_partition_ids.insert(part.id);
            VLOG_TRACE << "record new id: " << part.id;
        }
        RETURN_IF_ERROR(_create_partition_callback(_caller, &result));
    }

    // Record this request's elapsed time
    if (_state->enable_profile() && _state->profile_level() >= 2) {
        int64_t elapsed_ns = timer.elapsed_time();
        _add_partition_request_times.push_back(elapsed_ns);
    }
    return status;
}
164 | | |
165 | | // for reuse the same create callback of create-partition |
166 | 1 | static TCreatePartitionResult cast_as_create_result(TReplacePartitionResult& arg) { |
167 | 1 | TCreatePartitionResult result; |
168 | 1 | result.status = arg.status; |
169 | 1 | result.nodes = std::move(arg.nodes); |
170 | 1 | result.partitions = std::move(arg.partitions); |
171 | 1 | result.tablets = std::move(arg.tablets); |
172 | 1 | result.slave_tablets = std::move(arg.slave_tablets); |
173 | 1 | return result; |
174 | 1 | } |
175 | | |
176 | | // use _partitions and replace them |
// use _partitions and replace them
// INSERT OVERWRITE auto-detect path: ask the FE to replace the partitions
// currently referenced by _partitions with fresh ones — except partitions we
// created ourselves during this load (tracked in _new_partition_ids), which
// must not be replaced again. On success the replacements are installed into
// _vpartition and handed to the create-partition callback (same wire shape).
Status VRowDistribution::_replace_overwriting_partition() {
    SCOPED_TIMER(_add_partition_request_timer); // also for replace_partition
    TReplacePartitionRequest request;
    TReplacePartitionResult result;
    bool injected = false; // set by the debug point below to skip the real RPC
    request.__set_overwrite_group_id(_vpartition->get_overwrite_group_id());
    request.__set_db_id(_vpartition->db_id());
    request.__set_table_id(_vpartition->table_id());
    request.__set_write_single_replica(_write_single_replica);

    // only request for partitions not recorded for replacement
    std::set<int64_t> id_deduper;
    for (const auto* part : _partitions) {
        if (part != nullptr) {
            if (_new_partition_ids.contains(part->id)) {
                // this is a new partition. dont replace again.
                VLOG_TRACE << "skip new partition: " << part->id;
            } else {
                // request for replacement
                id_deduper.insert(part->id);
            }
        } else if (_missing_map.empty()) {
            // no origin partition. and not allow to create.
            return Status::InvalidArgument(
                    "Cannot found origin partitions in auto detect overwriting, stop "
                    "processing");
        } // else: part is null and _missing_map is not empty. dealed outside using auto-partition way. nothing to do here.
    }
    if (id_deduper.empty()) {
        return Status::OK(); // no need to request
    }
    // de-duplicate. there's no check in FE
    std::vector<int64_t> request_part_ids(id_deduper.begin(), id_deduper.end());

    request.__set_partition_ids(request_part_ids);

    std::string be_endpoint = BackendOptions::get_be_endpoint();
    request.__set_be_endpoint(be_endpoint);
    if (_state && _state->get_query_ctx()) {
        // Pass query_id to FE so it can determine if this is a multi-instance load by checking Coordinator
        request.__set_query_id(_state->get_query_ctx()->query_id());
    }

    // test hook: fabricate `result` instead of calling the FE
    DBUG_EXECUTE_IF("VRowDistribution.replace_overwriting_partition.inject_result", {
        DBUG_RUN_CALLBACK(&request, &result);
        injected = true;
    });

    VLOG_NOTICE << "auto detect replace partition request: " << request;
    if (!injected) {
        // prefer the master address cached on the partition descriptor; fall
        // back to the cluster-wide master FE address
        std::shared_ptr<TNetworkAddress> master_addr;
        if (_vpartition->get_master_address() == nullptr) {
            auto* cluster_info = ExecEnv::GetInstance()->cluster_info();
            if (cluster_info == nullptr) {
                return Status::InternalError("cluster_info is null");
            }
            master_addr = std::make_shared<TNetworkAddress>(cluster_info->master_fe_addr);
        } else {
            master_addr = _vpartition->get_master_address();
        }
        int time_out = _state->execution_timeout() * 1000; // presumably sec -> ms, verify
        RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
                master_addr->hostname, master_addr->port,
                [&request, &result](FrontendServiceConnection& client) {
                    client->replacePartition(result, request);
                },
                time_out));
    }

    Status status(Status::create(result.status));
    VLOG_NOTICE << "auto detect replace partition result: " << result;
    if (result.status.status_code == TStatusCode::OK) {
        // record new partitions
        for (const auto& part : result.partitions) {
            _new_partition_ids.insert(part.id);
            VLOG_TRACE << "record new id: " << part.id;
        }
        // replace data in _partitions
        RETURN_IF_ERROR(_vpartition->replace_partitions(request_part_ids, result.partitions));
        // reuse the function as the args' structure are same. it add nodes/locations and incremental_open
        auto result_as_create = cast_as_create_result(result);
        RETURN_IF_ERROR(_create_partition_callback(_caller, &result_as_create));
    }

    return status;
}
263 | | |
264 | | void VRowDistribution::_get_tablet_ids(Block* block, int32_t index_idx, |
265 | 9 | std::vector<int64_t>& tablet_ids) { |
266 | 9 | tablet_ids.resize(block->rows()); |
267 | 26 | for (int row_idx = 0; row_idx < block->rows(); row_idx++) { |
268 | 17 | if (_skip[row_idx]) { |
269 | 6 | continue; |
270 | 6 | } |
271 | 11 | auto& partition = _partitions[row_idx]; |
272 | 11 | auto& tablet_index = _tablet_indexes[row_idx]; |
273 | 11 | auto& index = partition->indexes[index_idx]; |
274 | | |
275 | 11 | auto tablet_id = index.tablets[tablet_index]; |
276 | 11 | tablet_ids[row_idx] = tablet_id; |
277 | 11 | } |
278 | 9 | } |
279 | | |
280 | 7 | void VRowDistribution::_filter_block_by_skip(Block* block, RowPartTabletIds& row_part_tablet_id) { |
281 | 7 | auto& row_ids = row_part_tablet_id.row_ids; |
282 | 7 | auto& partition_ids = row_part_tablet_id.partition_ids; |
283 | 7 | auto& tablet_ids = row_part_tablet_id.tablet_ids; |
284 | | |
285 | 7 | auto rows = block->rows(); |
286 | | // row count of a block should not exceed UINT32_MAX |
287 | 7 | auto rows_uint32 = cast_set<uint32_t>(rows); |
288 | 19 | for (uint32_t i = 0; i < rows_uint32; i++) { |
289 | 12 | if (!_skip[i]) { |
290 | 6 | row_ids.emplace_back(i); |
291 | 6 | partition_ids.emplace_back(_partitions[i]->id); |
292 | 6 | tablet_ids.emplace_back(_tablet_ids[i]); |
293 | 6 | } |
294 | 12 | } |
295 | 7 | } |
296 | | |
// Like _filter_block_by_skip(), but additionally applies this index's WHERE
// clause: a row is emitted only when it is not skipped AND the clause is
// true for it. The filter column the clause produces may be nullable,
// constant, or a plain ColumnUInt8; each shape has its own branch.
Status VRowDistribution::_filter_block_by_skip_and_where_clause(
        Block* block, const VExprContextSPtr& where_clause, RowPartTabletIds& row_part_tablet_id) {
    // TODO
    //SCOPED_RAW_TIMER(&_stat.where_clause_ns);
    ColumnPtr filter_column;
    RETURN_IF_ERROR(where_clause->execute(block, filter_column));

    auto& row_ids = row_part_tablet_id.row_ids;
    auto& partition_ids = row_part_tablet_id.partition_ids;
    auto& tablet_ids = row_part_tablet_id.tablet_ids;
    if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) {
        // NOTE(review): assumes get_bool_inline() yields false for NULL rows —
        // confirm against ColumnNullable's contract.
        auto rows = block->rows();
        // row count of a block should not exceed UINT32_MAX
        auto rows_uint32 = cast_set<uint32_t>(rows);
        for (uint32_t i = 0; i < rows_uint32; i++) {
            if (nullable_column->get_bool_inline(i) && !_skip[i]) {
                row_ids.emplace_back(i);
                partition_ids.emplace_back(_partitions[i]->id);
                tablet_ids.emplace_back(_tablet_ids[i]);
            }
        }
    } else if (const auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) {
        // constant predicate: either the whole block is dropped, or the
        // clause is a no-op and only the skip filter applies
        bool ret = const_column->get_bool(0);
        if (!ret) {
            return Status::OK();
        }
        // should we optimize?
        _filter_block_by_skip(block, row_part_tablet_id);
    } else {
        const auto& filter = assert_cast<const ColumnUInt8&>(*filter_column).get_data();
        auto rows = block->rows();
        // row count of a block should not exceed UINT32_MAX
        auto rows_uint32 = cast_set<uint32_t>(rows);
        for (uint32_t i = 0; i < rows_uint32; i++) {
            if (filter[i] != 0 && !_skip[i]) {
                row_ids.emplace_back(i);
                partition_ids.emplace_back(_partitions[i]->id);
                tablet_ids.emplace_back(_tablet_ids[i]);
            }
        }
    }

    return Status::OK();
}
341 | | |
342 | | Status VRowDistribution::_filter_block(Block* block, |
343 | 9 | std::vector<RowPartTabletIds>& row_part_tablet_ids) { |
344 | 18 | for (int i = 0; i < _schema->indexes().size(); i++) { |
345 | 9 | _get_tablet_ids(block, i, _tablet_ids); |
346 | 9 | auto& where_clause = _schema->indexes()[i]->where_clause; |
347 | 9 | if (where_clause != nullptr) { |
348 | 2 | RETURN_IF_ERROR(_filter_block_by_skip_and_where_clause(block, where_clause, |
349 | 2 | row_part_tablet_ids[i])); |
350 | 7 | } else { |
351 | 7 | _filter_block_by_skip(block, row_part_tablet_ids[i]); |
352 | 7 | } |
353 | 9 | } |
354 | 9 | return Status::OK(); |
355 | 9 | } |
356 | | |
357 | | Status VRowDistribution::_generate_rows_distribution_for_non_auto_partition( |
358 | 5 | Block* block, bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids) { |
359 | 5 | int num_rows = cast_set<int>(block->rows()); |
360 | | |
361 | 5 | RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, |
362 | 5 | _tablet_indexes, _skip)); |
363 | 5 | if (has_filtered_rows) { |
364 | 0 | for (int i = 0; i < num_rows; i++) { |
365 | 0 | _skip[i] = _skip[i] || _block_convertor->filter_map()[i]; |
366 | 0 | } |
367 | 0 | } |
368 | 5 | RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids)); |
369 | 5 | return Status::OK(); |
370 | 5 | } |
371 | | |
// Handles the rows recorded in _missing_map (rows whose target partition does
// not exist yet): renders their partition-key columns to strings, buffers the
// rows via _save_missing_values(), then rolls the deferred rows back out of
// the load statistics — they will be counted again when the batching block is
// finally flushed.
// rows_stat_val is reduced by the number of rows newly moved into the batch.
Status VRowDistribution::_deal_missing_map(const Block& input_block, Block* block,
                                           const std::vector<uint16_t>& partition_cols_idx,
                                           int64_t& rows_stat_val) {
    // for missing partition keys, calc the missing partition and save in _partitions_need_create
    auto [part_ctxs, part_exprs] = _get_partition_function();
    int part_col_num = cast_set<int>(part_exprs.size());
    // the two vectors are in column-first-order
    std::vector<std::vector<std::string>> col_strs;
    std::vector<const NullMap*> col_null_maps;
    col_strs.resize(part_col_num);
    col_null_maps.reserve(part_col_num);

    // stringify using the session timezone so key values round-trip correctly
    auto format_options = DataTypeSerDe::get_default_format_options();
    format_options.timezone = &_state->timezone_obj();

    for (int i = 0; i < part_col_num; ++i) {
        auto return_type = part_exprs[i]->data_type();
        // expose the data column. the return type would be nullable
        const auto& [range_left_col, col_const] =
                unpack_if_const(block->get_by_position(partition_cols_idx[i]).column);
        if (range_left_col->is_nullable()) {
            col_null_maps.push_back(&(
                    assert_cast<const ColumnNullable*>(range_left_col.get())->get_null_map_data()));
        } else {
            col_null_maps.push_back(nullptr);
        }
        for (auto row : _missing_map) {
            col_strs[i].push_back(return_type->to_string(
                    *range_left_col, index_check_const(row, col_const), format_options));
        }
    }

    // calc the end value and save them. in the end of sending, we will create partitions for them and deal them.
    // NOTE: must save old batching stats before calling _save_missing_values(),
    // because _save_missing_values() will update _batching_rows internally.
    size_t old_bt_rows = _batching_rows;
    size_t old_bt_bytes = _batching_bytes;

    RETURN_IF_ERROR(_save_missing_values(input_block, col_strs, part_col_num, block, _missing_map,
                                         col_null_maps));

    size_t new_bt_rows = _batching_block->rows();
    size_t new_bt_bytes = _batching_block->bytes();
    rows_stat_val -= new_bt_rows - old_bt_rows;
    // NOTE(review): old - new is a negative delta by intent, but with size_t
    // operands it wraps and relies on conversion at the callee's (presumably
    // signed 64-bit) parameter to become negative — confirm the signatures.
    _state->update_num_rows_load_total(old_bt_rows - new_bt_rows);
    _state->update_num_bytes_load_total(old_bt_bytes - new_bt_bytes);
    DorisMetrics::instance()->load_rows->increment(old_bt_rows - new_bt_rows);
    DorisMetrics::instance()->load_bytes->increment(old_bt_bytes - new_bt_bytes);

    return Status::OK();
}
423 | | |
424 | | Status VRowDistribution::_generate_rows_distribution_for_auto_partition( |
425 | | const Block& input_block, Block* block, const std::vector<uint16_t>& partition_cols_idx, |
426 | | bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids, |
427 | 2 | int64_t& rows_stat_val) { |
428 | 2 | int num_rows = cast_set<int>(block->rows()); |
429 | 2 | std::vector<uint16_t> partition_keys = _vpartition->get_partition_keys(); |
430 | | |
431 | 2 | auto& partition_col = block->get_by_position(partition_keys[0]); |
432 | 2 | _missing_map.clear(); |
433 | 2 | _missing_map.reserve(partition_col.column->size()); |
434 | | |
435 | 2 | RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, |
436 | 2 | _tablet_indexes, _skip, &_missing_map)); |
437 | | |
438 | | // the missing vals for auto partition are also skipped. |
439 | 2 | if (has_filtered_rows) { |
440 | 0 | for (int i = 0; i < num_rows; i++) { |
441 | 0 | _skip[i] = _skip[i] || _block_convertor->filter_map()[i]; |
442 | 0 | } |
443 | 0 | } |
444 | 2 | RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids)); |
445 | | |
446 | 2 | if (!_missing_map.empty()) { |
447 | 2 | RETURN_IF_ERROR(_deal_missing_map(input_block, block, partition_cols_idx, |
448 | 2 | rows_stat_val)); // send input block to save |
449 | 2 | } |
450 | 2 | return Status::OK(); |
451 | 2 | } |
452 | | |
453 | | Status VRowDistribution::_generate_rows_distribution_for_auto_overwrite( |
454 | | const Block& input_block, Block* block, const std::vector<uint16_t>& partition_cols_idx, |
455 | | bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids, |
456 | 2 | int64_t& rows_stat_val) { |
457 | 2 | int num_rows = cast_set<int>(block->rows()); |
458 | | |
459 | | // for non-auto-partition situation, goes into two 'else' branch. just find the origin partitions, replace them by rpc, |
460 | | // and find the new partitions to use. |
461 | | // for auto-partition's, find and save origins in _partitions and replace them. at meanwhile save the missing values for auto |
462 | | // partition. then we find partition again to get replaced partitions in _partitions. this time _missing_map is ignored cuz |
463 | | // we already saved missing values. |
464 | 2 | if (_vpartition->is_auto_partition() && |
465 | 2 | _state->query_options().enable_auto_create_when_overwrite) { |
466 | | // allow auto create partition for missing rows. |
467 | 0 | std::vector<uint16_t> partition_keys = _vpartition->get_partition_keys(); |
468 | 0 | auto partition_col = block->get_by_position(partition_keys[0]); |
469 | 0 | _missing_map.clear(); |
470 | 0 | _missing_map.reserve(partition_col.column->size()); |
471 | |
|
472 | 0 | RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, |
473 | 0 | _tablet_indexes, _skip, &_missing_map)); |
474 | | |
475 | | // allow and really need to create during auto-detect-overwriting. |
476 | 0 | if (!_missing_map.empty()) { |
477 | 0 | RETURN_IF_ERROR( |
478 | 0 | _deal_missing_map(input_block, block, partition_cols_idx, rows_stat_val)); |
479 | 0 | } |
480 | 2 | } else { |
481 | 2 | RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, |
482 | 2 | _tablet_indexes, _skip)); |
483 | 2 | } |
484 | 2 | RETURN_IF_ERROR(_replace_overwriting_partition()); |
485 | | |
486 | | // regenerate locations for new partitions & tablets |
487 | 2 | _reset_find_tablets(num_rows); |
488 | 2 | if (_vpartition->is_auto_partition() && |
489 | 2 | _state->query_options().enable_auto_create_when_overwrite) { |
490 | | // here _missing_map is just a placeholder |
491 | 0 | RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, |
492 | 0 | _tablet_indexes, _skip, &_missing_map)); |
493 | 0 | if (VLOG_TRACE_IS_ON) { |
494 | 0 | std::string tmp; |
495 | 0 | for (auto v : _missing_map) { |
496 | 0 | tmp += std::to_string(v).append(", "); |
497 | 0 | } |
498 | 0 | VLOG_TRACE << "Trace missing map of " << this << ':' << tmp; |
499 | 0 | } |
500 | 2 | } else { |
501 | 2 | RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, |
502 | 2 | _tablet_indexes, _skip)); |
503 | 2 | } |
504 | 2 | if (has_filtered_rows) { |
505 | 0 | for (int i = 0; i < num_rows; i++) { |
506 | 0 | _skip[i] = _skip[i] || _block_convertor->filter_map()[i]; |
507 | 0 | } |
508 | 0 | } |
509 | 2 | RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids)); |
510 | 2 | return Status::OK(); |
511 | 2 | } |
512 | | |
513 | | void VRowDistribution::_reset_row_part_tablet_ids( |
514 | 9 | std::vector<RowPartTabletIds>& row_part_tablet_ids, int64_t rows) { |
515 | 9 | row_part_tablet_ids.resize(_schema->indexes().size()); |
516 | 9 | for (auto& row_part_tablet_id : row_part_tablet_ids) { |
517 | 9 | auto& row_ids = row_part_tablet_id.row_ids; |
518 | 9 | auto& partition_ids = row_part_tablet_id.partition_ids; |
519 | 9 | auto& tablet_ids = row_part_tablet_id.tablet_ids; |
520 | | |
521 | 9 | row_ids.clear(); |
522 | 9 | partition_ids.clear(); |
523 | 9 | tablet_ids.clear(); |
524 | | // This is important for performance. |
525 | 9 | row_ids.reserve(rows); |
526 | 9 | partition_ids.reserve(rows); |
527 | 9 | tablet_ids.reserve(rows); |
528 | 9 | } |
529 | 9 | } |
530 | | |
531 | | Status VRowDistribution::generate_rows_distribution( |
532 | | Block& input_block, std::shared_ptr<Block>& block, |
533 | 9 | std::vector<RowPartTabletIds>& row_part_tablet_ids, int64_t& rows_stat_val) { |
534 | 9 | auto input_rows = input_block.rows(); |
535 | 9 | _reset_row_part_tablet_ids(row_part_tablet_ids, input_rows); |
536 | | |
537 | | // we store the batching block with value of `input_block`. so just do all of these again. |
538 | 9 | bool has_filtered_rows = false; |
539 | 9 | RETURN_IF_ERROR(_block_convertor->validate_and_convert_block( |
540 | 9 | _state, &input_block, block, *_vec_output_expr_ctxs, input_rows, has_filtered_rows)); |
541 | | |
542 | | // batching block rows which need new partitions. deal together at finish. |
543 | 9 | if (!_batching_block) [[unlikely]] { |
544 | 8 | std::unique_ptr<Block> tmp_block = input_block.create_same_struct_block(0); |
545 | 8 | _batching_block = MutableBlock::create_unique(std::move(*tmp_block)); |
546 | 8 | } |
547 | | |
548 | 9 | auto num_rows = block->rows(); |
549 | 9 | _reset_find_tablets(num_rows); |
550 | | |
551 | | // if there's projection of partition calc, we need to calc it first. |
552 | 9 | auto [part_ctxs, part_funcs] = _get_partition_function(); |
553 | 9 | std::vector<uint16_t> partition_cols_idx; |
554 | 9 | if (_vpartition->is_projection_partition()) { |
555 | | // calc the start value of missing partition ranges. |
556 | 2 | auto func_size = part_funcs.size(); |
557 | 4 | for (int i = 0; i < func_size; ++i) { |
558 | 2 | int result_idx = -1; |
559 | | // we just calc left range here. leave right to FE to avoid dup calc. |
560 | 2 | RETURN_IF_ERROR(part_funcs[i]->execute(part_ctxs[i].get(), block.get(), &result_idx)); |
561 | | |
562 | 2 | VLOG_DEBUG << "Partition-calculated block:\n" << block->dump_data(0, 1); |
563 | 2 | DCHECK(result_idx != -1); |
564 | | |
565 | 2 | partition_cols_idx.push_back(cast_set<uint16_t>(result_idx)); |
566 | 2 | } |
567 | | |
568 | | // change the column to compare to transformed. |
569 | 2 | _vpartition->set_transformed_slots(partition_cols_idx); |
570 | 2 | } |
571 | | |
572 | 9 | Status st = Status::OK(); |
573 | 9 | if (_vpartition->is_auto_detect_overwrite() && !_deal_batched) { |
574 | | // when overwrite, no auto create partition allowed. |
575 | 2 | st = _generate_rows_distribution_for_auto_overwrite(input_block, block.get(), |
576 | 2 | partition_cols_idx, has_filtered_rows, |
577 | 2 | row_part_tablet_ids, rows_stat_val); |
578 | 7 | } else if (_vpartition->is_auto_partition() && !_deal_batched) { |
579 | 2 | st = _generate_rows_distribution_for_auto_partition(input_block, block.get(), |
580 | 2 | partition_cols_idx, has_filtered_rows, |
581 | 2 | row_part_tablet_ids, rows_stat_val); |
582 | 5 | } else { // not auto partition |
583 | 5 | st = _generate_rows_distribution_for_non_auto_partition(block.get(), has_filtered_rows, |
584 | 5 | row_part_tablet_ids); |
585 | 5 | } |
586 | | |
587 | 9 | return st; |
588 | 9 | } |
589 | | |
590 | | // reuse vars for find_tablets |
591 | 11 | void VRowDistribution::_reset_find_tablets(int64_t rows) { |
592 | 11 | _tablet_finder->filter_bitmap().Reset(rows); |
593 | 11 | _partitions.assign(rows, nullptr); |
594 | 11 | _skip.assign(rows, false); |
595 | 11 | _tablet_indexes.assign(rows, 0); |
596 | 11 | } |
597 | | |
598 | | } // namespace doris |