be/src/core/block/block.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // This file is copied from |
18 | | // https://github.com/ClickHouse/ClickHouse/blob/master/src/Core/Block.cpp |
19 | | // and modified by Doris |
20 | | |
21 | | #include "core/block/block.h" |
22 | | |
23 | | #include <fmt/format.h> |
24 | | #include <gen_cpp/data.pb.h> |
25 | | #include <glog/logging.h> |
26 | | #include <snappy.h> |
27 | | #include <streamvbyte.h> |
28 | | #include <sys/types.h> |
29 | | |
30 | | #include <algorithm> |
31 | | #include <cassert> |
32 | | #include <iomanip> |
33 | | #include <iterator> |
34 | | #include <limits> |
35 | | #include <ranges> |
36 | | |
37 | | #include "agent/be_exec_version_manager.h" |
38 | | #include "common/compiler_util.h" // IWYU pragma: keep |
39 | | #include "common/config.h" |
40 | | #include "common/logging.h" |
41 | | #include "common/status.h" |
42 | | #include "core/assert_cast.h" |
43 | | #include "core/column/column.h" |
44 | | #include "core/column/column_const.h" |
45 | | #include "core/column/column_nothing.h" |
46 | | #include "core/column/column_nullable.h" |
47 | | #include "core/column/column_vector.h" |
48 | | #include "core/data_type/data_type_factory.hpp" |
49 | | #include "core/data_type/data_type_nullable.h" |
50 | | #include "core/data_type_serde/data_type_serde.h" |
51 | | #include "runtime/descriptors.h" |
52 | | #include "runtime/runtime_profile.h" |
53 | | #include "runtime/thread_context.h" |
54 | | #include "util/block_compression.h" |
55 | | #include "util/faststring.h" |
56 | | #include "util/simd/bits.h" |
57 | | #include "util/slice.h" |
58 | | |
59 | | class SipHash; |
60 | | |
61 | | namespace doris::segment_v2 { |
62 | | enum CompressionTypePB : int; |
63 | | } // namespace doris::segment_v2 |
64 | | #include "common/compile_check_begin.h" |
65 | | namespace doris { |
66 | | template <typename T> |
67 | | void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks, |
68 | 2 | RuntimeProfile::Counter* memory_used_counter = nullptr) { |
69 | 2 | T block; |
70 | 6 | while (blocks.try_dequeue(block)) { |
71 | 4 | if (memory_used_counter) { |
72 | 4 | if constexpr (std::is_same_v<T, Block>) { |
73 | 2 | memory_used_counter->update(-block.allocated_bytes()); |
74 | 2 | } else { |
75 | 2 | memory_used_counter->update(-block->allocated_bytes()); |
76 | 2 | } |
77 | 4 | } |
78 | 4 | } |
79 | 2 | } _ZN5doris12clear_blocksINS_5BlockEEEvRN10moodycamel15ConcurrentQueueIT_NS2_28ConcurrentQueueDefaultTraitsEEEPNS_14RuntimeProfile7CounterE Line | Count | Source | 68 | 1 | RuntimeProfile::Counter* memory_used_counter = nullptr) { | 69 | 1 | T block; | 70 | 3 | while (blocks.try_dequeue(block)) { | 71 | 2 | if (memory_used_counter) { | 72 | 2 | if constexpr (std::is_same_v<T, Block>) { | 73 | 2 | memory_used_counter->update(-block.allocated_bytes()); | 74 | | } else { | 75 | | memory_used_counter->update(-block->allocated_bytes()); | 76 | | } | 77 | 2 | } | 78 | 2 | } | 79 | 1 | } |
_ZN5doris12clear_blocksISt10unique_ptrINS_5BlockESt14default_deleteIS2_EEEEvRN10moodycamel15ConcurrentQueueIT_NS6_28ConcurrentQueueDefaultTraitsEEEPNS_14RuntimeProfile7CounterE Line | Count | Source | 68 | 1 | RuntimeProfile::Counter* memory_used_counter = nullptr) { | 69 | 1 | T block; | 70 | 3 | while (blocks.try_dequeue(block)) { | 71 | 2 | if (memory_used_counter) { | 72 | | if constexpr (std::is_same_v<T, Block>) { | 73 | | memory_used_counter->update(-block.allocated_bytes()); | 74 | 2 | } else { | 75 | 2 | memory_used_counter->update(-block->allocated_bytes()); | 76 | 2 | } | 77 | 2 | } | 78 | 2 | } | 79 | 1 | } |
|
80 | | |
81 | | template void clear_blocks<Block>(moodycamel::ConcurrentQueue<Block>&, |
82 | | RuntimeProfile::Counter* memory_used_counter); |
83 | | template void clear_blocks<BlockUPtr>(moodycamel::ConcurrentQueue<BlockUPtr>&, |
84 | | RuntimeProfile::Counter* memory_used_counter); |
85 | | |
86 | 2.52k | Block::Block(std::initializer_list<ColumnWithTypeAndName> il) : data {il} {} |
87 | | |
88 | 257k | Block::Block(ColumnsWithTypeAndName data_) : data {std::move(data_)} {} |
89 | | |
90 | 56.0k | Block::Block(const std::vector<SlotDescriptor*>& slots, size_t block_size) { |
91 | 271k | for (auto* const slot_desc : slots) { |
92 | 271k | auto column_ptr = slot_desc->get_empty_mutable_column(); |
93 | 271k | column_ptr->reserve(block_size); |
94 | 271k | insert(ColumnWithTypeAndName(std::move(column_ptr), slot_desc->get_data_type_ptr(), |
95 | 271k | slot_desc->col_name())); |
96 | 271k | } |
97 | 56.0k | } |
98 | | |
99 | 1 | Block::Block(const std::vector<SlotDescriptor>& slots, size_t block_size) { |
100 | 1 | std::vector<SlotDescriptor*> slot_ptrs(slots.size()); |
101 | 3 | for (size_t i = 0; i < slots.size(); ++i) { |
102 | | // Slots remain unmodified and are used to read column information; const_cast can be employed. |
103 | | // used in src/exec/rowid_fetcher.cpp |
104 | 2 | slot_ptrs[i] = const_cast<SlotDescriptor*>(&slots[i]); |
105 | 2 | } |
106 | 1 | *this = Block(slot_ptrs, block_size); |
107 | 1 | } |
108 | | |
109 | | Status Block::deserialize(const PBlock& pblock, size_t* uncompressed_bytes, |
110 | 785 | int64_t* decompress_time) { |
111 | 785 | swap(Block()); |
112 | 785 | int be_exec_version = pblock.has_be_exec_version() ? pblock.be_exec_version() : 0; |
113 | 785 | RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(be_exec_version)); |
114 | | |
115 | 785 | const char* buf = nullptr; |
116 | 785 | std::string compression_scratch; |
117 | 785 | if (pblock.compressed()) { |
118 | | // Decompress |
119 | 277 | SCOPED_RAW_TIMER(decompress_time); |
120 | 277 | const char* compressed_data = pblock.column_values().c_str(); |
121 | 277 | size_t compressed_size = pblock.column_values().size(); |
122 | 277 | size_t uncompressed_size = 0; |
123 | 277 | if (pblock.has_compression_type() && pblock.has_uncompressed_size()) { |
124 | 277 | BlockCompressionCodec* codec; |
125 | 277 | RETURN_IF_ERROR(get_block_compression_codec(pblock.compression_type(), &codec)); |
126 | 277 | uncompressed_size = pblock.uncompressed_size(); |
127 | | // Should also use allocator to allocate memory here. |
128 | 277 | compression_scratch.resize(uncompressed_size); |
129 | 277 | Slice decompressed_slice(compression_scratch); |
130 | 277 | RETURN_IF_ERROR(codec->decompress(Slice(compressed_data, compressed_size), |
131 | 277 | &decompressed_slice)); |
132 | 277 | DCHECK(uncompressed_size == decompressed_slice.size); |
133 | 277 | } else { |
134 | 0 | bool success = snappy::GetUncompressedLength(compressed_data, compressed_size, |
135 | 0 | &uncompressed_size); |
136 | 0 | DCHECK(success) << "snappy::GetUncompressedLength failed"; |
137 | 0 | compression_scratch.resize(uncompressed_size); |
138 | 0 | success = snappy::RawUncompress(compressed_data, compressed_size, |
139 | 0 | compression_scratch.data()); |
140 | 0 | DCHECK(success) << "snappy::RawUncompress failed"; |
141 | 0 | } |
142 | 277 | *uncompressed_bytes = uncompressed_size; |
143 | 277 | buf = compression_scratch.data(); |
144 | 508 | } else { |
145 | 508 | buf = pblock.column_values().data(); |
146 | 508 | } |
147 | | |
148 | 1.31k | for (const auto& pcol_meta : pblock.column_metas()) { |
149 | 1.31k | DataTypePtr type = DataTypeFactory::instance().create_data_type(pcol_meta); |
150 | 1.31k | MutableColumnPtr data_column = type->create_column(); |
151 | | // Here will try to allocate large memory, should return error if failed. |
152 | 1.31k | RETURN_IF_CATCH_EXCEPTION( |
153 | 1.31k | buf = type->deserialize(buf, &data_column, pblock.be_exec_version())); |
154 | 1.31k | data.emplace_back(data_column->get_ptr(), type, pcol_meta.name()); |
155 | 1.31k | } |
156 | | |
157 | 785 | return Status::OK(); |
158 | 785 | } |
159 | | |
160 | 13.2k | void Block::reserve(size_t count) { |
161 | 13.2k | data.reserve(count); |
162 | 13.2k | } |
163 | | |
164 | 4 | void Block::insert(size_t position, const ColumnWithTypeAndName& elem) { |
165 | 4 | if (position > data.size()) { |
166 | 1 | throw Exception(ErrorCode::INTERNAL_ERROR, |
167 | 1 | "invalid input position, position={}, data.size={}, names={}", position, |
168 | 1 | data.size(), dump_names()); |
169 | 1 | } |
170 | | |
171 | 3 | data.emplace(data.begin() + position, elem); |
172 | 3 | } |
173 | | |
174 | 3 | void Block::insert(size_t position, ColumnWithTypeAndName&& elem) { |
175 | 3 | if (position > data.size()) { |
176 | 1 | throw Exception(ErrorCode::INTERNAL_ERROR, |
177 | 1 | "invalid input position, position={}, data.size={}, names={}", position, |
178 | 1 | data.size(), dump_names()); |
179 | 1 | } |
180 | | |
181 | 2 | data.emplace(data.begin() + position, std::move(elem)); |
182 | 2 | } |
183 | | |
184 | 8.09k | void Block::clear_names() { |
185 | 177k | for (auto& entry : data) { |
186 | 177k | entry.name.clear(); |
187 | 177k | } |
188 | 8.09k | } |
189 | | |
190 | 14.7k | void Block::insert(const ColumnWithTypeAndName& elem) { |
191 | 14.7k | data.emplace_back(elem); |
192 | 14.7k | } |
193 | | |
194 | 997k | void Block::insert(ColumnWithTypeAndName&& elem) { |
195 | 997k | data.emplace_back(std::move(elem)); |
196 | 997k | } |
197 | | |
198 | 3 | void Block::erase(const std::set<size_t>& positions) { |
199 | 3 | for (unsigned long position : std::ranges::reverse_view(positions)) { |
200 | 2 | erase(position); |
201 | 2 | } |
202 | 3 | } |
203 | | |
204 | 18.5k | void Block::erase_tail(size_t start) { |
205 | 18.5k | DCHECK(start <= data.size()) << fmt::format( |
206 | 0 | "Position out of bound in Block::erase(), max position = {}", data.size()); |
207 | 18.5k | data.erase(data.begin() + start, data.end()); |
208 | 18.5k | } |
209 | | |
210 | 123k | void Block::erase(size_t position) { |
211 | 123k | DCHECK(!data.empty()) << "Block is empty"; |
212 | 123k | DCHECK_LT(position, data.size()) << fmt::format( |
213 | 0 | "Position out of bound in Block::erase(), max position = {}", data.size() - 1); |
214 | | |
215 | 123k | erase_impl(position); |
216 | 123k | } |
217 | | |
218 | 123k | void Block::erase_impl(size_t position) { |
219 | 123k | data.erase(data.begin() + position); |
220 | 123k | } |
221 | | |
222 | 75.9k | ColumnWithTypeAndName& Block::safe_get_by_position(size_t position) { |
223 | 75.9k | if (position >= data.size()) { |
224 | 0 | throw Exception(ErrorCode::INTERNAL_ERROR, |
225 | 0 | "invalid input position, position={}, data.size={}, names={}", position, |
226 | 0 | data.size(), dump_names()); |
227 | 0 | } |
228 | 75.9k | return data[position]; |
229 | 75.9k | } |
230 | | |
231 | 97 | const ColumnWithTypeAndName& Block::safe_get_by_position(size_t position) const { |
232 | 97 | if (position >= data.size()) { |
233 | 0 | throw Exception(ErrorCode::INTERNAL_ERROR, |
234 | 0 | "invalid input position, position={}, data.size={}, names={}", position, |
235 | 0 | data.size(), dump_names()); |
236 | 0 | } |
237 | 97 | return data[position]; |
238 | 97 | } |
239 | | |
240 | 33 | int Block::get_position_by_name(const std::string& name) const { |
241 | 251 | for (int i = 0; i < data.size(); i++) { |
242 | 250 | if (data[i].name == name) { |
243 | 32 | return i; |
244 | 32 | } |
245 | 250 | } |
246 | 1 | return -1; |
247 | 33 | } |
248 | | |
249 | 3 | void Block::check_number_of_rows(bool allow_null_columns) const { |
250 | 3 | ssize_t rows = -1; |
251 | 7 | for (const auto& elem : data) { |
252 | 7 | if (!elem.column && allow_null_columns) { |
253 | 2 | continue; |
254 | 2 | } |
255 | | |
256 | 5 | if (!elem.column) { |
257 | 1 | throw Exception(ErrorCode::INTERNAL_ERROR, |
258 | 1 | "Column {} in block is nullptr, in method check_number_of_rows.", |
259 | 1 | elem.name); |
260 | 1 | } |
261 | | |
262 | 4 | ssize_t size = elem.column->size(); |
263 | | |
264 | 4 | if (rows == -1) { |
265 | 3 | rows = size; |
266 | 3 | } else if (rows != size) { |
267 | 1 | throw Exception(ErrorCode::INTERNAL_ERROR, "Sizes of columns doesn't match, block={}", |
268 | 1 | dump_structure()); |
269 | 1 | } |
270 | 4 | } |
271 | 3 | } |
272 | | |
273 | 1.81M | Status Block::check_type_and_column() const { |
274 | 1.81M | #ifndef NDEBUG |
275 | 1.81M | for (const auto& elem : data) { |
276 | 30.5k | if (!elem.column) { |
277 | 0 | continue; |
278 | 0 | } |
279 | 30.5k | if (!elem.type) { |
280 | 0 | continue; |
281 | 0 | } |
282 | | |
283 | | // ColumnNothing is a special column type, it is used to represent a column that |
284 | | // is not materialized, so we don't need to check it. |
285 | 30.5k | if (check_and_get_column<ColumnNothing>(elem.column.get())) { |
286 | 0 | continue; |
287 | 0 | } |
288 | | |
289 | 30.5k | const auto& type = elem.type; |
290 | 30.5k | const auto& column = elem.column; |
291 | | |
292 | 30.5k | RETURN_IF_ERROR(column->column_self_check()); |
293 | 30.5k | auto st = type->check_column(*column); |
294 | 30.5k | if (!st.ok()) { |
295 | 1 | return Status::InternalError( |
296 | 1 | "Column {} in block is not compatible with its column type :{}, data type :{}, " |
297 | 1 | "error: {}", |
298 | 1 | elem.name, column->get_name(), type->get_name(), st.msg()); |
299 | 1 | } |
300 | 30.5k | } |
301 | 1.81M | #endif |
302 | 1.81M | return Status::OK(); |
303 | 1.81M | } |
304 | | |
305 | 49.1M | size_t Block::rows() const { |
306 | 49.1M | for (const auto& elem : data) { |
307 | 46.5M | if (elem.column) { |
308 | 46.5M | return elem.column->size(); |
309 | 46.5M | } |
310 | 46.5M | } |
311 | | |
312 | 2.69M | return 0; |
313 | 49.1M | } |
314 | | |
315 | 6 | void Block::set_num_rows(size_t length) { |
316 | 6 | if (rows() > length) { |
317 | 4 | for (auto& elem : data) { |
318 | 4 | if (elem.column) { |
319 | 4 | elem.column = elem.column->shrink(length); |
320 | 4 | } |
321 | 4 | } |
322 | 4 | } |
323 | 6 | } |
324 | | |
325 | 1 | void Block::skip_num_rows(int64_t& length) { |
326 | 1 | auto origin_rows = rows(); |
327 | 1 | if (origin_rows <= length) { |
328 | 0 | clear(); |
329 | 0 | length -= origin_rows; |
330 | 1 | } else { |
331 | 1 | for (auto& elem : data) { |
332 | 1 | if (elem.column) { |
333 | 1 | elem.column = elem.column->cut(length, origin_rows - length); |
334 | 1 | } |
335 | 1 | } |
336 | 1 | } |
337 | 1 | } |
338 | | |
339 | 2.95k | size_t Block::bytes() const { |
340 | 2.95k | size_t res = 0; |
341 | 8.08k | for (const auto& elem : data) { |
342 | 8.08k | if (!elem.column) { |
343 | 0 | std::stringstream ss; |
344 | 0 | for (const auto& e : data) { |
345 | 0 | ss << e.name + " "; |
346 | 0 | } |
347 | 0 | throw Exception(ErrorCode::INTERNAL_ERROR, |
348 | 0 | "Column {} in block is nullptr, in method bytes. All Columns are {}", |
349 | 0 | elem.name, ss.str()); |
350 | 0 | } |
351 | 8.08k | res += elem.column->byte_size(); |
352 | 8.08k | } |
353 | | |
354 | 2.95k | return res; |
355 | 2.95k | } |
356 | | |
357 | 1 | std::string Block::columns_bytes() const { |
358 | 1 | std::stringstream res; |
359 | 1 | res << "column bytes: ["; |
360 | 2 | for (const auto& elem : data) { |
361 | 2 | if (!elem.column) { |
362 | 1 | std::stringstream ss; |
363 | 3 | for (const auto& e : data) { |
364 | 3 | ss << e.name + " "; |
365 | 3 | } |
366 | 1 | throw Exception(ErrorCode::INTERNAL_ERROR, |
367 | 1 | "Column {} in block is nullptr, in method bytes. All Columns are {}", |
368 | 1 | elem.name, ss.str()); |
369 | 1 | } |
370 | 1 | res << ", " << elem.column->byte_size(); |
371 | 1 | } |
372 | 0 | res << "]"; |
373 | 0 | return res.str(); |
374 | 1 | } |
375 | | |
376 | 193k | size_t Block::allocated_bytes() const { |
377 | 193k | size_t res = 0; |
378 | 378k | for (const auto& elem : data) { |
379 | 378k | if (!elem.column) { |
380 | | // Sometimes if expr failed, then there will be a nullptr |
381 | | // column left in the block. |
382 | 1 | continue; |
383 | 1 | } |
384 | 378k | res += elem.column->allocated_bytes(); |
385 | 378k | } |
386 | | |
387 | 193k | return res; |
388 | 193k | } |
389 | | |
390 | 7 | std::string Block::dump_names() const { |
391 | 7 | std::string out; |
392 | 20 | for (auto it = data.begin(); it != data.end(); ++it) { |
393 | 13 | if (it != data.begin()) { |
394 | 6 | out += ", "; |
395 | 6 | } |
396 | 13 | out += it->name; |
397 | 13 | } |
398 | 7 | return out; |
399 | 7 | } |
400 | | |
401 | 6 | std::string Block::dump_types() const { |
402 | 6 | std::string out; |
403 | 18 | for (auto it = data.begin(); it != data.end(); ++it) { |
404 | 12 | if (it != data.begin()) { |
405 | 6 | out += ", "; |
406 | 6 | } |
407 | 12 | out += it->type->get_name(); |
408 | 12 | } |
409 | 6 | return out; |
410 | 6 | } |
411 | | |
412 | 31 | std::string Block::dump_data_json(size_t begin, size_t row_limit, bool allow_null_mismatch) const { |
413 | 31 | std::stringstream ss; |
414 | | |
415 | 31 | std::vector<std::string> headers; |
416 | 31 | headers.reserve(columns()); |
417 | 46 | for (const auto& it : data) { |
418 | | // fmt::format is from the {fmt} library, you might be using std::format in C++20 |
419 | | // If not, you can build the string with a stringstream as a fallback. |
420 | 46 | headers.push_back(fmt::format("{}({})", it.name, it.type->get_name())); |
421 | 46 | } |
422 | | |
423 | 31 | size_t start_row = std::min(begin, rows()); |
424 | 31 | size_t end_row = std::min(rows(), begin + row_limit); |
425 | | |
426 | 31 | auto format_options = DataTypeSerDe::get_default_format_options(); |
427 | 31 | auto time_zone = cctz::utc_time_zone(); |
428 | 31 | format_options.timezone = &time_zone; |
429 | | |
430 | 31 | ss << "["; |
431 | 3.59k | for (size_t row_num = start_row; row_num < end_row; ++row_num) { |
432 | 3.56k | if (row_num > start_row) { |
433 | 3.53k | ss << ","; |
434 | 3.53k | } |
435 | 3.56k | ss << "{"; |
436 | 8.33k | for (size_t i = 0; i < columns(); ++i) { |
437 | 4.77k | if (i > 0) { |
438 | 1.21k | ss << ","; |
439 | 1.21k | } |
440 | 4.77k | ss << "\"" << headers[i] << "\":"; |
441 | 4.77k | std::string s; |
442 | | |
443 | | // This value-extraction logic is preserved from your original function |
444 | | // to maintain consistency, especially for handling nullability mismatches. |
445 | 4.77k | if (data[i].column && data[i].type->is_nullable() && |
446 | 4.77k | !data[i].column->is_concrete_nullable()) { |
447 | | // This branch handles a specific internal representation of nullable columns. |
448 | | // The original code would assert here if allow_null_mismatch is false. |
449 | 0 | assert(allow_null_mismatch); |
450 | 0 | s = assert_cast<const DataTypeNullable*>(data[i].type.get()) |
451 | 0 | ->get_nested_type() |
452 | 0 | ->to_string(*data[i].column, row_num, format_options); |
453 | 4.77k | } else { |
454 | | // This is the standard path. The to_string method is expected to correctly |
455 | | // handle all cases, including when the column is null (e.g., by returning "NULL"). |
456 | 4.77k | s = data[i].to_string(row_num, format_options); |
457 | 4.77k | } |
458 | 4.77k | ss << "\"" << s << "\""; |
459 | 4.77k | } |
460 | 3.56k | ss << "}"; |
461 | 3.56k | } |
462 | 31 | ss << "]"; |
463 | 31 | return ss.str(); |
464 | 31 | } |
465 | | |
466 | 853 | std::string Block::dump_data(size_t begin, size_t row_limit, bool allow_null_mismatch) const { |
467 | 853 | std::vector<std::string> headers; |
468 | 853 | std::vector<int> headers_size; |
469 | 2.06k | for (const auto& it : data) { |
470 | 2.06k | std::string s = fmt::format("{}({})", it.name, it.type->get_name()); |
471 | 2.06k | headers_size.push_back(s.size() > 15 ? (int)s.size() : 15); |
472 | 2.06k | headers.emplace_back(s); |
473 | 2.06k | } |
474 | | |
475 | 853 | std::stringstream out; |
476 | | // header upper line |
477 | 2.14k | auto line = [&]() { |
478 | 7.94k | for (size_t i = 0; i < columns(); ++i) { |
479 | 5.79k | out << std::setfill('-') << std::setw(1) << "+" << std::setw(headers_size[i]) << "-"; |
480 | 5.79k | } |
481 | 2.14k | out << std::setw(1) << "+" << std::endl; |
482 | 2.14k | }; |
483 | 853 | line(); |
484 | | // header text |
485 | 2.92k | for (size_t i = 0; i < columns(); ++i) { |
486 | 2.06k | out << std::setfill(' ') << std::setw(1) << "|" << std::left << std::setw(headers_size[i]) |
487 | 2.06k | << headers[i]; |
488 | 2.06k | } |
489 | 853 | out << std::setw(1) << "|" << std::endl; |
490 | | // header bottom line |
491 | 853 | line(); |
492 | 853 | if (rows() == 0) { |
493 | 414 | return out.str(); |
494 | 414 | } |
495 | | |
496 | 439 | auto format_options = DataTypeSerDe::get_default_format_options(); |
497 | 439 | auto time_zone = cctz::utc_time_zone(); |
498 | 439 | format_options.timezone = &time_zone; |
499 | | |
500 | | // content |
501 | 12.1k | for (size_t row_num = begin; row_num < rows() && row_num < row_limit + begin; ++row_num) { |
502 | 32.3k | for (size_t i = 0; i < columns(); ++i) { |
503 | 20.5k | if (!data[i].column || data[i].column->empty()) { |
504 | 0 | out << std::setfill(' ') << std::setw(1) << "|" << std::setw(headers_size[i]) |
505 | 0 | << std::right; |
506 | 0 | continue; |
507 | 0 | } |
508 | 20.5k | std::string s; |
509 | 20.5k | if (data[i].column) { // column may be const |
510 | | // for code inside `default_implementation_for_nulls`, there's could have: type = null, col != null |
511 | 20.5k | if (data[i].type->is_nullable() && !data[i].column->is_concrete_nullable()) { |
512 | 0 | assert(allow_null_mismatch); |
513 | 0 | s = assert_cast<const DataTypeNullable*>(data[i].type.get()) |
514 | 0 | ->get_nested_type() |
515 | 0 | ->to_string(*data[i].column, row_num, format_options); |
516 | 20.5k | } else { |
517 | 20.5k | s = data[i].to_string(row_num, format_options); |
518 | 20.5k | } |
519 | 20.5k | } |
520 | 20.5k | if (s.length() > headers_size[i]) { |
521 | 2.12k | s = s.substr(0, headers_size[i] - 3) + "..."; |
522 | 2.12k | } |
523 | 20.5k | out << std::setfill(' ') << std::setw(1) << "|" << std::setw(headers_size[i]) |
524 | 20.5k | << std::right << s; |
525 | 20.5k | } |
526 | 11.7k | out << std::setw(1) << "|" << std::endl; |
527 | 11.7k | } |
528 | | // bottom line |
529 | 439 | line(); |
530 | 439 | if (row_limit < rows()) { |
531 | 112 | out << rows() << " rows in block, only show first " << row_limit << " rows." << std::endl; |
532 | 112 | } |
533 | 439 | return out.str(); |
534 | 439 | } |
535 | | |
536 | 1 | std::string Block::dump_one_line(size_t row, int column_end) const { |
537 | 1 | assert(column_end <= columns()); |
538 | 1 | fmt::memory_buffer line; |
539 | | |
540 | 1 | auto format_options = DataTypeSerDe::get_default_format_options(); |
541 | 1 | auto time_zone = cctz::utc_time_zone(); |
542 | 1 | format_options.timezone = &time_zone; |
543 | | |
544 | 3 | for (int i = 0; i < column_end; ++i) { |
545 | 2 | if (LIKELY(i != 0)) { |
546 | | // TODO: need more effective function of to string. now the impl is slow |
547 | 1 | fmt::format_to(line, " {}", data[i].to_string(row, format_options)); |
548 | 1 | } else { |
549 | 1 | fmt::format_to(line, "{}", data[i].to_string(row, format_options)); |
550 | 1 | } |
551 | 2 | } |
552 | 1 | return fmt::to_string(line); |
553 | 1 | } |
554 | | |
555 | 46 | std::string Block::dump_structure() const { |
556 | 46 | std::string out; |
557 | 371 | for (auto it = data.begin(); it != data.end(); ++it) { |
558 | 325 | if (it != data.begin()) { |
559 | 279 | out += ", \n"; |
560 | 279 | } |
561 | 325 | out += it->dump_structure(); |
562 | 325 | } |
563 | 46 | return out; |
564 | 46 | } |
565 | | |
566 | 48.8k | Block Block::clone_empty() const { |
567 | 48.8k | Block res; |
568 | 95.6k | for (const auto& elem : data) { |
569 | 95.6k | res.insert(elem.clone_empty()); |
570 | 95.6k | } |
571 | 48.8k | return res; |
572 | 48.8k | } |
573 | | |
574 | 31 | MutableColumns Block::clone_empty_columns() const { |
575 | 31 | size_t num_columns = data.size(); |
576 | 31 | MutableColumns columns(num_columns); |
577 | 140 | for (size_t i = 0; i < num_columns; ++i) { |
578 | 109 | columns[i] = data[i].column ? data[i].column->clone_empty() : data[i].type->create_column(); |
579 | 109 | } |
580 | 31 | return columns; |
581 | 31 | } |
582 | | |
583 | 25.8k | Columns Block::get_columns() const { |
584 | 25.8k | size_t num_columns = data.size(); |
585 | 25.8k | Columns columns(num_columns); |
586 | 113k | for (size_t i = 0; i < num_columns; ++i) { |
587 | 87.1k | columns[i] = data[i].column->convert_to_full_column_if_const(); |
588 | 87.1k | } |
589 | 25.8k | return columns; |
590 | 25.8k | } |
591 | | |
592 | 507 | Columns Block::get_columns_and_convert() { |
593 | 507 | size_t num_columns = data.size(); |
594 | 507 | Columns columns(num_columns); |
595 | 1.03k | for (size_t i = 0; i < num_columns; ++i) { |
596 | 528 | data[i].column = data[i].column->convert_to_full_column_if_const(); |
597 | 528 | columns[i] = data[i].column; |
598 | 528 | } |
599 | 507 | return columns; |
600 | 507 | } |
601 | | |
602 | 149k | MutableColumns Block::mutate_columns() { |
603 | 149k | size_t num_columns = data.size(); |
604 | 149k | MutableColumns columns(num_columns); |
605 | 442k | for (size_t i = 0; i < num_columns; ++i) { |
606 | 293k | DCHECK(data[i].type); |
607 | 293k | columns[i] = data[i].column ? (*std::move(data[i].column)).mutate() |
608 | 293k | : data[i].type->create_column(); |
609 | 293k | } |
610 | 149k | return columns; |
611 | 149k | } |
612 | | |
613 | 2.42k | void Block::set_columns(MutableColumns&& columns) { |
614 | 2.42k | DCHECK_GE(columns.size(), data.size()) |
615 | 0 | << fmt::format("Invalid size of columns, columns size: {}, data size: {}", |
616 | 0 | columns.size(), data.size()); |
617 | 2.42k | size_t num_columns = data.size(); |
618 | 6.26k | for (size_t i = 0; i < num_columns; ++i) { |
619 | 3.83k | data[i].column = std::move(columns[i]); |
620 | 3.83k | } |
621 | 2.42k | } |
622 | | |
623 | 0 | Block Block::clone_with_columns(MutableColumns&& columns) const { |
624 | 0 | Block res; |
625 | |
|
626 | 0 | size_t num_columns = data.size(); |
627 | 0 | for (size_t i = 0; i < num_columns; ++i) { |
628 | 0 | res.insert({std::move(columns[i]), data[i].type, data[i].name}); |
629 | 0 | } |
630 | |
|
631 | 0 | return res; |
632 | 0 | } |
633 | | |
634 | 25 | Block Block::clone_without_columns(const std::vector<int>* column_offset) const { |
635 | 25 | Block res; |
636 | | |
637 | 25 | if (column_offset != nullptr) { |
638 | 12 | size_t num_columns = column_offset->size(); |
639 | 73 | for (size_t i = 0; i < num_columns; ++i) { |
640 | 61 | res.insert({nullptr, data[(*column_offset)[i]].type, data[(*column_offset)[i]].name}); |
641 | 61 | } |
642 | 13 | } else { |
643 | 13 | size_t num_columns = data.size(); |
644 | 35 | for (size_t i = 0; i < num_columns; ++i) { |
645 | 22 | res.insert({nullptr, data[i].type, data[i].name}); |
646 | 22 | } |
647 | 13 | } |
648 | 25 | return res; |
649 | 25 | } |
650 | | |
651 | 55.5k | const ColumnsWithTypeAndName& Block::get_columns_with_type_and_name() const { |
652 | 55.5k | return data; |
653 | 55.5k | } |
654 | | |
655 | 145k | std::vector<std::string> Block::get_names() const { |
656 | 145k | std::vector<std::string> res; |
657 | 145k | res.reserve(columns()); |
658 | | |
659 | 284k | for (const auto& elem : data) { |
660 | 284k | res.push_back(elem.name); |
661 | 284k | } |
662 | | |
663 | 145k | return res; |
664 | 145k | } |
665 | | |
666 | 145k | DataTypes Block::get_data_types() const { |
667 | 145k | DataTypes res; |
668 | 145k | res.reserve(columns()); |
669 | | |
670 | 284k | for (const auto& elem : data) { |
671 | 284k | res.push_back(elem.type); |
672 | 284k | } |
673 | | |
674 | 145k | return res; |
675 | 145k | } |
676 | | |
677 | 52.7k | void Block::clear() { |
678 | 52.7k | data.clear(); |
679 | 52.7k | } |
680 | | |
681 | 948k | void Block::clear_column_data(int64_t column_size) noexcept { |
682 | 948k | SCOPED_SKIP_MEMORY_CHECK(); |
683 | | // data.size() greater than column_size, means here have some |
684 | | // function exec result in block, need erase it here |
685 | 948k | if (column_size != -1 and data.size() > column_size) { |
686 | 2.24k | for (int64_t i = data.size() - 1; i >= column_size; --i) { |
687 | 1.12k | erase(i); |
688 | 1.12k | } |
689 | 1.12k | } |
690 | 948k | for (auto& d : data) { |
691 | 89.0k | if (d.column) { |
692 | | // Temporarily disable reference count check because a column might be referenced multiple times within a block. |
693 | | // Queries like this: `select c, c from t1;` |
694 | 89.0k | (*std::move(d.column)).assume_mutable()->clear(); |
695 | 89.0k | } |
696 | 89.0k | } |
697 | 948k | } |
698 | | |
699 | | void Block::clear_column_mem_not_keep(const std::vector<bool>& column_keep_flags, |
700 | 48.0k | bool need_keep_first) { |
701 | 48.0k | if (data.size() >= column_keep_flags.size()) { |
702 | 48.0k | auto origin_rows = rows(); |
703 | 142k | for (size_t i = 0; i < column_keep_flags.size(); ++i) { |
704 | 94.1k | if (!column_keep_flags[i]) { |
705 | 36.8k | data[i].column = data[i].column->clone_empty(); |
706 | 36.8k | } |
707 | 94.1k | } |
708 | | |
709 | 48.0k | if (need_keep_first && !column_keep_flags[0]) { |
710 | 1 | auto first_column = data[0].column->clone_empty(); |
711 | 1 | first_column->resize(origin_rows); |
712 | 1 | data[0].column = std::move(first_column); |
713 | 1 | } |
714 | 48.0k | } |
715 | 48.0k | } |
716 | | |
717 | 1.29k | void Block::swap(Block& other) noexcept { |
718 | 1.29k | SCOPED_SKIP_MEMORY_CHECK(); |
719 | 1.29k | data.swap(other.data); |
720 | 1.29k | } |
721 | | |
722 | 1.46k | void Block::swap(Block&& other) noexcept { |
723 | 1.46k | SCOPED_SKIP_MEMORY_CHECK(); |
724 | 1.46k | data = std::move(other.data); |
725 | 1.46k | } |
726 | | |
727 | 3 | void Block::shuffle_columns(const std::vector<int>& result_column_ids) { |
728 | 3 | Container tmp_data; |
729 | 3 | tmp_data.reserve(result_column_ids.size()); |
730 | 5 | for (const int result_column_id : result_column_ids) { |
731 | 5 | tmp_data.push_back(data[result_column_id]); |
732 | 5 | } |
733 | 3 | data = std::move(tmp_data); |
734 | 3 | } |
735 | | |
736 | 2 | void Block::update_hash(SipHash& hash) const { |
737 | 8 | for (size_t row_no = 0, num_rows = rows(); row_no < num_rows; ++row_no) { |
738 | 12 | for (const auto& col : data) { |
739 | 12 | col.column->update_hash_with_value(row_no, hash); |
740 | 12 | } |
741 | 6 | } |
742 | 2 | } |
743 | | |
744 | | void Block::filter_block_internal(Block* block, const std::vector<uint32_t>& columns_to_filter, |
745 | 1.02k | const IColumn::Filter& filter) { |
746 | 1.02k | size_t count = filter.size() - simd::count_zero_num((int8_t*)filter.data(), filter.size()); |
747 | 2.35k | for (const auto& col : columns_to_filter) { |
748 | 2.35k | auto& column = block->get_by_position(col).column; |
749 | 2.35k | if (column->size() == count) { |
750 | 2.31k | continue; |
751 | 2.31k | } |
752 | 43 | if (count == 0) { |
753 | 2 | if (column->is_exclusive()) { |
754 | 0 | column->assume_mutable()->clear(); |
755 | 2 | } else { |
756 | 2 | column = column->clone_empty(); |
757 | 2 | } |
758 | 2 | continue; |
759 | 2 | } |
760 | 41 | if (column->is_exclusive()) { |
761 | | // COW: safe to mutate in-place since we have exclusive ownership |
762 | 23 | const auto result_size = column->assume_mutable()->filter(filter); |
763 | 23 | if (result_size != count) [[unlikely]] { |
764 | 0 | throw Exception(ErrorCode::INTERNAL_ERROR, |
765 | 0 | "result_size not equal with filter_size, result_size={}, " |
766 | 0 | "filter_size={}", |
767 | 0 | result_size, count); |
768 | 0 | } |
769 | 23 | } else { |
770 | | // COW: must create a copy since column is shared |
771 | 18 | column = column->filter(filter, count); |
772 | 18 | } |
773 | 41 | } |
774 | 1.02k | } |
775 | | |
776 | | void Block::filter_block_internal(Block* block, const IColumn::Filter& filter, |
777 | 1 | uint32_t column_to_keep) { |
778 | 1 | std::vector<uint32_t> columns_to_filter; |
779 | 1 | columns_to_filter.resize(column_to_keep); |
780 | 3 | for (uint32_t i = 0; i < column_to_keep; ++i) { |
781 | 2 | columns_to_filter[i] = i; |
782 | 2 | } |
783 | 1 | filter_block_internal(block, columns_to_filter, filter); |
784 | 1 | } |
785 | | |
786 | 8 | void Block::filter_block_internal(Block* block, const IColumn::Filter& filter) { |
787 | 8 | const size_t count = |
788 | 8 | filter.size() - simd::count_zero_num((int8_t*)filter.data(), filter.size()); |
789 | 24 | for (int i = 0; i < block->columns(); ++i) { |
790 | 16 | auto& column = block->get_by_position(i).column; |
791 | 16 | if (column->is_exclusive()) { |
792 | 16 | column->assume_mutable()->filter(filter); |
793 | 16 | } else { |
794 | 0 | column = column->filter(filter, count); |
795 | 0 | } |
796 | 16 | } |
797 | 8 | } |
798 | | |
799 | | Status Block::append_to_block_by_selector(MutableBlock* dst, |
800 | 1 | const IColumn::Selector& selector) const { |
801 | 1 | RETURN_IF_CATCH_EXCEPTION({ |
802 | 1 | DCHECK_EQ(data.size(), dst->mutable_columns().size()); |
803 | 1 | for (size_t i = 0; i < data.size(); i++) { |
804 | | // FIXME: this is a quickfix. we assume that only partition functions make there some |
805 | 1 | if (!is_column_const(*data[i].column)) { |
806 | 1 | data[i].column->append_data_by_selector(dst->mutable_columns()[i], selector); |
807 | 1 | } |
808 | 1 | } |
809 | 1 | }); |
810 | 1 | return Status::OK(); |
811 | 1 | } |
812 | | |
813 | | Status Block::filter_block(Block* block, const std::vector<uint32_t>& columns_to_filter, |
814 | 1.02k | size_t filter_column_id, size_t column_to_keep) { |
815 | 1.02k | const auto& filter_column = block->get_by_position(filter_column_id).column; |
816 | 1.02k | if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) { |
817 | 1 | const auto& nested_column = nullable_column->get_nested_column_ptr(); |
818 | | |
819 | 1 | MutableColumnPtr mutable_holder = |
820 | 1 | nested_column->use_count() == 1 |
821 | 1 | ? nested_column->assume_mutable() |
822 | 1 | : nested_column->clone_resized(nested_column->size()); |
823 | | |
824 | 1 | auto* concrete_column = assert_cast<ColumnUInt8*>(mutable_holder.get()); |
825 | 1 | const auto* __restrict null_map = nullable_column->get_null_map_data().data(); |
826 | 1 | IColumn::Filter& filter = concrete_column->get_data(); |
827 | 1 | auto* __restrict filter_data = filter.data(); |
828 | | |
829 | 1 | const size_t size = filter.size(); |
830 | 4 | for (size_t i = 0; i < size; ++i) { |
831 | 3 | filter_data[i] &= !null_map[i]; |
832 | 3 | } |
833 | 1 | RETURN_IF_CATCH_EXCEPTION(filter_block_internal(block, columns_to_filter, filter)); |
834 | 1.01k | } else if (const auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) { |
835 | 2 | bool ret = const_column->get_bool(0); |
836 | 2 | if (!ret) { |
837 | 2 | for (const auto& col : columns_to_filter) { |
838 | 2 | auto& column = block->get_by_position(col).column; |
839 | 2 | if (column->is_exclusive()) { |
840 | 2 | column->assume_mutable()->clear(); |
841 | 2 | } else { |
842 | 0 | column = column->clone_empty(); |
843 | 0 | } |
844 | 2 | } |
845 | 1 | } |
846 | 1.01k | } else { |
847 | 1.01k | const IColumn::Filter& filter = |
848 | 1.01k | assert_cast<const doris::ColumnUInt8&>(*filter_column).get_data(); |
849 | 1.01k | RETURN_IF_CATCH_EXCEPTION(filter_block_internal(block, columns_to_filter, filter)); |
850 | 1.01k | } |
851 | | |
852 | 1.02k | erase_useless_column(block, column_to_keep); |
853 | 1.02k | return Status::OK(); |
854 | 1.02k | } |
855 | | |
856 | 1.01k | Status Block::filter_block(Block* block, size_t filter_column_id, size_t column_to_keep) { |
857 | 1.01k | std::vector<uint32_t> columns_to_filter; |
858 | 1.01k | columns_to_filter.resize(column_to_keep); |
859 | 3.34k | for (uint32_t i = 0; i < column_to_keep; ++i) { |
860 | 2.32k | columns_to_filter[i] = i; |
861 | 2.32k | } |
862 | 1.01k | return filter_block(block, columns_to_filter, filter_column_id, column_to_keep); |
863 | 1.01k | } |
864 | | |
865 | | Status Block::serialize(int be_exec_version, PBlock* pblock, |
866 | | /*std::string* compressed_buffer,*/ size_t* uncompressed_bytes, |
867 | | size_t* compressed_bytes, int64_t* compress_time, |
868 | | segment_v2::CompressionTypePB compression_type, |
869 | 835 | bool allow_transfer_large_data) const { |
870 | 835 | RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(be_exec_version)); |
871 | 835 | pblock->set_be_exec_version(be_exec_version); |
872 | | |
873 | | // calc uncompressed size for allocation |
874 | 835 | size_t content_uncompressed_size = 0; |
875 | 1.42k | for (const auto& c : *this) { |
876 | 1.42k | PColumnMeta* pcm = pblock->add_column_metas(); |
877 | 1.42k | c.to_pb_column_meta(pcm); |
878 | 1.42k | DCHECK(pcm->type() != PGenericType::UNKNOWN) << " forget to set pb type"; |
879 | | // get serialized size |
880 | 1.42k | content_uncompressed_size += |
881 | 1.42k | c.type->get_uncompressed_serialized_bytes(*(c.column), pblock->be_exec_version()); |
882 | 1.42k | } |
883 | | |
884 | | // serialize data values |
885 | | // when data type is HLL, content_uncompressed_size maybe larger than real size. |
886 | 835 | std::string column_values; |
887 | 835 | try { |
888 | | // TODO: After support c++23, we should use resize_and_overwrite to replace resize |
889 | 835 | column_values.resize(content_uncompressed_size); |
890 | 835 | } catch (...) { |
891 | 0 | std::string msg = fmt::format("Try to alloc {} bytes for pblock column values failed.", |
892 | 0 | content_uncompressed_size); |
893 | 0 | LOG(WARNING) << msg; |
894 | 0 | return Status::BufferAllocFailed(msg); |
895 | 0 | } |
896 | 829 | char* buf = column_values.data(); |
897 | | |
898 | 1.42k | for (const auto& c : *this) { |
899 | 1.42k | buf = c.type->serialize(*(c.column), buf, pblock->be_exec_version()); |
900 | 1.42k | } |
901 | 829 | *uncompressed_bytes = content_uncompressed_size; |
902 | 829 | const size_t serialize_bytes = buf - column_values.data() + STREAMVBYTE_PADDING; |
903 | 829 | *compressed_bytes = serialize_bytes; |
904 | 829 | column_values.resize(serialize_bytes); |
905 | | |
906 | | // compress |
907 | 829 | if (compression_type != segment_v2::NO_COMPRESSION && content_uncompressed_size > 0) { |
908 | 331 | SCOPED_RAW_TIMER(compress_time); |
909 | 331 | pblock->set_compression_type(compression_type); |
910 | 331 | pblock->set_uncompressed_size(serialize_bytes); |
911 | | |
912 | 331 | BlockCompressionCodec* codec; |
913 | 331 | RETURN_IF_ERROR(get_block_compression_codec(compression_type, &codec)); |
914 | | |
915 | 331 | faststring buf_compressed; |
916 | 331 | RETURN_IF_ERROR_OR_CATCH_EXCEPTION( |
917 | 331 | codec->compress(Slice(column_values.data(), serialize_bytes), &buf_compressed)); |
918 | 331 | size_t compressed_size = buf_compressed.size(); |
919 | 331 | if (LIKELY(compressed_size < serialize_bytes)) { |
920 | | // TODO: rethink the logic here may copy again ? |
921 | 331 | pblock->set_column_values(buf_compressed.data(), buf_compressed.size()); |
922 | 331 | pblock->set_compressed(true); |
923 | 331 | *compressed_bytes = compressed_size; |
924 | 331 | } else { |
925 | 0 | pblock->set_column_values(std::move(column_values)); |
926 | 0 | } |
927 | | |
928 | 331 | VLOG_ROW << "uncompressed size: " << content_uncompressed_size |
929 | 0 | << ", compressed size: " << compressed_size; |
930 | 498 | } else { |
931 | 498 | pblock->set_column_values(std::move(column_values)); |
932 | 498 | } |
933 | 829 | if (!allow_transfer_large_data && *compressed_bytes >= std::numeric_limits<int32_t>::max()) { |
934 | 0 | return Status::InternalError("The block is large than 2GB({}), can not send by Protobuf.", |
935 | 0 | *compressed_bytes); |
936 | 0 | } |
937 | 829 | return Status::OK(); |
938 | 829 | } |
939 | | |
940 | 240k | size_t MutableBlock::rows() const { |
941 | 240k | for (const auto& column : _columns) { |
942 | 144k | if (column) { |
943 | 144k | return column->size(); |
944 | 144k | } |
945 | 144k | } |
946 | | |
947 | 96.0k | return 0; |
948 | 240k | } |
949 | | |
950 | 0 | void MutableBlock::swap(MutableBlock& another) noexcept { |
951 | 0 | SCOPED_SKIP_MEMORY_CHECK(); |
952 | 0 | _columns.swap(another._columns); |
953 | 0 | _data_types.swap(another._data_types); |
954 | 0 | _names.swap(another._names); |
955 | 0 | } |
956 | | |
957 | 0 | void MutableBlock::add_row(const Block* block, int row) { |
958 | 0 | const auto& block_data = block->get_columns_with_type_and_name(); |
959 | 0 | for (size_t i = 0; i < _columns.size(); ++i) { |
960 | 0 | _columns[i]->insert_from(*block_data[i].column.get(), row); |
961 | 0 | } |
962 | 0 | } |
963 | | |
964 | | Status MutableBlock::add_rows(const Block* block, const uint32_t* row_begin, |
965 | 74 | const uint32_t* row_end, const std::vector<int>* column_offset) { |
966 | 74 | RETURN_IF_CATCH_EXCEPTION({ |
967 | 74 | DCHECK_LE(columns(), block->columns()); |
968 | 74 | if (column_offset != nullptr) { |
969 | 74 | DCHECK_EQ(columns(), column_offset->size()); |
970 | 74 | } |
971 | 74 | const auto& block_data = block->get_columns_with_type_and_name(); |
972 | 74 | for (size_t i = 0; i < _columns.size(); ++i) { |
973 | 74 | const auto& src_col = column_offset ? block_data[(*column_offset)[i]] : block_data[i]; |
974 | 74 | DCHECK_EQ(_data_types[i]->get_name(), src_col.type->get_name()); |
975 | 74 | auto& dst = _columns[i]; |
976 | 74 | const auto& src = *src_col.column.get(); |
977 | 74 | DCHECK_GE(src.size(), row_end - row_begin); |
978 | 74 | dst->insert_indices_from(src, row_begin, row_end); |
979 | 74 | } |
980 | 74 | }); |
981 | 74 | return Status::OK(); |
982 | 74 | } |
983 | | |
984 | 142 | Status MutableBlock::add_rows(const Block* block, size_t row_begin, size_t length) { |
985 | 142 | RETURN_IF_CATCH_EXCEPTION({ |
986 | 142 | DCHECK_LE(columns(), block->columns()); |
987 | 142 | const auto& block_data = block->get_columns_with_type_and_name(); |
988 | 142 | for (size_t i = 0; i < _columns.size(); ++i) { |
989 | 142 | DCHECK_EQ(_data_types[i]->get_name(), block_data[i].type->get_name()); |
990 | 142 | auto& dst = _columns[i]; |
991 | 142 | const auto& src = *block_data[i].column.get(); |
992 | 142 | dst->insert_range_from(src, row_begin, length); |
993 | 142 | } |
994 | 142 | }); |
995 | 142 | return Status::OK(); |
996 | 142 | } |
997 | | |
998 | 144k | Block MutableBlock::to_block(int start_column) { |
999 | 144k | return to_block(start_column, (int)_columns.size()); |
1000 | 144k | } |
1001 | | |
1002 | 144k | Block MutableBlock::to_block(int start_column, int end_column) { |
1003 | 144k | ColumnsWithTypeAndName columns_with_schema; |
1004 | 144k | columns_with_schema.reserve(end_column - start_column); |
1005 | 428k | for (size_t i = start_column; i < end_column; ++i) { |
1006 | 283k | columns_with_schema.emplace_back(std::move(_columns[i]), _data_types[i], _names[i]); |
1007 | 283k | } |
1008 | 144k | return {columns_with_schema}; |
1009 | 144k | } |
1010 | | |
1011 | 1 | std::string MutableBlock::dump_data_json(size_t row_limit) const { |
1012 | 1 | std::stringstream ss; |
1013 | 1 | std::vector<std::string> headers; |
1014 | | |
1015 | 1 | headers.reserve(columns()); |
1016 | 2 | for (size_t i = 0; i < columns(); ++i) { |
1017 | 1 | headers.push_back(_data_types[i]->get_name()); |
1018 | 1 | } |
1019 | 1 | size_t num_rows_to_dump = std::min(rows(), row_limit); |
1020 | 1 | ss << "["; |
1021 | | |
1022 | 1 | auto format_options = DataTypeSerDe::get_default_format_options(); |
1023 | 1 | auto time_zone = cctz::utc_time_zone(); |
1024 | 1 | format_options.timezone = &time_zone; |
1025 | | |
1026 | 4 | for (size_t row_num = 0; row_num < num_rows_to_dump; ++row_num) { |
1027 | 3 | if (row_num > 0) { |
1028 | 2 | ss << ","; |
1029 | 2 | } |
1030 | 3 | ss << "{"; |
1031 | 6 | for (size_t i = 0; i < columns(); ++i) { |
1032 | 3 | if (i > 0) { |
1033 | 0 | ss << ","; |
1034 | 0 | } |
1035 | 3 | ss << "\"" << headers[i] << "\":"; |
1036 | 3 | std::string s = _data_types[i]->to_string(*_columns[i].get(), row_num, format_options); |
1037 | 3 | ss << "\"" << s << "\""; |
1038 | 3 | } |
1039 | 3 | ss << "}"; |
1040 | 3 | } |
1041 | 1 | ss << "]"; |
1042 | 1 | return ss.str(); |
1043 | 1 | } |
1044 | | |
1045 | 1 | std::string MutableBlock::dump_data(size_t row_limit) const { |
1046 | 1 | std::vector<std::string> headers; |
1047 | 1 | std::vector<int> headers_size; |
1048 | 2 | for (size_t i = 0; i < columns(); ++i) { |
1049 | 1 | std::string s = _data_types[i]->get_name(); |
1050 | 1 | headers_size.push_back(s.size() > 15 ? (int)s.size() : 15); |
1051 | 1 | headers.emplace_back(s); |
1052 | 1 | } |
1053 | | |
1054 | 1 | std::stringstream out; |
1055 | | // header upper line |
1056 | 3 | auto line = [&]() { |
1057 | 6 | for (size_t i = 0; i < columns(); ++i) { |
1058 | 3 | out << std::setfill('-') << std::setw(1) << "+" << std::setw(headers_size[i]) << "-"; |
1059 | 3 | } |
1060 | 3 | out << std::setw(1) << "+" << std::endl; |
1061 | 3 | }; |
1062 | 1 | line(); |
1063 | | // header text |
1064 | 2 | for (size_t i = 0; i < columns(); ++i) { |
1065 | 1 | out << std::setfill(' ') << std::setw(1) << "|" << std::left << std::setw(headers_size[i]) |
1066 | 1 | << headers[i]; |
1067 | 1 | } |
1068 | 1 | out << std::setw(1) << "|" << std::endl; |
1069 | | // header bottom line |
1070 | 1 | line(); |
1071 | 1 | if (rows() == 0) { |
1072 | 0 | return out.str(); |
1073 | 0 | } |
1074 | | |
1075 | 1 | auto format_options = DataTypeSerDe::get_default_format_options(); |
1076 | 1 | auto time_zone = cctz::utc_time_zone(); |
1077 | 1 | format_options.timezone = &time_zone; |
1078 | | |
1079 | | // content |
1080 | 4 | for (size_t row_num = 0; row_num < rows() && row_num < row_limit; ++row_num) { |
1081 | 6 | for (size_t i = 0; i < columns(); ++i) { |
1082 | 3 | if (_columns[i].get()->empty()) { |
1083 | 0 | out << std::setfill(' ') << std::setw(1) << "|" << std::setw(headers_size[i]) |
1084 | 0 | << std::right; |
1085 | 0 | continue; |
1086 | 0 | } |
1087 | 3 | std::string s = _data_types[i]->to_string(*_columns[i].get(), row_num, format_options); |
1088 | 3 | if (s.length() > headers_size[i]) { |
1089 | 0 | s = s.substr(0, headers_size[i] - 3) + "..."; |
1090 | 0 | } |
1091 | 3 | out << std::setfill(' ') << std::setw(1) << "|" << std::setw(headers_size[i]) |
1092 | 3 | << std::right << s; |
1093 | 3 | } |
1094 | 3 | out << std::setw(1) << "|" << std::endl; |
1095 | 3 | } |
1096 | | // bottom line |
1097 | 1 | line(); |
1098 | 1 | if (row_limit < rows()) { |
1099 | 0 | out << rows() << " rows in block, only show first " << row_limit << " rows." << std::endl; |
1100 | 0 | } |
1101 | 1 | return out.str(); |
1102 | 1 | } |
1103 | | |
1104 | 48.0k | std::unique_ptr<Block> Block::create_same_struct_block(size_t size, bool is_reserve) const { |
1105 | 48.0k | auto temp_block = Block::create_unique(); |
1106 | 94.1k | for (const auto& d : data) { |
1107 | 94.1k | auto column = d.type->create_column(); |
1108 | 94.1k | if (is_reserve) { |
1109 | 0 | column->reserve(size); |
1110 | 94.1k | } else { |
1111 | 94.1k | column->insert_many_defaults(size); |
1112 | 94.1k | } |
1113 | 94.1k | temp_block->insert({std::move(column), d.type, d.name}); |
1114 | 94.1k | } |
1115 | 48.0k | return temp_block; |
1116 | 48.0k | } |
1117 | | |
1118 | 17.5k | void Block::shrink_char_type_column_suffix_zero(const std::vector<size_t>& char_type_idx) { |
1119 | 17.5k | for (auto idx : char_type_idx) { |
1120 | 2 | if (idx < data.size()) { |
1121 | 1 | auto& col_and_name = this->get_by_position(idx); |
1122 | 1 | col_and_name.column->assume_mutable()->shrink_padding_chars(); |
1123 | 1 | } |
1124 | 2 | } |
1125 | 17.5k | } |
1126 | | |
1127 | 96.1k | size_t MutableBlock::allocated_bytes() const { |
1128 | 96.1k | size_t res = 0; |
1129 | 188k | for (const auto& col : _columns) { |
1130 | 188k | if (col) { |
1131 | 188k | res += col->allocated_bytes(); |
1132 | 188k | } |
1133 | 188k | } |
1134 | | |
1135 | 96.1k | return res; |
1136 | 96.1k | } |
1137 | | |
1138 | 1 | void MutableBlock::clear_column_data() noexcept { |
1139 | 1 | SCOPED_SKIP_MEMORY_CHECK(); |
1140 | 1 | for (auto& col : _columns) { |
1141 | 1 | if (col) { |
1142 | 1 | col->clear(); |
1143 | 1 | } |
1144 | 1 | } |
1145 | 1 | } |
1146 | | |
1147 | 4 | std::string MutableBlock::dump_names() const { |
1148 | 4 | std::string out; |
1149 | 14 | for (auto it = _names.begin(); it != _names.end(); ++it) { |
1150 | 10 | if (it != _names.begin()) { |
1151 | 6 | out += ", "; |
1152 | 6 | } |
1153 | 10 | out += *it; |
1154 | 10 | } |
1155 | 4 | return out; |
1156 | 4 | } |
1157 | | #include "common/compile_check_end.h" |
1158 | | } // namespace doris |