Coverage Report

Created: 2026-06-11 11:41

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/tablet/tablet_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/tablet/tablet_reader.h"
19
20
#include <gen_cpp/olap_file.pb.h>
21
#include <gen_cpp/segment_v2.pb.h>
22
#include <thrift/protocol/TDebugProtocol.h>
23
24
#include <algorithm>
25
#include <functional>
26
#include <iterator>
27
#include <memory>
28
#include <numeric>
29
#include <ostream>
30
#include <shared_mutex>
31
32
#include "common/compiler_util.h" // IWYU pragma: keep
33
#include "common/config.h"
34
#include "common/exception.h"
35
#include "common/logging.h"
36
#include "common/status.h"
37
#include "core/arena.h"
38
#include "core/block/block.h"
39
#include "exec/common/variant_util.h"
40
#include "exprs/bloom_filter_func.h"
41
#include "exprs/create_predicate_function.h"
42
#include "exprs/hybrid_set.h"
43
#include "runtime/query_context.h"
44
#include "runtime/runtime_predicate.h"
45
#include "runtime/runtime_state.h"
46
#include "storage/index/bloom_filter/bloom_filter.h"
47
#include "storage/itoken_extractor.h"
48
#include "storage/olap_common.h"
49
#include "storage/olap_define.h"
50
#include "storage/predicate/column_predicate.h"
51
#include "storage/predicate/like_column_predicate.h"
52
#include "storage/predicate/predicate_creator.h"
53
#include "storage/row_cursor.h"
54
#include "storage/schema.h"
55
#include "storage/tablet/tablet.h"
56
#include "storage/tablet/tablet_meta.h"
57
#include "storage/tablet/tablet_schema.h"
58
59
namespace doris {
60
using namespace ErrorCode;
61
62
1.07M
void TabletReader::ReaderParams::check_validation() const {
63
1.07M
    if (UNLIKELY(version.first == -1 && is_segcompaction == false)) {
64
0
        throw Exception(Status::FatalError("version is not set. tablet={}", tablet->tablet_id()));
65
0
    }
66
1.07M
}
67
68
1.07M
Status TabletReader::init(const ReaderParams& read_params) {
69
1.07M
    Status res = _init_params(read_params);
70
1.07M
    if (!res.ok()) {
71
0
        LOG(WARNING) << "fail to init reader when init params. res:" << res
72
0
                     << ", tablet_id:" << read_params.tablet->tablet_id()
73
0
                     << ", schema_hash:" << read_params.tablet->schema_hash()
74
0
                     << ", reader type:" << int(read_params.reader_type)
75
0
                     << ", version:" << read_params.version;
76
0
    }
77
1.07M
    return res;
78
1.07M
}
79
80
1.07M
Status TabletReader::_capture_rs_readers(const ReaderParams& read_params) {
81
1.07M
    SCOPED_RAW_TIMER(&_stats.tablet_reader_capture_rs_readers_timer_ns);
82
1.07M
    if (read_params.rs_splits.empty()) {
83
0
        return Status::InternalError("fail to acquire data sources. tablet={}",
84
0
                                     _tablet->tablet_id());
85
0
    }
86
87
1.07M
    bool eof = false;
88
1.07M
    bool is_lower_key_included = _keys_param.start_key_include;
89
1.07M
    bool is_upper_key_included = _keys_param.end_key_include;
90
91
2.73M
    for (int i = 0; i < _keys_param.start_keys.size(); ++i) {
92
        // lower bound
93
1.65M
        RowCursor& start_key = _keys_param.start_keys[i];
94
1.65M
        RowCursor& end_key = _keys_param.end_keys[i];
95
96
1.65M
        if (!is_lower_key_included) {
97
1.10k
            if (compare_row_key(start_key, end_key) >= 0) {
98
0
                VLOG_NOTICE << "return EOF when lower key not include"
99
0
                            << ", start_key=" << start_key.to_string()
100
0
                            << ", end_key=" << end_key.to_string();
101
0
                eof = true;
102
0
                break;
103
0
            }
104
1.65M
        } else {
105
1.65M
            if (compare_row_key(start_key, end_key) > 0) {
106
0
                VLOG_NOTICE << "return EOF when lower key include="
107
0
                            << ", start_key=" << start_key.to_string()
108
0
                            << ", end_key=" << end_key.to_string();
109
0
                eof = true;
110
0
                break;
111
0
            }
112
1.65M
        }
113
114
1.65M
        _is_lower_keys_included.push_back(is_lower_key_included);
115
1.65M
        _is_upper_keys_included.push_back(is_upper_key_included);
116
1.65M
    }
117
118
1.07M
    if (eof) {
119
0
        return Status::EndOfFile("reach end of scan range. tablet={}", _tablet->tablet_id());
120
0
    }
121
122
1.07M
    bool need_ordered_result = true;
123
1.07M
    if (read_params.reader_type == ReaderType::READER_QUERY ||
124
1.07M
        read_params.reader_type == ReaderType::READER_BINLOG) {
125
1.04M
        if (_tablet_schema->keys_type() == DUP_KEYS) {
126
            // duplicated keys are allowed, no need to merge sort keys in rowset
127
366k
            need_ordered_result = false;
128
366k
        }
129
1.04M
        if (_tablet_schema->keys_type() == UNIQUE_KEYS &&
130
1.04M
            _tablet->enable_unique_key_merge_on_write()) {
131
            // unique keys with merge on write, no need to merge sort keys in rowset
132
632k
            need_ordered_result = false;
133
632k
        }
134
1.04M
        if (_aggregation) {
135
            // compute engine will aggregate rows with the same key,
136
            // it's ok for rowset to return unordered result
137
1.00M
            need_ordered_result = false;
138
1.00M
        }
139
140
1.04M
        if (_direct_mode) {
141
            // direct mode indicates that the storage layer does not need to merge,
142
            // it's ok for rowset to return unordered result
143
1.03M
            need_ordered_result = false;
144
1.03M
        }
145
146
1.04M
        if (read_params.read_orderby_key) {
147
2.51k
            need_ordered_result = true;
148
2.51k
        }
149
1.04M
    }
150
151
1.07M
    _reader_context.reader_type = read_params.reader_type;
152
1.07M
    _reader_context.version = read_params.version;
153
1.07M
    _reader_context.tablet_schema = _tablet_schema;
154
1.07M
    _reader_context.need_ordered_result = need_ordered_result;
155
1.07M
    _reader_context.topn_filter_source_node_ids = read_params.topn_filter_source_node_ids;
156
1.07M
    _reader_context.topn_filter_target_node_id = read_params.topn_filter_target_node_id;
157
1.07M
    _reader_context.read_orderby_key_reverse = read_params.read_orderby_key_reverse;
158
1.07M
    _reader_context.use_insert_order_when_same =
159
1.07M
            read_params.use_insert_order_when_same ||
160
1.07M
            read_params.reader_type == ReaderType::READER_BINLOG ||
161
1.07M
            read_params.reader_type == ReaderType::READER_BINLOG_COMPACTION;
162
1.07M
    _reader_context.read_orderby_key_limit = read_params.read_orderby_key_limit;
163
1.07M
    _reader_context.return_columns = &_return_columns;
164
1.07M
    _reader_context.read_orderby_key_columns =
165
1.07M
            !_orderby_key_columns.empty() ? &_orderby_key_columns : nullptr;
166
1.07M
    _reader_context.predicates = &_col_predicates;
167
1.07M
    _reader_context.value_predicates = &_value_col_predicates;
168
1.07M
    _reader_context.lower_bound_keys = &_keys_param.start_keys;
169
1.07M
    _reader_context.is_lower_keys_included = &_is_lower_keys_included;
170
1.07M
    _reader_context.upper_bound_keys = &_keys_param.end_keys;
171
1.07M
    _reader_context.is_upper_keys_included = &_is_upper_keys_included;
172
1.07M
    _reader_context.delete_handler = &_delete_handler;
173
1.07M
    _reader_context.stats = &_stats;
174
1.07M
    _reader_context.use_page_cache = read_params.use_page_cache;
175
1.07M
    _reader_context.sequence_id_idx = _sequence_col_idx;
176
1.07M
    _reader_context.is_unique = tablet()->keys_type() == UNIQUE_KEYS;
177
1.07M
    _reader_context.merged_rows = &_merged_rows;
178
1.07M
    _reader_context.delete_bitmap = read_params.delete_bitmap;
179
1.07M
    _reader_context.enable_unique_key_merge_on_write = tablet()->enable_unique_key_merge_on_write();
180
1.07M
    _reader_context.enable_mor_value_predicate_pushdown =
181
1.07M
            read_params.enable_mor_value_predicate_pushdown;
182
1.07M
    _reader_context.record_rowids = read_params.record_rowids;
183
1.07M
    _reader_context.rowid_conversion = read_params.rowid_conversion;
184
1.07M
    _reader_context.is_key_column_group = read_params.is_key_column_group;
185
1.07M
    _reader_context.common_expr_ctxs_push_down = read_params.common_expr_ctxs_push_down;
186
1.07M
    _reader_context.output_columns = &read_params.output_columns;
187
1.07M
    _reader_context.push_down_agg_type_opt = read_params.push_down_agg_type_opt;
188
1.07M
    _reader_context.ttl_seconds = _tablet->ttl_seconds();
189
1.07M
    _reader_context.score_runtime = read_params.score_runtime;
190
1.07M
    _reader_context.collection_statistics = read_params.collection_statistics;
191
192
1.07M
    _reader_context.virtual_column_exprs = read_params.virtual_column_exprs;
193
1.07M
    _reader_context.vir_cid_to_idx_in_block = read_params.vir_cid_to_idx_in_block;
194
1.07M
    _reader_context.vir_col_idx_to_type = read_params.vir_col_idx_to_type;
195
1.07M
    _reader_context.ann_topn_runtime = read_params.ann_topn_runtime;
196
197
1.07M
    _reader_context.condition_cache_digest = read_params.condition_cache_digest;
198
1.07M
    _reader_context.all_access_paths = read_params.all_access_paths;
199
1.07M
    _reader_context.predicate_access_paths = read_params.predicate_access_paths;
200
201
    // Propagate general read limit for DUP_KEYS and UNIQUE_KEYS with MOW
202
1.07M
    _reader_context.general_read_limit = read_params.general_read_limit;
203
204
    // Preserve the original requested output layout so BlockReader can map expanded storage
205
    // columns (for non-direct AGG/UNIQUE paths) back to the final output block.
206
1.07M
    _reader_context.origin_return_columns = read_params.origin_return_columns;
207
208
1.07M
    return Status::OK();
209
1.07M
}
210
211
434
TabletColumn TabletReader::materialize_column(const TabletColumn& orig) {
212
434
    if (!orig.is_variant_type()) {
213
431
        return orig;
214
431
    }
215
3
    TabletColumn column_with_cast_type = orig;
216
3
    auto cast_type = _reader_context.target_cast_type_for_variants.at(orig.name());
217
3
    return variant_util::get_column_by_type(cast_type, orig.name(),
218
3
                                            {
219
3
                                                    .unique_id = orig.unique_id(),
220
3
                                                    .parent_unique_id = orig.parent_unique_id(),
221
3
                                                    .path_info = *orig.path_info_ptr(),
222
3
                                            });
223
434
}
224
225
1.07M
Status TabletReader::_init_params(const ReaderParams& read_params) {
226
1.07M
    read_params.check_validation();
227
228
1.07M
    _direct_mode = read_params.direct_mode;
229
1.07M
    _aggregation = read_params.aggregation;
230
1.07M
    _reader_type = read_params.reader_type;
231
1.07M
    _tablet = read_params.tablet;
232
1.07M
    _tablet_schema = read_params.tablet_schema;
233
1.07M
    _reader_context.runtime_state = read_params.runtime_state;
234
1.07M
    _reader_context.target_cast_type_for_variants = read_params.target_cast_type_for_variants;
235
236
1.07M
    RETURN_IF_ERROR(_init_conditions_param(read_params));
237
238
1.07M
    Status res = _init_delete_condition(read_params);
239
1.07M
    if (!res.ok()) {
240
0
        LOG(WARNING) << "fail to init delete param. res = " << res;
241
0
        return res;
242
0
    }
243
244
1.07M
    res = _init_return_columns(read_params);
245
1.07M
    if (!res.ok()) {
246
0
        LOG(WARNING) << "fail to init return columns. res = " << res;
247
0
        return res;
248
0
    }
249
250
1.07M
    res = _init_keys_param(read_params);
251
1.07M
    if (!res.ok()) {
252
0
        LOG(WARNING) << "fail to init keys param. res=" << res;
253
0
        return res;
254
0
    }
255
1.07M
    res = _init_orderby_keys_param(read_params);
256
1.07M
    if (!res.ok()) {
257
0
        LOG(WARNING) << "fail to init orderby keys param. res=" << res;
258
0
        return res;
259
0
    }
260
1.07M
    if (_tablet_schema->has_sequence_col()) {
261
5.12k
        auto sequence_col_idx = _tablet_schema->sequence_col_idx();
262
5.12k
        DCHECK_NE(sequence_col_idx, -1);
263
18.8k
        for (auto col : _return_columns) {
264
            // query has sequence col
265
18.8k
            if (col == sequence_col_idx) {
266
357
                _sequence_col_idx = sequence_col_idx;
267
357
                break;
268
357
            }
269
18.8k
        }
270
5.12k
    }
271
272
1.07M
    return res;
273
1.07M
}
274
275
1.07M
Status TabletReader::_init_return_columns(const ReaderParams& read_params) {
276
1.07M
    SCOPED_RAW_TIMER(&_stats.tablet_reader_init_return_columns_timer_ns);
277
1.07M
    if (read_params.reader_type == ReaderType::READER_QUERY ||
278
1.07M
        read_params.reader_type == ReaderType::READER_BINLOG) {
279
1.04M
        _return_columns = read_params.return_columns;
280
1.04M
        _tablet_columns_convert_to_null_set = read_params.tablet_columns_convert_to_null_set;
281
10.4M
        for (auto id : read_params.return_columns) {
282
10.4M
            if (_tablet_schema->column(id).is_key()) {
283
4.52M
                _key_cids.push_back(id);
284
5.88M
            } else {
285
5.88M
                _value_cids.push_back(id);
286
5.88M
            }
287
10.4M
        }
288
1.04M
    } else if (read_params.return_columns.empty()) {
289
0
        for (uint32_t i = 0; i < _tablet_schema->num_columns(); ++i) {
290
0
            _return_columns.push_back(i);
291
0
            if (_tablet_schema->column(i).is_key()) {
292
0
                _key_cids.push_back(i);
293
0
            } else {
294
0
                _value_cids.push_back(i);
295
0
            }
296
0
        }
297
0
        VLOG_NOTICE << "return column is empty, using full column as default.";
298
22.3k
    } else if ((read_params.reader_type == ReaderType::READER_CUMULATIVE_COMPACTION ||
299
22.3k
                read_params.reader_type == ReaderType::READER_SEGMENT_COMPACTION ||
300
22.3k
                read_params.reader_type == ReaderType::READER_BASE_COMPACTION ||
301
22.3k
                read_params.reader_type == ReaderType::READER_FULL_COMPACTION ||
302
22.3k
                read_params.reader_type == ReaderType::READER_BINLOG_COMPACTION ||
303
22.3k
                read_params.reader_type == ReaderType::READER_COLD_DATA_COMPACTION ||
304
22.3k
                read_params.reader_type == ReaderType::READER_ALTER_TABLE) &&
305
25.6k
               !read_params.return_columns.empty()) {
306
25.6k
        _return_columns = read_params.return_columns;
307
102k
        for (auto id : read_params.return_columns) {
308
102k
            if (_tablet_schema->column(id).is_key()) {
309
37.3k
                _key_cids.push_back(id);
310
65.5k
            } else {
311
65.5k
                _value_cids.push_back(id);
312
65.5k
            }
313
102k
        }
314
18.4E
    } else if (read_params.reader_type == ReaderType::READER_CHECKSUM) {
315
0
        _return_columns = read_params.return_columns;
316
0
        for (auto id : read_params.return_columns) {
317
0
            if (_tablet_schema->column(id).is_key()) {
318
0
                _key_cids.push_back(id);
319
0
            } else {
320
0
                _value_cids.push_back(id);
321
0
            }
322
0
        }
323
18.4E
    } else {
324
18.4E
        return Status::Error<INVALID_ARGUMENT>(
325
18.4E
                "fail to init return columns. reader_type={}, return_columns_size={}",
326
18.4E
                int(read_params.reader_type), read_params.return_columns.size());
327
18.4E
    }
328
329
1.07M
    std::sort(_key_cids.begin(), _key_cids.end(), std::greater<>());
330
331
1.07M
    return Status::OK();
332
1.07M
}
333
334
1.07M
Status TabletReader::_init_keys_param(const ReaderParams& read_params) {
335
1.07M
    SCOPED_RAW_TIMER(&_stats.tablet_reader_init_keys_param_timer_ns);
336
1.07M
    if (read_params.start_key.empty()) {
337
228k
        return Status::OK();
338
228k
    }
339
340
844k
    _keys_param.start_key_include = read_params.start_key_include;
341
844k
    _keys_param.end_key_include = read_params.end_key_include;
342
343
844k
    size_t start_key_size = read_params.start_key.size();
344
    //_keys_param.start_keys.resize(start_key_size);
345
844k
    std::vector<RowCursor>(start_key_size).swap(_keys_param.start_keys);
346
347
844k
    size_t scan_key_size = read_params.start_key.front().size();
348
844k
    if (scan_key_size > _tablet_schema->num_columns()) {
349
0
        return Status::Error<INVALID_ARGUMENT>(
350
0
                "Input param are invalid. Column count is bigger than num_columns of schema. "
351
0
                "column_count={}, schema.num_columns={}",
352
0
                scan_key_size, _tablet_schema->num_columns());
353
0
    }
354
355
2.50M
    for (size_t i = 0; i < start_key_size; ++i) {
356
1.65M
        if (read_params.start_key[i].size() != scan_key_size) {
357
0
            return Status::Error<INVALID_ARGUMENT>(
358
0
                    "The start_key.at({}).size={}, not equals the scan_key_size={}", i,
359
0
                    read_params.start_key[i].size(), scan_key_size);
360
0
        }
361
362
1.65M
        Status res = _keys_param.start_keys[i].init(_tablet_schema, read_params.start_key[i]);
363
1.65M
        if (!res.ok()) {
364
0
            LOG(WARNING) << "fail to init row cursor. res = " << res;
365
0
            return res;
366
0
        }
367
1.65M
    }
368
369
844k
    size_t end_key_size = read_params.end_key.size();
370
    //_keys_param.end_keys.resize(end_key_size);
371
844k
    std::vector<RowCursor>(end_key_size).swap(_keys_param.end_keys);
372
2.50M
    for (size_t i = 0; i < end_key_size; ++i) {
373
1.65M
        if (read_params.end_key[i].size() != scan_key_size) {
374
0
            return Status::Error<INVALID_ARGUMENT>(
375
0
                    "The end_key.at({}).size={}, not equals the scan_key_size={}", i,
376
0
                    read_params.end_key[i].size(), scan_key_size);
377
0
        }
378
379
1.65M
        Status res = _keys_param.end_keys[i].init(_tablet_schema, read_params.end_key[i]);
380
1.65M
        if (!res.ok()) {
381
0
            LOG(WARNING) << "fail to init row cursor. res = " << res;
382
0
            return res;
383
0
        }
384
1.65M
    }
385
386
    //TODO:check the valid of start_key and end_key.(eg. start_key <= end_key)
387
388
844k
    return Status::OK();
389
844k
}
390
391
1.07M
Status TabletReader::_init_orderby_keys_param(const ReaderParams& read_params) {
392
1.07M
    SCOPED_RAW_TIMER(&_stats.tablet_reader_init_orderby_keys_param_timer_ns);
393
    // UNIQUE_KEYS will compare all keys as before
394
1.07M
    if (_tablet_schema->keys_type() == DUP_KEYS || (_tablet_schema->keys_type() == UNIQUE_KEYS &&
395
1.02M
                                                    _tablet->enable_unique_key_merge_on_write())) {
396
1.02M
        if (!_tablet_schema->cluster_key_uids().empty()) {
397
5.31k
            if (read_params.read_orderby_key_num_prefix_columns >
398
5.31k
                _tablet_schema->cluster_key_uids().size()) {
399
0
                return Status::Error<ErrorCode::INTERNAL_ERROR>(
400
0
                        "read_orderby_key_num_prefix_columns={} > cluster_keys.size()={}",
401
0
                        read_params.read_orderby_key_num_prefix_columns,
402
0
                        _tablet_schema->cluster_key_uids().size());
403
0
            }
404
5.32k
            for (uint32_t i = 0; i < read_params.read_orderby_key_num_prefix_columns; i++) {
405
4
                auto cid = _tablet_schema->cluster_key_uids()[i];
406
4
                auto index = _tablet_schema->field_index(cid);
407
4
                if (index < 0) {
408
0
                    return Status::Error<ErrorCode::INTERNAL_ERROR>(
409
0
                            "could not find cluster key column with unique_id=" +
410
0
                            std::to_string(cid) +
411
0
                            " in tablet schema, tablet_id=" + std::to_string(_tablet->tablet_id()));
412
0
                }
413
8
                for (uint32_t idx = 0; idx < _return_columns.size(); idx++) {
414
8
                    if (_return_columns[idx] == index) {
415
4
                        _orderby_key_columns.push_back(idx);
416
4
                        break;
417
4
                    }
418
8
                }
419
4
            }
420
1.02M
        } else {
421
            // find index in vector _return_columns
422
            //   for the read_orderby_key_num_prefix_columns orderby keys
423
1.02M
            for (uint32_t i = 0; i < read_params.read_orderby_key_num_prefix_columns; i++) {
424
4.55k
                for (uint32_t idx = 0; idx < _return_columns.size(); idx++) {
425
4.55k
                    if (_return_columns[idx] == i) {
426
3.41k
                        _orderby_key_columns.push_back(idx);
427
3.41k
                        break;
428
3.41k
                    }
429
4.55k
                }
430
3.40k
            }
431
1.02M
        }
432
1.02M
        if (read_params.read_orderby_key_num_prefix_columns != _orderby_key_columns.size()) {
433
0
            return Status::Error<ErrorCode::INTERNAL_ERROR>(
434
0
                    "read_orderby_key_num_prefix_columns != _orderby_key_columns.size, "
435
0
                    "read_params.read_orderby_key_num_prefix_columns={}, "
436
0
                    "_orderby_key_columns.size()={}",
437
0
                    read_params.read_orderby_key_num_prefix_columns, _orderby_key_columns.size());
438
0
        }
439
1.02M
    }
440
441
1.07M
    return Status::OK();
442
1.07M
}
443
444
1.07M
Status TabletReader::_init_conditions_param(const ReaderParams& read_params) {
445
1.07M
    SCOPED_RAW_TIMER(&_stats.tablet_reader_init_conditions_param_timer_ns);
446
1.07M
    std::vector<std::shared_ptr<ColumnPredicate>> predicates;
447
1.07M
    std::copy(read_params.predicates.cbegin(), read_params.predicates.cend(),
448
1.07M
              std::inserter(predicates, predicates.begin()));
449
    // Function filter push down to storage engine
450
1.07M
    auto is_like_predicate = [](std::shared_ptr<ColumnPredicate> _pred) {
451
430
        return dynamic_cast<LikeColumnPredicate<TYPE_CHAR>*>(_pred.get()) != nullptr ||
452
430
               dynamic_cast<LikeColumnPredicate<TYPE_STRING>*>(_pred.get()) != nullptr;
453
430
    };
454
455
1.07M
    for (const auto& filter : read_params.function_filters) {
456
434
        predicates.emplace_back(_parse_to_predicate(filter));
457
434
        auto pred = predicates.back();
458
459
434
        const auto& col = _tablet_schema->column(pred->column_id());
460
434
        const auto* tablet_index = _tablet_schema->get_ngram_bf_index(col.unique_id());
461
434
        if (is_like_predicate(pred) && tablet_index && config::enable_query_like_bloom_filter) {
462
15
            std::unique_ptr<segment_v2::BloomFilter> ng_bf;
463
15
            std::string pattern = pred->get_search_str();
464
15
            auto gram_bf_size = tablet_index->get_gram_bf_size();
465
15
            auto gram_size = tablet_index->get_gram_size();
466
467
15
            RETURN_IF_ERROR(segment_v2::BloomFilter::create(segment_v2::NGRAM_BLOOM_FILTER, &ng_bf,
468
15
                                                            gram_bf_size));
469
15
            NgramTokenExtractor _token_extractor(gram_size);
470
471
15
            if (_token_extractor.string_like_to_bloom_filter(pattern.data(), pattern.length(),
472
16
                                                             *ng_bf)) {
473
16
                pred->set_page_ng_bf(std::move(ng_bf));
474
16
            }
475
15
        }
476
434
    }
477
478
1.07M
    int32_t delete_sign_idx = _tablet_schema->delete_sign_idx();
479
1.07M
    for (auto predicate : predicates) {
480
820k
        auto column = _tablet_schema->column(predicate->column_id());
481
820k
        if (column.aggregation() != FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE) {
482
            // When MOR value predicate pushdown is enabled, drop __DORIS_DELETE_SIGN__
483
            // from storage-layer predicates entirely. Delete sign must only be evaluated
484
            // post-merge via VExpr to prevent deleted rows from reappearing.
485
2.94k
            if (read_params.enable_mor_value_predicate_pushdown && delete_sign_idx >= 0 &&
486
2.94k
                predicate->column_id() == static_cast<uint32_t>(delete_sign_idx)) {
487
25
                continue;
488
25
            }
489
2.92k
            _value_col_predicates.push_back(predicate);
490
817k
        } else {
491
817k
            _col_predicates.push_back(predicate);
492
817k
        }
493
820k
    }
494
495
1.07M
    return Status::OK();
496
1.07M
}
497
498
std::shared_ptr<ColumnPredicate> TabletReader::_parse_to_predicate(
499
430
        const FunctionFilter& function_filter) {
500
430
    int32_t index = _tablet_schema->field_index(function_filter._col_name);
501
430
    if (index < 0) {
502
0
        throw Exception(Status::InternalError("Column {} not found in tablet schema",
503
0
                                              function_filter._col_name));
504
0
        return nullptr;
505
0
    }
506
430
    const TabletColumn& column = materialize_column(_tablet_schema->column(index));
507
430
    return create_column_predicate(index, std::make_shared<FunctionFilter>(function_filter),
508
430
                                   column.type(), &column);
509
430
}
510
511
1.07M
Status TabletReader::_init_delete_condition(const ReaderParams& read_params) {
512
1.07M
    SCOPED_RAW_TIMER(&_stats.tablet_reader_init_delete_condition_param_timer_ns);
513
    // If it's cumu and not allow do delete when cumu
514
1.07M
    if (read_params.reader_type == ReaderType::READER_SEGMENT_COMPACTION ||
515
1.07M
        (read_params.reader_type == ReaderType::READER_CUMULATIVE_COMPACTION &&
516
1.07M
         !config::enable_delete_when_cumu_compaction)) {
517
23.4k
        return Status::OK();
518
23.4k
    }
519
1.04M
    bool cumu_delete = read_params.reader_type == ReaderType::READER_CUMULATIVE_COMPACTION &&
520
1.04M
                       config::enable_delete_when_cumu_compaction;
521
    // Delete sign could not be applied when delete on cumu compaction is enabled, bucause it is meant for delete with predicates.
522
    // If delete design is applied on cumu compaction, it will lose effect when doing base compaction.
523
    // `_delete_sign_available` indicates the condition where we could apply delete signs to data.
524
1.04M
    _delete_sign_available = (((read_params.reader_type == ReaderType::READER_BASE_COMPACTION ||
525
1.04M
                                read_params.reader_type == ReaderType::READER_FULL_COMPACTION) &&
526
1.04M
                               config::enable_prune_delete_sign_when_base_compaction) ||
527
1.04M
                              read_params.reader_type == ReaderType::READER_COLD_DATA_COMPACTION ||
528
1.04M
                              read_params.reader_type == ReaderType::READER_CHECKSUM);
529
530
    // `_filter_delete` indicates the condition where we should execlude deleted tuples when reading data.
531
    // However, queries will not use this condition but generate special where predicates to filter data.
532
    // (Though a lille bit confused, it is how the current logic working...)
533
1.04M
    _filter_delete = _delete_sign_available || cumu_delete;
534
1.04M
    return _delete_handler.init(_tablet_schema, read_params.delete_predicates,
535
1.04M
                                read_params.version.second);
536
1.07M
}
537
538
Status TabletReader::init_reader_params_and_create_block(
539
        TabletSharedPtr tablet, ReaderType reader_type,
540
        const std::vector<RowsetSharedPtr>& input_rowsets,
541
0
        TabletReader::ReaderParams* reader_params, Block* block) {
542
0
    reader_params->tablet = tablet;
543
0
    reader_params->reader_type = reader_type;
544
0
    reader_params->version =
545
0
            Version(input_rowsets.front()->start_version(), input_rowsets.back()->end_version());
546
547
0
    TabletReadSource read_source;
548
0
    for (const auto& rowset : input_rowsets) {
549
0
        RowsetReaderSharedPtr rs_reader;
550
0
        RETURN_IF_ERROR(rowset->create_reader(&rs_reader));
551
0
        read_source.rs_splits.emplace_back(std::move(rs_reader));
552
0
    }
553
0
    read_source.fill_delete_predicates();
554
0
    reader_params->set_read_source(std::move(read_source));
555
556
0
    std::vector<RowsetMetaSharedPtr> rowset_metas(input_rowsets.size());
557
0
    std::transform(input_rowsets.begin(), input_rowsets.end(), rowset_metas.begin(),
558
0
                   [](const RowsetSharedPtr& rowset) { return rowset->rowset_meta(); });
559
0
    TabletSchemaSPtr read_tablet_schema =
560
0
            tablet->tablet_schema_with_merged_max_schema_version(rowset_metas);
561
0
    TabletSchemaSPtr merge_tablet_schema = std::make_shared<TabletSchema>();
562
0
    merge_tablet_schema->copy_from(*read_tablet_schema);
563
564
    // Merge the columns in delete predicate that not in latest schema in to current tablet schema
565
0
    for (auto& del_pred : reader_params->delete_predicates) {
566
0
        merge_tablet_schema->merge_dropped_columns(*del_pred->tablet_schema());
567
0
    }
568
0
    reader_params->tablet_schema = merge_tablet_schema;
569
570
0
    reader_params->return_columns.resize(read_tablet_schema->num_columns());
571
0
    std::iota(reader_params->return_columns.begin(), reader_params->return_columns.end(), 0);
572
0
    reader_params->origin_return_columns = &reader_params->return_columns;
573
574
0
    *block = read_tablet_schema->create_block();
575
576
0
    return Status::OK();
577
0
}
578
579
} // namespace doris