Coverage Report

Created: 2026-05-24 10:55

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/core/block/block.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Core/Block.cpp
19
// and modified by Doris
20
21
#include "core/block/block.h"
22
23
#include <fmt/format.h>
24
#include <gen_cpp/data.pb.h>
25
#include <glog/logging.h>
26
#include <snappy.h>
27
#include <streamvbyte.h>
28
29
#include <algorithm>
30
#include <cassert>
31
#include <iomanip>
32
#include <limits>
33
#include <ranges>
34
35
#include "agent/be_exec_version_manager.h"
36
#include "common/compiler_util.h" // IWYU pragma: keep
37
#include "common/logging.h"
38
#include "common/status.h"
39
#include "core/assert_cast.h"
40
#include "core/column/column.h"
41
#include "core/column/column_const.h"
42
#include "core/column/column_nothing.h"
43
#include "core/column/column_nullable.h"
44
#include "core/column/column_vector.h"
45
#include "core/data_type/data_type_factory.hpp"
46
#include "core/data_type/data_type_nullable.h"
47
#include "core/data_type_serde/data_type_serde.h"
48
#include "runtime/descriptors.h"
49
#include "runtime/runtime_profile.h"
50
#include "runtime/thread_context.h"
51
#include "util/block_compression.h"
52
#include "util/faststring.h"
53
#include "util/simd/bits.h"
54
#include "util/slice.h"
55
56
class SipHash;
57
58
namespace doris::segment_v2 {
59
enum CompressionTypePB : int;
60
} // namespace doris::segment_v2
61
namespace doris {
62
template <typename T>
63
void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks,
64
2
                  RuntimeProfile::Counter* memory_used_counter = nullptr) {
65
2
    T block;
66
6
    while (blocks.try_dequeue(block)) {
67
4
        if (memory_used_counter) {
68
4
            if constexpr (std::is_same_v<T, Block>) {
69
2
                memory_used_counter->update(-block.allocated_bytes());
70
2
            } else {
71
2
                memory_used_counter->update(-block->allocated_bytes());
72
2
            }
73
4
        }
74
4
    }
75
2
}
_ZN5doris12clear_blocksINS_5BlockEEEvRN10moodycamel15ConcurrentQueueIT_NS2_28ConcurrentQueueDefaultTraitsEEEPNS_14RuntimeProfile7CounterE
Line
Count
Source
64
1
                  RuntimeProfile::Counter* memory_used_counter = nullptr) {
65
1
    T block;
66
3
    while (blocks.try_dequeue(block)) {
67
2
        if (memory_used_counter) {
68
2
            if constexpr (std::is_same_v<T, Block>) {
69
2
                memory_used_counter->update(-block.allocated_bytes());
70
            } else {
71
                memory_used_counter->update(-block->allocated_bytes());
72
            }
73
2
        }
74
2
    }
75
1
}
_ZN5doris12clear_blocksISt10unique_ptrINS_5BlockESt14default_deleteIS2_EEEEvRN10moodycamel15ConcurrentQueueIT_NS6_28ConcurrentQueueDefaultTraitsEEEPNS_14RuntimeProfile7CounterE
Line
Count
Source
64
1
                  RuntimeProfile::Counter* memory_used_counter = nullptr) {
65
1
    T block;
66
3
    while (blocks.try_dequeue(block)) {
67
2
        if (memory_used_counter) {
68
            if constexpr (std::is_same_v<T, Block>) {
69
                memory_used_counter->update(-block.allocated_bytes());
70
2
            } else {
71
2
                memory_used_counter->update(-block->allocated_bytes());
72
2
            }
73
2
        }
74
2
    }
75
1
}
76
77
template void clear_blocks<Block>(moodycamel::ConcurrentQueue<Block>&,
78
                                  RuntimeProfile::Counter* memory_used_counter);
79
template void clear_blocks<BlockUPtr>(moodycamel::ConcurrentQueue<BlockUPtr>&,
80
                                      RuntimeProfile::Counter* memory_used_counter);
81
82
namespace {
83
84
// The no-clone fast path is only safe when the whole column tree is uniquely
85
// owned. A composite column with shared children still needs COW detachment.
86
32.5k
bool is_recursively_exclusive(const IColumn& column) {
87
32.5k
    if (!column.is_exclusive()) {
88
13
        return false;
89
13
    }
90
91
32.5k
    bool exclusive = true;
92
32.5k
    IColumn::ColumnCallback callback = [&](IColumn::WrappedPtr& subcolumn) {
93
27.7k
        if (!exclusive) {
94
0
            return;
95
0
        }
96
27.7k
        const ColumnPtr& subcolumn_ptr = const_cast<const IColumn::WrappedPtr&>(subcolumn);
97
27.7k
        DCHECK(subcolumn_ptr);
98
27.7k
        exclusive = is_recursively_exclusive(*subcolumn_ptr);
99
27.7k
    };
100
    // `for_each_subcolumn` only exposes a mutable callback type. This callback
101
    // only reads the wrapped pointers and never calls the non-const accessors.
102
32.5k
    const_cast<IColumn&>(column).for_each_subcolumn(callback);
103
32.5k
    return exclusive;
104
32.5k
}
105
106
// Acquire one live Block slot transactionally. Shared columns are detached while
107
// the original slot is still intact, so a clone failure cannot leave Block with
108
// a moved-from/null column. Exclusive column trees keep the stealing fast path.
109
4.79k
MutableColumnPtr scoped_mutate_column(ColumnPtr& column, const DataTypePtr& type) {
110
4.79k
    DCHECK(type);
111
4.79k
    if (!column) {
112
1
        return type->create_column();
113
1
    }
114
115
4.79k
    MutableColumnPtr mutable_column;
116
4.79k
    if (is_recursively_exclusive(*column)) {
117
4.78k
        mutable_column = std::move(*column).mutate();
118
4.78k
    } else {
119
13
        mutable_column = IColumn::mutate(column);
120
13
    }
121
4.79k
    column = nullptr;
122
4.79k
    return mutable_column;
123
4.79k
}
124
125
} // namespace
126
127
4.35k
Block::Block(std::initializer_list<ColumnWithTypeAndName> il) : data {il} {}
128
129
257k
Block::Block(ColumnsWithTypeAndName data_) : data {std::move(data_)} {}
130
131
56.0k
Block::Block(const std::vector<SlotDescriptor*>& slots, size_t block_size) {
132
271k
    for (auto* const slot_desc : slots) {
133
271k
        auto column_ptr = slot_desc->get_empty_mutable_column();
134
271k
        column_ptr->reserve(block_size);
135
271k
        insert(ColumnWithTypeAndName(std::move(column_ptr), slot_desc->get_data_type_ptr(),
136
271k
                                     slot_desc->col_name()));
137
271k
    }
138
56.0k
}
139
140
1
Block::Block(const std::vector<SlotDescriptor>& slots, size_t block_size) {
141
1
    std::vector<SlotDescriptor*> slot_ptrs(slots.size());
142
3
    for (size_t i = 0; i < slots.size(); ++i) {
143
        // Slots remain unmodified and are used to read column information; const_cast can be employed.
144
        // used in src/exec/rowid_fetcher.cpp
145
2
        slot_ptrs[i] = const_cast<SlotDescriptor*>(&slots[i]);
146
2
    }
147
1
    *this = Block(slot_ptrs, block_size);
148
1
}
149
150
Status Block::deserialize(const PBlock& pblock, size_t* uncompressed_bytes,
151
990
                          int64_t* decompress_time) {
152
990
    swap(Block());
153
990
    int be_exec_version = pblock.has_be_exec_version() ? pblock.be_exec_version() : 0;
154
990
    RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(be_exec_version));
155
156
990
    const char* buf = nullptr;
157
990
    std::string compression_scratch;
158
990
    if (pblock.compressed()) {
159
        // Decompress
160
479
        SCOPED_RAW_TIMER(decompress_time);
161
479
        const char* compressed_data = pblock.column_values().c_str();
162
479
        size_t compressed_size = pblock.column_values().size();
163
479
        size_t uncompressed_size = 0;
164
479
        if (pblock.has_compression_type() && pblock.has_uncompressed_size()) {
165
479
            BlockCompressionCodec* codec;
166
479
            RETURN_IF_ERROR(get_block_compression_codec(pblock.compression_type(), &codec));
167
479
            uncompressed_size = pblock.uncompressed_size();
168
            // Should also use allocator to allocate memory here.
169
479
            compression_scratch.resize(uncompressed_size);
170
479
            Slice decompressed_slice(compression_scratch);
171
479
            RETURN_IF_ERROR(codec->decompress(Slice(compressed_data, compressed_size),
172
479
                                              &decompressed_slice));
173
479
            DCHECK(uncompressed_size == decompressed_slice.size);
174
479
        } else {
175
0
            bool success = snappy::GetUncompressedLength(compressed_data, compressed_size,
176
0
                                                         &uncompressed_size);
177
0
            DCHECK(success) << "snappy::GetUncompressedLength failed";
178
0
            compression_scratch.resize(uncompressed_size);
179
0
            success = snappy::RawUncompress(compressed_data, compressed_size,
180
0
                                            compression_scratch.data());
181
0
            DCHECK(success) << "snappy::RawUncompress failed";
182
0
        }
183
479
        *uncompressed_bytes = uncompressed_size;
184
479
        buf = compression_scratch.data();
185
511
    } else {
186
511
        buf = pblock.column_values().data();
187
511
    }
188
189
1.56k
    for (const auto& pcol_meta : pblock.column_metas()) {
190
1.56k
        DataTypePtr type = DataTypeFactory::instance().create_data_type(pcol_meta);
191
1.56k
        MutableColumnPtr data_column = type->create_column();
192
        // Here will try to allocate large memory, should return error if failed.
193
1.56k
        RETURN_IF_CATCH_EXCEPTION(
194
1.56k
                buf = type->deserialize(buf, &data_column, pblock.be_exec_version()));
195
1.56k
        data.emplace_back(data_column->get_ptr(), type, pcol_meta.name());
196
1.56k
    }
197
198
990
    return Status::OK();
199
990
}
200
201
13.3k
void Block::reserve(size_t count) {
202
13.3k
    data.reserve(count);
203
13.3k
}
204
205
4
void Block::insert(size_t position, const ColumnWithTypeAndName& elem) {
206
4
    if (position > data.size()) {
207
1
        throw Exception(ErrorCode::INTERNAL_ERROR,
208
1
                        "invalid input position, position={}, data.size={}, names={}", position,
209
1
                        data.size(), dump_names());
210
1
    }
211
212
3
    data.emplace(data.begin() + position, elem);
213
3
}
214
215
3
void Block::insert(size_t position, ColumnWithTypeAndName&& elem) {
216
3
    if (position > data.size()) {
217
1
        throw Exception(ErrorCode::INTERNAL_ERROR,
218
1
                        "invalid input position, position={}, data.size={}, names={}", position,
219
1
                        data.size(), dump_names());
220
1
    }
221
222
2
    data.emplace(data.begin() + position, std::move(elem));
223
2
}
224
225
8.09k
void Block::clear_names() {
226
177k
    for (auto& entry : data) {
227
177k
        entry.name.clear();
228
177k
    }
229
8.09k
}
230
231
14.4k
void Block::insert(const ColumnWithTypeAndName& elem) {
232
14.4k
    data.emplace_back(elem);
233
14.4k
}
234
235
999k
void Block::insert(ColumnWithTypeAndName&& elem) {
236
999k
    data.emplace_back(std::move(elem));
237
999k
}
238
239
3
void Block::erase(const std::set<size_t>& positions) {
240
3
    for (unsigned long position : std::ranges::reverse_view(positions)) {
241
2
        erase(position);
242
2
    }
243
3
}
244
245
709
void Block::erase_tail(size_t start) {
246
709
    DCHECK(start <= data.size()) << fmt::format(
247
0
            "Position out of bound in Block::erase(), max position = {}", data.size());
248
709
    data.erase(data.begin() + start, data.end());
249
709
}
250
251
123k
void Block::erase(size_t position) {
252
123k
    DCHECK(!data.empty()) << "Block is empty";
253
123k
    DCHECK_LT(position, data.size()) << fmt::format(
254
0
            "Position out of bound in Block::erase(), max position = {}", data.size() - 1);
255
256
123k
    erase_impl(position);
257
123k
}
258
259
123k
void Block::erase_impl(size_t position) {
260
123k
    data.erase(data.begin() + position);
261
123k
}
262
263
76.0k
ColumnWithTypeAndName& Block::safe_get_by_position(size_t position) {
264
76.0k
    if (position >= data.size()) {
265
0
        throw Exception(ErrorCode::INTERNAL_ERROR,
266
0
                        "invalid input position, position={}, data.size={}, names={}", position,
267
0
                        data.size(), dump_names());
268
0
    }
269
76.0k
    return data[position];
270
76.0k
}
271
272
109
const ColumnWithTypeAndName& Block::safe_get_by_position(size_t position) const {
273
109
    if (position >= data.size()) {
274
0
        throw Exception(ErrorCode::INTERNAL_ERROR,
275
0
                        "invalid input position, position={}, data.size={}, names={}", position,
276
0
                        data.size(), dump_names());
277
0
    }
278
109
    return data[position];
279
109
}
280
281
86
int Block::get_position_by_name(const std::string& name) const {
282
1.03k
    for (int i = 0; i < data.size(); i++) {
283
1.03k
        if (data[i].name == name) {
284
85
            return i;
285
85
        }
286
1.03k
    }
287
1
    return -1;
288
86
}
289
290
5
void Block::check_number_of_rows(bool allow_null_columns) const {
291
5
    ssize_t rows = -1;
292
9
    for (const auto& elem : data) {
293
9
        if (!elem.column && allow_null_columns) {
294
2
            continue;
295
2
        }
296
297
7
        if (!elem.column) {
298
1
            throw Exception(ErrorCode::INTERNAL_ERROR,
299
1
                            "Column {} in block is nullptr, in method check_number_of_rows.",
300
1
                            elem.name);
301
1
        }
302
303
6
        ssize_t size = elem.column->size();
304
305
6
        if (rows == -1) {
306
5
            rows = size;
307
5
        } else if (rows != size) {
308
1
            throw Exception(ErrorCode::INTERNAL_ERROR, "Sizes of columns doesn't match, block={}",
309
1
                            dump_structure());
310
1
        }
311
6
    }
312
5
}
313
314
3.21M
Status Block::check_type_and_column() const {
315
3.21M
#ifndef NDEBUG
316
3.21M
    for (const auto& elem : data) {
317
20.7k
        if (!elem.column) {
318
0
            continue;
319
0
        }
320
20.7k
        if (!elem.type) {
321
0
            continue;
322
0
        }
323
324
        // ColumnNothing is a special column type, it is used to represent a column that
325
        // is not materialized, so we don't need to check it.
326
20.7k
        if (check_and_get_column<ColumnNothing>(elem.column.get())) {
327
0
            continue;
328
0
        }
329
330
20.7k
        const auto& type = elem.type;
331
20.7k
        const auto& column = elem.column;
332
333
20.7k
        RETURN_IF_ERROR(column->column_self_check());
334
20.7k
        auto st = type->check_column(*column);
335
20.7k
        if (!st.ok()) {
336
1
            return Status::InternalError(
337
1
                    "Column {} in block is not compatible with its column type :{}, data type :{}, "
338
1
                    "error: {}",
339
1
                    elem.name, column->get_name(), type->get_name(), st.msg());
340
1
        }
341
20.7k
    }
342
3.21M
#endif
343
3.21M
    return Status::OK();
344
3.21M
}
345
346
51.2M
size_t Block::rows() const {
347
51.2M
    for (const auto& elem : data) {
348
46.4M
        if (elem.column) {
349
46.4M
            return elem.column->size();
350
46.4M
        }
351
46.4M
    }
352
353
4.80M
    return 0;
354
51.2M
}
355
356
6
void Block::set_num_rows(size_t length) {
357
6
    if (rows() > length) {
358
4
        for (auto& elem : data) {
359
4
            if (elem.column) {
360
4
                elem.column = elem.column->shrink(length);
361
4
            }
362
4
        }
363
4
    }
364
6
}
365
366
1
void Block::skip_num_rows(int64_t& length) {
367
1
    auto origin_rows = rows();
368
1
    if (origin_rows <= length) {
369
0
        clear();
370
0
        length -= origin_rows;
371
1
    } else {
372
1
        for (auto& elem : data) {
373
1
            if (elem.column) {
374
1
                elem.column = elem.column->cut(length, origin_rows - length);
375
1
            }
376
1
        }
377
1
    }
378
1
}
379
380
12.8k
size_t Block::bytes() const {
381
12.8k
    size_t res = 0;
382
27.2k
    for (const auto& elem : data) {
383
27.2k
        if (!elem.column) {
384
0
            std::stringstream ss;
385
0
            for (const auto& e : data) {
386
0
                ss << e.name + " ";
387
0
            }
388
0
            throw Exception(ErrorCode::INTERNAL_ERROR,
389
0
                            "Column {} in block is nullptr, in method bytes. All Columns are {}",
390
0
                            elem.name, ss.str());
391
0
        }
392
27.2k
        res += elem.column->byte_size();
393
27.2k
    }
394
395
12.8k
    return res;
396
12.8k
}
397
398
194k
size_t Block::allocated_bytes() const {
399
194k
    size_t res = 0;
400
380k
    for (const auto& elem : data) {
401
380k
        if (!elem.column) {
402
            // Sometimes if expr failed, then there will be a nullptr
403
            // column left in the block.
404
1
            continue;
405
1
        }
406
380k
        res += elem.column->allocated_bytes();
407
380k
    }
408
409
194k
    return res;
410
194k
}
411
412
8
std::string Block::dump_names() const {
413
8
    std::string out;
414
23
    for (auto it = data.begin(); it != data.end(); ++it) {
415
15
        if (it != data.begin()) {
416
7
            out += ", ";
417
7
        }
418
15
        out += it->name;
419
15
    }
420
8
    return out;
421
8
}
422
423
7
std::string Block::dump_types() const {
424
7
    std::string out;
425
21
    for (auto it = data.begin(); it != data.end(); ++it) {
426
14
        if (it != data.begin()) {
427
7
            out += ", ";
428
7
        }
429
14
        out += it->type->get_name();
430
14
    }
431
7
    return out;
432
7
}
433
434
31
std::string Block::dump_data_json(size_t begin, size_t row_limit, bool allow_null_mismatch) const {
435
31
    std::stringstream ss;
436
437
31
    std::vector<std::string> headers;
438
31
    headers.reserve(columns());
439
46
    for (const auto& it : data) {
440
        // fmt::format is from the {fmt} library, you might be using std::format in C++20
441
        // If not, you can build the string with a stringstream as a fallback.
442
46
        headers.push_back(fmt::format("{}({})", it.name, it.type->get_name()));
443
46
    }
444
445
31
    size_t start_row = std::min(begin, rows());
446
31
    size_t end_row = std::min(rows(), begin + row_limit);
447
448
31
    auto format_options = DataTypeSerDe::get_default_format_options();
449
31
    auto time_zone = cctz::utc_time_zone();
450
31
    format_options.timezone = &time_zone;
451
452
31
    ss << "[";
453
3.59k
    for (size_t row_num = start_row; row_num < end_row; ++row_num) {
454
3.56k
        if (row_num > start_row) {
455
3.53k
            ss << ",";
456
3.53k
        }
457
3.56k
        ss << "{";
458
8.33k
        for (size_t i = 0; i < columns(); ++i) {
459
4.77k
            if (i > 0) {
460
1.21k
                ss << ",";
461
1.21k
            }
462
4.77k
            ss << "\"" << headers[i] << "\":";
463
4.77k
            std::string s;
464
465
            // This value-extraction logic is preserved from your original function
466
            // to maintain consistency, especially for handling nullability mismatches.
467
4.77k
            if (data[i].column && data[i].type->is_nullable() &&
468
4.77k
                !data[i].column->is_concrete_nullable()) {
469
                // This branch handles a specific internal representation of nullable columns.
470
                // The original code would assert here if allow_null_mismatch is false.
471
0
                assert(allow_null_mismatch);
472
0
                s = assert_cast<const DataTypeNullable*>(data[i].type.get())
473
0
                            ->get_nested_type()
474
0
                            ->to_string(*data[i].column, row_num, format_options);
475
4.77k
            } else {
476
                // This is the standard path. The to_string method is expected to correctly
477
                // handle all cases, including when the column is null (e.g., by returning "NULL").
478
4.77k
                s = data[i].to_string(row_num, format_options);
479
4.77k
            }
480
4.77k
            ss << "\"" << s << "\"";
481
4.77k
        }
482
3.56k
        ss << "}";
483
3.56k
    }
484
31
    ss << "]";
485
31
    return ss.str();
486
31
}
487
488
858
std::string Block::dump_data(size_t begin, size_t row_limit, bool allow_null_mismatch) const {
489
858
    std::vector<std::string> headers;
490
858
    std::vector<int> headers_size;
491
2.10k
    for (const auto& it : data) {
492
2.10k
        std::string s = fmt::format("{}({})", it.name, it.type->get_name());
493
2.10k
        headers_size.push_back(s.size() > 15 ? (int)s.size() : 15);
494
2.10k
        headers.emplace_back(s);
495
2.10k
    }
496
497
858
    std::stringstream out;
498
    // header upper line
499
2.16k
    auto line = [&]() {
500
8.07k
        for (size_t i = 0; i < columns(); ++i) {
501
5.91k
            out << std::setfill('-') << std::setw(1) << "+" << std::setw(headers_size[i]) << "-";
502
5.91k
        }
503
2.16k
        out << std::setw(1) << "+" << std::endl;
504
2.16k
    };
505
858
    line();
506
    // header text
507
2.96k
    for (size_t i = 0; i < columns(); ++i) {
508
2.10k
        out << std::setfill(' ') << std::setw(1) << "|" << std::left << std::setw(headers_size[i])
509
2.10k
            << headers[i];
510
2.10k
    }
511
858
    out << std::setw(1) << "|" << std::endl;
512
    // header bottom line
513
858
    line();
514
858
    if (rows() == 0) {
515
414
        return out.str();
516
414
    }
517
518
444
    auto format_options = DataTypeSerDe::get_default_format_options();
519
444
    auto time_zone = cctz::utc_time_zone();
520
444
    format_options.timezone = &time_zone;
521
522
    // content
523
12.2k
    for (size_t row_num = begin; row_num < rows() && row_num < row_limit + begin; ++row_num) {
524
32.4k
        for (size_t i = 0; i < columns(); ++i) {
525
20.6k
            if (!data[i].column || data[i].column->empty()) {
526
0
                out << std::setfill(' ') << std::setw(1) << "|" << std::setw(headers_size[i])
527
0
                    << std::right;
528
0
                continue;
529
0
            }
530
20.6k
            std::string s;
531
20.6k
            if (data[i].column) { // column may be const
532
                // for code inside `default_implementation_for_nulls`, there's could have: type = null, col != null
533
20.6k
                if (data[i].type->is_nullable() && !data[i].column->is_concrete_nullable()) {
534
0
                    assert(allow_null_mismatch);
535
0
                    s = assert_cast<const DataTypeNullable*>(data[i].type.get())
536
0
                                ->get_nested_type()
537
0
                                ->to_string(*data[i].column, row_num, format_options);
538
20.6k
                } else {
539
20.6k
                    s = data[i].to_string(row_num, format_options);
540
20.6k
                }
541
20.6k
            }
542
20.6k
            if (s.length() > headers_size[i]) {
543
2.12k
                s = s.substr(0, headers_size[i] - 3) + "...";
544
2.12k
            }
545
20.6k
            out << std::setfill(' ') << std::setw(1) << "|" << std::setw(headers_size[i])
546
20.6k
                << std::right << s;
547
20.6k
        }
548
11.7k
        out << std::setw(1) << "|" << std::endl;
549
11.7k
    }
550
    // bottom line
551
444
    line();
552
444
    if (row_limit < rows()) {
553
112
        out << rows() << " rows in block, only show first " << row_limit << " rows." << std::endl;
554
112
    }
555
444
    return out.str();
556
444
}
557
558
1
std::string Block::dump_one_line(size_t row, int column_end) const {
559
1
    assert(column_end <= columns());
560
1
    fmt::memory_buffer line;
561
562
1
    auto format_options = DataTypeSerDe::get_default_format_options();
563
1
    auto time_zone = cctz::utc_time_zone();
564
1
    format_options.timezone = &time_zone;
565
566
3
    for (int i = 0; i < column_end; ++i) {
567
2
        if (LIKELY(i != 0)) {
568
            // TODO: need more effective function of to string. now the impl is slow
569
1
            fmt::format_to(line, " {}", data[i].to_string(row, format_options));
570
1
        } else {
571
1
            fmt::format_to(line, "{}", data[i].to_string(row, format_options));
572
1
        }
573
2
    }
574
1
    return fmt::to_string(line);
575
1
}
576
577
47
std::string Block::dump_structure() const {
578
47
    std::string out;
579
382
    for (auto it = data.begin(); it != data.end(); ++it) {
580
335
        if (it != data.begin()) {
581
288
            out += ", \n";
582
288
        }
583
335
        out += it->dump_structure();
584
335
    }
585
47
    return out;
586
47
}
587
588
48.9k
Block Block::clone_empty() const {
589
48.9k
    Block res;
590
95.8k
    for (const auto& elem : data) {
591
95.8k
        res.insert(elem.clone_empty());
592
95.8k
    }
593
48.9k
    return res;
594
48.9k
}
595
596
31
MutableColumns Block::clone_empty_columns() const {
597
31
    size_t num_columns = data.size();
598
31
    MutableColumns columns(num_columns);
599
140
    for (size_t i = 0; i < num_columns; ++i) {
600
109
        columns[i] = data[i].column ? data[i].column->clone_empty() : data[i].type->create_column();
601
109
    }
602
31
    return columns;
603
31
}
604
605
26.0k
Columns Block::get_columns() const {
606
26.0k
    size_t num_columns = data.size();
607
26.0k
    Columns columns(num_columns);
608
113k
    for (size_t i = 0; i < num_columns; ++i) {
609
87.4k
        columns[i] = data[i].column->convert_to_full_column_if_const();
610
87.4k
    }
611
26.0k
    return columns;
612
26.0k
}
613
614
553
Columns Block::get_columns_and_convert() {
615
553
    size_t num_columns = data.size();
616
553
    Columns columns(num_columns);
617
1.16k
    for (size_t i = 0; i < num_columns; ++i) {
618
616
        data[i].column = data[i].column->convert_to_full_column_if_const();
619
616
        columns[i] = data[i].column;
620
616
    }
621
553
    return columns;
622
553
}
623
624
2.47k
Block::ScopedMutableColumns::ScopedMutableColumns(Block& block) : _block(&block) {
625
2.47k
    const size_t num_columns = block.data.size();
626
2.47k
    _columns.resize(num_columns);
627
2.47k
    size_t acquired_columns = 0;
628
2.47k
    try {
629
7.17k
        for (; acquired_columns < num_columns; ++acquired_columns) {
630
4.69k
            auto& column_with_type_and_name = block.data[acquired_columns];
631
4.69k
            _columns[acquired_columns] = scoped_mutate_column(column_with_type_and_name.column,
632
4.69k
                                                              column_with_type_and_name.type);
633
4.69k
        }
634
2.47k
    } catch (...) {
635
4
        for (size_t i = 0; i < acquired_columns; ++i) {
636
2
            block.data[i].column = std::move(_columns[i]);
637
2
        }
638
2
        _block = nullptr;
639
2
        throw;
640
2
    }
641
2.47k
}
642
643
2.47k
Block::ScopedMutableColumns::~ScopedMutableColumns() {
644
2.47k
    restore();
645
2.47k
}
646
647
Block::ScopedMutableColumns::ScopedMutableColumns(ScopedMutableColumns&& other) noexcept
648
0
        : _block(std::exchange(other._block, nullptr)), _columns(std::move(other._columns)) {}
649
650
Block::ScopedMutableColumns& Block::ScopedMutableColumns::operator=(
651
0
        ScopedMutableColumns&& other) noexcept {
652
0
    if (this != &other) {
653
0
        restore();
654
0
        _block = std::exchange(other._block, nullptr);
655
0
        _columns = std::move(other._columns);
656
0
    }
657
0
    return *this;
658
0
}
659
660
2
const DataTypePtr& Block::ScopedMutableColumns::get_datatype_by_position(size_t position) const {
661
2
    DCHECK(_block != nullptr);
662
2
    return _block->get_by_position(position).type;
663
2
}
664
665
2
const std::string& Block::ScopedMutableColumns::get_name_by_position(size_t position) const {
666
2
    DCHECK(_block != nullptr);
667
2
    return _block->get_by_position(position).name;
668
2
}
669
670
811
MutableColumns Block::ScopedMutableColumns::release() {
671
811
    DCHECK(_block != nullptr);
672
811
    _block = nullptr;
673
811
    return std::move(_columns);
674
811
}
675
676
2.95k
void Block::ScopedMutableColumns::restore() {
677
2.95k
    if (_block != nullptr) {
678
1.66k
        _block->set_columns(std::move(_columns));
679
1.66k
        _block = nullptr;
680
1.66k
    }
681
2.95k
}
682
683
Block::ScopedMutableColumn::ScopedMutableColumn(Block& block, size_t position)
684
98
        : _block(&block), _position(position) {
685
98
    DCHECK_LT(_position, _block->data.size());
686
98
    auto& column_with_type_and_name = _block->data[_position];
687
98
    DCHECK(column_with_type_and_name.type);
688
98
    _column =
689
98
            scoped_mutate_column(column_with_type_and_name.column, column_with_type_and_name.type);
690
98
}
691
692
97
Block::ScopedMutableColumn::~ScopedMutableColumn() {
693
97
    restore();
694
97
}
695
696
Block::ScopedMutableColumn::ScopedMutableColumn(ScopedMutableColumn&& other) noexcept
697
0
        : _block(std::exchange(other._block, nullptr)),
698
0
          _position(other._position),
699
0
          _column(std::move(other._column)) {}
700
701
Block::ScopedMutableColumn& Block::ScopedMutableColumn::operator=(
702
0
        ScopedMutableColumn&& other) noexcept {
703
0
    if (this != &other) {
704
0
        restore();
705
0
        _block = std::exchange(other._block, nullptr);
706
0
        _position = other._position;
707
0
        _column = std::move(other._column);
708
0
    }
709
0
    return *this;
710
0
}
711
712
97
void Block::ScopedMutableColumn::restore() {
713
97
    if (_block != nullptr) {
714
97
        DCHECK_LT(_position, _block->data.size());
715
97
        _block->data[_position].column = std::move(_column);
716
97
        _block = nullptr;
717
97
    }
718
97
}
719
720
2.47k
Block::ScopedMutableColumns Block::mutate_columns_scoped() & {
721
2.47k
    return ScopedMutableColumns(*this);
722
2.47k
}
723
724
98
Block::ScopedMutableColumn Block::mutate_column_scoped(size_t position) & {
725
98
    return ScopedMutableColumn(*this, position);
726
98
}
727
728
812
ScopedMutableBlock::ScopedMutableBlock(Block* block) {
729
812
    DCHECK(block != nullptr);
730
812
    DataTypes data_types = block->get_data_types();
731
812
    std::vector<std::string> names = block->get_names();
732
812
    auto columns_guard = block->mutate_columns_scoped();
733
812
    _mutable_block.data_types() = std::move(data_types);
734
812
    _mutable_block.get_names() = std::move(names);
735
812
    _mutable_block.set_mutable_columns(columns_guard.release());
736
812
    _block = block;
737
812
}
738
739
145k
MutableColumns Block::mutate_columns() && {
740
145k
    size_t num_columns = data.size();
741
145k
    MutableColumns columns(num_columns);
742
433k
    for (size_t i = 0; i < num_columns; ++i) {
743
287k
        DCHECK(data[i].type);
744
287k
        columns[i] = data[i].column ? IColumn::mutate(std::move(data[i].column))
745
287k
                                    : data[i].type->create_column();
746
287k
    }
747
145k
    return columns;
748
145k
}
749
750
3.37k
// Moves the first data.size() elements of `columns` into the corresponding
// entries of `data`. `columns` must hold at least data.size() elements; this is
// only verified by a DCHECK.
void Block::set_columns(MutableColumns&& columns) {
    DCHECK_GE(columns.size(), data.size())
            << fmt::format("Invalid size of columns, columns size: {}, data size: {}",
                           columns.size(), data.size());
    const size_t column_count = data.size();
    size_t idx = 0;
    while (idx < column_count) {
        data[idx].column = std::move(columns[idx]);
        ++idx;
    }
}
759
760
49
// Builds a block that carries only types and names (every column is nullptr).
// With a non-null `column_offset`, the result has one entry per offset, taken
// from data[offset] in the given order; otherwise it mirrors all of `data`.
Block Block::clone_without_columns(const std::vector<int>* column_offset) const {
    Block res;

    if (column_offset == nullptr) {
        for (const auto& entry : data) {
            res.insert({nullptr, entry.type, entry.name});
        }
    } else {
        for (const int offset : *column_offset) {
            res.insert({nullptr, data[offset].type, data[offset].name});
        }
    }
    return res;
}
776
777
55.7k
// Read-only access to the underlying `data` container.
const ColumnsWithTypeAndName& Block::get_columns_with_type_and_name() const {
    return data;
}
780
781
145k
std::vector<std::string> Block::get_names() const {
782
145k
    std::vector<std::string> res;
783
145k
    res.reserve(columns());
784
785
285k
    for (const auto& elem : data) {
786
285k
        res.push_back(elem.name);
787
285k
    }
788
789
145k
    return res;
790
145k
}
791
792
145k
// Returns a copy of every entry's data type, in the order of `data`.
DataTypes Block::get_data_types() const {
    DataTypes types;
    types.reserve(columns());
    for (auto it = data.begin(); it != data.end(); ++it) {
        types.push_back(it->type);
    }
    return types;
}
802
803
52.7k
void Block::clear() {
804
52.7k
    data.clear();
805
52.7k
}
806
807
1.62M
void Block::clear_column_data(int64_t column_size) {
808
1.62M
    SCOPED_SKIP_MEMORY_CHECK();
809
    // data.size() greater than column_size, means here have some
810
    // function exec result in block, need erase it here
811
1.62M
    if (column_size != -1 and data.size() > column_size) {
812
2.20k
        for (int64_t i = data.size() - 1; i >= column_size; --i) {
813
1.10k
            erase(i);
814
1.10k
        }
815
1.10k
    }
816
1.62M
    for (auto& d : data) {
817
51.3k
        if (d.column) {
818
51.3k
            if (d.column->is_exclusive()) {
819
51.1k
                d.column->assert_mutable()->clear();
820
51.1k
            } else {
821
212
                d.column = d.column->clone_empty();
822
212
            }
823
51.3k
        }
824
51.3k
    }
825
1.62M
}
826
827
46
void Block::clear_column_data(const std::vector<uint32_t>& columns_to_clear) {
828
46
    SCOPED_SKIP_MEMORY_CHECK();
829
79
    for (auto col : columns_to_clear) {
830
79
        DCHECK_LT(col, data.size());
831
79
        auto& column = data[col].column;
832
79
        if (column) {
833
79
            if (column->is_exclusive()) {
834
77
                column->assert_mutable()->clear();
835
77
            } else {
836
2
                column = column->clone_empty();
837
2
            }
838
79
        }
839
79
    }
840
46
}
841
842
void Block::clear_column_mem_not_keep(const std::vector<bool>& column_keep_flags,
843
48.0k
                                      bool need_keep_first) {
844
48.0k
    if (data.size() >= column_keep_flags.size()) {
845
48.0k
        auto origin_rows = rows();
846
142k
        for (size_t i = 0; i < column_keep_flags.size(); ++i) {
847
94.1k
            if (!column_keep_flags[i]) {
848
36.8k
                data[i].column = data[i].column->clone_empty();
849
36.8k
            }
850
94.1k
        }
851
852
48.0k
        if (need_keep_first && !column_keep_flags[0]) {
853
1
            auto first_column = data[0].column->clone_empty();
854
1
            first_column->resize(origin_rows);
855
1
            data[0].column = std::move(first_column);
856
1
        }
857
48.0k
    }
858
48.0k
}
859
860
1.32k
// Exchanges the `data` containers of this block and `other`.
void Block::swap(Block& other) noexcept {
    SCOPED_SKIP_MEMORY_CHECK();
    data.swap(other.data);
}
864
865
1.70k
// Rvalue overload: move-assigns `other.data` into this block's `data`.
// Unlike the lvalue overload, nothing is written back to `other`.
void Block::swap(Block&& other) noexcept {
    SCOPED_SKIP_MEMORY_CHECK();
    data = std::move(other.data);
}
869
870
3
void Block::shuffle_columns(const std::vector<int>& result_column_ids) {
871
3
    Container tmp_data;
872
3
    tmp_data.reserve(result_column_ids.size());
873
5
    for (const int result_column_id : result_column_ids) {
874
5
        tmp_data.push_back(data[result_column_id]);
875
5
    }
876
3
    data = std::move(tmp_data);
877
3
}
878
879
2
// Feeds every cell of the block into `hash`, row by row and, within a row,
// column by column in `data` order.
void Block::update_hash(SipHash& hash) const {
    const size_t row_count = rows();
    for (size_t row = 0; row < row_count; ++row) {
        for (const auto& entry : data) {
            entry.column->update_hash_with_value(row, hash);
        }
    }
}
886
887
// Applies `filter` to the columns of `block` listed in `columns_to_filter`.
//
// `count` is the number of non-zero filter entries. For each listed column:
//   * a column whose size already equals `count` is left untouched;
//   * when `count` is 0 the column is emptied (cleared in place if exclusively
//     owned, otherwise replaced by clone_empty());
//   * otherwise an exclusively owned column is filtered in place, and a shared
//     column is replaced by the result of filter(filter, count).
// Throws Exception(INTERNAL_ERROR) when an in-place filter reports a size
// different from `count`.
void Block::filter_block_internal(Block* block, const std::vector<uint32_t>& columns_to_filter,
                                  const IColumn::Filter& filter) {
    size_t count = filter.size() - simd::count_zero_num((int8_t*)filter.data(), filter.size());
    for (const auto& position : columns_to_filter) {
        auto& column = block->get_by_position(position).column;
        if (column->size() == count) {
            continue;
        }

        const bool exclusive = column->is_exclusive();
        if (count == 0) {
            if (exclusive) {
                column->assert_mutable()->clear();
            } else {
                column = column->clone_empty();
            }
            continue;
        }

        if (!exclusive) {
            // COW: the column is shared, so a filtered copy has to be created.
            column = column->filter(filter, count);
            continue;
        }

        // COW: exclusive ownership makes in-place mutation safe.
        const auto result_size = column->assert_mutable()->filter(filter);
        if (result_size != count) [[unlikely]] {
            throw Exception(ErrorCode::INTERNAL_ERROR,
                            "result_size not equal with filter_size, result_size={}, "
                            "filter_size={}",
                            result_size, count);
        }
    }
}
918
919
void Block::filter_block_internal(Block* block, const IColumn::Filter& filter,
920
1
                                  uint32_t column_to_keep) {
921
1
    std::vector<uint32_t> columns_to_filter;
922
1
    columns_to_filter.resize(column_to_keep);
923
3
    for (uint32_t i = 0; i < column_to_keep; ++i) {
924
2
        columns_to_filter[i] = i;
925
2
    }
926
1
    filter_block_internal(block, columns_to_filter, filter);
927
1
}
928
929
8
// Applies `filter` to every column of `block`. An exclusively owned column is
// filtered in place; a shared column is replaced by filter(filter, count),
// where `count` is the number of non-zero filter entries.
void Block::filter_block_internal(Block* block, const IColumn::Filter& filter) {
    const size_t count =
            filter.size() - simd::count_zero_num((int8_t*)filter.data(), filter.size());
    const auto column_count = block->columns();
    for (size_t idx = 0; idx < column_count; ++idx) {
        auto& column = block->get_by_position(idx).column;
        if (!column->is_exclusive()) {
            column = column->filter(filter, count);
            continue;
        }
        column->assert_mutable()->filter(filter);
    }
}
941
942
// Appends the rows picked by `selector` from each non-const column of this
// block to the column at the same index in `dst`. Const columns are skipped.
// Exceptions thrown inside are handled by RETURN_IF_CATCH_EXCEPTION; on normal
// completion Status::OK() is returned.
Status Block::append_to_block_by_selector(MutableBlock* dst,
                                          const IColumn::Selector& selector) const {
    RETURN_IF_CATCH_EXCEPTION({
        DCHECK_EQ(data.size(), dst->mutable_columns().size());
        for (size_t idx = 0; idx < data.size(); ++idx) {
            // FIXME: this is a quickfix (the original note is truncated; it
            // mentions an assumption about partition functions).
            if (is_column_const(*data[idx].column)) {
                continue;
            }
            data[idx].column->append_data_by_selector(dst->mutable_columns()[idx], selector);
        }
    });
    return Status::OK();
}
955
956
// Filters the columns of `block` listed in `columns_to_filter` using the column
// at `filter_column_id`, then calls erase_useless_column(block, column_to_keep).
//
// The filter column is handled according to its concrete kind:
//   * ColumnNullable: the nested UInt8 data is AND-ed with the negated null map
//     (so null rows are filtered out) and used as the filter. The nested column
//     is modified in place when its use_count() is 1, otherwise a resized clone
//     is modified instead.
//   * ColumnConst: a false constant empties every listed column; a true
//     constant leaves them untouched.
//   * otherwise the column is treated as ColumnUInt8 and its data is the filter.
Status Block::filter_block(Block* block, const std::vector<uint32_t>& columns_to_filter,
                           size_t filter_column_id, size_t column_to_keep) {
    const auto& filter_column = block->get_by_position(filter_column_id).column;
    if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) {
        const auto& nested_column = nullable_column->get_nested_column_ptr();

        MutableColumnPtr mutable_holder =
                nested_column->use_count() == 1
                        ? nested_column->assert_mutable()
                        : nested_column->clone_resized(nested_column->size());

        auto* concrete_column = assert_cast<ColumnUInt8*>(mutable_holder.get());
        IColumn::Filter& filter = concrete_column->get_data();
        const auto* __restrict null_map = nullable_column->get_null_map_data().data();
        auto* __restrict filter_data = filter.data();

        // A row passes only when its filter byte is set and it is not null.
        const size_t row_count = filter.size();
        for (size_t row = 0; row < row_count; ++row) {
            filter_data[row] &= !null_map[row];
        }
        RETURN_IF_CATCH_EXCEPTION(filter_block_internal(block, columns_to_filter, filter));
    } else if (const auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) {
        const bool keep_all = const_column->get_bool(0);
        if (!keep_all) {
            for (const auto& position : columns_to_filter) {
                auto& column = block->get_by_position(position).column;
                if (column->is_exclusive()) {
                    column->assert_mutable()->clear();
                } else {
                    column = column->clone_empty();
                }
            }
        }
    } else {
        const IColumn::Filter& filter =
                assert_cast<const doris::ColumnUInt8&>(*filter_column).get_data();
        RETURN_IF_CATCH_EXCEPTION(filter_block_internal(block, columns_to_filter, filter));
    }

    erase_useless_column(block, column_to_keep);
    return Status::OK();
}
998
999
490
// Convenience overload: filters the first `column_to_keep` columns of `block`
// (indices 0 .. column_to_keep - 1) by delegating to the index-list overload.
Status Block::filter_block(Block* block, size_t filter_column_id, size_t column_to_keep) {
    std::vector<uint32_t> leading_columns;
    leading_columns.reserve(column_to_keep);
    for (uint32_t idx = 0; idx < column_to_keep; ++idx) {
        leading_columns.push_back(idx);
    }
    return filter_block(block, leading_columns, filter_column_id, column_to_keep);
}
1007
1008
// Serializes this block into `pblock`.
//
// Steps:
//   1. Validate `be_exec_version` and record it in `pblock`.
//   2. Add one PColumnMeta per column and sum the per-column uncompressed
//      serialized sizes.
//   3. Serialize all columns back to back into one buffer.
//   4. When a compression type is requested and there is content, compress the
//      buffer; the compressed bytes are stored only if they are smaller than
//      the serialized bytes, otherwise the uncompressed buffer is stored.
//
// Outputs:
//   *uncompressed_bytes  - the summed uncompressed size from step 2.
//   *compressed_bytes    - serialized size (including STREAMVBYTE_PADDING), or
//                          the compressed size when the compressed form is stored.
//   compress_time        - passed to SCOPED_RAW_TIMER around the compression step.
//
// Returns BufferAllocFailed when the buffer cannot be allocated, and
// InternalError when `allow_transfer_large_data` is false and
// *compressed_bytes reaches the int32_t maximum.
Status Block::serialize(int be_exec_version, PBlock* pblock,
                        /*std::string* compressed_buffer,*/ size_t* uncompressed_bytes,
                        size_t* compressed_bytes, int64_t* compress_time,
                        segment_v2::CompressionTypePB compression_type,
                        bool allow_transfer_large_data) const {
    RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(be_exec_version));
    pblock->set_be_exec_version(be_exec_version);

    // calc uncompressed size for allocation
    size_t content_uncompressed_size = 0;
    for (const auto& entry : *this) {
        PColumnMeta* pcm = pblock->add_column_metas();
        entry.to_pb_column_meta(pcm);
        DCHECK(pcm->type() != PGenericType::UNKNOWN) << " forget to set pb type";
        // get serialized size
        content_uncompressed_size += entry.type->get_uncompressed_serialized_bytes(
                *(entry.column), pblock->be_exec_version());
    }

    // serialize data values
    // when data type is HLL, content_uncompressed_size maybe larger than real size.
    std::string column_values;
    try {
        // TODO: After support c++23, we should use resize_and_overwrite to replace resize
        column_values.resize(content_uncompressed_size);
    } catch (...) {
        std::string msg = fmt::format("Try to alloc {} bytes for pblock column values failed.",
                                      content_uncompressed_size);
        LOG(WARNING) << msg;
        return Status::BufferAllocFailed(msg);
    }

    char* write_pos = column_values.data();
    for (const auto& entry : *this) {
        write_pos = entry.type->serialize(*(entry.column), write_pos, pblock->be_exec_version());
    }
    *uncompressed_bytes = content_uncompressed_size;
    const size_t serialize_bytes = write_pos - column_values.data() + STREAMVBYTE_PADDING;
    *compressed_bytes = serialize_bytes;
    column_values.resize(serialize_bytes);

    // compress
    const bool want_compression =
            compression_type != segment_v2::NO_COMPRESSION && content_uncompressed_size > 0;
    if (!want_compression) {
        pblock->set_column_values(std::move(column_values));
    } else {
        SCOPED_RAW_TIMER(compress_time);
        pblock->set_compression_type(compression_type);
        pblock->set_uncompressed_size(serialize_bytes);

        BlockCompressionCodec* codec;
        RETURN_IF_ERROR(get_block_compression_codec(compression_type, &codec));

        faststring buf_compressed;
        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
                codec->compress(Slice(column_values.data(), serialize_bytes), &buf_compressed));
        size_t compressed_size = buf_compressed.size();
        if (LIKELY(compressed_size < serialize_bytes)) {
            // TODO: rethink the logic here may copy again ?
            pblock->set_column_values(buf_compressed.data(), buf_compressed.size());
            pblock->set_compressed(true);
            *compressed_bytes = compressed_size;
        } else {
            // Compression did not shrink the payload: ship the raw bytes.
            pblock->set_column_values(std::move(column_values));
        }

        VLOG_ROW << "uncompressed size: " << content_uncompressed_size
                 << ", compressed size: " << compressed_size;
    }

    const bool too_large = *compressed_bytes >= std::numeric_limits<int32_t>::max();
    if (!allow_transfer_large_data && too_large) {
        return Status::InternalError("The block is large than 2GB({}), can not send by Protobuf.",
                                     *compressed_bytes);
    }
    return Status::OK();
}
1082
1083
240k
size_t MutableBlock::rows() const {
1084
240k
    for (const auto& column : _columns) {
1085
144k
        if (column) {
1086
144k
            return column->size();
1087
144k
        }
1088
144k
    }
1089
1090
96.0k
    return 0;
1091
240k
}
1092
1093
0
// Exchanges columns, data types and names with `another`.
void MutableBlock::swap(MutableBlock& another) noexcept {
    SCOPED_SKIP_MEMORY_CHECK();
    _columns.swap(another._columns);
    _data_types.swap(another._data_types);
    _names.swap(another._names);
}
1099
1100
0
void MutableBlock::add_row(const Block* block, int row) {
1101
0
    const auto& block_data = block->get_columns_with_type_and_name();
1102
0
    for (size_t i = 0; i < _columns.size(); ++i) {
1103
0
        _columns[i]->insert_from(*block_data[i].column.get(), row);
1104
0
    }
1105
0
}
1106
1107
// Appends the rows of `block` whose indices lie in [row_begin, row_end) to this
// mutable block.
//
// column_offset: when non-null, destination column i reads from source column
//                (*column_offset)[i]; when null, from source column i.
// Exceptions thrown inside are handled by RETURN_IF_CATCH_EXCEPTION; on normal
// completion Status::OK() is returned.
Status MutableBlock::add_rows(const Block* block, const uint32_t* row_begin,
                              const uint32_t* row_end, const std::vector<int>* column_offset) {
    RETURN_IF_CATCH_EXCEPTION({
        DCHECK_LE(columns(), block->columns());
        if (column_offset != nullptr) {
            DCHECK_EQ(columns(), column_offset->size());
        }
        const auto& block_data = block->get_columns_with_type_and_name();
        for (size_t i = 0; i < _columns.size(); ++i) {
            const size_t source_index = column_offset != nullptr ? (*column_offset)[i] : i;
            const auto& src_col = block_data[source_index];
            DCHECK_EQ(_data_types[i]->get_name(), src_col.type->get_name());
            const auto& src = *src_col.column;
            DCHECK_GE(src.size(), row_end - row_begin);
            _columns[i]->insert_indices_from(src, row_begin, row_end);
        }
    });
    return Status::OK();
}
1126
1127
126
// Appends `length` consecutive rows of `block`, starting at `row_begin`, to
// this mutable block; destination column i reads from source column i.
// Exceptions thrown inside are handled by RETURN_IF_CATCH_EXCEPTION; on normal
// completion Status::OK() is returned.
Status MutableBlock::add_rows(const Block* block, size_t row_begin, size_t length) {
    RETURN_IF_CATCH_EXCEPTION({
        DCHECK_LE(columns(), block->columns());
        const auto& block_data = block->get_columns_with_type_and_name();
        for (size_t i = 0; i < _columns.size(); ++i) {
            DCHECK_EQ(_data_types[i]->get_name(), block_data[i].type->get_name());
            const auto& src = *block_data[i].column;
            _columns[i]->insert_range_from(src, row_begin, length);
        }
    });
    return Status::OK();
}
1140
1141
144k
// Converts the columns from `start_column` through the last column into a
// Block by delegating to the two-argument overload.
Block MutableBlock::to_block(int start_column) {
    const int end_column = (int)_columns.size();
    return to_block(start_column, end_column);
}
1144
1145
144k
// Builds a Block from the columns in [start_column, end_column).
// Each selected column is moved out of this mutable block and paired with a
// copy of its data type and name, so the moved-from columns of this object
// must not be used afterwards.
Block MutableBlock::to_block(int start_column, int end_column) {
    ColumnsWithTypeAndName columns_with_schema;
    columns_with_schema.reserve(end_column - start_column);
    for (size_t i = start_column; i < end_column; ++i) {
        columns_with_schema.emplace_back(std::move(_columns[i]), _data_types[i], _names[i]);
    }
    // Hand the local vector over as an rvalue so it is not copied element by
    // element right before being destroyed.
    return {std::move(columns_with_schema)};
}
1153
1154
1
std::string MutableBlock::dump_data_json(size_t row_limit) const {
1155
1
    std::stringstream ss;
1156
1
    std::vector<std::string> headers;
1157
1158
1
    headers.reserve(columns());
1159
2
    for (size_t i = 0; i < columns(); ++i) {
1160
1
        headers.push_back(_data_types[i]->get_name());
1161
1
    }
1162
1
    size_t num_rows_to_dump = std::min(rows(), row_limit);
1163
1
    ss << "[";
1164
1165
1
    auto format_options = DataTypeSerDe::get_default_format_options();
1166
1
    auto time_zone = cctz::utc_time_zone();
1167
1
    format_options.timezone = &time_zone;
1168
1169
4
    for (size_t row_num = 0; row_num < num_rows_to_dump; ++row_num) {
1170
3
        if (row_num > 0) {
1171
2
            ss << ",";
1172
2
        }
1173
3
        ss << "{";
1174
6
        for (size_t i = 0; i < columns(); ++i) {
1175
3
            if (i > 0) {
1176
0
                ss << ",";
1177
0
            }
1178
3
            ss << "\"" << headers[i] << "\":";
1179
3
            std::string s = _data_types[i]->to_string(*_columns[i].get(), row_num, format_options);
1180
3
            ss << "\"" << s << "\"";
1181
3
        }
1182
3
        ss << "}";
1183
3
    }
1184
1
    ss << "]";
1185
1
    return ss.str();
1186
1
}
1187
1188
1
std::string MutableBlock::dump_data(size_t row_limit) const {
1189
1
    std::vector<std::string> headers;
1190
1
    std::vector<int> headers_size;
1191
2
    for (size_t i = 0; i < columns(); ++i) {
1192
1
        std::string s = _data_types[i]->get_name();
1193
1
        headers_size.push_back(s.size() > 15 ? (int)s.size() : 15);
1194
1
        headers.emplace_back(s);
1195
1
    }
1196
1197
1
    std::stringstream out;
1198
    // header upper line
1199
3
    auto line = [&]() {
1200
6
        for (size_t i = 0; i < columns(); ++i) {
1201
3
            out << std::setfill('-') << std::setw(1) << "+" << std::setw(headers_size[i]) << "-";
1202
3
        }
1203
3
        out << std::setw(1) << "+" << std::endl;
1204
3
    };
1205
1
    line();
1206
    // header text
1207
2
    for (size_t i = 0; i < columns(); ++i) {
1208
1
        out << std::setfill(' ') << std::setw(1) << "|" << std::left << std::setw(headers_size[i])
1209
1
            << headers[i];
1210
1
    }
1211
1
    out << std::setw(1) << "|" << std::endl;
1212
    // header bottom line
1213
1
    line();
1214
1
    if (rows() == 0) {
1215
0
        return out.str();
1216
0
    }
1217
1218
1
    auto format_options = DataTypeSerDe::get_default_format_options();
1219
1
    auto time_zone = cctz::utc_time_zone();
1220
1
    format_options.timezone = &time_zone;
1221
1222
    // content
1223
4
    for (size_t row_num = 0; row_num < rows() && row_num < row_limit; ++row_num) {
1224
6
        for (size_t i = 0; i < columns(); ++i) {
1225
3
            if (_columns[i].get()->empty()) {
1226
0
                out << std::setfill(' ') << std::setw(1) << "|" << std::setw(headers_size[i])
1227
0
                    << std::right;
1228
0
                continue;
1229
0
            }
1230
3
            std::string s = _data_types[i]->to_string(*_columns[i].get(), row_num, format_options);
1231
3
            if (s.length() > headers_size[i]) {
1232
0
                s = s.substr(0, headers_size[i] - 3) + "...";
1233
0
            }
1234
3
            out << std::setfill(' ') << std::setw(1) << "|" << std::setw(headers_size[i])
1235
3
                << std::right << s;
1236
3
        }
1237
3
        out << std::setw(1) << "|" << std::endl;
1238
3
    }
1239
    // bottom line
1240
1
    line();
1241
1
    if (row_limit < rows()) {
1242
0
        out << rows() << " rows in block, only show first " << row_limit << " rows." << std::endl;
1243
0
    }
1244
1
    return out.str();
1245
1
}
1246
1247
48.0k
std::unique_ptr<Block> Block::create_same_struct_block(size_t size, bool is_reserve) const {
1248
48.0k
    auto temp_block = Block::create_unique();
1249
94.1k
    for (const auto& d : data) {
1250
94.1k
        auto column = d.type->create_column();
1251
94.1k
        if (is_reserve) {
1252
0
            column->reserve(size);
1253
94.1k
        } else {
1254
94.1k
            column->insert_many_defaults(size);
1255
94.1k
        }
1256
94.1k
        temp_block->insert({std::move(column), d.type, d.name});
1257
94.1k
    }
1258
48.0k
    return temp_block;
1259
48.0k
}
1260
1261
10.1k
void Block::shrink_char_type_column_suffix_zero(const std::vector<size_t>& char_type_idx) {
1262
10.1k
    for (auto idx : char_type_idx) {
1263
2
        if (idx < data.size()) {
1264
1
            auto& col_and_name = this->get_by_position(idx);
1265
1
            if (col_and_name.column->is_exclusive()) {
1266
1
                col_and_name.column->assert_mutable()->shrink_padding_chars();
1267
1
            } else {
1268
0
                auto mutable_col = std::move(*col_and_name.column).mutate();
1269
0
                mutable_col->shrink_padding_chars();
1270
0
                col_and_name.column = std::move(mutable_col);
1271
0
            }
1272
1
        }
1273
2
    }
1274
10.1k
}
1275
1276
96.1k
size_t MutableBlock::allocated_bytes() const {
1277
96.1k
    size_t res = 0;
1278
188k
    for (const auto& col : _columns) {
1279
188k
        if (col) {
1280
188k
            res += col->allocated_bytes();
1281
188k
        }
1282
188k
    }
1283
1284
96.1k
    return res;
1285
96.1k
}
1286
1287
1
void MutableBlock::clear_column_data() noexcept {
1288
1
    SCOPED_SKIP_MEMORY_CHECK();
1289
1
    for (auto& col : _columns) {
1290
1
        if (col) {
1291
1
            col->clear();
1292
1
        }
1293
1
    }
1294
1
}
1295
1296
5
std::string MutableBlock::dump_names() const {
1297
5
    std::string out;
1298
16
    for (auto it = _names.begin(); it != _names.end(); ++it) {
1299
11
        if (it != _names.begin()) {
1300
6
            out += ", ";
1301
6
        }
1302
11
        out += *it;
1303
11
    }
1304
5
    return out;
1305
5
}
1306
} // namespace doris