Coverage Report

Created: 2025-04-24 12:23

/root/doris/be/src/olap/tablet_reader.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "olap/tablet_reader.h"
19
20
#include <gen_cpp/olap_file.pb.h>
21
#include <gen_cpp/segment_v2.pb.h>
22
#include <thrift/protocol/TDebugProtocol.h>
23
24
#include <algorithm>
25
#include <functional>
26
#include <iterator>
27
#include <numeric>
28
#include <ostream>
29
#include <shared_mutex>
30
31
#include "common/compiler_util.h" // IWYU pragma: keep
32
#include "common/config.h"
33
#include "common/exception.h"
34
#include "common/logging.h"
35
#include "common/status.h"
36
#include "exprs/bitmapfilter_predicate.h"
37
#include "exprs/bloom_filter_func.h"
38
#include "exprs/create_predicate_function.h"
39
#include "exprs/hybrid_set.h"
40
#include "olap/column_predicate.h"
41
#include "olap/itoken_extractor.h"
42
#include "olap/like_column_predicate.h"
43
#include "olap/olap_common.h"
44
#include "olap/olap_define.h"
45
#include "olap/predicate_creator.h"
46
#include "olap/row_cursor.h"
47
#include "olap/rowset/segment_v2/bloom_filter.h"
48
#include "olap/schema.h"
49
#include "olap/tablet.h"
50
#include "olap/tablet_meta.h"
51
#include "olap/tablet_schema.h"
52
#include "runtime/query_context.h"
53
#include "runtime/runtime_predicate.h"
54
#include "runtime/runtime_state.h"
55
#include "vec/common/arena.h"
56
#include "vec/core/block.h"
57
58
namespace doris {
59
using namespace ErrorCode;
60
61
162
void TabletReader::ReaderParams::check_validation() const {
62
162
    if (UNLIKELY(version.first == -1 && is_segcompaction == false)) {
63
0
        LOG(FATAL) << "version is not set. tablet=" << tablet->tablet_id();
64
0
    }
65
162
}
66
67
0
std::string TabletReader::ReaderParams::to_string() const {
68
0
    std::stringstream ss;
69
0
    ss << "tablet=" << tablet->tablet_id() << " reader_type=" << int(reader_type)
70
0
       << " aggregation=" << aggregation << " version=" << version
71
0
       << " start_key_include=" << start_key_include << " end_key_include=" << end_key_include;
72
73
0
    for (const auto& key : start_key) {
74
0
        ss << " keys=" << key;
75
0
    }
76
77
0
    for (const auto& key : end_key) {
78
0
        ss << " end_keys=" << key;
79
0
    }
80
81
0
    for (auto& condition : conditions) {
82
0
        ss << " conditions=" << apache::thrift::ThriftDebugString(condition);
83
0
    }
84
85
0
    return ss.str();
86
0
}
87
88
0
std::string TabletReader::KeysParam::to_string() const {
89
0
    std::stringstream ss;
90
0
    ss << "start_key_include=" << start_key_include << " end_key_include=" << end_key_include;
91
92
0
    for (auto& start_key : start_keys) {
93
0
        ss << " keys=" << start_key.to_string();
94
0
    }
95
0
    for (auto& end_key : end_keys) {
96
0
        ss << " end_keys=" << end_key.to_string();
97
0
    }
98
99
0
    return ss.str();
100
0
}
101
102
140
void TabletReader::ReadSource::fill_delete_predicates() {
103
140
    DCHECK_EQ(delete_predicates.size(), 0);
104
421
    for (auto&& split : rs_splits) {
105
421
        auto& rs_meta = split.rs_reader->rowset()->rowset_meta();
106
421
        if (rs_meta->has_delete_predicate()) {
107
77
            delete_predicates.push_back(rs_meta);
108
77
        }
109
421
    }
110
140
}
111
112
162
// Logs the merged-row counter and deletes every predicate pointer stored in
// _col_predicates and _value_col_predicates.
TabletReader::~TabletReader() {
    VLOG_NOTICE << "merged rows:" << _merged_rows;

    const auto destroy = [](auto* predicate) { delete predicate; };
    std::for_each(_col_predicates.begin(), _col_predicates.end(), destroy);
    std::for_each(_value_col_predicates.begin(), _value_col_predicates.end(), destroy);
}
121
122
162
// Initialises the reader from read_params.
// Creates a fresh predicate arena, then delegates to _init_params(). A failure is logged
// together with the tablet id, schema hash, reader type and version, and the failing
// status is returned unchanged.
Status TabletReader::init(const ReaderParams& read_params) {
    SCOPED_RAW_TIMER(&_stats.tablet_reader_init_timer_ns);
    _predicate_arena.reset(new vectorized::Arena());

    Status init_status = _init_params(read_params);
    if (init_status.ok()) {
        return init_status;
    }

    LOG(WARNING) << "fail to init reader when init params. res:" << init_status
                 << ", tablet_id:" << read_params.tablet->tablet_id()
                 << ", schema_hash:" << read_params.tablet->schema_hash()
                 << ", reader type:" << int(read_params.reader_type)
                 << ", version:" << read_params.version;
    return init_status;
}
136
137
// When only one rowset has data, and this rowset is nonoverlapping, we can read directly without aggregation
138
bool TabletReader::_optimize_for_single_rowset(
139
0
        const std::vector<RowsetReaderSharedPtr>& rs_readers) {
140
0
    bool has_delete_rowset = false;
141
0
    bool has_overlapping = false;
142
0
    int nonoverlapping_count = 0;
143
0
    for (const auto& rs_reader : rs_readers) {
144
0
        if (rs_reader->rowset()->rowset_meta()->delete_flag()) {
145
0
            has_delete_rowset = true;
146
0
            break;
147
0
        }
148
0
        if (rs_reader->rowset()->rowset_meta()->num_rows() > 0) {
149
0
            if (rs_reader->rowset()->rowset_meta()->is_segments_overlapping()) {
150
                // when there are overlapping segments, can not do directly read
151
0
                has_overlapping = true;
152
0
                break;
153
0
            } else if (++nonoverlapping_count > 1) {
154
0
                break;
155
0
            }
156
0
        }
157
0
    }
158
159
0
    return !has_overlapping && nonoverlapping_count == 1 && !has_delete_rowset;
160
0
}
161
162
140
// Validates the key ranges prepared by _init_keys_param() and fills _reader_context with
// everything the rowset readers need.
//
// Returns:
//   - InternalError when read_params.rs_splits is empty, or when there are fewer end keys
//     than start keys (each start key is paired with the end key at the same index).
//   - OK without touching _reader_context when a key range is empty (start key after end
//     key, or equal to it while the lower bound is exclusive).
//   - OK after populating _reader_context otherwise.
Status TabletReader::_capture_rs_readers(const ReaderParams& read_params) {
    SCOPED_RAW_TIMER(&_stats.tablet_reader_capture_rs_readers_timer_ns);
    if (read_params.rs_splits.empty()) {
        return Status::InternalError("fail to acquire data sources. tablet={}",
                                     _tablet->tablet_id());
    }

    const bool is_lower_key_included = _keys_param.start_key_include;
    const bool is_upper_key_included = _keys_param.end_key_include;

    // The loop below reads end_keys[i] for every start key; reject a shorter end_keys
    // vector up front instead of indexing past its end.
    const size_t key_range_count = _keys_param.start_keys.size();
    if (_keys_param.end_keys.size() < key_range_count) {
        return Status::InternalError(
                "start keys and end keys do not match. start_keys={}, end_keys={}, tablet={}",
                key_range_count, _keys_param.end_keys.size(), _tablet->tablet_id());
    }

    for (size_t i = 0; i < key_range_count; ++i) {
        RowCursor& start_key = _keys_param.start_keys[i];
        RowCursor& end_key = _keys_param.end_keys[i];
        const auto cmp = compare_row_key(start_key, end_key);

        if (!is_lower_key_included) {
            if (cmp >= 0) {
                VLOG_NOTICE << "return EOF when lower key not include"
                            << ", start_key=" << start_key.to_string()
                            << ", end_key=" << end_key.to_string();
                // empty range: nothing to read
                return Status::OK();
            }
        } else {
            if (cmp > 0) {
                VLOG_NOTICE << "return EOF when lower key include="
                            << ", start_key=" << start_key.to_string()
                            << ", end_key=" << end_key.to_string();
                // empty range: nothing to read
                return Status::OK();
            }
        }

        _is_lower_keys_included.push_back(is_lower_key_included);
        _is_upper_keys_included.push_back(is_upper_key_included);
    }

    // Only queries may relax ordering; every other reader type keeps ordered output.
    bool need_ordered_result = true;
    if (read_params.reader_type == ReaderType::READER_QUERY) {
        // duplicated keys are allowed, no need to merge sort keys in rowset
        const bool is_dup_keys = _tablet_schema->keys_type() == DUP_KEYS;
        // unique keys with merge on write, no need to merge sort keys in rowset
        const bool is_unique_mow = _tablet_schema->keys_type() == UNIQUE_KEYS &&
                                   _tablet->enable_unique_key_merge_on_write();
        // _aggregation: compute engine will aggregate rows with the same key.
        // _direct_mode: the storage layer does not need to merge.
        // In all four cases it's ok for rowset to return unordered result.
        const bool unordered_is_ok = is_dup_keys || is_unique_mow || _aggregation || _direct_mode;
        // An explicit order-by-key read always wins.
        need_ordered_result = read_params.read_orderby_key || !unordered_is_ok;
    }

    _reader_context.reader_type = read_params.reader_type;
    _reader_context.version = read_params.version;
    _reader_context.tablet_schema = _tablet_schema;
    _reader_context.need_ordered_result = need_ordered_result;
    _reader_context.use_topn_opt = read_params.use_topn_opt;
    _reader_context.topn_filter_source_node_ids = read_params.topn_filter_source_node_ids;
    _reader_context.read_orderby_key_reverse = read_params.read_orderby_key_reverse;
    _reader_context.read_orderby_key_limit = read_params.read_orderby_key_limit;
    _reader_context.filter_block_conjuncts = read_params.filter_block_conjuncts;
    _reader_context.return_columns = &_return_columns;
    _reader_context.read_orderby_key_columns =
            !_orderby_key_columns.empty() ? &_orderby_key_columns : nullptr;
    _reader_context.predicates = &_col_predicates;
    _reader_context.value_predicates = &_value_col_predicates;
    _reader_context.lower_bound_keys = &_keys_param.start_keys;
    _reader_context.is_lower_keys_included = &_is_lower_keys_included;
    _reader_context.upper_bound_keys = &_keys_param.end_keys;
    _reader_context.is_upper_keys_included = &_is_upper_keys_included;
    _reader_context.delete_handler = &_delete_handler;
    _reader_context.stats = &_stats;
    _reader_context.use_page_cache = read_params.use_page_cache;
    _reader_context.sequence_id_idx = _sequence_col_idx;
    _reader_context.is_unique = tablet()->keys_type() == UNIQUE_KEYS;
    _reader_context.merged_rows = &_merged_rows;
    _reader_context.delete_bitmap = read_params.delete_bitmap;
    _reader_context.enable_unique_key_merge_on_write = tablet()->enable_unique_key_merge_on_write();
    _reader_context.record_rowids = read_params.record_rowids;
    _reader_context.rowid_conversion = read_params.rowid_conversion;
    _reader_context.is_key_column_group = read_params.is_key_column_group;
    _reader_context.remaining_conjunct_roots = read_params.remaining_conjunct_roots;
    _reader_context.common_expr_ctxs_push_down = read_params.common_expr_ctxs_push_down;
    // Stores a pointer into read_params: the caller must keep read_params alive while the
    // reader context is in use.
    _reader_context.output_columns = &read_params.output_columns;
    _reader_context.push_down_agg_type_opt = read_params.push_down_agg_type_opt;

    return Status::OK();
}
268
269
0
// Returns the column to build predicates against.
// Non-variant columns are returned as an unchanged copy. A variant column is copied and its
// type is replaced by the cast target registered under the column name in
// _reader_context.target_cast_type_for_variants.
// Throws doris::Exception(INTERNAL_ERROR) when the cast target maps to
// OLAP_FIELD_TYPE_UNKNOWN. The .at() lookup itself throws if no entry exists for the name.
TabletColumn TabletReader::materialize_column(const TabletColumn& orig) {
    if (!orig.is_variant_type()) {
        return orig;
    }

    TabletColumn materialized = orig;
    auto target_type = _reader_context.target_cast_type_for_variants.at(orig.name());
    const FieldType field_type = TabletColumn::get_field_type_by_type(target_type.type);
    if (field_type == FieldType::OLAP_FIELD_TYPE_UNKNOWN) {
        throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Invalid type for variant column: {}",
                               target_type.type);
    }
    materialized.set_type(field_type);
    return materialized;
}
283
284
162
// Copies the basic settings out of read_params and runs the individual init steps in order:
// conditions, delete condition, return columns, keys, order-by keys. The first failing step
// is logged (except the conditions step, which just propagates) and its status returned.
// Finally records the sequence column index when the schema has a sequence column and that
// column is among the return columns.
Status TabletReader::_init_params(const ReaderParams& read_params) {
    read_params.check_validation();

    _direct_mode = read_params.direct_mode;
    _aggregation = read_params.aggregation;
    _reader_type = read_params.reader_type;
    _tablet = read_params.tablet;
    _tablet_schema = read_params.tablet_schema;
    _reader_context.runtime_state = read_params.runtime_state;
    _reader_context.target_cast_type_for_variants = read_params.target_cast_type_for_variants;

    RETURN_IF_ERROR(_init_conditions_param(read_params));

    if (Status st = _init_delete_condition(read_params); !st.ok()) {
        LOG(WARNING) << "fail to init delete param. res = " << st;
        return st;
    }

    if (Status st = _init_return_columns(read_params); !st.ok()) {
        LOG(WARNING) << "fail to init return columns. res = " << st;
        return st;
    }

    if (Status st = _init_keys_param(read_params); !st.ok()) {
        LOG(WARNING) << "fail to init keys param. res=" << st;
        return st;
    }

    if (Status st = _init_orderby_keys_param(read_params); !st.ok()) {
        LOG(WARNING) << "fail to init orderby keys param. res=" << st;
        return st;
    }

    if (_tablet_schema->has_sequence_col()) {
        const auto seq_idx = _tablet_schema->sequence_col_idx();
        DCHECK_NE(seq_idx, -1);
        // query has sequence col
        const bool seq_col_requested =
                std::any_of(_return_columns.begin(), _return_columns.end(),
                            [seq_idx](auto cid) { return cid == seq_idx; });
        if (seq_col_requested) {
            _sequence_col_idx = seq_idx;
        }
    }

    return Status::OK();
}
333
334
162
// Decides which columns the reader returns and splits their ids into _key_cids / _value_cids.
//   - READER_QUERY: use read_params.return_columns as given (also copies the
//     convert-to-null column set).
//   - any other type with an empty return_columns: fall back to every schema column.
//   - compaction types, ALTER_TABLE and CHECKSUM with a non-empty list: use the list as given.
//   - anything else: INVALID_ARGUMENT.
// _key_cids ends up sorted in descending order.
Status TabletReader::_init_return_columns(const ReaderParams& read_params) {
    SCOPED_RAW_TIMER(&_stats.tablet_reader_init_return_columns_timer_ns);

    // Files one column id under keys or values according to the schema.
    const auto classify = [this](auto cid) {
        if (_tablet_schema->column(cid).is_key()) {
            _key_cids.push_back(cid);
        } else {
            _value_cids.push_back(cid);
        }
    };
    // Takes the caller-supplied column list verbatim and classifies each entry.
    const auto adopt_requested_columns = [&]() {
        _return_columns = read_params.return_columns;
        for (auto cid : read_params.return_columns) {
            classify(cid);
        }
    };

    const ReaderType type = read_params.reader_type;
    const bool is_compaction_or_alter = type == ReaderType::READER_CUMULATIVE_COMPACTION ||
                                        type == ReaderType::READER_SEGMENT_COMPACTION ||
                                        type == ReaderType::READER_BASE_COMPACTION ||
                                        type == ReaderType::READER_FULL_COMPACTION ||
                                        type == ReaderType::READER_COLD_DATA_COMPACTION ||
                                        type == ReaderType::READER_ALTER_TABLE;

    if (type == ReaderType::READER_QUERY) {
        _tablet_columns_convert_to_null_set = read_params.tablet_columns_convert_to_null_set;
        adopt_requested_columns();
    } else if (read_params.return_columns.empty()) {
        for (size_t i = 0; i < _tablet_schema->num_columns(); ++i) {
            _return_columns.push_back(i);
            classify(i);
        }
        VLOG_NOTICE << "return column is empty, using full column as default.";
    } else if (is_compaction_or_alter || type == ReaderType::READER_CHECKSUM) {
        // return_columns is known to be non-empty here (the previous branch took the empty case)
        adopt_requested_columns();
    } else {
        return Status::Error<INVALID_ARGUMENT>(
                "fail to init return columns. reader_type={}, return_columns_size={}",
                int(read_params.reader_type), read_params.return_columns.size());
    }

    std::sort(_key_cids.begin(), _key_cids.end(), std::greater<uint32_t>());

    return Status::OK();
}
390
391
162
// Builds the scan key cursors (_keys_param.start_keys / end_keys) from the key tuples in
// read_params. Does nothing when read_params.start_key is empty.
//
// The width of the first start key defines the scan key width; every start and end key must
// have exactly that width, and it must not exceed the schema's column count. Violations
// return INVALID_ARGUMENT. Cursor initialisation failures are logged and propagated.
Status TabletReader::_init_keys_param(const ReaderParams& read_params) {
    SCOPED_RAW_TIMER(&_stats.tablet_reader_init_keys_param_timer_ns);
    if (read_params.start_key.empty()) {
        return Status::OK();
    }

    _keys_param.start_key_include = read_params.start_key_include;
    _keys_param.end_key_include = read_params.end_key_include;

    const size_t start_key_size = read_params.start_key.size();
    // swap with a freshly built vector instead of resize()
    std::vector<RowCursor>(start_key_size).swap(_keys_param.start_keys);

    const size_t scan_key_size = read_params.start_key.front().size();
    if (scan_key_size > _tablet_schema->num_columns()) {
        return Status::Error<INVALID_ARGUMENT>(
                "Input param are invalid. Column count is bigger than num_columns of schema. "
                "column_count={}, schema.num_columns={}",
                scan_key_size, _tablet_schema->num_columns());
    }

    // The scan key covers the first scan_key_size columns of the schema.
    std::vector<uint32_t> columns(scan_key_size);
    std::iota(columns.begin(), columns.end(), 0);

    std::shared_ptr<Schema> schema = std::make_shared<Schema>(_tablet_schema->columns(), columns);

    // Initialises one cursor from one key tuple; shared by the start-key and end-key loops so
    // both report failures identically.
    const auto fill_cursor = [&](RowCursor& cursor, const auto& tuple, size_t key_index) {
        Status res = cursor.init_scan_key(_tablet_schema, tuple.values(), schema);
        if (!res.ok()) {
            LOG(WARNING) << "fail to init row cursor. res = " << res;
            return res;
        }
        res = cursor.from_tuple(tuple);
        if (!res.ok()) {
            LOG(WARNING) << "fail to init row cursor from Keys. res=" << res
                         << " key_index=" << key_index;
        }
        return res;
    };

    for (size_t i = 0; i < start_key_size; ++i) {
        const auto& tuple = read_params.start_key[i];
        if (tuple.size() != scan_key_size) {
            return Status::Error<INVALID_ARGUMENT>(
                    "The start_key.at({}).size={}, not equals the scan_key_size={}", i,
                    tuple.size(), scan_key_size);
        }
        if (Status res = fill_cursor(_keys_param.start_keys[i], tuple, i); !res.ok()) {
            return res;
        }
    }

    const size_t end_key_size = read_params.end_key.size();
    std::vector<RowCursor>(end_key_size).swap(_keys_param.end_keys);
    for (size_t i = 0; i < end_key_size; ++i) {
        const auto& tuple = read_params.end_key[i];
        if (tuple.size() != scan_key_size) {
            return Status::Error<INVALID_ARGUMENT>(
                    "The end_key.at({}).size={}, not equals the scan_key_size={}", i,
                    tuple.size(), scan_key_size);
        }
        if (Status res = fill_cursor(_keys_param.end_keys[i], tuple, i); !res.ok()) {
            return res;
        }
    }

    // TODO: check the validity of start_key and end_key (e.g. start_key <= end_key).

    return Status::OK();
}
465
466
162
// For DUP_KEYS tables and merge-on-write UNIQUE_KEYS tables, maps the first
// read_orderby_key_num_prefix_columns schema columns to their positions inside
// _return_columns and stores those positions in _orderby_key_columns.
// Returns INTERNAL_ERROR when any of those prefix columns is not in _return_columns.
// Other table types are left untouched (UNIQUE_KEYS will compare all keys as before).
Status TabletReader::_init_orderby_keys_param(const ReaderParams& read_params) {
    SCOPED_RAW_TIMER(&_stats.tablet_reader_init_orderby_keys_param_timer_ns);

    const bool resolve_prefix_columns =
            _tablet_schema->keys_type() == DUP_KEYS ||
            (_tablet_schema->keys_type() == UNIQUE_KEYS &&
             _tablet->enable_unique_key_merge_on_write());
    if (!resolve_prefix_columns) {
        return Status::OK();
    }

    for (uint32_t key_cid = 0; key_cid < read_params.read_orderby_key_num_prefix_columns;
         key_cid++) {
        // first position of this key column in _return_columns, if it is returned at all
        const auto pos = std::find(_return_columns.begin(), _return_columns.end(), key_cid);
        if (pos != _return_columns.end()) {
            _orderby_key_columns.push_back(
                    static_cast<uint32_t>(std::distance(_return_columns.begin(), pos)));
        }
    }

    if (read_params.read_orderby_key_num_prefix_columns != _orderby_key_columns.size()) {
        return Status::Error<ErrorCode::INTERNAL_ERROR>(
                "read_orderby_key_num_prefix_columns != _orderby_key_columns.size, "
                "read_params.read_orderby_key_num_prefix_columns={}, "
                "_orderby_key_columns.size()={}",
                read_params.read_orderby_key_num_prefix_columns, _orderby_key_columns.size());
    }

    return Status::OK();
}
492
493
162
// Converts every pushed-down filter in read_params into a ColumnPredicate:
//   - conditions: predicates on columns with an aggregation method go to
//     _value_col_predicates, all others to _col_predicates.
//   - bloom / bitmap / in filters: appended to _col_predicates.
//   - function filters: appended to _col_predicates; LIKE predicates on a column with an
//     ngram bloom filter index additionally get a pattern bloom filter attached (when
//     config::enable_query_like_bloom_filter is on).
// With use_topn_opt, also hands the tablet schema to each referenced runtime predicate.
//
// Returns an error when a condition names an unknown column, when a function filter cannot
// be turned into a predicate, or when bloom filter creation / set_tablet_schema fails.
Status TabletReader::_init_conditions_param(const ReaderParams& read_params) {
    SCOPED_RAW_TIMER(&_stats.tablet_reader_init_conditions_param_timer_ns);
    for (auto& condition : read_params.conditions) {
        TCondition tmp_cond = condition;
        RETURN_IF_ERROR(_tablet_schema->have_column(tmp_cond.column_name));
        // The "column" parameter might represent a column resulting from the decomposition of a variant column.
        // Instead of using a "unique_id" for identification, we are utilizing a "path" to denote this column.
        const auto& column = materialize_column(_tablet_schema->column(tmp_cond.column_name));
        uint32_t index = _tablet_schema->field_index(tmp_cond.column_name);
        ColumnPredicate* predicate =
                parse_to_predicate(column, index, tmp_cond, _predicate_arena.get());
        if (predicate == nullptr) {
            continue;
        }
        // record condition value into predicate_params in order to pushdown segment_iterator,
        // _gen_predicate_result_sign will build predicate result unique sign with condition value
        auto predicate_params = predicate->predicate_params();
        predicate_params->values = condition.condition_values;
        predicate_params->marked_by_runtime_filter = condition.marked_by_runtime_filter;
        if (column.aggregation() != FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE) {
            _value_col_predicates.push_back(predicate);
        } else {
            _col_predicates.push_back(predicate);
        }
    }

    // NOTE(review): for the bloom / bitmap / in filters below, _parse_to_predicate() returns
    // nullptr for an unknown column and that nullptr is stored as-is — confirm that the
    // consumers of _col_predicates tolerate null entries.

    // Only key column bloom filter will push down to storage engine
    for (const auto& filter : read_params.bloom_filters) {
        _col_predicates.emplace_back(_parse_to_predicate(filter));
    }

    for (const auto& filter : read_params.bitmap_filters) {
        _col_predicates.emplace_back(_parse_to_predicate(filter));
    }

    for (const auto& filter : read_params.in_filters) {
        ColumnPredicate* predicate = _parse_to_predicate(filter);
        if (predicate != nullptr) {
            // in_filters from runtime filter predicates which pushed down to data source.
            auto predicate_params = predicate->predicate_params();
            predicate_params->marked_by_runtime_filter = true;
        }
        _col_predicates.emplace_back(predicate);
    }

    // Function filter push down to storage engine
    auto is_like_predicate = [](ColumnPredicate* _pred) {
        return dynamic_cast<LikeColumnPredicate<TYPE_CHAR>*>(_pred) != nullptr ||
               dynamic_cast<LikeColumnPredicate<TYPE_STRING>*>(_pred) != nullptr;
    };

    for (const auto& filter : read_params.function_filters) {
        ColumnPredicate* pred = _parse_to_predicate(filter);
        if (pred == nullptr) {
            // The predicate is dereferenced right below; fail the init instead of crashing.
            return Status::InternalError(
                    "fail to create predicate for function filter. column={}, tablet={}",
                    filter._col_name, _tablet->tablet_id());
        }
        _col_predicates.emplace_back(pred);

        const auto& col = _tablet_schema->column(pred->column_id());
        auto is_like = is_like_predicate(pred);
        auto* tablet_index = _tablet_schema->get_ngram_bf_index(col.unique_id());

        if (is_like && tablet_index && config::enable_query_like_bloom_filter) {
            std::unique_ptr<segment_v2::BloomFilter> ng_bf;
            std::string pattern = pred->get_search_str();
            auto gram_bf_size = tablet_index->get_gram_bf_size();
            auto gram_size = tablet_index->get_gram_size();

            RETURN_IF_ERROR(segment_v2::BloomFilter::create(segment_v2::NGRAM_BLOOM_FILTER, &ng_bf,
                                                            gram_bf_size));
            NgramTokenExtractor _token_extractor(gram_size);

            // Attach the filter only when the extractor reports success for this pattern.
            if (_token_extractor.string_like_to_bloom_filter(pattern.data(), pattern.length(),
                                                             *ng_bf)) {
                pred->set_page_ng_bf(std::move(ng_bf));
            }
        }
    }
    if (read_params.use_topn_opt) {
        for (int id : read_params.topn_filter_source_node_ids) {
            auto& runtime_predicate =
                    read_params.runtime_state->get_query_ctx()->get_runtime_predicate(id);
            RETURN_IF_ERROR(runtime_predicate.set_tablet_schema(_tablet_schema));
        }
    }
    return Status::OK();
}
575
576
// Builds a predicate from a (column name, bloom filter) pair.
// Returns nullptr when the schema has no field with that name.
// NOTE(review): a pointer to the function-local column copy is passed to
// create_column_predicate() — confirm the predicate does not keep it past this call.
ColumnPredicate* TabletReader::_parse_to_predicate(
        const std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>& bloom_filter) {
    const int32_t field_idx = _tablet_schema->field_index(bloom_filter.first);
    if (field_idx < 0) {
        return nullptr;
    }
    const TabletColumn materialized = materialize_column(_tablet_schema->column(field_idx));
    const auto be_exec_version = _reader_context.runtime_state->be_exec_version();
    return create_column_predicate(field_idx, bloom_filter.second, materialized.type(),
                                   be_exec_version, &materialized);
}
586
587
// Builds a predicate from a (column name, hybrid set) pair, i.e. an IN filter.
// Returns nullptr when the schema has no field with that name.
// NOTE(review): a pointer to the function-local column copy is passed to
// create_column_predicate() — confirm the predicate does not keep it past this call.
ColumnPredicate* TabletReader::_parse_to_predicate(
        const std::pair<std::string, std::shared_ptr<HybridSetBase>>& in_filter) {
    const int32_t field_idx = _tablet_schema->field_index(in_filter.first);
    if (field_idx < 0) {
        return nullptr;
    }
    const TabletColumn materialized = materialize_column(_tablet_schema->column(field_idx));
    const auto be_exec_version = _reader_context.runtime_state->be_exec_version();
    return create_column_predicate(field_idx, in_filter.second, materialized.type(),
                                   be_exec_version, &materialized);
}
597
598
// Builds a predicate from a (column name, bitmap filter) pair.
// Returns nullptr when the schema has no field with that name.
// NOTE(review): a pointer to the function-local column copy is passed to
// create_column_predicate() — confirm the predicate does not keep it past this call.
ColumnPredicate* TabletReader::_parse_to_predicate(
        const std::pair<std::string, std::shared_ptr<BitmapFilterFuncBase>>& bitmap_filter) {
    const int32_t field_idx = _tablet_schema->field_index(bitmap_filter.first);
    if (field_idx < 0) {
        return nullptr;
    }
    const TabletColumn materialized = materialize_column(_tablet_schema->column(field_idx));
    const auto be_exec_version = _reader_context.runtime_state->be_exec_version();
    return create_column_predicate(field_idx, bitmap_filter.second, materialized.type(),
                                   be_exec_version, &materialized);
}
608
609
0
// Builds a predicate from a function filter; the predicate receives its own shared copy of
// the filter. Returns nullptr when the schema has no field named function_filter._col_name.
// NOTE(review): a pointer to the function-local column copy is passed to
// create_column_predicate() — confirm the predicate does not keep it past this call.
ColumnPredicate* TabletReader::_parse_to_predicate(const FunctionFilter& function_filter) {
    const int32_t field_idx = _tablet_schema->field_index(function_filter._col_name);
    if (field_idx < 0) {
        return nullptr;
    }
    const TabletColumn materialized = materialize_column(_tablet_schema->column(field_idx));
    auto filter_copy = std::make_shared<FunctionFilter>(function_filter);
    const auto be_exec_version = _reader_context.runtime_state->be_exec_version();
    return create_column_predicate(field_idx, filter_copy, materialized.type(), be_exec_version,
                                   &materialized);
}
619
620
162
// Sets _filter_delete and initialises the delete handler from the delete predicates in
// read_params.
// Segment compaction, and cumulative compaction while
// config::enable_delete_when_cumu_compaction is off, return OK immediately and leave both
// _filter_delete and the delete handler untouched.
Status TabletReader::_init_delete_condition(const ReaderParams& read_params) {
    SCOPED_RAW_TIMER(&_stats.tablet_reader_init_delete_condition_param_timer_ns);
    const ReaderType type = read_params.reader_type;

    // If it's cumu and not allow do delete when cumu
    if (type == ReaderType::READER_SEGMENT_COMPACTION ||
        (type == ReaderType::READER_CUMULATIVE_COMPACTION &&
         !config::enable_delete_when_cumu_compaction)) {
        return Status::OK();
    }

    // _filter_delete is set for base compaction (when pruning of delete sign is enabled),
    // cold data compaction, cumulative compaction (when deletes are enabled for it) and
    // checksum reads.
    // other reader type:
    // QUERY will filter the row in query layer to keep right result use where clause.
    const bool base_with_prune = type == ReaderType::READER_BASE_COMPACTION &&
                                 config::enable_prune_delete_sign_when_base_compaction;
    const bool cold_data = type == ReaderType::READER_COLD_DATA_COMPACTION;
    const bool cumu_with_delete = type == ReaderType::READER_CUMULATIVE_COMPACTION &&
                                  config::enable_delete_when_cumu_compaction;
    const bool checksum = type == ReaderType::READER_CHECKSUM;
    _filter_delete = base_with_prune || cold_data || cumu_with_delete || checksum;

    return _delete_handler.init(_tablet_schema, read_params.delete_predicates,
                                read_params.version.second);
}
640
641
// Fills *reader_params for reading input_rowsets of tablet with the given reader type, and
// sets *block to an empty block matching the read schema.
//
// - version spans from the first rowset's start version to the last rowset's end version.
// - one rowset reader is created per input rowset; their delete predicates are collected.
// - tablet_schema is a copy of the tablet's merged max-version schema with columns that only
//   exist in delete predicates merged back in.
// - return_columns lists every column of the merged max-version schema (before the dropped
//   columns are merged in); *block is created from that same schema.
//
// Returns INVALID_ARGUMENT when input_rowsets is empty, or the error of a failed
// create_reader().
Status TabletReader::init_reader_params_and_create_block(
        TabletSharedPtr tablet, ReaderType reader_type,
        const std::vector<RowsetSharedPtr>& input_rowsets,
        TabletReader::ReaderParams* reader_params, vectorized::Block* block) {
    // front()/back() below are undefined on an empty vector.
    if (input_rowsets.empty()) {
        return Status::Error<INVALID_ARGUMENT>(
                "fail to init reader params, input rowsets is empty. tablet={}",
                tablet->tablet_id());
    }

    reader_params->tablet = tablet;
    reader_params->reader_type = reader_type;
    reader_params->version =
            Version(input_rowsets.front()->start_version(), input_rowsets.back()->end_version());

    ReadSource read_source;
    for (auto& rowset : input_rowsets) {
        RowsetReaderSharedPtr rs_reader;
        RETURN_IF_ERROR(rowset->create_reader(&rs_reader));
        read_source.rs_splits.push_back(RowSetSplits(std::move(rs_reader)));
    }
    read_source.fill_delete_predicates();
    reader_params->set_read_source(std::move(read_source));

    std::vector<RowsetMetaSharedPtr> rowset_metas;
    rowset_metas.reserve(input_rowsets.size());
    for (const RowsetSharedPtr& rowset : input_rowsets) {
        rowset_metas.push_back(rowset->rowset_meta());
    }
    TabletSchemaSPtr read_tablet_schema =
            tablet->tablet_schema_with_merged_max_schema_version(rowset_metas);
    TabletSchemaSPtr merge_tablet_schema = std::make_shared<TabletSchema>();
    merge_tablet_schema->copy_from(*read_tablet_schema);

    // Merge the columns in delete predicate that not in latest schema in to current tablet schema
    for (auto& del_pred : reader_params->delete_predicates) {
        merge_tablet_schema->merge_dropped_columns(*del_pred->tablet_schema());
    }
    reader_params->tablet_schema = merge_tablet_schema;
    if (tablet->enable_unique_key_merge_on_write()) {
        reader_params->delete_bitmap = &tablet->tablet_meta()->delete_bitmap();
    }

    reader_params->return_columns.resize(read_tablet_schema->num_columns());
    std::iota(reader_params->return_columns.begin(), reader_params->return_columns.end(), 0);
    // Points at reader_params' own return_columns member.
    reader_params->origin_return_columns = &reader_params->return_columns;

    *block = read_tablet_schema->create_block();

    return Status::OK();
}
684
685
} // namespace doris