be/src/core/block/block.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // This file is copied from |
18 | | // https://github.com/ClickHouse/ClickHouse/blob/master/src/Core/Block.cpp |
19 | | // and modified by Doris |
20 | | |
21 | | #include "core/block/block.h" |
22 | | |
23 | | #include <fmt/format.h> |
24 | | #include <gen_cpp/data.pb.h> |
25 | | #include <glog/logging.h> |
26 | | #include <snappy.h> |
27 | | #include <streamvbyte.h> |
28 | | |
29 | | #include <algorithm> |
30 | | #include <cassert> |
31 | | #include <iomanip> |
32 | | #include <limits> |
33 | | #include <ranges> |
34 | | |
35 | | #include "agent/be_exec_version_manager.h" |
36 | | #include "common/compiler_util.h" // IWYU pragma: keep |
37 | | #include "common/logging.h" |
38 | | #include "common/status.h" |
39 | | #include "core/assert_cast.h" |
40 | | #include "core/column/column.h" |
41 | | #include "core/column/column_const.h" |
42 | | #include "core/column/column_nothing.h" |
43 | | #include "core/column/column_nullable.h" |
44 | | #include "core/column/column_vector.h" |
45 | | #include "core/data_type/data_type_factory.hpp" |
46 | | #include "core/data_type/data_type_nullable.h" |
47 | | #include "core/data_type_serde/data_type_serde.h" |
48 | | #include "runtime/descriptors.h" |
49 | | #include "runtime/runtime_profile.h" |
50 | | #include "runtime/thread_context.h" |
51 | | #include "util/block_compression.h" |
52 | | #include "util/faststring.h" |
53 | | #include "util/simd/bits.h" |
54 | | #include "util/slice.h" |
55 | | |
56 | | class SipHash; |
57 | | |
58 | | namespace doris::segment_v2 { |
59 | | enum CompressionTypePB : int; |
60 | | } // namespace doris::segment_v2 |
61 | | namespace doris { |
62 | | template <typename T> |
63 | | void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks, |
64 | 2 | RuntimeProfile::Counter* memory_used_counter = nullptr) { |
65 | 2 | T block; |
66 | 6 | while (blocks.try_dequeue(block)) { |
67 | 4 | if (memory_used_counter) { |
68 | 4 | if constexpr (std::is_same_v<T, Block>) { |
69 | 2 | memory_used_counter->update(-block.allocated_bytes()); |
70 | 2 | } else { |
71 | 2 | memory_used_counter->update(-block->allocated_bytes()); |
72 | 2 | } |
73 | 4 | } |
74 | 4 | } |
75 | 2 | } _ZN5doris12clear_blocksINS_5BlockEEEvRN10moodycamel15ConcurrentQueueIT_NS2_28ConcurrentQueueDefaultTraitsEEEPNS_14RuntimeProfile7CounterE Line | Count | Source | 64 | 1 | RuntimeProfile::Counter* memory_used_counter = nullptr) { | 65 | 1 | T block; | 66 | 3 | while (blocks.try_dequeue(block)) { | 67 | 2 | if (memory_used_counter) { | 68 | 2 | if constexpr (std::is_same_v<T, Block>) { | 69 | 2 | memory_used_counter->update(-block.allocated_bytes()); | 70 | | } else { | 71 | | memory_used_counter->update(-block->allocated_bytes()); | 72 | | } | 73 | 2 | } | 74 | 2 | } | 75 | 1 | } |
_ZN5doris12clear_blocksISt10unique_ptrINS_5BlockESt14default_deleteIS2_EEEEvRN10moodycamel15ConcurrentQueueIT_NS6_28ConcurrentQueueDefaultTraitsEEEPNS_14RuntimeProfile7CounterE Line | Count | Source | 64 | 1 | RuntimeProfile::Counter* memory_used_counter = nullptr) { | 65 | 1 | T block; | 66 | 3 | while (blocks.try_dequeue(block)) { | 67 | 2 | if (memory_used_counter) { | 68 | | if constexpr (std::is_same_v<T, Block>) { | 69 | | memory_used_counter->update(-block.allocated_bytes()); | 70 | 2 | } else { | 71 | 2 | memory_used_counter->update(-block->allocated_bytes()); | 72 | 2 | } | 73 | 2 | } | 74 | 2 | } | 75 | 1 | } |
|
76 | | |
77 | | template void clear_blocks<Block>(moodycamel::ConcurrentQueue<Block>&, |
78 | | RuntimeProfile::Counter* memory_used_counter); |
79 | | template void clear_blocks<BlockUPtr>(moodycamel::ConcurrentQueue<BlockUPtr>&, |
80 | | RuntimeProfile::Counter* memory_used_counter); |
81 | | |
82 | | namespace { |
83 | | |
84 | | // The no-clone fast path is only safe when the whole column tree is uniquely |
85 | | // owned. A composite column with shared children still needs COW detachment. |
86 | 32.6k | bool is_recursively_exclusive(const IColumn& column) { |
87 | 32.6k | if (!column.is_exclusive()) { |
88 | 13 | return false; |
89 | 13 | } |
90 | | |
91 | 32.6k | bool exclusive = true; |
92 | 32.6k | IColumn::ColumnCallback callback = [&](IColumn::WrappedPtr& subcolumn) { |
93 | 27.8k | if (!exclusive) { |
94 | 0 | return; |
95 | 0 | } |
96 | 27.8k | const ColumnPtr& subcolumn_ptr = const_cast<const IColumn::WrappedPtr&>(subcolumn); |
97 | 27.8k | DCHECK(subcolumn_ptr); |
98 | 27.8k | exclusive = is_recursively_exclusive(*subcolumn_ptr); |
99 | 27.8k | }; |
100 | | // `for_each_subcolumn` only exposes a mutable callback type. This callback |
101 | | // only reads the wrapped pointers and never calls the non-const accessors. |
102 | 32.6k | const_cast<IColumn&>(column).for_each_subcolumn(callback); |
103 | 32.6k | return exclusive; |
104 | 32.6k | } |
105 | | |
106 | | // Acquire one live Block slot transactionally. Shared columns are detached while |
107 | | // the original slot is still intact, so a clone failure cannot leave Block with |
108 | | // a moved-from/null column. Exclusive column trees keep the stealing fast path. |
109 | 4.84k | MutableColumnPtr scoped_mutate_column(ColumnPtr& column, const DataTypePtr& type) { |
110 | 4.84k | DCHECK(type); |
111 | 4.84k | if (!column) { |
112 | 1 | return type->create_column(); |
113 | 1 | } |
114 | | |
115 | 4.84k | MutableColumnPtr mutable_column; |
116 | 4.84k | if (is_recursively_exclusive(*column)) { |
117 | 4.82k | mutable_column = std::move(*column).mutate(); |
118 | 4.82k | } else { |
119 | 13 | mutable_column = IColumn::mutate(column); |
120 | 13 | } |
121 | 4.84k | column = nullptr; |
122 | 4.84k | return mutable_column; |
123 | 4.84k | } |
124 | | |
125 | | } // namespace |
126 | | |
127 | 4.37k | Block::Block(std::initializer_list<ColumnWithTypeAndName> il) : data {il} {} |
128 | | |
129 | 258k | Block::Block(ColumnsWithTypeAndName data_) : data {std::move(data_)} {} |
130 | | |
131 | 56.0k | Block::Block(const std::vector<SlotDescriptor*>& slots, size_t block_size) { |
132 | 271k | for (auto* const slot_desc : slots) { |
133 | 271k | auto column_ptr = slot_desc->get_empty_mutable_column(); |
134 | 271k | column_ptr->reserve(block_size); |
135 | 271k | insert(ColumnWithTypeAndName(std::move(column_ptr), slot_desc->get_data_type_ptr(), |
136 | 271k | slot_desc->col_name())); |
137 | 271k | } |
138 | 56.0k | } |
139 | | |
140 | 1 | Block::Block(const std::vector<SlotDescriptor>& slots, size_t block_size) { |
141 | 1 | std::vector<SlotDescriptor*> slot_ptrs(slots.size()); |
142 | 3 | for (size_t i = 0; i < slots.size(); ++i) { |
143 | | // Slots remain unmodified and are used to read column information; const_cast can be employed. |
144 | | // used in src/exec/rowid_fetcher.cpp |
145 | 2 | slot_ptrs[i] = const_cast<SlotDescriptor*>(&slots[i]); |
146 | 2 | } |
147 | 1 | *this = Block(slot_ptrs, block_size); |
148 | 1 | } |
149 | | |
150 | | Status Block::deserialize(const PBlock& pblock, size_t* uncompressed_bytes, |
151 | 994 | int64_t* decompress_time) { |
152 | 994 | swap(Block()); |
153 | 994 | int be_exec_version = pblock.has_be_exec_version() ? pblock.be_exec_version() : 0; |
154 | 994 | RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(be_exec_version)); |
155 | | |
156 | 994 | const char* buf = nullptr; |
157 | 994 | std::string compression_scratch; |
158 | 994 | if (pblock.compressed()) { |
159 | | // Decompress |
160 | 483 | SCOPED_RAW_TIMER(decompress_time); |
161 | 483 | const char* compressed_data = pblock.column_values().c_str(); |
162 | 483 | size_t compressed_size = pblock.column_values().size(); |
163 | 483 | size_t uncompressed_size = 0; |
164 | 483 | if (pblock.has_compression_type() && pblock.has_uncompressed_size()) { |
165 | 483 | BlockCompressionCodec* codec; |
166 | 483 | RETURN_IF_ERROR(get_block_compression_codec(pblock.compression_type(), &codec)); |
167 | 483 | uncompressed_size = pblock.uncompressed_size(); |
168 | | // Should also use allocator to allocate memory here. |
169 | 483 | compression_scratch.resize(uncompressed_size); |
170 | 483 | Slice decompressed_slice(compression_scratch); |
171 | 483 | RETURN_IF_ERROR(codec->decompress(Slice(compressed_data, compressed_size), |
172 | 483 | &decompressed_slice)); |
173 | 483 | DCHECK(uncompressed_size == decompressed_slice.size); |
174 | 483 | } else { |
175 | 0 | bool success = snappy::GetUncompressedLength(compressed_data, compressed_size, |
176 | 0 | &uncompressed_size); |
177 | 0 | DCHECK(success) << "snappy::GetUncompressedLength failed"; |
178 | 0 | compression_scratch.resize(uncompressed_size); |
179 | 0 | success = snappy::RawUncompress(compressed_data, compressed_size, |
180 | 0 | compression_scratch.data()); |
181 | 0 | DCHECK(success) << "snappy::RawUncompress failed"; |
182 | 0 | } |
183 | 483 | *uncompressed_bytes = uncompressed_size; |
184 | 483 | buf = compression_scratch.data(); |
185 | 511 | } else { |
186 | 511 | buf = pblock.column_values().data(); |
187 | 511 | } |
188 | | |
189 | 1.57k | for (const auto& pcol_meta : pblock.column_metas()) { |
190 | 1.57k | DataTypePtr type = DataTypeFactory::instance().create_data_type(pcol_meta); |
191 | 1.57k | MutableColumnPtr data_column = type->create_column(); |
192 | | // Here will try to allocate large memory, should return error if failed. |
193 | 1.57k | RETURN_IF_CATCH_EXCEPTION( |
194 | 1.57k | buf = type->deserialize(buf, &data_column, pblock.be_exec_version())); |
195 | 1.57k | data.emplace_back(data_column->get_ptr(), type, pcol_meta.name()); |
196 | 1.57k | } |
197 | | |
198 | 994 | return Status::OK(); |
199 | 994 | } |
200 | | |
201 | 13.5k | void Block::reserve(size_t count) { |
202 | 13.5k | data.reserve(count); |
203 | 13.5k | } |
204 | | |
205 | 4 | void Block::insert(size_t position, const ColumnWithTypeAndName& elem) { |
206 | 4 | if (position > data.size()) { |
207 | 1 | throw Exception(ErrorCode::INTERNAL_ERROR, |
208 | 1 | "invalid input position, position={}, data.size={}, names={}", position, |
209 | 1 | data.size(), dump_names()); |
210 | 1 | } |
211 | | |
212 | 3 | data.emplace(data.begin() + position, elem); |
213 | 3 | } |
214 | | |
215 | 3 | void Block::insert(size_t position, ColumnWithTypeAndName&& elem) { |
216 | 3 | if (position > data.size()) { |
217 | 1 | throw Exception(ErrorCode::INTERNAL_ERROR, |
218 | 1 | "invalid input position, position={}, data.size={}, names={}", position, |
219 | 1 | data.size(), dump_names()); |
220 | 1 | } |
221 | | |
222 | 2 | data.emplace(data.begin() + position, std::move(elem)); |
223 | 2 | } |
224 | | |
225 | 8.09k | void Block::clear_names() { |
226 | 177k | for (auto& entry : data) { |
227 | 177k | entry.name.clear(); |
228 | 177k | } |
229 | 8.09k | } |
230 | | |
231 | 14.6k | void Block::insert(const ColumnWithTypeAndName& elem) { |
232 | 14.6k | data.emplace_back(elem); |
233 | 14.6k | } |
234 | | |
235 | 1.00M | void Block::insert(ColumnWithTypeAndName&& elem) { |
236 | 1.00M | data.emplace_back(std::move(elem)); |
237 | 1.00M | } |
238 | | |
239 | 3 | void Block::erase(const std::set<size_t>& positions) { |
240 | 3 | for (unsigned long position : std::ranges::reverse_view(positions)) { |
241 | 2 | erase(position); |
242 | 2 | } |
243 | 3 | } |
244 | | |
245 | 725 | void Block::erase_tail(size_t start) { |
246 | 725 | DCHECK(start <= data.size()) << fmt::format( |
247 | 0 | "Position out of bound in Block::erase(), max position = {}", data.size()); |
248 | 725 | data.erase(data.begin() + start, data.end()); |
249 | 725 | } |
250 | | |
251 | 124k | void Block::erase(size_t position) { |
252 | 124k | DCHECK(!data.empty()) << "Block is empty"; |
253 | 124k | DCHECK_LT(position, data.size()) << fmt::format( |
254 | 0 | "Position out of bound in Block::erase(), max position = {}", data.size() - 1); |
255 | | |
256 | 124k | erase_impl(position); |
257 | 124k | } |
258 | | |
259 | 124k | void Block::erase_impl(size_t position) { |
260 | 124k | data.erase(data.begin() + position); |
261 | 124k | } |
262 | | |
263 | 76.0k | ColumnWithTypeAndName& Block::safe_get_by_position(size_t position) { |
264 | 76.0k | if (position >= data.size()) { |
265 | 0 | throw Exception(ErrorCode::INTERNAL_ERROR, |
266 | 0 | "invalid input position, position={}, data.size={}, names={}", position, |
267 | 0 | data.size(), dump_names()); |
268 | 0 | } |
269 | 76.0k | return data[position]; |
270 | 76.0k | } |
271 | | |
272 | 109 | const ColumnWithTypeAndName& Block::safe_get_by_position(size_t position) const { |
273 | 109 | if (position >= data.size()) { |
274 | 0 | throw Exception(ErrorCode::INTERNAL_ERROR, |
275 | 0 | "invalid input position, position={}, data.size={}, names={}", position, |
276 | 0 | data.size(), dump_names()); |
277 | 0 | } |
278 | 109 | return data[position]; |
279 | 109 | } |
280 | | |
281 | 89 | int Block::get_position_by_name(const std::string& name) const { |
282 | 1.04k | for (int i = 0; i < data.size(); i++) { |
283 | 1.04k | if (data[i].name == name) { |
284 | 86 | return i; |
285 | 86 | } |
286 | 1.04k | } |
287 | 3 | return -1; |
288 | 89 | } |
289 | | |
290 | 5 | void Block::check_number_of_rows(bool allow_null_columns) const { |
291 | 5 | ssize_t rows = -1; |
292 | 9 | for (const auto& elem : data) { |
293 | 9 | if (!elem.column && allow_null_columns) { |
294 | 2 | continue; |
295 | 2 | } |
296 | | |
297 | 7 | if (!elem.column) { |
298 | 1 | throw Exception(ErrorCode::INTERNAL_ERROR, |
299 | 1 | "Column {} in block is nullptr, in method check_number_of_rows.", |
300 | 1 | elem.name); |
301 | 1 | } |
302 | | |
303 | 6 | ssize_t size = elem.column->size(); |
304 | | |
305 | 6 | if (rows == -1) { |
306 | 5 | rows = size; |
307 | 5 | } else if (rows != size) { |
308 | 1 | throw Exception(ErrorCode::INTERNAL_ERROR, "Sizes of columns doesn't match, block={}", |
309 | 1 | dump_structure()); |
310 | 1 | } |
311 | 6 | } |
312 | 5 | } |
313 | | |
314 | 2.63M | Status Block::check_type_and_column() const { |
315 | 2.63M | #ifndef NDEBUG |
316 | 2.63M | for (const auto& elem : data) { |
317 | 211k | if (!elem.column) { |
318 | 0 | continue; |
319 | 0 | } |
320 | 211k | if (!elem.type) { |
321 | 0 | continue; |
322 | 0 | } |
323 | | |
324 | | // ColumnNothing is a special column type, it is used to represent a column that |
325 | | // is not materialized, so we don't need to check it. |
326 | 211k | if (check_and_get_column<ColumnNothing>(elem.column.get())) { |
327 | 0 | continue; |
328 | 0 | } |
329 | | |
330 | 211k | const auto& type = elem.type; |
331 | 211k | const auto& column = elem.column; |
332 | | |
333 | 211k | RETURN_IF_ERROR(column->column_self_check()); |
334 | 211k | auto st = type->check_column(*column); |
335 | 211k | if (!st.ok()) { |
336 | 1 | return Status::InternalError( |
337 | 1 | "Column {} in block is not compatible with its column type :{}, data type :{}, " |
338 | 1 | "error: {}", |
339 | 1 | elem.name, column->get_name(), type->get_name(), st.msg()); |
340 | 1 | } |
341 | 211k | } |
342 | 2.63M | #endif |
343 | 2.63M | return Status::OK(); |
344 | 2.63M | } |
345 | | |
346 | 1.36M | Status Block::check_column_and_type_not_null() const { |
347 | 1.55M | for (size_t i = 0; i != data.size(); ++i) { |
348 | 190k | const auto& elem = data[i]; |
349 | 190k | if (!elem.column) { |
350 | 1 | return Status::InternalError("Column in block is nullptr, column index: {}, name: {}", |
351 | 1 | i, elem.name); |
352 | 1 | } |
353 | 190k | if (!elem.type) { |
354 | 1 | return Status::InternalError("Type in block is nullptr, column index: {}, name: {}", i, |
355 | 1 | elem.name); |
356 | 1 | } |
357 | 190k | } |
358 | 1.36M | return Status::OK(); |
359 | 1.36M | } |
360 | | |
361 | 50.2M | size_t Block::rows() const { |
362 | 50.2M | for (const auto& elem : data) { |
363 | 46.4M | if (elem.column) { |
364 | 46.4M | return elem.column->size(); |
365 | 46.4M | } |
366 | 46.4M | } |
367 | | |
368 | 3.79M | return 0; |
369 | 50.2M | } |
370 | | |
371 | 6 | void Block::set_num_rows(size_t length) { |
372 | 6 | if (rows() > length) { |
373 | 4 | for (auto& elem : data) { |
374 | 4 | if (elem.column) { |
375 | 4 | elem.column = elem.column->shrink(length); |
376 | 4 | } |
377 | 4 | } |
378 | 4 | } |
379 | 6 | } |
380 | | |
381 | 1 | void Block::skip_num_rows(int64_t& length) { |
382 | 1 | auto origin_rows = rows(); |
383 | 1 | if (origin_rows <= length) { |
384 | 0 | clear(); |
385 | 0 | length -= origin_rows; |
386 | 1 | } else { |
387 | 1 | for (auto& elem : data) { |
388 | 1 | if (elem.column) { |
389 | 1 | elem.column = elem.column->cut(length, origin_rows - length); |
390 | 1 | } |
391 | 1 | } |
392 | 1 | } |
393 | 1 | } |
394 | | |
395 | 13.1k | size_t Block::bytes() const { |
396 | 13.1k | size_t res = 0; |
397 | 27.8k | for (const auto& elem : data) { |
398 | 27.8k | if (!elem.column) { |
399 | 0 | std::stringstream ss; |
400 | 0 | for (const auto& e : data) { |
401 | 0 | ss << e.name + " "; |
402 | 0 | } |
403 | 0 | throw Exception(ErrorCode::INTERNAL_ERROR, |
404 | 0 | "Column {} in block is nullptr, in method bytes. All Columns are {}", |
405 | 0 | elem.name, ss.str()); |
406 | 0 | } |
407 | 27.8k | res += elem.column->byte_size(); |
408 | 27.8k | } |
409 | | |
410 | 13.1k | return res; |
411 | 13.1k | } |
412 | | |
413 | 194k | size_t Block::allocated_bytes() const { |
414 | 194k | size_t res = 0; |
415 | 381k | for (const auto& elem : data) { |
416 | 381k | if (!elem.column) { |
417 | | // Sometimes if expr failed, then there will be a nullptr |
418 | | // column left in the block. |
419 | 1 | continue; |
420 | 1 | } |
421 | 381k | res += elem.column->allocated_bytes(); |
422 | 381k | } |
423 | | |
424 | 194k | return res; |
425 | 194k | } |
426 | | |
427 | 8 | std::string Block::dump_names() const { |
428 | 8 | std::string out; |
429 | 23 | for (auto it = data.begin(); it != data.end(); ++it) { |
430 | 15 | if (it != data.begin()) { |
431 | 7 | out += ", "; |
432 | 7 | } |
433 | 15 | out += it->name; |
434 | 15 | } |
435 | 8 | return out; |
436 | 8 | } |
437 | | |
438 | 7 | std::string Block::dump_types() const { |
439 | 7 | std::string out; |
440 | 21 | for (auto it = data.begin(); it != data.end(); ++it) { |
441 | 14 | if (it != data.begin()) { |
442 | 7 | out += ", "; |
443 | 7 | } |
444 | 14 | out += it->type->get_name(); |
445 | 14 | } |
446 | 7 | return out; |
447 | 7 | } |
448 | | |
449 | 31 | std::string Block::dump_data_json(size_t begin, size_t row_limit, bool allow_null_mismatch) const { |
450 | 31 | std::stringstream ss; |
451 | | |
452 | 31 | std::vector<std::string> headers; |
453 | 31 | headers.reserve(columns()); |
454 | 46 | for (const auto& it : data) { |
455 | | // fmt::format is from the {fmt} library, you might be using std::format in C++20 |
456 | | // If not, you can build the string with a stringstream as a fallback. |
457 | 46 | headers.push_back(fmt::format("{}({})", it.name, it.type->get_name())); |
458 | 46 | } |
459 | | |
460 | 31 | size_t start_row = std::min(begin, rows()); |
461 | 31 | size_t end_row = std::min(rows(), begin + row_limit); |
462 | | |
463 | 31 | auto format_options = DataTypeSerDe::get_default_format_options(); |
464 | 31 | auto time_zone = cctz::utc_time_zone(); |
465 | 31 | format_options.timezone = &time_zone; |
466 | | |
467 | 31 | ss << "["; |
468 | 3.59k | for (size_t row_num = start_row; row_num < end_row; ++row_num) { |
469 | 3.56k | if (row_num > start_row) { |
470 | 3.53k | ss << ","; |
471 | 3.53k | } |
472 | 3.56k | ss << "{"; |
473 | 8.33k | for (size_t i = 0; i < columns(); ++i) { |
474 | 4.77k | if (i > 0) { |
475 | 1.21k | ss << ","; |
476 | 1.21k | } |
477 | 4.77k | ss << "\"" << headers[i] << "\":"; |
478 | 4.77k | std::string s; |
479 | | |
480 | | // This value-extraction logic is preserved from your original function |
481 | | // to maintain consistency, especially for handling nullability mismatches. |
482 | 4.77k | if (data[i].column && data[i].type->is_nullable() && !data[i].column->is_nullable()) { |
483 | | // This branch handles a specific internal representation of nullable columns. |
484 | | // The original code would assert here if allow_null_mismatch is false. |
485 | 0 | assert(allow_null_mismatch); |
486 | 0 | s = assert_cast<const DataTypeNullable*>(data[i].type.get()) |
487 | 0 | ->get_nested_type() |
488 | 0 | ->to_string(*data[i].column, row_num, format_options); |
489 | 4.77k | } else { |
490 | | // This is the standard path. The to_string method is expected to correctly |
491 | | // handle all cases, including when the column is null (e.g., by returning "NULL"). |
492 | 4.77k | s = data[i].to_string(row_num, format_options); |
493 | 4.77k | } |
494 | 4.77k | ss << "\"" << s << "\""; |
495 | 4.77k | } |
496 | 3.56k | ss << "}"; |
497 | 3.56k | } |
498 | 31 | ss << "]"; |
499 | 31 | return ss.str(); |
500 | 31 | } |
501 | | |
502 | 858 | std::string Block::dump_data(size_t begin, size_t row_limit, bool allow_null_mismatch) const { |
503 | 858 | std::vector<std::string> headers; |
504 | 858 | std::vector<int> headers_size; |
505 | 2.10k | for (const auto& it : data) { |
506 | 2.10k | std::string s = fmt::format("{}({})", it.name, it.type->get_name()); |
507 | 2.10k | headers_size.push_back(s.size() > 15 ? (int)s.size() : 15); |
508 | 2.10k | headers.emplace_back(s); |
509 | 2.10k | } |
510 | | |
511 | 858 | std::stringstream out; |
512 | | // header upper line |
513 | 2.16k | auto line = [&]() { |
514 | 8.07k | for (size_t i = 0; i < columns(); ++i) { |
515 | 5.91k | out << std::setfill('-') << std::setw(1) << "+" << std::setw(headers_size[i]) << "-"; |
516 | 5.91k | } |
517 | 2.16k | out << std::setw(1) << "+" << std::endl; |
518 | 2.16k | }; |
519 | 858 | line(); |
520 | | // header text |
521 | 2.96k | for (size_t i = 0; i < columns(); ++i) { |
522 | 2.10k | out << std::setfill(' ') << std::setw(1) << "|" << std::left << std::setw(headers_size[i]) |
523 | 2.10k | << headers[i]; |
524 | 2.10k | } |
525 | 858 | out << std::setw(1) << "|" << std::endl; |
526 | | // header bottom line |
527 | 858 | line(); |
528 | 858 | if (rows() == 0) { |
529 | 414 | return out.str(); |
530 | 414 | } |
531 | | |
532 | 444 | auto format_options = DataTypeSerDe::get_default_format_options(); |
533 | 444 | auto time_zone = cctz::utc_time_zone(); |
534 | 444 | format_options.timezone = &time_zone; |
535 | | |
536 | | // content |
537 | 12.2k | for (size_t row_num = begin; row_num < rows() && row_num < row_limit + begin; ++row_num) { |
538 | 32.4k | for (size_t i = 0; i < columns(); ++i) { |
539 | 20.6k | if (!data[i].column || data[i].column->empty()) { |
540 | 0 | out << std::setfill(' ') << std::setw(1) << "|" << std::setw(headers_size[i]) |
541 | 0 | << std::right; |
542 | 0 | continue; |
543 | 0 | } |
544 | 20.6k | std::string s; |
545 | 20.6k | if (data[i].column) { // column may be const |
546 | | // for code inside `default_implementation_for_nulls`, there's could have: type = null, col != null |
547 | 20.6k | if (data[i].type->is_nullable() && !data[i].column->is_nullable()) { |
548 | 0 | assert(allow_null_mismatch); |
549 | 0 | s = assert_cast<const DataTypeNullable*>(data[i].type.get()) |
550 | 0 | ->get_nested_type() |
551 | 0 | ->to_string(*data[i].column, row_num, format_options); |
552 | 20.6k | } else { |
553 | 20.6k | s = data[i].to_string(row_num, format_options); |
554 | 20.6k | } |
555 | 20.6k | } |
556 | 20.6k | if (s.length() > headers_size[i]) { |
557 | 2.12k | s = s.substr(0, headers_size[i] - 3) + "..."; |
558 | 2.12k | } |
559 | 20.6k | out << std::setfill(' ') << std::setw(1) << "|" << std::setw(headers_size[i]) |
560 | 20.6k | << std::right << s; |
561 | 20.6k | } |
562 | 11.7k | out << std::setw(1) << "|" << std::endl; |
563 | 11.7k | } |
564 | | // bottom line |
565 | 444 | line(); |
566 | 444 | if (row_limit < rows()) { |
567 | 112 | out << rows() << " rows in block, only show first " << row_limit << " rows." << std::endl; |
568 | 112 | } |
569 | 444 | return out.str(); |
570 | 444 | } |
571 | | |
572 | 1 | std::string Block::dump_one_line(size_t row, int column_end) const { |
573 | 1 | assert(column_end <= columns()); |
574 | 1 | fmt::memory_buffer line; |
575 | | |
576 | 1 | auto format_options = DataTypeSerDe::get_default_format_options(); |
577 | 1 | auto time_zone = cctz::utc_time_zone(); |
578 | 1 | format_options.timezone = &time_zone; |
579 | | |
580 | 3 | for (int i = 0; i < column_end; ++i) { |
581 | 2 | if (LIKELY(i != 0)) { |
582 | | // TODO: need more effective function of to string. now the impl is slow |
583 | 1 | fmt::format_to(line, " {}", data[i].to_string(row, format_options)); |
584 | 1 | } else { |
585 | 1 | fmt::format_to(line, "{}", data[i].to_string(row, format_options)); |
586 | 1 | } |
587 | 2 | } |
588 | 1 | return fmt::to_string(line); |
589 | 1 | } |
590 | | |
591 | 47 | std::string Block::dump_structure() const { |
592 | 47 | std::string out; |
593 | 382 | for (auto it = data.begin(); it != data.end(); ++it) { |
594 | 335 | if (it != data.begin()) { |
595 | 288 | out += ", \n"; |
596 | 288 | } |
597 | 335 | out += it->dump_structure(); |
598 | 335 | } |
599 | 47 | return out; |
600 | 47 | } |
601 | | |
602 | 48.9k | Block Block::clone_empty() const { |
603 | 48.9k | Block res; |
604 | 95.8k | for (const auto& elem : data) { |
605 | 95.8k | res.insert(elem.clone_empty()); |
606 | 95.8k | } |
607 | 48.9k | return res; |
608 | 48.9k | } |
609 | | |
610 | 32 | MutableColumns Block::clone_empty_columns() const { |
611 | 32 | size_t num_columns = data.size(); |
612 | 32 | MutableColumns columns(num_columns); |
613 | 142 | for (size_t i = 0; i < num_columns; ++i) { |
614 | 110 | columns[i] = data[i].column ? data[i].column->clone_empty() : data[i].type->create_column(); |
615 | 110 | } |
616 | 32 | return columns; |
617 | 32 | } |
618 | | |
619 | 26.1k | Columns Block::get_columns() const { |
620 | 26.1k | size_t num_columns = data.size(); |
621 | 26.1k | Columns columns(num_columns); |
622 | 113k | for (size_t i = 0; i < num_columns; ++i) { |
623 | 87.8k | columns[i] = data[i].column->convert_to_full_column_if_const(); |
624 | 87.8k | } |
625 | 26.1k | return columns; |
626 | 26.1k | } |
627 | | |
628 | 559 | Columns Block::get_columns_and_convert() { |
629 | 559 | size_t num_columns = data.size(); |
630 | 559 | Columns columns(num_columns); |
631 | 1.18k | for (size_t i = 0; i < num_columns; ++i) { |
632 | 622 | data[i].column = data[i].column->convert_to_full_column_if_const(); |
633 | 622 | columns[i] = data[i].column; |
634 | 622 | } |
635 | 559 | return columns; |
636 | 559 | } |
637 | | |
638 | 2.51k | Block::ScopedMutableColumns::ScopedMutableColumns(Block& block) : _block(&block) { |
639 | 2.51k | const size_t num_columns = block.data.size(); |
640 | 2.51k | _columns.resize(num_columns); |
641 | 2.51k | size_t acquired_columns = 0; |
642 | 2.51k | try { |
643 | 7.25k | for (; acquired_columns < num_columns; ++acquired_columns) { |
644 | 4.74k | auto& column_with_type_and_name = block.data[acquired_columns]; |
645 | 4.74k | _columns[acquired_columns] = scoped_mutate_column(column_with_type_and_name.column, |
646 | 4.74k | column_with_type_and_name.type); |
647 | 4.74k | } |
648 | 2.51k | } catch (...) { |
649 | 4 | for (size_t i = 0; i < acquired_columns; ++i) { |
650 | 2 | block.data[i].column = std::move(_columns[i]); |
651 | 2 | } |
652 | 2 | _block = nullptr; |
653 | 2 | throw; |
654 | 2 | } |
655 | 2.51k | } |
656 | | |
657 | 2.51k | Block::ScopedMutableColumns::~ScopedMutableColumns() { |
658 | 2.51k | restore(); |
659 | 2.51k | } |
660 | | |
661 | | Block::ScopedMutableColumns::ScopedMutableColumns(ScopedMutableColumns&& other) noexcept |
662 | 0 | : _block(std::exchange(other._block, nullptr)), _columns(std::move(other._columns)) {} |
663 | | |
664 | | Block::ScopedMutableColumns& Block::ScopedMutableColumns::operator=( |
665 | 0 | ScopedMutableColumns&& other) noexcept { |
666 | 0 | if (this != &other) { |
667 | 0 | restore(); |
668 | 0 | _block = std::exchange(other._block, nullptr); |
669 | 0 | _columns = std::move(other._columns); |
670 | 0 | } |
671 | 0 | return *this; |
672 | 0 | } |
673 | | |
674 | 2 | const DataTypePtr& Block::ScopedMutableColumns::get_datatype_by_position(size_t position) const { |
675 | 2 | DCHECK(_block != nullptr); |
676 | 2 | return _block->get_by_position(position).type; |
677 | 2 | } |
678 | | |
679 | 2 | const std::string& Block::ScopedMutableColumns::get_name_by_position(size_t position) const { |
680 | 2 | DCHECK(_block != nullptr); |
681 | 2 | return _block->get_by_position(position).name; |
682 | 2 | } |
683 | | |
684 | 816 | MutableColumns Block::ScopedMutableColumns::release() { |
685 | 816 | DCHECK(_block != nullptr); |
686 | 816 | _block = nullptr; |
687 | 816 | return std::move(_columns); |
688 | 816 | } |
689 | | |
690 | 2.99k | void Block::ScopedMutableColumns::restore() { |
691 | 2.99k | if (_block != nullptr) { |
692 | 1.69k | _block->set_columns(std::move(_columns)); |
693 | 1.69k | _block = nullptr; |
694 | 1.69k | } |
695 | 2.99k | } |
696 | | |
697 | | Block::ScopedMutableColumn::ScopedMutableColumn(Block& block, size_t position) |
698 | 98 | : _block(&block), _position(position) { |
699 | 98 | DCHECK_LT(_position, _block->data.size()); |
700 | 98 | auto& column_with_type_and_name = _block->data[_position]; |
701 | 98 | DCHECK(column_with_type_and_name.type); |
702 | 98 | _column = |
703 | 98 | scoped_mutate_column(column_with_type_and_name.column, column_with_type_and_name.type); |
704 | 98 | } |
705 | | |
706 | 97 | Block::ScopedMutableColumn::~ScopedMutableColumn() { |
707 | 97 | restore(); |
708 | 97 | } |
709 | | |
710 | | Block::ScopedMutableColumn::ScopedMutableColumn(ScopedMutableColumn&& other) noexcept |
711 | 0 | : _block(std::exchange(other._block, nullptr)), |
712 | 0 | _position(other._position), |
713 | 0 | _column(std::move(other._column)) {} |
714 | | |
715 | | Block::ScopedMutableColumn& Block::ScopedMutableColumn::operator=( |
716 | 0 | ScopedMutableColumn&& other) noexcept { |
717 | 0 | if (this != &other) { |
718 | 0 | restore(); |
719 | 0 | _block = std::exchange(other._block, nullptr); |
720 | 0 | _position = other._position; |
721 | 0 | _column = std::move(other._column); |
722 | 0 | } |
723 | 0 | return *this; |
724 | 0 | } |
725 | | |
726 | 97 | void Block::ScopedMutableColumn::restore() { |
727 | 97 | if (_block != nullptr) { |
728 | 97 | DCHECK_LT(_position, _block->data.size()); |
729 | 97 | _block->data[_position].column = std::move(_column); |
730 | 97 | _block = nullptr; |
731 | 97 | } |
732 | 97 | } |
733 | | |
734 | 2.51k | Block::ScopedMutableColumns Block::mutate_columns_scoped() & { |
735 | 2.51k | return ScopedMutableColumns(*this); |
736 | 2.51k | } |
737 | | |
738 | 98 | Block::ScopedMutableColumn Block::mutate_column_scoped(size_t position) & { |
739 | 98 | return ScopedMutableColumn(*this, position); |
740 | 98 | } |
741 | | |
742 | 817 | ScopedMutableBlock::ScopedMutableBlock(Block* block) { |
743 | 817 | DCHECK(block != nullptr); |
744 | 817 | DataTypes data_types = block->get_data_types(); |
745 | 817 | std::vector<std::string> names = block->get_names(); |
746 | 817 | auto columns_guard = block->mutate_columns_scoped(); |
747 | 817 | _mutable_block.data_types() = std::move(data_types); |
748 | 817 | _mutable_block.get_names() = std::move(names); |
749 | 817 | _mutable_block.set_mutable_columns(columns_guard.release()); |
750 | 817 | _block = block; |
751 | 817 | } |
752 | | |
753 | 146k | MutableColumns Block::mutate_columns() && { |
754 | 146k | size_t num_columns = data.size(); |
755 | 146k | MutableColumns columns(num_columns); |
756 | 433k | for (size_t i = 0; i < num_columns; ++i) { |
757 | 287k | DCHECK(data[i].type); |
758 | 287k | columns[i] = data[i].column ? IColumn::mutate(std::move(data[i].column)) |
759 | 287k | : data[i].type->create_column(); |
760 | 287k | } |
761 | 146k | return columns; |
762 | 146k | } |
763 | | |
764 | 3.50k | void Block::set_columns(MutableColumns&& columns) { |
765 | 3.50k | DCHECK_GE(columns.size(), data.size()) |
766 | 0 | << fmt::format("Invalid size of columns, columns size: {}, data size: {}", |
767 | 0 | columns.size(), data.size()); |
768 | 3.50k | size_t num_columns = data.size(); |
769 | 11.9k | for (size_t i = 0; i < num_columns; ++i) { |
770 | 8.45k | data[i].column = std::move(columns[i]); |
771 | 8.45k | } |
772 | 3.50k | } |
773 | | |
774 | 51 | Block Block::clone_without_columns(const std::vector<int>* column_offset) const { |
775 | 51 | Block res; |
776 | | |
777 | 51 | if (column_offset != nullptr) { |
778 | 32 | size_t num_columns = column_offset->size(); |
779 | 174 | for (size_t i = 0; i < num_columns; ++i) { |
780 | 142 | res.insert({nullptr, data[(*column_offset)[i]].type, data[(*column_offset)[i]].name}); |
781 | 142 | } |
782 | 32 | } else { |
783 | 19 | size_t num_columns = data.size(); |
784 | 53 | for (size_t i = 0; i < num_columns; ++i) { |
785 | 34 | res.insert({nullptr, data[i].type, data[i].name}); |
786 | 34 | } |
787 | 19 | } |
788 | 51 | return res; |
789 | 51 | } |
790 | | |
791 | 55.8k | const ColumnsWithTypeAndName& Block::get_columns_with_type_and_name() const { |
792 | 55.8k | return data; |
793 | 55.8k | } |
794 | | |
795 | 145k | std::vector<std::string> Block::get_names() const { |
796 | 145k | std::vector<std::string> res; |
797 | 145k | res.reserve(columns()); |
798 | | |
799 | 285k | for (const auto& elem : data) { |
800 | 285k | res.push_back(elem.name); |
801 | 285k | } |
802 | | |
803 | 145k | return res; |
804 | 145k | } |
805 | | |
806 | 145k | DataTypes Block::get_data_types() const { |
807 | 145k | DataTypes res; |
808 | 145k | res.reserve(columns()); |
809 | | |
810 | 285k | for (const auto& elem : data) { |
811 | 285k | res.push_back(elem.type); |
812 | 285k | } |
813 | | |
814 | 145k | return res; |
815 | 145k | } |
816 | | |
817 | 52.8k | void Block::clear() { |
818 | 52.8k | data.clear(); |
819 | 52.8k | } |
820 | | |
821 | 1.29M | void Block::clear_column_data(int64_t column_size) { |
822 | 1.29M | SCOPED_SKIP_MEMORY_CHECK(); |
823 | | // data.size() greater than column_size, means here have some |
824 | | // function exec result in block, need erase it here |
825 | 1.29M | if (column_size != -1 and data.size() > column_size) { |
826 | 2.20k | for (int64_t i = data.size() - 1; i >= column_size; --i) { |
827 | 1.10k | erase(i); |
828 | 1.10k | } |
829 | 1.10k | } |
830 | 1.29M | for (auto& d : data) { |
831 | 53.3k | if (d.column) { |
832 | 53.3k | if (d.column->is_exclusive()) { |
833 | 53.1k | d.column->assert_mutable()->clear(); |
834 | 53.1k | } else { |
835 | 245 | d.column = d.column->clone_empty(); |
836 | 245 | } |
837 | 53.3k | } |
838 | 53.3k | } |
839 | 1.29M | } |
840 | | |
841 | 46 | void Block::clear_column_data(const std::vector<uint32_t>& columns_to_clear) { |
842 | 46 | SCOPED_SKIP_MEMORY_CHECK(); |
843 | 79 | for (auto col : columns_to_clear) { |
844 | 79 | DCHECK_LT(col, data.size()); |
845 | 79 | auto& column = data[col].column; |
846 | 79 | if (column) { |
847 | 79 | if (column->is_exclusive()) { |
848 | 77 | column->assert_mutable()->clear(); |
849 | 77 | } else { |
850 | 2 | column = column->clone_empty(); |
851 | 2 | } |
852 | 79 | } |
853 | 79 | } |
854 | 46 | } |
855 | | |
856 | | void Block::clear_column_mem_not_keep(const std::vector<bool>& column_keep_flags, |
857 | 48.0k | bool need_keep_first) { |
858 | 48.0k | if (data.size() >= column_keep_flags.size()) { |
859 | 48.0k | auto origin_rows = rows(); |
860 | 142k | for (size_t i = 0; i < column_keep_flags.size(); ++i) { |
861 | 94.1k | if (!column_keep_flags[i]) { |
862 | 36.8k | data[i].column = data[i].column->clone_empty(); |
863 | 36.8k | } |
864 | 94.1k | } |
865 | | |
866 | 48.0k | if (need_keep_first && !column_keep_flags[0]) { |
867 | 1 | auto first_column = data[0].column->clone_empty(); |
868 | 1 | first_column->resize(origin_rows); |
869 | 1 | data[0].column = std::move(first_column); |
870 | 1 | } |
871 | 48.0k | } |
872 | 48.0k | } |
873 | | |
874 | 1.32k | void Block::swap(Block& other) noexcept { |
875 | 1.32k | SCOPED_SKIP_MEMORY_CHECK(); |
876 | 1.32k | data.swap(other.data); |
877 | 1.32k | } |
878 | | |
879 | 1.70k | void Block::swap(Block&& other) noexcept { |
880 | 1.70k | SCOPED_SKIP_MEMORY_CHECK(); |
881 | 1.70k | data = std::move(other.data); |
882 | 1.70k | } |
883 | | |
884 | 3 | void Block::shuffle_columns(const std::vector<int>& result_column_ids) { |
885 | 3 | Container tmp_data; |
886 | 3 | tmp_data.reserve(result_column_ids.size()); |
887 | 5 | for (const int result_column_id : result_column_ids) { |
888 | 5 | tmp_data.push_back(data[result_column_id]); |
889 | 5 | } |
890 | 3 | data = std::move(tmp_data); |
891 | 3 | } |
892 | | |
893 | 2 | void Block::update_hash(SipHash& hash) const { |
894 | 8 | for (size_t row_no = 0, num_rows = rows(); row_no < num_rows; ++row_no) { |
895 | 12 | for (const auto& col : data) { |
896 | 12 | col.column->update_hash_with_value(row_no, hash); |
897 | 12 | } |
898 | 6 | } |
899 | 2 | } |
900 | | |
901 | | void Block::filter_block_internal(Block* block, const std::vector<uint32_t>& columns_to_filter, |
902 | 531 | const IColumn::Filter& filter) { |
903 | 531 | size_t count = filter.size() - simd::count_zero_num((int8_t*)filter.data(), filter.size()); |
904 | 1.35k | for (const auto& col : columns_to_filter) { |
905 | 1.35k | auto& column = block->get_by_position(col).column; |
906 | 1.35k | if (column->size() == count) { |
907 | 1.29k | continue; |
908 | 1.29k | } |
909 | 61 | if (count == 0) { |
910 | 2 | if (column->is_exclusive()) { |
911 | 0 | column->assert_mutable()->clear(); |
912 | 2 | } else { |
913 | 2 | column = column->clone_empty(); |
914 | 2 | } |
915 | 2 | continue; |
916 | 2 | } |
917 | 59 | if (column->is_exclusive()) { |
918 | | // COW: safe to mutate in-place since we have exclusive ownership |
919 | 59 | const auto result_size = column->assert_mutable()->filter(filter); |
920 | 59 | if (result_size != count) [[unlikely]] { |
921 | 0 | throw Exception(ErrorCode::INTERNAL_ERROR, |
922 | 0 | "result_size not equal with filter_size, result_size={}, " |
923 | 0 | "filter_size={}", |
924 | 0 | result_size, count); |
925 | 0 | } |
926 | 59 | } else { |
927 | | // COW: must create a copy since column is shared |
928 | 0 | column = column->filter(filter, count); |
929 | 0 | } |
930 | 59 | } |
931 | 531 | } |
932 | | |
933 | | void Block::filter_block_internal(Block* block, const IColumn::Filter& filter, |
934 | 1 | uint32_t column_to_keep) { |
935 | 1 | std::vector<uint32_t> columns_to_filter; |
936 | 1 | columns_to_filter.resize(column_to_keep); |
937 | 3 | for (uint32_t i = 0; i < column_to_keep; ++i) { |
938 | 2 | columns_to_filter[i] = i; |
939 | 2 | } |
940 | 1 | filter_block_internal(block, columns_to_filter, filter); |
941 | 1 | } |
942 | | |
943 | 8 | void Block::filter_block_internal(Block* block, const IColumn::Filter& filter) { |
944 | 8 | const size_t count = |
945 | 8 | filter.size() - simd::count_zero_num((int8_t*)filter.data(), filter.size()); |
946 | 24 | for (int i = 0; i < block->columns(); ++i) { |
947 | 16 | auto& column = block->get_by_position(i).column; |
948 | 16 | if (column->is_exclusive()) { |
949 | 16 | column->assert_mutable()->filter(filter); |
950 | 16 | } else { |
951 | 0 | column = column->filter(filter, count); |
952 | 0 | } |
953 | 16 | } |
954 | 8 | } |
955 | | |
956 | | Status Block::append_to_block_by_selector(MutableBlock* dst, |
957 | 1 | const IColumn::Selector& selector) const { |
958 | 1 | RETURN_IF_CATCH_EXCEPTION({ |
959 | 1 | DCHECK_EQ(data.size(), dst->mutable_columns().size()); |
960 | 1 | for (size_t i = 0; i < data.size(); i++) { |
961 | | // FIXME: this is a quickfix. we assume that only partition functions make there some |
962 | 1 | if (!is_column_const(*data[i].column)) { |
963 | 1 | data[i].column->append_data_by_selector(dst->mutable_columns()[i], selector); |
964 | 1 | } |
965 | 1 | } |
966 | 1 | }); |
967 | 1 | return Status::OK(); |
968 | 1 | } |
969 | | |
970 | | Status Block::filter_block(Block* block, const std::vector<uint32_t>& columns_to_filter, |
971 | 496 | size_t filter_column_id, size_t column_to_keep) { |
972 | 496 | const auto& filter_column = block->get_by_position(filter_column_id).column; |
973 | 496 | if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) { |
974 | 1 | const auto& nested_column = nullable_column->get_nested_column_ptr(); |
975 | | |
976 | 1 | MutableColumnPtr mutable_holder = |
977 | 1 | nested_column->use_count() == 1 |
978 | 1 | ? nested_column->assert_mutable() |
979 | 1 | : nested_column->clone_resized(nested_column->size()); |
980 | | |
981 | 1 | auto* concrete_column = assert_cast<ColumnUInt8*>(mutable_holder.get()); |
982 | 1 | const auto* __restrict null_map = nullable_column->get_null_map_data().data(); |
983 | 1 | IColumn::Filter& filter = concrete_column->get_data(); |
984 | 1 | auto* __restrict filter_data = filter.data(); |
985 | | |
986 | 1 | const size_t size = filter.size(); |
987 | 4 | for (size_t i = 0; i < size; ++i) { |
988 | 3 | filter_data[i] &= !null_map[i]; |
989 | 3 | } |
990 | 1 | RETURN_IF_CATCH_EXCEPTION(filter_block_internal(block, columns_to_filter, filter)); |
991 | 495 | } else if (const auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) { |
992 | 2 | bool ret = const_column->get_bool(0); |
993 | 2 | if (!ret) { |
994 | 2 | for (const auto& col : columns_to_filter) { |
995 | 2 | auto& column = block->get_by_position(col).column; |
996 | 2 | if (column->is_exclusive()) { |
997 | 2 | column->assert_mutable()->clear(); |
998 | 2 | } else { |
999 | 0 | column = column->clone_empty(); |
1000 | 0 | } |
1001 | 2 | } |
1002 | 1 | } |
1003 | 493 | } else { |
1004 | 493 | const IColumn::Filter& filter = |
1005 | 493 | assert_cast<const doris::ColumnUInt8&>(*filter_column).get_data(); |
1006 | 493 | RETURN_IF_CATCH_EXCEPTION(filter_block_internal(block, columns_to_filter, filter)); |
1007 | 493 | } |
1008 | | |
1009 | 496 | erase_useless_column(block, column_to_keep); |
1010 | 496 | return Status::OK(); |
1011 | 496 | } |
1012 | | |
1013 | 490 | Status Block::filter_block(Block* block, size_t filter_column_id, size_t column_to_keep) { |
1014 | 490 | std::vector<uint32_t> columns_to_filter; |
1015 | 490 | columns_to_filter.resize(column_to_keep); |
1016 | 1.77k | for (uint32_t i = 0; i < column_to_keep; ++i) { |
1017 | 1.28k | columns_to_filter[i] = i; |
1018 | 1.28k | } |
1019 | 490 | return filter_block(block, columns_to_filter, filter_column_id, column_to_keep); |
1020 | 490 | } |
1021 | | |
1022 | | Status Block::serialize(int be_exec_version, PBlock* pblock, |
1023 | | /*std::string* compressed_buffer,*/ size_t* uncompressed_bytes, |
1024 | | size_t* compressed_bytes, int64_t* compress_time, |
1025 | | segment_v2::CompressionTypePB compression_type, |
1026 | 2.72k | bool allow_transfer_large_data) const { |
1027 | 2.72k | RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(be_exec_version)); |
1028 | 2.72k | pblock->set_be_exec_version(be_exec_version); |
1029 | | |
1030 | | // calc uncompressed size for allocation |
1031 | 2.72k | size_t content_uncompressed_size = 0; |
1032 | 3.39k | for (const auto& c : *this) { |
1033 | 3.39k | PColumnMeta* pcm = pblock->add_column_metas(); |
1034 | 3.39k | c.to_pb_column_meta(pcm); |
1035 | 18.4E | DCHECK(pcm->type() != PGenericType::UNKNOWN) << " forget to set pb type"; |
1036 | | // get serialized size |
1037 | 3.39k | content_uncompressed_size += |
1038 | 3.39k | c.type->get_uncompressed_serialized_bytes(*(c.column), pblock->be_exec_version()); |
1039 | 3.39k | } |
1040 | | |
1041 | | // serialize data values |
1042 | | // when data type is HLL, content_uncompressed_size maybe larger than real size. |
1043 | 2.72k | std::string column_values; |
1044 | 2.72k | try { |
1045 | | // TODO: After support c++23, we should use resize_and_overwrite to replace resize |
1046 | 2.72k | column_values.resize(content_uncompressed_size); |
1047 | 2.72k | } catch (...) { |
1048 | 0 | std::string msg = fmt::format("Try to alloc {} bytes for pblock column values failed.", |
1049 | 0 | content_uncompressed_size); |
1050 | 0 | LOG(WARNING) << msg; |
1051 | 0 | return Status::BufferAllocFailed(msg); |
1052 | 0 | } |
1053 | 2.72k | char* buf = column_values.data(); |
1054 | | |
1055 | 3.40k | for (const auto& c : *this) { |
1056 | 3.40k | buf = c.type->serialize(*(c.column), buf, pblock->be_exec_version()); |
1057 | 3.40k | } |
1058 | 2.72k | *uncompressed_bytes = content_uncompressed_size; |
1059 | 2.72k | const size_t serialize_bytes = buf - column_values.data() + STREAMVBYTE_PADDING; |
1060 | 2.72k | *compressed_bytes = serialize_bytes; |
1061 | 2.72k | column_values.resize(serialize_bytes); |
1062 | | |
1063 | | // compress |
1064 | 2.72k | if (compression_type != segment_v2::NO_COMPRESSION && content_uncompressed_size > 0) { |
1065 | 626 | SCOPED_RAW_TIMER(compress_time); |
1066 | 626 | pblock->set_compression_type(compression_type); |
1067 | 626 | pblock->set_uncompressed_size(serialize_bytes); |
1068 | | |
1069 | 626 | BlockCompressionCodec* codec; |
1070 | 626 | RETURN_IF_ERROR(get_block_compression_codec(compression_type, &codec)); |
1071 | | |
1072 | 626 | faststring buf_compressed; |
1073 | 626 | RETURN_IF_ERROR_OR_CATCH_EXCEPTION( |
1074 | 626 | codec->compress(Slice(column_values.data(), serialize_bytes), &buf_compressed)); |
1075 | 626 | size_t compressed_size = buf_compressed.size(); |
1076 | 626 | if (LIKELY(compressed_size < serialize_bytes)) { |
1077 | | // TODO: rethink the logic here may copy again ? |
1078 | 626 | pblock->set_column_values(buf_compressed.data(), buf_compressed.size()); |
1079 | 626 | pblock->set_compressed(true); |
1080 | 626 | *compressed_bytes = compressed_size; |
1081 | 626 | } else { |
1082 | 0 | pblock->set_column_values(std::move(column_values)); |
1083 | 0 | } |
1084 | | |
1085 | 626 | VLOG_ROW << "uncompressed size: " << content_uncompressed_size |
1086 | 0 | << ", compressed size: " << compressed_size; |
1087 | 2.10k | } else { |
1088 | 2.10k | pblock->set_column_values(std::move(column_values)); |
1089 | 2.10k | } |
1090 | 2.72k | if (!allow_transfer_large_data && *compressed_bytes >= std::numeric_limits<int32_t>::max()) { |
1091 | 0 | return Status::InternalError("The block is large than 2GB({}), can not send by Protobuf.", |
1092 | 0 | *compressed_bytes); |
1093 | 0 | } |
1094 | 2.72k | return Status::OK(); |
1095 | 2.72k | } |
1096 | | |
1097 | 240k | size_t MutableBlock::rows() const { |
1098 | 240k | for (const auto& column : _columns) { |
1099 | 144k | if (column) { |
1100 | 144k | return column->size(); |
1101 | 144k | } |
1102 | 144k | } |
1103 | | |
1104 | 96.0k | return 0; |
1105 | 240k | } |
1106 | | |
1107 | 0 | void MutableBlock::swap(MutableBlock& another) noexcept { |
1108 | 0 | SCOPED_SKIP_MEMORY_CHECK(); |
1109 | 0 | _columns.swap(another._columns); |
1110 | 0 | _data_types.swap(another._data_types); |
1111 | 0 | _names.swap(another._names); |
1112 | 0 | } |
1113 | | |
1114 | 0 | void MutableBlock::add_row(const Block* block, int row) { |
1115 | 0 | const auto& block_data = block->get_columns_with_type_and_name(); |
1116 | 0 | for (size_t i = 0; i < _columns.size(); ++i) { |
1117 | 0 | _columns[i]->insert_from(*block_data[i].column.get(), row); |
1118 | 0 | } |
1119 | 0 | } |
1120 | | |
1121 | | Status MutableBlock::add_rows(const Block* block, const uint32_t* row_begin, |
1122 | 163 | const uint32_t* row_end, const std::vector<int>* column_offset) { |
1123 | 163 | RETURN_IF_CATCH_EXCEPTION({ |
1124 | 163 | DCHECK_LE(columns(), block->columns()); |
1125 | 163 | if (column_offset != nullptr) { |
1126 | 163 | DCHECK_EQ(columns(), column_offset->size()); |
1127 | 163 | } |
1128 | 163 | const auto& block_data = block->get_columns_with_type_and_name(); |
1129 | 163 | for (size_t i = 0; i < _columns.size(); ++i) { |
1130 | 163 | const auto& src_col = column_offset ? block_data[(*column_offset)[i]] : block_data[i]; |
1131 | 163 | DCHECK_EQ(_data_types[i]->get_name(), src_col.type->get_name()); |
1132 | 163 | auto& dst = _columns[i]; |
1133 | 163 | const auto& src = *src_col.column.get(); |
1134 | 163 | DCHECK_GE(src.size(), row_end - row_begin); |
1135 | 163 | dst->insert_indices_from(src, row_begin, row_end); |
1136 | 163 | } |
1137 | 163 | }); |
1138 | 162 | return Status::OK(); |
1139 | 163 | } |
1140 | | |
1141 | 126 | Status MutableBlock::add_rows(const Block* block, size_t row_begin, size_t length) { |
1142 | 126 | RETURN_IF_CATCH_EXCEPTION({ |
1143 | 126 | DCHECK_LE(columns(), block->columns()); |
1144 | 126 | const auto& block_data = block->get_columns_with_type_and_name(); |
1145 | 126 | for (size_t i = 0; i < _columns.size(); ++i) { |
1146 | 126 | DCHECK_EQ(_data_types[i]->get_name(), block_data[i].type->get_name()); |
1147 | 126 | auto& dst = _columns[i]; |
1148 | 126 | const auto& src = *block_data[i].column.get(); |
1149 | 126 | dst->insert_range_from(src, row_begin, length); |
1150 | 126 | } |
1151 | 126 | }); |
1152 | 126 | return Status::OK(); |
1153 | 126 | } |
1154 | | |
1155 | 144k | Block MutableBlock::to_block(int start_column) { |
1156 | 144k | return to_block(start_column, (int)_columns.size()); |
1157 | 144k | } |
1158 | | |
1159 | 144k | Block MutableBlock::to_block(int start_column, int end_column) { |
1160 | 144k | ColumnsWithTypeAndName columns_with_schema; |
1161 | 144k | columns_with_schema.reserve(end_column - start_column); |
1162 | 428k | for (size_t i = start_column; i < end_column; ++i) { |
1163 | 283k | columns_with_schema.emplace_back(std::move(_columns[i]), _data_types[i], _names[i]); |
1164 | 283k | } |
1165 | 144k | return {columns_with_schema}; |
1166 | 144k | } |
1167 | | |
1168 | 1 | std::string MutableBlock::dump_data_json(size_t row_limit) const { |
1169 | 1 | std::stringstream ss; |
1170 | 1 | std::vector<std::string> headers; |
1171 | | |
1172 | 1 | headers.reserve(columns()); |
1173 | 2 | for (size_t i = 0; i < columns(); ++i) { |
1174 | 1 | headers.push_back(_data_types[i]->get_name()); |
1175 | 1 | } |
1176 | 1 | size_t num_rows_to_dump = std::min(rows(), row_limit); |
1177 | 1 | ss << "["; |
1178 | | |
1179 | 1 | auto format_options = DataTypeSerDe::get_default_format_options(); |
1180 | 1 | auto time_zone = cctz::utc_time_zone(); |
1181 | 1 | format_options.timezone = &time_zone; |
1182 | | |
1183 | 4 | for (size_t row_num = 0; row_num < num_rows_to_dump; ++row_num) { |
1184 | 3 | if (row_num > 0) { |
1185 | 2 | ss << ","; |
1186 | 2 | } |
1187 | 3 | ss << "{"; |
1188 | 6 | for (size_t i = 0; i < columns(); ++i) { |
1189 | 3 | if (i > 0) { |
1190 | 0 | ss << ","; |
1191 | 0 | } |
1192 | 3 | ss << "\"" << headers[i] << "\":"; |
1193 | 3 | std::string s = _data_types[i]->to_string(*_columns[i].get(), row_num, format_options); |
1194 | 3 | ss << "\"" << s << "\""; |
1195 | 3 | } |
1196 | 3 | ss << "}"; |
1197 | 3 | } |
1198 | 1 | ss << "]"; |
1199 | 1 | return ss.str(); |
1200 | 1 | } |
1201 | | |
1202 | 1 | std::string MutableBlock::dump_data(size_t row_limit) const { |
1203 | 1 | std::vector<std::string> headers; |
1204 | 1 | std::vector<int> headers_size; |
1205 | 2 | for (size_t i = 0; i < columns(); ++i) { |
1206 | 1 | std::string s = _data_types[i]->get_name(); |
1207 | 1 | headers_size.push_back(s.size() > 15 ? (int)s.size() : 15); |
1208 | 1 | headers.emplace_back(s); |
1209 | 1 | } |
1210 | | |
1211 | 1 | std::stringstream out; |
1212 | | // header upper line |
1213 | 3 | auto line = [&]() { |
1214 | 6 | for (size_t i = 0; i < columns(); ++i) { |
1215 | 3 | out << std::setfill('-') << std::setw(1) << "+" << std::setw(headers_size[i]) << "-"; |
1216 | 3 | } |
1217 | 3 | out << std::setw(1) << "+" << std::endl; |
1218 | 3 | }; |
1219 | 1 | line(); |
1220 | | // header text |
1221 | 2 | for (size_t i = 0; i < columns(); ++i) { |
1222 | 1 | out << std::setfill(' ') << std::setw(1) << "|" << std::left << std::setw(headers_size[i]) |
1223 | 1 | << headers[i]; |
1224 | 1 | } |
1225 | 1 | out << std::setw(1) << "|" << std::endl; |
1226 | | // header bottom line |
1227 | 1 | line(); |
1228 | 1 | if (rows() == 0) { |
1229 | 0 | return out.str(); |
1230 | 0 | } |
1231 | | |
1232 | 1 | auto format_options = DataTypeSerDe::get_default_format_options(); |
1233 | 1 | auto time_zone = cctz::utc_time_zone(); |
1234 | 1 | format_options.timezone = &time_zone; |
1235 | | |
1236 | | // content |
1237 | 4 | for (size_t row_num = 0; row_num < rows() && row_num < row_limit; ++row_num) { |
1238 | 6 | for (size_t i = 0; i < columns(); ++i) { |
1239 | 3 | if (_columns[i].get()->empty()) { |
1240 | 0 | out << std::setfill(' ') << std::setw(1) << "|" << std::setw(headers_size[i]) |
1241 | 0 | << std::right; |
1242 | 0 | continue; |
1243 | 0 | } |
1244 | 3 | std::string s = _data_types[i]->to_string(*_columns[i].get(), row_num, format_options); |
1245 | 3 | if (s.length() > headers_size[i]) { |
1246 | 0 | s = s.substr(0, headers_size[i] - 3) + "..."; |
1247 | 0 | } |
1248 | 3 | out << std::setfill(' ') << std::setw(1) << "|" << std::setw(headers_size[i]) |
1249 | 3 | << std::right << s; |
1250 | 3 | } |
1251 | 3 | out << std::setw(1) << "|" << std::endl; |
1252 | 3 | } |
1253 | | // bottom line |
1254 | 1 | line(); |
1255 | 1 | if (row_limit < rows()) { |
1256 | 0 | out << rows() << " rows in block, only show first " << row_limit << " rows." << std::endl; |
1257 | 0 | } |
1258 | 1 | return out.str(); |
1259 | 1 | } |
1260 | | |
1261 | 48.0k | std::unique_ptr<Block> Block::create_same_struct_block(size_t size, bool is_reserve) const { |
1262 | 48.0k | auto temp_block = Block::create_unique(); |
1263 | 94.1k | for (const auto& d : data) { |
1264 | 94.1k | auto column = d.type->create_column(); |
1265 | 94.1k | if (is_reserve) { |
1266 | 0 | column->reserve(size); |
1267 | 94.1k | } else { |
1268 | 94.1k | column->insert_many_defaults(size); |
1269 | 94.1k | } |
1270 | 94.1k | temp_block->insert({std::move(column), d.type, d.name}); |
1271 | 94.1k | } |
1272 | 48.0k | return temp_block; |
1273 | 48.0k | } |
1274 | | |
1275 | 96.1k | size_t MutableBlock::allocated_bytes() const { |
1276 | 96.1k | size_t res = 0; |
1277 | 188k | for (const auto& col : _columns) { |
1278 | 188k | if (col) { |
1279 | 188k | res += col->allocated_bytes(); |
1280 | 188k | } |
1281 | 188k | } |
1282 | | |
1283 | 96.1k | return res; |
1284 | 96.1k | } |
1285 | | |
1286 | 1 | void MutableBlock::clear_column_data() noexcept { |
1287 | 1 | SCOPED_SKIP_MEMORY_CHECK(); |
1288 | 1 | for (auto& col : _columns) { |
1289 | 1 | if (col) { |
1290 | 1 | col->clear(); |
1291 | 1 | } |
1292 | 1 | } |
1293 | 1 | } |
1294 | | |
1295 | 5 | std::string MutableBlock::dump_names() const { |
1296 | 5 | std::string out; |
1297 | 16 | for (auto it = _names.begin(); it != _names.end(); ++it) { |
1298 | 11 | if (it != _names.begin()) { |
1299 | 6 | out += ", "; |
1300 | 6 | } |
1301 | 11 | out += *it; |
1302 | 11 | } |
1303 | 5 | return out; |
1304 | 5 | } |
1305 | | } // namespace doris |