Coverage Report

Created: 2026-05-09 18:55

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/core/block/block.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Core/Block.h
19
// and modified by Doris
20
21
#pragma once
22
23
#include <glog/logging.h>
24
#include <parallel_hashmap/phmap.h>
25
26
#include <cstddef>
27
#include <cstdint>
28
#include <initializer_list>
29
#include <list>
30
#include <memory>
31
#include <ostream>
32
#include <set>
33
#include <string>
34
#include <utility>
35
#include <vector>
36
37
#include "common/be_mock_util.h"
38
#include "common/exception.h"
39
#include "common/factory_creator.h"
40
#include "common/status.h"
41
#include "core/block/column_with_type_and_name.h"
42
#include "core/block/columns_with_type_and_name.h"
43
#include "core/column/column.h"
44
#include "core/column/column_nullable.h"
45
#include "core/data_type/data_type.h"
46
#include "core/data_type/data_type_nullable.h"
47
#include "core/types.h"
48
49
class SipHash;
50
51
namespace doris {
52
53
class TupleDescriptor;
54
class PBlock;
55
class SlotDescriptor;
56
57
namespace segment_v2 {
58
enum CompressionTypePB : int;
59
} // namespace segment_v2
60
61
/** Container for set of columns for bunch of rows in memory.
62
  * This is unit of data processing.
63
  * Also contains metadata - data types of columns and their names
64
  *  (either original names from a table, or generated names during temporary calculations).
65
  * Allows to insert, remove columns in arbitrary position, to change order of columns.
66
  */
67
class MutableBlock;
68
69
class Block {
70
    ENABLE_FACTORY_CREATOR(Block);
71
72
private:
73
    using Container = ColumnsWithTypeAndName;
74
    Container data;
75
76
public:
77
429k
    Block() = default;
78
    Block(std::initializer_list<ColumnWithTypeAndName> il);
79
    Block(ColumnsWithTypeAndName data_);
80
    Block(const std::vector<SlotDescriptor*>& slots, size_t block_size);
81
    Block(const std::vector<SlotDescriptor>& slots, size_t block_size);
82
83
811k
    MOCK_FUNCTION ~Block() = default;
84
843
    Block(const Block& block) = default;
85
48.2k
    Block& operator=(const Block& p) = default;
86
63.4k
    Block(Block&& block) = default;
87
839
    Block& operator=(Block&& other) = default;
88
89
    void reserve(size_t count);
90
    // Make sure the names are no longer needed when using the block
91
    void clear_names();
92
93
    /// insert the column at the specified position
94
    void insert(size_t position, const ColumnWithTypeAndName& elem);
95
    void insert(size_t position, ColumnWithTypeAndName&& elem);
96
    /// insert the column to the end
97
    void insert(const ColumnWithTypeAndName& elem);
98
    void insert(ColumnWithTypeAndName&& elem);
99
    /// remove the column at the specified position
100
    void erase(size_t position);
101
    /// remove the columns in the range [start, end)
102
    void erase_tail(size_t start);
103
    /// remove the columns at the specified positions
104
    void erase(const std::set<size_t>& positions);
105
    // T can be std::set<int>, std::vector<int>, or std::list<int>
106
    template <class T>
107
0
    void erase_not_in(const T& container) {
108
0
        Container new_data;
109
0
        for (auto pos : container) {
110
0
            new_data.emplace_back(std::move(data[pos]));
111
0
        }
112
0
        std::swap(data, new_data);
113
0
    }
114
115
17
    std::unordered_map<std::string, uint32_t> get_name_to_pos_map() const {
116
17
        std::unordered_map<std::string, uint32_t> name_to_index_map;
117
136
        for (uint32_t i = 0; i < data.size(); ++i) {
118
119
            name_to_index_map[data[i].name] = i;
119
119
        }
120
17
        return name_to_index_map;
121
17
    }
122
123
    /// References are invalidated after calling functions above.
124
17.6M
    ColumnWithTypeAndName& get_by_position(size_t position) {
125
18.4E
        DCHECK(data.size() > position)
126
18.4E
                << ", data.size()=" << data.size() << ", position=" << position;
127
17.6M
        return data[position];
128
17.6M
    }
129
29.1M
    const ColumnWithTypeAndName& get_by_position(size_t position) const { return data[position]; }
130
131
31.4k
    void replace_by_position(size_t position, ColumnPtr&& res) {
132
31.4k
        this->get_by_position(position).column = std::move(res);
133
31.4k
    }
134
135
0
    void replace_by_position(size_t position, const ColumnPtr& res) {
136
0
        this->get_by_position(position).column = res;
137
0
    }
138
139
703
    void replace_by_position_if_const(size_t position) {
140
703
        auto& element = this->get_by_position(position);
141
703
        element.column = element.column->convert_to_full_column_if_const();
142
703
    }
143
144
    ColumnWithTypeAndName& safe_get_by_position(size_t position);
145
    const ColumnWithTypeAndName& safe_get_by_position(size_t position) const;
146
147
71.6k
    Container::iterator begin() { return data.begin(); }
148
71.6k
    Container::iterator end() { return data.end(); }
149
6.24k
    Container::const_iterator begin() const { return data.begin(); }
150
6.21k
    Container::const_iterator end() const { return data.end(); }
151
0
    Container::const_iterator cbegin() const { return data.cbegin(); }
152
0
    Container::const_iterator cend() const { return data.cend(); }
153
154
    // Get position of column by name. Returns -1 if there is no column with that name.
155
    // ATTN: this method is O(N). better maintain name -> position map in caller if you need to call it frequently.
156
    int get_position_by_name(const std::string& name) const;
157
158
    const ColumnsWithTypeAndName& get_columns_with_type_and_name() const;
159
160
    std::vector<std::string> get_names() const;
161
    DataTypes get_data_types() const;
162
163
59
    DataTypePtr get_data_type(size_t index) const {
164
59
        CHECK(index < data.size());
165
59
        return data[index].type;
166
59
    }
167
168
    /// Returns number of rows from first column in block, not equal to nullptr. If no columns, returns 0.
169
    size_t rows() const;
170
171
    // Cut the rows in the block; used in the LIMIT operation
172
    void set_num_rows(size_t length);
173
174
    // Skip rows in the block; used in OFFSET and LIMIT operations
175
    void skip_num_rows(int64_t& offset);
176
177
    /// As we assume elsewhere, the number of columns won't exceed the int16 range, so no need to worry when we
178
    ///  assign it to int32.
179
18.2M
    uint32_t columns() const { return static_cast<uint32_t>(data.size()); }
180
181
    /// Checks that every column in block is not nullptr and has same number of elements.
182
    void check_number_of_rows(bool allow_null_columns = false) const;
183
184
    Status check_type_and_column() const;
185
186
    /// Approximate number of bytes used by column data in memory.
187
    /// This reflects the actual data footprint (e.g. string contents, numeric arrays)
188
    /// and is the metric used by adaptive batch size byte budgets.
189
    size_t bytes() const;
190
191
    /// Approximate number of allocated (reserved) bytes in memory.
192
    /// This may be larger than bytes() due to pre-allocated capacity in vectors/arenas.
193
    /// Used for memory tracking and profiling.
194
    MOCK_FUNCTION size_t allocated_bytes() const;
195
196
    /** Get a list of column names separated by commas. */
197
    std::string dump_names() const;
198
199
    std::string dump_types() const;
200
201
    /** List of names, types and lengths of columns. Designed for debugging. */
202
    std::string dump_structure() const;
203
204
    /** Get the same block, but empty. */
205
    Block clone_empty() const;
206
207
    Columns get_columns() const;
208
    Columns get_columns_and_convert();
209
210
    Block clone_without_columns(const std::vector<int>* column_offset = nullptr) const;
211
212
    /** Get empty columns with the same types as in block. */
213
    MutableColumns clone_empty_columns() const;
214
215
    /** Get columns from block for mutation. Columns in block will be nullptr. */
216
    MutableColumns mutate_columns();
217
218
    /** Replace columns in a block */
219
    void set_columns(MutableColumns&& columns);
220
    void clear();
221
    void swap(Block& other) noexcept;
222
    void swap(Block&& other) noexcept;
223
224
    // Shuffle columns in place based on the result_column_ids
225
    void shuffle_columns(const std::vector<int>& result_column_ids);
226
227
    // The default column_size = -1 means clear all columns in the block.
228
    // Otherwise clear columns [0, column_size) and delete columns [column_size, data.size()).
229
    void clear_column_data(int64_t column_size = -1) noexcept;
230
231
15.4k
    MOCK_FUNCTION bool mem_reuse() { return !data.empty(); }
232
233
12
    bool is_empty_column() { return data.empty(); }
234
235
2.77M
    bool empty() const { return rows() == 0; }
236
237
    /** 
238
      * Updates SipHash of the Block, using update method of columns.
239
      * Returns hash for block, that could be used to differentiate blocks
240
      *  with same structure, but different data.
241
      */
242
    void update_hash(SipHash& hash) const;
243
244
    /** 
245
     *  Get block data in string. 
246
     *  If code is in default_implementation_for_nulls or something similar, type and column's nullity could
247
     *   temporarily differ. Set allow_null_mismatch to true to dump it correctly.
248
    */
249
    std::string dump_data(size_t begin = 0, size_t row_limit = 100,
250
                          bool allow_null_mismatch = false) const;
251
252
    std::string dump_data_json(size_t begin = 0, size_t row_limit = 100,
253
                               bool allow_null_mismatch = false) const;
254
255
    /** Get one row of data from the block; only used when loading data */
256
    std::string dump_one_line(size_t row, int column_end) const;
257
258
    Status append_to_block_by_selector(MutableBlock* dst, const IColumn::Selector& selector) const;
259
260
    // need exception safety
261
    static void filter_block_internal(Block* block, const std::vector<uint32_t>& columns_to_filter,
262
                                      const IColumn::Filter& filter);
263
    // need exception safety
264
    static void filter_block_internal(Block* block, const IColumn::Filter& filter,
265
                                      uint32_t column_to_keep);
266
    // need exception safety
267
    static void filter_block_internal(Block* block, const IColumn::Filter& filter);
268
269
    static Status filter_block(Block* block, const std::vector<uint32_t>& columns_to_filter,
270
                               size_t filter_column_id, size_t column_to_keep);
271
272
    static Status filter_block(Block* block, size_t filter_column_id, size_t column_to_keep);
273
274
707
    static void erase_useless_column(Block* block, size_t column_to_keep) {
275
707
        block->erase_tail(column_to_keep);
276
707
    }
277
278
    // serialize block to PBlock
279
    Status serialize(int be_exec_version, PBlock* pblock, size_t* uncompressed_bytes,
280
                     size_t* compressed_bytes, int64_t* compress_time,
281
                     segment_v2::CompressionTypePB compression_type,
282
                     bool allow_transfer_large_data = false) const;
283
284
    Status deserialize(const PBlock& pblock, size_t* uncompressed_bytes, int64_t* decompress_time);
285
286
    std::unique_ptr<Block> create_same_struct_block(size_t size, bool is_reserve = false) const;
287
288
    /** Compares (*this) n-th row and rhs m-th row.
289
      * Returns a negative number, 0, or a positive number if (*this) n-th row is less than, equal to, or greater than rhs m-th row respectively.
290
      * Is used in sortings.
291
      *
292
      * If one of the elements' values is NaN or NULL, then:
293
      * - if nan_direction_hint == -1, NaN and NULLs are considered less than everything else;
294
      * - if nan_direction_hint ==  1, NaN and NULLs are considered greater than everything else.
295
      * For example, if nan_direction_hint == -1 is used by descending sorting, NaNs will be at the end.
296
      *
297
      * For non Nullable and non floating point types, nan_direction_hint is ignored.
298
      */
299
3
    int compare_at(size_t n, size_t m, const Block& rhs, int nan_direction_hint) const {
300
3
        DCHECK_EQ(columns(), rhs.columns());
301
3
        return compare_at(n, m, columns(), rhs, nan_direction_hint);
302
3
    }
303
304
    int compare_at(size_t n, size_t m, size_t num_columns, const Block& rhs,
305
7.15M
                   int nan_direction_hint) const {
306
7.15M
        DCHECK_GE(columns(), num_columns);
307
7.15M
        DCHECK_GE(rhs.columns(), num_columns);
308
309
7.15M
        DCHECK_LE(n, rows());
310
7.15M
        DCHECK_LE(m, rhs.rows());
311
9.94M
        for (size_t i = 0; i < num_columns; ++i) {
312
7.19M
            DCHECK(get_by_position(i).type->equals(*rhs.get_by_position(i).type));
313
7.19M
            auto res = get_by_position(i).column->compare_at(n, m, *(rhs.get_by_position(i).column),
314
7.19M
                                                             nan_direction_hint);
315
7.19M
            if (res) {
316
4.40M
                return res;
317
4.40M
            }
318
7.19M
        }
319
2.74M
        return 0;
320
7.15M
    }
321
322
    int compare_at(size_t n, size_t m, const std::vector<uint32_t>* compare_columns,
323
2
                   const Block& rhs, int nan_direction_hint) const {
324
2
        DCHECK_GE(columns(), compare_columns->size());
325
2
        DCHECK_GE(rhs.columns(), compare_columns->size());
326
327
2
        DCHECK_LE(n, rows());
328
2
        DCHECK_LE(m, rhs.rows());
329
3
        for (auto i : *compare_columns) {
330
3
            DCHECK(get_by_position(i).type->equals(*rhs.get_by_position(i).type));
331
3
            auto res = get_by_position(i).column->compare_at(n, m, *(rhs.get_by_position(i).column),
332
3
                                                             nan_direction_hint);
333
3
            if (res) {
334
2
                return res;
335
2
            }
336
3
        }
337
0
        return 0;
338
2
    }
339
340
    //note(wb) no DCHECK here, because this method is only used after compare_at now, so no need to repeat check here.
341
    // If this method is used in more places, you can add DCHECK case by case.
342
    int compare_column_at(size_t n, size_t m, size_t col_idx, const Block& rhs,
343
26.7k
                          int nan_direction_hint) const {
344
26.7k
        auto res = get_by_position(col_idx).column->compare_at(
345
26.7k
                n, m, *(rhs.get_by_position(col_idx).column), nan_direction_hint);
346
26.7k
        return res;
347
26.7k
    }
348
349
    // for String type or Array<String> type
350
    void shrink_char_type_column_suffix_zero(const std::vector<size_t>& char_type_idx);
351
352
    void clear_column_mem_not_keep(const std::vector<bool>& column_keep_flags,
353
                                   bool need_keep_first);
354
355
    // Helper: sum byte_size() of all mutable columns.
356
    // Unlike Block::bytes() which operates on immutable ColumnPtr,
357
    // this works on MutableColumns during block construction (e.g. in BlockReader).
358
300k
    static inline size_t columns_byte_size(const MutableColumns& cols) {
359
300k
        size_t total = 0;
360
605k
        for (const auto& col : cols) {
361
605k
            total += col->byte_size();
362
605k
        }
363
300k
        return total;
364
300k
    }
365
366
private:
367
    void erase_impl(size_t position);
368
};
369
370
using Blocks = std::vector<Block>;
371
using BlocksList = std::list<Block>;
372
using BlocksPtr = std::shared_ptr<Blocks>;
373
using BlocksPtrs = std::shared_ptr<std::vector<BlocksPtr>>;
374
375
class MutableBlock {
376
    ENABLE_FACTORY_CREATOR(MutableBlock);
377
378
private:
379
    MutableColumns _columns;
380
    DataTypes _data_types;
381
    std::vector<std::string> _names;
382
383
public:
384
48.7k
    static MutableBlock build_mutable_block(Block* block) {
385
48.7k
        return block == nullptr ? MutableBlock() : MutableBlock(block);
386
48.7k
    }
387
72.2k
    MutableBlock() = default;
388
217k
    ~MutableBlock() = default;
389
390
    MutableBlock(Block* block)
391
48.9k
            : _columns(block->mutate_columns()),
392
48.9k
              _data_types(block->get_data_types()),
393
48.9k
              _names(block->get_names()) {}
394
    MutableBlock(Block&& block)
395
96.7k
            : _columns(block.mutate_columns()),
396
96.7k
              _data_types(block.get_data_types()),
397
96.7k
              _names(block.get_names()) {}
398
399
96.1k
    void operator=(MutableBlock&& m_block) {
400
96.1k
        _columns = std::move(m_block._columns);
401
96.1k
        _data_types = std::move(m_block._data_types);
402
96.1k
        _names = std::move(m_block._names);
403
96.1k
    }
404
405
    size_t rows() const;
406
422
    size_t columns() const { return _columns.size(); }
407
408
144k
    bool empty() const { return rows() == 0; }
409
410
48.6k
    MutableColumns& mutable_columns() { return _columns; }
411
412
1
    void set_mutable_columns(MutableColumns&& columns) { _columns = std::move(columns); }
413
414
0
    DataTypes& data_types() { return _data_types; }
415
416
128
    MutableColumnPtr& get_column_by_position(size_t position) { return _columns[position]; }
417
60
    const MutableColumnPtr& get_column_by_position(size_t position) const {
418
60
        return _columns[position];
419
60
    }
420
421
5
    DataTypePtr& get_datatype_by_position(size_t position) { return _data_types[position]; }
422
22
    const DataTypePtr& get_datatype_by_position(size_t position) const {
423
22
        return _data_types[position];
424
22
    }
425
426
38
    int compare_one_column(size_t n, size_t m, size_t column_id, int nan_direction_hint) const {
427
38
        DCHECK_LE(column_id, columns());
428
38
        DCHECK_LE(n, rows());
429
38
        DCHECK_LE(m, rows());
430
38
        auto& column = get_column_by_position(column_id);
431
38
        return column->compare_at(n, m, *column, nan_direction_hint);
432
38
    }
433
434
    int compare_at(size_t n, size_t m, size_t num_columns, const MutableBlock& rhs,
435
6
                   int nan_direction_hint) const {
436
6
        DCHECK_GE(columns(), num_columns);
437
6
        DCHECK_GE(rhs.columns(), num_columns);
438
439
6
        DCHECK_LE(n, rows());
440
6
        DCHECK_LE(m, rhs.rows());
441
14
        for (size_t i = 0; i < num_columns; ++i) {
442
11
            DCHECK(get_datatype_by_position(i)->equals(*rhs.get_datatype_by_position(i)));
443
11
            auto res = get_column_by_position(i)->compare_at(n, m, *(rhs.get_column_by_position(i)),
444
11
                                                             nan_direction_hint);
445
11
            if (res) {
446
3
                return res;
447
3
            }
448
11
        }
449
3
        return 0;
450
6
    }
451
452
    int compare_at(size_t n, size_t m, const std::vector<uint32_t>* compare_columns,
453
0
                   const MutableBlock& rhs, int nan_direction_hint) const {
454
0
        DCHECK_GE(columns(), compare_columns->size());
455
0
        DCHECK_GE(rhs.columns(), compare_columns->size());
456
457
0
        DCHECK_LE(n, rows());
458
0
        DCHECK_LE(m, rhs.rows());
459
0
        for (auto i : *compare_columns) {
460
0
            DCHECK(get_datatype_by_position(i)->equals(*rhs.get_datatype_by_position(i)));
461
0
            auto res = get_column_by_position(i)->compare_at(n, m, *(rhs.get_column_by_position(i)),
462
0
                                                             nan_direction_hint);
463
0
            if (res) {
464
0
                return res;
465
0
            }
466
0
        }
467
0
        return 0;
468
0
    }
469
470
3
    std::string dump_types() const {
471
3
        std::string res;
472
9
        for (auto type : _data_types) {
473
9
            if (!res.empty()) {
474
6
                res += ", ";
475
6
            }
476
9
            res += type->get_name();
477
9
        }
478
3
        return res;
479
3
    }
480
481
    template <typename T>
482
127
    [[nodiscard]] Status merge(T&& block) {
483
127
        RETURN_IF_CATCH_EXCEPTION(return merge_impl(block););
484
127
    }
_ZN5doris12MutableBlock5mergeIRNS_5BlockEEENS_6StatusEOT_
Line
Count
Source
482
49
    [[nodiscard]] Status merge(T&& block) {
483
49
        RETURN_IF_CATCH_EXCEPTION(return merge_impl(block););
484
49
    }
_ZN5doris12MutableBlock5mergeINS_5BlockEEENS_6StatusEOT_
Line
Count
Source
482
78
    [[nodiscard]] Status merge(T&& block) {
483
78
        RETURN_IF_CATCH_EXCEPTION(return merge_impl(block););
484
78
    }
485
486
    template <typename T>
487
48.0k
    [[nodiscard]] Status merge_ignore_overflow(T&& block) {
488
48.0k
        RETURN_IF_CATCH_EXCEPTION(return merge_impl_ignore_overflow(block););
489
48.0k
    }
490
491
    // Only used for join. Call ignore_overflow to prevent throwing an exception in join
492
    template <typename T>
493
48.0k
    [[nodiscard]] Status merge_impl_ignore_overflow(T&& block) {
494
48.0k
        if (_columns.size() != block.columns()) {
495
1
            return Status::Error<ErrorCode::INTERNAL_ERROR>(
496
1
                    "Merge block not match, self column count: {}, [columns: {}, types: {}], "
497
1
                    "input column count: {}, [columns: {}, "
498
1
                    "types: {}], ",
499
1
                    _columns.size(), dump_names(), dump_types(), block.columns(),
500
1
                    block.dump_names(), block.dump_types());
501
1
        }
502
142k
        for (int i = 0; i < _columns.size(); ++i) {
503
94.2k
            if (!_data_types[i]->equals(*block.get_by_position(i).type)) {
504
1
                throw doris::Exception(doris::ErrorCode::FATAL_ERROR,
505
1
                                       "Merge block not match, self:[columns: {}, types: {}], "
506
1
                                       "input:[columns: {}, types: {}], ",
507
1
                                       dump_names(), dump_types(), block.dump_names(),
508
1
                                       block.dump_types());
509
1
            }
510
94.2k
            _columns[i]->insert_range_from_ignore_overflow(
511
94.2k
                    *block.get_by_position(i).column->convert_to_full_column_if_const().get(), 0,
512
94.2k
                    block.rows());
513
94.2k
        }
514
48.0k
        return Status::OK();
515
48.0k
    }
_ZN5doris12MutableBlock26merge_impl_ignore_overflowIRNS_5BlockEEENS_6StatusEOT_
Line
Count
Source
493
48.0k
    [[nodiscard]] Status merge_impl_ignore_overflow(T&& block) {
494
48.0k
        if (_columns.size() != block.columns()) {
495
1
            return Status::Error<ErrorCode::INTERNAL_ERROR>(
496
1
                    "Merge block not match, self column count: {}, [columns: {}, types: {}], "
497
1
                    "input column count: {}, [columns: {}, "
498
1
                    "types: {}], ",
499
1
                    _columns.size(), dump_names(), dump_types(), block.columns(),
500
1
                    block.dump_names(), block.dump_types());
501
1
        }
502
142k
        for (int i = 0; i < _columns.size(); ++i) {
503
94.2k
            if (!_data_types[i]->equals(*block.get_by_position(i).type)) {
504
0
                throw doris::Exception(doris::ErrorCode::FATAL_ERROR,
505
0
                                       "Merge block not match, self:[columns: {}, types: {}], "
506
0
                                       "input:[columns: {}, types: {}], ",
507
0
                                       dump_names(), dump_types(), block.dump_names(),
508
0
                                       block.dump_types());
509
0
            }
510
94.2k
            _columns[i]->insert_range_from_ignore_overflow(
511
94.2k
                    *block.get_by_position(i).column->convert_to_full_column_if_const().get(), 0,
512
94.2k
                    block.rows());
513
94.2k
        }
514
48.0k
        return Status::OK();
515
48.0k
    }
_ZN5doris12MutableBlock26merge_impl_ignore_overflowINS_5BlockEEENS_6StatusEOT_
Line
Count
Source
493
1
    [[nodiscard]] Status merge_impl_ignore_overflow(T&& block) {
494
1
        if (_columns.size() != block.columns()) {
495
0
            return Status::Error<ErrorCode::INTERNAL_ERROR>(
496
0
                    "Merge block not match, self column count: {}, [columns: {}, types: {}], "
497
0
                    "input column count: {}, [columns: {}, "
498
0
                    "types: {}], ",
499
0
                    _columns.size(), dump_names(), dump_types(), block.columns(),
500
0
                    block.dump_names(), block.dump_types());
501
0
        }
502
3
        for (int i = 0; i < _columns.size(); ++i) {
503
3
            if (!_data_types[i]->equals(*block.get_by_position(i).type)) {
504
1
                throw doris::Exception(doris::ErrorCode::FATAL_ERROR,
505
1
                                       "Merge block not match, self:[columns: {}, types: {}], "
506
1
                                       "input:[columns: {}, types: {}], ",
507
1
                                       dump_names(), dump_types(), block.dump_names(),
508
1
                                       block.dump_types());
509
1
            }
510
2
            _columns[i]->insert_range_from_ignore_overflow(
511
2
                    *block.get_by_position(i).column->convert_to_full_column_if_const().get(), 0,
512
2
                    block.rows());
513
2
        }
514
0
        return Status::OK();
515
1
    }
516
517
    template <typename T>
518
129
    [[nodiscard]] Status merge_impl(T&& block) {
519
        // merge is not supported in dynamic block
520
129
        if (_columns.empty() && _data_types.empty()) {
521
40
            _data_types = block.get_data_types();
522
40
            _names = block.get_names();
523
40
            _columns.resize(block.columns());
524
139
            for (size_t i = 0; i < block.columns(); ++i) {
525
99
                if (block.get_by_position(i).column) {
526
98
                    _columns[i] = (*std::move(block.get_by_position(i)
527
98
                                                      .column->convert_to_full_column_if_const()))
528
98
                                          .mutate();
529
98
                } else {
530
1
                    _columns[i] = _data_types[i]->create_column();
531
1
                }
532
99
            }
533
89
        } else {
534
89
            if (_columns.size() != block.columns()) {
535
1
                return Status::Error<ErrorCode::INTERNAL_ERROR>(
536
1
                        "Merge block not match, self column count: {}, [columns: {}, types: {}], "
537
1
                        "input column count: {}, [columns: {}, "
538
1
                        "types: {}], ",
539
1
                        _columns.size(), dump_names(), dump_types(), block.columns(),
540
1
                        block.dump_names(), block.dump_types());
541
1
            }
542
233
            for (int i = 0; i < _columns.size(); ++i) {
543
145
                if (!_data_types[i]->equals(*block.get_by_position(i).type)) {
544
1
                    DCHECK(_data_types[i]->is_nullable())
545
0
                            << " target type: " << _data_types[i]->get_name()
546
0
                            << " src type: " << block.get_by_position(i).type->get_name();
547
1
                    DCHECK(((DataTypeNullable*)_data_types[i].get())
548
1
                                   ->get_nested_type()
549
1
                                   ->equals(*block.get_by_position(i).type));
550
1
                    DCHECK(!block.get_by_position(i).type->is_nullable());
551
1
                    _columns[i]->insert_range_from(*make_nullable(block.get_by_position(i).column)
552
1
                                                            ->convert_to_full_column_if_const(),
553
1
                                                   0, block.rows());
554
144
                } else {
555
144
                    _columns[i]->insert_range_from(
556
144
                            *block.get_by_position(i)
557
144
                                     .column->convert_to_full_column_if_const()
558
144
                                     .get(),
559
144
                            0, block.rows());
560
144
                }
561
145
            }
562
88
        }
563
128
        return Status::OK();
564
129
    }
_ZN5doris12MutableBlock10merge_implIRNS_5BlockEEENS_6StatusEOT_
Line
Count
Source
518
127
    [[nodiscard]] Status merge_impl(T&& block) {
519
        // merge is not supported in dynamic block
520
127
        if (_columns.empty() && _data_types.empty()) {
521
40
            _data_types = block.get_data_types();
522
40
            _names = block.get_names();
523
40
            _columns.resize(block.columns());
524
139
            for (size_t i = 0; i < block.columns(); ++i) {
525
99
                if (block.get_by_position(i).column) {
526
98
                    _columns[i] = (*std::move(block.get_by_position(i)
527
98
                                                      .column->convert_to_full_column_if_const()))
528
98
                                          .mutate();
529
98
                } else {
530
1
                    _columns[i] = _data_types[i]->create_column();
531
1
                }
532
99
            }
533
87
        } else {
534
87
            if (_columns.size() != block.columns()) {
535
0
                return Status::Error<ErrorCode::INTERNAL_ERROR>(
536
0
                        "Merge block not match, self column count: {}, [columns: {}, types: {}], "
537
0
                        "input column count: {}, [columns: {}, "
538
0
                        "types: {}], ",
539
0
                        _columns.size(), dump_names(), dump_types(), block.columns(),
540
0
                        block.dump_names(), block.dump_types());
541
0
            }
542
229
            for (int i = 0; i < _columns.size(); ++i) {
543
142
                if (!_data_types[i]->equals(*block.get_by_position(i).type)) {
544
0
                    DCHECK(_data_types[i]->is_nullable())
545
0
                            << " target type: " << _data_types[i]->get_name()
546
0
                            << " src type: " << block.get_by_position(i).type->get_name();
547
0
                    DCHECK(((DataTypeNullable*)_data_types[i].get())
548
0
                                   ->get_nested_type()
549
0
                                   ->equals(*block.get_by_position(i).type));
550
0
                    DCHECK(!block.get_by_position(i).type->is_nullable());
551
0
                    _columns[i]->insert_range_from(*make_nullable(block.get_by_position(i).column)
552
0
                                                            ->convert_to_full_column_if_const(),
553
0
                                                   0, block.rows());
554
142
                } else {
555
142
                    _columns[i]->insert_range_from(
556
142
                            *block.get_by_position(i)
557
142
                                     .column->convert_to_full_column_if_const()
558
142
                                     .get(),
559
142
                            0, block.rows());
560
142
                }
561
142
            }
562
87
        }
563
127
        return Status::OK();
564
127
    }
_ZN5doris12MutableBlock10merge_implINS_5BlockEEENS_6StatusEOT_
Line
Count
Source
518
2
    [[nodiscard]] Status merge_impl(T&& block) {
        // Appends the rows of `block` to this MutableBlock, position by position.
        //  * Empty target (no columns and no data types): adopts the data types and names
        //    of `block`; every input column is passed through
        //    convert_to_full_column_if_const() and taken over via mutate(). A null input
        //    column is replaced by a column created from the matching data type.
        //  * Otherwise: the column counts must be equal (INTERNAL_ERROR if they are not) and
        //    rows [0, block.rows()) of every input column are appended to the column at the
        //    same position.
        // Returns Status::OK() on success.
        //
        // merge is not supported in dynamic block
        if (_columns.empty() && _data_types.empty()) {
            _data_types = block.get_data_types();
            _names = block.get_names();
            _columns.resize(block.columns());
            for (size_t i = 0; i < block.columns(); ++i) {
                if (block.get_by_position(i).column) {
                    _columns[i] = (*std::move(block.get_by_position(i)
                                                      .column->convert_to_full_column_if_const()))
                                          .mutate();
                } else {
                    _columns[i] = _data_types[i]->create_column();
                }
            }
        } else {
            if (_columns.size() != block.columns()) {
                return Status::Error<ErrorCode::INTERNAL_ERROR>(
                        "Merge block not match, self column count: {}, [columns: {}, types: {}], "
                        "input column count: {}, [columns: {}, "
                        "types: {}], ",
                        _columns.size(), dump_names(), dump_types(), block.columns(),
                        block.dump_names(), block.dump_types());
            }
            // size_t index: no signed/unsigned comparison against _columns.size().
            for (size_t i = 0; i < _columns.size(); ++i) {
                const auto& src = block.get_by_position(i);
                if (!_data_types[i]->equals(*src.type)) {
                    // The only tolerated mismatch is a nullable target whose nested type
                    // equals the non-nullable source type. This is checked by DCHECK only.
                    DCHECK(_data_types[i]->is_nullable())
                            << " target type: " << _data_types[i]->get_name()
                            << " src type: " << src.type->get_name();
                    DCHECK(static_cast<const DataTypeNullable*>(_data_types[i].get())
                                   ->get_nested_type()
                                   ->equals(*src.type));
                    DCHECK(!src.type->is_nullable());
                    // Wrap the source column as nullable so it matches the target column.
                    _columns[i]->insert_range_from(
                            *make_nullable(src.column)->convert_to_full_column_if_const(), 0,
                            block.rows());
                } else {
                    _columns[i]->insert_range_from(
                            *src.column->convert_to_full_column_if_const().get(), 0,
                            block.rows());
                }
            }
        }
        return Status::OK();
    }
565
566
    // Move the columns' data into a Block. This invalidates this MutableBlock.
567
    Block to_block(int start_column = 0);
568
    Block to_block(int start_column, int end_column);
569
570
    void swap(MutableBlock& other) noexcept;
571
572
    void add_row(const Block* block, int row);
573
    // Batch row insertion should return an error status if memory allocation fails.
574
    Status add_rows(const Block* block, const uint32_t* row_begin, const uint32_t* row_end,
575
                    const std::vector<int>* column_offset = nullptr);
576
    Status add_rows(const Block* block, size_t row_begin, size_t length);
577
578
    std::string dump_data(size_t row_limit = 100) const;
579
    std::string dump_data_json(size_t row_limit = 100) const;
580
581
85
    void clear() {
582
85
        _columns.clear();
583
85
        _data_types.clear();
584
85
        _names.clear();
585
85
    }
586
587
    // Columns persist; only the columns' inner data is removed.
588
    void clear_column_data() noexcept;
589
590
    size_t allocated_bytes() const;
591
592
48.0k
    size_t bytes() const {
593
48.0k
        size_t res = 0;
594
94.0k
        for (const auto& elem : _columns) {
595
94.0k
            res += elem->byte_size();
596
94.0k
        }
597
598
48.0k
        return res;
599
48.0k
    }
600
601
0
    // Gives mutable access to the list of column names.
    std::vector<std::string>& get_names() {
        return _names;
    }
602
603
    /** Get a list of column names separated by commas. */
604
    std::string dump_names() const;
605
};
606
607
struct IteratorRowRef {
608
    std::shared_ptr<Block> block;
609
    int row_pos;
610
    bool is_same;
611
612
    template <typename T>
613
1.06M
    int compare(const IteratorRowRef& rhs, const T& compare_arguments) const {
614
1.06M
        return block->compare_at(row_pos, rhs.row_pos, compare_arguments, *rhs.block, -1);
615
1.06M
    }
Unexecuted instantiation: _ZNK5doris14IteratorRowRef7compareIPKSt6vectorIjSaIjEEEEiRKS0_RKT_
_ZNK5doris14IteratorRowRef7compareImEEiRKS0_RKT_
Line
Count
Source
613
1.06M
    int compare(const IteratorRowRef& rhs, const T& compare_arguments) const {
614
1.06M
        return block->compare_at(row_pos, rhs.row_pos, compare_arguments, *rhs.block, -1);
615
1.06M
    }
616
617
512
    void reset() {
618
512
        block = nullptr;
619
512
        row_pos = -1;
620
512
        is_same = false;
621
512
    }
622
};
623
624
// A sequence of row references (IteratorRowRef).
using BlockView = std::vector<IteratorRowRef>;
625
// std::unique_ptr alias for Block.
using BlockUPtr = std::unique_ptr<Block>;
626
627
} // namespace doris