Coverage Report

Created: 2025-09-05 19:41

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/root/doris/be/src/olap/tablet_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "olap/tablet_reader.h"
19
20
#include <gen_cpp/olap_file.pb.h>
21
#include <gen_cpp/segment_v2.pb.h>
22
#include <thrift/protocol/TDebugProtocol.h>
23
24
#include <algorithm>
25
#include <functional>
26
#include <iterator>
27
#include <memory>
28
#include <numeric>
29
#include <ostream>
30
#include <shared_mutex>
31
32
#include "common/compiler_util.h" // IWYU pragma: keep
33
#include "common/config.h"
34
#include "common/exception.h"
35
#include "common/logging.h"
36
#include "common/status.h"
37
#include "exprs/bitmapfilter_predicate.h"
38
#include "exprs/bloom_filter_func.h"
39
#include "exprs/create_predicate_function.h"
40
#include "exprs/hybrid_set.h"
41
#include "olap/column_predicate.h"
42
#include "olap/itoken_extractor.h"
43
#include "olap/like_column_predicate.h"
44
#include "olap/olap_common.h"
45
#include "olap/olap_define.h"
46
#include "olap/predicate_creator.h"
47
#include "olap/row_cursor.h"
48
#include "olap/rowset/segment_v2/bloom_filter.h"
49
#include "olap/schema.h"
50
#include "olap/tablet.h"
51
#include "olap/tablet_meta.h"
52
#include "olap/tablet_schema.h"
53
#include "runtime/query_context.h"
54
#include "runtime/runtime_predicate.h"
55
#include "runtime/runtime_state.h"
56
#include "vec/common/arena.h"
57
#include "vec/common/schema_util.h"
58
#include "vec/core/block.h"
59
60
namespace doris {
61
#include "common/compile_check_begin.h"
62
using namespace ErrorCode;
63
64
359
void TabletReader::ReaderParams::check_validation() const {
65
359
    if (UNLIKELY(version.first == -1 && is_segcompaction == false)) {
66
0
        throw Exception(Status::FatalError("version is not set. tablet={}", tablet->tablet_id()));
67
0
    }
68
359
}
69
70
0
std::string TabletReader::ReaderParams::to_string() const {
71
0
    std::stringstream ss;
72
0
    ss << "tablet=" << tablet->tablet_id() << " reader_type=" << int(reader_type)
73
0
       << " aggregation=" << aggregation << " version=" << version
74
0
       << " start_key_include=" << start_key_include << " end_key_include=" << end_key_include;
75
76
0
    for (const auto& key : start_key) {
77
0
        ss << " keys=" << key;
78
0
    }
79
80
0
    for (const auto& key : end_key) {
81
0
        ss << " end_keys=" << key;
82
0
    }
83
84
0
    for (auto& condition : conditions) {
85
0
        ss << " conditions=" << apache::thrift::ThriftDebugString(condition.filter);
86
0
    }
87
88
0
    return ss.str();
89
0
}
90
91
0
std::string TabletReader::KeysParam::to_string() const {
92
0
    std::stringstream ss;
93
0
    ss << "start_key_include=" << start_key_include << " end_key_include=" << end_key_include;
94
95
0
    for (const auto& start_key : start_keys) {
96
0
        ss << " keys=" << start_key.to_string();
97
0
    }
98
0
    for (const auto& end_key : end_keys) {
99
0
        ss << " end_keys=" << end_key.to_string();
100
0
    }
101
102
0
    return ss.str();
103
0
}
104
105
337
void TabletReader::ReadSource::fill_delete_predicates() {
106
337
    DCHECK_EQ(delete_predicates.size(), 0);
107
1.07k
    for (auto&& split : rs_splits) {
108
1.07k
        auto& rs_meta = split.rs_reader->rowset()->rowset_meta();
109
1.07k
        if (rs_meta->has_delete_predicate()) {
110
149
            delete_predicates.push_back(rs_meta);
111
149
        }
112
1.07k
    }
113
337
}
114
115
378
TabletReader::~TabletReader() {
116
378
    for (auto* pred : _col_predicates) {
117
0
        delete pred;
118
0
    }
119
378
    for (auto* pred : _value_col_predicates) {
120
0
        delete pred;
121
0
    }
122
378
}
123
124
359
Status TabletReader::init(const ReaderParams& read_params) {
125
359
    SCOPED_RAW_TIMER(&_stats.tablet_reader_init_timer_ns);
126
127
359
    Status res = _init_params(read_params);
128
359
    if (!res.ok()) {
129
0
        LOG(WARNING) << "fail to init reader when init params. res:" << res
130
0
                     << ", tablet_id:" << read_params.tablet->tablet_id()
131
0
                     << ", schema_hash:" << read_params.tablet->schema_hash()
132
0
                     << ", reader type:" << int(read_params.reader_type)
133
0
                     << ", version:" << read_params.version;
134
0
    }
135
359
    return res;
136
359
}
137
138
// When only one rowset has data, and this rowset is nonoverlapping, we can read directly without aggregation
139
bool TabletReader::_optimize_for_single_rowset(
140
0
        const std::vector<RowsetReaderSharedPtr>& rs_readers) {
141
0
    bool has_delete_rowset = false;
142
0
    bool has_overlapping = false;
143
0
    int nonoverlapping_count = 0;
144
0
    for (const auto& rs_reader : rs_readers) {
145
0
        if (rs_reader->rowset()->rowset_meta()->delete_flag()) {
146
0
            has_delete_rowset = true;
147
0
            break;
148
0
        }
149
0
        if (rs_reader->rowset()->rowset_meta()->num_rows() > 0) {
150
0
            if (rs_reader->rowset()->rowset_meta()->is_segments_overlapping()) {
151
                // when there are overlapping segments, can not do directly read
152
0
                has_overlapping = true;
153
0
                break;
154
0
            } else if (++nonoverlapping_count > 1) {
155
0
                break;
156
0
            }
157
0
        }
158
0
    }
159
160
0
    return !has_overlapping && nonoverlapping_count == 1 && !has_delete_rowset;
161
0
}
162
163
337
Status TabletReader::_capture_rs_readers(const ReaderParams& read_params) {
164
337
    SCOPED_RAW_TIMER(&_stats.tablet_reader_capture_rs_readers_timer_ns);
165
337
    if (read_params.rs_splits.empty()) {
166
0
        return Status::InternalError("fail to acquire data sources. tablet={}",
167
0
                                     _tablet->tablet_id());
168
0
    }
169
170
337
    bool eof = false;
171
337
    bool is_lower_key_included = _keys_param.start_key_include;
172
337
    bool is_upper_key_included = _keys_param.end_key_include;
173
174
337
    for (int i = 0; i < _keys_param.start_keys.size(); ++i) {
175
        // lower bound
176
0
        RowCursor& start_key = _keys_param.start_keys[i];
177
0
        RowCursor& end_key = _keys_param.end_keys[i];
178
179
0
        if (!is_lower_key_included) {
180
0
            if (compare_row_key(start_key, end_key) >= 0) {
181
0
                VLOG_NOTICE << "return EOF when lower key not include"
182
0
                            << ", start_key=" << start_key.to_string()
183
0
                            << ", end_key=" << end_key.to_string();
184
0
                eof = true;
185
0
                break;
186
0
            }
187
0
        } else {
188
0
            if (compare_row_key(start_key, end_key) > 0) {
189
0
                VLOG_NOTICE << "return EOF when lower key include="
190
0
                            << ", start_key=" << start_key.to_string()
191
0
                            << ", end_key=" << end_key.to_string();
192
0
                eof = true;
193
0
                break;
194
0
            }
195
0
        }
196
197
0
        _is_lower_keys_included.push_back(is_lower_key_included);
198
0
        _is_upper_keys_included.push_back(is_upper_key_included);
199
0
    }
200
201
337
    if (eof) {
202
0
        return Status::OK();
203
0
    }
204
205
337
    bool need_ordered_result = true;
206
337
    if (read_params.reader_type == ReaderType::READER_QUERY) {
207
0
        if (_tablet_schema->keys_type() == DUP_KEYS) {
208
            // duplicated keys are allowed, no need to merge sort keys in rowset
209
0
            need_ordered_result = false;
210
0
        }
211
0
        if (_tablet_schema->keys_type() == UNIQUE_KEYS &&
212
0
            _tablet->enable_unique_key_merge_on_write()) {
213
            // unique keys with merge on write, no need to merge sort keys in rowset
214
0
            need_ordered_result = false;
215
0
        }
216
0
        if (_aggregation) {
217
            // compute engine will aggregate rows with the same key,
218
            // it's ok for rowset to return unordered result
219
0
            need_ordered_result = false;
220
0
        }
221
222
0
        if (_direct_mode) {
223
            // direct mode indicates that the storage layer does not need to merge,
224
            // it's ok for rowset to return unordered result
225
0
            need_ordered_result = false;
226
0
        }
227
228
0
        if (read_params.read_orderby_key) {
229
0
            need_ordered_result = true;
230
0
        }
231
0
    }
232
233
337
    _reader_context.reader_type = read_params.reader_type;
234
337
    _reader_context.version = read_params.version;
235
337
    _reader_context.tablet_schema = _tablet_schema;
236
337
    _reader_context.need_ordered_result = need_ordered_result;
237
337
    _reader_context.topn_filter_source_node_ids = read_params.topn_filter_source_node_ids;
238
337
    _reader_context.topn_filter_target_node_id = read_params.topn_filter_target_node_id;
239
337
    _reader_context.read_orderby_key_reverse = read_params.read_orderby_key_reverse;
240
337
    _reader_context.read_orderby_key_limit = read_params.read_orderby_key_limit;
241
337
    _reader_context.filter_block_conjuncts = read_params.filter_block_conjuncts;
242
337
    _reader_context.return_columns = &_return_columns;
243
337
    _reader_context.read_orderby_key_columns =
244
337
            !_orderby_key_columns.empty() ? &_orderby_key_columns : nullptr;
245
337
    _reader_context.predicates = &_col_predicates;
246
337
    _reader_context.value_predicates = &_value_col_predicates;
247
337
    _reader_context.lower_bound_keys = &_keys_param.start_keys;
248
337
    _reader_context.is_lower_keys_included = &_is_lower_keys_included;
249
337
    _reader_context.upper_bound_keys = &_keys_param.end_keys;
250
337
    _reader_context.is_upper_keys_included = &_is_upper_keys_included;
251
337
    _reader_context.delete_handler = &_delete_handler;
252
337
    _reader_context.stats = &_stats;
253
337
    _reader_context.use_page_cache = read_params.use_page_cache;
254
337
    _reader_context.sequence_id_idx = _sequence_col_idx;
255
337
    _reader_context.is_unique = tablet()->keys_type() == UNIQUE_KEYS;
256
337
    _reader_context.merged_rows = &_merged_rows;
257
337
    _reader_context.delete_bitmap = read_params.delete_bitmap;
258
337
    _reader_context.enable_unique_key_merge_on_write = tablet()->enable_unique_key_merge_on_write();
259
337
    _reader_context.record_rowids = read_params.record_rowids;
260
337
    _reader_context.rowid_conversion = read_params.rowid_conversion;
261
337
    _reader_context.is_key_column_group = read_params.is_key_column_group;
262
337
    _reader_context.remaining_conjunct_roots = read_params.remaining_conjunct_roots;
263
337
    _reader_context.common_expr_ctxs_push_down = read_params.common_expr_ctxs_push_down;
264
337
    _reader_context.output_columns = &read_params.output_columns;
265
337
    _reader_context.push_down_agg_type_opt = read_params.push_down_agg_type_opt;
266
337
    _reader_context.ttl_seconds = _tablet->ttl_seconds();
267
337
    _reader_context.score_runtime = read_params.score_runtime;
268
337
    _reader_context.collection_statistics = read_params.collection_statistics;
269
270
337
    _reader_context.virtual_column_exprs = read_params.virtual_column_exprs;
271
337
    _reader_context.vir_cid_to_idx_in_block = read_params.vir_cid_to_idx_in_block;
272
337
    _reader_context.vir_col_idx_to_type = read_params.vir_col_idx_to_type;
273
337
    _reader_context.ann_topn_runtime = read_params.ann_topn_runtime;
274
275
337
    return Status::OK();
276
337
}
277
278
0
TabletColumn TabletReader::materialize_column(const TabletColumn& orig) {
279
0
    if (!orig.is_variant_type()) {
280
0
        return orig;
281
0
    }
282
0
    TabletColumn column_with_cast_type = orig;
283
0
    auto cast_type = _reader_context.target_cast_type_for_variants.at(orig.name());
284
0
    return vectorized::schema_util::get_column_by_type(
285
0
            cast_type, orig.name(),
286
0
            {
287
0
                    .unique_id = orig.unique_id(),
288
0
                    .parent_unique_id = orig.parent_unique_id(),
289
0
                    .path_info = *orig.path_info_ptr(),
290
0
            });
291
0
}
292
293
359
Status TabletReader::_init_params(const ReaderParams& read_params) {
294
359
    read_params.check_validation();
295
296
359
    _direct_mode = read_params.direct_mode;
297
359
    _aggregation = read_params.aggregation;
298
359
    _reader_type = read_params.reader_type;
299
359
    _tablet = read_params.tablet;
300
359
    _tablet_schema = read_params.tablet_schema;
301
359
    _reader_context.runtime_state = read_params.runtime_state;
302
359
    _reader_context.target_cast_type_for_variants = read_params.target_cast_type_for_variants;
303
304
359
    RETURN_IF_ERROR(_init_conditions_param(read_params));
305
306
359
    Status res = _init_delete_condition(read_params);
307
359
    if (!res.ok()) {
308
0
        LOG(WARNING) << "fail to init delete param. res = " << res;
309
0
        return res;
310
0
    }
311
312
359
    res = _init_return_columns(read_params);
313
359
    if (!res.ok()) {
314
0
        LOG(WARNING) << "fail to init return columns. res = " << res;
315
0
        return res;
316
0
    }
317
318
359
    res = _init_keys_param(read_params);
319
359
    if (!res.ok()) {
320
0
        LOG(WARNING) << "fail to init keys param. res=" << res;
321
0
        return res;
322
0
    }
323
359
    res = _init_orderby_keys_param(read_params);
324
359
    if (!res.ok()) {
325
0
        LOG(WARNING) << "fail to init orderby keys param. res=" << res;
326
0
        return res;
327
0
    }
328
359
    if (_tablet_schema->has_sequence_col()) {
329
8
        auto sequence_col_idx = _tablet_schema->sequence_col_idx();
330
8
        DCHECK_NE(sequence_col_idx, -1);
331
16
        for (auto col : _return_columns) {
332
            // query has sequence col
333
16
            if (col == sequence_col_idx) {
334
4
                _sequence_col_idx = sequence_col_idx;
335
4
                break;
336
4
            }
337
16
        }
338
8
    }
339
340
359
    return res;
341
359
}
342
343
359
Status TabletReader::_init_return_columns(const ReaderParams& read_params) {
344
359
    SCOPED_RAW_TIMER(&_stats.tablet_reader_init_return_columns_timer_ns);
345
359
    if (read_params.reader_type == ReaderType::READER_QUERY) {
346
22
        _return_columns = read_params.return_columns;
347
22
        _tablet_columns_convert_to_null_set = read_params.tablet_columns_convert_to_null_set;
348
37
        for (auto id : read_params.return_columns) {
349
37
            if (_tablet_schema->column(id).is_key()) {
350
22
                _key_cids.push_back(id);
351
22
            } else {
352
15
                _value_cids.push_back(id);
353
15
            }
354
37
        }
355
337
    } else if (read_params.return_columns.empty()) {
356
0
        for (uint32_t i = 0; i < _tablet_schema->num_columns(); ++i) {
357
0
            _return_columns.push_back(i);
358
0
            if (_tablet_schema->column(i).is_key()) {
359
0
                _key_cids.push_back(i);
360
0
            } else {
361
0
                _value_cids.push_back(i);
362
0
            }
363
0
        }
364
0
        VLOG_NOTICE << "return column is empty, using full column as default.";
365
337
    } else if ((read_params.reader_type == ReaderType::READER_CUMULATIVE_COMPACTION ||
366
337
                read_params.reader_type == ReaderType::READER_SEGMENT_COMPACTION ||
367
337
                read_params.reader_type == ReaderType::READER_BASE_COMPACTION ||
368
337
                read_params.reader_type == ReaderType::READER_FULL_COMPACTION ||
369
337
                read_params.reader_type == ReaderType::READER_COLD_DATA_COMPACTION ||
370
337
                read_params.reader_type == ReaderType::READER_ALTER_TABLE) &&
371
337
               !read_params.return_columns.empty()) {
372
337
        _return_columns = read_params.return_columns;
373
1.07k
        for (auto id : read_params.return_columns) {
374
1.07k
            if (_tablet_schema->column(id).is_key()) {
375
114
                _key_cids.push_back(id);
376
959
            } else {
377
959
                _value_cids.push_back(id);
378
959
            }
379
1.07k
        }
380
337
    } else if (read_params.reader_type == ReaderType::READER_CHECKSUM) {
381
0
        _return_columns = read_params.return_columns;
382
0
        for (auto id : read_params.return_columns) {
383
0
            if (_tablet_schema->column(id).is_key()) {
384
0
                _key_cids.push_back(id);
385
0
            } else {
386
0
                _value_cids.push_back(id);
387
0
            }
388
0
        }
389
0
    } else {
390
0
        return Status::Error<INVALID_ARGUMENT>(
391
0
                "fail to init return columns. reader_type={}, return_columns_size={}",
392
0
                int(read_params.reader_type), read_params.return_columns.size());
393
0
    }
394
395
359
    std::sort(_key_cids.begin(), _key_cids.end(), std::greater<>());
396
397
359
    return Status::OK();
398
359
}
399
400
359
Status TabletReader::_init_keys_param(const ReaderParams& read_params) {
401
359
    SCOPED_RAW_TIMER(&_stats.tablet_reader_init_keys_param_timer_ns);
402
359
    if (read_params.start_key.empty()) {
403
359
        return Status::OK();
404
359
    }
405
406
0
    _keys_param.start_key_include = read_params.start_key_include;
407
0
    _keys_param.end_key_include = read_params.end_key_include;
408
409
0
    size_t start_key_size = read_params.start_key.size();
410
    //_keys_param.start_keys.resize(start_key_size);
411
0
    std::vector<RowCursor>(start_key_size).swap(_keys_param.start_keys);
412
413
0
    size_t scan_key_size = read_params.start_key.front().size();
414
0
    if (scan_key_size > _tablet_schema->num_columns()) {
415
0
        return Status::Error<INVALID_ARGUMENT>(
416
0
                "Input param are invalid. Column count is bigger than num_columns of schema. "
417
0
                "column_count={}, schema.num_columns={}",
418
0
                scan_key_size, _tablet_schema->num_columns());
419
0
    }
420
421
0
    std::vector<uint32_t> columns(scan_key_size);
422
0
    std::iota(columns.begin(), columns.end(), 0);
423
424
0
    std::shared_ptr<Schema> schema = std::make_shared<Schema>(_tablet_schema->columns(), columns);
425
426
0
    for (size_t i = 0; i < start_key_size; ++i) {
427
0
        if (read_params.start_key[i].size() != scan_key_size) {
428
0
            return Status::Error<INVALID_ARGUMENT>(
429
0
                    "The start_key.at({}).size={}, not equals the scan_key_size={}", i,
430
0
                    read_params.start_key[i].size(), scan_key_size);
431
0
        }
432
433
0
        Status res = _keys_param.start_keys[i].init_scan_key(
434
0
                _tablet_schema, read_params.start_key[i].values(), schema);
435
0
        if (!res.ok()) {
436
0
            LOG(WARNING) << "fail to init row cursor. res = " << res;
437
0
            return res;
438
0
        }
439
0
        res = _keys_param.start_keys[i].from_tuple(read_params.start_key[i]);
440
0
        if (!res.ok()) {
441
0
            LOG(WARNING) << "fail to init row cursor from Keys. res=" << res << "key_index=" << i;
442
0
            return res;
443
0
        }
444
0
    }
445
446
0
    size_t end_key_size = read_params.end_key.size();
447
    //_keys_param.end_keys.resize(end_key_size);
448
0
    std::vector<RowCursor>(end_key_size).swap(_keys_param.end_keys);
449
0
    for (size_t i = 0; i < end_key_size; ++i) {
450
0
        if (read_params.end_key[i].size() != scan_key_size) {
451
0
            return Status::Error<INVALID_ARGUMENT>(
452
0
                    "The end_key.at({}).size={}, not equals the scan_key_size={}", i,
453
0
                    read_params.end_key[i].size(), scan_key_size);
454
0
        }
455
456
0
        Status res = _keys_param.end_keys[i].init_scan_key(_tablet_schema,
457
0
                                                           read_params.end_key[i].values(), schema);
458
0
        if (!res.ok()) {
459
0
            LOG(WARNING) << "fail to init row cursor. res = " << res;
460
0
            return res;
461
0
        }
462
463
0
        res = _keys_param.end_keys[i].from_tuple(read_params.end_key[i]);
464
0
        if (!res.ok()) {
465
0
            LOG(WARNING) << "fail to init row cursor from Keys. res=" << res << " key_index=" << i;
466
0
            return res;
467
0
        }
468
0
    }
469
470
    //TODO:check the valid of start_key and end_key.(eg. start_key <= end_key)
471
472
0
    return Status::OK();
473
0
}
474
475
359
Status TabletReader::_init_orderby_keys_param(const ReaderParams& read_params) {
476
359
    SCOPED_RAW_TIMER(&_stats.tablet_reader_init_orderby_keys_param_timer_ns);
477
    // UNIQUE_KEYS will compare all keys as before
478
359
    if (_tablet_schema->keys_type() == DUP_KEYS || (_tablet_schema->keys_type() == UNIQUE_KEYS &&
479
299
                                                    _tablet->enable_unique_key_merge_on_write())) {
480
299
        if (!_tablet_schema->cluster_key_uids().empty()) {
481
0
            if (read_params.read_orderby_key_num_prefix_columns >
482
0
                _tablet_schema->cluster_key_uids().size()) {
483
0
                return Status::Error<ErrorCode::INTERNAL_ERROR>(
484
0
                        "read_orderby_key_num_prefix_columns={} > cluster_keys.size()={}",
485
0
                        read_params.read_orderby_key_num_prefix_columns,
486
0
                        _tablet_schema->cluster_key_uids().size());
487
0
            }
488
0
            for (uint32_t i = 0; i < read_params.read_orderby_key_num_prefix_columns; i++) {
489
0
                auto cid = _tablet_schema->cluster_key_uids()[i];
490
0
                auto index = _tablet_schema->field_index(cid);
491
0
                if (index < 0) {
492
0
                    return Status::Error<ErrorCode::INTERNAL_ERROR>(
493
0
                            "could not find cluster key column with unique_id=" +
494
0
                            std::to_string(cid) +
495
0
                            " in tablet schema, tablet_id=" + std::to_string(_tablet->tablet_id()));
496
0
                }
497
0
                for (uint32_t idx = 0; idx < _return_columns.size(); idx++) {
498
0
                    if (_return_columns[idx] == index) {
499
0
                        _orderby_key_columns.push_back(idx);
500
0
                        break;
501
0
                    }
502
0
                }
503
0
            }
504
299
        } else {
505
            // find index in vector _return_columns
506
            //   for the read_orderby_key_num_prefix_columns orderby keys
507
299
            for (uint32_t i = 0; i < read_params.read_orderby_key_num_prefix_columns; i++) {
508
0
                for (uint32_t idx = 0; idx < _return_columns.size(); idx++) {
509
0
                    if (_return_columns[idx] == i) {
510
0
                        _orderby_key_columns.push_back(idx);
511
0
                        break;
512
0
                    }
513
0
                }
514
0
            }
515
299
        }
516
299
        if (read_params.read_orderby_key_num_prefix_columns != _orderby_key_columns.size()) {
517
0
            return Status::Error<ErrorCode::INTERNAL_ERROR>(
518
0
                    "read_orderby_key_num_prefix_columns != _orderby_key_columns.size, "
519
0
                    "read_params.read_orderby_key_num_prefix_columns={}, "
520
0
                    "_orderby_key_columns.size()={}",
521
0
                    read_params.read_orderby_key_num_prefix_columns, _orderby_key_columns.size());
522
0
        }
523
299
    }
524
525
359
    return Status::OK();
526
359
}
527
528
359
Status TabletReader::_init_conditions_param(const ReaderParams& read_params) {
529
359
    SCOPED_RAW_TIMER(&_stats.tablet_reader_init_conditions_param_timer_ns);
530
359
    std::vector<ColumnPredicate*> predicates;
531
532
1.07k
    auto parse_and_emplace_predicates = [this, &predicates](auto& params) {
533
1.07k
        for (const auto& param : params) {
534
0
            ColumnPredicate* predicate = _parse_to_predicate({param.column_name, param.filter});
535
0
            predicate->attach_profile_counter(param.runtime_filter_id, param.filtered_rows_counter,
536
0
                                              param.input_rows_counter);
537
0
            predicates.emplace_back(predicate);
538
0
        }
539
1.07k
    };
tablet_reader.cpp:_ZZN5doris12TabletReader22_init_conditions_paramERKNS0_12ReaderParamsEENK3$_0clIKSt6vectorINS_15FilterOlapParamISt10shared_ptrINS_19BloomFilterFuncBaseEEEESaISB_EEEEDaRT_
Line
Count
Source
532
359
    auto parse_and_emplace_predicates = [this, &predicates](auto& params) {
533
359
        for (const auto& param : params) {
534
0
            ColumnPredicate* predicate = _parse_to_predicate({param.column_name, param.filter});
535
0
            predicate->attach_profile_counter(param.runtime_filter_id, param.filtered_rows_counter,
536
0
                                              param.input_rows_counter);
537
0
            predicates.emplace_back(predicate);
538
0
        }
539
359
    };
tablet_reader.cpp:_ZZN5doris12TabletReader22_init_conditions_paramERKNS0_12ReaderParamsEENK3$_0clIKSt6vectorINS_15FilterOlapParamISt10shared_ptrINS_20BitmapFilterFuncBaseEEEESaISB_EEEEDaRT_
Line
Count
Source
532
359
    auto parse_and_emplace_predicates = [this, &predicates](auto& params) {
533
359
        for (const auto& param : params) {
534
0
            ColumnPredicate* predicate = _parse_to_predicate({param.column_name, param.filter});
535
0
            predicate->attach_profile_counter(param.runtime_filter_id, param.filtered_rows_counter,
536
0
                                              param.input_rows_counter);
537
0
            predicates.emplace_back(predicate);
538
0
        }
539
359
    };
tablet_reader.cpp:_ZZN5doris12TabletReader22_init_conditions_paramERKNS0_12ReaderParamsEENK3$_0clIKSt6vectorINS_15FilterOlapParamISt10shared_ptrINS_13HybridSetBaseEEEESaISB_EEEEDaRT_
Line
Count
Source
532
359
    auto parse_and_emplace_predicates = [this, &predicates](auto& params) {
533
359
        for (const auto& param : params) {
534
0
            ColumnPredicate* predicate = _parse_to_predicate({param.column_name, param.filter});
535
0
            predicate->attach_profile_counter(param.runtime_filter_id, param.filtered_rows_counter,
536
0
                                              param.input_rows_counter);
537
0
            predicates.emplace_back(predicate);
538
0
        }
539
359
    };
540
541
359
    for (const auto& param : read_params.conditions) {
542
0
        TCondition tmp_cond = param.filter;
543
0
        RETURN_IF_ERROR(_tablet_schema->have_column(tmp_cond.column_name));
544
        // The "column" parameter might represent a column resulting from the decomposition of a variant column.
545
        // Instead of using a "unique_id" for identification, we are utilizing a "path" to denote this column.
546
0
        const auto& column = *DORIS_TRY(_tablet_schema->column(tmp_cond.column_name));
547
0
        const auto& mcolumn = materialize_column(column);
548
0
        uint32_t index = _tablet_schema->field_index(tmp_cond.column_name);
549
0
        ColumnPredicate* predicate = parse_to_predicate(mcolumn, index, tmp_cond, _predicate_arena);
550
        // record condition value into predicate_params in order to pushdown segment_iterator,
551
        // _gen_predicate_result_sign will build predicate result unique sign with condition value
552
0
        predicate->attach_profile_counter(param.runtime_filter_id, param.filtered_rows_counter,
553
0
                                          param.input_rows_counter);
554
0
        predicates.emplace_back(predicate);
555
0
    }
556
359
    parse_and_emplace_predicates(read_params.bloom_filters);
557
359
    parse_and_emplace_predicates(read_params.bitmap_filters);
558
359
    parse_and_emplace_predicates(read_params.in_filters);
559
560
    // Function filter push down to storage engine
561
359
    auto is_like_predicate = [](ColumnPredicate* _pred) {
562
0
        return dynamic_cast<LikeColumnPredicate<TYPE_CHAR>*>(_pred) != nullptr ||
563
0
               dynamic_cast<LikeColumnPredicate<TYPE_STRING>*>(_pred) != nullptr;
564
0
    };
565
566
359
    for (const auto& filter : read_params.function_filters) {
567
0
        predicates.emplace_back(_parse_to_predicate(filter));
568
0
        auto* pred = predicates.back();
569
570
0
        const auto& col = _tablet_schema->column(pred->column_id());
571
0
        const auto* tablet_index = _tablet_schema->get_ngram_bf_index(col.unique_id());
572
0
        if (is_like_predicate(pred) && tablet_index && config::enable_query_like_bloom_filter) {
573
0
            std::unique_ptr<segment_v2::BloomFilter> ng_bf;
574
0
            std::string pattern = pred->get_search_str();
575
0
            auto gram_bf_size = tablet_index->get_gram_bf_size();
576
0
            auto gram_size = tablet_index->get_gram_size();
577
578
0
            RETURN_IF_ERROR(segment_v2::BloomFilter::create(segment_v2::NGRAM_BLOOM_FILTER, &ng_bf,
579
0
                                                            gram_bf_size));
580
0
            NgramTokenExtractor _token_extractor(gram_size);
581
582
0
            if (_token_extractor.string_like_to_bloom_filter(pattern.data(), pattern.length(),
583
0
                                                             *ng_bf)) {
584
0
                pred->set_page_ng_bf(std::move(ng_bf));
585
0
            }
586
0
        }
587
0
    }
588
589
359
    for (auto* predicate : predicates) {
590
0
        auto column = _tablet_schema->column(predicate->column_id());
591
0
        if (column.aggregation() != FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE) {
592
0
            _value_col_predicates.push_back(predicate);
593
0
        } else {
594
0
            _col_predicates.push_back(predicate);
595
0
        }
596
0
    }
597
598
359
    for (int id : read_params.topn_filter_source_node_ids) {
599
0
        auto& runtime_predicate =
600
0
                read_params.runtime_state->get_query_ctx()->get_runtime_predicate(id);
601
0
        RETURN_IF_ERROR(runtime_predicate.set_tablet_schema(read_params.topn_filter_target_node_id,
602
0
                                                            _tablet_schema));
603
0
    }
604
359
    return Status::OK();
605
359
}
606
607
ColumnPredicate* TabletReader::_parse_to_predicate(
608
0
        const std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>& bloom_filter) {
609
0
    int32_t index = _tablet_schema->field_index(bloom_filter.first);
610
0
    if (index < 0) {
611
0
        return nullptr;
612
0
    }
613
0
    const TabletColumn& column = materialize_column(_tablet_schema->column(index));
614
0
    return create_column_predicate(index, bloom_filter.second, column.type(), &column);
615
0
}
616
617
ColumnPredicate* TabletReader::_parse_to_predicate(
618
0
        const std::pair<std::string, std::shared_ptr<HybridSetBase>>& in_filter) {
619
0
    int32_t index = _tablet_schema->field_index(in_filter.first);
620
0
    if (index < 0) {
621
0
        return nullptr;
622
0
    }
623
0
    const TabletColumn& column = materialize_column(_tablet_schema->column(index));
624
0
    return create_column_predicate(index, in_filter.second, column.type(), &column);
625
0
}
626
627
ColumnPredicate* TabletReader::_parse_to_predicate(
628
0
        const std::pair<std::string, std::shared_ptr<BitmapFilterFuncBase>>& bitmap_filter) {
629
0
    int32_t index = _tablet_schema->field_index(bitmap_filter.first);
630
0
    if (index < 0) {
631
0
        return nullptr;
632
0
    }
633
0
    const TabletColumn& column = materialize_column(_tablet_schema->column(index));
634
0
    return create_column_predicate(index, bitmap_filter.second, column.type(), &column);
635
0
}
636
637
0
ColumnPredicate* TabletReader::_parse_to_predicate(const FunctionFilter& function_filter) {
638
0
    int32_t index = _tablet_schema->field_index(function_filter._col_name);
639
0
    if (index < 0) {
640
0
        return nullptr;
641
0
    }
642
0
    const TabletColumn& column = materialize_column(_tablet_schema->column(index));
643
0
    return create_column_predicate(index, std::make_shared<FunctionFilter>(function_filter),
644
0
                                   column.type(), &column);
645
0
}
646
647
359
Status TabletReader::_init_delete_condition(const ReaderParams& read_params) {
648
359
    SCOPED_RAW_TIMER(&_stats.tablet_reader_init_delete_condition_param_timer_ns);
649
    // If it's cumu and not allow do delete when cumu
650
359
    if (read_params.reader_type == ReaderType::READER_SEGMENT_COMPACTION ||
651
359
        (read_params.reader_type == ReaderType::READER_CUMULATIVE_COMPACTION &&
652
359
         !config::enable_delete_when_cumu_compaction)) {
653
0
        return Status::OK();
654
0
    }
655
359
    bool cumu_delete = read_params.reader_type == ReaderType::READER_CUMULATIVE_COMPACTION &&
656
359
                       config::enable_delete_when_cumu_compaction;
657
    // Delete sign could not be applied when delete on cumu compaction is enabled, bucause it is meant for delete with predicates.
658
    // If delete design is applied on cumu compaction, it will lose effect when doing base compaction.
659
    // `_delete_sign_available` indicates the condition where we could apply delete signs to data.
660
359
    _delete_sign_available = (((read_params.reader_type == ReaderType::READER_BASE_COMPACTION ||
661
359
                                read_params.reader_type == ReaderType::READER_FULL_COMPACTION) &&
662
359
                               config::enable_prune_delete_sign_when_base_compaction) ||
663
359
                              read_params.reader_type == ReaderType::READER_COLD_DATA_COMPACTION ||
664
359
                              read_params.reader_type == ReaderType::READER_CHECKSUM);
665
666
    // `_filter_delete` indicates the condition where we should execlude deleted tuples when reading data.
667
    // However, queries will not use this condition but generate special where predicates to filter data.
668
    // (Though a lille bit confused, it is how the current logic working...)
669
359
    _filter_delete = _delete_sign_available || cumu_delete;
670
359
    return _delete_handler.init(_tablet_schema, read_params.delete_predicates,
671
359
                                read_params.version.second);
672
359
}
673
674
Status TabletReader::init_reader_params_and_create_block(
675
        TabletSharedPtr tablet, ReaderType reader_type,
676
        const std::vector<RowsetSharedPtr>& input_rowsets,
677
0
        TabletReader::ReaderParams* reader_params, vectorized::Block* block) {
678
0
    reader_params->tablet = tablet;
679
0
    reader_params->reader_type = reader_type;
680
0
    reader_params->version =
681
0
            Version(input_rowsets.front()->start_version(), input_rowsets.back()->end_version());
682
683
0
    ReadSource read_source;
684
0
    for (const auto& rowset : input_rowsets) {
685
0
        RowsetReaderSharedPtr rs_reader;
686
0
        RETURN_IF_ERROR(rowset->create_reader(&rs_reader));
687
0
        read_source.rs_splits.emplace_back(std::move(rs_reader));
688
0
    }
689
0
    read_source.fill_delete_predicates();
690
0
    reader_params->set_read_source(std::move(read_source));
691
692
0
    std::vector<RowsetMetaSharedPtr> rowset_metas(input_rowsets.size());
693
0
    std::transform(input_rowsets.begin(), input_rowsets.end(), rowset_metas.begin(),
694
0
                   [](const RowsetSharedPtr& rowset) { return rowset->rowset_meta(); });
695
0
    TabletSchemaSPtr read_tablet_schema =
696
0
            tablet->tablet_schema_with_merged_max_schema_version(rowset_metas);
697
0
    TabletSchemaSPtr merge_tablet_schema = std::make_shared<TabletSchema>();
698
0
    merge_tablet_schema->copy_from(*read_tablet_schema);
699
700
    // Merge the columns in delete predicate that not in latest schema in to current tablet schema
701
0
    for (auto& del_pred : reader_params->delete_predicates) {
702
0
        merge_tablet_schema->merge_dropped_columns(*del_pred->tablet_schema());
703
0
    }
704
0
    reader_params->tablet_schema = merge_tablet_schema;
705
0
    if (tablet->enable_unique_key_merge_on_write()) {
706
0
        reader_params->delete_bitmap = &tablet->tablet_meta()->delete_bitmap();
707
0
    }
708
709
0
    reader_params->return_columns.resize(read_tablet_schema->num_columns());
710
0
    std::iota(reader_params->return_columns.begin(), reader_params->return_columns.end(), 0);
711
0
    reader_params->origin_return_columns = &reader_params->return_columns;
712
713
0
    *block = read_tablet_schema->create_block();
714
715
0
    return Status::OK();
716
0
}
717
718
#include "common/compile_check_end.h"
719
} // namespace doris