Coverage Report

Created: 2026-06-25 13:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/core/block/block.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Core/Block.cpp
19
// and modified by Doris
20
21
#include "core/block/block.h"
22
23
#include <fmt/format.h>
24
#include <gen_cpp/data.pb.h>
25
#include <glog/logging.h>
26
#include <snappy.h>
27
#include <streamvbyte.h>
28
29
#include <algorithm>
30
#include <cassert>
31
#include <iomanip>
32
#include <limits>
33
#include <ranges>
34
35
#include "agent/be_exec_version_manager.h"
36
#include "common/compiler_util.h" // IWYU pragma: keep
37
#include "common/logging.h"
38
#include "common/status.h"
39
#include "core/assert_cast.h"
40
#include "core/column/column.h"
41
#include "core/column/column_const.h"
42
#include "core/column/column_nothing.h"
43
#include "core/column/column_nullable.h"
44
#include "core/column/column_vector.h"
45
#include "core/data_type/data_type_factory.hpp"
46
#include "core/data_type/data_type_nullable.h"
47
#include "core/data_type_serde/data_type_serde.h"
48
#include "runtime/descriptors.h"
49
#include "runtime/runtime_profile.h"
50
#include "runtime/thread_context.h"
51
#include "util/block_compression.h"
52
#include "util/faststring.h"
53
#include "util/simd/bits.h"
54
#include "util/slice.h"
55
56
class SipHash;
57
58
namespace doris::segment_v2 {
59
enum CompressionTypePB : int;
60
} // namespace doris::segment_v2
61
namespace doris {
62
template <typename T>
63
void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks,
64
2
                  RuntimeProfile::Counter* memory_used_counter = nullptr) {
65
2
    T block;
66
6
    while (blocks.try_dequeue(block)) {
67
4
        if (memory_used_counter) {
68
4
            if constexpr (std::is_same_v<T, Block>) {
69
2
                memory_used_counter->update(-block.allocated_bytes());
70
2
            } else {
71
2
                memory_used_counter->update(-block->allocated_bytes());
72
2
            }
73
4
        }
74
4
    }
75
2
}
_ZN5doris12clear_blocksINS_5BlockEEEvRN10moodycamel15ConcurrentQueueIT_NS2_28ConcurrentQueueDefaultTraitsEEEPNS_14RuntimeProfile7CounterE
Line
Count
Source
64
1
                  RuntimeProfile::Counter* memory_used_counter = nullptr) {
65
1
    T block;
66
3
    while (blocks.try_dequeue(block)) {
67
2
        if (memory_used_counter) {
68
2
            if constexpr (std::is_same_v<T, Block>) {
69
2
                memory_used_counter->update(-block.allocated_bytes());
70
            } else {
71
                memory_used_counter->update(-block->allocated_bytes());
72
            }
73
2
        }
74
2
    }
75
1
}
_ZN5doris12clear_blocksISt10unique_ptrINS_5BlockESt14default_deleteIS2_EEEEvRN10moodycamel15ConcurrentQueueIT_NS6_28ConcurrentQueueDefaultTraitsEEEPNS_14RuntimeProfile7CounterE
Line
Count
Source
64
1
                  RuntimeProfile::Counter* memory_used_counter = nullptr) {
65
1
    T block;
66
3
    while (blocks.try_dequeue(block)) {
67
2
        if (memory_used_counter) {
68
            if constexpr (std::is_same_v<T, Block>) {
69
                memory_used_counter->update(-block.allocated_bytes());
70
2
            } else {
71
2
                memory_used_counter->update(-block->allocated_bytes());
72
2
            }
73
2
        }
74
2
    }
75
1
}
76
77
template void clear_blocks<Block>(moodycamel::ConcurrentQueue<Block>&,
78
                                  RuntimeProfile::Counter* memory_used_counter);
79
template void clear_blocks<BlockUPtr>(moodycamel::ConcurrentQueue<BlockUPtr>&,
80
                                      RuntimeProfile::Counter* memory_used_counter);
81
82
namespace {
83
84
// The no-clone fast path is only safe when the whole column tree is uniquely
85
// owned. A composite column with shared children still needs COW detachment.
86
32.6k
bool is_recursively_exclusive(const IColumn& column) {
87
32.6k
    if (!column.is_exclusive()) {
88
13
        return false;
89
13
    }
90
91
32.6k
    bool exclusive = true;
92
32.6k
    IColumn::ColumnCallback callback = [&](IColumn::WrappedPtr& subcolumn) {
93
27.8k
        if (!exclusive) {
94
0
            return;
95
0
        }
96
27.8k
        const ColumnPtr& subcolumn_ptr = const_cast<const IColumn::WrappedPtr&>(subcolumn);
97
27.8k
        DCHECK(subcolumn_ptr);
98
27.8k
        exclusive = is_recursively_exclusive(*subcolumn_ptr);
99
27.8k
    };
100
    // `for_each_subcolumn` only exposes a mutable callback type. This callback
101
    // only reads the wrapped pointers and never calls the non-const accessors.
102
32.6k
    const_cast<IColumn&>(column).for_each_subcolumn(callback);
103
32.6k
    return exclusive;
104
32.6k
}
105
106
// Acquire one live Block slot transactionally. Shared columns are detached while
107
// the original slot is still intact, so a clone failure cannot leave Block with
108
// a moved-from/null column. Exclusive column trees keep the stealing fast path.
109
4.84k
MutableColumnPtr scoped_mutate_column(ColumnPtr& column, const DataTypePtr& type) {
110
4.84k
    DCHECK(type);
111
4.84k
    if (!column) {
112
1
        return type->create_column();
113
1
    }
114
115
4.84k
    MutableColumnPtr mutable_column;
116
4.84k
    if (is_recursively_exclusive(*column)) {
117
4.82k
        mutable_column = std::move(*column).mutate();
118
4.82k
    } else {
119
13
        mutable_column = IColumn::mutate(column);
120
13
    }
121
4.84k
    column = nullptr;
122
4.84k
    return mutable_column;
123
4.84k
}
124
125
} // namespace
126
127
4.37k
Block::Block(std::initializer_list<ColumnWithTypeAndName> il) : data {il} {}
128
129
258k
Block::Block(ColumnsWithTypeAndName data_) : data {std::move(data_)} {}
130
131
56.0k
Block::Block(const std::vector<SlotDescriptor*>& slots, size_t block_size) {
132
271k
    for (auto* const slot_desc : slots) {
133
271k
        auto column_ptr = slot_desc->get_empty_mutable_column();
134
271k
        column_ptr->reserve(block_size);
135
271k
        insert(ColumnWithTypeAndName(std::move(column_ptr), slot_desc->get_data_type_ptr(),
136
271k
                                     slot_desc->col_name()));
137
271k
    }
138
56.0k
}
139
140
1
Block::Block(const std::vector<SlotDescriptor>& slots, size_t block_size) {
141
1
    std::vector<SlotDescriptor*> slot_ptrs(slots.size());
142
3
    for (size_t i = 0; i < slots.size(); ++i) {
143
        // Slots remain unmodified and are used to read column information; const_cast can be employed.
144
        // used in src/exec/rowid_fetcher.cpp
145
2
        slot_ptrs[i] = const_cast<SlotDescriptor*>(&slots[i]);
146
2
    }
147
1
    *this = Block(slot_ptrs, block_size);
148
1
}
149
150
Status Block::deserialize(const PBlock& pblock, size_t* uncompressed_bytes,
151
994
                          int64_t* decompress_time) {
152
994
    swap(Block());
153
994
    int be_exec_version = pblock.has_be_exec_version() ? pblock.be_exec_version() : 0;
154
994
    RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(be_exec_version));
155
156
994
    const char* buf = nullptr;
157
994
    std::string compression_scratch;
158
994
    if (pblock.compressed()) {
159
        // Decompress
160
483
        SCOPED_RAW_TIMER(decompress_time);
161
483
        const char* compressed_data = pblock.column_values().c_str();
162
483
        size_t compressed_size = pblock.column_values().size();
163
483
        size_t uncompressed_size = 0;
164
483
        if (pblock.has_compression_type() && pblock.has_uncompressed_size()) {
165
483
            BlockCompressionCodec* codec;
166
483
            RETURN_IF_ERROR(get_block_compression_codec(pblock.compression_type(), &codec));
167
483
            uncompressed_size = pblock.uncompressed_size();
168
            // Should also use allocator to allocate memory here.
169
483
            compression_scratch.resize(uncompressed_size);
170
483
            Slice decompressed_slice(compression_scratch);
171
483
            RETURN_IF_ERROR(codec->decompress(Slice(compressed_data, compressed_size),
172
483
                                              &decompressed_slice));
173
483
            DCHECK(uncompressed_size == decompressed_slice.size);
174
483
        } else {
175
0
            bool success = snappy::GetUncompressedLength(compressed_data, compressed_size,
176
0
                                                         &uncompressed_size);
177
0
            DCHECK(success) << "snappy::GetUncompressedLength failed";
178
0
            compression_scratch.resize(uncompressed_size);
179
0
            success = snappy::RawUncompress(compressed_data, compressed_size,
180
0
                                            compression_scratch.data());
181
0
            DCHECK(success) << "snappy::RawUncompress failed";
182
0
        }
183
483
        *uncompressed_bytes = uncompressed_size;
184
483
        buf = compression_scratch.data();
185
511
    } else {
186
511
        buf = pblock.column_values().data();
187
511
    }
188
189
1.57k
    for (const auto& pcol_meta : pblock.column_metas()) {
190
1.57k
        DataTypePtr type = DataTypeFactory::instance().create_data_type(pcol_meta);
191
1.57k
        MutableColumnPtr data_column = type->create_column();
192
        // Here will try to allocate large memory, should return error if failed.
193
1.57k
        RETURN_IF_CATCH_EXCEPTION(
194
1.57k
                buf = type->deserialize(buf, &data_column, pblock.be_exec_version()));
195
1.57k
        data.emplace_back(data_column->get_ptr(), type, pcol_meta.name());
196
1.57k
    }
197
198
994
    return Status::OK();
199
994
}
200
201
13.5k
void Block::reserve(size_t count) {
202
13.5k
    data.reserve(count);
203
13.5k
}
204
205
4
void Block::insert(size_t position, const ColumnWithTypeAndName& elem) {
206
4
    if (position > data.size()) {
207
1
        throw Exception(ErrorCode::INTERNAL_ERROR,
208
1
                        "invalid input position, position={}, data.size={}, names={}", position,
209
1
                        data.size(), dump_names());
210
1
    }
211
212
3
    data.emplace(data.begin() + position, elem);
213
3
}
214
215
3
void Block::insert(size_t position, ColumnWithTypeAndName&& elem) {
216
3
    if (position > data.size()) {
217
1
        throw Exception(ErrorCode::INTERNAL_ERROR,
218
1
                        "invalid input position, position={}, data.size={}, names={}", position,
219
1
                        data.size(), dump_names());
220
1
    }
221
222
2
    data.emplace(data.begin() + position, std::move(elem));
223
2
}
224
225
8.09k
void Block::clear_names() {
226
177k
    for (auto& entry : data) {
227
177k
        entry.name.clear();
228
177k
    }
229
8.09k
}
230
231
14.6k
void Block::insert(const ColumnWithTypeAndName& elem) {
232
14.6k
    data.emplace_back(elem);
233
14.6k
}
234
235
1.00M
void Block::insert(ColumnWithTypeAndName&& elem) {
236
1.00M
    data.emplace_back(std::move(elem));
237
1.00M
}
238
239
3
void Block::erase(const std::set<size_t>& positions) {
240
3
    for (unsigned long position : std::ranges::reverse_view(positions)) {
241
2
        erase(position);
242
2
    }
243
3
}
244
245
725
void Block::erase_tail(size_t start) {
246
725
    DCHECK(start <= data.size()) << fmt::format(
247
0
            "Position out of bound in Block::erase(), max position = {}", data.size());
248
725
    data.erase(data.begin() + start, data.end());
249
725
}
250
251
124k
void Block::erase(size_t position) {
252
124k
    DCHECK(!data.empty()) << "Block is empty";
253
124k
    DCHECK_LT(position, data.size()) << fmt::format(
254
0
            "Position out of bound in Block::erase(), max position = {}", data.size() - 1);
255
256
124k
    erase_impl(position);
257
124k
}
258
259
124k
void Block::erase_impl(size_t position) {
260
124k
    data.erase(data.begin() + position);
261
124k
}
262
263
76.0k
ColumnWithTypeAndName& Block::safe_get_by_position(size_t position) {
264
76.0k
    if (position >= data.size()) {
265
0
        throw Exception(ErrorCode::INTERNAL_ERROR,
266
0
                        "invalid input position, position={}, data.size={}, names={}", position,
267
0
                        data.size(), dump_names());
268
0
    }
269
76.0k
    return data[position];
270
76.0k
}
271
272
109
const ColumnWithTypeAndName& Block::safe_get_by_position(size_t position) const {
273
109
    if (position >= data.size()) {
274
0
        throw Exception(ErrorCode::INTERNAL_ERROR,
275
0
                        "invalid input position, position={}, data.size={}, names={}", position,
276
0
                        data.size(), dump_names());
277
0
    }
278
109
    return data[position];
279
109
}
280
281
89
int Block::get_position_by_name(const std::string& name) const {
282
1.04k
    for (int i = 0; i < data.size(); i++) {
283
1.04k
        if (data[i].name == name) {
284
86
            return i;
285
86
        }
286
1.04k
    }
287
3
    return -1;
288
89
}
289
290
5
void Block::check_number_of_rows(bool allow_null_columns) const {
291
5
    ssize_t rows = -1;
292
9
    for (const auto& elem : data) {
293
9
        if (!elem.column && allow_null_columns) {
294
2
            continue;
295
2
        }
296
297
7
        if (!elem.column) {
298
1
            throw Exception(ErrorCode::INTERNAL_ERROR,
299
1
                            "Column {} in block is nullptr, in method check_number_of_rows.",
300
1
                            elem.name);
301
1
        }
302
303
6
        ssize_t size = elem.column->size();
304
305
6
        if (rows == -1) {
306
5
            rows = size;
307
5
        } else if (rows != size) {
308
1
            throw Exception(ErrorCode::INTERNAL_ERROR, "Sizes of columns doesn't match, block={}",
309
1
                            dump_structure());
310
1
        }
311
6
    }
312
5
}
313
314
2.63M
Status Block::check_type_and_column() const {
315
2.63M
#ifndef NDEBUG
316
2.63M
    for (const auto& elem : data) {
317
211k
        if (!elem.column) {
318
0
            continue;
319
0
        }
320
211k
        if (!elem.type) {
321
0
            continue;
322
0
        }
323
324
        // ColumnNothing is a special column type, it is used to represent a column that
325
        // is not materialized, so we don't need to check it.
326
211k
        if (check_and_get_column<ColumnNothing>(elem.column.get())) {
327
0
            continue;
328
0
        }
329
330
211k
        const auto& type = elem.type;
331
211k
        const auto& column = elem.column;
332
333
211k
        RETURN_IF_ERROR(column->column_self_check());
334
211k
        auto st = type->check_column(*column);
335
211k
        if (!st.ok()) {
336
1
            return Status::InternalError(
337
1
                    "Column {} in block is not compatible with its column type :{}, data type :{}, "
338
1
                    "error: {}",
339
1
                    elem.name, column->get_name(), type->get_name(), st.msg());
340
1
        }
341
211k
    }
342
2.63M
#endif
343
2.63M
    return Status::OK();
344
2.63M
}
345
346
1.36M
Status Block::check_column_and_type_not_null() const {
347
1.55M
    for (size_t i = 0; i != data.size(); ++i) {
348
190k
        const auto& elem = data[i];
349
190k
        if (!elem.column) {
350
1
            return Status::InternalError("Column in block is nullptr, column index: {}, name: {}",
351
1
                                         i, elem.name);
352
1
        }
353
190k
        if (!elem.type) {
354
1
            return Status::InternalError("Type in block is nullptr, column index: {}, name: {}", i,
355
1
                                         elem.name);
356
1
        }
357
190k
    }
358
1.36M
    return Status::OK();
359
1.36M
}
360
361
50.2M
size_t Block::rows() const {
362
50.2M
    for (const auto& elem : data) {
363
46.4M
        if (elem.column) {
364
46.4M
            return elem.column->size();
365
46.4M
        }
366
46.4M
    }
367
368
3.79M
    return 0;
369
50.2M
}
370
371
6
void Block::set_num_rows(size_t length) {
372
6
    if (rows() > length) {
373
4
        for (auto& elem : data) {
374
4
            if (elem.column) {
375
4
                elem.column = elem.column->shrink(length);
376
4
            }
377
4
        }
378
4
    }
379
6
}
380
381
1
void Block::skip_num_rows(int64_t& length) {
382
1
    auto origin_rows = rows();
383
1
    if (origin_rows <= length) {
384
0
        clear();
385
0
        length -= origin_rows;
386
1
    } else {
387
1
        for (auto& elem : data) {
388
1
            if (elem.column) {
389
1
                elem.column = elem.column->cut(length, origin_rows - length);
390
1
            }
391
1
        }
392
1
    }
393
1
}
394
395
13.1k
size_t Block::bytes() const {
396
13.1k
    size_t res = 0;
397
27.8k
    for (const auto& elem : data) {
398
27.8k
        if (!elem.column) {
399
0
            std::stringstream ss;
400
0
            for (const auto& e : data) {
401
0
                ss << e.name + " ";
402
0
            }
403
0
            throw Exception(ErrorCode::INTERNAL_ERROR,
404
0
                            "Column {} in block is nullptr, in method bytes. All Columns are {}",
405
0
                            elem.name, ss.str());
406
0
        }
407
27.8k
        res += elem.column->byte_size();
408
27.8k
    }
409
410
13.1k
    return res;
411
13.1k
}
412
413
194k
size_t Block::allocated_bytes() const {
414
194k
    size_t res = 0;
415
381k
    for (const auto& elem : data) {
416
381k
        if (!elem.column) {
417
            // Sometimes if expr failed, then there will be a nullptr
418
            // column left in the block.
419
1
            continue;
420
1
        }
421
381k
        res += elem.column->allocated_bytes();
422
381k
    }
423
424
194k
    return res;
425
194k
}
426
427
8
std::string Block::dump_names() const {
428
8
    std::string out;
429
23
    for (auto it = data.begin(); it != data.end(); ++it) {
430
15
        if (it != data.begin()) {
431
7
            out += ", ";
432
7
        }
433
15
        out += it->name;
434
15
    }
435
8
    return out;
436
8
}
437
438
7
std::string Block::dump_types() const {
439
7
    std::string out;
440
21
    for (auto it = data.begin(); it != data.end(); ++it) {
441
14
        if (it != data.begin()) {
442
7
            out += ", ";
443
7
        }
444
14
        out += it->type->get_name();
445
14
    }
446
7
    return out;
447
7
}
448
449
31
std::string Block::dump_data_json(size_t begin, size_t row_limit, bool allow_null_mismatch) const {
450
31
    std::stringstream ss;
451
452
31
    std::vector<std::string> headers;
453
31
    headers.reserve(columns());
454
46
    for (const auto& it : data) {
455
        // fmt::format is from the {fmt} library, you might be using std::format in C++20
456
        // If not, you can build the string with a stringstream as a fallback.
457
46
        headers.push_back(fmt::format("{}({})", it.name, it.type->get_name()));
458
46
    }
459
460
31
    size_t start_row = std::min(begin, rows());
461
31
    size_t end_row = std::min(rows(), begin + row_limit);
462
463
31
    auto format_options = DataTypeSerDe::get_default_format_options();
464
31
    auto time_zone = cctz::utc_time_zone();
465
31
    format_options.timezone = &time_zone;
466
467
31
    ss << "[";
468
3.59k
    for (size_t row_num = start_row; row_num < end_row; ++row_num) {
469
3.56k
        if (row_num > start_row) {
470
3.53k
            ss << ",";
471
3.53k
        }
472
3.56k
        ss << "{";
473
8.33k
        for (size_t i = 0; i < columns(); ++i) {
474
4.77k
            if (i > 0) {
475
1.21k
                ss << ",";
476
1.21k
            }
477
4.77k
            ss << "\"" << headers[i] << "\":";
478
4.77k
            std::string s;
479
480
            // This value-extraction logic is preserved from your original function
481
            // to maintain consistency, especially for handling nullability mismatches.
482
4.77k
            if (data[i].column && data[i].type->is_nullable() && !data[i].column->is_nullable()) {
483
                // This branch handles a specific internal representation of nullable columns.
484
                // The original code would assert here if allow_null_mismatch is false.
485
0
                assert(allow_null_mismatch);
486
0
                s = assert_cast<const DataTypeNullable*>(data[i].type.get())
487
0
                            ->get_nested_type()
488
0
                            ->to_string(*data[i].column, row_num, format_options);
489
4.77k
            } else {
490
                // This is the standard path. The to_string method is expected to correctly
491
                // handle all cases, including when the column is null (e.g., by returning "NULL").
492
4.77k
                s = data[i].to_string(row_num, format_options);
493
4.77k
            }
494
4.77k
            ss << "\"" << s << "\"";
495
4.77k
        }
496
3.56k
        ss << "}";
497
3.56k
    }
498
31
    ss << "]";
499
31
    return ss.str();
500
31
}
501
502
858
std::string Block::dump_data(size_t begin, size_t row_limit, bool allow_null_mismatch) const {
503
858
    std::vector<std::string> headers;
504
858
    std::vector<int> headers_size;
505
2.10k
    for (const auto& it : data) {
506
2.10k
        std::string s = fmt::format("{}({})", it.name, it.type->get_name());
507
2.10k
        headers_size.push_back(s.size() > 15 ? (int)s.size() : 15);
508
2.10k
        headers.emplace_back(s);
509
2.10k
    }
510
511
858
    std::stringstream out;
512
    // header upper line
513
2.16k
    auto line = [&]() {
514
8.07k
        for (size_t i = 0; i < columns(); ++i) {
515
5.91k
            out << std::setfill('-') << std::setw(1) << "+" << std::setw(headers_size[i]) << "-";
516
5.91k
        }
517
2.16k
        out << std::setw(1) << "+" << std::endl;
518
2.16k
    };
519
858
    line();
520
    // header text
521
2.96k
    for (size_t i = 0; i < columns(); ++i) {
522
2.10k
        out << std::setfill(' ') << std::setw(1) << "|" << std::left << std::setw(headers_size[i])
523
2.10k
            << headers[i];
524
2.10k
    }
525
858
    out << std::setw(1) << "|" << std::endl;
526
    // header bottom line
527
858
    line();
528
858
    if (rows() == 0) {
529
414
        return out.str();
530
414
    }
531
532
444
    auto format_options = DataTypeSerDe::get_default_format_options();
533
444
    auto time_zone = cctz::utc_time_zone();
534
444
    format_options.timezone = &time_zone;
535
536
    // content
537
12.2k
    for (size_t row_num = begin; row_num < rows() && row_num < row_limit + begin; ++row_num) {
538
32.4k
        for (size_t i = 0; i < columns(); ++i) {
539
20.6k
            if (!data[i].column || data[i].column->empty()) {
540
0
                out << std::setfill(' ') << std::setw(1) << "|" << std::setw(headers_size[i])
541
0
                    << std::right;
542
0
                continue;
543
0
            }
544
20.6k
            std::string s;
545
20.6k
            if (data[i].column) { // column may be const
546
                // for code inside `default_implementation_for_nulls`, there's could have: type = null, col != null
547
20.6k
                if (data[i].type->is_nullable() && !data[i].column->is_nullable()) {
548
0
                    assert(allow_null_mismatch);
549
0
                    s = assert_cast<const DataTypeNullable*>(data[i].type.get())
550
0
                                ->get_nested_type()
551
0
                                ->to_string(*data[i].column, row_num, format_options);
552
20.6k
                } else {
553
20.6k
                    s = data[i].to_string(row_num, format_options);
554
20.6k
                }
555
20.6k
            }
556
20.6k
            if (s.length() > headers_size[i]) {
557
2.12k
                s = s.substr(0, headers_size[i] - 3) + "...";
558
2.12k
            }
559
20.6k
            out << std::setfill(' ') << std::setw(1) << "|" << std::setw(headers_size[i])
560
20.6k
                << std::right << s;
561
20.6k
        }
562
11.7k
        out << std::setw(1) << "|" << std::endl;
563
11.7k
    }
564
    // bottom line
565
444
    line();
566
444
    if (row_limit < rows()) {
567
112
        out << rows() << " rows in block, only show first " << row_limit << " rows." << std::endl;
568
112
    }
569
444
    return out.str();
570
444
}
571
572
1
std::string Block::dump_one_line(size_t row, int column_end) const {
573
1
    assert(column_end <= columns());
574
1
    fmt::memory_buffer line;
575
576
1
    auto format_options = DataTypeSerDe::get_default_format_options();
577
1
    auto time_zone = cctz::utc_time_zone();
578
1
    format_options.timezone = &time_zone;
579
580
3
    for (int i = 0; i < column_end; ++i) {
581
2
        if (LIKELY(i != 0)) {
582
            // TODO: need more effective function of to string. now the impl is slow
583
1
            fmt::format_to(line, " {}", data[i].to_string(row, format_options));
584
1
        } else {
585
1
            fmt::format_to(line, "{}", data[i].to_string(row, format_options));
586
1
        }
587
2
    }
588
1
    return fmt::to_string(line);
589
1
}
590
591
47
std::string Block::dump_structure() const {
592
47
    std::string out;
593
382
    for (auto it = data.begin(); it != data.end(); ++it) {
594
335
        if (it != data.begin()) {
595
288
            out += ", \n";
596
288
        }
597
335
        out += it->dump_structure();
598
335
    }
599
47
    return out;
600
47
}
601
602
48.9k
Block Block::clone_empty() const {
603
48.9k
    Block res;
604
95.8k
    for (const auto& elem : data) {
605
95.8k
        res.insert(elem.clone_empty());
606
95.8k
    }
607
48.9k
    return res;
608
48.9k
}
609
610
32
MutableColumns Block::clone_empty_columns() const {
611
32
    size_t num_columns = data.size();
612
32
    MutableColumns columns(num_columns);
613
142
    for (size_t i = 0; i < num_columns; ++i) {
614
110
        columns[i] = data[i].column ? data[i].column->clone_empty() : data[i].type->create_column();
615
110
    }
616
32
    return columns;
617
32
}
618
619
26.1k
Columns Block::get_columns() const {
620
26.1k
    size_t num_columns = data.size();
621
26.1k
    Columns columns(num_columns);
622
113k
    for (size_t i = 0; i < num_columns; ++i) {
623
87.8k
        columns[i] = data[i].column->convert_to_full_column_if_const();
624
87.8k
    }
625
26.1k
    return columns;
626
26.1k
}
627
628
559
Columns Block::get_columns_and_convert() {
629
559
    size_t num_columns = data.size();
630
559
    Columns columns(num_columns);
631
1.18k
    for (size_t i = 0; i < num_columns; ++i) {
632
622
        data[i].column = data[i].column->convert_to_full_column_if_const();
633
622
        columns[i] = data[i].column;
634
622
    }
635
559
    return columns;
636
559
}
637
638
2.51k
Block::ScopedMutableColumns::ScopedMutableColumns(Block& block) : _block(&block) {
639
2.51k
    const size_t num_columns = block.data.size();
640
2.51k
    _columns.resize(num_columns);
641
2.51k
    size_t acquired_columns = 0;
642
2.51k
    try {
643
7.25k
        for (; acquired_columns < num_columns; ++acquired_columns) {
644
4.74k
            auto& column_with_type_and_name = block.data[acquired_columns];
645
4.74k
            _columns[acquired_columns] = scoped_mutate_column(column_with_type_and_name.column,
646
4.74k
                                                              column_with_type_and_name.type);
647
4.74k
        }
648
2.51k
    } catch (...) {
649
4
        for (size_t i = 0; i < acquired_columns; ++i) {
650
2
            block.data[i].column = std::move(_columns[i]);
651
2
        }
652
2
        _block = nullptr;
653
2
        throw;
654
2
    }
655
2.51k
}
656
657
2.51k
Block::ScopedMutableColumns::~ScopedMutableColumns() {
658
2.51k
    restore();
659
2.51k
}
660
661
Block::ScopedMutableColumns::ScopedMutableColumns(ScopedMutableColumns&& other) noexcept
662
0
        : _block(std::exchange(other._block, nullptr)), _columns(std::move(other._columns)) {}
663
664
Block::ScopedMutableColumns& Block::ScopedMutableColumns::operator=(
665
0
        ScopedMutableColumns&& other) noexcept {
666
0
    if (this != &other) {
667
0
        restore();
668
0
        _block = std::exchange(other._block, nullptr);
669
0
        _columns = std::move(other._columns);
670
0
    }
671
0
    return *this;
672
0
}
673
674
2
const DataTypePtr& Block::ScopedMutableColumns::get_datatype_by_position(size_t position) const {
675
2
    DCHECK(_block != nullptr);
676
2
    return _block->get_by_position(position).type;
677
2
}
678
679
2
const std::string& Block::ScopedMutableColumns::get_name_by_position(size_t position) const {
680
2
    DCHECK(_block != nullptr);
681
2
    return _block->get_by_position(position).name;
682
2
}
683
684
816
MutableColumns Block::ScopedMutableColumns::release() {
685
816
    DCHECK(_block != nullptr);
686
816
    _block = nullptr;
687
816
    return std::move(_columns);
688
816
}
689
690
2.99k
void Block::ScopedMutableColumns::restore() {
691
2.99k
    if (_block != nullptr) {
692
1.69k
        _block->set_columns(std::move(_columns));
693
1.69k
        _block = nullptr;
694
1.69k
    }
695
2.99k
}
696
697
Block::ScopedMutableColumn::ScopedMutableColumn(Block& block, size_t position)
698
98
        : _block(&block), _position(position) {
699
98
    DCHECK_LT(_position, _block->data.size());
700
98
    auto& column_with_type_and_name = _block->data[_position];
701
98
    DCHECK(column_with_type_and_name.type);
702
98
    _column =
703
98
            scoped_mutate_column(column_with_type_and_name.column, column_with_type_and_name.type);
704
98
}
705
706
97
Block::ScopedMutableColumn::~ScopedMutableColumn() {
707
97
    restore();
708
97
}
709
710
Block::ScopedMutableColumn::ScopedMutableColumn(ScopedMutableColumn&& other) noexcept
711
0
        : _block(std::exchange(other._block, nullptr)),
712
0
          _position(other._position),
713
0
          _column(std::move(other._column)) {}
714
715
Block::ScopedMutableColumn& Block::ScopedMutableColumn::operator=(
716
0
        ScopedMutableColumn&& other) noexcept {
717
0
    if (this != &other) {
718
0
        restore();
719
0
        _block = std::exchange(other._block, nullptr);
720
0
        _position = other._position;
721
0
        _column = std::move(other._column);
722
0
    }
723
0
    return *this;
724
0
}
725
726
97
void Block::ScopedMutableColumn::restore() {
727
97
    if (_block != nullptr) {
728
97
        DCHECK_LT(_position, _block->data.size());
729
97
        _block->data[_position].column = std::move(_column);
730
97
        _block = nullptr;
731
97
    }
732
97
}
733
734
2.51k
Block::ScopedMutableColumns Block::mutate_columns_scoped() & {
735
2.51k
    return ScopedMutableColumns(*this);
736
2.51k
}
737
738
98
Block::ScopedMutableColumn Block::mutate_column_scoped(size_t position) & {
739
98
    return ScopedMutableColumn(*this, position);
740
98
}
741
742
817
ScopedMutableBlock::ScopedMutableBlock(Block* block) {
743
817
    DCHECK(block != nullptr);
744
817
    DataTypes data_types = block->get_data_types();
745
817
    std::vector<std::string> names = block->get_names();
746
817
    auto columns_guard = block->mutate_columns_scoped();
747
817
    _mutable_block.data_types() = std::move(data_types);
748
817
    _mutable_block.get_names() = std::move(names);
749
817
    _mutable_block.set_mutable_columns(columns_guard.release());
750
817
    _block = block;
751
817
}
752
753
146k
MutableColumns Block::mutate_columns() && {
754
146k
    size_t num_columns = data.size();
755
146k
    MutableColumns columns(num_columns);
756
433k
    for (size_t i = 0; i < num_columns; ++i) {
757
287k
        DCHECK(data[i].type);
758
287k
        columns[i] = data[i].column ? IColumn::mutate(std::move(data[i].column))
759
287k
                                    : data[i].type->create_column();
760
287k
    }
761
146k
    return columns;
762
146k
}
763
764
3.50k
void Block::set_columns(MutableColumns&& columns) {
765
3.50k
    DCHECK_GE(columns.size(), data.size())
766
0
            << fmt::format("Invalid size of columns, columns size: {}, data size: {}",
767
0
                           columns.size(), data.size());
768
3.50k
    size_t num_columns = data.size();
769
11.9k
    for (size_t i = 0; i < num_columns; ++i) {
770
8.45k
        data[i].column = std::move(columns[i]);
771
8.45k
    }
772
3.50k
}
773
774
51
Block Block::clone_without_columns(const std::vector<int>* column_offset) const {
775
51
    Block res;
776
777
51
    if (column_offset != nullptr) {
778
32
        size_t num_columns = column_offset->size();
779
174
        for (size_t i = 0; i < num_columns; ++i) {
780
142
            res.insert({nullptr, data[(*column_offset)[i]].type, data[(*column_offset)[i]].name});
781
142
        }
782
32
    } else {
783
19
        size_t num_columns = data.size();
784
53
        for (size_t i = 0; i < num_columns; ++i) {
785
34
            res.insert({nullptr, data[i].type, data[i].name});
786
34
        }
787
19
    }
788
51
    return res;
789
51
}
790
791
55.8k
const ColumnsWithTypeAndName& Block::get_columns_with_type_and_name() const {
792
55.8k
    return data;
793
55.8k
}
794
795
145k
std::vector<std::string> Block::get_names() const {
796
145k
    std::vector<std::string> res;
797
145k
    res.reserve(columns());
798
799
285k
    for (const auto& elem : data) {
800
285k
        res.push_back(elem.name);
801
285k
    }
802
803
145k
    return res;
804
145k
}
805
806
145k
DataTypes Block::get_data_types() const {
807
145k
    DataTypes res;
808
145k
    res.reserve(columns());
809
810
285k
    for (const auto& elem : data) {
811
285k
        res.push_back(elem.type);
812
285k
    }
813
814
145k
    return res;
815
145k
}
816
817
52.8k
void Block::clear() {
818
52.8k
    data.clear();
819
52.8k
}
820
821
1.29M
void Block::clear_column_data(int64_t column_size) {
822
1.29M
    SCOPED_SKIP_MEMORY_CHECK();
823
    // data.size() greater than column_size, means here have some
824
    // function exec result in block, need erase it here
825
1.29M
    if (column_size != -1 and data.size() > column_size) {
826
2.20k
        for (int64_t i = data.size() - 1; i >= column_size; --i) {
827
1.10k
            erase(i);
828
1.10k
        }
829
1.10k
    }
830
1.29M
    for (auto& d : data) {
831
53.3k
        if (d.column) {
832
53.3k
            if (d.column->is_exclusive()) {
833
53.1k
                d.column->assert_mutable()->clear();
834
53.1k
            } else {
835
245
                d.column = d.column->clone_empty();
836
245
            }
837
53.3k
        }
838
53.3k
    }
839
1.29M
}
840
841
46
void Block::clear_column_data(const std::vector<uint32_t>& columns_to_clear) {
842
46
    SCOPED_SKIP_MEMORY_CHECK();
843
79
    for (auto col : columns_to_clear) {
844
79
        DCHECK_LT(col, data.size());
845
79
        auto& column = data[col].column;
846
79
        if (column) {
847
79
            if (column->is_exclusive()) {
848
77
                column->assert_mutable()->clear();
849
77
            } else {
850
2
                column = column->clone_empty();
851
2
            }
852
79
        }
853
79
    }
854
46
}
855
856
void Block::clear_column_mem_not_keep(const std::vector<bool>& column_keep_flags,
857
48.0k
                                      bool need_keep_first) {
858
48.0k
    if (data.size() >= column_keep_flags.size()) {
859
48.0k
        auto origin_rows = rows();
860
142k
        for (size_t i = 0; i < column_keep_flags.size(); ++i) {
861
94.1k
            if (!column_keep_flags[i]) {
862
36.8k
                data[i].column = data[i].column->clone_empty();
863
36.8k
            }
864
94.1k
        }
865
866
48.0k
        if (need_keep_first && !column_keep_flags[0]) {
867
1
            auto first_column = data[0].column->clone_empty();
868
1
            first_column->resize(origin_rows);
869
1
            data[0].column = std::move(first_column);
870
1
        }
871
48.0k
    }
872
48.0k
}
873
874
1.32k
void Block::swap(Block& other) noexcept {
875
1.32k
    SCOPED_SKIP_MEMORY_CHECK();
876
1.32k
    data.swap(other.data);
877
1.32k
}
878
879
1.70k
void Block::swap(Block&& other) noexcept {
880
1.70k
    SCOPED_SKIP_MEMORY_CHECK();
881
1.70k
    data = std::move(other.data);
882
1.70k
}
883
884
3
void Block::shuffle_columns(const std::vector<int>& result_column_ids) {
885
3
    Container tmp_data;
886
3
    tmp_data.reserve(result_column_ids.size());
887
5
    for (const int result_column_id : result_column_ids) {
888
5
        tmp_data.push_back(data[result_column_id]);
889
5
    }
890
3
    data = std::move(tmp_data);
891
3
}
892
893
2
void Block::update_hash(SipHash& hash) const {
894
8
    for (size_t row_no = 0, num_rows = rows(); row_no < num_rows; ++row_no) {
895
12
        for (const auto& col : data) {
896
12
            col.column->update_hash_with_value(row_no, hash);
897
12
        }
898
6
    }
899
2
}
900
901
void Block::filter_block_internal(Block* block, const std::vector<uint32_t>& columns_to_filter,
902
531
                                  const IColumn::Filter& filter) {
903
531
    size_t count = filter.size() - simd::count_zero_num((int8_t*)filter.data(), filter.size());
904
1.35k
    for (const auto& col : columns_to_filter) {
905
1.35k
        auto& column = block->get_by_position(col).column;
906
1.35k
        if (column->size() == count) {
907
1.29k
            continue;
908
1.29k
        }
909
61
        if (count == 0) {
910
2
            if (column->is_exclusive()) {
911
0
                column->assert_mutable()->clear();
912
2
            } else {
913
2
                column = column->clone_empty();
914
2
            }
915
2
            continue;
916
2
        }
917
59
        if (column->is_exclusive()) {
918
            // COW: safe to mutate in-place since we have exclusive ownership
919
59
            const auto result_size = column->assert_mutable()->filter(filter);
920
59
            if (result_size != count) [[unlikely]] {
921
0
                throw Exception(ErrorCode::INTERNAL_ERROR,
922
0
                                "result_size not equal with filter_size, result_size={}, "
923
0
                                "filter_size={}",
924
0
                                result_size, count);
925
0
            }
926
59
        } else {
927
            // COW: must create a copy since column is shared
928
0
            column = column->filter(filter, count);
929
0
        }
930
59
    }
931
531
}
932
933
void Block::filter_block_internal(Block* block, const IColumn::Filter& filter,
934
1
                                  uint32_t column_to_keep) {
935
1
    std::vector<uint32_t> columns_to_filter;
936
1
    columns_to_filter.resize(column_to_keep);
937
3
    for (uint32_t i = 0; i < column_to_keep; ++i) {
938
2
        columns_to_filter[i] = i;
939
2
    }
940
1
    filter_block_internal(block, columns_to_filter, filter);
941
1
}
942
943
8
void Block::filter_block_internal(Block* block, const IColumn::Filter& filter) {
944
8
    const size_t count =
945
8
            filter.size() - simd::count_zero_num((int8_t*)filter.data(), filter.size());
946
24
    for (int i = 0; i < block->columns(); ++i) {
947
16
        auto& column = block->get_by_position(i).column;
948
16
        if (column->is_exclusive()) {
949
16
            column->assert_mutable()->filter(filter);
950
16
        } else {
951
0
            column = column->filter(filter, count);
952
0
        }
953
16
    }
954
8
}
955
956
Status Block::append_to_block_by_selector(MutableBlock* dst,
957
1
                                          const IColumn::Selector& selector) const {
958
1
    RETURN_IF_CATCH_EXCEPTION({
959
1
        DCHECK_EQ(data.size(), dst->mutable_columns().size());
960
1
        for (size_t i = 0; i < data.size(); i++) {
961
            // FIXME: this is a quickfix. we assume that only partition functions make there some
962
1
            if (!is_column_const(*data[i].column)) {
963
1
                data[i].column->append_data_by_selector(dst->mutable_columns()[i], selector);
964
1
            }
965
1
        }
966
1
    });
967
1
    return Status::OK();
968
1
}
969
970
Status Block::filter_block(Block* block, const std::vector<uint32_t>& columns_to_filter,
971
496
                           size_t filter_column_id, size_t column_to_keep) {
972
496
    const auto& filter_column = block->get_by_position(filter_column_id).column;
973
496
    if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) {
974
1
        const auto& nested_column = nullable_column->get_nested_column_ptr();
975
976
1
        MutableColumnPtr mutable_holder =
977
1
                nested_column->use_count() == 1
978
1
                        ? nested_column->assert_mutable()
979
1
                        : nested_column->clone_resized(nested_column->size());
980
981
1
        auto* concrete_column = assert_cast<ColumnUInt8*>(mutable_holder.get());
982
1
        const auto* __restrict null_map = nullable_column->get_null_map_data().data();
983
1
        IColumn::Filter& filter = concrete_column->get_data();
984
1
        auto* __restrict filter_data = filter.data();
985
986
1
        const size_t size = filter.size();
987
4
        for (size_t i = 0; i < size; ++i) {
988
3
            filter_data[i] &= !null_map[i];
989
3
        }
990
1
        RETURN_IF_CATCH_EXCEPTION(filter_block_internal(block, columns_to_filter, filter));
991
495
    } else if (const auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) {
992
2
        bool ret = const_column->get_bool(0);
993
2
        if (!ret) {
994
2
            for (const auto& col : columns_to_filter) {
995
2
                auto& column = block->get_by_position(col).column;
996
2
                if (column->is_exclusive()) {
997
2
                    column->assert_mutable()->clear();
998
2
                } else {
999
0
                    column = column->clone_empty();
1000
0
                }
1001
2
            }
1002
1
        }
1003
493
    } else {
1004
493
        const IColumn::Filter& filter =
1005
493
                assert_cast<const doris::ColumnUInt8&>(*filter_column).get_data();
1006
493
        RETURN_IF_CATCH_EXCEPTION(filter_block_internal(block, columns_to_filter, filter));
1007
493
    }
1008
1009
496
    erase_useless_column(block, column_to_keep);
1010
496
    return Status::OK();
1011
496
}
1012
1013
490
Status Block::filter_block(Block* block, size_t filter_column_id, size_t column_to_keep) {
1014
490
    std::vector<uint32_t> columns_to_filter;
1015
490
    columns_to_filter.resize(column_to_keep);
1016
1.77k
    for (uint32_t i = 0; i < column_to_keep; ++i) {
1017
1.28k
        columns_to_filter[i] = i;
1018
1.28k
    }
1019
490
    return filter_block(block, columns_to_filter, filter_column_id, column_to_keep);
1020
490
}
1021
1022
Status Block::serialize(int be_exec_version, PBlock* pblock,
1023
                        /*std::string* compressed_buffer,*/ size_t* uncompressed_bytes,
1024
                        size_t* compressed_bytes, int64_t* compress_time,
1025
                        segment_v2::CompressionTypePB compression_type,
1026
2.72k
                        bool allow_transfer_large_data) const {
1027
2.72k
    RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(be_exec_version));
1028
2.72k
    pblock->set_be_exec_version(be_exec_version);
1029
1030
    // calc uncompressed size for allocation
1031
2.72k
    size_t content_uncompressed_size = 0;
1032
3.39k
    for (const auto& c : *this) {
1033
3.39k
        PColumnMeta* pcm = pblock->add_column_metas();
1034
3.39k
        c.to_pb_column_meta(pcm);
1035
18.4E
        DCHECK(pcm->type() != PGenericType::UNKNOWN) << " forget to set pb type";
1036
        // get serialized size
1037
3.39k
        content_uncompressed_size +=
1038
3.39k
                c.type->get_uncompressed_serialized_bytes(*(c.column), pblock->be_exec_version());
1039
3.39k
    }
1040
1041
    // serialize data values
1042
    // when data type is HLL, content_uncompressed_size maybe larger than real size.
1043
2.72k
    std::string column_values;
1044
2.72k
    try {
1045
        // TODO: After support c++23, we should use resize_and_overwrite to replace resize
1046
2.72k
        column_values.resize(content_uncompressed_size);
1047
2.72k
    } catch (...) {
1048
0
        std::string msg = fmt::format("Try to alloc {} bytes for pblock column values failed.",
1049
0
                                      content_uncompressed_size);
1050
0
        LOG(WARNING) << msg;
1051
0
        return Status::BufferAllocFailed(msg);
1052
0
    }
1053
2.72k
    char* buf = column_values.data();
1054
1055
3.40k
    for (const auto& c : *this) {
1056
3.40k
        buf = c.type->serialize(*(c.column), buf, pblock->be_exec_version());
1057
3.40k
    }
1058
2.72k
    *uncompressed_bytes = content_uncompressed_size;
1059
2.72k
    const size_t serialize_bytes = buf - column_values.data() + STREAMVBYTE_PADDING;
1060
2.72k
    *compressed_bytes = serialize_bytes;
1061
2.72k
    column_values.resize(serialize_bytes);
1062
1063
    // compress
1064
2.72k
    if (compression_type != segment_v2::NO_COMPRESSION && content_uncompressed_size > 0) {
1065
626
        SCOPED_RAW_TIMER(compress_time);
1066
626
        pblock->set_compression_type(compression_type);
1067
626
        pblock->set_uncompressed_size(serialize_bytes);
1068
1069
626
        BlockCompressionCodec* codec;
1070
626
        RETURN_IF_ERROR(get_block_compression_codec(compression_type, &codec));
1071
1072
626
        faststring buf_compressed;
1073
626
        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
1074
626
                codec->compress(Slice(column_values.data(), serialize_bytes), &buf_compressed));
1075
626
        size_t compressed_size = buf_compressed.size();
1076
626
        if (LIKELY(compressed_size < serialize_bytes)) {
1077
            // TODO: rethink the logic here may copy again ?
1078
626
            pblock->set_column_values(buf_compressed.data(), buf_compressed.size());
1079
626
            pblock->set_compressed(true);
1080
626
            *compressed_bytes = compressed_size;
1081
626
        } else {
1082
0
            pblock->set_column_values(std::move(column_values));
1083
0
        }
1084
1085
626
        VLOG_ROW << "uncompressed size: " << content_uncompressed_size
1086
0
                 << ", compressed size: " << compressed_size;
1087
2.10k
    } else {
1088
2.10k
        pblock->set_column_values(std::move(column_values));
1089
2.10k
    }
1090
2.72k
    if (!allow_transfer_large_data && *compressed_bytes >= std::numeric_limits<int32_t>::max()) {
1091
0
        return Status::InternalError("The block is large than 2GB({}), can not send by Protobuf.",
1092
0
                                     *compressed_bytes);
1093
0
    }
1094
2.72k
    return Status::OK();
1095
2.72k
}
1096
1097
240k
size_t MutableBlock::rows() const {
1098
240k
    for (const auto& column : _columns) {
1099
144k
        if (column) {
1100
144k
            return column->size();
1101
144k
        }
1102
144k
    }
1103
1104
96.0k
    return 0;
1105
240k
}
1106
1107
0
void MutableBlock::swap(MutableBlock& another) noexcept {
1108
0
    SCOPED_SKIP_MEMORY_CHECK();
1109
0
    _columns.swap(another._columns);
1110
0
    _data_types.swap(another._data_types);
1111
0
    _names.swap(another._names);
1112
0
}
1113
1114
0
void MutableBlock::add_row(const Block* block, int row) {
1115
0
    const auto& block_data = block->get_columns_with_type_and_name();
1116
0
    for (size_t i = 0; i < _columns.size(); ++i) {
1117
0
        _columns[i]->insert_from(*block_data[i].column.get(), row);
1118
0
    }
1119
0
}
1120
1121
Status MutableBlock::add_rows(const Block* block, const uint32_t* row_begin,
1122
163
                              const uint32_t* row_end, const std::vector<int>* column_offset) {
1123
163
    RETURN_IF_CATCH_EXCEPTION({
1124
163
        DCHECK_LE(columns(), block->columns());
1125
163
        if (column_offset != nullptr) {
1126
163
            DCHECK_EQ(columns(), column_offset->size());
1127
163
        }
1128
163
        const auto& block_data = block->get_columns_with_type_and_name();
1129
163
        for (size_t i = 0; i < _columns.size(); ++i) {
1130
163
            const auto& src_col = column_offset ? block_data[(*column_offset)[i]] : block_data[i];
1131
163
            DCHECK_EQ(_data_types[i]->get_name(), src_col.type->get_name());
1132
163
            auto& dst = _columns[i];
1133
163
            const auto& src = *src_col.column.get();
1134
163
            DCHECK_GE(src.size(), row_end - row_begin);
1135
163
            dst->insert_indices_from(src, row_begin, row_end);
1136
163
        }
1137
163
    });
1138
162
    return Status::OK();
1139
163
}
1140
1141
126
Status MutableBlock::add_rows(const Block* block, size_t row_begin, size_t length) {
1142
126
    RETURN_IF_CATCH_EXCEPTION({
1143
126
        DCHECK_LE(columns(), block->columns());
1144
126
        const auto& block_data = block->get_columns_with_type_and_name();
1145
126
        for (size_t i = 0; i < _columns.size(); ++i) {
1146
126
            DCHECK_EQ(_data_types[i]->get_name(), block_data[i].type->get_name());
1147
126
            auto& dst = _columns[i];
1148
126
            const auto& src = *block_data[i].column.get();
1149
126
            dst->insert_range_from(src, row_begin, length);
1150
126
        }
1151
126
    });
1152
126
    return Status::OK();
1153
126
}
1154
1155
144k
Block MutableBlock::to_block(int start_column) {
1156
144k
    return to_block(start_column, (int)_columns.size());
1157
144k
}
1158
1159
144k
Block MutableBlock::to_block(int start_column, int end_column) {
1160
144k
    ColumnsWithTypeAndName columns_with_schema;
1161
144k
    columns_with_schema.reserve(end_column - start_column);
1162
428k
    for (size_t i = start_column; i < end_column; ++i) {
1163
283k
        columns_with_schema.emplace_back(std::move(_columns[i]), _data_types[i], _names[i]);
1164
283k
    }
1165
144k
    return {columns_with_schema};
1166
144k
}
1167
1168
1
std::string MutableBlock::dump_data_json(size_t row_limit) const {
1169
1
    std::stringstream ss;
1170
1
    std::vector<std::string> headers;
1171
1172
1
    headers.reserve(columns());
1173
2
    for (size_t i = 0; i < columns(); ++i) {
1174
1
        headers.push_back(_data_types[i]->get_name());
1175
1
    }
1176
1
    size_t num_rows_to_dump = std::min(rows(), row_limit);
1177
1
    ss << "[";
1178
1179
1
    auto format_options = DataTypeSerDe::get_default_format_options();
1180
1
    auto time_zone = cctz::utc_time_zone();
1181
1
    format_options.timezone = &time_zone;
1182
1183
4
    for (size_t row_num = 0; row_num < num_rows_to_dump; ++row_num) {
1184
3
        if (row_num > 0) {
1185
2
            ss << ",";
1186
2
        }
1187
3
        ss << "{";
1188
6
        for (size_t i = 0; i < columns(); ++i) {
1189
3
            if (i > 0) {
1190
0
                ss << ",";
1191
0
            }
1192
3
            ss << "\"" << headers[i] << "\":";
1193
3
            std::string s = _data_types[i]->to_string(*_columns[i].get(), row_num, format_options);
1194
3
            ss << "\"" << s << "\"";
1195
3
        }
1196
3
        ss << "}";
1197
3
    }
1198
1
    ss << "]";
1199
1
    return ss.str();
1200
1
}
1201
1202
1
std::string MutableBlock::dump_data(size_t row_limit) const {
1203
1
    std::vector<std::string> headers;
1204
1
    std::vector<int> headers_size;
1205
2
    for (size_t i = 0; i < columns(); ++i) {
1206
1
        std::string s = _data_types[i]->get_name();
1207
1
        headers_size.push_back(s.size() > 15 ? (int)s.size() : 15);
1208
1
        headers.emplace_back(s);
1209
1
    }
1210
1211
1
    std::stringstream out;
1212
    // header upper line
1213
3
    auto line = [&]() {
1214
6
        for (size_t i = 0; i < columns(); ++i) {
1215
3
            out << std::setfill('-') << std::setw(1) << "+" << std::setw(headers_size[i]) << "-";
1216
3
        }
1217
3
        out << std::setw(1) << "+" << std::endl;
1218
3
    };
1219
1
    line();
1220
    // header text
1221
2
    for (size_t i = 0; i < columns(); ++i) {
1222
1
        out << std::setfill(' ') << std::setw(1) << "|" << std::left << std::setw(headers_size[i])
1223
1
            << headers[i];
1224
1
    }
1225
1
    out << std::setw(1) << "|" << std::endl;
1226
    // header bottom line
1227
1
    line();
1228
1
    if (rows() == 0) {
1229
0
        return out.str();
1230
0
    }
1231
1232
1
    auto format_options = DataTypeSerDe::get_default_format_options();
1233
1
    auto time_zone = cctz::utc_time_zone();
1234
1
    format_options.timezone = &time_zone;
1235
1236
    // content
1237
4
    for (size_t row_num = 0; row_num < rows() && row_num < row_limit; ++row_num) {
1238
6
        for (size_t i = 0; i < columns(); ++i) {
1239
3
            if (_columns[i].get()->empty()) {
1240
0
                out << std::setfill(' ') << std::setw(1) << "|" << std::setw(headers_size[i])
1241
0
                    << std::right;
1242
0
                continue;
1243
0
            }
1244
3
            std::string s = _data_types[i]->to_string(*_columns[i].get(), row_num, format_options);
1245
3
            if (s.length() > headers_size[i]) {
1246
0
                s = s.substr(0, headers_size[i] - 3) + "...";
1247
0
            }
1248
3
            out << std::setfill(' ') << std::setw(1) << "|" << std::setw(headers_size[i])
1249
3
                << std::right << s;
1250
3
        }
1251
3
        out << std::setw(1) << "|" << std::endl;
1252
3
    }
1253
    // bottom line
1254
1
    line();
1255
1
    if (row_limit < rows()) {
1256
0
        out << rows() << " rows in block, only show first " << row_limit << " rows." << std::endl;
1257
0
    }
1258
1
    return out.str();
1259
1
}
1260
1261
48.0k
std::unique_ptr<Block> Block::create_same_struct_block(size_t size, bool is_reserve) const {
1262
48.0k
    auto temp_block = Block::create_unique();
1263
94.1k
    for (const auto& d : data) {
1264
94.1k
        auto column = d.type->create_column();
1265
94.1k
        if (is_reserve) {
1266
0
            column->reserve(size);
1267
94.1k
        } else {
1268
94.1k
            column->insert_many_defaults(size);
1269
94.1k
        }
1270
94.1k
        temp_block->insert({std::move(column), d.type, d.name});
1271
94.1k
    }
1272
48.0k
    return temp_block;
1273
48.0k
}
1274
1275
96.1k
size_t MutableBlock::allocated_bytes() const {
1276
96.1k
    size_t res = 0;
1277
188k
    for (const auto& col : _columns) {
1278
188k
        if (col) {
1279
188k
            res += col->allocated_bytes();
1280
188k
        }
1281
188k
    }
1282
1283
96.1k
    return res;
1284
96.1k
}
1285
1286
1
void MutableBlock::clear_column_data() noexcept {
1287
1
    SCOPED_SKIP_MEMORY_CHECK();
1288
1
    for (auto& col : _columns) {
1289
1
        if (col) {
1290
1
            col->clear();
1291
1
        }
1292
1
    }
1293
1
}
1294
1295
5
std::string MutableBlock::dump_names() const {
1296
5
    std::string out;
1297
16
    for (auto it = _names.begin(); it != _names.end(); ++it) {
1298
11
        if (it != _names.begin()) {
1299
6
            out += ", ";
1300
6
        }
1301
11
        out += *it;
1302
11
    }
1303
5
    return out;
1304
5
}
1305
} // namespace doris