be/src/core/block/block.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // This file is copied from |
18 | | // https://github.com/ClickHouse/ClickHouse/blob/master/src/Core/Block.h |
19 | | // and modified by Doris |
20 | | |
21 | | #pragma once |
22 | | |
23 | | #include <glog/logging.h> |
24 | | #include <parallel_hashmap/phmap.h> |
25 | | |
26 | | #include <cstddef> |
27 | | #include <cstdint> |
28 | | #include <initializer_list> |
29 | | #include <list> |
30 | | #include <memory> |
31 | | #include <ostream> |
32 | | #include <set> |
33 | | #include <string> |
34 | | #include <utility> |
35 | | #include <vector> |
36 | | |
37 | | #include "common/be_mock_util.h" |
38 | | #include "common/exception.h" |
39 | | #include "common/factory_creator.h" |
40 | | #include "common/status.h" |
41 | | #include "core/block/column_with_type_and_name.h" |
42 | | #include "core/block/columns_with_type_and_name.h" |
43 | | #include "core/column/column.h" |
44 | | #include "core/column/column_nullable.h" |
45 | | #include "core/data_type/data_type.h" |
46 | | #include "core/data_type/data_type_nullable.h" |
47 | | #include "core/types.h" |
48 | | |
49 | | class SipHash; |
50 | | |
51 | | namespace doris { |
52 | | |
53 | | class TupleDescriptor; |
54 | | class PBlock; |
55 | | class SlotDescriptor; |
56 | | |
57 | | namespace segment_v2 { |
58 | | enum CompressionTypePB : int; |
59 | | } // namespace segment_v2 |
60 | | |
61 | | /** Container for a set of columns holding a batch of rows in memory. |
62 | | * This is the unit of data processing. |
63 | | * Also contains metadata - the data types of the columns and their names |
64 | | * (either original names from a table, or names generated during temporary calculations). |
65 | | * Allows inserting and removing columns at arbitrary positions and changing the order of columns. |
66 | | */ |
67 | | class MutableBlock; |
68 | | |
69 | | class Block { |
70 | | ENABLE_FACTORY_CREATOR(Block); |
71 | | |
72 | | private: |
73 | | using Container = ColumnsWithTypeAndName; |
74 | | Container data; |
75 | | |
76 | | public: |
77 | 8.21M | Block() = default; |
78 | | Block(std::initializer_list<ColumnWithTypeAndName> il); |
79 | | Block(ColumnsWithTypeAndName data_); |
80 | | Block(const std::vector<SlotDescriptor*>& slots, size_t block_size); |
81 | | Block(const std::vector<SlotDescriptor>& slots, size_t block_size); |
82 | | |
83 | 8.71M | MOCK_FUNCTION ~Block() = default; |
84 | 847 | Block(const Block& block) = default; |
85 | 48.2k | Block& operator=(const Block& p) = default; |
86 | 63.5k | Block(Block&& block) = default; |
87 | 851 | Block& operator=(Block&& other) = default; |
88 | | |
89 | | void reserve(size_t count); |
90 | | // Make sure the names are no longer needed by any user of the block before calling this |
91 | | void clear_names(); |
92 | | |
93 | | /// insert the column at the specified position |
94 | | void insert(size_t position, const ColumnWithTypeAndName& elem); |
95 | | void insert(size_t position, ColumnWithTypeAndName&& elem); |
96 | | /// insert the column to the end |
97 | | void insert(const ColumnWithTypeAndName& elem); |
98 | | void insert(ColumnWithTypeAndName&& elem); |
99 | | /// remove the column at the specified position |
100 | | void erase(size_t position); |
101 | | /// remove the columns in [start, end), i.e. every column from position start onwards |
102 | | void erase_tail(size_t start); |
103 | | /// remove the columns at the specified positions |
104 | | void erase(const std::set<size_t>& positions); |
105 | | // Keep only the columns at the positions listed in container. T can be std::set<int>, std::vector<int> or std::list<int> |
106 | | template <class T> |
107 | 0 | void erase_not_in(const T& container) { |
108 | 0 | Container new_data; |
109 | 0 | for (auto pos : container) { |
110 | 0 | new_data.emplace_back(std::move(data[pos])); |
111 | 0 | } |
112 | 0 | std::swap(data, new_data); |
113 | 0 | } |
114 | | |
115 | 17 | std::unordered_map<std::string, uint32_t> get_name_to_pos_map() const { |
116 | 17 | std::unordered_map<std::string, uint32_t> name_to_index_map; |
117 | 136 | for (uint32_t i = 0; i < data.size(); ++i) { |
118 | 119 | name_to_index_map[data[i].name] = i; |
119 | 119 | } |
120 | 17 | return name_to_index_map; |
121 | 17 | } |
122 | | |
123 | | /// References are invalidated after calling functions above. |
124 | 17.6M | ColumnWithTypeAndName& get_by_position(size_t position) { |
125 | 18.4E | DCHECK(data.size() > position) |
126 | 18.4E | << ", data.size()=" << data.size() << ", position=" << position; |
127 | 17.6M | return data[position]; |
128 | 17.6M | } |
129 | 29.1M | const ColumnWithTypeAndName& get_by_position(size_t position) const { return data[position]; } |
130 | | |
131 | 31.6k | void replace_by_position(size_t position, ColumnPtr&& res) { |
132 | 31.6k | this->get_by_position(position).column = std::move(res); |
133 | 31.6k | } |
134 | | |
135 | 4 | void replace_by_position(size_t position, const ColumnPtr& res) { |
136 | 4 | this->get_by_position(position).column = res; |
137 | 4 | } |
138 | | |
139 | 703 | void replace_by_position_if_const(size_t position) { |
140 | 703 | auto& element = this->get_by_position(position); |
141 | 703 | element.column = element.column->convert_to_full_column_if_const(); |
142 | 703 | } |
143 | | |
144 | | ColumnWithTypeAndName& safe_get_by_position(size_t position); |
145 | | const ColumnWithTypeAndName& safe_get_by_position(size_t position) const; |
146 | | |
147 | 71.7k | Container::iterator begin() { return data.begin(); } |
148 | 71.7k | Container::iterator end() { return data.end(); } |
149 | 9.47k | Container::const_iterator begin() const { return data.begin(); } |
150 | 9.45k | Container::const_iterator end() const { return data.end(); } |
151 | 0 | Container::const_iterator cbegin() const { return data.cbegin(); } |
152 | 0 | Container::const_iterator cend() const { return data.cend(); } |
153 | | |
154 | | // Get position of column by name. Returns -1 if there is no column with that name. |
155 | | // ATTN: this method is O(N). Prefer maintaining a name -> position map in the caller if you need to call it frequently. |
156 | | int get_position_by_name(const std::string& name) const; |
157 | | |
158 | | const ColumnsWithTypeAndName& get_columns_with_type_and_name() const; |
159 | | |
160 | | std::vector<std::string> get_names() const; |
161 | | DataTypes get_data_types() const; |
162 | | |
163 | 59 | DataTypePtr get_data_type(size_t index) const { |
164 | 59 | CHECK(index < data.size()); |
165 | 59 | return data[index].type; |
166 | 59 | } |
167 | | |
168 | | /// Returns the number of rows of the first column in the block that is not nullptr. If there are no columns, returns 0. |
169 | | size_t rows() const; |
170 | | |
171 | | // Cut the rows in the block; used by the LIMIT operation |
172 | | void set_num_rows(size_t length); |
173 | | |
174 | | // Skip rows in the block; used by the OFFSET and LIMIT operations |
175 | | void skip_num_rows(int64_t& offset); |
176 | | |
177 | | /// By the assumption used throughout the code, the number of columns won't exceed the int16 range, so there is no |
178 | | /// need to worry when we assign it to an int32. |
179 | 18.2M | uint32_t columns() const { return static_cast<uint32_t>(data.size()); } |
180 | | |
181 | | /// Checks that every column in block is not nullptr and has same number of elements. |
182 | | void check_number_of_rows(bool allow_null_columns = false) const; |
183 | | |
184 | | Status check_type_and_column() const; |
185 | | |
186 | | /// Approximate number of bytes used by column data in memory. |
187 | | /// This reflects the actual data footprint (e.g. string contents, numeric arrays) |
188 | | /// and is the metric used by adaptive batch size byte budgets. |
189 | | size_t bytes() const; |
190 | | |
191 | | /// Approximate number of allocated (reserved) bytes in memory. |
192 | | /// This may be larger than bytes() due to pre-allocated capacity in vectors/arenas. |
193 | | /// Used for memory tracking and profiling. |
194 | | MOCK_FUNCTION size_t allocated_bytes() const; |
195 | | |
196 | | /** Get a list of column names separated by commas. */ |
197 | | std::string dump_names() const; |
198 | | |
199 | | std::string dump_types() const; |
200 | | |
201 | | /** List of names, types and lengths of columns. Designed for debugging. */ |
202 | | std::string dump_structure() const; |
203 | | |
204 | | /** Get the same block, but empty. */ |
205 | | Block clone_empty() const; |
206 | | |
207 | | Columns get_columns() const; |
208 | | Columns get_columns_and_convert(); |
209 | | |
210 | | Block clone_without_columns(const std::vector<int>* column_offset = nullptr) const; |
211 | | |
212 | | /** Get empty columns with the same types as in block. */ |
213 | | MutableColumns clone_empty_columns() const; |
214 | | |
215 | | // RAII owner for mutating columns borrowed from a live Block. While the |
216 | | // guard is alive, the Block's column slots are moved out and column data |
217 | | // must be accessed through mutable_columns(). The guard restores columns on |
218 | | // destruction, so use it when the caller may exit early after detaching. |
219 | | class ScopedMutableColumns { |
220 | | public: |
221 | | explicit ScopedMutableColumns(Block& block); |
222 | | ~ScopedMutableColumns(); |
223 | | |
224 | | ScopedMutableColumns(const ScopedMutableColumns&) = delete; |
225 | | ScopedMutableColumns& operator=(const ScopedMutableColumns&) = delete; |
226 | | ScopedMutableColumns(ScopedMutableColumns&& other) noexcept; |
227 | | ScopedMutableColumns& operator=(ScopedMutableColumns&& other) noexcept; |
228 | | |
229 | 1.66k | MutableColumns& mutable_columns() { return _columns; } |
230 | 0 | const MutableColumns& mutable_columns() const { return _columns; } |
231 | | const DataTypePtr& get_datatype_by_position(size_t position) const; |
232 | | const std::string& get_name_by_position(size_t position) const; |
233 | | |
234 | | // Transfer the borrowed owners to another RAII object that will restore |
235 | | // them. After release(), the original Block remains without columns |
236 | | // until that owner restores them. Normal callers should let this guard |
237 | | // restore on destruction. |
238 | | MutableColumns release(); |
239 | | void restore(); |
240 | | |
241 | | private: |
242 | | Block* _block = nullptr; |
243 | | MutableColumns _columns; |
244 | | }; |
245 | | |
246 | | // Single-column variant for localized mutation of a live Block slot. The |
247 | | // selected slot is unavailable from the Block until this guard restores it. |
248 | | class ScopedMutableColumn { |
249 | | public: |
250 | | ScopedMutableColumn(Block& block, size_t position); |
251 | | ~ScopedMutableColumn(); |
252 | | |
253 | | ScopedMutableColumn(const ScopedMutableColumn&) = delete; |
254 | | ScopedMutableColumn& operator=(const ScopedMutableColumn&) = delete; |
255 | | ScopedMutableColumn(ScopedMutableColumn&& other) noexcept; |
256 | | ScopedMutableColumn& operator=(ScopedMutableColumn&& other) noexcept; |
257 | | |
258 | 97 | MutableColumnPtr& mutable_column() { return _column; } |
259 | 0 | const MutableColumnPtr& mutable_column() const { return _column; } |
260 | | |
261 | | void restore(); |
262 | | |
263 | | private: |
264 | | Block* _block = nullptr; |
265 | | size_t _position = 0; |
266 | | MutableColumnPtr _column; |
267 | | }; |
268 | | |
269 | | /** Get columns from a consumed block for mutation. Columns in block will be nullptr. */ |
270 | | MutableColumns mutate_columns() &&; |
271 | | MutableColumns mutate_columns() & = delete; |
272 | | |
273 | | /** Temporarily mutate a live Block's columns. The returned guard owns the columns and |
274 | | * restores them on destruction; prefer this over manual move/writeback. |
275 | | */ |
276 | | ScopedMutableColumns mutate_columns_scoped() &; |
277 | | ScopedMutableColumns mutate_columns_scoped() && = delete; |
278 | | |
279 | | /** Temporarily mutate one live Block column; use when only one slot needs ownership. */ |
280 | | ScopedMutableColumn mutate_column_scoped(size_t position) &; |
281 | | ScopedMutableColumn mutate_column_scoped(size_t position) && = delete; |
282 | | |
283 | | /** Replace columns in a block */ |
284 | | void set_columns(MutableColumns&& columns); |
285 | | void clear(); |
286 | | void swap(Block& other) noexcept; |
287 | | void swap(Block&& other) noexcept; |
288 | | |
289 | | // Shuffle columns in place based on the result_column_ids |
290 | | void shuffle_columns(const std::vector<int>& result_column_ids); |
291 | | |
292 | | // column_size == -1 clears all columns; otherwise clear [0, column_size) |
293 | | // and drop the rest. Shared columns are detached through clone_empty(), so |
294 | | // allocation or clone failures propagate. |
295 | | void clear_column_data(int64_t column_size = -1); |
296 | | void clear_column_data(const std::vector<uint32_t>& columns_to_clear); |
297 | | |
298 | 15.4k | MOCK_FUNCTION bool mem_reuse() { return !data.empty(); } |
299 | | |
300 | 12 | bool is_empty_column() { return data.empty(); } |
301 | | |
302 | 3.17M | bool empty() const { return rows() == 0; } |
303 | | |
304 | | /** |
305 | | * Updates SipHash of the Block, using update method of columns. |
306 | | * Returns hash for block, that could be used to differentiate blocks |
307 | | * with same structure, but different data. |
308 | | */ |
309 | | void update_hash(SipHash& hash) const; |
310 | | |
311 | | /** |
312 | | * Get block data in string. |
313 | | * If the code is in default_implementation_for_nulls or something similar, the nullability of the type and |
314 | | * of the column may temporarily differ. Set allow_null_mismatch to true to dump it correctly. |
315 | | */ |
316 | | std::string dump_data(size_t begin = 0, size_t row_limit = 100, |
317 | | bool allow_null_mismatch = false) const; |
318 | | |
319 | | std::string dump_data_json(size_t begin = 0, size_t row_limit = 100, |
320 | | bool allow_null_mismatch = false) const; |
321 | | |
322 | | /** Get one row of data from the block as a string; only used when loading data */ |
323 | | std::string dump_one_line(size_t row, int column_end) const; |
324 | | |
325 | | Status append_to_block_by_selector(MutableBlock* dst, const IColumn::Selector& selector) const; |
326 | | |
327 | | // need exception safety |
328 | | static void filter_block_internal(Block* block, const std::vector<uint32_t>& columns_to_filter, |
329 | | const IColumn::Filter& filter); |
330 | | // need exception safety |
331 | | static void filter_block_internal(Block* block, const IColumn::Filter& filter, |
332 | | uint32_t column_to_keep); |
333 | | // need exception safety |
334 | | static void filter_block_internal(Block* block, const IColumn::Filter& filter); |
335 | | |
336 | | static Status filter_block(Block* block, const std::vector<uint32_t>& columns_to_filter, |
337 | | size_t filter_column_id, size_t column_to_keep); |
338 | | |
339 | | static Status filter_block(Block* block, size_t filter_column_id, size_t column_to_keep); |
340 | | |
341 | 708 | static void erase_useless_column(Block* block, size_t column_to_keep) { |
342 | 708 | block->erase_tail(column_to_keep); |
343 | 708 | } |
344 | | |
345 | | // serialize block to PBlock |
346 | | Status serialize(int be_exec_version, PBlock* pblock, size_t* uncompressed_bytes, |
347 | | size_t* compressed_bytes, int64_t* compress_time, |
348 | | segment_v2::CompressionTypePB compression_type, |
349 | | bool allow_transfer_large_data = false) const; |
350 | | |
351 | | Status deserialize(const PBlock& pblock, size_t* uncompressed_bytes, int64_t* decompress_time); |
352 | | |
353 | | std::unique_ptr<Block> create_same_struct_block(size_t size, bool is_reserve = false) const; |
354 | | |
355 | | /** Compares (*this) n-th row and rhs m-th row. |
356 | | * Returns a negative number, 0, or a positive number if (*this) n-th row is less than, equal to, or greater than rhs m-th row respectively. |
357 | | * Is used in sorting. |
358 | | * |
359 | | * If one of the element values is NaN or NULL, then: |
360 | | * - if nan_direction_hint == -1, NaN and NULLs are considered less than everything else; |
361 | | * - if nan_direction_hint == 1, NaN and NULLs are considered greater than everything else. |
362 | | * For example, if nan_direction_hint == -1 is used by descending sorting, NaNs will be at the end. |
363 | | * |
364 | | * For non Nullable and non floating point types, nan_direction_hint is ignored. |
365 | | */ |
366 | 3 | int compare_at(size_t n, size_t m, const Block& rhs, int nan_direction_hint) const { |
367 | 3 | DCHECK_EQ(columns(), rhs.columns()); |
368 | 3 | return compare_at(n, m, columns(), rhs, nan_direction_hint); |
369 | 3 | } |
370 | | |
371 | | int compare_at(size_t n, size_t m, size_t num_columns, const Block& rhs, |
372 | 7.15M | int nan_direction_hint) const { |
373 | 7.15M | DCHECK_GE(columns(), num_columns); |
374 | 7.15M | DCHECK_GE(rhs.columns(), num_columns); |
375 | | |
376 | 7.15M | DCHECK_LE(n, rows()); |
377 | 7.15M | DCHECK_LE(m, rhs.rows()); |
378 | 9.94M | for (size_t i = 0; i < num_columns; ++i) { |
379 | 7.19M | DCHECK(get_by_position(i).type->equals(*rhs.get_by_position(i).type)); |
380 | 7.19M | auto res = get_by_position(i).column->compare_at(n, m, *(rhs.get_by_position(i).column), |
381 | 7.19M | nan_direction_hint); |
382 | 7.19M | if (res) { |
383 | 4.40M | return res; |
384 | 4.40M | } |
385 | 7.19M | } |
386 | 2.74M | return 0; |
387 | 7.15M | } |
388 | | |
389 | | int compare_at(size_t n, size_t m, const std::vector<uint32_t>* compare_columns, |
390 | 2 | const Block& rhs, int nan_direction_hint) const { |
391 | 2 | DCHECK_GE(columns(), compare_columns->size()); |
392 | 2 | DCHECK_GE(rhs.columns(), compare_columns->size()); |
393 | | |
394 | 2 | DCHECK_LE(n, rows()); |
395 | 2 | DCHECK_LE(m, rhs.rows()); |
396 | 3 | for (auto i : *compare_columns) { |
397 | 3 | DCHECK(get_by_position(i).type->equals(*rhs.get_by_position(i).type)); |
398 | 3 | auto res = get_by_position(i).column->compare_at(n, m, *(rhs.get_by_position(i).column), |
399 | 3 | nan_direction_hint); |
400 | 3 | if (res) { |
401 | 2 | return res; |
402 | 2 | } |
403 | 3 | } |
404 | 0 | return 0; |
405 | 2 | } |
406 | | |
407 | | //note(wb) no DCHECK here, because this method is only used after compare_at now, so no need to repeat check here. |
408 | | // If this method is used in more places, you can add DCHECK case by case. |
409 | | int compare_column_at(size_t n, size_t m, size_t col_idx, const Block& rhs, |
410 | 26.9k | int nan_direction_hint) const { |
411 | 26.9k | auto res = get_by_position(col_idx).column->compare_at( |
412 | 26.9k | n, m, *(rhs.get_by_position(col_idx).column), nan_direction_hint); |
413 | 26.9k | return res; |
414 | 26.9k | } |
415 | | |
416 | | // for String type or Array<String> type |
417 | | void shrink_char_type_column_suffix_zero(const std::vector<size_t>& char_type_idx); |
418 | | |
419 | | void clear_column_mem_not_keep(const std::vector<bool>& column_keep_flags, |
420 | | bool need_keep_first); |
421 | | |
422 | | // Helper: sum byte_size() of all mutable columns. |
423 | | // Unlike Block::bytes() which operates on immutable ColumnPtr, |
424 | | // this works on MutableColumns during block construction (e.g. in BlockReader). |
425 | 300k | static inline size_t columns_byte_size(const MutableColumns& cols) { |
426 | 300k | size_t total = 0; |
427 | 605k | for (const auto& col : cols) { |
428 | 605k | total += col->byte_size(); |
429 | 605k | } |
430 | 300k | return total; |
431 | 300k | } |
432 | | |
433 | | private: |
434 | | void erase_impl(size_t position); |
435 | | }; |
436 | | |
437 | | using Blocks = std::vector<Block>; |
438 | | using BlocksList = std::list<Block>; |
439 | | using BlocksPtr = std::shared_ptr<Blocks>; |
440 | | using BlocksPtrs = std::shared_ptr<std::vector<BlocksPtr>>; |
441 | | |
442 | | class MutableBlock { |
443 | | ENABLE_FACTORY_CREATOR(MutableBlock); |
444 | | |
445 | | private: |
446 | | MutableColumns _columns; |
447 | | DataTypes _data_types; |
448 | | std::vector<std::string> _names; |
449 | | |
450 | | public: |
451 | | // Build from a consumed Block. This has no restore contract: the source |
452 | | // Block is left without columns and must not be used as a live output block. |
453 | | // For caller-owned live Blocks, use ScopedMutableBlock or |
454 | | // mutate_columns_scoped() instead. |
455 | 48.0k | static MutableBlock build_mutable_block(Block&& block) { |
456 | 48.0k | return MutableBlock(std::move(block)); |
457 | 48.0k | } |
458 | 1 | static MutableBlock build_mutable_block(std::nullptr_t) { return MutableBlock(); } |
459 | | static MutableBlock build_mutable_block(Block* block) = delete; |
460 | 73.0k | MutableBlock() = default; |
461 | 217k | ~MutableBlock() = default; |
462 | | MutableBlock(const MutableBlock&) = delete; |
463 | | MutableBlock& operator=(const MutableBlock&) = delete; |
464 | | MutableBlock(MutableBlock&& m_block) noexcept |
465 | | : _columns(std::move(m_block._columns)), |
466 | | _data_types(std::move(m_block._data_types)), |
467 | 0 | _names(std::move(m_block._names)) {} |
468 | | |
469 | | // Consumes block columns and converts them to mutable columns recursively. |
470 | | // This constructor is for temporary/owned Blocks only. |
471 | | MutableBlock(Block&& block) |
472 | 144k | : _columns(std::move(block).mutate_columns()), |
473 | 144k | _data_types(block.get_data_types()), |
474 | 144k | _names(block.get_names()) {} |
475 | | |
476 | 96.0k | MutableBlock& operator=(MutableBlock&& m_block) noexcept { |
477 | 96.0k | _columns = std::move(m_block._columns); |
478 | 96.0k | _data_types = std::move(m_block._data_types); |
479 | 96.0k | _names = std::move(m_block._names); |
480 | 96.0k | return *this; |
481 | 96.0k | } |
482 | | |
483 | | size_t rows() const; |
484 | 444 | size_t columns() const { return _columns.size(); } |
485 | | |
486 | 144k | bool empty() const { return rows() == 0; } |
487 | | |
488 | 49.4k | MutableColumns& mutable_columns() { return _columns; } |
489 | 0 | const MutableColumns& mutable_columns() const { return _columns; } |
490 | | |
491 | 812 | void set_mutable_columns(MutableColumns&& columns) { _columns = std::move(columns); } |
492 | | |
493 | 811 | DataTypes& data_types() { return _data_types; } |
494 | | |
495 | 158 | MutableColumnPtr& get_column_by_position(size_t position) { return _columns[position]; } |
496 | 64 | const MutableColumnPtr& get_column_by_position(size_t position) const { |
497 | 64 | return _columns[position]; |
498 | 64 | } |
499 | | |
500 | 9 | DataTypePtr& get_datatype_by_position(size_t position) { return _data_types[position]; } |
501 | 26 | const DataTypePtr& get_datatype_by_position(size_t position) const { |
502 | 26 | return _data_types[position]; |
503 | 26 | } |
504 | | |
505 | 38 | int compare_one_column(size_t n, size_t m, size_t column_id, int nan_direction_hint) const { |
506 | 38 | DCHECK_LE(column_id, columns()); |
507 | 38 | DCHECK_LE(n, rows()); |
508 | 38 | DCHECK_LE(m, rows()); |
509 | 38 | auto& column = get_column_by_position(column_id); |
510 | 38 | return column->compare_at(n, m, *column, nan_direction_hint); |
511 | 38 | } |
512 | | |
513 | | int compare_at(size_t n, size_t m, size_t num_columns, const MutableBlock& rhs, |
514 | 6 | int nan_direction_hint) const { |
515 | 6 | DCHECK_GE(columns(), num_columns); |
516 | 6 | DCHECK_GE(rhs.columns(), num_columns); |
517 | | |
518 | 6 | DCHECK_LE(n, rows()); |
519 | 6 | DCHECK_LE(m, rhs.rows()); |
520 | 14 | for (size_t i = 0; i < num_columns; ++i) { |
521 | 11 | DCHECK(get_datatype_by_position(i)->equals(*rhs.get_datatype_by_position(i))); |
522 | 11 | auto res = get_column_by_position(i)->compare_at(n, m, *(rhs.get_column_by_position(i)), |
523 | 11 | nan_direction_hint); |
524 | 11 | if (res) { |
525 | 3 | return res; |
526 | 3 | } |
527 | 11 | } |
528 | 3 | return 0; |
529 | 6 | } |
530 | | |
531 | | int compare_at(size_t n, size_t m, const std::vector<uint32_t>* compare_columns, |
532 | 0 | const MutableBlock& rhs, int nan_direction_hint) const { |
533 | 0 | DCHECK_GE(columns(), compare_columns->size()); |
534 | 0 | DCHECK_GE(rhs.columns(), compare_columns->size()); |
535 | | |
536 | 0 | DCHECK_LE(n, rows()); |
537 | 0 | DCHECK_LE(m, rhs.rows()); |
538 | 0 | for (auto i : *compare_columns) { |
539 | 0 | DCHECK(get_datatype_by_position(i)->equals(*rhs.get_datatype_by_position(i))); |
540 | 0 | auto res = get_column_by_position(i)->compare_at(n, m, *(rhs.get_column_by_position(i)), |
541 | 0 | nan_direction_hint); |
542 | 0 | if (res) { |
543 | 0 | return res; |
544 | 0 | } |
545 | 0 | } |
546 | 0 | return 0; |
547 | 0 | } |
548 | | |
549 | 4 | std::string dump_types() const { |
550 | 4 | std::string res; |
551 | 10 | for (auto type : _data_types) { |
552 | 10 | if (!res.empty()) { |
553 | 6 | res += ", "; |
554 | 6 | } |
555 | 10 | res += type->get_name(); |
556 | 10 | } |
557 | 4 | return res; |
558 | 4 | } |
559 | | |
560 | | template <typename T> |
561 | 134 | [[nodiscard]] Status merge(T&& block) { |
562 | 134 | RETURN_IF_CATCH_EXCEPTION(return merge_impl(block);); |
563 | 134 | }
_ZN5doris12MutableBlock5mergeIRNS_5BlockEEENS_6StatusEOT_
Line | Count | Source |
561 | 56 | [[nodiscard]] Status merge(T&& block) { |
562 | 56 | RETURN_IF_CATCH_EXCEPTION(return merge_impl(block);); |
563 | 56 | } |
_ZN5doris12MutableBlock5mergeINS_5BlockEEENS_6StatusEOT_
Line | Count | Source |
561 | 78 | [[nodiscard]] Status merge(T&& block) { |
562 | 78 | RETURN_IF_CATCH_EXCEPTION(return merge_impl(block);); |
563 | 78 | } |
|
564 | | |
565 | | template <typename T> |
566 | 48.0k | [[nodiscard]] Status merge_ignore_overflow(T&& block) { |
567 | 48.0k | RETURN_IF_CATCH_EXCEPTION(return merge_impl_ignore_overflow(block);); |
568 | 48.0k | }
_ZN5doris12MutableBlock21merge_ignore_overflowIRNS_5BlockEEENS_6StatusEOT_
Line | Count | Source |
566 | 1 | [[nodiscard]] Status merge_ignore_overflow(T&& block) { |
567 | 1 | RETURN_IF_CATCH_EXCEPTION(return merge_impl_ignore_overflow(block);); |
568 | 1 | } |
_ZN5doris12MutableBlock21merge_ignore_overflowINS_5BlockEEENS_6StatusEOT_
Line | Count | Source |
566 | 48.0k | [[nodiscard]] Status merge_ignore_overflow(T&& block) { |
567 | 48.0k | RETURN_IF_CATCH_EXCEPTION(return merge_impl_ignore_overflow(block);); |
568 | 48.0k | } |
|
569 | | |
570 | | // Only used by join. Calls insert_range_from_ignore_overflow to avoid throwing an exception in join |
571 | | template <typename T> |
572 | 48.0k | [[nodiscard]] Status merge_impl_ignore_overflow(T&& block) { |
573 | 48.0k | if (_columns.size() != block.columns()) { |
574 | 1 | return Status::Error<ErrorCode::INTERNAL_ERROR>( |
575 | 1 | "Merge block not match, self column count: {}, [columns: {}, types: {}], " |
576 | 1 | "input column count: {}, [columns: {}, " |
577 | 1 | "types: {}], ", |
578 | 1 | _columns.size(), dump_names(), dump_types(), block.columns(), |
579 | 1 | block.dump_names(), block.dump_types()); |
580 | 1 | } |
581 | 142k | for (int i = 0; i < _columns.size(); ++i) { |
582 | 94.2k | if (!_data_types[i]->equals(*block.get_by_position(i).type)) { |
583 | 1 | throw doris::Exception(doris::ErrorCode::FATAL_ERROR, |
584 | 1 | "Merge block not match, self:[columns: {}, types: {}], " |
585 | 1 | "input:[columns: {}, types: {}], ", |
586 | 1 | dump_names(), dump_types(), block.dump_names(), |
587 | 1 | block.dump_types()); |
588 | 1 | } |
589 | 94.2k | _columns[i]->insert_range_from_ignore_overflow( |
590 | 94.2k | *block.get_by_position(i).column->convert_to_full_column_if_const().get(), 0, |
591 | 94.2k | block.rows()); |
592 | 94.2k | } |
593 | 48.0k | return Status::OK(); |
594 | 48.0k | }
_ZN5doris12MutableBlock26merge_impl_ignore_overflowIRNS_5BlockEEENS_6StatusEOT_
Line | Count | Source |
572 | 48.0k | [[nodiscard]] Status merge_impl_ignore_overflow(T&& block) { |
573 | 48.0k | if (_columns.size() != block.columns()) { |
574 | 1 | return Status::Error<ErrorCode::INTERNAL_ERROR>( |
575 | 1 | "Merge block not match, self column count: {}, [columns: {}, types: {}], " |
576 | 1 | "input column count: {}, [columns: {}, " |
577 | 1 | "types: {}], ", |
578 | 1 | _columns.size(), dump_names(), dump_types(), block.columns(), |
579 | 1 | block.dump_names(), block.dump_types()); |
580 | 1 | } |
581 | 142k | for (int i = 0; i < _columns.size(); ++i) { |
582 | 94.2k | if (!_data_types[i]->equals(*block.get_by_position(i).type)) { |
583 | 0 | throw doris::Exception(doris::ErrorCode::FATAL_ERROR, |
584 | 0 | "Merge block not match, self:[columns: {}, types: {}], " |
585 | 0 | "input:[columns: {}, types: {}], ", |
586 | 0 | dump_names(), dump_types(), block.dump_names(), |
587 | 0 | block.dump_types()); |
588 | 0 | } |
589 | 94.2k | _columns[i]->insert_range_from_ignore_overflow( |
590 | 94.2k | *block.get_by_position(i).column->convert_to_full_column_if_const().get(), 0, |
591 | 94.2k | block.rows()); |
592 | 94.2k | } |
593 | 48.0k | return Status::OK(); |
594 | 48.0k | } |
_ZN5doris12MutableBlock26merge_impl_ignore_overflowINS_5BlockEEENS_6StatusEOT_
Line | Count | Source |
572 | 1 | [[nodiscard]] Status merge_impl_ignore_overflow(T&& block) { |
573 | 1 | if (_columns.size() != block.columns()) { |
574 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR>( |
575 | 0 | "Merge block not match, self column count: {}, [columns: {}, types: {}], " |
576 | 0 | "input column count: {}, [columns: {}, " |
577 | 0 | "types: {}], ", |
578 | 0 | _columns.size(), dump_names(), dump_types(), block.columns(), |
579 | 0 | block.dump_names(), block.dump_types()); |
580 | 0 | } |
581 | 3 | for (int i = 0; i < _columns.size(); ++i) { |
582 | 3 | if (!_data_types[i]->equals(*block.get_by_position(i).type)) { |
583 | 1 | throw doris::Exception(doris::ErrorCode::FATAL_ERROR, |
584 | 1 | "Merge block not match, self:[columns: {}, types: {}], " |
585 | 1 | "input:[columns: {}, types: {}], ", |
586 | 1 | dump_names(), dump_types(), block.dump_names(), |
587 | 1 | block.dump_types()); |
588 | 1 | } |
589 | 2 | _columns[i]->insert_range_from_ignore_overflow( |
590 | 2 | *block.get_by_position(i).column->convert_to_full_column_if_const().get(), 0, |
591 | 2 | block.rows()); |
592 | 2 | } |
593 | 0 | return Status::OK(); |
594 | 1 | } |
|
595 | | |
596 | | template <typename T> |
597 | 136 | [[nodiscard]] Status merge_impl(T&& block) { |
598 | | // merge is not supported in dynamic block |
599 | 136 | if (_columns.empty() && _data_types.empty()) { |
600 | 40 | _data_types = block.get_data_types(); |
601 | 40 | _names = block.get_names(); |
602 | 40 | _columns.resize(block.columns()); |
603 | 139 | for (size_t i = 0; i < block.columns(); ++i) { |
604 | 99 | if (block.get_by_position(i).column) { |
605 | 98 | _columns[i] = (*std::move(block.get_by_position(i) |
606 | 98 | .column->convert_to_full_column_if_const())) |
607 | 98 | .mutate(); |
608 | 98 | } else { |
609 | 1 | _columns[i] = _data_types[i]->create_column(); |
610 | 1 | } |
611 | 99 | } |
612 | 96 | } else { |
613 | 96 | if (_columns.size() != block.columns()) { |
614 | 2 | return Status::Error<ErrorCode::INTERNAL_ERROR>( |
615 | 2 | "Merge block not match, self column count: {}, [columns: {}, types: {}], " |
616 | 2 | "input column count: {}, [columns: {}, " |
617 | 2 | "types: {}], ", |
618 | 2 | _columns.size(), dump_names(), dump_types(), block.columns(), |
619 | 2 | block.dump_names(), block.dump_types()); |
620 | 2 | } |
621 | 248 | for (int i = 0; i < _columns.size(); ++i) { |
622 | 154 | if (!_data_types[i]->equals(*block.get_by_position(i).type)) { |
623 | 1 | DCHECK(_data_types[i]->is_nullable()) |
624 | 0 | << " target type: " << _data_types[i]->get_name() |
625 | 0 | << " src type: " << block.get_by_position(i).type->get_name(); |
626 | 1 | DCHECK(((DataTypeNullable*)_data_types[i].get()) |
627 | 1 | ->get_nested_type() |
628 | 1 | ->equals(*block.get_by_position(i).type)); |
629 | 1 | DCHECK(!block.get_by_position(i).type->is_nullable()); |
630 | 1 | _columns[i]->insert_range_from(*make_nullable(block.get_by_position(i).column) |
631 | 1 | ->convert_to_full_column_if_const(), |
632 | 1 | 0, block.rows()); |
633 | 153 | } else { |
634 | 153 | _columns[i]->insert_range_from( |
635 | 153 | *block.get_by_position(i) |
636 | 153 | .column->convert_to_full_column_if_const() |
637 | 153 | .get(), |
638 | 153 | 0, block.rows()); |
639 | 153 | } |
640 | 154 | } |
641 | 94 | } |
642 | 134 | return Status::OK(); |
643 | 136 | } _ZN5doris12MutableBlock10merge_implIRNS_5BlockEEENS_6StatusEOT_ Line | Count | Source | 597 | 134 | [[nodiscard]] Status merge_impl(T&& block) { | 598 | | // merge is not supported in dynamic block | 599 | 134 | if (_columns.empty() && _data_types.empty()) { | 600 | 40 | _data_types = block.get_data_types(); | 601 | 40 | _names = block.get_names(); | 602 | 40 | _columns.resize(block.columns()); | 603 | 139 | for (size_t i = 0; i < block.columns(); ++i) { | 604 | 99 | if (block.get_by_position(i).column) { | 605 | 98 | _columns[i] = (*std::move(block.get_by_position(i) | 606 | 98 | .column->convert_to_full_column_if_const())) | 607 | 98 | .mutate(); | 608 | 98 | } else { | 609 | 1 | _columns[i] = _data_types[i]->create_column(); | 610 | 1 | } | 611 | 99 | } | 612 | 94 | } else { | 613 | 94 | if (_columns.size() != block.columns()) { | 614 | 1 | return Status::Error<ErrorCode::INTERNAL_ERROR>( | 615 | 1 | "Merge block not match, self column count: {}, [columns: {}, types: {}], " | 616 | 1 | "input column count: {}, [columns: {}, " | 617 | 1 | "types: {}], ", | 618 | 1 | _columns.size(), dump_names(), dump_types(), block.columns(), | 619 | 1 | block.dump_names(), block.dump_types()); | 620 | 1 | } | 621 | 244 | for (int i = 0; i < _columns.size(); ++i) { | 622 | 151 | if (!_data_types[i]->equals(*block.get_by_position(i).type)) { | 623 | 0 | DCHECK(_data_types[i]->is_nullable()) | 624 | 0 | << " target type: " << _data_types[i]->get_name() | 625 | 0 | << " src type: " << block.get_by_position(i).type->get_name(); | 626 | 0 | DCHECK(((DataTypeNullable*)_data_types[i].get()) | 627 | 0 | ->get_nested_type() | 628 | 0 | ->equals(*block.get_by_position(i).type)); | 629 | 0 | DCHECK(!block.get_by_position(i).type->is_nullable()); | 630 | 0 | _columns[i]->insert_range_from(*make_nullable(block.get_by_position(i).column) | 631 | 0 | ->convert_to_full_column_if_const(), | 632 | 0 | 0, block.rows()); | 633 | 151 | } else { | 634 | 151 | 
_columns[i]->insert_range_from( | 635 | 151 | *block.get_by_position(i) | 636 | 151 | .column->convert_to_full_column_if_const() | 637 | 151 | .get(), | 638 | 151 | 0, block.rows()); | 639 | 151 | } | 640 | 151 | } | 641 | 93 | } | 642 | 133 | return Status::OK(); | 643 | 134 | } |
_ZN5doris12MutableBlock10merge_implINS_5BlockEEENS_6StatusEOT_ Line | Count | Source | 597 | 2 | [[nodiscard]] Status merge_impl(T&& block) { | 598 | | // merge is not supported in dynamic block | 599 | 2 | if (_columns.empty() && _data_types.empty()) { | 600 | 0 | _data_types = block.get_data_types(); | 601 | 0 | _names = block.get_names(); | 602 | 0 | _columns.resize(block.columns()); | 603 | 0 | for (size_t i = 0; i < block.columns(); ++i) { | 604 | 0 | if (block.get_by_position(i).column) { | 605 | 0 | _columns[i] = (*std::move(block.get_by_position(i) | 606 | 0 | .column->convert_to_full_column_if_const())) | 607 | 0 | .mutate(); | 608 | 0 | } else { | 609 | 0 | _columns[i] = _data_types[i]->create_column(); | 610 | 0 | } | 611 | 0 | } | 612 | 2 | } else { | 613 | 2 | if (_columns.size() != block.columns()) { | 614 | 1 | return Status::Error<ErrorCode::INTERNAL_ERROR>( | 615 | 1 | "Merge block not match, self column count: {}, [columns: {}, types: {}], " | 616 | 1 | "input column count: {}, [columns: {}, " | 617 | 1 | "types: {}], ", | 618 | 1 | _columns.size(), dump_names(), dump_types(), block.columns(), | 619 | 1 | block.dump_names(), block.dump_types()); | 620 | 1 | } | 621 | 4 | for (int i = 0; i < _columns.size(); ++i) { | 622 | 3 | if (!_data_types[i]->equals(*block.get_by_position(i).type)) { | 623 | 1 | DCHECK(_data_types[i]->is_nullable()) | 624 | 0 | << " target type: " << _data_types[i]->get_name() | 625 | 0 | << " src type: " << block.get_by_position(i).type->get_name(); | 626 | 1 | DCHECK(((DataTypeNullable*)_data_types[i].get()) | 627 | 1 | ->get_nested_type() | 628 | 1 | ->equals(*block.get_by_position(i).type)); | 629 | 1 | DCHECK(!block.get_by_position(i).type->is_nullable()); | 630 | 1 | _columns[i]->insert_range_from(*make_nullable(block.get_by_position(i).column) | 631 | 1 | ->convert_to_full_column_if_const(), | 632 | 1 | 0, block.rows()); | 633 | 2 | } else { | 634 | 2 | _columns[i]->insert_range_from( | 635 | 2 | 
*block.get_by_position(i) | 636 | 2 | .column->convert_to_full_column_if_const() | 637 | 2 | .get(), | 638 | 2 | 0, block.rows()); | 639 | 2 | } | 640 | 3 | } | 641 | 1 | } | 642 | 1 | return Status::OK(); | 643 | 2 | } |
|
644 | | |
645 | | // Move the columns' data into a Block. This invalidates the columns held by this MutableBlock.
646 | | Block to_block(int start_column = 0); |
647 | | Block to_block(int start_column, int end_column); |
648 | | |
649 | | void swap(MutableBlock& other) noexcept; |
650 | | |
651 | | void add_row(const Block* block, int row); |
652 | | // Batch row insertion should return an error status if memory allocation fails.
653 | | Status add_rows(const Block* block, const uint32_t* row_begin, const uint32_t* row_end, |
654 | | const std::vector<int>* column_offset = nullptr); |
655 | | Status add_rows(const Block* block, size_t row_begin, size_t length); |
656 | | |
657 | | std::string dump_data(size_t row_limit = 100) const; |
658 | | std::string dump_data_json(size_t row_limit = 100) const; |
659 | | |
660 | 64 | void clear() { |
661 | 64 | _columns.clear(); |
662 | 64 | _data_types.clear(); |
663 | 64 | _names.clear(); |
664 | 64 | } |
665 | | |
666 | | // Clear owned mutable columns in place. MutableBlock already owns its |
667 | | // columns exclusively, so this does not perform COW detaching or cloning. |
668 | | void clear_column_data() noexcept; |
669 | | |
670 | | size_t allocated_bytes() const; |
671 | | |
672 | 48.0k | size_t bytes() const { |
673 | 48.0k | size_t res = 0; |
674 | 94.0k | for (const auto& elem : _columns) { |
675 | 94.0k | res += elem->byte_size(); |
676 | 94.0k | } |
677 | | |
678 | 48.0k | return res; |
679 | 48.0k | } |
680 | | |
681 | 811 | std::vector<std::string>& get_names() { return _names; } |
682 | | |
683 | | /** Get a list of column names separated by commas. */ |
684 | | std::string dump_names() const; |
685 | | }; |
686 | | |
687 | | // RAII adapter for code that wants the MutableBlock API over a live Block. It |
688 | | // owns only the temporary mutable columns and restores them to the Block on |
689 | | // destruction. While the adapter is alive, read/write column data through |
690 | | // mutable_block()/mutable_columns(); the Block's column slots are moved out. |
691 | | class ScopedMutableBlock { |
692 | | public: |
693 | | ScopedMutableBlock() = delete; |
694 | | explicit ScopedMutableBlock(Block* block); |
695 | 811 | ~ScopedMutableBlock() { restore(); } |
696 | | |
697 | | ScopedMutableBlock(const ScopedMutableBlock&) = delete; |
698 | | ScopedMutableBlock& operator=(const ScopedMutableBlock&) = delete; |
699 | | |
700 | | ScopedMutableBlock(ScopedMutableBlock&& other) noexcept |
701 | | : _block(std::exchange(other._block, nullptr)), |
702 | 0 | _mutable_block(std::move(other._mutable_block)) {} |
703 | | |
704 | 0 | ScopedMutableBlock& operator=(ScopedMutableBlock&& other) noexcept { |
705 | 0 | if (this != &other) { |
706 | 0 | restore(); |
707 | 0 | _block = std::exchange(other._block, nullptr); |
708 | 0 | _mutable_block = std::move(other._mutable_block); |
709 | 0 | } |
710 | 0 | return *this; |
711 | 0 | } |
712 | | |
713 | 810 | MutableBlock& mutable_block() { return _mutable_block; } |
714 | 0 | const MutableBlock& mutable_block() const { return _mutable_block; } |
715 | 1 | MutableColumns& mutable_columns() { return _mutable_block.mutable_columns(); } |
716 | 0 | const MutableColumns& mutable_columns() const { return _mutable_block.mutable_columns(); } |
717 | | |
718 | 933 | void restore() { |
719 | 933 | if (_block != nullptr) { |
720 | 811 | _block->set_columns(std::move(_mutable_block.mutable_columns())); |
721 | 811 | _block = nullptr; |
722 | 811 | } |
723 | 933 | } |
724 | | |
725 | | private: |
726 | | Block* _block = nullptr; |
727 | | MutableBlock _mutable_block; |
728 | | }; |
729 | | |
730 | | struct IteratorRowRef { |
731 | | std::shared_ptr<Block> block; |
732 | | int row_pos; |
733 | | bool is_same; |
734 | | |
735 | | template <typename T> |
736 | 1.06M | int compare(const IteratorRowRef& rhs, const T& compare_arguments) const { |
737 | 1.06M | return block->compare_at(row_pos, rhs.row_pos, compare_arguments, *rhs.block, -1); |
738 | 1.06M | } Unexecuted instantiation: _ZNK5doris14IteratorRowRef7compareIPKSt6vectorIjSaIjEEEEiRKS0_RKT_ _ZNK5doris14IteratorRowRef7compareImEEiRKS0_RKT_ Line | Count | Source | 736 | 1.06M | int compare(const IteratorRowRef& rhs, const T& compare_arguments) const { | 737 | 1.06M | return block->compare_at(row_pos, rhs.row_pos, compare_arguments, *rhs.block, -1); | 738 | 1.06M | } |
|
739 | | |
740 | 512 | void reset() { |
741 | 512 | block = nullptr; |
742 | 512 | row_pos = -1; |
743 | 512 | is_same = false; |
744 | 512 | } |
745 | | }; |
746 | | |
747 | | using BlockView = std::vector<IteratorRowRef>; |
748 | | using BlockUPtr = std::unique_ptr<Block>; |
749 | | |
750 | | } // namespace doris |