be/src/core/block/block.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // This file is copied from |
18 | | // https://github.com/ClickHouse/ClickHouse/blob/master/src/Core/Block.h |
19 | | // and modified by Doris |
20 | | |
21 | | #pragma once |
22 | | |
23 | | #include <glog/logging.h> |
24 | | #include <parallel_hashmap/phmap.h> |
25 | | |
26 | | #include <cstddef> |
27 | | #include <cstdint> |
28 | | #include <initializer_list> |
29 | | #include <list> |
30 | | #include <memory> |
31 | | #include <ostream> |
32 | | #include <set> |
33 | | #include <string> |
34 | | #include <utility> |
35 | | #include <vector> |
36 | | |
37 | | #include "common/be_mock_util.h" |
38 | | #include "common/exception.h" |
39 | | #include "common/factory_creator.h" |
40 | | #include "common/status.h" |
41 | | #include "core/block/column_with_type_and_name.h" |
42 | | #include "core/block/columns_with_type_and_name.h" |
43 | | #include "core/column/column.h" |
44 | | #include "core/column/column_nullable.h" |
45 | | #include "core/data_type/data_type.h" |
46 | | #include "core/data_type/data_type_nullable.h" |
47 | | #include "core/types.h" |
48 | | |
49 | | class SipHash; |
50 | | |
51 | | namespace doris { |
52 | | |
53 | | class TupleDescriptor; |
54 | | class PBlock; |
55 | | class SlotDescriptor; |
56 | | |
57 | | namespace segment_v2 { |
58 | | enum CompressionTypePB : int; |
59 | | } // namespace segment_v2 |
60 | | |
61 | | /** Container for set of columns for bunch of rows in memory. |
62 | | * This is unit of data processing. |
63 | | * Also contains metadata - data types of columns and their names |
64 | | * (either original names from a table, or generated names during temporary calculations). |
65 | | * Allows to insert, remove columns in arbitrary position, to change order of columns. |
66 | | */ |
67 | | class MutableBlock; |
68 | | |
69 | | class Block { |
70 | | ENABLE_FACTORY_CREATOR(Block); |
71 | | |
72 | | private: |
73 | | using Container = ColumnsWithTypeAndName; |
74 | | Container data; |
75 | | |
76 | | public: |
77 | 432k | Block() = default; |
78 | | Block(std::initializer_list<ColumnWithTypeAndName> il); |
79 | | Block(ColumnsWithTypeAndName data_); |
80 | | Block(const std::vector<SlotDescriptor*>& slots, size_t block_size); |
81 | | Block(const std::vector<SlotDescriptor>& slots, size_t block_size); |
82 | | |
83 | 815k | MOCK_FUNCTION ~Block() = default; |
84 | 808 | Block(const Block& block) = default; |
85 | 48.2k | Block& operator=(const Block& p) = default; |
86 | 63.6k | Block(Block&& block) = default; |
87 | 868 | Block& operator=(Block&& other) = default; |
88 | | |
89 | | void reserve(size_t count); |
90 | | // Make sure the column names are no longer needed by any user of the block before calling this |
91 | | void clear_names(); |
92 | | |
93 | | /// insert the column at the specified position |
94 | | void insert(size_t position, const ColumnWithTypeAndName& elem); |
95 | | void insert(size_t position, ColumnWithTypeAndName&& elem); |
96 | | /// insert the column to the end |
97 | | void insert(const ColumnWithTypeAndName& elem); |
98 | | void insert(ColumnWithTypeAndName&& elem); |
99 | | /// remove the column at the specified position |
100 | | void erase(size_t position); |
101 | | /// remove the columns in the range [start, end) |
102 | | void erase_tail(size_t start); |
103 | | /// remove the columns at the specified positions |
104 | | void erase(const std::set<size_t>& positions); |
105 | | // T was std::set<int>, std::vector<int>, std::list<int> |
106 | | template <class T> |
107 | 0 | void erase_not_in(const T& container) { |
108 | 0 | Container new_data; |
109 | 0 | for (auto pos : container) { |
110 | 0 | new_data.emplace_back(std::move(data[pos])); |
111 | 0 | } |
112 | 0 | std::swap(data, new_data); |
113 | 0 | } |
114 | | |
115 | 17 | std::unordered_map<std::string, uint32_t> get_name_to_pos_map() const { |
116 | 17 | std::unordered_map<std::string, uint32_t> name_to_index_map; |
117 | 136 | for (uint32_t i = 0; i < data.size(); ++i) { |
118 | 119 | name_to_index_map[data[i].name] = i; |
119 | 119 | } |
120 | 17 | return name_to_index_map; |
121 | 17 | } |
122 | | |
123 | | /// References are invalidated after calling functions above. |
124 | 17.6M | ColumnWithTypeAndName& get_by_position(size_t position) { |
125 | 17.6M | DCHECK(data.size() > position) |
126 | 0 | << ", data.size()=" << data.size() << ", position=" << position; |
127 | 17.6M | return data[position]; |
128 | 17.6M | } |
129 | 29.1M | const ColumnWithTypeAndName& get_by_position(size_t position) const { return data[position]; } |
130 | | |
131 | 32.5k | void replace_by_position(size_t position, ColumnPtr&& res) { |
132 | 32.5k | this->get_by_position(position).column = std::move(res); |
133 | 32.5k | } |
134 | | |
135 | 6 | void replace_by_position(size_t position, const ColumnPtr& res) { |
136 | 6 | this->get_by_position(position).column = res; |
137 | 6 | } |
138 | | |
139 | 721 | void replace_by_position_if_const(size_t position) { |
140 | 721 | auto& element = this->get_by_position(position); |
141 | 721 | element.column = element.column->convert_to_full_column_if_const(); |
142 | 721 | } |
143 | | |
144 | | ColumnWithTypeAndName& safe_get_by_position(size_t position); |
145 | | const ColumnWithTypeAndName& safe_get_by_position(size_t position) const; |
146 | | |
// Iterator accessors over the underlying column container `data`.
// Each one forwards directly to the matching member of the container.
147 | 72.0k | Container::iterator begin() { return data.begin(); } |
148 | 72.0k | Container::iterator end() { return data.end(); } |
149 | 9.64k | Container::const_iterator begin() const { return data.begin(); } |
150 | 9.64k | Container::const_iterator end() const { return data.end(); } |
// cbegin()/cend() yield const iterators regardless of the constness of the Block.
151 | 0 | Container::const_iterator cbegin() const { return data.cbegin(); } |
152 | 0 | Container::const_iterator cend() const { return data.cend(); } |
153 | | |
154 | | // Get position of column by name. Returns -1 if there is no column with that name. |
155 | | // ATTN: this method is O(N). better maintain name -> position map in caller if you need to call it frequently. |
156 | | int get_position_by_name(const std::string& name) const; |
157 | | |
158 | | const ColumnsWithTypeAndName& get_columns_with_type_and_name() const; |
159 | | |
160 | | std::vector<std::string> get_names() const; |
161 | | DataTypes get_data_types() const; |
162 | | |
163 | 60 | DataTypePtr get_data_type(size_t index) const { |
164 | 60 | CHECK(index < data.size()); |
165 | 60 | return data[index].type; |
166 | 60 | } |
167 | | |
168 | | /// Returns the number of rows of the first column in the block that is not nullptr. If there are no columns, returns 0. |
169 | | size_t rows() const; |
170 | | |
171 | | // Cut the rows in block, use in LIMIT operation |
172 | | void set_num_rows(size_t length); |
173 | | |
174 | | // Skip the rows in block, use in OFFSET, LIMIT operation |
175 | | void skip_num_rows(int64_t& offset); |
176 | | |
177 | | /// As the assumption we used around, the number of columns won't exceed int16 range. so no need to worry when we |
178 | | /// assign it to int32. |
179 | 18.2M | uint32_t columns() const { return static_cast<uint32_t>(data.size()); } |
180 | | |
181 | | /// Checks that every column in block is not nullptr and has same number of elements. |
182 | | void check_number_of_rows(bool allow_null_columns = false) const; |
183 | | |
184 | | Status check_type_and_column() const; |
185 | | |
186 | | Status check_column_and_type_not_null() const; |
187 | | |
188 | | /// Approximate number of bytes used by column data in memory. |
189 | | /// This reflects the actual data footprint (e.g. string contents, numeric arrays) |
190 | | /// and is the metric used by adaptive batch size byte budgets. |
191 | | size_t bytes() const; |
192 | | |
193 | | /// Approximate number of allocated (reserved) bytes in memory. |
194 | | /// This may be larger than bytes() due to pre-allocated capacity in vectors/arenas. |
195 | | /// Used for memory tracking and profiling. |
196 | | MOCK_FUNCTION size_t allocated_bytes() const; |
197 | | |
198 | | /** Get a list of column names separated by commas. */ |
199 | | std::string dump_names() const; |
200 | | |
201 | | std::string dump_types() const; |
202 | | |
203 | | /** List of names, types and lengths of columns. Designed for debugging. */ |
204 | | std::string dump_structure() const; |
205 | | |
206 | | /** Get the same block, but empty. */ |
207 | | Block clone_empty() const; |
208 | | |
209 | | Columns get_columns() const; |
210 | | Columns get_columns_and_convert(); |
211 | | |
212 | | Block clone_without_columns(const std::vector<int>* column_offset = nullptr) const; |
213 | | |
214 | | /** Get empty columns with the same types as in block. */ |
215 | | MutableColumns clone_empty_columns() const; |
216 | | |
217 | | // RAII owner for mutating columns borrowed from a live Block. While the |
218 | | // guard is alive, the Block's column slots are moved out and column data |
219 | | // must be accessed through mutable_columns(). The guard restores columns on |
220 | | // destruction, so use it when the caller may exit early after detaching. |
221 | | class ScopedMutableColumns { |
222 | | public: |
// Binds the guard to `block`; per the class comment the block's columns are
// moved out into this guard for its lifetime.
223 | | explicit ScopedMutableColumns(Block& block); |
// Restores the columns to the Block (see class comment).
224 | | ~ScopedMutableColumns(); |
225 | | |
// Copying is disabled; the guard can only be moved.
226 | | ScopedMutableColumns(const ScopedMutableColumns&) = delete; |
227 | | ScopedMutableColumns& operator=(const ScopedMutableColumns&) = delete; |
// NOTE(review): the move operations presumably hand the restore duty over to
// the destination guard -- definitions are not in this header, confirm there.
228 | | ScopedMutableColumns(ScopedMutableColumns&& other) noexcept; |
229 | | ScopedMutableColumns& operator=(ScopedMutableColumns&& other) noexcept; |
230 | | |
// Direct access to the columns currently held by this guard.
231 | 1.66k | MutableColumns& mutable_columns() { return _columns; } |
232 | 0 | const MutableColumns& mutable_columns() const { return _columns; } |
// NOTE(review): presumably the type / name of the Block slot at `position`;
// defined out of line, confirm against the implementation.
233 | | const DataTypePtr& get_datatype_by_position(size_t position) const; |
234 | | const std::string& get_name_by_position(size_t position) const; |
235 | | |
236 | | // Transfer the borrowed owners to another RAII object that will restore |
237 | | // them. After release(), the original Block remains without columns |
238 | | // until that owner restores them. Normal callers should let this guard |
239 | | // restore on destruction. |
240 | | MutableColumns release(); |
// NOTE(review): presumably performs the write-back that the destructor would
// otherwise do -- confirm in the implementation.
241 | | void restore(); |
242 | | |
243 | | private: |
// Block the columns were taken from; nullptr by default.
244 | | Block* _block = nullptr; |
// Columns owned by the guard while it is active.
245 | | MutableColumns _columns; |
246 | | }; |
247 | | |
248 | | // Single-column variant for localized mutation of a live Block slot. The |
249 | | // selected slot is unavailable from the Block until this guard restores it. |
250 | | class ScopedMutableColumn { |
251 | | public: |
// Binds the guard to the slot `position` of `block`.
252 | | ScopedMutableColumn(Block& block, size_t position); |
253 | | ~ScopedMutableColumn(); |
254 | | |
// Copying is disabled; the guard can only be moved.
255 | | ScopedMutableColumn(const ScopedMutableColumn&) = delete; |
256 | | ScopedMutableColumn& operator=(const ScopedMutableColumn&) = delete; |
// NOTE(review): the move operations presumably hand the restore duty over to
// the destination guard -- definitions are not in this header, confirm there.
257 | | ScopedMutableColumn(ScopedMutableColumn&& other) noexcept; |
258 | | ScopedMutableColumn& operator=(ScopedMutableColumn&& other) noexcept; |
259 | | |
// Direct access to the column currently held by this guard.
260 | 97 | MutableColumnPtr& mutable_column() { return _column; } |
261 | 0 | const MutableColumnPtr& mutable_column() const { return _column; } |
262 | | |
// NOTE(review): presumably puts the column back into the Block slot ahead of
// destruction -- confirm in the implementation.
263 | | void restore(); |
264 | | |
265 | | private: |
// Block the column was taken from; nullptr by default.
266 | | Block* _block = nullptr; |
// Index of the borrowed slot inside the Block.
267 | | size_t _position = 0; |
// Column owned by the guard while it is active.
268 | | MutableColumnPtr _column; |
269 | | }; |
270 | | |
271 | | /** Get columns from a consumed block for mutation. Columns in block will be nullptr. */ |
272 | | MutableColumns mutate_columns() &&; |
273 | | MutableColumns mutate_columns() & = delete; |
274 | | |
275 | | /** Temporarily mutate a live Block's columns. The returned guard owns the columns and |
276 | | * restores them on destruction; prefer this over manual move/writeback. |
277 | | */ |
278 | | ScopedMutableColumns mutate_columns_scoped() &; |
279 | | ScopedMutableColumns mutate_columns_scoped() && = delete; |
280 | | |
281 | | /** Temporarily mutate one live Block column; use when only one slot needs ownership. */ |
282 | | ScopedMutableColumn mutate_column_scoped(size_t position) &; |
283 | | ScopedMutableColumn mutate_column_scoped(size_t position) && = delete; |
284 | | |
285 | | /** Replace columns in a block */ |
286 | | void set_columns(MutableColumns&& columns); |
287 | | void clear(); |
288 | | void swap(Block& other) noexcept; |
289 | | void swap(Block&& other) noexcept; |
290 | | |
291 | | // Shuffle columns in place based on the result_column_ids |
292 | | void shuffle_columns(const std::vector<int>& result_column_ids); |
293 | | |
294 | | // column_size == -1 clears all columns; otherwise clear [0, column_size) |
295 | | // and drop the rest. Shared columns are detached through clone_empty(), so |
296 | | // allocation or clone failures propagate. |
297 | | void clear_column_data(int64_t column_size = -1); |
298 | | void clear_column_data(const std::vector<uint32_t>& columns_to_clear); |
299 | | |
300 | 15.8k | MOCK_FUNCTION bool mem_reuse() { return !data.empty(); } |
301 | | |
302 | 12 | bool is_empty_column() { return data.empty(); } |
303 | | |
304 | 3.23M | bool empty() const { return rows() == 0; } |
305 | | |
306 | | /** |
307 | | * Updates SipHash of the Block, using update method of columns. |
308 | | * Returns hash for block, that could be used to differentiate blocks |
309 | | * with same structure, but different data. |
310 | | */ |
311 | | void update_hash(SipHash& hash) const; |
312 | | |
313 | | /** |
314 | | * Get block data in string. |
315 | | * If code is in default_implementation_for_nulls or something likely, type and column's nullity could |
316 | | * temporarily be not same. set allow_null_mismatch to true to dump it correctly. |
317 | | */ |
318 | | std::string dump_data(size_t begin = 0, size_t row_limit = 100, |
319 | | bool allow_null_mismatch = false) const; |
320 | | |
321 | | std::string dump_data_json(size_t begin = 0, size_t row_limit = 100, |
322 | | bool allow_null_mismatch = false) const; |
323 | | |
324 | | /** Get one line data from block, only use in load data */ |
325 | | std::string dump_one_line(size_t row, int column_end) const; |
326 | | |
327 | | Status append_to_block_by_selector(MutableBlock* dst, const IColumn::Selector& selector) const; |
328 | | |
329 | | // need exception safety |
330 | | static void filter_block_internal(Block* block, const std::vector<uint32_t>& columns_to_filter, |
331 | | const IColumn::Filter& filter); |
332 | | // need exception safety |
333 | | static void filter_block_internal(Block* block, const IColumn::Filter& filter, |
334 | | uint32_t column_to_keep); |
335 | | // need exception safety |
336 | | static void filter_block_internal(Block* block, const IColumn::Filter& filter); |
337 | | |
338 | | static Status filter_block(Block* block, const std::vector<uint32_t>& columns_to_filter, |
339 | | size_t filter_column_id, size_t column_to_keep); |
340 | | |
341 | | static Status filter_block(Block* block, size_t filter_column_id, size_t column_to_keep); |
342 | | |
343 | 717 | static void erase_useless_column(Block* block, size_t column_to_keep) { |
344 | 717 | block->erase_tail(column_to_keep); |
345 | 717 | } |
346 | | |
347 | | // serialize block to PBlock |
348 | | Status serialize(int be_exec_version, PBlock* pblock, size_t* uncompressed_bytes, |
349 | | size_t* compressed_bytes, int64_t* compress_time, |
350 | | segment_v2::CompressionTypePB compression_type, |
351 | | bool allow_transfer_large_data = false) const; |
352 | | |
353 | | Status deserialize(const PBlock& pblock, size_t* uncompressed_bytes, int64_t* decompress_time); |
354 | | |
355 | | std::unique_ptr<Block> create_same_struct_block(size_t size, bool is_reserve = false) const; |
356 | | |
357 | | /** Compares (*this) n-th row and rhs m-th row. |
358 | | * Returns negative number, 0, or positive number (*this) n-th row is less, equal, greater than rhs m-th row respectively. |
359 | | * Is used in sortings. |
360 | | * |
361 | | * If one of element's value is NaN or NULLs, then: |
362 | | * - if nan_direction_hint == -1, NaN and NULLs are considered as least than everything other; |
363 | | * - if nan_direction_hint == 1, NaN and NULLs are considered as greatest than everything other. |
364 | | * For example, if nan_direction_hint == -1 is used by descending sorting, NaNs will be at the end. |
365 | | * |
366 | | * For non Nullable and non floating point types, nan_direction_hint is ignored. |
367 | | */ |
368 | 3 | int compare_at(size_t n, size_t m, const Block& rhs, int nan_direction_hint) const { |
369 | 3 | DCHECK_EQ(columns(), rhs.columns()); |
370 | 3 | return compare_at(n, m, columns(), rhs, nan_direction_hint); |
371 | 3 | } |
372 | | |
373 | | int compare_at(size_t n, size_t m, size_t num_columns, const Block& rhs, |
374 | 7.15M | int nan_direction_hint) const { |
375 | 7.15M | DCHECK_GE(columns(), num_columns); |
376 | 7.15M | DCHECK_GE(rhs.columns(), num_columns); |
377 | | |
378 | 7.15M | DCHECK_LE(n, rows()); |
379 | 7.15M | DCHECK_LE(m, rhs.rows()); |
380 | 9.94M | for (size_t i = 0; i < num_columns; ++i) { |
381 | 7.19M | DCHECK(get_by_position(i).type->equals(*rhs.get_by_position(i).type)); |
382 | 7.19M | auto res = get_by_position(i).column->compare_at(n, m, *(rhs.get_by_position(i).column), |
383 | 7.19M | nan_direction_hint); |
384 | 7.19M | if (res) { |
385 | 4.41M | return res; |
386 | 4.41M | } |
387 | 7.19M | } |
388 | 2.74M | return 0; |
389 | 7.15M | } |
390 | | |
391 | | int compare_at(size_t n, size_t m, const std::vector<uint32_t>* compare_columns, |
392 | 2 | const Block& rhs, int nan_direction_hint) const { |
393 | 2 | DCHECK_GE(columns(), compare_columns->size()); |
394 | 2 | DCHECK_GE(rhs.columns(), compare_columns->size()); |
395 | | |
396 | 2 | DCHECK_LE(n, rows()); |
397 | 2 | DCHECK_LE(m, rhs.rows()); |
398 | 3 | for (auto i : *compare_columns) { |
399 | 3 | DCHECK(get_by_position(i).type->equals(*rhs.get_by_position(i).type)); |
400 | 3 | auto res = get_by_position(i).column->compare_at(n, m, *(rhs.get_by_position(i).column), |
401 | 3 | nan_direction_hint); |
402 | 3 | if (res) { |
403 | 2 | return res; |
404 | 2 | } |
405 | 3 | } |
406 | 0 | return 0; |
407 | 2 | } |
408 | | |
409 | | //note(wb) no DCHECK here, because this method is only used after compare_at now, so no need to repeat check here. |
410 | | // If this method is used in more places, you can add DCHECK case by case. |
411 | | int compare_column_at(size_t n, size_t m, size_t col_idx, const Block& rhs, |
412 | 26.7k | int nan_direction_hint) const { |
413 | 26.7k | auto res = get_by_position(col_idx).column->compare_at( |
414 | 26.7k | n, m, *(rhs.get_by_position(col_idx).column), nan_direction_hint); |
415 | 26.7k | return res; |
416 | 26.7k | } |
417 | | |
418 | | void clear_column_mem_not_keep(const std::vector<bool>& column_keep_flags, |
419 | | bool need_keep_first); |
420 | | |
421 | | // Helper: sum byte_size() of all mutable columns. |
422 | | // Unlike Block::bytes() which operates on immutable ColumnPtr, |
423 | | // this works on MutableColumns during block construction (e.g. in BlockReader). |
424 | 300k | static inline size_t columns_byte_size(const MutableColumns& cols) { |
425 | 300k | size_t total = 0; |
426 | 605k | for (const auto& col : cols) { |
427 | 605k | total += col->byte_size(); |
428 | 605k | } |
429 | 300k | return total; |
430 | 300k | } |
431 | | |
432 | | private: |
433 | | void erase_impl(size_t position); |
434 | | }; |
435 | | |
436 | | using Blocks = std::vector<Block>; |
437 | | using BlocksList = std::list<Block>; |
438 | | using BlocksPtr = std::shared_ptr<Blocks>; |
439 | | using BlocksPtrs = std::shared_ptr<std::vector<BlocksPtr>>; |
440 | | |
441 | | class MutableBlock { |
442 | | ENABLE_FACTORY_CREATOR(MutableBlock); |
443 | | |
444 | | private: |
445 | | MutableColumns _columns; |
446 | | DataTypes _data_types; |
447 | | std::vector<std::string> _names; |
448 | | |
449 | | public: |
450 | | // Build from a consumed Block. This has no restore contract: the source |
451 | | // Block is left without columns and must not be used as a live output block. |
452 | | // For caller-owned live Blocks, use ScopedMutableBlock or |
453 | | // mutate_columns_scoped() instead. |
454 | 48.0k | static MutableBlock build_mutable_block(Block&& block) { |
455 | 48.0k | return MutableBlock(std::move(block)); |
456 | 48.0k | } |
457 | 1 | static MutableBlock build_mutable_block(std::nullptr_t) { return MutableBlock(); } |
458 | | static MutableBlock build_mutable_block(Block* block) = delete; |
459 | 73.0k | MutableBlock() = default; |
460 | 217k | ~MutableBlock() = default; |
461 | | MutableBlock(const MutableBlock&) = delete; |
462 | | MutableBlock& operator=(const MutableBlock&) = delete; |
// Move constructor: takes over the columns, data types and names of `m_block`
// member by member.
463 | | MutableBlock(MutableBlock&& m_block) noexcept |
464 | | : _columns(std::move(m_block._columns)), |
465 | | _data_types(std::move(m_block._data_types)), |
466 | 0 | _names(std::move(m_block._names)) {} |
467 | | |
468 | | // Consumes block columns and converts them to mutable columns recursively. |
469 | | // This constructor is for temporary/owned Blocks only. |
// NOTE: members are initialized in declaration order (_columns, _data_types,
// _names), so the columns are taken out of `block` first and its data types
// and names are read afterwards. This presumably relies on mutate_columns()
// leaving the type/name metadata of `block` in place -- confirm before
// reordering the members or this initializer list.
470 | | MutableBlock(Block&& block) |
471 | 144k | : _columns(std::move(block).mutate_columns()), |
472 | 144k | _data_types(block.get_data_types()), |
473 | 144k | _names(block.get_names()) {} |
474 | | |
// Move assignment: replaces this block's columns, data types and names with
// those of `m_block`, member by member. There is no explicit self-assignment
// guard.
475 | 96.0k | MutableBlock& operator=(MutableBlock&& m_block) noexcept { |
476 | 96.0k | _columns = std::move(m_block._columns); |
477 | 96.0k | _data_types = std::move(m_block._data_types); |
478 | 96.0k | _names = std::move(m_block._names); |
479 | 96.0k | return *this; |
480 | 96.0k | } |
481 | | |
482 | | size_t rows() const; |
483 | 446 | size_t columns() const { return _columns.size(); } |
484 | | |
485 | 144k | bool empty() const { return rows() == 0; } |
486 | | |
487 | 49.4k | MutableColumns& mutable_columns() { return _columns; } |
488 | 0 | const MutableColumns& mutable_columns() const { return _columns; } |
489 | | |
490 | 817 | void set_mutable_columns(MutableColumns&& columns) { _columns = std::move(columns); } |
491 | | |
492 | 816 | DataTypes& data_types() { return _data_types; } |
493 | | |
494 | 158 | MutableColumnPtr& get_column_by_position(size_t position) { return _columns[position]; } |
495 | 64 | const MutableColumnPtr& get_column_by_position(size_t position) const { |
496 | 64 | return _columns[position]; |
497 | 64 | } |
498 | | |
499 | 9 | DataTypePtr& get_datatype_by_position(size_t position) { return _data_types[position]; } |
500 | 26 | const DataTypePtr& get_datatype_by_position(size_t position) const { |
501 | 26 | return _data_types[position]; |
502 | 26 | } |
503 | | |
504 | 38 | int compare_one_column(size_t n, size_t m, size_t column_id, int nan_direction_hint) const { |
505 | 38 | DCHECK_LE(column_id, columns()); |
506 | 38 | DCHECK_LE(n, rows()); |
507 | 38 | DCHECK_LE(m, rows()); |
508 | 38 | auto& column = get_column_by_position(column_id); |
509 | 38 | return column->compare_at(n, m, *column, nan_direction_hint); |
510 | 38 | } |
511 | | |
512 | | int compare_at(size_t n, size_t m, size_t num_columns, const MutableBlock& rhs, |
513 | 6 | int nan_direction_hint) const { |
514 | 6 | DCHECK_GE(columns(), num_columns); |
515 | 6 | DCHECK_GE(rhs.columns(), num_columns); |
516 | | |
517 | 6 | DCHECK_LE(n, rows()); |
518 | 6 | DCHECK_LE(m, rhs.rows()); |
519 | 14 | for (size_t i = 0; i < num_columns; ++i) { |
520 | 11 | DCHECK(get_datatype_by_position(i)->equals(*rhs.get_datatype_by_position(i))); |
521 | 11 | auto res = get_column_by_position(i)->compare_at(n, m, *(rhs.get_column_by_position(i)), |
522 | 11 | nan_direction_hint); |
523 | 11 | if (res) { |
524 | 3 | return res; |
525 | 3 | } |
526 | 11 | } |
527 | 3 | return 0; |
528 | 6 | } |
529 | | |
530 | | int compare_at(size_t n, size_t m, const std::vector<uint32_t>* compare_columns, |
531 | 0 | const MutableBlock& rhs, int nan_direction_hint) const { |
532 | 0 | DCHECK_GE(columns(), compare_columns->size()); |
533 | 0 | DCHECK_GE(rhs.columns(), compare_columns->size()); |
534 | |
|
535 | 0 | DCHECK_LE(n, rows()); |
536 | 0 | DCHECK_LE(m, rhs.rows()); |
537 | 0 | for (auto i : *compare_columns) { |
538 | 0 | DCHECK(get_datatype_by_position(i)->equals(*rhs.get_datatype_by_position(i))); |
539 | 0 | auto res = get_column_by_position(i)->compare_at(n, m, *(rhs.get_column_by_position(i)), |
540 | 0 | nan_direction_hint); |
541 | 0 | if (res) { |
542 | 0 | return res; |
543 | 0 | } |
544 | 0 | } |
545 | 0 | return 0; |
546 | 0 | } |
547 | | |
548 | 4 | std::string dump_types() const { |
549 | 4 | std::string res; |
550 | 10 | for (auto type : _data_types) { |
551 | 10 | if (!res.empty()) { |
552 | 6 | res += ", "; |
553 | 6 | } |
554 | 10 | res += type->get_name(); |
555 | 10 | } |
556 | 4 | return res; |
557 | 4 | } |
558 | | |
// Merge `block` into this MutableBlock by delegating to merge_impl().
// The call is wrapped in RETURN_IF_CATCH_EXCEPTION, which -- judging by its
// name -- presumably converts an exception thrown during the merge into a
// returned error Status (confirm against the macro definition).
// `block` is passed on as an lvalue; it is not forwarded.
559 | | template <typename T> |
560 | 135 | [[nodiscard]] Status merge(T&& block) { |
561 | 135 | RETURN_IF_CATCH_EXCEPTION(return merge_impl(block);); |
562 | 135 | } _ZN5doris12MutableBlock5mergeIRNS_5BlockEEENS_6StatusEOT_ Line | Count | Source | 560 | 57 | [[nodiscard]] Status merge(T&& block) { | 561 | 57 | RETURN_IF_CATCH_EXCEPTION(return merge_impl(block);); | 562 | 57 | } |
_ZN5doris12MutableBlock5mergeINS_5BlockEEENS_6StatusEOT_ Line | Count | Source | 560 | 78 | [[nodiscard]] Status merge(T&& block) { | 561 | 78 | RETURN_IF_CATCH_EXCEPTION(return merge_impl(block);); | 562 | 78 | } |
|
563 | | |
// Merge `block` into this MutableBlock by delegating to
// merge_impl_ignore_overflow(). The call is wrapped in
// RETURN_IF_CATCH_EXCEPTION, which -- judging by its name -- presumably
// converts an exception thrown during the merge into a returned error Status
// (confirm against the macro definition).
// `block` is passed on as an lvalue; it is not forwarded.
564 | | template <typename T> |
565 | 48.0k | [[nodiscard]] Status merge_ignore_overflow(T&& block) { |
566 | 48.0k | RETURN_IF_CATCH_EXCEPTION(return merge_impl_ignore_overflow(block);); |
567 | 48.0k | } _ZN5doris12MutableBlock21merge_ignore_overflowIRNS_5BlockEEENS_6StatusEOT_ Line | Count | Source | 565 | 1 | [[nodiscard]] Status merge_ignore_overflow(T&& block) { | 566 | 1 | RETURN_IF_CATCH_EXCEPTION(return merge_impl_ignore_overflow(block);); | 567 | 1 | } |
_ZN5doris12MutableBlock21merge_ignore_overflowINS_5BlockEEENS_6StatusEOT_ Line | Count | Source | 565 | 48.0k | [[nodiscard]] Status merge_ignore_overflow(T&& block) { | 566 | 48.0k | RETURN_IF_CATCH_EXCEPTION(return merge_impl_ignore_overflow(block);); | 567 | 48.0k | } |
|
568 | | |
569 | | // only use for join. call ignore_overflow to prevent from throw exception in join |
570 | | template <typename T> |
571 | 48.0k | [[nodiscard]] Status merge_impl_ignore_overflow(T&& block) { |
572 | 48.0k | if (_columns.size() != block.columns()) { |
573 | 1 | return Status::Error<ErrorCode::INTERNAL_ERROR>( |
574 | 1 | "Merge block not match, self column count: {}, [columns: {}, types: {}], " |
575 | 1 | "input column count: {}, [columns: {}, " |
576 | 1 | "types: {}], ", |
577 | 1 | _columns.size(), dump_names(), dump_types(), block.columns(), |
578 | 1 | block.dump_names(), block.dump_types()); |
579 | 1 | } |
580 | 142k | for (int i = 0; i < _columns.size(); ++i) { |
581 | 94.2k | if (!_data_types[i]->equals(*block.get_by_position(i).type)) { |
582 | 1 | throw doris::Exception(doris::ErrorCode::FATAL_ERROR, |
583 | 1 | "Merge block not match, self:[columns: {}, types: {}], " |
584 | 1 | "input:[columns: {}, types: {}], ", |
585 | 1 | dump_names(), dump_types(), block.dump_names(), |
586 | 1 | block.dump_types()); |
587 | 1 | } |
588 | 94.2k | _columns[i]->insert_range_from_ignore_overflow( |
589 | 94.2k | *block.get_by_position(i).column->convert_to_full_column_if_const().get(), 0, |
590 | 94.2k | block.rows()); |
591 | 94.2k | } |
592 | 48.0k | return Status::OK(); |
593 | 48.0k | } _ZN5doris12MutableBlock26merge_impl_ignore_overflowIRNS_5BlockEEENS_6StatusEOT_ Line | Count | Source | 571 | 48.0k | [[nodiscard]] Status merge_impl_ignore_overflow(T&& block) { | 572 | 48.0k | if (_columns.size() != block.columns()) { | 573 | 1 | return Status::Error<ErrorCode::INTERNAL_ERROR>( | 574 | 1 | "Merge block not match, self column count: {}, [columns: {}, types: {}], " | 575 | 1 | "input column count: {}, [columns: {}, " | 576 | 1 | "types: {}], ", | 577 | 1 | _columns.size(), dump_names(), dump_types(), block.columns(), | 578 | 1 | block.dump_names(), block.dump_types()); | 579 | 1 | } | 580 | 142k | for (int i = 0; i < _columns.size(); ++i) { | 581 | 94.2k | if (!_data_types[i]->equals(*block.get_by_position(i).type)) { | 582 | 0 | throw doris::Exception(doris::ErrorCode::FATAL_ERROR, | 583 | 0 | "Merge block not match, self:[columns: {}, types: {}], " | 584 | 0 | "input:[columns: {}, types: {}], ", | 585 | 0 | dump_names(), dump_types(), block.dump_names(), | 586 | 0 | block.dump_types()); | 587 | 0 | } | 588 | 94.2k | _columns[i]->insert_range_from_ignore_overflow( | 589 | 94.2k | *block.get_by_position(i).column->convert_to_full_column_if_const().get(), 0, | 590 | 94.2k | block.rows()); | 591 | 94.2k | } | 592 | 48.0k | return Status::OK(); | 593 | 48.0k | } |
_ZN5doris12MutableBlock26merge_impl_ignore_overflowINS_5BlockEEENS_6StatusEOT_ Line | Count | Source | 571 | 1 | [[nodiscard]] Status merge_impl_ignore_overflow(T&& block) { | 572 | 1 | if (_columns.size() != block.columns()) { | 573 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR>( | 574 | 0 | "Merge block not match, self column count: {}, [columns: {}, types: {}], " | 575 | 0 | "input column count: {}, [columns: {}, " | 576 | 0 | "types: {}], ", | 577 | 0 | _columns.size(), dump_names(), dump_types(), block.columns(), | 578 | 0 | block.dump_names(), block.dump_types()); | 579 | 0 | } | 580 | 3 | for (int i = 0; i < _columns.size(); ++i) { | 581 | 3 | if (!_data_types[i]->equals(*block.get_by_position(i).type)) { | 582 | 1 | throw doris::Exception(doris::ErrorCode::FATAL_ERROR, | 583 | 1 | "Merge block not match, self:[columns: {}, types: {}], " | 584 | 1 | "input:[columns: {}, types: {}], ", | 585 | 1 | dump_names(), dump_types(), block.dump_names(), | 586 | 1 | block.dump_types()); | 587 | 1 | } | 588 | 2 | _columns[i]->insert_range_from_ignore_overflow( | 589 | 2 | *block.get_by_position(i).column->convert_to_full_column_if_const().get(), 0, | 590 | 2 | block.rows()); | 591 | 2 | } | 592 | 0 | return Status::OK(); | 593 | 1 | } |
|
594 | | |
// Appends the columns of `block` to this MutableBlock and returns a Status.
//  - If this block has neither columns nor data types yet, it adopts the
//    input's data types and names and takes over each input column (expanded
//    with convert_to_full_column_if_const() and then mutate()).
//  - Otherwise the column counts must match, and rows [0, block.rows()) of
//    every input column are appended to the matching column here.
// Returns an INTERNAL_ERROR Status on a column-count mismatch, Status::OK()
// otherwise.
// The coverage report lists instantiations for doris::Block& and doris::Block.
595 | | template <typename T> |
596 | 137 | [[nodiscard]] Status merge_impl(T&& block) { |
597 | | // merge is not supported in dynamic block |
// Empty target: take schema (types and names) and data straight from the input.
598 | 137 | if (_columns.empty() && _data_types.empty()) { |
599 | 40 | _data_types = block.get_data_types(); |
600 | 40 | _names = block.get_names(); |
601 | 40 | _columns.resize(block.columns()); |
602 | 139 | for (size_t i = 0; i < block.columns(); ++i) { |
603 | 99 | if (block.get_by_position(i).column) { |
604 | 98 | _columns[i] = (*std::move(block.get_by_position(i) |
605 | 98 | .column->convert_to_full_column_if_const())) |
606 | 98 | .mutate(); |
607 | 98 | } else { |
// The input slot holds no column: start from an empty column of the declared type.
608 | 1 | _columns[i] = _data_types[i]->create_column(); |
609 | 1 | } |
610 | 99 | } |
611 | 97 | } else { |
// A column-count mismatch is reported through the returned Status.
612 | 97 | if (_columns.size() != block.columns()) { |
613 | 2 | return Status::Error<ErrorCode::INTERNAL_ERROR>( |
614 | 2 | "Merge block not match, self column count: {}, [columns: {}, types: {}], " |
615 | 2 | "input column count: {}, [columns: {}, " |
616 | 2 | "types: {}], ", |
617 | 2 | _columns.size(), dump_names(), dump_types(), block.columns(), |
618 | 2 | block.dump_names(), block.dump_types()); |
619 | 2 | } |
// NOTE(review): `int i` is compared against the unsigned _columns.size().
620 | 250 | for (int i = 0; i < _columns.size(); ++i) { |
621 | 155 | if (!_data_types[i]->equals(*block.get_by_position(i).type)) { |
// Types differ. The DCHECKs below state the only expected case: the target is
// nullable, its nested type equals the source type, and the source is not
// nullable. They are assertions only, so nothing else rejects other
// mismatches before the insert. The source column is wrapped with
// make_nullable() and then appended.
// NOTE(review): the second DCHECK uses a C-style cast to DataTypeNullable*.
622 | 1 | DCHECK(_data_types[i]->is_nullable()) |
623 | 0 | << " target type: " << _data_types[i]->get_name() |
624 | 0 | << " src type: " << block.get_by_position(i).type->get_name(); |
625 | 1 | DCHECK(((DataTypeNullable*)_data_types[i].get()) |
626 | 1 | ->get_nested_type() |
627 | 1 | ->equals(*block.get_by_position(i).type)); |
628 | 1 | DCHECK(!block.get_by_position(i).type->is_nullable()); |
629 | 1 | _columns[i]->insert_range_from(*make_nullable(block.get_by_position(i).column) |
630 | 1 | ->convert_to_full_column_if_const(), |
631 | 1 | 0, block.rows()); |
632 | 154 | } else { |
// Same type: append all rows of the (const-expanded) input column.
633 | 154 | _columns[i]->insert_range_from( |
634 | 154 | *block.get_by_position(i) |
635 | 154 | .column->convert_to_full_column_if_const() |
636 | 154 | .get(), |
637 | 154 | 0, block.rows()); |
638 | 154 | } |
639 | 155 | } |
640 | 95 | } |
641 | 135 | return Status::OK(); |
642 | 137 | } _ZN5doris12MutableBlock10merge_implIRNS_5BlockEEENS_6StatusEOT_ Line | Count | Source | 596 | 135 | [[nodiscard]] Status merge_impl(T&& block) { | 597 | | // merge is not supported in dynamic block | 598 | 135 | if (_columns.empty() && _data_types.empty()) { | 599 | 40 | _data_types = block.get_data_types(); | 600 | 40 | _names = block.get_names(); | 601 | 40 | _columns.resize(block.columns()); | 602 | 139 | for (size_t i = 0; i < block.columns(); ++i) { | 603 | 99 | if (block.get_by_position(i).column) { | 604 | 98 | _columns[i] = (*std::move(block.get_by_position(i) | 605 | 98 | .column->convert_to_full_column_if_const())) | 606 | 98 | .mutate(); | 607 | 98 | } else { | 608 | 1 | _columns[i] = _data_types[i]->create_column(); | 609 | 1 | } | 610 | 99 | } | 611 | 95 | } else { | 612 | 95 | if (_columns.size() != block.columns()) { | 613 | 1 | return Status::Error<ErrorCode::INTERNAL_ERROR>( | 614 | 1 | "Merge block not match, self column count: {}, [columns: {}, types: {}], " | 615 | 1 | "input column count: {}, [columns: {}, " | 616 | 1 | "types: {}], ", | 617 | 1 | _columns.size(), dump_names(), dump_types(), block.columns(), | 618 | 1 | block.dump_names(), block.dump_types()); | 619 | 1 | } | 620 | 246 | for (int i = 0; i < _columns.size(); ++i) { | 621 | 152 | if (!_data_types[i]->equals(*block.get_by_position(i).type)) { | 622 | 0 | DCHECK(_data_types[i]->is_nullable()) | 623 | 0 | << " target type: " << _data_types[i]->get_name() | 624 | 0 | << " src type: " << block.get_by_position(i).type->get_name(); | 625 | 0 | DCHECK(((DataTypeNullable*)_data_types[i].get()) | 626 | 0 | ->get_nested_type() | 627 | 0 | ->equals(*block.get_by_position(i).type)); | 628 | 0 | DCHECK(!block.get_by_position(i).type->is_nullable()); | 629 | 0 | _columns[i]->insert_range_from(*make_nullable(block.get_by_position(i).column) | 630 | 0 | ->convert_to_full_column_if_const(), | 631 | 0 | 0, block.rows()); | 632 | 152 | } else { | 633 | 152 | 
_columns[i]->insert_range_from( | 634 | 152 | *block.get_by_position(i) | 635 | 152 | .column->convert_to_full_column_if_const() | 636 | 152 | .get(), | 637 | 152 | 0, block.rows()); | 638 | 152 | } | 639 | 152 | } | 640 | 94 | } | 641 | 134 | return Status::OK(); | 642 | 135 | } |
_ZN5doris12MutableBlock10merge_implINS_5BlockEEENS_6StatusEOT_ Line | Count | Source | 596 | 2 | [[nodiscard]] Status merge_impl(T&& block) { | 597 | | // merge is not supported in dynamic block | 598 | 2 | if (_columns.empty() && _data_types.empty()) { | 599 | 0 | _data_types = block.get_data_types(); | 600 | 0 | _names = block.get_names(); | 601 | 0 | _columns.resize(block.columns()); | 602 | 0 | for (size_t i = 0; i < block.columns(); ++i) { | 603 | 0 | if (block.get_by_position(i).column) { | 604 | 0 | _columns[i] = (*std::move(block.get_by_position(i) | 605 | 0 | .column->convert_to_full_column_if_const())) | 606 | 0 | .mutate(); | 607 | 0 | } else { | 608 | 0 | _columns[i] = _data_types[i]->create_column(); | 609 | 0 | } | 610 | 0 | } | 611 | 2 | } else { | 612 | 2 | if (_columns.size() != block.columns()) { | 613 | 1 | return Status::Error<ErrorCode::INTERNAL_ERROR>( | 614 | 1 | "Merge block not match, self column count: {}, [columns: {}, types: {}], " | 615 | 1 | "input column count: {}, [columns: {}, " | 616 | 1 | "types: {}], ", | 617 | 1 | _columns.size(), dump_names(), dump_types(), block.columns(), | 618 | 1 | block.dump_names(), block.dump_types()); | 619 | 1 | } | 620 | 4 | for (int i = 0; i < _columns.size(); ++i) { | 621 | 3 | if (!_data_types[i]->equals(*block.get_by_position(i).type)) { | 622 | 1 | DCHECK(_data_types[i]->is_nullable()) | 623 | 0 | << " target type: " << _data_types[i]->get_name() | 624 | 0 | << " src type: " << block.get_by_position(i).type->get_name(); | 625 | 1 | DCHECK(((DataTypeNullable*)_data_types[i].get()) | 626 | 1 | ->get_nested_type() | 627 | 1 | ->equals(*block.get_by_position(i).type)); | 628 | 1 | DCHECK(!block.get_by_position(i).type->is_nullable()); | 629 | 1 | _columns[i]->insert_range_from(*make_nullable(block.get_by_position(i).column) | 630 | 1 | ->convert_to_full_column_if_const(), | 631 | 1 | 0, block.rows()); | 632 | 2 | } else { | 633 | 2 | _columns[i]->insert_range_from( | 634 | 2 | 
*block.get_by_position(i) | 635 | 2 | .column->convert_to_full_column_if_const() | 636 | 2 | .get(), | 637 | 2 | 0, block.rows()); | 638 | 2 | } | 639 | 3 | } | 640 | 1 | } | 641 | 1 | return Status::OK(); | 642 | 2 | } |
|
643 | | |
644 | | // Move the columns' data into a Block. |
// NOTE(review): the original comment ended mid-sentence ("this will
// invalidate"); presumably the columns held by this MutableBlock are what
// become invalid after the call — confirm against the definition.
// NOTE(review): start_column / end_column presumably bound the range of
// columns moved out; whether end_column is inclusive is not visible here.
645 | | Block to_block(int start_column = 0); |
646 | | Block to_block(int start_column, int end_column); |
647 | | |
// Declared noexcept; presumably exchanges the contents of this block with
// `other` (definition is outside this listing — confirm).
648 | | void swap(MutableBlock& other) noexcept; |
649 | | |
// NOTE(review): presumably appends row `row` of `block` to this block; returns
// nothing, so failures cannot be reported through a Status.
650 | | void add_row(const Block* block, int row); |
651 | | // Batch row append: should return an error Status if memory allocation fails. |
// First overload takes a [row_begin, row_end) pointer range of uint32_t row
// indices and an optional column_offset vector (nullptr by default).
// NOTE(review): the meaning of column_offset entries is not visible here.
652 | | Status add_rows(const Block* block, const uint32_t* row_begin, const uint32_t* row_end, |
653 | | const std::vector<int>* column_offset = nullptr); |
// Second overload takes a starting row and a row count.
654 | | Status add_rows(const Block* block, size_t row_begin, size_t length); |
655 | | |
// Return a textual dump of the block as a std::string; row_limit defaults to
// 100. NOTE(review): presumably row_limit caps the number of rows rendered and
// the _json variant emits JSON — confirm against the definitions.
656 | | std::string dump_data(size_t row_limit = 100) const; |
657 | | std::string dump_data_json(size_t row_limit = 100) const; |
658 | | |
// Drops the whole schema and data: empties the column, data-type and name
// containers, leaving the block with zero columns.
659 | 66 | void clear() { |
660 | 66 | _columns.clear(); |
661 | 66 | _data_types.clear(); |
662 | 66 | _names.clear(); |
663 | 66 | } |
664 | | |
665 | | // Clear owned mutable columns in place. MutableBlock already owns its |
666 | | // columns exclusively, so this does not perform COW detaching or cloning. |
667 | | void clear_column_data() noexcept; |
668 | | |
// NOTE(review): presumably the memory reserved by the columns, as opposed to
// bytes() which sums byte_size(); defined outside this listing — confirm.
669 | | size_t allocated_bytes() const; |
670 | | |
// Returns the sum of byte_size() over all columns (0 when there are none).
// Every entry of _columns is dereferenced, so none of them may be null here.
671 | 48.0k | size_t bytes() const { |
672 | 48.0k | size_t res = 0; |
673 | 94.0k | for (const auto& elem : _columns) { |
674 | 94.0k | res += elem->byte_size(); |
675 | 94.0k | } |
676 | | |
677 | 48.0k | return res; |
678 | 48.0k | } |
679 | | |
// Returns a mutable reference to the column-name vector, so callers can
// modify the names in place.
680 | 816 | std::vector<std::string>& get_names() { return _names; } |
681 | | |
682 | | /** Get a list of column names separated by commas. */ |
683 | | std::string dump_names() const; |
684 | | }; |
685 | | |
686 | | // RAII adapter for code that wants the MutableBlock API over a live Block. It |
687 | | // owns only the temporary mutable columns and restores them to the Block on |
688 | | // destruction. While the adapter is alive, read/write column data through |
689 | | // mutable_block()/mutable_columns(); the Block's column slots are moved out. |
690 | | class ScopedMutableBlock { |
691 | | public: |
// An adapter must be bound to a Block, so default construction is disabled.
692 | | ScopedMutableBlock() = delete; |
// Defined outside this listing. NOTE(review): per the class comment it
// presumably moves the Block's columns into _mutable_block — confirm.
693 | | explicit ScopedMutableBlock(Block* block); |
// Hands the columns back; a no-op if restore() already ran or this object was
// moved from (its _block is then null).
694 | 816 | ~ScopedMutableBlock() { restore(); } |
695 | | |
// Not copyable.
696 | | ScopedMutableBlock(const ScopedMutableBlock&) = delete; |
697 | | ScopedMutableBlock& operator=(const ScopedMutableBlock&) = delete; |
698 | | |
// Move construction takes over other's Block pointer (nulling it in `other`,
// which turns other's later restore() into a no-op) and its mutable block.
699 | | ScopedMutableBlock(ScopedMutableBlock&& other) noexcept |
700 | | : _block(std::exchange(other._block, nullptr)), |
701 | 0 | _mutable_block(std::move(other._mutable_block)) {} |
702 | | |
// Move assignment first restores the Block currently adapted by *this, then
// takes over other's Block pointer and mutable block. Self-assignment is a
// no-op.
703 | 0 | ScopedMutableBlock& operator=(ScopedMutableBlock&& other) noexcept { |
704 | 0 | if (this != &other) { |
705 | 0 | restore(); |
706 | 0 | _block = std::exchange(other._block, nullptr); |
707 | 0 | _mutable_block = std::move(other._mutable_block); |
708 | 0 | } |
709 | 0 | return *this; |
710 | 0 | } |
711 | | |
// Accessors for the wrapped MutableBlock and its columns.
712 | 815 | MutableBlock& mutable_block() { return _mutable_block; } |
713 | 0 | const MutableBlock& mutable_block() const { return _mutable_block; } |
714 | 1 | MutableColumns& mutable_columns() { return _mutable_block.mutable_columns(); } |
715 | 0 | const MutableColumns& mutable_columns() const { return _mutable_block.mutable_columns(); } |
716 | | |
// Moves the mutable columns back into the Block through set_columns() and
// then nulls _block, so any further call (including the destructor's) does
// nothing.
717 | 939 | void restore() { |
718 | 939 | if (_block != nullptr) { |
719 | 816 | _block->set_columns(std::move(_mutable_block.mutable_columns())); |
720 | 816 | _block = nullptr; |
721 | 816 | } |
722 | 939 | } |
723 | | |
724 | | private: |
// Block the columns are returned to; nullptr once restored or moved from.
725 | | Block* _block = nullptr; |
726 | | MutableBlock _mutable_block; |
727 | | }; |
728 | | |
// Refers to one row of a shared Block: the owning block pointer plus a row
// position, together with an is_same flag.
// NOTE(review): is_same presumably records equality with a neighbouring row;
// its exact meaning is not visible here.
// NOTE(review): row_pos and is_same have no in-class initializers, so a
// default-initialized instance holds indeterminate values until they are
// assigned or reset() is called.
729 | | struct IteratorRowRef { |
730 | | std::shared_ptr<Block> block; |
731 | | int row_pos; |
732 | | bool is_same; |
733 | | |
// Compares this row with rhs's row by forwarding to Block::compare_at with
// both row positions, the caller's compare_arguments and rhs's block.
// Both `block` and `rhs.block` are dereferenced, so neither may be null.
// NOTE(review): the trailing -1 is presumably a NaN/null direction hint —
// confirm against compare_at's signature.
734 | | template <typename T> |
735 | 1.06M | int compare(const IteratorRowRef& rhs, const T& compare_arguments) const { |
736 | 1.06M | return block->compare_at(row_pos, rhs.row_pos, compare_arguments, *rhs.block, -1); |
737 | 1.06M | } Unexecuted instantiation: _ZNK5doris14IteratorRowRef7compareIPKSt6vectorIjSaIjEEEEiRKS0_RKT_ _ZNK5doris14IteratorRowRef7compareImEEiRKS0_RKT_ Line | Count | Source | 735 | 1.06M | int compare(const IteratorRowRef& rhs, const T& compare_arguments) const { | 736 | 1.06M | return block->compare_at(row_pos, rhs.row_pos, compare_arguments, *rhs.block, -1); | 737 | 1.06M | } |
|
738 | | |
// Returns the reference to its "no row" state: drops the block pointer and
// sets row_pos to -1 and is_same to false.
739 | 512 | void reset() { |
740 | 512 | block = nullptr; |
741 | 512 | row_pos = -1; |
742 | 512 | is_same = false; |
743 | 512 | } |
744 | | }; |
745 | | |
// BlockView: a sequence of row references (each entry names a block and a row).
746 | | using BlockView = std::vector<IteratorRowRef>; |
// BlockUPtr: uniquely-owning pointer to a Block.
747 | | using BlockUPtr = std::unique_ptr<Block>; |
748 | | |
749 | | } // namespace doris |