Coverage Report

Created: 2026-03-26 03:52

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/rowset/beta_rowset_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/rowset/beta_rowset_reader.h"
19
20
#include <stddef.h>
21
22
#include <algorithm>
23
#include <memory>
24
#include <ostream>
25
#include <roaring/roaring.hh>
26
#include <set>
27
#include <string>
28
#include <unordered_map>
29
#include <utility>
30
31
#include "common/logging.h"
32
#include "common/status.h"
33
#include "core/block/block.h"
34
#include "io/io_common.h"
35
#include "runtime/descriptors.h"
36
#include "runtime/runtime_profile.h"
37
#include "storage/cache/schema_cache.h"
38
#include "storage/delete/delete_handler.h"
39
#include "storage/iterator/vgeneric_iterators.h"
40
#include "storage/olap_define.h"
41
#include "storage/predicate/block_column_predicate.h"
42
#include "storage/predicate/column_predicate.h"
43
#include "storage/row_cursor.h"
44
#include "storage/rowset/rowset_meta.h"
45
#include "storage/rowset/rowset_reader_context.h"
46
#include "storage/schema.h"
47
#include "storage/segment/lazy_init_segment_iterator.h"
48
#include "storage/segment/segment.h"
49
#include "storage/tablet/tablet_meta.h"
50
#include "storage/tablet/tablet_schema.h"
51
52
namespace doris {
53
#include "common/compile_check_begin.h"
54
using namespace ErrorCode;
55
56
// Constructs a reader over `rowset`. The shared pointer is moved into `_rowset`,
// `_read_context` starts out null, and `_stats` initially points at the reader's
// own `_owned_stats`. The constructor body calls acquire() on the rowset.
BetaRowsetReader::BetaRowsetReader(BetaRowsetSharedPtr rowset)
57
910
        : _read_context(nullptr), _rowset(std::move(rowset)), _stats(&_owned_stats) {
58
910
    _rowset->acquire();
59
910
}
60
61
941
// Resets the predicate- and key-range-related parts of `_read_options`:
// installs a fresh AndBlockColumnPredicate as the delete-condition predicate and
// clears the column predicates, the per-column predicate map, the zone-map delete
// predicates and the key ranges. Other fields of `_read_options` are left untouched.
void BetaRowsetReader::reset_read_options() {
62
941
    _read_options.delete_condition_predicates = AndBlockColumnPredicate::create_shared();
63
941
    _read_options.column_predicates.clear();
64
941
    _read_options.col_id_to_predicates.clear();
65
941
    _read_options.del_predicates_for_zone_map.clear();
66
941
    _read_options.key_ranges.clear();
67
941
}
68
69
0
// Returns a new BetaRowsetReader constructed over the same `_rowset`.
// Only the rowset is passed to the new reader; no other state of this reader
// (read context, read options, iterator) is handed over here.
RowsetReaderSharedPtr BetaRowsetReader::clone() {
70
0
    return RowsetReaderSharedPtr(new BetaRowsetReader(_rowset));
71
0
}
72
73
0
// Forwards `profile` to the underlying iterator's update_profile().
// Does nothing when `_iterator` is null (e.g. it has not been created, or was reset).
void BetaRowsetReader::update_profile(RuntimeProfile* profile) {
74
0
    if (_iterator != nullptr) {
75
0
        _iterator->update_profile(profile);
76
0
    }
77
0
}
78
79
// Creates one LazyInitSegmentIterator per segment in this reader's segment range and
// appends every non-empty one to `out_iters`.
//
// Before creating the iterators, `read_context` (RowsetReaderContext) is translated
// into `_read_options` (StorageReadOptions): batch sizing, key ranges, delete
// conditions, column/value predicates, delete bitmaps, IO context and cache settings.
//
// Side effects visible in this function: `_read_context`, `_stats`, `_read_options`,
// `_input_schema` and `_output_schema` are overwritten, and
// `read_context->condition_cache_digest` may be changed.
//
// `use_cache` forces `should_use_cache` to true; otherwise it is true only for
// READER_QUERY reads with `enable_segment_cache` on.
//
// Returns the first error produced by `_rowset->load()`, `get_segment_num_rows()` or
// `init_segment_map()`; Status::OK() otherwise.
Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context,
80
                                               std::vector<RowwiseIteratorUPtr>* out_iters,
81
1.18k
                                               bool use_cache) {
82
1.18k
    _read_context = read_context;
83
    // The segment iterator is created with its own statistics,
84
    // and the member variable '_stats' is initialized by '_stats(&_owned_stats)'.
85
    // The choice of statistics used depends on the workload of the rowset reader.
86
    // For instance, if it's for query, the get_segment_iterators function
87
    // will receive one valid read_context with corresponding valid statistics,
88
    // and we will use those statistics.
89
    // However, for compaction or schema change workloads,
90
    // the read_context passed to the function will have null statistics,
91
    // and in such cases we will use the beta rowset reader's own statistics.
92
1.18k
    if (_read_context->stats != nullptr) {
93
1.09k
        _stats = _read_context->stats;
94
1.09k
    }
95
1.18k
    SCOPED_RAW_TIMER(&_stats->rowset_reader_get_segment_iterators_timer_ns);
96
97
1.18k
    RETURN_IF_ERROR(_rowset->load());
98
99
    // Convert RowsetReaderContext to StorageReadOptions.
100
1.18k
    _read_options.block_row_max = read_context->batch_size;
101
1.18k
    _read_options.preferred_block_size_bytes = read_context->preferred_block_size_bytes;
102
1.18k
    _read_options.preferred_max_col_bytes = read_context->preferred_max_col_bytes;
103
1.18k
    if (read_context->origin_return_columns != nullptr) {
104
1.08k
        _read_options.adaptive_batch_output_columns = *read_context->origin_return_columns;
105
1.08k
    }
106
1.18k
    _read_options.stats = _stats;
107
1.18k
    _read_options.push_down_agg_type_opt = _read_context->push_down_agg_type_opt;
108
1.18k
    _read_options.remaining_conjunct_roots = _read_context->remaining_conjunct_roots;
109
1.18k
    _read_options.common_expr_ctxs_push_down = _read_context->common_expr_ctxs_push_down;
110
1.18k
    _read_options.virtual_column_exprs = _read_context->virtual_column_exprs;
111
112
1.18k
    _read_options.all_access_paths = _read_context->all_access_paths;
113
1.18k
    _read_options.predicate_access_paths = _read_context->predicate_access_paths;
114
115
1.18k
    _read_options.ann_topn_runtime = _read_context->ann_topn_runtime;
116
1.18k
    _read_options.vir_cid_to_idx_in_block = _read_context->vir_cid_to_idx_in_block;
117
1.18k
    _read_options.vir_col_idx_to_type = _read_context->vir_col_idx_to_type;
118
1.18k
    _read_options.score_runtime = _read_context->score_runtime;
119
1.18k
    _read_options.collection_statistics = _read_context->collection_statistics;
120
1.18k
    _read_options.rowset_id = _rowset->rowset_id();
121
1.18k
    _read_options.version = _rowset->version();
122
1.18k
    _read_options.tablet_id = _rowset->rowset_meta()->tablet_id();
123
1.18k
    _read_options.topn_limit = _topn_limit;
124
1.18k
    if (_read_context->lower_bound_keys != nullptr) {
        // One key range per lower-bound key. The upper-bound and inclusion vectors are
        // indexed in lockstep; only lower_bound_keys is null-checked and sized here.
125
1.08k
        for (int i = 0; i < _read_context->lower_bound_keys->size(); ++i) {
126
0
            _read_options.key_ranges.emplace_back(&_read_context->lower_bound_keys->at(i),
127
0
                                                  _read_context->is_lower_keys_included->at(i),
128
0
                                                  &_read_context->upper_bound_keys->at(i),
129
0
                                                  _read_context->is_upper_keys_included->at(i));
130
0
        }
131
1.08k
    }
132
133
    // delete_handler may be set without being initialized; in that case it yields
134
    // empty conditions or predicates.
135
1.18k
    if (_read_context->delete_handler != nullptr) {
136
1.08k
        _read_context->delete_handler->get_delete_conditions_after_version(
137
1.08k
                _rowset->end_version(), _read_options.delete_condition_predicates.get(),
138
1.08k
                &_read_options.del_predicates_for_zone_map);
139
1.08k
    }
140
141
    // read_columns = return_columns followed by any delete-condition columns
    // that are not already among the return columns.
1.18k
    std::vector<uint32_t> read_columns;
142
1.18k
    std::set<uint32_t> read_columns_set;
143
1.18k
    std::set<uint32_t> delete_columns_set;
144
4.90k
    for (int i = 0; i < _read_context->return_columns->size(); ++i) {
145
3.71k
        read_columns.push_back(_read_context->return_columns->at(i));
146
3.71k
        read_columns_set.insert(_read_context->return_columns->at(i));
147
3.71k
    }
148
1.18k
    _read_options.delete_condition_predicates->get_all_column_ids(delete_columns_set);
149
1.18k
    for (auto cid : delete_columns_set) {
150
406
        if (read_columns_set.find(cid) == read_columns_set.end()) {
151
254
            read_columns.push_back(cid);
152
254
        }
153
406
    }
154
    // Disable the condition cache (digest = 0) when there is any delete-condition column.
155
1.18k
    _read_context->condition_cache_digest =
156
1.18k
            delete_columns_set.empty() ? _read_context->condition_cache_digest : 0;
157
    // Create segment iterators.
158
1.18k
    VLOG_NOTICE << "read columns size: " << read_columns.size();
159
1.18k
    _input_schema = std::make_shared<Schema>(_read_context->tablet_schema->columns(), read_columns);
160
    // output_schema only contains return_columns (excludes extra columns like delete-predicate columns).
161
    // It is used by merge/union iterators to determine how many columns to copy to the output block.
162
1.18k
    _output_schema = std::make_shared<Schema>(_read_context->tablet_schema->columns(),
163
1.18k
                                              *(_read_context->return_columns));
164
1.18k
    if (_read_context->predicates != nullptr) {
        // Register each predicate both in the flat list and in the per-column
        // AND-predicate map (created on first use for a column id).
165
1.08k
        _read_options.column_predicates.insert(_read_options.column_predicates.end(),
166
1.08k
                                               _read_context->predicates->begin(),
167
1.08k
                                               _read_context->predicates->end());
168
1.08k
        for (auto pred : *(_read_context->predicates)) {
169
0
            if (_read_options.col_id_to_predicates.count(pred->column_id()) < 1) {
170
0
                _read_options.col_id_to_predicates.insert(
171
0
                        {pred->column_id(), AndBlockColumnPredicate::create_shared()});
172
0
            }
173
0
            _read_options.col_id_to_predicates[pred->column_id()]->add_column_predicate(
174
0
                    SingleColumnBlockPredicate::create_unique(pred));
175
0
        }
176
1.08k
    }
177
178
    // Take a delete-bitmap for each segment, the bitmap contains all deletes
179
    // until the max read version, which is read_context->version.second
180
1.18k
    if (_read_context->delete_bitmap != nullptr) {
181
5
        {
182
5
            SCOPED_RAW_TIMER(&_stats->delete_bitmap_get_agg_ns);
183
5
            RowsetId rowset_id = rowset()->rowset_id();
184
39
            for (uint32_t seg_id = 0; seg_id < rowset()->num_segments(); ++seg_id) {
185
34
                auto d = _read_context->delete_bitmap->get_agg(
186
34
                        {rowset_id, seg_id, _read_context->version.second});
187
34
                if (d->isEmpty()) {
188
11
                    continue; // Empty delete bitmap for the segment
189
11
                }
190
23
                VLOG_TRACE << "Get the delete bitmap for rowset: " << rowset_id.to_string()
191
0
                           << ", segment id:" << seg_id << ", size:" << d->cardinality();
192
23
                _read_options.delete_bitmap.emplace(seg_id, std::move(d));
193
23
            }
194
5
        }
195
5
    }
196
197
1.18k
    if (_should_push_down_value_predicates()) {
198
        // Sequence mapping currently only supports merge-on-read, so value predicates cannot be pushed down for it.
199
603
        if (_read_context->value_predicates != nullptr &&
200
603
            !read_context->tablet_schema->has_seq_map()) {
201
538
            _read_options.column_predicates.insert(_read_options.column_predicates.end(),
202
538
                                                   _read_context->value_predicates->begin(),
203
538
                                                   _read_context->value_predicates->end());
204
538
            for (auto pred : *(_read_context->value_predicates)) {
205
0
                if (_read_options.col_id_to_predicates.count(pred->column_id()) < 1) {
206
0
                    _read_options.col_id_to_predicates.insert(
207
0
                            {pred->column_id(), AndBlockColumnPredicate::create_shared()});
208
0
                }
209
0
                _read_options.col_id_to_predicates[pred->column_id()]->add_column_predicate(
210
0
                        SingleColumnBlockPredicate::create_unique(pred));
211
0
            }
212
538
        }
213
603
    }
214
1.18k
    _read_options.use_page_cache = _read_context->use_page_cache;
215
1.18k
    _read_options.tablet_schema = _read_context->tablet_schema;
216
1.18k
    _read_options.enable_unique_key_merge_on_write =
217
1.18k
            _read_context->enable_unique_key_merge_on_write;
218
1.18k
    _read_options.record_rowids = _read_context->record_rowids;
219
1.18k
    _read_options.topn_filter_source_node_ids = _read_context->topn_filter_source_node_ids;
220
1.18k
    _read_options.topn_filter_target_node_id = _read_context->topn_filter_target_node_id;
221
1.18k
    _read_options.read_orderby_key_reverse = _read_context->read_orderby_key_reverse;
222
1.18k
    _read_options.read_orderby_key_columns = _read_context->read_orderby_key_columns;
223
1.18k
    _read_options.io_ctx.reader_type = _read_context->reader_type;
224
1.18k
    _read_options.io_ctx.file_cache_stats = &_stats->file_cache_stats;
225
1.18k
    _read_options.runtime_state = _read_context->runtime_state;
226
1.18k
    _read_options.output_columns = _read_context->output_columns;
227
    // NOTE(review): io_ctx.reader_type was already assigned a few lines above;
    // this second assignment looks redundant.
1.18k
    _read_options.io_ctx.reader_type = _read_context->reader_type;
228
    // By default, every non-query read is marked disposable.
1.18k
    _read_options.io_ctx.is_disposable = _read_context->reader_type != ReaderType::READER_QUERY;
229
1.18k
    _read_options.target_cast_type_for_variants = _read_context->target_cast_type_for_variants;
230
1.18k
    if (_read_context->runtime_state != nullptr) {
231
0
        _read_options.io_ctx.query_id = &_read_context->runtime_state->query_id();
232
0
        _read_options.io_ctx.read_file_cache =
233
0
                _read_context->runtime_state->query_options().enable_file_cache;
234
        // NOTE(review): this overwrites the reader-type-based is_disposable set above
        // with the query option disable_file_cache whenever a runtime_state exists.
0
        _read_options.io_ctx.is_disposable =
235
0
                _read_context->runtime_state->query_options().disable_file_cache;
236
0
    }
237
238
    // When the condition cache is still enabled, fold every key range into the digest
    // and publish the result to both the read context and the read options.
1.18k
    if (_read_context->condition_cache_digest) {
239
0
        for (const auto& key_range : _read_options.key_ranges) {
240
0
            _read_context->condition_cache_digest =
241
0
                    key_range.get_digest(_read_context->condition_cache_digest);
242
0
        }
243
0
        _read_options.condition_cache_digest = _read_context->condition_cache_digest;
244
0
    }
245
246
1.18k
    _read_options.io_ctx.expiration_time = read_context->ttl_seconds;
247
248
    // enable_segment_cache defaults to true when there is no runtime_state or the
    // query option is not set.
1.18k
    bool enable_segment_cache = true;
249
1.18k
    auto* state = read_context->runtime_state;
250
1.18k
    if (state != nullptr) {
251
0
        enable_segment_cache = state->query_options().__isset.enable_segment_cache
252
0
                                       ? state->query_options().enable_segment_cache
253
0
                                       : true;
254
0
    }
255
    // When reader type is for query, session variable `enable_segment_cache` should be respected.
256
1.18k
    bool should_use_cache = use_cache || (_read_context->reader_type == ReaderType::READER_QUERY &&
257
1.18k
                                          enable_segment_cache);
258
259
1.18k
    auto segment_count = _rowset->num_segments();
260
1.18k
    auto [seg_start, seg_end] = _segment_offsets;
261
    // If seg_start == seg_end, the segments of the rowset are not split among
262
    // multiple scanners, and this rowset reader is used to read the whole rowset.
263
1.18k
    if (seg_start == seg_end) {
264
1.18k
        seg_start = 0;
265
1.18k
        seg_end = segment_count;
266
1.18k
    }
267
1.18k
    if (_read_context->record_rowids && _read_context->rowid_conversion) {
268
        // init segment rowid map for rowid conversion
269
396
        std::vector<uint32_t> segment_rows;
270
396
        RETURN_IF_ERROR(_rowset->get_segment_num_rows(&segment_rows, should_use_cache, _stats));
271
396
        RETURN_IF_ERROR(_read_context->rowid_conversion->init_segment_map(rowset()->rowset_id(),
272
396
                                                                          segment_rows));
273
396
    }
274
275
6.79k
    for (int64_t i = seg_start; i < seg_end; i++) {
276
5.60k
        SCOPED_RAW_TIMER(&_stats->rowset_reader_create_iterators_timer_ns);
277
5.60k
        std::unique_ptr<RowwiseIterator> iter;
278
279
        /// For iterators, we don't need to initialize them all at once when creating them.
280
        /// Instead, we should initialize each iterator separately when really using them.
281
        /// This optimization minimizes the lifecycle of resources like column readers
282
        /// and prevents excessive memory consumption, especially for wide tables.
283
5.60k
        if (_segment_row_ranges.empty()) {
284
5.60k
            _read_options.row_ranges.clear();
285
5.60k
            iter = std::make_unique<LazyInitSegmentIterator>(_rowset, i, should_use_cache,
286
5.60k
                                                             _input_schema, _read_options);
287
5.60k
        } else {
            // Per-segment row ranges were supplied: use a per-segment copy of the options
            // so the shared _read_options is not modified, and mix the row ranges into
            // the condition cache digest when the cache is enabled.
288
0
            DCHECK_EQ(seg_end - seg_start, _segment_row_ranges.size());
289
0
            auto local_options = _read_options;
290
0
            local_options.row_ranges = _segment_row_ranges[i - seg_start];
291
0
            if (local_options.condition_cache_digest) {
292
0
                local_options.condition_cache_digest =
293
0
                        local_options.row_ranges.get_digest(local_options.condition_cache_digest);
294
0
            }
295
0
            iter = std::make_unique<LazyInitSegmentIterator>(_rowset, i, should_use_cache,
296
0
                                                             _input_schema, local_options);
297
0
        }
298
299
        // Iterators that report empty() are dropped instead of being returned.
5.60k
        if (iter->empty()) {
300
0
            continue;
301
0
        }
302
5.60k
        out_iters->push_back(std::move(iter));
303
5.60k
    }
304
305
1.18k
    return Status::OK();
306
1.18k
}
307
308
247
// Records the read context and the split information for a later read.
// Stores `read_context` in `_read_context`, writes this rowset's id into
// `read_context->rowset_id`, and copies the segment offsets and per-segment row
// ranges from `rs_splits`. No iterator is created here. Always returns Status::OK().
Status BetaRowsetReader::init(RowsetReaderContext* read_context, const RowSetSplits& rs_splits) {
309
247
    _read_context = read_context;
310
247
    _read_context->rowset_id = _rowset->rowset_id();
311
247
    _segment_offsets = rs_splits.segment_offsets;
312
247
    _segment_row_ranges = rs_splits.segment_row_ranges;
313
247
    return Status::OK();
314
247
}
315
316
6.74k
// Runs _init_iterator() through `_init_iter_once` and returns the resulting Status.
// NOTE(review): presumably `_init_iter_once` executes the callable at most once and
// returns the stored status on later calls — confirm against its declaration.
Status BetaRowsetReader::_init_iterator_once() {
317
6.74k
    return _init_iter_once.call([this] { return _init_iterator(); });
318
6.74k
}
319
320
247
// Creates the per-segment iterators for `_read_context`, combines them into a single
// merge or union iterator stored in `_iterator`, and initializes it with `_read_options`.
//
// Returns any error from get_segment_iterators() unchanged. If the combined iterator
// fails to initialize, the failure is logged, `_iterator` is reset, and a
// ROWSET_READER_INIT error carrying the original message is returned.
Status BetaRowsetReader::_init_iterator() {
321
247
    std::vector<RowwiseIteratorUPtr> iterators;
322
247
    RETURN_IF_ERROR(get_segment_iterators(_read_context, &iterators));
323
324
247
    SCOPED_RAW_TIMER(&_stats->rowset_reader_init_iterators_timer_ns);
325
326
    // Fall back to the reader's own merged-row counter when the context supplies none.
247
    if (_read_context->merged_rows == nullptr) {
327
103
        _read_context->merged_rows = &_merged_rows;
328
103
    }
329
    // merge or union segment iterator
330
247
    if (is_merge_iterator()) {
331
        // Position of the sequence column within return_columns; stays -1 when no
        // sequence column is configured or it is not among the return columns.
8
        auto sequence_loc = -1;
332
8
        if (_read_context->sequence_id_idx != -1) {
333
0
            for (int loc = 0; loc < _read_context->return_columns->size(); loc++) {
334
0
                if (_read_context->return_columns->at(loc) == _read_context->sequence_id_idx) {
335
0
                    sequence_loc = loc;
336
0
                    break;
337
0
                }
338
0
            }
339
0
        }
340
8
        _iterator = new_merge_iterator(std::move(iterators), sequence_loc, _read_context->is_unique,
341
8
                                       _read_context->read_orderby_key_reverse,
342
8
                                       _read_context->merged_rows, _output_schema);
343
239
    } else {
344
239
        if (_read_context->read_orderby_key_reverse) {
345
            // reverse iterators to read backward for ORDER BY key DESC
346
0
            std::reverse(iterators.begin(), iterators.end());
347
0
        }
348
239
        _iterator = new_union_iterator(std::move(iterators), _output_schema);
349
239
    }
350
351
247
    auto s = _iterator->init(_read_options);
352
247
    if (!s.ok()) {
353
0
        LOG(WARNING) << "failed to init iterator: " << s.to_string();
354
0
        _iterator.reset();
355
0
        return Status::Error<ROWSET_READER_INIT>(s.to_string());
356
0
    }
357
247
    return Status::OK();
358
247
}
359
360
1.18k
// Decides whether value-column predicates may be pushed down for this rowset.
// Returns true only for UNIQUE_KEYS rowsets, and then only if at least one holds:
//   - start_version is 0 or 2, the segments do not overlap, and no sequence column
//     is configured (sequence_id_idx == -1);
//   - the read context enables unique-key merge-on-write;
//   - the read context enables enable_mor_value_predicate_pushdown.
// Reads `_read_context`, so it must be set before this is called.
bool BetaRowsetReader::_should_push_down_value_predicates() const {
361
    // For a unique table with rowsets [0-x] or [0-1] [2-y] [...],
362
    // value column predicates can be pushed down on rowset [0-x] or [2-y]. [2-y]
363
    // must come from compaction, be non-overlapping and have no sequence column.
364
1.18k
    return _rowset->keys_type() == UNIQUE_KEYS &&
365
1.18k
           (((_rowset->start_version() == 0 || _rowset->start_version() == 2) &&
366
673
             !_rowset->_rowset_meta->is_segments_overlapping() &&
367
673
             _read_context->sequence_id_idx == -1) ||
368
673
            _read_context->enable_unique_key_merge_on_write ||
369
673
            _read_context->enable_mor_value_predicate_pushdown);
370
1.18k
}
371
#include "common/compile_check_end.h"
372
} // namespace doris