be/src/core/block/block.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // This file is copied from |
18 | | // https://github.com/ClickHouse/ClickHouse/blob/master/src/Core/Block.cpp |
19 | | // and modified by Doris |
20 | | |
21 | | #include "core/block/block.h" |
22 | | |
23 | | #include <fmt/format.h> |
24 | | #include <gen_cpp/data.pb.h> |
25 | | #include <glog/logging.h> |
26 | | #include <snappy.h> |
27 | | #include <streamvbyte.h> |
28 | | |
29 | | #include <algorithm> |
30 | | #include <cassert> |
31 | | #include <iomanip> |
32 | | #include <limits> |
33 | | #include <ranges> |
34 | | |
35 | | #include "agent/be_exec_version_manager.h" |
36 | | #include "common/compiler_util.h" // IWYU pragma: keep |
37 | | #include "common/logging.h" |
38 | | #include "common/status.h" |
39 | | #include "core/assert_cast.h" |
40 | | #include "core/column/column.h" |
41 | | #include "core/column/column_const.h" |
42 | | #include "core/column/column_nothing.h" |
43 | | #include "core/column/column_nullable.h" |
44 | | #include "core/column/column_vector.h" |
45 | | #include "core/data_type/data_type_factory.hpp" |
46 | | #include "core/data_type/data_type_nullable.h" |
47 | | #include "core/data_type_serde/data_type_serde.h" |
48 | | #include "runtime/descriptors.h" |
49 | | #include "runtime/runtime_profile.h" |
50 | | #include "runtime/thread_context.h" |
51 | | #include "util/block_compression.h" |
52 | | #include "util/faststring.h" |
53 | | #include "util/simd/bits.h" |
54 | | #include "util/slice.h" |
55 | | |
56 | | class SipHash; |
57 | | |
58 | | namespace doris::segment_v2 { |
59 | | enum CompressionTypePB : int; |
60 | | } // namespace doris::segment_v2 |
61 | | namespace doris { |
62 | | template <typename T> |
63 | | void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks, |
64 | 2 | RuntimeProfile::Counter* memory_used_counter = nullptr) { |
65 | 2 | T block; |
66 | 6 | while (blocks.try_dequeue(block)) { |
67 | 4 | if (memory_used_counter) { |
68 | 4 | if constexpr (std::is_same_v<T, Block>) { |
69 | 2 | memory_used_counter->update(-block.allocated_bytes()); |
70 | 2 | } else { |
71 | 2 | memory_used_counter->update(-block->allocated_bytes()); |
72 | 2 | } |
73 | 4 | } |
74 | 4 | } |
75 | 2 | } _ZN5doris12clear_blocksINS_5BlockEEEvRN10moodycamel15ConcurrentQueueIT_NS2_28ConcurrentQueueDefaultTraitsEEEPNS_14RuntimeProfile7CounterE Line | Count | Source | 64 | 1 | RuntimeProfile::Counter* memory_used_counter = nullptr) { | 65 | 1 | T block; | 66 | 3 | while (blocks.try_dequeue(block)) { | 67 | 2 | if (memory_used_counter) { | 68 | 2 | if constexpr (std::is_same_v<T, Block>) { | 69 | 2 | memory_used_counter->update(-block.allocated_bytes()); | 70 | | } else { | 71 | | memory_used_counter->update(-block->allocated_bytes()); | 72 | | } | 73 | 2 | } | 74 | 2 | } | 75 | 1 | } |
_ZN5doris12clear_blocksISt10unique_ptrINS_5BlockESt14default_deleteIS2_EEEEvRN10moodycamel15ConcurrentQueueIT_NS6_28ConcurrentQueueDefaultTraitsEEEPNS_14RuntimeProfile7CounterE Line | Count | Source | 64 | 1 | RuntimeProfile::Counter* memory_used_counter = nullptr) { | 65 | 1 | T block; | 66 | 3 | while (blocks.try_dequeue(block)) { | 67 | 2 | if (memory_used_counter) { | 68 | | if constexpr (std::is_same_v<T, Block>) { | 69 | | memory_used_counter->update(-block.allocated_bytes()); | 70 | 2 | } else { | 71 | 2 | memory_used_counter->update(-block->allocated_bytes()); | 72 | 2 | } | 73 | 2 | } | 74 | 2 | } | 75 | 1 | } |
|
76 | | |
77 | | template void clear_blocks<Block>(moodycamel::ConcurrentQueue<Block>&, |
78 | | RuntimeProfile::Counter* memory_used_counter); |
79 | | template void clear_blocks<BlockUPtr>(moodycamel::ConcurrentQueue<BlockUPtr>&, |
80 | | RuntimeProfile::Counter* memory_used_counter); |
81 | | |
82 | | namespace { |
83 | | |
84 | | // The no-clone fast path is only safe when the whole column tree is uniquely |
85 | | // owned. A composite column with shared children still needs COW detachment. |
//
// Returns true only if `column.is_exclusive()` holds and the same check
// succeeds for every subcolumn reported by `for_each_subcolumn`.
86 | 32.5k | bool is_recursively_exclusive(const IColumn& column) { |
87 | 32.5k | if (!column.is_exclusive()) { |
88 | 13 | return false; |
89 | 13 | } |
90 | | |
91 | 32.5k | bool exclusive = true; |
92 | 32.5k | IColumn::ColumnCallback callback = [&](IColumn::WrappedPtr& subcolumn) { |
// Once a non-exclusive subcolumn was found, skip the remaining checks.
93 | 27.7k | if (!exclusive) { |
94 | 0 | return; |
95 | 0 | } |
// Go through the const view of the wrapper so that only read access is used.
96 | 27.7k | const ColumnPtr& subcolumn_ptr = const_cast<const IColumn::WrappedPtr&>(subcolumn); |
97 | 27.7k | DCHECK(subcolumn_ptr); |
98 | 27.7k | exclusive = is_recursively_exclusive(*subcolumn_ptr); |
99 | 27.7k | }; |
100 | | // `for_each_subcolumn` only exposes a mutable callback type. This callback |
101 | | // only reads the wrapped pointers and never calls the non-const accessors. |
102 | 32.5k | const_cast<IColumn&>(column).for_each_subcolumn(callback); |
103 | 32.5k | return exclusive; |
104 | 32.5k | } |
105 | | |
106 | | // Acquire one live Block slot transactionally. Shared columns are detached while |
107 | | // the original slot is still intact, so a clone failure cannot leave Block with |
108 | | // a moved-from/null column. Exclusive column trees keep the stealing fast path. |
//
// `column` is the Block slot to take from; it is reset to nullptr on success.
// `type` is used to create a fresh column when the slot is already empty.
// Returns the mutable column that now belongs to the caller.
109 | 4.79k | MutableColumnPtr scoped_mutate_column(ColumnPtr& column, const DataTypePtr& type) { |
110 | 4.79k | DCHECK(type); |
// Empty slot: nothing to take over, hand out a new column of the slot type.
111 | 4.79k | if (!column) { |
112 | 1 | return type->create_column(); |
113 | 1 | } |
114 | | |
115 | 4.79k | MutableColumnPtr mutable_column; |
116 | 4.79k | if (is_recursively_exclusive(*column)) { |
117 | 4.78k | mutable_column = std::move(*column).mutate(); |
118 | 4.78k | } else { |
119 | 13 | mutable_column = IColumn::mutate(column); |
120 | 13 | } |
// The slot is cleared only after the mutable column was obtained.
121 | 4.79k | column = nullptr; |
122 | 4.79k | return mutable_column; |
123 | 4.79k | } |
124 | | |
125 | | } // namespace |
126 | | |
// Builds a block whose column entries are initialized from the list `il`.
127 | 4.35k | Block::Block(std::initializer_list<ColumnWithTypeAndName> il) : data {il} {} |
128 | | |
// Builds a block that takes over the entries of `data_` by moving them.
129 | 257k | Block::Block(ColumnsWithTypeAndName data_) : data {std::move(data_)} {} |
130 | | |
131 | 56.0k | Block::Block(const std::vector<SlotDescriptor*>& slots, size_t block_size) { |
132 | 271k | for (auto* const slot_desc : slots) { |
133 | 271k | auto column_ptr = slot_desc->get_empty_mutable_column(); |
134 | 271k | column_ptr->reserve(block_size); |
135 | 271k | insert(ColumnWithTypeAndName(std::move(column_ptr), slot_desc->get_data_type_ptr(), |
136 | 271k | slot_desc->col_name())); |
137 | 271k | } |
138 | 56.0k | } |
139 | | |
140 | 1 | Block::Block(const std::vector<SlotDescriptor>& slots, size_t block_size) { |
141 | 1 | std::vector<SlotDescriptor*> slot_ptrs(slots.size()); |
142 | 3 | for (size_t i = 0; i < slots.size(); ++i) { |
143 | | // Slots remain unmodified and are used to read column information; const_cast can be employed. |
144 | | // used in src/exec/rowid_fetcher.cpp |
145 | 2 | slot_ptrs[i] = const_cast<SlotDescriptor*>(&slots[i]); |
146 | 2 | } |
147 | 1 | *this = Block(slot_ptrs, block_size); |
148 | 1 | } |
149 | | |
150 | | Status Block::deserialize(const PBlock& pblock, size_t* uncompressed_bytes, |
151 | 990 | int64_t* decompress_time) { |
152 | 990 | swap(Block()); |
153 | 990 | int be_exec_version = pblock.has_be_exec_version() ? pblock.be_exec_version() : 0; |
154 | 990 | RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(be_exec_version)); |
155 | | |
156 | 990 | const char* buf = nullptr; |
157 | 990 | std::string compression_scratch; |
158 | 990 | if (pblock.compressed()) { |
159 | | // Decompress |
160 | 479 | SCOPED_RAW_TIMER(decompress_time); |
161 | 479 | const char* compressed_data = pblock.column_values().c_str(); |
162 | 479 | size_t compressed_size = pblock.column_values().size(); |
163 | 479 | size_t uncompressed_size = 0; |
164 | 479 | if (pblock.has_compression_type() && pblock.has_uncompressed_size()) { |
165 | 479 | BlockCompressionCodec* codec; |
166 | 479 | RETURN_IF_ERROR(get_block_compression_codec(pblock.compression_type(), &codec)); |
167 | 479 | uncompressed_size = pblock.uncompressed_size(); |
168 | | // Should also use allocator to allocate memory here. |
169 | 479 | compression_scratch.resize(uncompressed_size); |
170 | 479 | Slice decompressed_slice(compression_scratch); |
171 | 479 | RETURN_IF_ERROR(codec->decompress(Slice(compressed_data, compressed_size), |
172 | 479 | &decompressed_slice)); |
173 | 479 | DCHECK(uncompressed_size == decompressed_slice.size); |
174 | 479 | } else { |
175 | 0 | bool success = snappy::GetUncompressedLength(compressed_data, compressed_size, |
176 | 0 | &uncompressed_size); |
177 | 0 | DCHECK(success) << "snappy::GetUncompressedLength failed"; |
178 | 0 | compression_scratch.resize(uncompressed_size); |
179 | 0 | success = snappy::RawUncompress(compressed_data, compressed_size, |
180 | 0 | compression_scratch.data()); |
181 | 0 | DCHECK(success) << "snappy::RawUncompress failed"; |
182 | 0 | } |
183 | 479 | *uncompressed_bytes = uncompressed_size; |
184 | 479 | buf = compression_scratch.data(); |
185 | 511 | } else { |
186 | 511 | buf = pblock.column_values().data(); |
187 | 511 | } |
188 | | |
189 | 1.56k | for (const auto& pcol_meta : pblock.column_metas()) { |
190 | 1.56k | DataTypePtr type = DataTypeFactory::instance().create_data_type(pcol_meta); |
191 | 1.56k | MutableColumnPtr data_column = type->create_column(); |
192 | | // Here will try to allocate large memory, should return error if failed. |
193 | 1.56k | RETURN_IF_CATCH_EXCEPTION( |
194 | 1.56k | buf = type->deserialize(buf, &data_column, pblock.be_exec_version())); |
195 | 1.56k | data.emplace_back(data_column->get_ptr(), type, pcol_meta.name()); |
196 | 1.56k | } |
197 | | |
198 | 990 | return Status::OK(); |
199 | 990 | } |
200 | | |
// Reserves capacity for `count` column entries in the underlying container.
// This concerns the number of columns, not the number of rows.
201 | 13.3k | void Block::reserve(size_t count) { |
202 | 13.3k | data.reserve(count); |
203 | 13.3k | } |
204 | | |
205 | 4 | void Block::insert(size_t position, const ColumnWithTypeAndName& elem) { |
206 | 4 | if (position > data.size()) { |
207 | 1 | throw Exception(ErrorCode::INTERNAL_ERROR, |
208 | 1 | "invalid input position, position={}, data.size={}, names={}", position, |
209 | 1 | data.size(), dump_names()); |
210 | 1 | } |
211 | | |
212 | 3 | data.emplace(data.begin() + position, elem); |
213 | 3 | } |
214 | | |
215 | 3 | void Block::insert(size_t position, ColumnWithTypeAndName&& elem) { |
216 | 3 | if (position > data.size()) { |
217 | 1 | throw Exception(ErrorCode::INTERNAL_ERROR, |
218 | 1 | "invalid input position, position={}, data.size={}, names={}", position, |
219 | 1 | data.size(), dump_names()); |
220 | 1 | } |
221 | | |
222 | 2 | data.emplace(data.begin() + position, std::move(elem)); |
223 | 2 | } |
224 | | |
225 | 8.09k | void Block::clear_names() { |
226 | 177k | for (auto& entry : data) { |
227 | 177k | entry.name.clear(); |
228 | 177k | } |
229 | 8.09k | } |
230 | | |
231 | 14.4k | void Block::insert(const ColumnWithTypeAndName& elem) { |
232 | 14.4k | data.emplace_back(elem); |
233 | 14.4k | } |
234 | | |
235 | 999k | void Block::insert(ColumnWithTypeAndName&& elem) { |
236 | 999k | data.emplace_back(std::move(elem)); |
237 | 999k | } |
238 | | |
239 | 3 | void Block::erase(const std::set<size_t>& positions) { |
240 | 3 | for (unsigned long position : std::ranges::reverse_view(positions)) { |
241 | 2 | erase(position); |
242 | 2 | } |
243 | 3 | } |
244 | | |
245 | 709 | void Block::erase_tail(size_t start) { |
246 | 709 | DCHECK(start <= data.size()) << fmt::format( |
247 | 0 | "Position out of bound in Block::erase(), max position = {}", data.size()); |
248 | 709 | data.erase(data.begin() + start, data.end()); |
249 | 709 | } |
250 | | |
// Erases the column entry at index `position`.
// The block must be non-empty and `position` must be a valid index; both
// conditions are verified with DCHECK macros only.
251 | 123k | void Block::erase(size_t position) { |
252 | 123k | DCHECK(!data.empty()) << "Block is empty"; |
253 | 123k | DCHECK_LT(position, data.size()) << fmt::format( |
254 | 0 | "Position out of bound in Block::erase(), max position = {}", data.size() - 1); |
255 | | |
256 | 123k | erase_impl(position); |
257 | 123k | } |
258 | | |
// Removes the entry at index `position` without any validation; callers are
// expected to have checked the index already.
259 | 123k | void Block::erase_impl(size_t position) { |
260 | 123k | data.erase(data.begin() + position); |
261 | 123k | } |
262 | | |
263 | 76.0k | ColumnWithTypeAndName& Block::safe_get_by_position(size_t position) { |
264 | 76.0k | if (position >= data.size()) { |
265 | 0 | throw Exception(ErrorCode::INTERNAL_ERROR, |
266 | 0 | "invalid input position, position={}, data.size={}, names={}", position, |
267 | 0 | data.size(), dump_names()); |
268 | 0 | } |
269 | 76.0k | return data[position]; |
270 | 76.0k | } |
271 | | |
272 | 109 | const ColumnWithTypeAndName& Block::safe_get_by_position(size_t position) const { |
273 | 109 | if (position >= data.size()) { |
274 | 0 | throw Exception(ErrorCode::INTERNAL_ERROR, |
275 | 0 | "invalid input position, position={}, data.size={}, names={}", position, |
276 | 0 | data.size(), dump_names()); |
277 | 0 | } |
278 | 109 | return data[position]; |
279 | 109 | } |
280 | | |
281 | 86 | int Block::get_position_by_name(const std::string& name) const { |
282 | 1.03k | for (int i = 0; i < data.size(); i++) { |
283 | 1.03k | if (data[i].name == name) { |
284 | 85 | return i; |
285 | 85 | } |
286 | 1.03k | } |
287 | 1 | return -1; |
288 | 86 | } |
289 | | |
290 | 5 | void Block::check_number_of_rows(bool allow_null_columns) const { |
291 | 5 | ssize_t rows = -1; |
292 | 9 | for (const auto& elem : data) { |
293 | 9 | if (!elem.column && allow_null_columns) { |
294 | 2 | continue; |
295 | 2 | } |
296 | | |
297 | 7 | if (!elem.column) { |
298 | 1 | throw Exception(ErrorCode::INTERNAL_ERROR, |
299 | 1 | "Column {} in block is nullptr, in method check_number_of_rows.", |
300 | 1 | elem.name); |
301 | 1 | } |
302 | | |
303 | 6 | ssize_t size = elem.column->size(); |
304 | | |
305 | 6 | if (rows == -1) { |
306 | 5 | rows = size; |
307 | 5 | } else if (rows != size) { |
308 | 1 | throw Exception(ErrorCode::INTERNAL_ERROR, "Sizes of columns doesn't match, block={}", |
309 | 1 | dump_structure()); |
310 | 1 | } |
311 | 6 | } |
312 | 5 | } |
313 | | |
314 | 3.21M | Status Block::check_type_and_column() const { |
315 | 3.21M | #ifndef NDEBUG |
316 | 3.21M | for (const auto& elem : data) { |
317 | 20.7k | if (!elem.column) { |
318 | 0 | continue; |
319 | 0 | } |
320 | 20.7k | if (!elem.type) { |
321 | 0 | continue; |
322 | 0 | } |
323 | | |
324 | | // ColumnNothing is a special column type, it is used to represent a column that |
325 | | // is not materialized, so we don't need to check it. |
326 | 20.7k | if (check_and_get_column<ColumnNothing>(elem.column.get())) { |
327 | 0 | continue; |
328 | 0 | } |
329 | | |
330 | 20.7k | const auto& type = elem.type; |
331 | 20.7k | const auto& column = elem.column; |
332 | | |
333 | 20.7k | RETURN_IF_ERROR(column->column_self_check()); |
334 | 20.7k | auto st = type->check_column(*column); |
335 | 20.7k | if (!st.ok()) { |
336 | 1 | return Status::InternalError( |
337 | 1 | "Column {} in block is not compatible with its column type :{}, data type :{}, " |
338 | 1 | "error: {}", |
339 | 1 | elem.name, column->get_name(), type->get_name(), st.msg()); |
340 | 1 | } |
341 | 20.7k | } |
342 | 3.21M | #endif |
343 | 3.21M | return Status::OK(); |
344 | 3.21M | } |
345 | | |
346 | 51.2M | size_t Block::rows() const { |
347 | 51.2M | for (const auto& elem : data) { |
348 | 46.4M | if (elem.column) { |
349 | 46.4M | return elem.column->size(); |
350 | 46.4M | } |
351 | 46.4M | } |
352 | | |
353 | 4.80M | return 0; |
354 | 51.2M | } |
355 | | |
356 | 6 | void Block::set_num_rows(size_t length) { |
357 | 6 | if (rows() > length) { |
358 | 4 | for (auto& elem : data) { |
359 | 4 | if (elem.column) { |
360 | 4 | elem.column = elem.column->shrink(length); |
361 | 4 | } |
362 | 4 | } |
363 | 4 | } |
364 | 6 | } |
365 | | |
366 | 1 | void Block::skip_num_rows(int64_t& length) { |
367 | 1 | auto origin_rows = rows(); |
368 | 1 | if (origin_rows <= length) { |
369 | 0 | clear(); |
370 | 0 | length -= origin_rows; |
371 | 1 | } else { |
372 | 1 | for (auto& elem : data) { |
373 | 1 | if (elem.column) { |
374 | 1 | elem.column = elem.column->cut(length, origin_rows - length); |
375 | 1 | } |
376 | 1 | } |
377 | 1 | } |
378 | 1 | } |
379 | | |
380 | 12.8k | size_t Block::bytes() const { |
381 | 12.8k | size_t res = 0; |
382 | 27.2k | for (const auto& elem : data) { |
383 | 27.2k | if (!elem.column) { |
384 | 0 | std::stringstream ss; |
385 | 0 | for (const auto& e : data) { |
386 | 0 | ss << e.name + " "; |
387 | 0 | } |
388 | 0 | throw Exception(ErrorCode::INTERNAL_ERROR, |
389 | 0 | "Column {} in block is nullptr, in method bytes. All Columns are {}", |
390 | 0 | elem.name, ss.str()); |
391 | 0 | } |
392 | 27.2k | res += elem.column->byte_size(); |
393 | 27.2k | } |
394 | | |
395 | 12.8k | return res; |
396 | 12.8k | } |
397 | | |
398 | 194k | size_t Block::allocated_bytes() const { |
399 | 194k | size_t res = 0; |
400 | 380k | for (const auto& elem : data) { |
401 | 380k | if (!elem.column) { |
402 | | // Sometimes if expr failed, then there will be a nullptr |
403 | | // column left in the block. |
404 | 1 | continue; |
405 | 1 | } |
406 | 380k | res += elem.column->allocated_bytes(); |
407 | 380k | } |
408 | | |
409 | 194k | return res; |
410 | 194k | } |
411 | | |
412 | 8 | std::string Block::dump_names() const { |
413 | 8 | std::string out; |
414 | 23 | for (auto it = data.begin(); it != data.end(); ++it) { |
415 | 15 | if (it != data.begin()) { |
416 | 7 | out += ", "; |
417 | 7 | } |
418 | 15 | out += it->name; |
419 | 15 | } |
420 | 8 | return out; |
421 | 8 | } |
422 | | |
423 | 7 | std::string Block::dump_types() const { |
424 | 7 | std::string out; |
425 | 21 | for (auto it = data.begin(); it != data.end(); ++it) { |
426 | 14 | if (it != data.begin()) { |
427 | 7 | out += ", "; |
428 | 7 | } |
429 | 14 | out += it->type->get_name(); |
430 | 14 | } |
431 | 7 | return out; |
432 | 7 | } |
433 | | |
434 | 31 | std::string Block::dump_data_json(size_t begin, size_t row_limit, bool allow_null_mismatch) const { |
435 | 31 | std::stringstream ss; |
436 | | |
437 | 31 | std::vector<std::string> headers; |
438 | 31 | headers.reserve(columns()); |
439 | 46 | for (const auto& it : data) { |
440 | | // fmt::format is from the {fmt} library, you might be using std::format in C++20 |
441 | | // If not, you can build the string with a stringstream as a fallback. |
442 | 46 | headers.push_back(fmt::format("{}({})", it.name, it.type->get_name())); |
443 | 46 | } |
444 | | |
445 | 31 | size_t start_row = std::min(begin, rows()); |
446 | 31 | size_t end_row = std::min(rows(), begin + row_limit); |
447 | | |
448 | 31 | auto format_options = DataTypeSerDe::get_default_format_options(); |
449 | 31 | auto time_zone = cctz::utc_time_zone(); |
450 | 31 | format_options.timezone = &time_zone; |
451 | | |
452 | 31 | ss << "["; |
453 | 3.59k | for (size_t row_num = start_row; row_num < end_row; ++row_num) { |
454 | 3.56k | if (row_num > start_row) { |
455 | 3.53k | ss << ","; |
456 | 3.53k | } |
457 | 3.56k | ss << "{"; |
458 | 8.33k | for (size_t i = 0; i < columns(); ++i) { |
459 | 4.77k | if (i > 0) { |
460 | 1.21k | ss << ","; |
461 | 1.21k | } |
462 | 4.77k | ss << "\"" << headers[i] << "\":"; |
463 | 4.77k | std::string s; |
464 | | |
465 | | // This value-extraction logic is preserved from your original function |
466 | | // to maintain consistency, especially for handling nullability mismatches. |
467 | 4.77k | if (data[i].column && data[i].type->is_nullable() && |
468 | 4.77k | !data[i].column->is_concrete_nullable()) { |
469 | | // This branch handles a specific internal representation of nullable columns. |
470 | | // The original code would assert here if allow_null_mismatch is false. |
471 | 0 | assert(allow_null_mismatch); |
472 | 0 | s = assert_cast<const DataTypeNullable*>(data[i].type.get()) |
473 | 0 | ->get_nested_type() |
474 | 0 | ->to_string(*data[i].column, row_num, format_options); |
475 | 4.77k | } else { |
476 | | // This is the standard path. The to_string method is expected to correctly |
477 | | // handle all cases, including when the column is null (e.g., by returning "NULL"). |
478 | 4.77k | s = data[i].to_string(row_num, format_options); |
479 | 4.77k | } |
480 | 4.77k | ss << "\"" << s << "\""; |
481 | 4.77k | } |
482 | 3.56k | ss << "}"; |
483 | 3.56k | } |
484 | 31 | ss << "]"; |
485 | 31 | return ss.str(); |
486 | 31 | } |
487 | | |
488 | 858 | std::string Block::dump_data(size_t begin, size_t row_limit, bool allow_null_mismatch) const { |
489 | 858 | std::vector<std::string> headers; |
490 | 858 | std::vector<int> headers_size; |
491 | 2.10k | for (const auto& it : data) { |
492 | 2.10k | std::string s = fmt::format("{}({})", it.name, it.type->get_name()); |
493 | 2.10k | headers_size.push_back(s.size() > 15 ? (int)s.size() : 15); |
494 | 2.10k | headers.emplace_back(s); |
495 | 2.10k | } |
496 | | |
497 | 858 | std::stringstream out; |
498 | | // header upper line |
499 | 2.16k | auto line = [&]() { |
500 | 8.07k | for (size_t i = 0; i < columns(); ++i) { |
501 | 5.91k | out << std::setfill('-') << std::setw(1) << "+" << std::setw(headers_size[i]) << "-"; |
502 | 5.91k | } |
503 | 2.16k | out << std::setw(1) << "+" << std::endl; |
504 | 2.16k | }; |
505 | 858 | line(); |
506 | | // header text |
507 | 2.96k | for (size_t i = 0; i < columns(); ++i) { |
508 | 2.10k | out << std::setfill(' ') << std::setw(1) << "|" << std::left << std::setw(headers_size[i]) |
509 | 2.10k | << headers[i]; |
510 | 2.10k | } |
511 | 858 | out << std::setw(1) << "|" << std::endl; |
512 | | // header bottom line |
513 | 858 | line(); |
514 | 858 | if (rows() == 0) { |
515 | 414 | return out.str(); |
516 | 414 | } |
517 | | |
518 | 444 | auto format_options = DataTypeSerDe::get_default_format_options(); |
519 | 444 | auto time_zone = cctz::utc_time_zone(); |
520 | 444 | format_options.timezone = &time_zone; |
521 | | |
522 | | // content |
523 | 12.2k | for (size_t row_num = begin; row_num < rows() && row_num < row_limit + begin; ++row_num) { |
524 | 32.4k | for (size_t i = 0; i < columns(); ++i) { |
525 | 20.6k | if (!data[i].column || data[i].column->empty()) { |
526 | 0 | out << std::setfill(' ') << std::setw(1) << "|" << std::setw(headers_size[i]) |
527 | 0 | << std::right; |
528 | 0 | continue; |
529 | 0 | } |
530 | 20.6k | std::string s; |
531 | 20.6k | if (data[i].column) { // column may be const |
532 | | // for code inside `default_implementation_for_nulls`, there's could have: type = null, col != null |
533 | 20.6k | if (data[i].type->is_nullable() && !data[i].column->is_concrete_nullable()) { |
534 | 0 | assert(allow_null_mismatch); |
535 | 0 | s = assert_cast<const DataTypeNullable*>(data[i].type.get()) |
536 | 0 | ->get_nested_type() |
537 | 0 | ->to_string(*data[i].column, row_num, format_options); |
538 | 20.6k | } else { |
539 | 20.6k | s = data[i].to_string(row_num, format_options); |
540 | 20.6k | } |
541 | 20.6k | } |
542 | 20.6k | if (s.length() > headers_size[i]) { |
543 | 2.12k | s = s.substr(0, headers_size[i] - 3) + "..."; |
544 | 2.12k | } |
545 | 20.6k | out << std::setfill(' ') << std::setw(1) << "|" << std::setw(headers_size[i]) |
546 | 20.6k | << std::right << s; |
547 | 20.6k | } |
548 | 11.7k | out << std::setw(1) << "|" << std::endl; |
549 | 11.7k | } |
550 | | // bottom line |
551 | 444 | line(); |
552 | 444 | if (row_limit < rows()) { |
553 | 112 | out << rows() << " rows in block, only show first " << row_limit << " rows." << std::endl; |
554 | 112 | } |
555 | 444 | return out.str(); |
556 | 444 | } |
557 | | |
558 | 1 | std::string Block::dump_one_line(size_t row, int column_end) const { |
559 | 1 | assert(column_end <= columns()); |
560 | 1 | fmt::memory_buffer line; |
561 | | |
562 | 1 | auto format_options = DataTypeSerDe::get_default_format_options(); |
563 | 1 | auto time_zone = cctz::utc_time_zone(); |
564 | 1 | format_options.timezone = &time_zone; |
565 | | |
566 | 3 | for (int i = 0; i < column_end; ++i) { |
567 | 2 | if (LIKELY(i != 0)) { |
568 | | // TODO: need more effective function of to string. now the impl is slow |
569 | 1 | fmt::format_to(line, " {}", data[i].to_string(row, format_options)); |
570 | 1 | } else { |
571 | 1 | fmt::format_to(line, "{}", data[i].to_string(row, format_options)); |
572 | 1 | } |
573 | 2 | } |
574 | 1 | return fmt::to_string(line); |
575 | 1 | } |
576 | | |
577 | 47 | std::string Block::dump_structure() const { |
578 | 47 | std::string out; |
579 | 382 | for (auto it = data.begin(); it != data.end(); ++it) { |
580 | 335 | if (it != data.begin()) { |
581 | 288 | out += ", \n"; |
582 | 288 | } |
583 | 335 | out += it->dump_structure(); |
584 | 335 | } |
585 | 47 | return out; |
586 | 47 | } |
587 | | |
588 | 48.9k | Block Block::clone_empty() const { |
589 | 48.9k | Block res; |
590 | 95.8k | for (const auto& elem : data) { |
591 | 95.8k | res.insert(elem.clone_empty()); |
592 | 95.8k | } |
593 | 48.9k | return res; |
594 | 48.9k | } |
595 | | |
596 | 31 | MutableColumns Block::clone_empty_columns() const { |
597 | 31 | size_t num_columns = data.size(); |
598 | 31 | MutableColumns columns(num_columns); |
599 | 140 | for (size_t i = 0; i < num_columns; ++i) { |
600 | 109 | columns[i] = data[i].column ? data[i].column->clone_empty() : data[i].type->create_column(); |
601 | 109 | } |
602 | 31 | return columns; |
603 | 31 | } |
604 | | |
605 | 26.0k | Columns Block::get_columns() const { |
606 | 26.0k | size_t num_columns = data.size(); |
607 | 26.0k | Columns columns(num_columns); |
608 | 113k | for (size_t i = 0; i < num_columns; ++i) { |
609 | 87.4k | columns[i] = data[i].column->convert_to_full_column_if_const(); |
610 | 87.4k | } |
611 | 26.0k | return columns; |
612 | 26.0k | } |
613 | | |
614 | 553 | Columns Block::get_columns_and_convert() { |
615 | 553 | size_t num_columns = data.size(); |
616 | 553 | Columns columns(num_columns); |
617 | 1.16k | for (size_t i = 0; i < num_columns; ++i) { |
618 | 616 | data[i].column = data[i].column->convert_to_full_column_if_const(); |
619 | 616 | columns[i] = data[i].column; |
620 | 616 | } |
621 | 553 | return columns; |
622 | 553 | } |
623 | | |
// Takes every column out of `block` as a mutable column, slot by slot.
// If acquiring a slot throws, the columns acquired so far are moved back into
// their slots and the exception is rethrown.
624 | 2.47k | Block::ScopedMutableColumns::ScopedMutableColumns(Block& block) : _block(&block) { |
625 | 2.47k | const size_t num_columns = block.data.size(); |
626 | 2.47k | _columns.resize(num_columns); |
// Counts the slots acquired so far; read by the catch handler for the rollback.
627 | 2.47k | size_t acquired_columns = 0; |
628 | 2.47k | try { |
629 | 7.17k | for (; acquired_columns < num_columns; ++acquired_columns) { |
630 | 4.69k | auto& column_with_type_and_name = block.data[acquired_columns]; |
631 | 4.69k | _columns[acquired_columns] = scoped_mutate_column(column_with_type_and_name.column, |
632 | 4.69k | column_with_type_and_name.type); |
633 | 4.69k | } |
634 | 2.47k | } catch (...) { |
// Roll back: return the already acquired columns to their original slots.
635 | 4 | for (size_t i = 0; i < acquired_columns; ++i) { |
636 | 2 | block.data[i].column = std::move(_columns[i]); |
637 | 2 | } |
638 | 2 | _block = nullptr; |
639 | 2 | throw; |
640 | 2 | } |
641 | 2.47k | } |
642 | | |
// Hands the columns back to the block via restore(), which does nothing when
// the guard no longer refers to a block.
643 | 2.47k | Block::ScopedMutableColumns::~ScopedMutableColumns() { |
644 | 2.47k | restore(); |
645 | 2.47k | } |
646 | | |
// Move constructor: takes over the block pointer and the columns of `other`.
// `other._block` is set to nullptr, so `other` will not restore anything.
647 | | Block::ScopedMutableColumns::ScopedMutableColumns(ScopedMutableColumns&& other) noexcept |
648 | 0 | : _block(std::exchange(other._block, nullptr)), _columns(std::move(other._columns)) {} |
649 | | |
650 | | Block::ScopedMutableColumns& Block::ScopedMutableColumns::operator=( |
651 | 0 | ScopedMutableColumns&& other) noexcept { |
652 | 0 | if (this != &other) { |
653 | 0 | restore(); |
654 | 0 | _block = std::exchange(other._block, nullptr); |
655 | 0 | _columns = std::move(other._columns); |
656 | 0 | } |
657 | 0 | return *this; |
658 | 0 | } |
659 | | |
660 | 2 | const DataTypePtr& Block::ScopedMutableColumns::get_datatype_by_position(size_t position) const { |
661 | 2 | DCHECK(_block != nullptr); |
662 | 2 | return _block->get_by_position(position).type; |
663 | 2 | } |
664 | | |
665 | 2 | const std::string& Block::ScopedMutableColumns::get_name_by_position(size_t position) const { |
666 | 2 | DCHECK(_block != nullptr); |
667 | 2 | return _block->get_by_position(position).name; |
668 | 2 | } |
669 | | |
670 | 811 | MutableColumns Block::ScopedMutableColumns::release() { |
671 | 811 | DCHECK(_block != nullptr); |
672 | 811 | _block = nullptr; |
673 | 811 | return std::move(_columns); |
674 | 811 | } |
675 | | |
676 | 2.95k | void Block::ScopedMutableColumns::restore() { |
677 | 2.95k | if (_block != nullptr) { |
678 | 1.66k | _block->set_columns(std::move(_columns)); |
679 | 1.66k | _block = nullptr; |
680 | 1.66k | } |
681 | 2.95k | } |
682 | | |
683 | | Block::ScopedMutableColumn::ScopedMutableColumn(Block& block, size_t position) |
684 | 98 | : _block(&block), _position(position) { |
685 | 98 | DCHECK_LT(_position, _block->data.size()); |
686 | 98 | auto& column_with_type_and_name = _block->data[_position]; |
687 | 98 | DCHECK(column_with_type_and_name.type); |
688 | 98 | _column = |
689 | 98 | scoped_mutate_column(column_with_type_and_name.column, column_with_type_and_name.type); |
690 | 98 | } |
691 | | |
// Hands the column back to its block slot via restore(), which does nothing
// when the guard no longer refers to a block.
692 | 97 | Block::ScopedMutableColumn::~ScopedMutableColumn() { |
693 | 97 | restore(); |
694 | 97 | } |
695 | | |
// Move constructor: takes over block pointer, position and column of `other`.
// `other._block` is set to nullptr, so `other` will not restore anything.
696 | | Block::ScopedMutableColumn::ScopedMutableColumn(ScopedMutableColumn&& other) noexcept |
697 | 0 | : _block(std::exchange(other._block, nullptr)), |
698 | 0 | _position(other._position), |
699 | 0 | _column(std::move(other._column)) {} |
700 | | |
701 | | Block::ScopedMutableColumn& Block::ScopedMutableColumn::operator=( |
702 | 0 | ScopedMutableColumn&& other) noexcept { |
703 | 0 | if (this != &other) { |
704 | 0 | restore(); |
705 | 0 | _block = std::exchange(other._block, nullptr); |
706 | 0 | _position = other._position; |
707 | 0 | _column = std::move(other._column); |
708 | 0 | } |
709 | 0 | return *this; |
710 | 0 | } |
711 | | |
712 | 97 | void Block::ScopedMutableColumn::restore() { |
713 | 97 | if (_block != nullptr) { |
714 | 97 | DCHECK_LT(_position, _block->data.size()); |
715 | 97 | _block->data[_position].column = std::move(_column); |
716 | 97 | _block = nullptr; |
717 | 97 | } |
718 | 97 | } |
719 | | |
// Returns a guard holding all columns of this block as mutable columns.
// Only callable on lvalue blocks (note the `&` qualifier).
720 | 2.47k | Block::ScopedMutableColumns Block::mutate_columns_scoped() & { |
721 | 2.47k | return ScopedMutableColumns(*this); |
722 | 2.47k | } |
723 | | |
// Returns a guard holding the column at `position` as a mutable column.
// Only callable on lvalue blocks (note the `&` qualifier).
724 | 98 | Block::ScopedMutableColumn Block::mutate_column_scoped(size_t position) & { |
725 | 98 | return ScopedMutableColumn(*this, position); |
726 | 98 | } |
727 | | |
// Fills `_mutable_block` with the types, names and (mutable) columns of
// `*block`. `block` must not be null (DCHECK).
728 | 812 | ScopedMutableBlock::ScopedMutableBlock(Block* block) { |
729 | 812 | DCHECK(block != nullptr); |
// Types and names are copied before the columns are taken out of the block.
730 | 812 | DataTypes data_types = block->get_data_types(); |
731 | 812 | std::vector<std::string> names = block->get_names(); |
732 | 812 | auto columns_guard = block->mutate_columns_scoped(); |
733 | 812 | _mutable_block.data_types() = std::move(data_types); |
734 | 812 | _mutable_block.get_names() = std::move(names); |
// release() detaches the guard, so it will not put the columns back.
735 | 812 | _mutable_block.set_mutable_columns(columns_guard.release()); |
// `_block` is assigned last, after every step above has completed.
736 | 812 | _block = block; |
737 | 812 | } |
738 | | |
739 | 145k | MutableColumns Block::mutate_columns() && { |
740 | 145k | size_t num_columns = data.size(); |
741 | 145k | MutableColumns columns(num_columns); |
742 | 433k | for (size_t i = 0; i < num_columns; ++i) { |
743 | 287k | DCHECK(data[i].type); |
744 | 287k | columns[i] = data[i].column ? IColumn::mutate(std::move(data[i].column)) |
745 | 287k | : data[i].type->create_column(); |
746 | 287k | } |
747 | 145k | return columns; |
748 | 145k | } |
749 | | |
750 | 3.37k | void Block::set_columns(MutableColumns&& columns) { |
751 | 3.37k | DCHECK_GE(columns.size(), data.size()) |
752 | 0 | << fmt::format("Invalid size of columns, columns size: {}, data size: {}", |
753 | 0 | columns.size(), data.size()); |
754 | 3.37k | size_t num_columns = data.size(); |
755 | 11.6k | for (size_t i = 0; i < num_columns; ++i) { |
756 | 8.22k | data[i].column = std::move(columns[i]); |
757 | 8.22k | } |
758 | 3.37k | } |
759 | | |
760 | 49 | Block Block::clone_without_columns(const std::vector<int>* column_offset) const { |
761 | 49 | Block res; |
762 | | |
763 | 49 | if (column_offset != nullptr) { |
764 | 30 | size_t num_columns = column_offset->size(); |
765 | 170 | for (size_t i = 0; i < num_columns; ++i) { |
766 | 140 | res.insert({nullptr, data[(*column_offset)[i]].type, data[(*column_offset)[i]].name}); |
767 | 140 | } |
768 | 30 | } else { |
769 | 19 | size_t num_columns = data.size(); |
770 | 53 | for (size_t i = 0; i < num_columns; ++i) { |
771 | 34 | res.insert({nullptr, data[i].type, data[i].name}); |
772 | 34 | } |
773 | 19 | } |
774 | 49 | return res; |
775 | 49 | } |
776 | | |
777 | 55.7k | const ColumnsWithTypeAndName& Block::get_columns_with_type_and_name() const { |
778 | 55.7k | return data; |
779 | 55.7k | } |
780 | | |
781 | 145k | std::vector<std::string> Block::get_names() const { |
782 | 145k | std::vector<std::string> res; |
783 | 145k | res.reserve(columns()); |
784 | | |
785 | 285k | for (const auto& elem : data) { |
786 | 285k | res.push_back(elem.name); |
787 | 285k | } |
788 | | |
789 | 145k | return res; |
790 | 145k | } |
791 | | |
792 | 145k | DataTypes Block::get_data_types() const { |
793 | 145k | DataTypes res; |
794 | 145k | res.reserve(columns()); |
795 | | |
796 | 285k | for (const auto& elem : data) { |
797 | 285k | res.push_back(elem.type); |
798 | 285k | } |
799 | | |
800 | 145k | return res; |
801 | 145k | } |
802 | | |
803 | 52.7k | void Block::clear() { |
804 | 52.7k | data.clear(); |
805 | 52.7k | } |
806 | | |
807 | 1.62M | void Block::clear_column_data(int64_t column_size) { |
808 | 1.62M | SCOPED_SKIP_MEMORY_CHECK(); |
809 | | // data.size() greater than column_size, means here have some |
810 | | // function exec result in block, need erase it here |
811 | 1.62M | if (column_size != -1 and data.size() > column_size) { |
812 | 2.20k | for (int64_t i = data.size() - 1; i >= column_size; --i) { |
813 | 1.10k | erase(i); |
814 | 1.10k | } |
815 | 1.10k | } |
816 | 1.62M | for (auto& d : data) { |
817 | 51.3k | if (d.column) { |
818 | 51.3k | if (d.column->is_exclusive()) { |
819 | 51.1k | d.column->assert_mutable()->clear(); |
820 | 51.1k | } else { |
821 | 212 | d.column = d.column->clone_empty(); |
822 | 212 | } |
823 | 51.3k | } |
824 | 51.3k | } |
825 | 1.62M | } |
826 | | |
827 | 46 | void Block::clear_column_data(const std::vector<uint32_t>& columns_to_clear) { |
828 | 46 | SCOPED_SKIP_MEMORY_CHECK(); |
829 | 79 | for (auto col : columns_to_clear) { |
830 | 79 | DCHECK_LT(col, data.size()); |
831 | 79 | auto& column = data[col].column; |
832 | 79 | if (column) { |
833 | 79 | if (column->is_exclusive()) { |
834 | 77 | column->assert_mutable()->clear(); |
835 | 77 | } else { |
836 | 2 | column = column->clone_empty(); |
837 | 2 | } |
838 | 79 | } |
839 | 79 | } |
840 | 46 | } |
841 | | |
842 | | void Block::clear_column_mem_not_keep(const std::vector<bool>& column_keep_flags, |
843 | 48.0k | bool need_keep_first) { |
844 | 48.0k | if (data.size() >= column_keep_flags.size()) { |
845 | 48.0k | auto origin_rows = rows(); |
846 | 142k | for (size_t i = 0; i < column_keep_flags.size(); ++i) { |
847 | 94.1k | if (!column_keep_flags[i]) { |
848 | 36.8k | data[i].column = data[i].column->clone_empty(); |
849 | 36.8k | } |
850 | 94.1k | } |
851 | | |
852 | 48.0k | if (need_keep_first && !column_keep_flags[0]) { |
853 | 1 | auto first_column = data[0].column->clone_empty(); |
854 | 1 | first_column->resize(origin_rows); |
855 | 1 | data[0].column = std::move(first_column); |
856 | 1 | } |
857 | 48.0k | } |
858 | 48.0k | } |
859 | | |
860 | 1.32k | void Block::swap(Block& other) noexcept { |
861 | 1.32k | SCOPED_SKIP_MEMORY_CHECK(); |
862 | 1.32k | data.swap(other.data); |
863 | 1.32k | } |
864 | | |
865 | 1.70k | void Block::swap(Block&& other) noexcept { |
866 | 1.70k | SCOPED_SKIP_MEMORY_CHECK(); |
867 | 1.70k | data = std::move(other.data); |
868 | 1.70k | } |
869 | | |
870 | 3 | void Block::shuffle_columns(const std::vector<int>& result_column_ids) { |
871 | 3 | Container tmp_data; |
872 | 3 | tmp_data.reserve(result_column_ids.size()); |
873 | 5 | for (const int result_column_id : result_column_ids) { |
874 | 5 | tmp_data.push_back(data[result_column_id]); |
875 | 5 | } |
876 | 3 | data = std::move(tmp_data); |
877 | 3 | } |
878 | | |
879 | 2 | void Block::update_hash(SipHash& hash) const { |
880 | 8 | for (size_t row_no = 0, num_rows = rows(); row_no < num_rows; ++row_no) { |
881 | 12 | for (const auto& col : data) { |
882 | 12 | col.column->update_hash_with_value(row_no, hash); |
883 | 12 | } |
884 | 6 | } |
885 | 2 | } |
886 | | |
887 | | void Block::filter_block_internal(Block* block, const std::vector<uint32_t>& columns_to_filter, |
888 | 519 | const IColumn::Filter& filter) { |
889 | 519 | size_t count = filter.size() - simd::count_zero_num((int8_t*)filter.data(), filter.size()); |
890 | 1.34k | for (const auto& col : columns_to_filter) { |
891 | 1.34k | auto& column = block->get_by_position(col).column; |
892 | 1.34k | if (column->size() == count) { |
893 | 1.29k | continue; |
894 | 1.29k | } |
895 | 43 | if (count == 0) { |
896 | 2 | if (column->is_exclusive()) { |
897 | 0 | column->assert_mutable()->clear(); |
898 | 2 | } else { |
899 | 2 | column = column->clone_empty(); |
900 | 2 | } |
901 | 2 | continue; |
902 | 2 | } |
903 | 41 | if (column->is_exclusive()) { |
904 | | // COW: safe to mutate in-place since we have exclusive ownership |
905 | 41 | const auto result_size = column->assert_mutable()->filter(filter); |
906 | 41 | if (result_size != count) [[unlikely]] { |
907 | 0 | throw Exception(ErrorCode::INTERNAL_ERROR, |
908 | 0 | "result_size not equal with filter_size, result_size={}, " |
909 | 0 | "filter_size={}", |
910 | 0 | result_size, count); |
911 | 0 | } |
912 | 41 | } else { |
913 | | // COW: must create a copy since column is shared |
914 | 0 | column = column->filter(filter, count); |
915 | 0 | } |
916 | 41 | } |
917 | 519 | } |
918 | | |
919 | | void Block::filter_block_internal(Block* block, const IColumn::Filter& filter, |
920 | 1 | uint32_t column_to_keep) { |
921 | 1 | std::vector<uint32_t> columns_to_filter; |
922 | 1 | columns_to_filter.resize(column_to_keep); |
923 | 3 | for (uint32_t i = 0; i < column_to_keep; ++i) { |
924 | 2 | columns_to_filter[i] = i; |
925 | 2 | } |
926 | 1 | filter_block_internal(block, columns_to_filter, filter); |
927 | 1 | } |
928 | | |
929 | 8 | void Block::filter_block_internal(Block* block, const IColumn::Filter& filter) { |
930 | 8 | const size_t count = |
931 | 8 | filter.size() - simd::count_zero_num((int8_t*)filter.data(), filter.size()); |
932 | 24 | for (int i = 0; i < block->columns(); ++i) { |
933 | 16 | auto& column = block->get_by_position(i).column; |
934 | 16 | if (column->is_exclusive()) { |
935 | 16 | column->assert_mutable()->filter(filter); |
936 | 16 | } else { |
937 | 0 | column = column->filter(filter, count); |
938 | 0 | } |
939 | 16 | } |
940 | 8 | } |
941 | | |
942 | | Status Block::append_to_block_by_selector(MutableBlock* dst, |
943 | 1 | const IColumn::Selector& selector) const { |
944 | 1 | RETURN_IF_CATCH_EXCEPTION({ |
945 | 1 | DCHECK_EQ(data.size(), dst->mutable_columns().size()); |
946 | 1 | for (size_t i = 0; i < data.size(); i++) { |
947 | | // FIXME: this is a quickfix. we assume that only partition functions make there some |
948 | 1 | if (!is_column_const(*data[i].column)) { |
949 | 1 | data[i].column->append_data_by_selector(dst->mutable_columns()[i], selector); |
950 | 1 | } |
951 | 1 | } |
952 | 1 | }); |
953 | 1 | return Status::OK(); |
954 | 1 | } |
955 | | |
956 | | Status Block::filter_block(Block* block, const std::vector<uint32_t>& columns_to_filter, |
957 | 496 | size_t filter_column_id, size_t column_to_keep) { |
958 | 496 | const auto& filter_column = block->get_by_position(filter_column_id).column; |
959 | 496 | if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) { |
960 | 1 | const auto& nested_column = nullable_column->get_nested_column_ptr(); |
961 | | |
962 | 1 | MutableColumnPtr mutable_holder = |
963 | 1 | nested_column->use_count() == 1 |
964 | 1 | ? nested_column->assert_mutable() |
965 | 1 | : nested_column->clone_resized(nested_column->size()); |
966 | | |
967 | 1 | auto* concrete_column = assert_cast<ColumnUInt8*>(mutable_holder.get()); |
968 | 1 | const auto* __restrict null_map = nullable_column->get_null_map_data().data(); |
969 | 1 | IColumn::Filter& filter = concrete_column->get_data(); |
970 | 1 | auto* __restrict filter_data = filter.data(); |
971 | | |
972 | 1 | const size_t size = filter.size(); |
973 | 4 | for (size_t i = 0; i < size; ++i) { |
974 | 3 | filter_data[i] &= !null_map[i]; |
975 | 3 | } |
976 | 1 | RETURN_IF_CATCH_EXCEPTION(filter_block_internal(block, columns_to_filter, filter)); |
977 | 495 | } else if (const auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) { |
978 | 2 | bool ret = const_column->get_bool(0); |
979 | 2 | if (!ret) { |
980 | 2 | for (const auto& col : columns_to_filter) { |
981 | 2 | auto& column = block->get_by_position(col).column; |
982 | 2 | if (column->is_exclusive()) { |
983 | 2 | column->assert_mutable()->clear(); |
984 | 2 | } else { |
985 | 0 | column = column->clone_empty(); |
986 | 0 | } |
987 | 2 | } |
988 | 1 | } |
989 | 493 | } else { |
990 | 493 | const IColumn::Filter& filter = |
991 | 493 | assert_cast<const doris::ColumnUInt8&>(*filter_column).get_data(); |
992 | 493 | RETURN_IF_CATCH_EXCEPTION(filter_block_internal(block, columns_to_filter, filter)); |
993 | 493 | } |
994 | | |
995 | 496 | erase_useless_column(block, column_to_keep); |
996 | 496 | return Status::OK(); |
997 | 496 | } |
998 | | |
999 | 490 | Status Block::filter_block(Block* block, size_t filter_column_id, size_t column_to_keep) { |
1000 | 490 | std::vector<uint32_t> columns_to_filter; |
1001 | 490 | columns_to_filter.resize(column_to_keep); |
1002 | 1.77k | for (uint32_t i = 0; i < column_to_keep; ++i) { |
1003 | 1.28k | columns_to_filter[i] = i; |
1004 | 1.28k | } |
1005 | 490 | return filter_block(block, columns_to_filter, filter_column_id, column_to_keep); |
1006 | 490 | } |
1007 | | |
// Serializes this block into `pblock`.
//
// Steps, in order:
//   1. Validate `be_exec_version` and record it on `pblock`.
//   2. Add one column meta per column and sum the per-column uncompressed sizes.
//   3. Serialize all columns back to back into one buffer.
//   4. Optionally compress the buffer; the compressed form is stored only when it
//      is strictly smaller than the serialized form.
//   5. Unless `allow_transfer_large_data` is set, reject results whose stored size
//      reaches the int32 maximum.
//
// Out-parameters:
//   uncompressed_bytes - set to the summed uncompressed size estimate.
//   compressed_bytes   - set to the serialized size (written bytes plus
//                        STREAMVBYTE_PADDING), or to the compressed size when the
//                        compressed form is stored.
//   compress_time      - passed to SCOPED_RAW_TIMER around the compression step
//                        (presumably accumulates elapsed time - confirm against the macro).
//
// Returns BufferAllocFailed when the buffer cannot be resized, InternalError when
// the size limit is hit, errors propagated from the version check / codec lookup /
// compression, and OK otherwise.
1008 | | Status Block::serialize(int be_exec_version, PBlock* pblock, |
1009 | | /*std::string* compressed_buffer,*/ size_t* uncompressed_bytes, |
1010 | | size_t* compressed_bytes, int64_t* compress_time, |
1011 | | segment_v2::CompressionTypePB compression_type, |
1012 | 2.73k | bool allow_transfer_large_data) const { |
1013 | 2.73k | RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(be_exec_version)); |
1014 | 2.73k | pblock->set_be_exec_version(be_exec_version); |
1015 | | |
1016 | | // calc uncompressed size for allocation |
1017 | 2.73k | size_t content_uncompressed_size = 0; |
1018 | 3.42k | for (const auto& c : *this) { |
1019 | 3.42k | PColumnMeta* pcm = pblock->add_column_metas(); |
1020 | 3.42k | c.to_pb_column_meta(pcm); |
1021 | 3.42k | DCHECK(pcm->type() != PGenericType::UNKNOWN) << " forget to set pb type"; |
1022 | | // get serialized size |
1023 | 3.42k | content_uncompressed_size += |
1024 | 3.42k | c.type->get_uncompressed_serialized_bytes(*(c.column), pblock->be_exec_version()); |
1025 | 3.42k | } |
1026 | | |
1027 | | // serialize data values |
1028 | | // when data type is HLL, content_uncompressed_size maybe larger than real size. |
1029 | 2.73k | std::string column_values; |
1030 | 2.73k | try { |
1031 | | // TODO: After support c++23, we should use resize_and_overwrite to replace resize |
1032 | 2.73k | column_values.resize(content_uncompressed_size); |
1033 | 2.73k | } catch (...) { |
1034 | 0 | std::string msg = fmt::format("Try to alloc {} bytes for pblock column values failed.", |
1035 | 0 | content_uncompressed_size); |
1036 | 0 | LOG(WARNING) << msg; |
1037 | 0 | return Status::BufferAllocFailed(msg); |
1038 | 0 | } |
// `buf` is the write cursor; each column's serialize() returns the advanced cursor.
1039 | 2.73k | char* buf = column_values.data(); |
1040 | | |
1041 | 3.42k | for (const auto& c : *this) { |
1042 | 3.42k | buf = c.type->serialize(*(c.column), buf, pblock->be_exec_version()); |
1043 | 3.42k | } |
1044 | 2.73k | *uncompressed_bytes = content_uncompressed_size; |
// Actual payload size: bytes written so far plus STREAMVBYTE_PADDING; the buffer
// is then resized to exactly that size.
1045 | 2.73k | const size_t serialize_bytes = buf - column_values.data() + STREAMVBYTE_PADDING; |
1046 | 2.73k | *compressed_bytes = serialize_bytes; |
1047 | 2.73k | column_values.resize(serialize_bytes); |
1048 | | |
1049 | | // compress |
1050 | 2.73k | if (compression_type != segment_v2::NO_COMPRESSION && content_uncompressed_size > 0) { |
1051 | 624 | SCOPED_RAW_TIMER(compress_time); |
1052 | 624 | pblock->set_compression_type(compression_type); |
1053 | 624 | pblock->set_uncompressed_size(serialize_bytes); |
1054 | | |
1055 | 624 | BlockCompressionCodec* codec; |
1056 | 624 | RETURN_IF_ERROR(get_block_compression_codec(compression_type, &codec)); |
1057 | | |
1058 | 624 | faststring buf_compressed; |
1059 | 624 | RETURN_IF_ERROR_OR_CATCH_EXCEPTION( |
1060 | 624 | codec->compress(Slice(column_values.data(), serialize_bytes), &buf_compressed)); |
1061 | 624 | size_t compressed_size = buf_compressed.size(); |
// Store the compressed bytes only when they are strictly smaller; otherwise the
// raw buffer is stored and the `compressed` flag is not set here.
1062 | 624 | if (LIKELY(compressed_size < serialize_bytes)) { |
1063 | | // TODO: rethink the logic here may copy again ? |
1064 | 624 | pblock->set_column_values(buf_compressed.data(), buf_compressed.size()); |
1065 | 624 | pblock->set_compressed(true); |
1066 | 624 | *compressed_bytes = compressed_size; |
1067 | 624 | } else { |
1068 | 0 | pblock->set_column_values(std::move(column_values)); |
1069 | 0 | } |
1070 | | |
1071 | 624 | VLOG_ROW << "uncompressed size: " << content_uncompressed_size |
1072 | 0 | << ", compressed size: " << compressed_size; |
1073 | 2.10k | } else { |
1074 | 2.10k | pblock->set_column_values(std::move(column_values)); |
1075 | 2.10k | } |
// Size guard: applies to the size actually stored in pblock (*compressed_bytes).
1076 | 2.73k | if (!allow_transfer_large_data && *compressed_bytes >= std::numeric_limits<int32_t>::max()) { |
1077 | 0 | return Status::InternalError("The block is large than 2GB({}), can not send by Protobuf.", |
1078 | 0 | *compressed_bytes); |
1079 | 0 | } |
1080 | 2.73k | return Status::OK(); |
1081 | 2.73k | } |
1082 | | |
1083 | 240k | size_t MutableBlock::rows() const { |
1084 | 240k | for (const auto& column : _columns) { |
1085 | 144k | if (column) { |
1086 | 144k | return column->size(); |
1087 | 144k | } |
1088 | 144k | } |
1089 | | |
1090 | 96.0k | return 0; |
1091 | 240k | } |
1092 | | |
1093 | 0 | void MutableBlock::swap(MutableBlock& another) noexcept { |
1094 | 0 | SCOPED_SKIP_MEMORY_CHECK(); |
1095 | 0 | _columns.swap(another._columns); |
1096 | 0 | _data_types.swap(another._data_types); |
1097 | 0 | _names.swap(another._names); |
1098 | 0 | } |
1099 | | |
1100 | 0 | void MutableBlock::add_row(const Block* block, int row) { |
1101 | 0 | const auto& block_data = block->get_columns_with_type_and_name(); |
1102 | 0 | for (size_t i = 0; i < _columns.size(); ++i) { |
1103 | 0 | _columns[i]->insert_from(*block_data[i].column.get(), row); |
1104 | 0 | } |
1105 | 0 | } |
1106 | | |
1107 | | Status MutableBlock::add_rows(const Block* block, const uint32_t* row_begin, |
1108 | 162 | const uint32_t* row_end, const std::vector<int>* column_offset) { |
1109 | 162 | RETURN_IF_CATCH_EXCEPTION({ |
1110 | 162 | DCHECK_LE(columns(), block->columns()); |
1111 | 162 | if (column_offset != nullptr) { |
1112 | 162 | DCHECK_EQ(columns(), column_offset->size()); |
1113 | 162 | } |
1114 | 162 | const auto& block_data = block->get_columns_with_type_and_name(); |
1115 | 162 | for (size_t i = 0; i < _columns.size(); ++i) { |
1116 | 162 | const auto& src_col = column_offset ? block_data[(*column_offset)[i]] : block_data[i]; |
1117 | 162 | DCHECK_EQ(_data_types[i]->get_name(), src_col.type->get_name()); |
1118 | 162 | auto& dst = _columns[i]; |
1119 | 162 | const auto& src = *src_col.column.get(); |
1120 | 162 | DCHECK_GE(src.size(), row_end - row_begin); |
1121 | 162 | dst->insert_indices_from(src, row_begin, row_end); |
1122 | 162 | } |
1123 | 162 | }); |
1124 | 161 | return Status::OK(); |
1125 | 162 | } |
1126 | | |
1127 | 126 | Status MutableBlock::add_rows(const Block* block, size_t row_begin, size_t length) { |
1128 | 126 | RETURN_IF_CATCH_EXCEPTION({ |
1129 | 126 | DCHECK_LE(columns(), block->columns()); |
1130 | 126 | const auto& block_data = block->get_columns_with_type_and_name(); |
1131 | 126 | for (size_t i = 0; i < _columns.size(); ++i) { |
1132 | 126 | DCHECK_EQ(_data_types[i]->get_name(), block_data[i].type->get_name()); |
1133 | 126 | auto& dst = _columns[i]; |
1134 | 126 | const auto& src = *block_data[i].column.get(); |
1135 | 126 | dst->insert_range_from(src, row_begin, length); |
1136 | 126 | } |
1137 | 126 | }); |
1138 | 126 | return Status::OK(); |
1139 | 126 | } |
1140 | | |
1141 | 144k | Block MutableBlock::to_block(int start_column) { |
1142 | 144k | return to_block(start_column, (int)_columns.size()); |
1143 | 144k | } |
1144 | | |
1145 | 144k | Block MutableBlock::to_block(int start_column, int end_column) { |
1146 | 144k | ColumnsWithTypeAndName columns_with_schema; |
1147 | 144k | columns_with_schema.reserve(end_column - start_column); |
1148 | 428k | for (size_t i = start_column; i < end_column; ++i) { |
1149 | 283k | columns_with_schema.emplace_back(std::move(_columns[i]), _data_types[i], _names[i]); |
1150 | 283k | } |
1151 | 144k | return {columns_with_schema}; |
1152 | 144k | } |
1153 | | |
1154 | 1 | std::string MutableBlock::dump_data_json(size_t row_limit) const { |
1155 | 1 | std::stringstream ss; |
1156 | 1 | std::vector<std::string> headers; |
1157 | | |
1158 | 1 | headers.reserve(columns()); |
1159 | 2 | for (size_t i = 0; i < columns(); ++i) { |
1160 | 1 | headers.push_back(_data_types[i]->get_name()); |
1161 | 1 | } |
1162 | 1 | size_t num_rows_to_dump = std::min(rows(), row_limit); |
1163 | 1 | ss << "["; |
1164 | | |
1165 | 1 | auto format_options = DataTypeSerDe::get_default_format_options(); |
1166 | 1 | auto time_zone = cctz::utc_time_zone(); |
1167 | 1 | format_options.timezone = &time_zone; |
1168 | | |
1169 | 4 | for (size_t row_num = 0; row_num < num_rows_to_dump; ++row_num) { |
1170 | 3 | if (row_num > 0) { |
1171 | 2 | ss << ","; |
1172 | 2 | } |
1173 | 3 | ss << "{"; |
1174 | 6 | for (size_t i = 0; i < columns(); ++i) { |
1175 | 3 | if (i > 0) { |
1176 | 0 | ss << ","; |
1177 | 0 | } |
1178 | 3 | ss << "\"" << headers[i] << "\":"; |
1179 | 3 | std::string s = _data_types[i]->to_string(*_columns[i].get(), row_num, format_options); |
1180 | 3 | ss << "\"" << s << "\""; |
1181 | 3 | } |
1182 | 3 | ss << "}"; |
1183 | 3 | } |
1184 | 1 | ss << "]"; |
1185 | 1 | return ss.str(); |
1186 | 1 | } |
1187 | | |
// Renders the block as an ASCII table: a header row with each column's data type
// name, followed by at most `row_limit` data rows. Each column is at least 15
// characters wide; longer values are cut and end with "...". When the block has
// more rows than `row_limit`, a trailing note states the total row count.
// Values are formatted with default options and the UTC time zone.
1188 | 1 | std::string MutableBlock::dump_data(size_t row_limit) const { |
1189 | 1 | std::vector<std::string> headers; |
1190 | 1 | std::vector<int> headers_size; |
// Column width = max(15, length of the data type name).
1191 | 2 | for (size_t i = 0; i < columns(); ++i) { |
1192 | 1 | std::string s = _data_types[i]->get_name(); |
1193 | 1 | headers_size.push_back(s.size() > 15 ? (int)s.size() : 15); |
1194 | 1 | headers.emplace_back(s); |
1195 | 1 | } |
1196 | | |
1197 | 1 | std::stringstream out; |
1198 | | // header upper line |
// `line` writes one horizontal separator: "+" followed by dashes for each column, then "+".
1199 | 3 | auto line = [&]() { |
1200 | 6 | for (size_t i = 0; i < columns(); ++i) { |
1201 | 3 | out << std::setfill('-') << std::setw(1) << "+" << std::setw(headers_size[i]) << "-"; |
1202 | 3 | } |
1203 | 3 | out << std::setw(1) << "+" << std::endl; |
1204 | 3 | }; |
1205 | 1 | line(); |
1206 | | // header text |
1207 | 2 | for (size_t i = 0; i < columns(); ++i) { |
1208 | 1 | out << std::setfill(' ') << std::setw(1) << "|" << std::left << std::setw(headers_size[i]) |
1209 | 1 | << headers[i]; |
1210 | 1 | } |
1211 | 1 | out << std::setw(1) << "|" << std::endl; |
1212 | | // header bottom line |
1213 | 1 | line(); |
// Empty block: only the header is returned.
1214 | 1 | if (rows() == 0) { |
1215 | 0 | return out.str(); |
1216 | 0 | } |
1217 | | |
1218 | 1 | auto format_options = DataTypeSerDe::get_default_format_options(); |
1219 | 1 | auto time_zone = cctz::utc_time_zone(); |
1220 | 1 | format_options.timezone = &time_zone; |
1221 | | |
1222 | | // content |
1223 | 4 | for (size_t row_num = 0; row_num < rows() && row_num < row_limit; ++row_num) { |
1224 | 6 | for (size_t i = 0; i < columns(); ++i) { |
// An empty column writes only the "|" separator and stream manipulators for this cell.
1225 | 3 | if (_columns[i].get()->empty()) { |
1226 | 0 | out << std::setfill(' ') << std::setw(1) << "|" << std::setw(headers_size[i]) |
1227 | 0 | << std::right; |
1228 | 0 | continue; |
1229 | 0 | } |
1230 | 3 | std::string s = _data_types[i]->to_string(*_columns[i].get(), row_num, format_options); |
// Values wider than the column are cut so that text plus "..." fits the width.
1231 | 3 | if (s.length() > headers_size[i]) { |
1232 | 0 | s = s.substr(0, headers_size[i] - 3) + "..."; |
1233 | 0 | } |
1234 | 3 | out << std::setfill(' ') << std::setw(1) << "|" << std::setw(headers_size[i]) |
1235 | 3 | << std::right << s; |
1236 | 3 | } |
1237 | 3 | out << std::setw(1) << "|" << std::endl; |
1238 | 3 | } |
1239 | | // bottom line |
1240 | 1 | line(); |
1241 | 1 | if (row_limit < rows()) { |
1242 | 0 | out << rows() << " rows in block, only show first " << row_limit << " rows." << std::endl; |
1243 | 0 | } |
1244 | 1 | return out.str(); |
1245 | 1 | } |
1246 | | |
1247 | 48.0k | std::unique_ptr<Block> Block::create_same_struct_block(size_t size, bool is_reserve) const { |
1248 | 48.0k | auto temp_block = Block::create_unique(); |
1249 | 94.1k | for (const auto& d : data) { |
1250 | 94.1k | auto column = d.type->create_column(); |
1251 | 94.1k | if (is_reserve) { |
1252 | 0 | column->reserve(size); |
1253 | 94.1k | } else { |
1254 | 94.1k | column->insert_many_defaults(size); |
1255 | 94.1k | } |
1256 | 94.1k | temp_block->insert({std::move(column), d.type, d.name}); |
1257 | 94.1k | } |
1258 | 48.0k | return temp_block; |
1259 | 48.0k | } |
1260 | | |
1261 | 10.1k | void Block::shrink_char_type_column_suffix_zero(const std::vector<size_t>& char_type_idx) { |
1262 | 10.1k | for (auto idx : char_type_idx) { |
1263 | 2 | if (idx < data.size()) { |
1264 | 1 | auto& col_and_name = this->get_by_position(idx); |
1265 | 1 | if (col_and_name.column->is_exclusive()) { |
1266 | 1 | col_and_name.column->assert_mutable()->shrink_padding_chars(); |
1267 | 1 | } else { |
1268 | 0 | auto mutable_col = std::move(*col_and_name.column).mutate(); |
1269 | 0 | mutable_col->shrink_padding_chars(); |
1270 | 0 | col_and_name.column = std::move(mutable_col); |
1271 | 0 | } |
1272 | 1 | } |
1273 | 2 | } |
1274 | 10.1k | } |
1275 | | |
1276 | 96.1k | size_t MutableBlock::allocated_bytes() const { |
1277 | 96.1k | size_t res = 0; |
1278 | 188k | for (const auto& col : _columns) { |
1279 | 188k | if (col) { |
1280 | 188k | res += col->allocated_bytes(); |
1281 | 188k | } |
1282 | 188k | } |
1283 | | |
1284 | 96.1k | return res; |
1285 | 96.1k | } |
1286 | | |
1287 | 1 | void MutableBlock::clear_column_data() noexcept { |
1288 | 1 | SCOPED_SKIP_MEMORY_CHECK(); |
1289 | 1 | for (auto& col : _columns) { |
1290 | 1 | if (col) { |
1291 | 1 | col->clear(); |
1292 | 1 | } |
1293 | 1 | } |
1294 | 1 | } |
1295 | | |
1296 | 5 | std::string MutableBlock::dump_names() const { |
1297 | 5 | std::string out; |
1298 | 16 | for (auto it = _names.begin(); it != _names.end(); ++it) { |
1299 | 11 | if (it != _names.begin()) { |
1300 | 6 | out += ", "; |
1301 | 6 | } |
1302 | 11 | out += *it; |
1303 | 11 | } |
1304 | 5 | return out; |
1305 | 5 | } |
1306 | | } // namespace doris |