Coverage Report

Created: 2025-05-20 19:11

/root/doris/be/src/olap/tablet_reader.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/PaloInternalService_types.h>
21
#include <gen_cpp/PlanNodes_types.h>
22
#include <stddef.h>
23
#include <stdint.h>
24
25
#include <memory>
26
#include <set>
27
#include <string>
28
#include <unordered_set>
29
#include <utility>
30
#include <vector>
31
32
#include "common/status.h"
33
#include "exprs/function_filter.h"
34
#include "gutil/strings/substitute.h"
35
#include "io/io_common.h"
36
#include "olap/delete_handler.h"
37
#include "olap/iterators.h"
38
#include "olap/olap_common.h"
39
#include "olap/olap_tuple.h"
40
#include "olap/row_cursor.h"
41
#include "olap/rowset/rowset.h"
42
#include "olap/rowset/rowset_meta.h"
43
#include "olap/rowset/rowset_reader.h"
44
#include "olap/rowset/rowset_reader_context.h"
45
#include "olap/tablet_fwd.h"
46
47
namespace doris {
48
49
class RuntimeState;
50
class BitmapFilterFuncBase;
51
class BloomFilterFuncBase;
52
class ColumnPredicate;
53
class DeleteBitmap;
54
class HybridSetBase;
55
class RuntimeProfile;
56
57
namespace vectorized {
58
class VCollectIterator;
59
class Block;
60
class VExpr;
61
class Arena;
62
class VExprContext;
63
} // namespace vectorized
64
65
// Used to compare row with input scan key. Scan key only contains key columns,
66
// row contains all key columns, which is a superset of the scan key's columns.
67
// So we should compare the common prefix columns of lhs and rhs.
68
//
69
// NOTE: if you are not sure if you can use it, please don't use this function.
70
0
inline int compare_row_key(const RowCursor& lhs, const RowCursor& rhs) { // returns the first non-zero compare_cell result, or 0 if none
71
0
    auto cmp_cids = std::min(lhs.schema()->num_column_ids(), rhs.schema()->num_column_ids()); // compare only as many columns as the smaller schema reports
72
0
    for (uint32_t cid = 0; cid < cmp_cids; ++cid) {
73
0
        auto res = lhs.schema()->column(cid)->compare_cell(lhs.cell(cid), rhs.cell(cid)); // the comparator is always taken from lhs's schema
74
0
        if (res != 0) {
75
0
            return res; // the first differing column decides the result
76
0
        }
77
0
    }
78
0
    return 0; // every compared column was equal
79
0
}
80
81
class TabletReader {
82
    struct KeysParam {
83
        std::string to_string() const;
84
85
        std::vector<RowCursor> start_keys;
86
        std::vector<RowCursor> end_keys;
87
        bool start_key_include = false;
88
        bool end_key_include = false;
89
    };
90
91
public:
92
    struct ReadSource {
93
        std::vector<RowSetSplits> rs_splits;
94
        std::vector<RowsetMetaSharedPtr> delete_predicates;
95
        // Fill delete predicates with `rs_splits`
96
        void fill_delete_predicates();
97
    };
98
    // Params for Reader,
99
        // mainly includes the tablet, data version and fetch range.
100
    struct ReaderParams {
101
0
        bool has_single_version() const { // true when rs_splits matches either of the two shapes below
102
0
            return (rs_splits.size() == 1 && // shape 1: exactly one split, starting at version 0, segments not overlapping
103
0
                    rs_splits[0].rs_reader->rowset()->start_version() == 0 &&
104
0
                    !rs_splits[0].rs_reader->rowset()->rowset_meta()->is_segments_overlapping()) ||
105
0
                   (rs_splits.size() == 2 && // shape 2: an empty first rowset plus a second one starting at version 2, segments not overlapping
106
0
                    rs_splits[0].rs_reader->rowset()->rowset_meta()->num_rows() == 0 &&
107
0
                    rs_splits[1].rs_reader->rowset()->start_version() == 2 &&
108
0
                    !rs_splits[1].rs_reader->rowset()->rowset_meta()->is_segments_overlapping()); // NOTE(review): rs_reader / rowset() / rowset_meta() are dereferenced unchecked; presumably never null here, confirm
109
0
        }
110
111
140
        void set_read_source(ReadSource read_source) { // takes the argument by value, then moves both of its vectors into this struct
112
140
            rs_splits = std::move(read_source.rs_splits);
113
140
            delete_predicates = std::move(read_source.delete_predicates);
114
140
        }
115
116
        BaseTabletSPtr tablet;
117
        TabletSchemaSPtr tablet_schema;
118
        ReaderType reader_type = ReaderType::READER_QUERY;
119
        bool direct_mode = false;
120
        bool aggregation = false;
121
        // for compaction, schema_change, check_sum: we don't use page cache
122
        // for query, when config::disable_storage_page_cache is false, we use page cache
123
        bool use_page_cache = false;
124
        Version version = Version(-1, 0);
125
126
        std::vector<OlapTuple> start_key;
127
        std::vector<OlapTuple> end_key;
128
        bool start_key_include = false;
129
        bool end_key_include = false;
130
131
        std::vector<TCondition> conditions;
132
        std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>> bloom_filters;
133
        std::vector<std::pair<string, std::shared_ptr<BitmapFilterFuncBase>>> bitmap_filters;
134
        std::vector<std::pair<string, std::shared_ptr<HybridSetBase>>> in_filters;
135
        std::vector<FunctionFilter> function_filters;
136
        std::vector<RowsetMetaSharedPtr> delete_predicates;
137
        // slots whose cast may be eliminated in the storage layer
138
        std::map<std::string, TypeDescriptor> target_cast_type_for_variants;
139
140
        std::vector<RowSetSplits> rs_splits;
141
        // For unique key table with merge-on-write
142
        DeleteBitmap* delete_bitmap = nullptr;
143
144
        // return_columns is initialized from the query schema
145
        std::vector<uint32_t> return_columns;
146
        // output_columns only contains columns in OrderByExprs and outputExprs
147
        std::set<int32_t> output_columns;
148
        RuntimeProfile* profile = nullptr;
149
        RuntimeState* runtime_state = nullptr;
150
151
        // used only in the vec exec engine
152
        std::vector<uint32_t>* origin_return_columns = nullptr;
153
        std::unordered_set<uint32_t>* tablet_columns_convert_to_null_set = nullptr;
154
        TPushAggOp::type push_down_agg_type_opt = TPushAggOp::NONE;
155
        vectorized::VExpr* remaining_vconjunct_root = nullptr;
156
        std::vector<vectorized::VExprSPtr> remaining_conjunct_roots;
157
        vectorized::VExprContextSPtrs common_expr_ctxs_push_down;
158
159
        // used for compaction to record row ids
160
        bool record_rowids = false;
161
        RowIdConversion* rowid_conversion; // NOTE(review): the only raw pointer member of ReaderParams without a `= nullptr` default; its value is indeterminate if the struct is default-initialized and the caller never sets it
162
        // flag to enable topn opt
163
        bool use_topn_opt = false;
164
        std::vector<int> topn_filter_source_node_ids;
165
        // used for special optimization for query : ORDER BY key LIMIT n
166
        bool read_orderby_key = false;
167
        // used for special optimization for query : ORDER BY key DESC LIMIT n
168
        bool read_orderby_key_reverse = false;
169
        // num of columns for orderby key
170
        size_t read_orderby_key_num_prefix_columns = 0;
171
        // limit of rows for read_orderby_key
172
        size_t read_orderby_key_limit = 0;
173
        // filter_block arguments
174
        vectorized::VExprContextSPtrs filter_block_conjuncts;
175
176
        // for vertical compaction
177
        bool is_key_column_group = false;
178
        std::vector<uint32_t> key_group_cluster_key_idxes;
179
180
        bool is_segcompaction = false;
181
182
        std::vector<RowwiseIteratorUPtr>* segment_iters_ptr = nullptr;
183
184
        void check_validation() const;
185
186
        std::string to_string() const;
187
188
        int64_t batch_size = -1;
189
    };
190
191
162
    TabletReader() = default;
192
193
    virtual ~TabletReader();
194
195
    TabletReader(const TabletReader&) = delete;
196
    void operator=(const TabletReader&) = delete;
197
198
    // Initialize TabletReader with tablet, data version and fetch range.
199
    virtual Status init(const ReaderParams& read_params);
200
201
    // Read next block with aggregation.
202
    // Return OK and set `*eof` to false when next block is read
203
    // Return OK and set `*eof` to true when no more rows can be read.
204
    // Return others when unexpected error happens.
205
0
    virtual Status next_block_with_aggregation(vectorized::Block* block, bool* eof) { // base version: always returns an error and leaves both arguments untouched; presumably overridden by concrete readers, confirm
206
0
        return Status::Error<ErrorCode::READER_INITIALIZE_ERROR>(
207
0
                "TabletReader not support next_block_with_aggregation");
208
0
    }
209
210
48
    virtual uint64_t merged_rows() const { return _merged_rows; }
211
212
107
    uint64_t filtered_rows() const { // sum of six row-filter counters kept in _stats
213
107
        return _stats.rows_del_filtered + _stats.rows_del_by_bitmap +
214
107
               _stats.rows_conditions_filtered + _stats.rows_vec_del_cond_filtered +
215
107
               _stats.rows_vec_cond_filtered + _stats.rows_short_circuit_cond_filtered;
216
107
    }
217
218
0
    void set_batch_size(int batch_size) { _reader_context.batch_size = batch_size; } // stored on the rowset reader context, not on TabletReader itself
219
220
294
    int batch_size() const { return _reader_context.batch_size; }
221
222
0
    const OlapReaderStatistics& stats() const { return _stats; }
223
0
    OlapReaderStatistics* mutable_stats() { return &_stats; } // pointer to the same _stats object that stats() exposes read-only
224
225
0
    virtual bool update_profile(RuntimeProfile* profile) { return false; } // base version ignores `profile` and always returns false
226
    static Status init_reader_params_and_create_block(
227
            TabletSharedPtr tablet, ReaderType reader_type,
228
            const std::vector<RowsetSharedPtr>& input_rowsets,
229
            TabletReader::ReaderParams* reader_params, vectorized::Block* block);
230
231
protected:
232
    friend class vectorized::VCollectIterator;
233
    friend class DeleteHandler;
234
235
    Status _init_params(const ReaderParams& read_params);
236
237
    Status _capture_rs_readers(const ReaderParams& read_params);
238
239
    bool _optimize_for_single_rowset(const std::vector<RowsetReaderSharedPtr>& rs_readers);
240
241
    Status _init_keys_param(const ReaderParams& read_params);
242
243
    Status _init_orderby_keys_param(const ReaderParams& read_params);
244
245
    Status _init_conditions_param(const ReaderParams& read_params);
246
247
    ColumnPredicate* _parse_to_predicate(
248
            const std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>& bloom_filter);
249
250
    ColumnPredicate* _parse_to_predicate(
251
            const std::pair<std::string, std::shared_ptr<BitmapFilterFuncBase>>& bitmap_filter);
252
253
    ColumnPredicate* _parse_to_predicate(
254
            const std::pair<std::string, std::shared_ptr<HybridSetBase>>& in_filter);
255
256
    virtual ColumnPredicate* _parse_to_predicate(const FunctionFilter& function_filter);
257
258
    Status _init_delete_condition(const ReaderParams& read_params);
259
260
    Status _init_return_columns(const ReaderParams& read_params);
261
262
542
    const BaseTabletSPtr& tablet() { return _tablet; }
263
    // If the original column is a variant type column and its predicate is normalized,
264
    // then to get the real type of the column predicate we need to reset the type
265
    // according to the related type in `target_cast_type_for_variants`, since variant is not
266
    // a predicate-applicable type. Otherwise return the original tablet column.
267
    // E.g. `where cast(v:a as bigint) > 1` will eliminate the cast and materialize this variant column
268
    // to type bigint.
269
    TabletColumn materialize_column(const TabletColumn& orig);
270
271
240
    const TabletSchema& tablet_schema() { return *_tablet_schema; }
272
273
    std::unique_ptr<vectorized::Arena> _predicate_arena;
274
    std::vector<uint32_t> _return_columns;
275
    // used for special optimization for query : ORDER BY key [ASC|DESC] LIMIT n
276
    // columns for orderby keys
277
    std::vector<uint32_t> _orderby_key_columns;
278
    // only used in outer join, which changes the column's nullability; this must be kept the same in
279
    // the vec query engine
280
    std::unordered_set<uint32_t>* _tablet_columns_convert_to_null_set = nullptr;
281
282
    BaseTabletSPtr _tablet;
283
    RowsetReaderContext _reader_context;
284
    TabletSchemaSPtr _tablet_schema;
285
    KeysParam _keys_param;
286
    std::vector<bool> _is_lower_keys_included;
287
    std::vector<bool> _is_upper_keys_included;
288
    std::vector<ColumnPredicate*> _col_predicates;
289
    std::vector<ColumnPredicate*> _value_col_predicates;
290
    DeleteHandler _delete_handler;
291
292
    bool _aggregation = false;
293
    // for agg query, we don't need to finalize when scanning agg object data
294
    ReaderType _reader_type = ReaderType::READER_QUERY;
295
    bool _next_delete_flag = false;
296
    bool _filter_delete = false;
297
    int32_t _sequence_col_idx = -1;
298
    bool _direct_mode = false;
299
300
    std::vector<uint32_t> _key_cids;
301
    std::vector<uint32_t> _value_cids;
302
303
    uint64_t _merged_rows = 0;
304
    OlapReaderStatistics _stats;
305
};
306
307
} // namespace doris