be/src/core/block/block.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // This file is copied from |
18 | | // https://github.com/ClickHouse/ClickHouse/blob/master/src/Core/Block.cpp |
19 | | // and modified by Doris |
20 | | |
21 | | #include "core/block/block.h" |
22 | | |
23 | | #include <fmt/format.h> |
24 | | #include <gen_cpp/data.pb.h> |
25 | | #include <glog/logging.h> |
26 | | #include <snappy.h> |
27 | | #include <streamvbyte.h> |
28 | | |
29 | | #include <algorithm> |
30 | | #include <cassert> |
31 | | #include <iomanip> |
32 | | #include <limits> |
33 | | #include <ranges> |
34 | | |
35 | | #include "agent/be_exec_version_manager.h" |
36 | | #include "common/compiler_util.h" // IWYU pragma: keep |
37 | | #include "common/logging.h" |
38 | | #include "common/status.h" |
39 | | #include "core/assert_cast.h" |
40 | | #include "core/column/column.h" |
41 | | #include "core/column/column_const.h" |
42 | | #include "core/column/column_nothing.h" |
43 | | #include "core/column/column_nullable.h" |
44 | | #include "core/column/column_vector.h" |
45 | | #include "core/data_type/data_type_factory.hpp" |
46 | | #include "core/data_type/data_type_nullable.h" |
47 | | #include "core/data_type_serde/data_type_serde.h" |
48 | | #include "runtime/descriptors.h" |
49 | | #include "runtime/runtime_profile.h" |
50 | | #include "runtime/thread_context.h" |
51 | | #include "util/block_compression.h" |
52 | | #include "util/faststring.h" |
53 | | #include "util/simd/bits.h" |
54 | | #include "util/slice.h" |
55 | | |
56 | | class SipHash; |
57 | | |
58 | | namespace doris::segment_v2 { |
59 | | enum CompressionTypePB : int; |
60 | | } // namespace doris::segment_v2 |
61 | | namespace doris { |
62 | | template <typename T> |
63 | | void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks, |
64 | 2 | RuntimeProfile::Counter* memory_used_counter = nullptr) { |
65 | 2 | T block; |
66 | 6 | while (blocks.try_dequeue(block)) { |
67 | 4 | if (memory_used_counter) { |
68 | 4 | if constexpr (std::is_same_v<T, Block>) { |
69 | 2 | memory_used_counter->update(-block.allocated_bytes()); |
70 | 2 | } else { |
71 | 2 | memory_used_counter->update(-block->allocated_bytes()); |
72 | 2 | } |
73 | 4 | } |
74 | 4 | } |
75 | 2 | } _ZN5doris12clear_blocksINS_5BlockEEEvRN10moodycamel15ConcurrentQueueIT_NS2_28ConcurrentQueueDefaultTraitsEEEPNS_14RuntimeProfile7CounterE Line | Count | Source | 64 | 1 | RuntimeProfile::Counter* memory_used_counter = nullptr) { | 65 | 1 | T block; | 66 | 3 | while (blocks.try_dequeue(block)) { | 67 | 2 | if (memory_used_counter) { | 68 | 2 | if constexpr (std::is_same_v<T, Block>) { | 69 | 2 | memory_used_counter->update(-block.allocated_bytes()); | 70 | | } else { | 71 | | memory_used_counter->update(-block->allocated_bytes()); | 72 | | } | 73 | 2 | } | 74 | 2 | } | 75 | 1 | } |
_ZN5doris12clear_blocksISt10unique_ptrINS_5BlockESt14default_deleteIS2_EEEEvRN10moodycamel15ConcurrentQueueIT_NS6_28ConcurrentQueueDefaultTraitsEEEPNS_14RuntimeProfile7CounterE Line | Count | Source | 64 | 1 | RuntimeProfile::Counter* memory_used_counter = nullptr) { | 65 | 1 | T block; | 66 | 3 | while (blocks.try_dequeue(block)) { | 67 | 2 | if (memory_used_counter) { | 68 | | if constexpr (std::is_same_v<T, Block>) { | 69 | | memory_used_counter->update(-block.allocated_bytes()); | 70 | 2 | } else { | 71 | 2 | memory_used_counter->update(-block->allocated_bytes()); | 72 | 2 | } | 73 | 2 | } | 74 | 2 | } | 75 | 1 | } |
|
76 | | |
77 | | template void clear_blocks<Block>(moodycamel::ConcurrentQueue<Block>&, |
78 | | RuntimeProfile::Counter* memory_used_counter); |
79 | | template void clear_blocks<BlockUPtr>(moodycamel::ConcurrentQueue<BlockUPtr>&, |
80 | | RuntimeProfile::Counter* memory_used_counter); |
81 | | |
82 | 2.72k | Block::Block(std::initializer_list<ColumnWithTypeAndName> il) : data {il} {} |
83 | | |
84 | 257k | Block::Block(ColumnsWithTypeAndName data_) : data {std::move(data_)} {} |
85 | | |
86 | 56.0k | Block::Block(const std::vector<SlotDescriptor*>& slots, size_t block_size) { |
87 | 271k | for (auto* const slot_desc : slots) { |
88 | 271k | auto column_ptr = slot_desc->get_empty_mutable_column(); |
89 | 271k | column_ptr->reserve(block_size); |
90 | 271k | insert(ColumnWithTypeAndName(std::move(column_ptr), slot_desc->get_data_type_ptr(), |
91 | 271k | slot_desc->col_name())); |
92 | 271k | } |
93 | 56.0k | } |
94 | | |
95 | 1 | Block::Block(const std::vector<SlotDescriptor>& slots, size_t block_size) { |
96 | 1 | std::vector<SlotDescriptor*> slot_ptrs(slots.size()); |
97 | 3 | for (size_t i = 0; i < slots.size(); ++i) { |
98 | | // Slots remain unmodified and are used to read column information; const_cast can be employed. |
99 | | // used in src/exec/rowid_fetcher.cpp |
100 | 2 | slot_ptrs[i] = const_cast<SlotDescriptor*>(&slots[i]); |
101 | 2 | } |
102 | 1 | *this = Block(slot_ptrs, block_size); |
103 | 1 | } |
104 | | |
105 | | Status Block::deserialize(const PBlock& pblock, size_t* uncompressed_bytes, |
106 | 994 | int64_t* decompress_time) { |
107 | 994 | swap(Block()); |
108 | 994 | int be_exec_version = pblock.has_be_exec_version() ? pblock.be_exec_version() : 0; |
109 | 994 | RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(be_exec_version)); |
110 | | |
111 | 994 | const char* buf = nullptr; |
112 | 994 | std::string compression_scratch; |
113 | 994 | if (pblock.compressed()) { |
114 | | // Decompress |
115 | 483 | SCOPED_RAW_TIMER(decompress_time); |
116 | 483 | const char* compressed_data = pblock.column_values().c_str(); |
117 | 483 | size_t compressed_size = pblock.column_values().size(); |
118 | 483 | size_t uncompressed_size = 0; |
119 | 483 | if (pblock.has_compression_type() && pblock.has_uncompressed_size()) { |
120 | 483 | BlockCompressionCodec* codec; |
121 | 483 | RETURN_IF_ERROR(get_block_compression_codec(pblock.compression_type(), &codec)); |
122 | 483 | uncompressed_size = pblock.uncompressed_size(); |
123 | | // Should also use allocator to allocate memory here. |
124 | 483 | compression_scratch.resize(uncompressed_size); |
125 | 483 | Slice decompressed_slice(compression_scratch); |
126 | 483 | RETURN_IF_ERROR(codec->decompress(Slice(compressed_data, compressed_size), |
127 | 483 | &decompressed_slice)); |
128 | 483 | DCHECK(uncompressed_size == decompressed_slice.size); |
129 | 483 | } else { |
130 | 0 | bool success = snappy::GetUncompressedLength(compressed_data, compressed_size, |
131 | 0 | &uncompressed_size); |
132 | 0 | DCHECK(success) << "snappy::GetUncompressedLength failed"; |
133 | 0 | compression_scratch.resize(uncompressed_size); |
134 | 0 | success = snappy::RawUncompress(compressed_data, compressed_size, |
135 | 0 | compression_scratch.data()); |
136 | 0 | DCHECK(success) << "snappy::RawUncompress failed"; |
137 | 0 | } |
138 | 483 | *uncompressed_bytes = uncompressed_size; |
139 | 483 | buf = compression_scratch.data(); |
140 | 511 | } else { |
141 | 511 | buf = pblock.column_values().data(); |
142 | 511 | } |
143 | | |
144 | 1.57k | for (const auto& pcol_meta : pblock.column_metas()) { |
145 | 1.57k | DataTypePtr type = DataTypeFactory::instance().create_data_type(pcol_meta); |
146 | 1.57k | MutableColumnPtr data_column = type->create_column(); |
147 | | // Here will try to allocate large memory, should return error if failed. |
148 | 1.57k | RETURN_IF_CATCH_EXCEPTION( |
149 | 1.57k | buf = type->deserialize(buf, &data_column, pblock.be_exec_version())); |
150 | 1.57k | data.emplace_back(data_column->get_ptr(), type, pcol_meta.name()); |
151 | 1.57k | } |
152 | | |
153 | 994 | return Status::OK(); |
154 | 994 | } |
155 | | |
156 | 13.2k | void Block::reserve(size_t count) { |
157 | 13.2k | data.reserve(count); |
158 | 13.2k | } |
159 | | |
160 | 4 | void Block::insert(size_t position, const ColumnWithTypeAndName& elem) { |
161 | 4 | if (position > data.size()) { |
162 | 1 | throw Exception(ErrorCode::INTERNAL_ERROR, |
163 | 1 | "invalid input position, position={}, data.size={}, names={}", position, |
164 | 1 | data.size(), dump_names()); |
165 | 1 | } |
166 | | |
167 | 3 | data.emplace(data.begin() + position, elem); |
168 | 3 | } |
169 | | |
170 | 3 | void Block::insert(size_t position, ColumnWithTypeAndName&& elem) { |
171 | 3 | if (position > data.size()) { |
172 | 1 | throw Exception(ErrorCode::INTERNAL_ERROR, |
173 | 1 | "invalid input position, position={}, data.size={}, names={}", position, |
174 | 1 | data.size(), dump_names()); |
175 | 1 | } |
176 | | |
177 | 2 | data.emplace(data.begin() + position, std::move(elem)); |
178 | 2 | } |
179 | | |
180 | 8.09k | void Block::clear_names() { |
181 | 177k | for (auto& entry : data) { |
182 | 177k | entry.name.clear(); |
183 | 177k | } |
184 | 8.09k | } |
185 | | |
186 | 14.2k | void Block::insert(const ColumnWithTypeAndName& elem) { |
187 | 14.2k | data.emplace_back(elem); |
188 | 14.2k | } |
189 | | |
190 | 998k | void Block::insert(ColumnWithTypeAndName&& elem) { |
191 | 998k | data.emplace_back(std::move(elem)); |
192 | 998k | } |
193 | | |
194 | 3 | void Block::erase(const std::set<size_t>& positions) { |
195 | 3 | for (unsigned long position : std::ranges::reverse_view(positions)) { |
196 | 2 | erase(position); |
197 | 2 | } |
198 | 3 | } |
199 | | |
200 | 708 | void Block::erase_tail(size_t start) { |
201 | 708 | DCHECK(start <= data.size()) << fmt::format( |
202 | 0 | "Position out of bound in Block::erase(), max position = {}", data.size()); |
203 | 708 | data.erase(data.begin() + start, data.end()); |
204 | 708 | } |
205 | | |
206 | 123k | void Block::erase(size_t position) { |
207 | 123k | DCHECK(!data.empty()) << "Block is empty"; |
208 | 123k | DCHECK_LT(position, data.size()) << fmt::format( |
209 | 0 | "Position out of bound in Block::erase(), max position = {}", data.size() - 1); |
210 | | |
211 | 123k | erase_impl(position); |
212 | 123k | } |
213 | | |
214 | 123k | void Block::erase_impl(size_t position) { |
215 | 123k | data.erase(data.begin() + position); |
216 | 123k | } |
217 | | |
218 | 76.0k | ColumnWithTypeAndName& Block::safe_get_by_position(size_t position) { |
219 | 76.0k | if (position >= data.size()) { |
220 | 0 | throw Exception(ErrorCode::INTERNAL_ERROR, |
221 | 0 | "invalid input position, position={}, data.size={}, names={}", position, |
222 | 0 | data.size(), dump_names()); |
223 | 0 | } |
224 | 76.0k | return data[position]; |
225 | 76.0k | } |
226 | | |
227 | 109 | const ColumnWithTypeAndName& Block::safe_get_by_position(size_t position) const { |
228 | 109 | if (position >= data.size()) { |
229 | 0 | throw Exception(ErrorCode::INTERNAL_ERROR, |
230 | 0 | "invalid input position, position={}, data.size={}, names={}", position, |
231 | 0 | data.size(), dump_names()); |
232 | 0 | } |
233 | 109 | return data[position]; |
234 | 109 | } |
235 | | |
236 | 51 | int Block::get_position_by_name(const std::string& name) const { |
237 | 404 | for (int i = 0; i < data.size(); i++) { |
238 | 403 | if (data[i].name == name) { |
239 | 50 | return i; |
240 | 50 | } |
241 | 403 | } |
242 | 1 | return -1; |
243 | 51 | } |
244 | | |
245 | 3 | void Block::check_number_of_rows(bool allow_null_columns) const { |
246 | 3 | ssize_t rows = -1; |
247 | 7 | for (const auto& elem : data) { |
248 | 7 | if (!elem.column && allow_null_columns) { |
249 | 2 | continue; |
250 | 2 | } |
251 | | |
252 | 5 | if (!elem.column) { |
253 | 1 | throw Exception(ErrorCode::INTERNAL_ERROR, |
254 | 1 | "Column {} in block is nullptr, in method check_number_of_rows.", |
255 | 1 | elem.name); |
256 | 1 | } |
257 | | |
258 | 4 | ssize_t size = elem.column->size(); |
259 | | |
260 | 4 | if (rows == -1) { |
261 | 3 | rows = size; |
262 | 3 | } else if (rows != size) { |
263 | 1 | throw Exception(ErrorCode::INTERNAL_ERROR, "Sizes of columns doesn't match, block={}", |
264 | 1 | dump_structure()); |
265 | 1 | } |
266 | 4 | } |
267 | 3 | } |
268 | | |
269 | 3.20M | Status Block::check_type_and_column() const { |
270 | 3.20M | #ifndef NDEBUG |
271 | 3.20M | for (const auto& elem : data) { |
272 | 20.7k | if (!elem.column) { |
273 | 0 | continue; |
274 | 0 | } |
275 | 20.7k | if (!elem.type) { |
276 | 0 | continue; |
277 | 0 | } |
278 | | |
279 | | // ColumnNothing is a special column type, it is used to represent a column that |
280 | | // is not materialized, so we don't need to check it. |
281 | 20.7k | if (check_and_get_column<ColumnNothing>(elem.column.get())) { |
282 | 0 | continue; |
283 | 0 | } |
284 | | |
285 | 20.7k | const auto& type = elem.type; |
286 | 20.7k | const auto& column = elem.column; |
287 | | |
288 | 20.7k | RETURN_IF_ERROR(column->column_self_check()); |
289 | 20.7k | auto st = type->check_column(*column); |
290 | 20.7k | if (!st.ok()) { |
291 | 1 | return Status::InternalError( |
292 | 1 | "Column {} in block is not compatible with its column type :{}, data type :{}, " |
293 | 1 | "error: {}", |
294 | 1 | elem.name, column->get_name(), type->get_name(), st.msg()); |
295 | 1 | } |
296 | 20.7k | } |
297 | 3.20M | #endif |
298 | 3.20M | return Status::OK(); |
299 | 3.20M | } |
300 | | |
301 | 51.2M | size_t Block::rows() const { |
302 | 51.2M | for (const auto& elem : data) { |
303 | 46.4M | if (elem.column) { |
304 | 46.4M | return elem.column->size(); |
305 | 46.4M | } |
306 | 46.4M | } |
307 | | |
308 | 4.78M | return 0; |
309 | 51.2M | } |
310 | | |
311 | 6 | void Block::set_num_rows(size_t length) { |
312 | 6 | if (rows() > length) { |
313 | 4 | for (auto& elem : data) { |
314 | 4 | if (elem.column) { |
315 | 4 | elem.column = elem.column->shrink(length); |
316 | 4 | } |
317 | 4 | } |
318 | 4 | } |
319 | 6 | } |
320 | | |
321 | 1 | void Block::skip_num_rows(int64_t& length) { |
322 | 1 | auto origin_rows = rows(); |
323 | 1 | if (origin_rows <= length) { |
324 | 0 | clear(); |
325 | 0 | length -= origin_rows; |
326 | 1 | } else { |
327 | 1 | for (auto& elem : data) { |
328 | 1 | if (elem.column) { |
329 | 1 | elem.column = elem.column->cut(length, origin_rows - length); |
330 | 1 | } |
331 | 1 | } |
332 | 1 | } |
333 | 1 | } |
334 | | |
335 | 12.8k | size_t Block::bytes() const { |
336 | 12.8k | size_t res = 0; |
337 | 27.2k | for (const auto& elem : data) { |
338 | 27.2k | if (!elem.column) { |
339 | 0 | std::stringstream ss; |
340 | 0 | for (const auto& e : data) { |
341 | 0 | ss << e.name + " "; |
342 | 0 | } |
343 | 0 | throw Exception(ErrorCode::INTERNAL_ERROR, |
344 | 0 | "Column {} in block is nullptr, in method bytes. All Columns are {}", |
345 | 0 | elem.name, ss.str()); |
346 | 0 | } |
347 | 27.2k | res += elem.column->byte_size(); |
348 | 27.2k | } |
349 | | |
350 | 12.8k | return res; |
351 | 12.8k | } |
352 | | |
353 | 194k | size_t Block::allocated_bytes() const { |
354 | 194k | size_t res = 0; |
355 | 380k | for (const auto& elem : data) { |
356 | 380k | if (!elem.column) { |
357 | | // Sometimes if expr failed, then there will be a nullptr |
358 | | // column left in the block. |
359 | 1 | continue; |
360 | 1 | } |
361 | 380k | res += elem.column->allocated_bytes(); |
362 | 380k | } |
363 | | |
364 | 194k | return res; |
365 | 194k | } |
366 | | |
367 | 7 | std::string Block::dump_names() const { |
368 | 7 | std::string out; |
369 | 20 | for (auto it = data.begin(); it != data.end(); ++it) { |
370 | 13 | if (it != data.begin()) { |
371 | 6 | out += ", "; |
372 | 6 | } |
373 | 13 | out += it->name; |
374 | 13 | } |
375 | 7 | return out; |
376 | 7 | } |
377 | | |
378 | 6 | std::string Block::dump_types() const { |
379 | 6 | std::string out; |
380 | 18 | for (auto it = data.begin(); it != data.end(); ++it) { |
381 | 12 | if (it != data.begin()) { |
382 | 6 | out += ", "; |
383 | 6 | } |
384 | 12 | out += it->type->get_name(); |
385 | 12 | } |
386 | 6 | return out; |
387 | 6 | } |
388 | | |
389 | 31 | std::string Block::dump_data_json(size_t begin, size_t row_limit, bool allow_null_mismatch) const { |
390 | 31 | std::stringstream ss; |
391 | | |
392 | 31 | std::vector<std::string> headers; |
393 | 31 | headers.reserve(columns()); |
394 | 46 | for (const auto& it : data) { |
395 | | // fmt::format is from the {fmt} library, you might be using std::format in C++20 |
396 | | // If not, you can build the string with a stringstream as a fallback. |
397 | 46 | headers.push_back(fmt::format("{}({})", it.name, it.type->get_name())); |
398 | 46 | } |
399 | | |
400 | 31 | size_t start_row = std::min(begin, rows()); |
401 | 31 | size_t end_row = std::min(rows(), begin + row_limit); |
402 | | |
403 | 31 | auto format_options = DataTypeSerDe::get_default_format_options(); |
404 | 31 | auto time_zone = cctz::utc_time_zone(); |
405 | 31 | format_options.timezone = &time_zone; |
406 | | |
407 | 31 | ss << "["; |
408 | 3.59k | for (size_t row_num = start_row; row_num < end_row; ++row_num) { |
409 | 3.56k | if (row_num > start_row) { |
410 | 3.53k | ss << ","; |
411 | 3.53k | } |
412 | 3.56k | ss << "{"; |
413 | 8.33k | for (size_t i = 0; i < columns(); ++i) { |
414 | 4.77k | if (i > 0) { |
415 | 1.21k | ss << ","; |
416 | 1.21k | } |
417 | 4.77k | ss << "\"" << headers[i] << "\":"; |
418 | 4.77k | std::string s; |
419 | | |
420 | | // This value-extraction logic is preserved from your original function |
421 | | // to maintain consistency, especially for handling nullability mismatches. |
422 | 4.77k | if (data[i].column && data[i].type->is_nullable() && |
423 | 4.77k | !data[i].column->is_concrete_nullable()) { |
424 | | // This branch handles a specific internal representation of nullable columns. |
425 | | // The original code would assert here if allow_null_mismatch is false. |
426 | 0 | assert(allow_null_mismatch); |
427 | 0 | s = assert_cast<const DataTypeNullable*>(data[i].type.get()) |
428 | 0 | ->get_nested_type() |
429 | 0 | ->to_string(*data[i].column, row_num, format_options); |
430 | 4.77k | } else { |
431 | | // This is the standard path. The to_string method is expected to correctly |
432 | | // handle all cases, including when the column is null (e.g., by returning "NULL"). |
433 | 4.77k | s = data[i].to_string(row_num, format_options); |
434 | 4.77k | } |
435 | 4.77k | ss << "\"" << s << "\""; |
436 | 4.77k | } |
437 | 3.56k | ss << "}"; |
438 | 3.56k | } |
439 | 31 | ss << "]"; |
440 | 31 | return ss.str(); |
441 | 31 | } |
442 | | |
443 | 858 | std::string Block::dump_data(size_t begin, size_t row_limit, bool allow_null_mismatch) const { |
444 | 858 | std::vector<std::string> headers; |
445 | 858 | std::vector<int> headers_size; |
446 | 2.10k | for (const auto& it : data) { |
447 | 2.10k | std::string s = fmt::format("{}({})", it.name, it.type->get_name()); |
448 | 2.10k | headers_size.push_back(s.size() > 15 ? (int)s.size() : 15); |
449 | 2.10k | headers.emplace_back(s); |
450 | 2.10k | } |
451 | | |
452 | 858 | std::stringstream out; |
453 | | // header upper line |
454 | 2.16k | auto line = [&]() { |
455 | 8.07k | for (size_t i = 0; i < columns(); ++i) { |
456 | 5.91k | out << std::setfill('-') << std::setw(1) << "+" << std::setw(headers_size[i]) << "-"; |
457 | 5.91k | } |
458 | 2.16k | out << std::setw(1) << "+" << std::endl; |
459 | 2.16k | }; |
460 | 858 | line(); |
461 | | // header text |
462 | 2.96k | for (size_t i = 0; i < columns(); ++i) { |
463 | 2.10k | out << std::setfill(' ') << std::setw(1) << "|" << std::left << std::setw(headers_size[i]) |
464 | 2.10k | << headers[i]; |
465 | 2.10k | } |
466 | 858 | out << std::setw(1) << "|" << std::endl; |
467 | | // header bottom line |
468 | 858 | line(); |
469 | 858 | if (rows() == 0) { |
470 | 414 | return out.str(); |
471 | 414 | } |
472 | | |
473 | 444 | auto format_options = DataTypeSerDe::get_default_format_options(); |
474 | 444 | auto time_zone = cctz::utc_time_zone(); |
475 | 444 | format_options.timezone = &time_zone; |
476 | | |
477 | | // content |
478 | 12.2k | for (size_t row_num = begin; row_num < rows() && row_num < row_limit + begin; ++row_num) { |
479 | 32.4k | for (size_t i = 0; i < columns(); ++i) { |
480 | 20.6k | if (!data[i].column || data[i].column->empty()) { |
481 | 0 | out << std::setfill(' ') << std::setw(1) << "|" << std::setw(headers_size[i]) |
482 | 0 | << std::right; |
483 | 0 | continue; |
484 | 0 | } |
485 | 20.6k | std::string s; |
486 | 20.6k | if (data[i].column) { // column may be const |
487 | | // for code inside `default_implementation_for_nulls`, there's could have: type = null, col != null |
488 | 20.6k | if (data[i].type->is_nullable() && !data[i].column->is_concrete_nullable()) { |
489 | 0 | assert(allow_null_mismatch); |
490 | 0 | s = assert_cast<const DataTypeNullable*>(data[i].type.get()) |
491 | 0 | ->get_nested_type() |
492 | 0 | ->to_string(*data[i].column, row_num, format_options); |
493 | 20.6k | } else { |
494 | 20.6k | s = data[i].to_string(row_num, format_options); |
495 | 20.6k | } |
496 | 20.6k | } |
497 | 20.6k | if (s.length() > headers_size[i]) { |
498 | 2.12k | s = s.substr(0, headers_size[i] - 3) + "..."; |
499 | 2.12k | } |
500 | 20.6k | out << std::setfill(' ') << std::setw(1) << "|" << std::setw(headers_size[i]) |
501 | 20.6k | << std::right << s; |
502 | 20.6k | } |
503 | 11.7k | out << std::setw(1) << "|" << std::endl; |
504 | 11.7k | } |
505 | | // bottom line |
506 | 444 | line(); |
507 | 444 | if (row_limit < rows()) { |
508 | 112 | out << rows() << " rows in block, only show first " << row_limit << " rows." << std::endl; |
509 | 112 | } |
510 | 444 | return out.str(); |
511 | 444 | } |
512 | | |
513 | 1 | std::string Block::dump_one_line(size_t row, int column_end) const { |
514 | 1 | assert(column_end <= columns()); |
515 | 1 | fmt::memory_buffer line; |
516 | | |
517 | 1 | auto format_options = DataTypeSerDe::get_default_format_options(); |
518 | 1 | auto time_zone = cctz::utc_time_zone(); |
519 | 1 | format_options.timezone = &time_zone; |
520 | | |
521 | 3 | for (int i = 0; i < column_end; ++i) { |
522 | 2 | if (LIKELY(i != 0)) { |
523 | | // TODO: need more effective function of to string. now the impl is slow |
524 | 1 | fmt::format_to(line, " {}", data[i].to_string(row, format_options)); |
525 | 1 | } else { |
526 | 1 | fmt::format_to(line, "{}", data[i].to_string(row, format_options)); |
527 | 1 | } |
528 | 2 | } |
529 | 1 | return fmt::to_string(line); |
530 | 1 | } |
531 | | |
532 | 46 | std::string Block::dump_structure() const { |
533 | 46 | std::string out; |
534 | 373 | for (auto it = data.begin(); it != data.end(); ++it) { |
535 | 327 | if (it != data.begin()) { |
536 | 281 | out += ", \n"; |
537 | 281 | } |
538 | 327 | out += it->dump_structure(); |
539 | 327 | } |
540 | 46 | return out; |
541 | 46 | } |
542 | | |
543 | 48.9k | Block Block::clone_empty() const { |
544 | 48.9k | Block res; |
545 | 95.8k | for (const auto& elem : data) { |
546 | 95.8k | res.insert(elem.clone_empty()); |
547 | 95.8k | } |
548 | 48.9k | return res; |
549 | 48.9k | } |
550 | | |
551 | 31 | MutableColumns Block::clone_empty_columns() const { |
552 | 31 | size_t num_columns = data.size(); |
553 | 31 | MutableColumns columns(num_columns); |
554 | 140 | for (size_t i = 0; i < num_columns; ++i) { |
555 | 109 | columns[i] = data[i].column ? data[i].column->clone_empty() : data[i].type->create_column(); |
556 | 109 | } |
557 | 31 | return columns; |
558 | 31 | } |
559 | | |
560 | 25.9k | Columns Block::get_columns() const { |
561 | 25.9k | size_t num_columns = data.size(); |
562 | 25.9k | Columns columns(num_columns); |
563 | 113k | for (size_t i = 0; i < num_columns; ++i) { |
564 | 87.1k | columns[i] = data[i].column->convert_to_full_column_if_const(); |
565 | 87.1k | } |
566 | 25.9k | return columns; |
567 | 25.9k | } |
568 | | |
569 | 549 | Columns Block::get_columns_and_convert() { |
570 | 549 | size_t num_columns = data.size(); |
571 | 549 | Columns columns(num_columns); |
572 | 1.16k | for (size_t i = 0; i < num_columns; ++i) { |
573 | 612 | data[i].column = data[i].column->convert_to_full_column_if_const(); |
574 | 612 | columns[i] = data[i].column; |
575 | 612 | } |
576 | 549 | return columns; |
577 | 549 | } |
578 | | |
579 | 148k | MutableColumns Block::mutate_columns() { |
580 | 148k | size_t num_columns = data.size(); |
581 | 148k | MutableColumns columns(num_columns); |
582 | 439k | for (size_t i = 0; i < num_columns; ++i) { |
583 | 291k | DCHECK(data[i].type); |
584 | 291k | columns[i] = data[i].column ? (*std::move(data[i].column)).mutate() |
585 | 291k | : data[i].type->create_column(); |
586 | 291k | } |
587 | 148k | return columns; |
588 | 148k | } |
589 | | |
590 | 1.65k | void Block::set_columns(MutableColumns&& columns) { |
591 | 1.65k | DCHECK_GE(columns.size(), data.size()) |
592 | 0 | << fmt::format("Invalid size of columns, columns size: {}, data size: {}", |
593 | 0 | columns.size(), data.size()); |
594 | 1.65k | size_t num_columns = data.size(); |
595 | 4.76k | for (size_t i = 0; i < num_columns; ++i) { |
596 | 3.11k | data[i].column = std::move(columns[i]); |
597 | 3.11k | } |
598 | 1.65k | } |
599 | | |
600 | 31 | Block Block::clone_without_columns(const std::vector<int>* column_offset) const { |
601 | 31 | Block res; |
602 | | |
603 | 31 | if (column_offset != nullptr) { |
604 | 12 | size_t num_columns = column_offset->size(); |
605 | 73 | for (size_t i = 0; i < num_columns; ++i) { |
606 | 61 | res.insert({nullptr, data[(*column_offset)[i]].type, data[(*column_offset)[i]].name}); |
607 | 61 | } |
608 | 19 | } else { |
609 | 19 | size_t num_columns = data.size(); |
610 | 53 | for (size_t i = 0; i < num_columns; ++i) { |
611 | 34 | res.insert({nullptr, data[i].type, data[i].name}); |
612 | 34 | } |
613 | 19 | } |
614 | 31 | return res; |
615 | 31 | } |
616 | | |
617 | 55.5k | const ColumnsWithTypeAndName& Block::get_columns_with_type_and_name() const { |
618 | 55.5k | return data; |
619 | 55.5k | } |
620 | | |
621 | 145k | std::vector<std::string> Block::get_names() const { |
622 | 145k | std::vector<std::string> res; |
623 | 145k | res.reserve(columns()); |
624 | | |
625 | 285k | for (const auto& elem : data) { |
626 | 285k | res.push_back(elem.name); |
627 | 285k | } |
628 | | |
629 | 145k | return res; |
630 | 145k | } |
631 | | |
632 | 145k | DataTypes Block::get_data_types() const { |
633 | 145k | DataTypes res; |
634 | 145k | res.reserve(columns()); |
635 | | |
636 | 285k | for (const auto& elem : data) { |
637 | 285k | res.push_back(elem.type); |
638 | 285k | } |
639 | | |
640 | 145k | return res; |
641 | 145k | } |
642 | | |
643 | 52.7k | void Block::clear() { |
644 | 52.7k | data.clear(); |
645 | 52.7k | } |
646 | | |
647 | 1.62M | void Block::clear_column_data(int64_t column_size) noexcept { |
648 | 1.62M | SCOPED_SKIP_MEMORY_CHECK(); |
649 | | // data.size() greater than column_size, means here have some |
650 | | // function exec result in block, need erase it here |
651 | 1.62M | if (column_size != -1 and data.size() > column_size) { |
652 | 2.20k | for (int64_t i = data.size() - 1; i >= column_size; --i) { |
653 | 1.10k | erase(i); |
654 | 1.10k | } |
655 | 1.10k | } |
656 | 1.62M | for (auto& d : data) { |
657 | 51.3k | if (d.column) { |
658 | | // Temporarily disable reference count check because a column might be referenced multiple times within a block. |
659 | | // Queries like this: `select c, c from t1;` |
660 | 51.3k | (*std::move(d.column)).assume_mutable()->clear(); |
661 | 51.3k | } |
662 | 51.3k | } |
663 | 1.62M | } |
664 | | |
665 | | void Block::clear_column_mem_not_keep(const std::vector<bool>& column_keep_flags, |
666 | 48.0k | bool need_keep_first) { |
667 | 48.0k | if (data.size() >= column_keep_flags.size()) { |
668 | 48.0k | auto origin_rows = rows(); |
669 | 142k | for (size_t i = 0; i < column_keep_flags.size(); ++i) { |
670 | 94.1k | if (!column_keep_flags[i]) { |
671 | 36.8k | data[i].column = data[i].column->clone_empty(); |
672 | 36.8k | } |
673 | 94.1k | } |
674 | | |
675 | 48.0k | if (need_keep_first && !column_keep_flags[0]) { |
676 | 1 | auto first_column = data[0].column->clone_empty(); |
677 | 1 | first_column->resize(origin_rows); |
678 | 1 | data[0].column = std::move(first_column); |
679 | 1 | } |
680 | 48.0k | } |
681 | 48.0k | } |
682 | | |
683 | 1.30k | void Block::swap(Block& other) noexcept { |
684 | 1.30k | SCOPED_SKIP_MEMORY_CHECK(); |
685 | 1.30k | data.swap(other.data); |
686 | 1.30k | } |
687 | | |
688 | 1.71k | void Block::swap(Block&& other) noexcept { |
689 | 1.71k | SCOPED_SKIP_MEMORY_CHECK(); |
690 | 1.71k | data = std::move(other.data); |
691 | 1.71k | } |
692 | | |
693 | 3 | void Block::shuffle_columns(const std::vector<int>& result_column_ids) { |
694 | 3 | Container tmp_data; |
695 | 3 | tmp_data.reserve(result_column_ids.size()); |
696 | 5 | for (const int result_column_id : result_column_ids) { |
697 | 5 | tmp_data.push_back(data[result_column_id]); |
698 | 5 | } |
699 | 3 | data = std::move(tmp_data); |
700 | 3 | } |
701 | | |
702 | 2 | void Block::update_hash(SipHash& hash) const { |
703 | 8 | for (size_t row_no = 0, num_rows = rows(); row_no < num_rows; ++row_no) { |
704 | 12 | for (const auto& col : data) { |
705 | 12 | col.column->update_hash_with_value(row_no, hash); |
706 | 12 | } |
707 | 6 | } |
708 | 2 | } |
709 | | |
710 | | void Block::filter_block_internal(Block* block, const std::vector<uint32_t>& columns_to_filter, |
711 | 519 | const IColumn::Filter& filter) { |
712 | 519 | size_t count = filter.size() - simd::count_zero_num((int8_t*)filter.data(), filter.size()); |
713 | 1.34k | for (const auto& col : columns_to_filter) { |
714 | 1.34k | auto& column = block->get_by_position(col).column; |
715 | 1.34k | if (column->size() == count) { |
716 | 1.29k | continue; |
717 | 1.29k | } |
718 | 43 | if (count == 0) { |
719 | 2 | if (column->is_exclusive()) { |
720 | 0 | column->assume_mutable()->clear(); |
721 | 2 | } else { |
722 | 2 | column = column->clone_empty(); |
723 | 2 | } |
724 | 2 | continue; |
725 | 2 | } |
726 | 41 | if (column->is_exclusive()) { |
727 | | // COW: safe to mutate in-place since we have exclusive ownership |
728 | 23 | const auto result_size = column->assume_mutable()->filter(filter); |
729 | 23 | if (result_size != count) [[unlikely]] { |
730 | 0 | throw Exception(ErrorCode::INTERNAL_ERROR, |
731 | 0 | "result_size not equal with filter_size, result_size={}, " |
732 | 0 | "filter_size={}", |
733 | 0 | result_size, count); |
734 | 0 | } |
735 | 23 | } else { |
736 | | // COW: must create a copy since column is shared |
737 | 18 | column = column->filter(filter, count); |
738 | 18 | } |
739 | 41 | } |
740 | 519 | } |
741 | | |
742 | | void Block::filter_block_internal(Block* block, const IColumn::Filter& filter, |
743 | 1 | uint32_t column_to_keep) { |
744 | 1 | std::vector<uint32_t> columns_to_filter; |
745 | 1 | columns_to_filter.resize(column_to_keep); |
746 | 3 | for (uint32_t i = 0; i < column_to_keep; ++i) { |
747 | 2 | columns_to_filter[i] = i; |
748 | 2 | } |
749 | 1 | filter_block_internal(block, columns_to_filter, filter); |
750 | 1 | } |
751 | | |
752 | 8 | void Block::filter_block_internal(Block* block, const IColumn::Filter& filter) { |
753 | 8 | const size_t count = |
754 | 8 | filter.size() - simd::count_zero_num((int8_t*)filter.data(), filter.size()); |
755 | 24 | for (int i = 0; i < block->columns(); ++i) { |
756 | 16 | auto& column = block->get_by_position(i).column; |
757 | 16 | if (column->is_exclusive()) { |
758 | 16 | column->assume_mutable()->filter(filter); |
759 | 16 | } else { |
760 | 0 | column = column->filter(filter, count); |
761 | 0 | } |
762 | 16 | } |
763 | 8 | } |
764 | | |
765 | | Status Block::append_to_block_by_selector(MutableBlock* dst, |
766 | 1 | const IColumn::Selector& selector) const { |
767 | 1 | RETURN_IF_CATCH_EXCEPTION({ |
768 | 1 | DCHECK_EQ(data.size(), dst->mutable_columns().size()); |
769 | 1 | for (size_t i = 0; i < data.size(); i++) { |
770 | | // FIXME: this is a quickfix. we assume that only partition functions make there some |
771 | 1 | if (!is_column_const(*data[i].column)) { |
772 | 1 | data[i].column->append_data_by_selector(dst->mutable_columns()[i], selector); |
773 | 1 | } |
774 | 1 | } |
775 | 1 | }); |
776 | 1 | return Status::OK(); |
777 | 1 | } |
778 | | |
779 | | Status Block::filter_block(Block* block, const std::vector<uint32_t>& columns_to_filter, |
780 | 496 | size_t filter_column_id, size_t column_to_keep) { |
781 | 496 | const auto& filter_column = block->get_by_position(filter_column_id).column; |
782 | 496 | if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) { |
783 | 1 | const auto& nested_column = nullable_column->get_nested_column_ptr(); |
784 | | |
785 | 1 | MutableColumnPtr mutable_holder = |
786 | 1 | nested_column->use_count() == 1 |
787 | 1 | ? nested_column->assume_mutable() |
788 | 1 | : nested_column->clone_resized(nested_column->size()); |
789 | | |
790 | 1 | auto* concrete_column = assert_cast<ColumnUInt8*>(mutable_holder.get()); |
791 | 1 | const auto* __restrict null_map = nullable_column->get_null_map_data().data(); |
792 | 1 | IColumn::Filter& filter = concrete_column->get_data(); |
793 | 1 | auto* __restrict filter_data = filter.data(); |
794 | | |
795 | 1 | const size_t size = filter.size(); |
796 | 4 | for (size_t i = 0; i < size; ++i) { |
797 | 3 | filter_data[i] &= !null_map[i]; |
798 | 3 | } |
799 | 1 | RETURN_IF_CATCH_EXCEPTION(filter_block_internal(block, columns_to_filter, filter)); |
800 | 495 | } else if (const auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) { |
801 | 2 | bool ret = const_column->get_bool(0); |
802 | 2 | if (!ret) { |
803 | 2 | for (const auto& col : columns_to_filter) { |
804 | 2 | auto& column = block->get_by_position(col).column; |
805 | 2 | if (column->is_exclusive()) { |
806 | 2 | column->assume_mutable()->clear(); |
807 | 2 | } else { |
808 | 0 | column = column->clone_empty(); |
809 | 0 | } |
810 | 2 | } |
811 | 1 | } |
812 | 493 | } else { |
813 | 493 | const IColumn::Filter& filter = |
814 | 493 | assert_cast<const doris::ColumnUInt8&>(*filter_column).get_data(); |
815 | 493 | RETURN_IF_CATCH_EXCEPTION(filter_block_internal(block, columns_to_filter, filter)); |
816 | 493 | } |
817 | | |
818 | 496 | erase_useless_column(block, column_to_keep); |
819 | 496 | return Status::OK(); |
820 | 496 | } |
821 | | |
822 | 490 | Status Block::filter_block(Block* block, size_t filter_column_id, size_t column_to_keep) { |
823 | 490 | std::vector<uint32_t> columns_to_filter; |
824 | 490 | columns_to_filter.resize(column_to_keep); |
825 | 1.77k | for (uint32_t i = 0; i < column_to_keep; ++i) { |
826 | 1.28k | columns_to_filter[i] = i; |
827 | 1.28k | } |
828 | 490 | return filter_block(block, columns_to_filter, filter_column_id, column_to_keep); |
829 | 490 | } |
830 | | |
831 | | Status Block::serialize(int be_exec_version, PBlock* pblock, |
832 | | /*std::string* compressed_buffer,*/ size_t* uncompressed_bytes, |
833 | | size_t* compressed_bytes, int64_t* compress_time, |
834 | | segment_v2::CompressionTypePB compression_type, |
835 | 1.13k | bool allow_transfer_large_data) const { |
836 | 1.13k | RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(be_exec_version)); |
837 | 1.13k | pblock->set_be_exec_version(be_exec_version); |
838 | | |
839 | | // calc uncompressed size for allocation |
840 | 1.13k | size_t content_uncompressed_size = 0; |
841 | 1.81k | for (const auto& c : *this) { |
842 | 1.81k | PColumnMeta* pcm = pblock->add_column_metas(); |
843 | 1.81k | c.to_pb_column_meta(pcm); |
844 | 1.81k | DCHECK(pcm->type() != PGenericType::UNKNOWN) << " forget to set pb type"; |
845 | | // get serialized size |
846 | 1.81k | content_uncompressed_size += |
847 | 1.81k | c.type->get_uncompressed_serialized_bytes(*(c.column), pblock->be_exec_version()); |
848 | 1.81k | } |
849 | | |
850 | | // serialize data values |
851 | | // when data type is HLL, content_uncompressed_size maybe larger than real size. |
852 | 1.13k | std::string column_values; |
853 | 1.13k | try { |
854 | | // TODO: After support c++23, we should use resize_and_overwrite to replace resize |
855 | 1.13k | column_values.resize(content_uncompressed_size); |
856 | 1.13k | } catch (...) { |
857 | 0 | std::string msg = fmt::format("Try to alloc {} bytes for pblock column values failed.", |
858 | 0 | content_uncompressed_size); |
859 | 0 | LOG(WARNING) << msg; |
860 | 0 | return Status::BufferAllocFailed(msg); |
861 | 0 | } |
862 | 1.12k | char* buf = column_values.data(); |
863 | | |
864 | 1.82k | for (const auto& c : *this) { |
865 | 1.82k | buf = c.type->serialize(*(c.column), buf, pblock->be_exec_version()); |
866 | 1.82k | } |
867 | 1.12k | *uncompressed_bytes = content_uncompressed_size; |
868 | 1.12k | const size_t serialize_bytes = buf - column_values.data() + STREAMVBYTE_PADDING; |
869 | 1.12k | *compressed_bytes = serialize_bytes; |
870 | 1.12k | column_values.resize(serialize_bytes); |
871 | | |
872 | | // compress |
873 | 1.12k | if (compression_type != segment_v2::NO_COMPRESSION && content_uncompressed_size > 0) { |
874 | 626 | SCOPED_RAW_TIMER(compress_time); |
875 | 626 | pblock->set_compression_type(compression_type); |
876 | 626 | pblock->set_uncompressed_size(serialize_bytes); |
877 | | |
878 | 626 | BlockCompressionCodec* codec; |
879 | 626 | RETURN_IF_ERROR(get_block_compression_codec(compression_type, &codec)); |
880 | | |
881 | 626 | faststring buf_compressed; |
882 | 626 | RETURN_IF_ERROR_OR_CATCH_EXCEPTION( |
883 | 626 | codec->compress(Slice(column_values.data(), serialize_bytes), &buf_compressed)); |
884 | 626 | size_t compressed_size = buf_compressed.size(); |
885 | 626 | if (LIKELY(compressed_size < serialize_bytes)) { |
886 | | // TODO: rethink the logic here may copy again ? |
887 | 626 | pblock->set_column_values(buf_compressed.data(), buf_compressed.size()); |
888 | 626 | pblock->set_compressed(true); |
889 | 626 | *compressed_bytes = compressed_size; |
890 | 626 | } else { |
891 | 0 | pblock->set_column_values(std::move(column_values)); |
892 | 0 | } |
893 | | |
894 | 626 | VLOG_ROW << "uncompressed size: " << content_uncompressed_size |
895 | 0 | << ", compressed size: " << compressed_size; |
896 | 626 | } else { |
897 | 500 | pblock->set_column_values(std::move(column_values)); |
898 | 500 | } |
899 | 1.12k | if (!allow_transfer_large_data && *compressed_bytes >= std::numeric_limits<int32_t>::max()) { |
900 | 0 | return Status::InternalError("The block is large than 2GB({}), can not send by Protobuf.", |
901 | 0 | *compressed_bytes); |
902 | 0 | } |
903 | 1.12k | return Status::OK(); |
904 | 1.12k | } |
905 | | |
906 | 240k | size_t MutableBlock::rows() const { |
907 | 240k | for (const auto& column : _columns) { |
908 | 144k | if (column) { |
909 | 144k | return column->size(); |
910 | 144k | } |
911 | 144k | } |
912 | | |
913 | 96.0k | return 0; |
914 | 240k | } |
915 | | |
916 | 0 | void MutableBlock::swap(MutableBlock& another) noexcept { |
917 | 0 | SCOPED_SKIP_MEMORY_CHECK(); |
918 | 0 | _columns.swap(another._columns); |
919 | 0 | _data_types.swap(another._data_types); |
920 | 0 | _names.swap(another._names); |
921 | 0 | } |
922 | | |
923 | 0 | void MutableBlock::add_row(const Block* block, int row) { |
924 | 0 | const auto& block_data = block->get_columns_with_type_and_name(); |
925 | 0 | for (size_t i = 0; i < _columns.size(); ++i) { |
926 | 0 | _columns[i]->insert_from(*block_data[i].column.get(), row); |
927 | 0 | } |
928 | 0 | } |
929 | | |
930 | | Status MutableBlock::add_rows(const Block* block, const uint32_t* row_begin, |
931 | 155 | const uint32_t* row_end, const std::vector<int>* column_offset) { |
932 | 155 | RETURN_IF_CATCH_EXCEPTION({ |
933 | 155 | DCHECK_LE(columns(), block->columns()); |
934 | 155 | if (column_offset != nullptr) { |
935 | 155 | DCHECK_EQ(columns(), column_offset->size()); |
936 | 155 | } |
937 | 155 | const auto& block_data = block->get_columns_with_type_and_name(); |
938 | 155 | for (size_t i = 0; i < _columns.size(); ++i) { |
939 | 155 | const auto& src_col = column_offset ? block_data[(*column_offset)[i]] : block_data[i]; |
940 | 155 | DCHECK_EQ(_data_types[i]->get_name(), src_col.type->get_name()); |
941 | 155 | auto& dst = _columns[i]; |
942 | 155 | const auto& src = *src_col.column.get(); |
943 | 155 | DCHECK_GE(src.size(), row_end - row_begin); |
944 | 155 | dst->insert_indices_from(src, row_begin, row_end); |
945 | 155 | } |
946 | 155 | }); |
947 | 155 | return Status::OK(); |
948 | 155 | } |
949 | | |
950 | 126 | Status MutableBlock::add_rows(const Block* block, size_t row_begin, size_t length) { |
951 | 126 | RETURN_IF_CATCH_EXCEPTION({ |
952 | 126 | DCHECK_LE(columns(), block->columns()); |
953 | 126 | const auto& block_data = block->get_columns_with_type_and_name(); |
954 | 126 | for (size_t i = 0; i < _columns.size(); ++i) { |
955 | 126 | DCHECK_EQ(_data_types[i]->get_name(), block_data[i].type->get_name()); |
956 | 126 | auto& dst = _columns[i]; |
957 | 126 | const auto& src = *block_data[i].column.get(); |
958 | 126 | dst->insert_range_from(src, row_begin, length); |
959 | 126 | } |
960 | 126 | }); |
961 | 126 | return Status::OK(); |
962 | 126 | } |
963 | | |
964 | 144k | Block MutableBlock::to_block(int start_column) { |
965 | 144k | return to_block(start_column, (int)_columns.size()); |
966 | 144k | } |
967 | | |
968 | 144k | Block MutableBlock::to_block(int start_column, int end_column) { |
969 | 144k | ColumnsWithTypeAndName columns_with_schema; |
970 | 144k | columns_with_schema.reserve(end_column - start_column); |
971 | 428k | for (size_t i = start_column; i < end_column; ++i) { |
972 | 283k | columns_with_schema.emplace_back(std::move(_columns[i]), _data_types[i], _names[i]); |
973 | 283k | } |
974 | 144k | return {columns_with_schema}; |
975 | 144k | } |
976 | | |
977 | 1 | std::string MutableBlock::dump_data_json(size_t row_limit) const { |
978 | 1 | std::stringstream ss; |
979 | 1 | std::vector<std::string> headers; |
980 | | |
981 | 1 | headers.reserve(columns()); |
982 | 2 | for (size_t i = 0; i < columns(); ++i) { |
983 | 1 | headers.push_back(_data_types[i]->get_name()); |
984 | 1 | } |
985 | 1 | size_t num_rows_to_dump = std::min(rows(), row_limit); |
986 | 1 | ss << "["; |
987 | | |
988 | 1 | auto format_options = DataTypeSerDe::get_default_format_options(); |
989 | 1 | auto time_zone = cctz::utc_time_zone(); |
990 | 1 | format_options.timezone = &time_zone; |
991 | | |
992 | 4 | for (size_t row_num = 0; row_num < num_rows_to_dump; ++row_num) { |
993 | 3 | if (row_num > 0) { |
994 | 2 | ss << ","; |
995 | 2 | } |
996 | 3 | ss << "{"; |
997 | 6 | for (size_t i = 0; i < columns(); ++i) { |
998 | 3 | if (i > 0) { |
999 | 0 | ss << ","; |
1000 | 0 | } |
1001 | 3 | ss << "\"" << headers[i] << "\":"; |
1002 | 3 | std::string s = _data_types[i]->to_string(*_columns[i].get(), row_num, format_options); |
1003 | 3 | ss << "\"" << s << "\""; |
1004 | 3 | } |
1005 | 3 | ss << "}"; |
1006 | 3 | } |
1007 | 1 | ss << "]"; |
1008 | 1 | return ss.str(); |
1009 | 1 | } |
1010 | | |
1011 | 1 | std::string MutableBlock::dump_data(size_t row_limit) const { |
1012 | 1 | std::vector<std::string> headers; |
1013 | 1 | std::vector<int> headers_size; |
1014 | 2 | for (size_t i = 0; i < columns(); ++i) { |
1015 | 1 | std::string s = _data_types[i]->get_name(); |
1016 | 1 | headers_size.push_back(s.size() > 15 ? (int)s.size() : 15); |
1017 | 1 | headers.emplace_back(s); |
1018 | 1 | } |
1019 | | |
1020 | 1 | std::stringstream out; |
1021 | | // header upper line |
1022 | 3 | auto line = [&]() { |
1023 | 6 | for (size_t i = 0; i < columns(); ++i) { |
1024 | 3 | out << std::setfill('-') << std::setw(1) << "+" << std::setw(headers_size[i]) << "-"; |
1025 | 3 | } |
1026 | 3 | out << std::setw(1) << "+" << std::endl; |
1027 | 3 | }; |
1028 | 1 | line(); |
1029 | | // header text |
1030 | 2 | for (size_t i = 0; i < columns(); ++i) { |
1031 | 1 | out << std::setfill(' ') << std::setw(1) << "|" << std::left << std::setw(headers_size[i]) |
1032 | 1 | << headers[i]; |
1033 | 1 | } |
1034 | 1 | out << std::setw(1) << "|" << std::endl; |
1035 | | // header bottom line |
1036 | 1 | line(); |
1037 | 1 | if (rows() == 0) { |
1038 | 0 | return out.str(); |
1039 | 0 | } |
1040 | | |
1041 | 1 | auto format_options = DataTypeSerDe::get_default_format_options(); |
1042 | 1 | auto time_zone = cctz::utc_time_zone(); |
1043 | 1 | format_options.timezone = &time_zone; |
1044 | | |
1045 | | // content |
1046 | 4 | for (size_t row_num = 0; row_num < rows() && row_num < row_limit; ++row_num) { |
1047 | 6 | for (size_t i = 0; i < columns(); ++i) { |
1048 | 3 | if (_columns[i].get()->empty()) { |
1049 | 0 | out << std::setfill(' ') << std::setw(1) << "|" << std::setw(headers_size[i]) |
1050 | 0 | << std::right; |
1051 | 0 | continue; |
1052 | 0 | } |
1053 | 3 | std::string s = _data_types[i]->to_string(*_columns[i].get(), row_num, format_options); |
1054 | 3 | if (s.length() > headers_size[i]) { |
1055 | 0 | s = s.substr(0, headers_size[i] - 3) + "..."; |
1056 | 0 | } |
1057 | 3 | out << std::setfill(' ') << std::setw(1) << "|" << std::setw(headers_size[i]) |
1058 | 3 | << std::right << s; |
1059 | 3 | } |
1060 | 3 | out << std::setw(1) << "|" << std::endl; |
1061 | 3 | } |
1062 | | // bottom line |
1063 | 1 | line(); |
1064 | 1 | if (row_limit < rows()) { |
1065 | 0 | out << rows() << " rows in block, only show first " << row_limit << " rows." << std::endl; |
1066 | 0 | } |
1067 | 1 | return out.str(); |
1068 | 1 | } |
1069 | | |
1070 | 48.0k | std::unique_ptr<Block> Block::create_same_struct_block(size_t size, bool is_reserve) const { |
1071 | 48.0k | auto temp_block = Block::create_unique(); |
1072 | 94.1k | for (const auto& d : data) { |
1073 | 94.1k | auto column = d.type->create_column(); |
1074 | 94.1k | if (is_reserve) { |
1075 | 0 | column->reserve(size); |
1076 | 94.1k | } else { |
1077 | 94.1k | column->insert_many_defaults(size); |
1078 | 94.1k | } |
1079 | 94.1k | temp_block->insert({std::move(column), d.type, d.name}); |
1080 | 94.1k | } |
1081 | 48.0k | return temp_block; |
1082 | 48.0k | } |
1083 | | |
1084 | 10.1k | void Block::shrink_char_type_column_suffix_zero(const std::vector<size_t>& char_type_idx) { |
1085 | 10.1k | for (auto idx : char_type_idx) { |
1086 | 2 | if (idx < data.size()) { |
1087 | 1 | auto& col_and_name = this->get_by_position(idx); |
1088 | 1 | col_and_name.column->assume_mutable()->shrink_padding_chars(); |
1089 | 1 | } |
1090 | 2 | } |
1091 | 10.1k | } |
1092 | | |
1093 | 96.1k | size_t MutableBlock::allocated_bytes() const { |
1094 | 96.1k | size_t res = 0; |
1095 | 188k | for (const auto& col : _columns) { |
1096 | 188k | if (col) { |
1097 | 188k | res += col->allocated_bytes(); |
1098 | 188k | } |
1099 | 188k | } |
1100 | | |
1101 | 96.1k | return res; |
1102 | 96.1k | } |
1103 | | |
1104 | 1 | void MutableBlock::clear_column_data() noexcept { |
1105 | 1 | SCOPED_SKIP_MEMORY_CHECK(); |
1106 | 1 | for (auto& col : _columns) { |
1107 | 1 | if (col) { |
1108 | 1 | col->clear(); |
1109 | 1 | } |
1110 | 1 | } |
1111 | 1 | } |
1112 | | |
1113 | 4 | std::string MutableBlock::dump_names() const { |
1114 | 4 | std::string out; |
1115 | 14 | for (auto it = _names.begin(); it != _names.end(); ++it) { |
1116 | 10 | if (it != _names.begin()) { |
1117 | 6 | out += ", "; |
1118 | 6 | } |
1119 | 10 | out += *it; |
1120 | 10 | } |
1121 | 4 | return out; |
1122 | 4 | } |
1123 | | } // namespace doris |