be/src/core/block/block.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // This file is copied from |
18 | | // https://github.com/ClickHouse/ClickHouse/blob/master/src/Core/Block.cpp |
19 | | // and modified by Doris |
20 | | |
21 | | #include "core/block/block.h" |
22 | | |
23 | | #include <fmt/format.h> |
24 | | #include <gen_cpp/data.pb.h> |
25 | | #include <glog/logging.h> |
26 | | #include <snappy.h> |
27 | | #include <streamvbyte.h> |
28 | | #include <sys/types.h> |
29 | | |
30 | | #include <algorithm> |
31 | | #include <cassert> |
32 | | #include <iomanip> |
33 | | #include <iterator> |
34 | | #include <limits> |
35 | | #include <ranges> |
36 | | |
37 | | #include "agent/be_exec_version_manager.h" |
38 | | #include "common/compiler_util.h" // IWYU pragma: keep |
39 | | #include "common/config.h" |
40 | | #include "common/logging.h" |
41 | | #include "common/status.h" |
42 | | #include "core/assert_cast.h" |
43 | | #include "core/column/column.h" |
44 | | #include "core/column/column_const.h" |
45 | | #include "core/column/column_nothing.h" |
46 | | #include "core/column/column_nullable.h" |
47 | | #include "core/column/column_vector.h" |
48 | | #include "core/data_type/data_type_factory.hpp" |
49 | | #include "core/data_type/data_type_nullable.h" |
50 | | #include "core/data_type_serde/data_type_serde.h" |
51 | | #include "runtime/descriptors.h" |
52 | | #include "runtime/runtime_profile.h" |
53 | | #include "runtime/thread_context.h" |
54 | | #include "util/block_compression.h" |
55 | | #include "util/faststring.h" |
56 | | #include "util/simd/bits.h" |
57 | | #include "util/slice.h" |
58 | | |
59 | | class SipHash; |
60 | | |
61 | | namespace doris::segment_v2 { |
62 | | enum CompressionTypePB : int; |
63 | | } // namespace doris::segment_v2 |
64 | | namespace doris { |
65 | | template <typename T> |
66 | | void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks, |
67 | 2 | RuntimeProfile::Counter* memory_used_counter = nullptr) { |
68 | 2 | T block; |
69 | 6 | while (blocks.try_dequeue(block)) { |
70 | 4 | if (memory_used_counter) { |
71 | 4 | if constexpr (std::is_same_v<T, Block>) { |
72 | 2 | memory_used_counter->update(-block.allocated_bytes()); |
73 | 2 | } else { |
74 | 2 | memory_used_counter->update(-block->allocated_bytes()); |
75 | 2 | } |
76 | 4 | } |
77 | 4 | } |
78 | 2 | } _ZN5doris12clear_blocksINS_5BlockEEEvRN10moodycamel15ConcurrentQueueIT_NS2_28ConcurrentQueueDefaultTraitsEEEPNS_14RuntimeProfile7CounterE Line | Count | Source | 67 | 1 | RuntimeProfile::Counter* memory_used_counter = nullptr) { | 68 | 1 | T block; | 69 | 3 | while (blocks.try_dequeue(block)) { | 70 | 2 | if (memory_used_counter) { | 71 | 2 | if constexpr (std::is_same_v<T, Block>) { | 72 | 2 | memory_used_counter->update(-block.allocated_bytes()); | 73 | | } else { | 74 | | memory_used_counter->update(-block->allocated_bytes()); | 75 | | } | 76 | 2 | } | 77 | 2 | } | 78 | 1 | } |
_ZN5doris12clear_blocksISt10unique_ptrINS_5BlockESt14default_deleteIS2_EEEEvRN10moodycamel15ConcurrentQueueIT_NS6_28ConcurrentQueueDefaultTraitsEEEPNS_14RuntimeProfile7CounterE Line | Count | Source | 67 | 1 | RuntimeProfile::Counter* memory_used_counter = nullptr) { | 68 | 1 | T block; | 69 | 3 | while (blocks.try_dequeue(block)) { | 70 | 2 | if (memory_used_counter) { | 71 | | if constexpr (std::is_same_v<T, Block>) { | 72 | | memory_used_counter->update(-block.allocated_bytes()); | 73 | 2 | } else { | 74 | 2 | memory_used_counter->update(-block->allocated_bytes()); | 75 | 2 | } | 76 | 2 | } | 77 | 2 | } | 78 | 1 | } |
|
79 | | |
80 | | template void clear_blocks<Block>(moodycamel::ConcurrentQueue<Block>&, |
81 | | RuntimeProfile::Counter* memory_used_counter); |
82 | | template void clear_blocks<BlockUPtr>(moodycamel::ConcurrentQueue<BlockUPtr>&, |
83 | | RuntimeProfile::Counter* memory_used_counter); |
84 | | |
85 | 2.71k | Block::Block(std::initializer_list<ColumnWithTypeAndName> il) : data {il} {} |
86 | | |
87 | 257k | Block::Block(ColumnsWithTypeAndName data_) : data {std::move(data_)} {} |
88 | | |
89 | 56.0k | Block::Block(const std::vector<SlotDescriptor*>& slots, size_t block_size) { |
90 | 271k | for (auto* const slot_desc : slots) { |
91 | 271k | auto column_ptr = slot_desc->get_empty_mutable_column(); |
92 | 271k | column_ptr->reserve(block_size); |
93 | 271k | insert(ColumnWithTypeAndName(std::move(column_ptr), slot_desc->get_data_type_ptr(), |
94 | 271k | slot_desc->col_name())); |
95 | 271k | } |
96 | 56.0k | } |
97 | | |
98 | 1 | Block::Block(const std::vector<SlotDescriptor>& slots, size_t block_size) { |
99 | 1 | std::vector<SlotDescriptor*> slot_ptrs(slots.size()); |
100 | 3 | for (size_t i = 0; i < slots.size(); ++i) { |
101 | | // Slots remain unmodified and are used to read column information; const_cast can be employed. |
102 | | // used in src/exec/rowid_fetcher.cpp |
103 | 2 | slot_ptrs[i] = const_cast<SlotDescriptor*>(&slots[i]); |
104 | 2 | } |
105 | 1 | *this = Block(slot_ptrs, block_size); |
106 | 1 | } |
107 | | |
108 | | Status Block::deserialize(const PBlock& pblock, size_t* uncompressed_bytes, |
109 | 992 | int64_t* decompress_time) { |
110 | 992 | swap(Block()); |
111 | 992 | int be_exec_version = pblock.has_be_exec_version() ? pblock.be_exec_version() : 0; |
112 | 992 | RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(be_exec_version)); |
113 | | |
114 | 992 | const char* buf = nullptr; |
115 | 992 | std::string compression_scratch; |
116 | 992 | if (pblock.compressed()) { |
117 | | // Decompress |
118 | 481 | SCOPED_RAW_TIMER(decompress_time); |
119 | 481 | const char* compressed_data = pblock.column_values().c_str(); |
120 | 481 | size_t compressed_size = pblock.column_values().size(); |
121 | 481 | size_t uncompressed_size = 0; |
122 | 481 | if (pblock.has_compression_type() && pblock.has_uncompressed_size()) { |
123 | 481 | BlockCompressionCodec* codec; |
124 | 481 | RETURN_IF_ERROR(get_block_compression_codec(pblock.compression_type(), &codec)); |
125 | 481 | uncompressed_size = pblock.uncompressed_size(); |
126 | | // Should also use allocator to allocate memory here. |
127 | 481 | compression_scratch.resize(uncompressed_size); |
128 | 481 | Slice decompressed_slice(compression_scratch); |
129 | 481 | RETURN_IF_ERROR(codec->decompress(Slice(compressed_data, compressed_size), |
130 | 481 | &decompressed_slice)); |
131 | 481 | DCHECK(uncompressed_size == decompressed_slice.size); |
132 | 481 | } else { |
133 | 0 | bool success = snappy::GetUncompressedLength(compressed_data, compressed_size, |
134 | 0 | &uncompressed_size); |
135 | 0 | DCHECK(success) << "snappy::GetUncompressedLength failed"; |
136 | 0 | compression_scratch.resize(uncompressed_size); |
137 | 0 | success = snappy::RawUncompress(compressed_data, compressed_size, |
138 | 0 | compression_scratch.data()); |
139 | 0 | DCHECK(success) << "snappy::RawUncompress failed"; |
140 | 0 | } |
141 | 481 | *uncompressed_bytes = uncompressed_size; |
142 | 481 | buf = compression_scratch.data(); |
143 | 511 | } else { |
144 | 511 | buf = pblock.column_values().data(); |
145 | 511 | } |
146 | | |
147 | 1.57k | for (const auto& pcol_meta : pblock.column_metas()) { |
148 | 1.57k | DataTypePtr type = DataTypeFactory::instance().create_data_type(pcol_meta); |
149 | 1.57k | MutableColumnPtr data_column = type->create_column(); |
150 | | // Here will try to allocate large memory, should return error if failed. |
151 | 1.57k | RETURN_IF_CATCH_EXCEPTION( |
152 | 1.57k | buf = type->deserialize(buf, &data_column, pblock.be_exec_version())); |
153 | 1.57k | data.emplace_back(data_column->get_ptr(), type, pcol_meta.name()); |
154 | 1.57k | } |
155 | | |
156 | 992 | return Status::OK(); |
157 | 992 | } |
158 | | |
159 | 13.2k | void Block::reserve(size_t count) { |
160 | 13.2k | data.reserve(count); |
161 | 13.2k | } |
162 | | |
163 | 4 | void Block::insert(size_t position, const ColumnWithTypeAndName& elem) { |
164 | 4 | if (position > data.size()) { |
165 | 1 | throw Exception(ErrorCode::INTERNAL_ERROR, |
166 | 1 | "invalid input position, position={}, data.size={}, names={}", position, |
167 | 1 | data.size(), dump_names()); |
168 | 1 | } |
169 | | |
170 | 3 | data.emplace(data.begin() + position, elem); |
171 | 3 | } |
172 | | |
173 | 3 | void Block::insert(size_t position, ColumnWithTypeAndName&& elem) { |
174 | 3 | if (position > data.size()) { |
175 | 1 | throw Exception(ErrorCode::INTERNAL_ERROR, |
176 | 1 | "invalid input position, position={}, data.size={}, names={}", position, |
177 | 1 | data.size(), dump_names()); |
178 | 1 | } |
179 | | |
180 | 2 | data.emplace(data.begin() + position, std::move(elem)); |
181 | 2 | } |
182 | | |
183 | 8.09k | void Block::clear_names() { |
184 | 177k | for (auto& entry : data) { |
185 | 177k | entry.name.clear(); |
186 | 177k | } |
187 | 8.09k | } |
188 | | |
189 | 14.8k | void Block::insert(const ColumnWithTypeAndName& elem) { |
190 | 14.8k | data.emplace_back(elem); |
191 | 14.8k | } |
192 | | |
193 | 998k | void Block::insert(ColumnWithTypeAndName&& elem) { |
194 | 998k | data.emplace_back(std::move(elem)); |
195 | 998k | } |
196 | | |
197 | 3 | void Block::erase(const std::set<size_t>& positions) { |
198 | 3 | for (unsigned long position : std::ranges::reverse_view(positions)) { |
199 | 2 | erase(position); |
200 | 2 | } |
201 | 3 | } |
202 | | |
203 | 1.17k | void Block::erase_tail(size_t start) { |
204 | 1.17k | DCHECK(start <= data.size()) << fmt::format( |
205 | 0 | "Position out of bound in Block::erase(), max position = {}", data.size()); |
206 | 1.17k | data.erase(data.begin() + start, data.end()); |
207 | 1.17k | } |
208 | | |
209 | 123k | void Block::erase(size_t position) { |
210 | 123k | DCHECK(!data.empty()) << "Block is empty"; |
211 | 123k | DCHECK_LT(position, data.size()) << fmt::format( |
212 | 0 | "Position out of bound in Block::erase(), max position = {}", data.size() - 1); |
213 | | |
214 | 123k | erase_impl(position); |
215 | 123k | } |
216 | | |
217 | 123k | void Block::erase_impl(size_t position) { |
218 | 123k | data.erase(data.begin() + position); |
219 | 123k | } |
220 | | |
221 | 75.9k | ColumnWithTypeAndName& Block::safe_get_by_position(size_t position) { |
222 | 75.9k | if (position >= data.size()) { |
223 | 0 | throw Exception(ErrorCode::INTERNAL_ERROR, |
224 | 0 | "invalid input position, position={}, data.size={}, names={}", position, |
225 | 0 | data.size(), dump_names()); |
226 | 0 | } |
227 | 75.9k | return data[position]; |
228 | 75.9k | } |
229 | | |
230 | 109 | const ColumnWithTypeAndName& Block::safe_get_by_position(size_t position) const { |
231 | 109 | if (position >= data.size()) { |
232 | 0 | throw Exception(ErrorCode::INTERNAL_ERROR, |
233 | 0 | "invalid input position, position={}, data.size={}, names={}", position, |
234 | 0 | data.size(), dump_names()); |
235 | 0 | } |
236 | 109 | return data[position]; |
237 | 109 | } |
238 | | |
239 | 35 | int Block::get_position_by_name(const std::string& name) const { |
240 | 271 | for (int i = 0; i < data.size(); i++) { |
241 | 270 | if (data[i].name == name) { |
242 | 34 | return i; |
243 | 34 | } |
244 | 270 | } |
245 | 1 | return -1; |
246 | 35 | } |
247 | | |
248 | 3 | void Block::check_number_of_rows(bool allow_null_columns) const { |
249 | 3 | ssize_t rows = -1; |
250 | 7 | for (const auto& elem : data) { |
251 | 7 | if (!elem.column && allow_null_columns) { |
252 | 2 | continue; |
253 | 2 | } |
254 | | |
255 | 5 | if (!elem.column) { |
256 | 1 | throw Exception(ErrorCode::INTERNAL_ERROR, |
257 | 1 | "Column {} in block is nullptr, in method check_number_of_rows.", |
258 | 1 | elem.name); |
259 | 1 | } |
260 | | |
261 | 4 | ssize_t size = elem.column->size(); |
262 | | |
263 | 4 | if (rows == -1) { |
264 | 3 | rows = size; |
265 | 3 | } else if (rows != size) { |
266 | 1 | throw Exception(ErrorCode::INTERNAL_ERROR, "Sizes of columns doesn't match, block={}", |
267 | 1 | dump_structure()); |
268 | 1 | } |
269 | 4 | } |
270 | 3 | } |
271 | | |
272 | 2.95M | Status Block::check_type_and_column() const { |
273 | 2.95M | #ifndef NDEBUG |
274 | 2.95M | for (const auto& elem : data) { |
275 | 30.5k | if (!elem.column) { |
276 | 0 | continue; |
277 | 0 | } |
278 | 30.5k | if (!elem.type) { |
279 | 0 | continue; |
280 | 0 | } |
281 | | |
282 | | // ColumnNothing is a special column type, it is used to represent a column that |
283 | | // is not materialized, so we don't need to check it. |
284 | 30.5k | if (check_and_get_column<ColumnNothing>(elem.column.get())) { |
285 | 0 | continue; |
286 | 0 | } |
287 | | |
288 | 30.5k | const auto& type = elem.type; |
289 | 30.5k | const auto& column = elem.column; |
290 | | |
291 | 30.5k | RETURN_IF_ERROR(column->column_self_check()); |
292 | 30.5k | auto st = type->check_column(*column); |
293 | 30.5k | if (!st.ok()) { |
294 | 1 | return Status::InternalError( |
295 | 1 | "Column {} in block is not compatible with its column type :{}, data type :{}, " |
296 | 1 | "error: {}", |
297 | 1 | elem.name, column->get_name(), type->get_name(), st.msg()); |
298 | 1 | } |
299 | 30.5k | } |
300 | 2.95M | #endif |
301 | 2.95M | return Status::OK(); |
302 | 2.95M | } |
303 | | |
304 | 50.9M | size_t Block::rows() const { |
305 | 50.9M | for (const auto& elem : data) { |
306 | 46.5M | if (elem.column) { |
307 | 46.5M | return elem.column->size(); |
308 | 46.5M | } |
309 | 46.5M | } |
310 | | |
311 | 4.41M | return 0; |
312 | 50.9M | } |
313 | | |
314 | 6 | void Block::set_num_rows(size_t length) { |
315 | 6 | if (rows() > length) { |
316 | 4 | for (auto& elem : data) { |
317 | 4 | if (elem.column) { |
318 | 4 | elem.column = elem.column->shrink(length); |
319 | 4 | } |
320 | 4 | } |
321 | 4 | } |
322 | 6 | } |
323 | | |
324 | 1 | void Block::skip_num_rows(int64_t& length) { |
325 | 1 | auto origin_rows = rows(); |
326 | 1 | if (origin_rows <= length) { |
327 | 0 | clear(); |
328 | 0 | length -= origin_rows; |
329 | 1 | } else { |
330 | 1 | for (auto& elem : data) { |
331 | 1 | if (elem.column) { |
332 | 1 | elem.column = elem.column->cut(length, origin_rows - length); |
333 | 1 | } |
334 | 1 | } |
335 | 1 | } |
336 | 1 | } |
337 | | |
338 | 3.49k | size_t Block::bytes() const { |
339 | 3.49k | size_t res = 0; |
340 | 8.81k | for (const auto& elem : data) { |
341 | 8.81k | if (!elem.column) { |
342 | 0 | std::stringstream ss; |
343 | 0 | for (const auto& e : data) { |
344 | 0 | ss << e.name + " "; |
345 | 0 | } |
346 | 0 | throw Exception(ErrorCode::INTERNAL_ERROR, |
347 | 0 | "Column {} in block is nullptr, in method bytes. All Columns are {}", |
348 | 0 | elem.name, ss.str()); |
349 | 0 | } |
350 | 8.81k | res += elem.column->byte_size(); |
351 | 8.81k | } |
352 | | |
353 | 3.49k | return res; |
354 | 3.49k | } |
355 | | |
356 | 1 | std::string Block::columns_bytes() const { |
357 | 1 | std::stringstream res; |
358 | 1 | res << "column bytes: ["; |
359 | 2 | for (const auto& elem : data) { |
360 | 2 | if (!elem.column) { |
361 | 1 | std::stringstream ss; |
362 | 3 | for (const auto& e : data) { |
363 | 3 | ss << e.name + " "; |
364 | 3 | } |
365 | 1 | throw Exception(ErrorCode::INTERNAL_ERROR, |
366 | 1 | "Column {} in block is nullptr, in method bytes. All Columns are {}", |
367 | 1 | elem.name, ss.str()); |
368 | 1 | } |
369 | 1 | res << ", " << elem.column->byte_size(); |
370 | 1 | } |
371 | 0 | res << "]"; |
372 | 0 | return res.str(); |
373 | 1 | } |
374 | | |
375 | 193k | size_t Block::allocated_bytes() const { |
376 | 193k | size_t res = 0; |
377 | 378k | for (const auto& elem : data) { |
378 | 378k | if (!elem.column) { |
379 | | // Sometimes if expr failed, then there will be a nullptr |
380 | | // column left in the block. |
381 | 1 | continue; |
382 | 1 | } |
383 | 378k | res += elem.column->allocated_bytes(); |
384 | 378k | } |
385 | | |
386 | 193k | return res; |
387 | 193k | } |
388 | | |
389 | 7 | std::string Block::dump_names() const { |
390 | 7 | std::string out; |
391 | 20 | for (auto it = data.begin(); it != data.end(); ++it) { |
392 | 13 | if (it != data.begin()) { |
393 | 6 | out += ", "; |
394 | 6 | } |
395 | 13 | out += it->name; |
396 | 13 | } |
397 | 7 | return out; |
398 | 7 | } |
399 | | |
400 | 6 | std::string Block::dump_types() const { |
401 | 6 | std::string out; |
402 | 18 | for (auto it = data.begin(); it != data.end(); ++it) { |
403 | 12 | if (it != data.begin()) { |
404 | 6 | out += ", "; |
405 | 6 | } |
406 | 12 | out += it->type->get_name(); |
407 | 12 | } |
408 | 6 | return out; |
409 | 6 | } |
410 | | |
411 | 31 | std::string Block::dump_data_json(size_t begin, size_t row_limit, bool allow_null_mismatch) const { |
412 | 31 | std::stringstream ss; |
413 | | |
414 | 31 | std::vector<std::string> headers; |
415 | 31 | headers.reserve(columns()); |
416 | 46 | for (const auto& it : data) { |
417 | | // fmt::format is from the {fmt} library, you might be using std::format in C++20 |
418 | | // If not, you can build the string with a stringstream as a fallback. |
419 | 46 | headers.push_back(fmt::format("{}({})", it.name, it.type->get_name())); |
420 | 46 | } |
421 | | |
422 | 31 | size_t start_row = std::min(begin, rows()); |
423 | 31 | size_t end_row = std::min(rows(), begin + row_limit); |
424 | | |
425 | 31 | auto format_options = DataTypeSerDe::get_default_format_options(); |
426 | 31 | auto time_zone = cctz::utc_time_zone(); |
427 | 31 | format_options.timezone = &time_zone; |
428 | | |
429 | 31 | ss << "["; |
430 | 3.59k | for (size_t row_num = start_row; row_num < end_row; ++row_num) { |
431 | 3.56k | if (row_num > start_row) { |
432 | 3.53k | ss << ","; |
433 | 3.53k | } |
434 | 3.56k | ss << "{"; |
435 | 8.33k | for (size_t i = 0; i < columns(); ++i) { |
436 | 4.77k | if (i > 0) { |
437 | 1.21k | ss << ","; |
438 | 1.21k | } |
439 | 4.77k | ss << "\"" << headers[i] << "\":"; |
440 | 4.77k | std::string s; |
441 | | |
442 | | // This value-extraction logic is preserved from your original function |
443 | | // to maintain consistency, especially for handling nullability mismatches. |
444 | 4.77k | if (data[i].column && data[i].type->is_nullable() && |
445 | 4.77k | !data[i].column->is_concrete_nullable()) { |
446 | | // This branch handles a specific internal representation of nullable columns. |
447 | | // The original code would assert here if allow_null_mismatch is false. |
448 | 0 | assert(allow_null_mismatch); |
449 | 0 | s = assert_cast<const DataTypeNullable*>(data[i].type.get()) |
450 | 0 | ->get_nested_type() |
451 | 0 | ->to_string(*data[i].column, row_num, format_options); |
452 | 4.77k | } else { |
453 | | // This is the standard path. The to_string method is expected to correctly |
454 | | // handle all cases, including when the column is null (e.g., by returning "NULL"). |
455 | 4.77k | s = data[i].to_string(row_num, format_options); |
456 | 4.77k | } |
457 | 4.77k | ss << "\"" << s << "\""; |
458 | 4.77k | } |
459 | 3.56k | ss << "}"; |
460 | 3.56k | } |
461 | 31 | ss << "]"; |
462 | 31 | return ss.str(); |
463 | 31 | } |
464 | | |
465 | 858 | std::string Block::dump_data(size_t begin, size_t row_limit, bool allow_null_mismatch) const { |
466 | 858 | std::vector<std::string> headers; |
467 | 858 | std::vector<int> headers_size; |
468 | 2.10k | for (const auto& it : data) { |
469 | 2.10k | std::string s = fmt::format("{}({})", it.name, it.type->get_name()); |
470 | 2.10k | headers_size.push_back(s.size() > 15 ? (int)s.size() : 15); |
471 | 2.10k | headers.emplace_back(s); |
472 | 2.10k | } |
473 | | |
474 | 858 | std::stringstream out; |
475 | | // header upper line |
476 | 2.16k | auto line = [&]() { |
477 | 8.07k | for (size_t i = 0; i < columns(); ++i) { |
478 | 5.91k | out << std::setfill('-') << std::setw(1) << "+" << std::setw(headers_size[i]) << "-"; |
479 | 5.91k | } |
480 | 2.16k | out << std::setw(1) << "+" << std::endl; |
481 | 2.16k | }; |
482 | 858 | line(); |
483 | | // header text |
484 | 2.96k | for (size_t i = 0; i < columns(); ++i) { |
485 | 2.10k | out << std::setfill(' ') << std::setw(1) << "|" << std::left << std::setw(headers_size[i]) |
486 | 2.10k | << headers[i]; |
487 | 2.10k | } |
488 | 858 | out << std::setw(1) << "|" << std::endl; |
489 | | // header bottom line |
490 | 858 | line(); |
491 | 858 | if (rows() == 0) { |
492 | 414 | return out.str(); |
493 | 414 | } |
494 | | |
495 | 444 | auto format_options = DataTypeSerDe::get_default_format_options(); |
496 | 444 | auto time_zone = cctz::utc_time_zone(); |
497 | 444 | format_options.timezone = &time_zone; |
498 | | |
499 | | // content |
500 | 12.2k | for (size_t row_num = begin; row_num < rows() && row_num < row_limit + begin; ++row_num) { |
501 | 32.4k | for (size_t i = 0; i < columns(); ++i) { |
502 | 20.6k | if (!data[i].column || data[i].column->empty()) { |
503 | 0 | out << std::setfill(' ') << std::setw(1) << "|" << std::setw(headers_size[i]) |
504 | 0 | << std::right; |
505 | 0 | continue; |
506 | 0 | } |
507 | 20.6k | std::string s; |
508 | 20.6k | if (data[i].column) { // column may be const |
509 | | // for code inside `default_implementation_for_nulls`, there's could have: type = null, col != null |
510 | 20.6k | if (data[i].type->is_nullable() && !data[i].column->is_concrete_nullable()) { |
511 | 0 | assert(allow_null_mismatch); |
512 | 0 | s = assert_cast<const DataTypeNullable*>(data[i].type.get()) |
513 | 0 | ->get_nested_type() |
514 | 0 | ->to_string(*data[i].column, row_num, format_options); |
515 | 20.6k | } else { |
516 | 20.6k | s = data[i].to_string(row_num, format_options); |
517 | 20.6k | } |
518 | 20.6k | } |
519 | 20.6k | if (s.length() > headers_size[i]) { |
520 | 2.12k | s = s.substr(0, headers_size[i] - 3) + "..."; |
521 | 2.12k | } |
522 | 20.6k | out << std::setfill(' ') << std::setw(1) << "|" << std::setw(headers_size[i]) |
523 | 20.6k | << std::right << s; |
524 | 20.6k | } |
525 | 11.7k | out << std::setw(1) << "|" << std::endl; |
526 | 11.7k | } |
527 | | // bottom line |
528 | 444 | line(); |
529 | 444 | if (row_limit < rows()) { |
530 | 112 | out << rows() << " rows in block, only show first " << row_limit << " rows." << std::endl; |
531 | 112 | } |
532 | 444 | return out.str(); |
533 | 444 | } |
534 | | |
535 | 1 | std::string Block::dump_one_line(size_t row, int column_end) const { |
536 | 1 | assert(column_end <= columns()); |
537 | 1 | fmt::memory_buffer line; |
538 | | |
539 | 1 | auto format_options = DataTypeSerDe::get_default_format_options(); |
540 | 1 | auto time_zone = cctz::utc_time_zone(); |
541 | 1 | format_options.timezone = &time_zone; |
542 | | |
543 | 3 | for (int i = 0; i < column_end; ++i) { |
544 | 2 | if (LIKELY(i != 0)) { |
545 | | // TODO: need more effective function of to string. now the impl is slow |
546 | 1 | fmt::format_to(line, " {}", data[i].to_string(row, format_options)); |
547 | 1 | } else { |
548 | 1 | fmt::format_to(line, "{}", data[i].to_string(row, format_options)); |
549 | 1 | } |
550 | 2 | } |
551 | 1 | return fmt::to_string(line); |
552 | 1 | } |
553 | | |
554 | 46 | std::string Block::dump_structure() const { |
555 | 46 | std::string out; |
556 | 373 | for (auto it = data.begin(); it != data.end(); ++it) { |
557 | 327 | if (it != data.begin()) { |
558 | 281 | out += ", \n"; |
559 | 281 | } |
560 | 327 | out += it->dump_structure(); |
561 | 327 | } |
562 | 46 | return out; |
563 | 46 | } |
564 | | |
565 | 48.9k | Block Block::clone_empty() const { |
566 | 48.9k | Block res; |
567 | 95.8k | for (const auto& elem : data) { |
568 | 95.8k | res.insert(elem.clone_empty()); |
569 | 95.8k | } |
570 | 48.9k | return res; |
571 | 48.9k | } |
572 | | |
573 | 31 | MutableColumns Block::clone_empty_columns() const { |
574 | 31 | size_t num_columns = data.size(); |
575 | 31 | MutableColumns columns(num_columns); |
576 | 140 | for (size_t i = 0; i < num_columns; ++i) { |
577 | 109 | columns[i] = data[i].column ? data[i].column->clone_empty() : data[i].type->create_column(); |
578 | 109 | } |
579 | 31 | return columns; |
580 | 31 | } |
581 | | |
582 | 25.8k | Columns Block::get_columns() const { |
583 | 25.8k | size_t num_columns = data.size(); |
584 | 25.8k | Columns columns(num_columns); |
585 | 113k | for (size_t i = 0; i < num_columns; ++i) { |
586 | 87.1k | columns[i] = data[i].column->convert_to_full_column_if_const(); |
587 | 87.1k | } |
588 | 25.8k | return columns; |
589 | 25.8k | } |
590 | | |
591 | 549 | Columns Block::get_columns_and_convert() { |
592 | 549 | size_t num_columns = data.size(); |
593 | 549 | Columns columns(num_columns); |
594 | 1.16k | for (size_t i = 0; i < num_columns; ++i) { |
595 | 612 | data[i].column = data[i].column->convert_to_full_column_if_const(); |
596 | 612 | columns[i] = data[i].column; |
597 | 612 | } |
598 | 549 | return columns; |
599 | 549 | } |
600 | | |
601 | 149k | MutableColumns Block::mutate_columns() { |
602 | 149k | size_t num_columns = data.size(); |
603 | 149k | MutableColumns columns(num_columns); |
604 | 443k | for (size_t i = 0; i < num_columns; ++i) { |
605 | 293k | DCHECK(data[i].type); |
606 | 293k | columns[i] = data[i].column ? (*std::move(data[i].column)).mutate() |
607 | 293k | : data[i].type->create_column(); |
608 | 293k | } |
609 | 149k | return columns; |
610 | 149k | } |
611 | | |
612 | 2.46k | void Block::set_columns(MutableColumns&& columns) { |
613 | 2.46k | DCHECK_GE(columns.size(), data.size()) |
614 | 0 | << fmt::format("Invalid size of columns, columns size: {}, data size: {}", |
615 | 0 | columns.size(), data.size()); |
616 | 2.46k | size_t num_columns = data.size(); |
617 | 6.39k | for (size_t i = 0; i < num_columns; ++i) { |
618 | 3.92k | data[i].column = std::move(columns[i]); |
619 | 3.92k | } |
620 | 2.46k | } |
621 | | |
622 | 0 | Block Block::clone_with_columns(MutableColumns&& columns) const { |
623 | 0 | Block res; |
624 | |
|
625 | 0 | size_t num_columns = data.size(); |
626 | 0 | for (size_t i = 0; i < num_columns; ++i) { |
627 | 0 | res.insert({std::move(columns[i]), data[i].type, data[i].name}); |
628 | 0 | } |
629 | |
|
630 | 0 | return res; |
631 | 0 | } |
632 | | |
633 | 31 | Block Block::clone_without_columns(const std::vector<int>* column_offset) const { |
634 | 31 | Block res; |
635 | | |
636 | 31 | if (column_offset != nullptr) { |
637 | 12 | size_t num_columns = column_offset->size(); |
638 | 73 | for (size_t i = 0; i < num_columns; ++i) { |
639 | 61 | res.insert({nullptr, data[(*column_offset)[i]].type, data[(*column_offset)[i]].name}); |
640 | 61 | } |
641 | 19 | } else { |
642 | 19 | size_t num_columns = data.size(); |
643 | 53 | for (size_t i = 0; i < num_columns; ++i) { |
644 | 34 | res.insert({nullptr, data[i].type, data[i].name}); |
645 | 34 | } |
646 | 19 | } |
647 | 31 | return res; |
648 | 31 | } |
649 | | |
650 | 55.6k | const ColumnsWithTypeAndName& Block::get_columns_with_type_and_name() const { |
651 | 55.6k | return data; |
652 | 55.6k | } |
653 | | |
654 | 145k | std::vector<std::string> Block::get_names() const { |
655 | 145k | std::vector<std::string> res; |
656 | 145k | res.reserve(columns()); |
657 | | |
658 | 285k | for (const auto& elem : data) { |
659 | 285k | res.push_back(elem.name); |
660 | 285k | } |
661 | | |
662 | 145k | return res; |
663 | 145k | } |
664 | | |
665 | 145k | DataTypes Block::get_data_types() const { |
666 | 145k | DataTypes res; |
667 | 145k | res.reserve(columns()); |
668 | | |
669 | 285k | for (const auto& elem : data) { |
670 | 285k | res.push_back(elem.type); |
671 | 285k | } |
672 | | |
673 | 145k | return res; |
674 | 145k | } |
675 | | |
676 | 52.8k | void Block::clear() { |
677 | 52.8k | data.clear(); |
678 | 52.8k | } |
679 | | |
680 | 1.52M | void Block::clear_column_data(int64_t column_size) noexcept { |
681 | 1.52M | SCOPED_SKIP_MEMORY_CHECK(); |
682 | | // data.size() greater than column_size, means here have some |
683 | | // function exec result in block, need erase it here |
684 | 1.52M | if (column_size != -1 and data.size() > column_size) { |
685 | 2.24k | for (int64_t i = data.size() - 1; i >= column_size; --i) { |
686 | 1.12k | erase(i); |
687 | 1.12k | } |
688 | 1.12k | } |
689 | 1.52M | for (auto& d : data) { |
690 | 89.3k | if (d.column) { |
691 | | // Temporarily disable reference count check because a column might be referenced multiple times within a block. |
692 | | // Queries like this: `select c, c from t1;` |
693 | 89.3k | (*std::move(d.column)).assume_mutable()->clear(); |
694 | 89.3k | } |
695 | 89.3k | } |
696 | 1.52M | } |
697 | | |
698 | | void Block::clear_column_mem_not_keep(const std::vector<bool>& column_keep_flags, |
699 | 48.0k | bool need_keep_first) { |
700 | 48.0k | if (data.size() >= column_keep_flags.size()) { |
701 | 48.0k | auto origin_rows = rows(); |
702 | 142k | for (size_t i = 0; i < column_keep_flags.size(); ++i) { |
703 | 94.1k | if (!column_keep_flags[i]) { |
704 | 36.8k | data[i].column = data[i].column->clone_empty(); |
705 | 36.8k | } |
706 | 94.1k | } |
707 | | |
708 | 48.0k | if (need_keep_first && !column_keep_flags[0]) { |
709 | 1 | auto first_column = data[0].column->clone_empty(); |
710 | 1 | first_column->resize(origin_rows); |
711 | 1 | data[0].column = std::move(first_column); |
712 | 1 | } |
713 | 48.0k | } |
714 | 48.0k | } |
715 | | |
716 | 1.32k | void Block::swap(Block& other) noexcept { |
717 | 1.32k | SCOPED_SKIP_MEMORY_CHECK(); |
718 | 1.32k | data.swap(other.data); |
719 | 1.32k | } |
720 | | |
721 | 1.69k | void Block::swap(Block&& other) noexcept { |
722 | 1.69k | SCOPED_SKIP_MEMORY_CHECK(); |
723 | 1.69k | data = std::move(other.data); |
724 | 1.69k | } |
725 | | |
726 | 3 | void Block::shuffle_columns(const std::vector<int>& result_column_ids) { |
727 | 3 | Container tmp_data; |
728 | 3 | tmp_data.reserve(result_column_ids.size()); |
729 | 5 | for (const int result_column_id : result_column_ids) { |
730 | 5 | tmp_data.push_back(data[result_column_id]); |
731 | 5 | } |
732 | 3 | data = std::move(tmp_data); |
733 | 3 | } |
734 | | |
735 | 2 | void Block::update_hash(SipHash& hash) const { |
736 | 8 | for (size_t row_no = 0, num_rows = rows(); row_no < num_rows; ++row_no) { |
737 | 12 | for (const auto& col : data) { |
738 | 12 | col.column->update_hash_with_value(row_no, hash); |
739 | 12 | } |
740 | 6 | } |
741 | 2 | } |
742 | | |
743 | | void Block::filter_block_internal(Block* block, const std::vector<uint32_t>& columns_to_filter, |
744 | 1.02k | const IColumn::Filter& filter) { |
745 | 1.02k | size_t count = filter.size() - simd::count_zero_num((int8_t*)filter.data(), filter.size()); |
746 | 2.35k | for (const auto& col : columns_to_filter) { |
747 | 2.35k | auto& column = block->get_by_position(col).column; |
748 | 2.35k | if (column->size() == count) { |
749 | 2.31k | continue; |
750 | 2.31k | } |
751 | 43 | if (count == 0) { |
752 | 2 | if (column->is_exclusive()) { |
753 | 0 | column->assume_mutable()->clear(); |
754 | 2 | } else { |
755 | 2 | column = column->clone_empty(); |
756 | 2 | } |
757 | 2 | continue; |
758 | 2 | } |
759 | 41 | if (column->is_exclusive()) { |
760 | | // COW: safe to mutate in-place since we have exclusive ownership |
761 | 23 | const auto result_size = column->assume_mutable()->filter(filter); |
762 | 23 | if (result_size != count) [[unlikely]] { |
763 | 0 | throw Exception(ErrorCode::INTERNAL_ERROR, |
764 | 0 | "result_size not equal with filter_size, result_size={}, " |
765 | 0 | "filter_size={}", |
766 | 0 | result_size, count); |
767 | 0 | } |
768 | 23 | } else { |
769 | | // COW: must create a copy since column is shared |
770 | 18 | column = column->filter(filter, count); |
771 | 18 | } |
772 | 41 | } |
773 | 1.02k | } |
774 | | |
775 | | void Block::filter_block_internal(Block* block, const IColumn::Filter& filter, |
776 | 1 | uint32_t column_to_keep) { |
777 | 1 | std::vector<uint32_t> columns_to_filter; |
778 | 1 | columns_to_filter.resize(column_to_keep); |
779 | 3 | for (uint32_t i = 0; i < column_to_keep; ++i) { |
780 | 2 | columns_to_filter[i] = i; |
781 | 2 | } |
782 | 1 | filter_block_internal(block, columns_to_filter, filter); |
783 | 1 | } |
784 | | |
785 | 8 | void Block::filter_block_internal(Block* block, const IColumn::Filter& filter) { |
786 | 8 | const size_t count = |
787 | 8 | filter.size() - simd::count_zero_num((int8_t*)filter.data(), filter.size()); |
788 | 24 | for (int i = 0; i < block->columns(); ++i) { |
789 | 16 | auto& column = block->get_by_position(i).column; |
790 | 16 | if (column->is_exclusive()) { |
791 | 16 | column->assume_mutable()->filter(filter); |
792 | 16 | } else { |
793 | 0 | column = column->filter(filter, count); |
794 | 0 | } |
795 | 16 | } |
796 | 8 | } |
797 | | |
798 | | Status Block::append_to_block_by_selector(MutableBlock* dst, |
799 | 1 | const IColumn::Selector& selector) const { |
800 | 1 | RETURN_IF_CATCH_EXCEPTION({ |
801 | 1 | DCHECK_EQ(data.size(), dst->mutable_columns().size()); |
802 | 1 | for (size_t i = 0; i < data.size(); i++) { |
803 | | // FIXME: this is a quickfix. we assume that only partition functions make there some |
804 | 1 | if (!is_column_const(*data[i].column)) { |
805 | 1 | data[i].column->append_data_by_selector(dst->mutable_columns()[i], selector); |
806 | 1 | } |
807 | 1 | } |
808 | 1 | }); |
809 | 1 | return Status::OK(); |
810 | 1 | } |
811 | | |
812 | | Status Block::filter_block(Block* block, const std::vector<uint32_t>& columns_to_filter, |
813 | 1.02k | size_t filter_column_id, size_t column_to_keep) { |
814 | 1.02k | const auto& filter_column = block->get_by_position(filter_column_id).column; |
815 | 1.02k | if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) { |
816 | 1 | const auto& nested_column = nullable_column->get_nested_column_ptr(); |
817 | | |
818 | 1 | MutableColumnPtr mutable_holder = |
819 | 1 | nested_column->use_count() == 1 |
820 | 1 | ? nested_column->assume_mutable() |
821 | 1 | : nested_column->clone_resized(nested_column->size()); |
822 | | |
823 | 1 | auto* concrete_column = assert_cast<ColumnUInt8*>(mutable_holder.get()); |
824 | 1 | const auto* __restrict null_map = nullable_column->get_null_map_data().data(); |
825 | 1 | IColumn::Filter& filter = concrete_column->get_data(); |
826 | 1 | auto* __restrict filter_data = filter.data(); |
827 | | |
828 | 1 | const size_t size = filter.size(); |
829 | 4 | for (size_t i = 0; i < size; ++i) { |
830 | 3 | filter_data[i] &= !null_map[i]; |
831 | 3 | } |
832 | 1 | RETURN_IF_CATCH_EXCEPTION(filter_block_internal(block, columns_to_filter, filter)); |
833 | 1.01k | } else if (const auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) { |
834 | 2 | bool ret = const_column->get_bool(0); |
835 | 2 | if (!ret) { |
836 | 2 | for (const auto& col : columns_to_filter) { |
837 | 2 | auto& column = block->get_by_position(col).column; |
838 | 2 | if (column->is_exclusive()) { |
839 | 2 | column->assume_mutable()->clear(); |
840 | 2 | } else { |
841 | 0 | column = column->clone_empty(); |
842 | 0 | } |
843 | 2 | } |
844 | 1 | } |
845 | 1.01k | } else { |
846 | 1.01k | const IColumn::Filter& filter = |
847 | 1.01k | assert_cast<const doris::ColumnUInt8&>(*filter_column).get_data(); |
848 | 1.01k | RETURN_IF_CATCH_EXCEPTION(filter_block_internal(block, columns_to_filter, filter)); |
849 | 1.01k | } |
850 | | |
851 | 1.02k | erase_useless_column(block, column_to_keep); |
852 | 1.02k | return Status::OK(); |
853 | 1.02k | } |
854 | | |
855 | 1.01k | Status Block::filter_block(Block* block, size_t filter_column_id, size_t column_to_keep) { |
856 | 1.01k | std::vector<uint32_t> columns_to_filter; |
857 | 1.01k | columns_to_filter.resize(column_to_keep); |
858 | 3.34k | for (uint32_t i = 0; i < column_to_keep; ++i) { |
859 | 2.32k | columns_to_filter[i] = i; |
860 | 2.32k | } |
861 | 1.01k | return filter_block(block, columns_to_filter, filter_column_id, column_to_keep); |
862 | 1.01k | } |
863 | | |
864 | | Status Block::serialize(int be_exec_version, PBlock* pblock, |
865 | | /*std::string* compressed_buffer,*/ size_t* uncompressed_bytes, |
866 | | size_t* compressed_bytes, int64_t* compress_time, |
867 | | segment_v2::CompressionTypePB compression_type, |
868 | 1.11k | bool allow_transfer_large_data) const { |
869 | 1.11k | RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(be_exec_version)); |
870 | 1.11k | pblock->set_be_exec_version(be_exec_version); |
871 | | |
872 | | // calc uncompressed size for allocation |
873 | 1.11k | size_t content_uncompressed_size = 0; |
874 | 1.77k | for (const auto& c : *this) { |
875 | 1.77k | PColumnMeta* pcm = pblock->add_column_metas(); |
876 | 1.77k | c.to_pb_column_meta(pcm); |
877 | 1.77k | DCHECK(pcm->type() != PGenericType::UNKNOWN) << " forget to set pb type"; |
878 | | // get serialized size |
879 | 1.77k | content_uncompressed_size += |
880 | 1.77k | c.type->get_uncompressed_serialized_bytes(*(c.column), pblock->be_exec_version()); |
881 | 1.77k | } |
882 | | |
883 | | // serialize data values |
884 | | // when data type is HLL, content_uncompressed_size maybe larger than real size. |
885 | 1.11k | std::string column_values; |
886 | 1.11k | try { |
887 | | // TODO: After support c++23, we should use resize_and_overwrite to replace resize |
888 | 1.11k | column_values.resize(content_uncompressed_size); |
889 | 1.11k | } catch (...) { |
890 | 0 | std::string msg = fmt::format("Try to alloc {} bytes for pblock column values failed.", |
891 | 0 | content_uncompressed_size); |
892 | 0 | LOG(WARNING) << msg; |
893 | 0 | return Status::BufferAllocFailed(msg); |
894 | 0 | } |
895 | 1.12k | char* buf = column_values.data(); |
896 | | |
897 | 1.80k | for (const auto& c : *this) { |
898 | 1.80k | buf = c.type->serialize(*(c.column), buf, pblock->be_exec_version()); |
899 | 1.80k | } |
900 | 1.12k | *uncompressed_bytes = content_uncompressed_size; |
901 | 1.12k | const size_t serialize_bytes = buf - column_values.data() + STREAMVBYTE_PADDING; |
902 | 1.12k | *compressed_bytes = serialize_bytes; |
903 | 1.12k | column_values.resize(serialize_bytes); |
904 | | |
905 | | // compress |
906 | 1.12k | if (compression_type != segment_v2::NO_COMPRESSION && content_uncompressed_size > 0) { |
907 | 625 | SCOPED_RAW_TIMER(compress_time); |
908 | 625 | pblock->set_compression_type(compression_type); |
909 | 625 | pblock->set_uncompressed_size(serialize_bytes); |
910 | | |
911 | 625 | BlockCompressionCodec* codec; |
912 | 625 | RETURN_IF_ERROR(get_block_compression_codec(compression_type, &codec)); |
913 | | |
914 | 625 | faststring buf_compressed; |
915 | 625 | RETURN_IF_ERROR_OR_CATCH_EXCEPTION( |
916 | 625 | codec->compress(Slice(column_values.data(), serialize_bytes), &buf_compressed)); |
917 | 625 | size_t compressed_size = buf_compressed.size(); |
918 | 625 | if (LIKELY(compressed_size < serialize_bytes)) { |
919 | | // TODO: rethink the logic here may copy again ? |
920 | 625 | pblock->set_column_values(buf_compressed.data(), buf_compressed.size()); |
921 | 625 | pblock->set_compressed(true); |
922 | 625 | *compressed_bytes = compressed_size; |
923 | 625 | } else { |
924 | 0 | pblock->set_column_values(std::move(column_values)); |
925 | 0 | } |
926 | | |
927 | 625 | VLOG_ROW << "uncompressed size: " << content_uncompressed_size |
928 | 0 | << ", compressed size: " << compressed_size; |
929 | 625 | } else { |
930 | 496 | pblock->set_column_values(std::move(column_values)); |
931 | 496 | } |
932 | 1.12k | if (!allow_transfer_large_data && *compressed_bytes >= std::numeric_limits<int32_t>::max()) { |
933 | 0 | return Status::InternalError("The block is large than 2GB({}), can not send by Protobuf.", |
934 | 0 | *compressed_bytes); |
935 | 0 | } |
936 | 1.12k | return Status::OK(); |
937 | 1.12k | } |
938 | | |
939 | 240k | size_t MutableBlock::rows() const { |
940 | 240k | for (const auto& column : _columns) { |
941 | 144k | if (column) { |
942 | 144k | return column->size(); |
943 | 144k | } |
944 | 144k | } |
945 | | |
946 | 96.0k | return 0; |
947 | 240k | } |
948 | | |
949 | 0 | void MutableBlock::swap(MutableBlock& another) noexcept { |
950 | 0 | SCOPED_SKIP_MEMORY_CHECK(); |
951 | 0 | _columns.swap(another._columns); |
952 | 0 | _data_types.swap(another._data_types); |
953 | 0 | _names.swap(another._names); |
954 | 0 | } |
955 | | |
956 | 0 | void MutableBlock::add_row(const Block* block, int row) { |
957 | 0 | const auto& block_data = block->get_columns_with_type_and_name(); |
958 | 0 | for (size_t i = 0; i < _columns.size(); ++i) { |
959 | 0 | _columns[i]->insert_from(*block_data[i].column.get(), row); |
960 | 0 | } |
961 | 0 | } |
962 | | |
963 | | Status MutableBlock::add_rows(const Block* block, const uint32_t* row_begin, |
964 | 139 | const uint32_t* row_end, const std::vector<int>* column_offset) { |
965 | 139 | RETURN_IF_CATCH_EXCEPTION({ |
966 | 139 | DCHECK_LE(columns(), block->columns()); |
967 | 139 | if (column_offset != nullptr) { |
968 | 139 | DCHECK_EQ(columns(), column_offset->size()); |
969 | 139 | } |
970 | 139 | const auto& block_data = block->get_columns_with_type_and_name(); |
971 | 139 | for (size_t i = 0; i < _columns.size(); ++i) { |
972 | 139 | const auto& src_col = column_offset ? block_data[(*column_offset)[i]] : block_data[i]; |
973 | 139 | DCHECK_EQ(_data_types[i]->get_name(), src_col.type->get_name()); |
974 | 139 | auto& dst = _columns[i]; |
975 | 139 | const auto& src = *src_col.column.get(); |
976 | 139 | DCHECK_GE(src.size(), row_end - row_begin); |
977 | 139 | dst->insert_indices_from(src, row_begin, row_end); |
978 | 139 | } |
979 | 139 | }); |
980 | 139 | return Status::OK(); |
981 | 139 | } |
982 | | |
983 | 142 | Status MutableBlock::add_rows(const Block* block, size_t row_begin, size_t length) { |
984 | 142 | RETURN_IF_CATCH_EXCEPTION({ |
985 | 142 | DCHECK_LE(columns(), block->columns()); |
986 | 142 | const auto& block_data = block->get_columns_with_type_and_name(); |
987 | 142 | for (size_t i = 0; i < _columns.size(); ++i) { |
988 | 142 | DCHECK_EQ(_data_types[i]->get_name(), block_data[i].type->get_name()); |
989 | 142 | auto& dst = _columns[i]; |
990 | 142 | const auto& src = *block_data[i].column.get(); |
991 | 142 | dst->insert_range_from(src, row_begin, length); |
992 | 142 | } |
993 | 142 | }); |
994 | 142 | return Status::OK(); |
995 | 142 | } |
996 | | |
997 | 144k | Block MutableBlock::to_block(int start_column) { |
998 | 144k | return to_block(start_column, (int)_columns.size()); |
999 | 144k | } |
1000 | | |
1001 | 144k | Block MutableBlock::to_block(int start_column, int end_column) { |
1002 | 144k | ColumnsWithTypeAndName columns_with_schema; |
1003 | 144k | columns_with_schema.reserve(end_column - start_column); |
1004 | 428k | for (size_t i = start_column; i < end_column; ++i) { |
1005 | 283k | columns_with_schema.emplace_back(std::move(_columns[i]), _data_types[i], _names[i]); |
1006 | 283k | } |
1007 | 144k | return {columns_with_schema}; |
1008 | 144k | } |
1009 | | |
1010 | 1 | std::string MutableBlock::dump_data_json(size_t row_limit) const { |
1011 | 1 | std::stringstream ss; |
1012 | 1 | std::vector<std::string> headers; |
1013 | | |
1014 | 1 | headers.reserve(columns()); |
1015 | 2 | for (size_t i = 0; i < columns(); ++i) { |
1016 | 1 | headers.push_back(_data_types[i]->get_name()); |
1017 | 1 | } |
1018 | 1 | size_t num_rows_to_dump = std::min(rows(), row_limit); |
1019 | 1 | ss << "["; |
1020 | | |
1021 | 1 | auto format_options = DataTypeSerDe::get_default_format_options(); |
1022 | 1 | auto time_zone = cctz::utc_time_zone(); |
1023 | 1 | format_options.timezone = &time_zone; |
1024 | | |
1025 | 4 | for (size_t row_num = 0; row_num < num_rows_to_dump; ++row_num) { |
1026 | 3 | if (row_num > 0) { |
1027 | 2 | ss << ","; |
1028 | 2 | } |
1029 | 3 | ss << "{"; |
1030 | 6 | for (size_t i = 0; i < columns(); ++i) { |
1031 | 3 | if (i > 0) { |
1032 | 0 | ss << ","; |
1033 | 0 | } |
1034 | 3 | ss << "\"" << headers[i] << "\":"; |
1035 | 3 | std::string s = _data_types[i]->to_string(*_columns[i].get(), row_num, format_options); |
1036 | 3 | ss << "\"" << s << "\""; |
1037 | 3 | } |
1038 | 3 | ss << "}"; |
1039 | 3 | } |
1040 | 1 | ss << "]"; |
1041 | 1 | return ss.str(); |
1042 | 1 | } |
1043 | | |
1044 | 1 | std::string MutableBlock::dump_data(size_t row_limit) const { |
1045 | 1 | std::vector<std::string> headers; |
1046 | 1 | std::vector<int> headers_size; |
1047 | 2 | for (size_t i = 0; i < columns(); ++i) { |
1048 | 1 | std::string s = _data_types[i]->get_name(); |
1049 | 1 | headers_size.push_back(s.size() > 15 ? (int)s.size() : 15); |
1050 | 1 | headers.emplace_back(s); |
1051 | 1 | } |
1052 | | |
1053 | 1 | std::stringstream out; |
1054 | | // header upper line |
1055 | 3 | auto line = [&]() { |
1056 | 6 | for (size_t i = 0; i < columns(); ++i) { |
1057 | 3 | out << std::setfill('-') << std::setw(1) << "+" << std::setw(headers_size[i]) << "-"; |
1058 | 3 | } |
1059 | 3 | out << std::setw(1) << "+" << std::endl; |
1060 | 3 | }; |
1061 | 1 | line(); |
1062 | | // header text |
1063 | 2 | for (size_t i = 0; i < columns(); ++i) { |
1064 | 1 | out << std::setfill(' ') << std::setw(1) << "|" << std::left << std::setw(headers_size[i]) |
1065 | 1 | << headers[i]; |
1066 | 1 | } |
1067 | 1 | out << std::setw(1) << "|" << std::endl; |
1068 | | // header bottom line |
1069 | 1 | line(); |
1070 | 1 | if (rows() == 0) { |
1071 | 0 | return out.str(); |
1072 | 0 | } |
1073 | | |
1074 | 1 | auto format_options = DataTypeSerDe::get_default_format_options(); |
1075 | 1 | auto time_zone = cctz::utc_time_zone(); |
1076 | 1 | format_options.timezone = &time_zone; |
1077 | | |
1078 | | // content |
1079 | 4 | for (size_t row_num = 0; row_num < rows() && row_num < row_limit; ++row_num) { |
1080 | 6 | for (size_t i = 0; i < columns(); ++i) { |
1081 | 3 | if (_columns[i].get()->empty()) { |
1082 | 0 | out << std::setfill(' ') << std::setw(1) << "|" << std::setw(headers_size[i]) |
1083 | 0 | << std::right; |
1084 | 0 | continue; |
1085 | 0 | } |
1086 | 3 | std::string s = _data_types[i]->to_string(*_columns[i].get(), row_num, format_options); |
1087 | 3 | if (s.length() > headers_size[i]) { |
1088 | 0 | s = s.substr(0, headers_size[i] - 3) + "..."; |
1089 | 0 | } |
1090 | 3 | out << std::setfill(' ') << std::setw(1) << "|" << std::setw(headers_size[i]) |
1091 | 3 | << std::right << s; |
1092 | 3 | } |
1093 | 3 | out << std::setw(1) << "|" << std::endl; |
1094 | 3 | } |
1095 | | // bottom line |
1096 | 1 | line(); |
1097 | 1 | if (row_limit < rows()) { |
1098 | 0 | out << rows() << " rows in block, only show first " << row_limit << " rows." << std::endl; |
1099 | 0 | } |
1100 | 1 | return out.str(); |
1101 | 1 | } |
1102 | | |
1103 | 48.0k | std::unique_ptr<Block> Block::create_same_struct_block(size_t size, bool is_reserve) const { |
1104 | 48.0k | auto temp_block = Block::create_unique(); |
1105 | 94.1k | for (const auto& d : data) { |
1106 | 94.1k | auto column = d.type->create_column(); |
1107 | 94.1k | if (is_reserve) { |
1108 | 0 | column->reserve(size); |
1109 | 94.1k | } else { |
1110 | 94.1k | column->insert_many_defaults(size); |
1111 | 94.1k | } |
1112 | 94.1k | temp_block->insert({std::move(column), d.type, d.name}); |
1113 | 94.1k | } |
1114 | 48.0k | return temp_block; |
1115 | 48.0k | } |
1116 | | |
1117 | 17.5k | void Block::shrink_char_type_column_suffix_zero(const std::vector<size_t>& char_type_idx) { |
1118 | 17.5k | for (auto idx : char_type_idx) { |
1119 | 2 | if (idx < data.size()) { |
1120 | 1 | auto& col_and_name = this->get_by_position(idx); |
1121 | 1 | col_and_name.column->assume_mutable()->shrink_padding_chars(); |
1122 | 1 | } |
1123 | 2 | } |
1124 | 17.5k | } |
1125 | | |
1126 | 96.1k | size_t MutableBlock::allocated_bytes() const { |
1127 | 96.1k | size_t res = 0; |
1128 | 188k | for (const auto& col : _columns) { |
1129 | 188k | if (col) { |
1130 | 188k | res += col->allocated_bytes(); |
1131 | 188k | } |
1132 | 188k | } |
1133 | | |
1134 | 96.1k | return res; |
1135 | 96.1k | } |
1136 | | |
1137 | 1 | void MutableBlock::clear_column_data() noexcept { |
1138 | 1 | SCOPED_SKIP_MEMORY_CHECK(); |
1139 | 1 | for (auto& col : _columns) { |
1140 | 1 | if (col) { |
1141 | 1 | col->clear(); |
1142 | 1 | } |
1143 | 1 | } |
1144 | 1 | } |
1145 | | |
1146 | 4 | std::string MutableBlock::dump_names() const { |
1147 | 4 | std::string out; |
1148 | 14 | for (auto it = _names.begin(); it != _names.end(); ++it) { |
1149 | 10 | if (it != _names.begin()) { |
1150 | 6 | out += ", "; |
1151 | 6 | } |
1152 | 10 | out += *it; |
1153 | 10 | } |
1154 | 4 | return out; |
1155 | 4 | } |
1156 | | } // namespace doris |