Coverage Report

Created: 2026-06-12 03:18

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/olap_scanner.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/scan/olap_scanner.h"
19
20
#include <gen_cpp/Descriptors_types.h>
21
#include <gen_cpp/PlanNodes_types.h>
22
#include <gen_cpp/Types_types.h>
23
#include <glog/logging.h>
24
#include <stdlib.h>
25
#include <thrift/protocol/TDebugProtocol.h>
26
27
#include <algorithm>
28
#include <atomic>
29
#include <iterator>
30
#include <ostream>
31
#include <set>
32
33
#include "cloud/cloud_storage_engine.h"
34
#include "cloud/cloud_tablet_hotspot.h"
35
#include "cloud/config.h"
36
#include "common/config.h"
37
#include "common/consts.h"
38
#include "common/logging.h"
39
#include "common/metrics/doris_metrics.h"
40
#include "core/block/block.h"
41
#include "exec/common/variant_util.h"
42
#include "exec/operator/olap_scan_operator.h"
43
#include "exec/scan/scan_node.h"
44
#include "exprs/function_filter.h"
45
#include "exprs/vexpr.h"
46
#include "exprs/vexpr_context.h"
47
#include "io/cache/block_file_cache_profile.h"
48
#include "io/io_common.h"
49
#include "runtime/descriptors.h"
50
#include "runtime/exec_env.h"
51
#include "runtime/runtime_profile.h"
52
#include "runtime/runtime_state.h"
53
#include "service/backend_options.h"
54
#include "storage/id_manager.h"
55
#include "storage/index/inverted/inverted_index_profile.h"
56
#include "storage/iterator/block_reader.h"
57
#include "storage/olap_common.h"
58
#include "storage/olap_tuple.h"
59
#include "storage/olap_utils.h"
60
#include "storage/storage_engine.h"
61
#include "storage/tablet/tablet_schema.h"
62
#ifndef NDEBUG
63
#include "util/debug_points.h"
64
#endif
65
#include "util/json/path_in_data.h"
66
67
namespace doris {
68
#include "common/compile_check_avoid_begin.h"
69
70
using ReadSource = TabletReadSource;
71
72
OlapScanner::OlapScanner(ScanLocalStateBase* parent, OlapScanner::Params&& params)
73
277k
        : Scanner(params.state, parent, params.limit, params.profile),
74
277k
          _key_ranges(std::move(params.key_ranges)),
75
277k
          _tablet_reader_params({.tablet = std::move(params.tablet),
76
277k
                                 .tablet_schema {},
77
277k
                                 .reader_type = params.read_row_binlog ? ReaderType::READER_BINLOG
78
277k
                                                                       : ReaderType::READER_QUERY,
79
277k
                                 .aggregation = params.aggregation,
80
277k
                                 .version = {0, params.version},
81
277k
                                 .start_key {},
82
277k
                                 .end_key {},
83
277k
                                 .predicates {},
84
277k
                                 .function_filters {},
85
277k
                                 .delete_predicates {},
86
277k
                                 .target_cast_type_for_variants {},
87
277k
                                 .all_access_paths {},
88
277k
                                 .predicate_access_paths {},
89
277k
                                 .rs_splits {},
90
277k
                                 .return_columns {},
91
277k
                                 .output_columns {},
92
277k
                                 .common_expr_ctxs_push_down {},
93
277k
                                 .topn_filter_source_node_ids {},
94
277k
                                 .key_group_cluster_key_idxes {},
95
277k
                                 .virtual_column_exprs {},
96
277k
                                 .vir_cid_to_idx_in_block {},
97
277k
                                 .vir_col_idx_to_type {},
98
277k
                                 .score_runtime {},
99
277k
                                 .collection_statistics {},
100
277k
                                 .ann_topn_runtime {},
101
277k
                                 .condition_cache_digest = parent->get_condition_cache_digest()}) {
102
277k
    _tablet_reader_params.set_read_source(std::move(params.read_source),
103
277k
                                          _state->skip_delete_bitmap());
104
277k
    _has_prepared = false;
105
277k
    _vector_search_params = params.state->get_vector_search_params();
106
277k
}
107
108
static std::string read_columns_to_string(TabletSchemaSPtr tablet_schema,
109
54
                                          const std::vector<uint32_t>& read_columns) {
110
    // avoid too long for one line,
111
    // it is hard to display in `show profile` stmt if one line is too long.
112
54
    const int col_per_line = 10;
113
54
    int i = 0;
114
54
    std::string read_columns_string;
115
54
    read_columns_string += "[";
116
274
    for (auto it = read_columns.cbegin(); it != read_columns.cend(); it++) {
117
220
        if (it != read_columns.cbegin()) {
118
166
            read_columns_string += ", ";
119
166
        }
120
220
        read_columns_string += tablet_schema->columns().at(*it)->name();
121
220
        if (i >= col_per_line) {
122
10
            read_columns_string += "\n";
123
10
            i = 0;
124
210
        } else {
125
210
            ++i;
126
210
        }
127
220
    }
128
54
    read_columns_string += "]";
129
54
    return read_columns_string;
130
54
}
131
132
566k
static bool has_file_cache_statistics(const io::FileCacheStatistics& stats) {
133
566k
    return stats.num_local_io_total != 0 || stats.num_remote_io_total != 0 ||
134
566k
           stats.num_peer_io_total != 0 || stats.local_io_timer != 0 ||
135
566k
           stats.bytes_read_from_local != 0 || stats.bytes_read_from_remote != 0 ||
136
566k
           stats.bytes_read_from_peer != 0 || stats.remote_io_timer != 0 ||
137
566k
           stats.peer_io_timer != 0 || stats.remote_wait_timer != 0 ||
138
566k
           stats.write_cache_io_timer != 0 || stats.bytes_write_into_cache != 0 ||
139
566k
           stats.num_skip_cache_io_total != 0 || stats.read_cache_file_directly_timer != 0 ||
140
566k
           stats.cache_get_or_set_timer != 0 || stats.lock_wait_timer != 0 ||
141
566k
           stats.get_timer != 0 || stats.set_timer != 0 ||
142
566k
           stats.inverted_index_num_local_io_total != 0 ||
143
566k
           stats.inverted_index_num_remote_io_total != 0 ||
144
566k
           stats.inverted_index_num_peer_io_total != 0 ||
145
566k
           stats.inverted_index_bytes_read_from_local != 0 ||
146
566k
           stats.inverted_index_bytes_read_from_remote != 0 ||
147
566k
           stats.inverted_index_bytes_read_from_peer != 0 ||
148
566k
           stats.inverted_index_local_io_timer != 0 || stats.inverted_index_remote_io_timer != 0 ||
149
566k
           stats.inverted_index_peer_io_timer != 0 || stats.inverted_index_io_timer != 0;
150
566k
}
151
152
276k
Status OlapScanner::_prepare_impl() {
153
276k
    auto* local_state = static_cast<OlapScanLocalState*>(_local_state);
154
276k
    auto& tablet = _tablet_reader_params.tablet;
155
276k
    auto& tablet_schema = _tablet_reader_params.tablet_schema;
156
276k
    DBUG_EXECUTE_IF("CloudTablet.capture_rs_readers.return.e-230", {
157
276k
        LOG_WARNING("CloudTablet.capture_rs_readers.return e-230 init")
158
276k
                .tag("tablet_id", tablet->tablet_id());
159
276k
        return Status::Error<false>(-230, "injected error");
160
276k
    });
161
162
276k
    for (auto& ctx : local_state->_common_expr_ctxs_push_down) {
163
9.88k
        VExprContextSPtr context;
164
9.88k
        RETURN_IF_ERROR(ctx->clone(_state, context));
165
9.88k
        _common_expr_ctxs_push_down.emplace_back(context);
166
9.88k
        context->prepare_ann_range_search(_vector_search_params);
167
9.88k
    }
168
169
276k
    for (auto pair : local_state->_slot_id_to_virtual_column_expr) {
170
        // Scanner will be executed in a different thread, so we need to clone the context.
171
274
        VExprContextSPtr context;
172
274
        RETURN_IF_ERROR(pair.second->clone(_state, context));
173
274
        _slot_id_to_virtual_column_expr[pair.first] = context;
174
274
    }
175
176
276k
    _slot_id_to_index_in_block = local_state->_slot_id_to_index_in_block;
177
276k
    _slot_id_to_col_type = local_state->_slot_id_to_col_type;
178
276k
    _score_runtime = local_state->_score_runtime;
179
    // All scanners share the same ann_topn_runtime.
180
276k
    _ann_topn_runtime = local_state->_ann_topn_runtime;
181
182
    // set limit to reduce end of rowset and segment mem use
183
276k
    _tablet_reader = std::make_unique<BlockReader>();
184
    // batch size is passed down to segment iterator, use _state->batch_size()
185
    // instead of _parent->limit(), because if _parent->limit() is a very small
186
    // value (e.g. select a from t where a .. and b ... limit 1),
187
    // it will be very slow when reading data in segment iterator
188
276k
    _tablet_reader->set_batch_size(_state->batch_size());
189
    // Adaptive batch size: pass byte-budget settings to the storage reader.
190
    // The reader still uses batch_size() as the row ceiling.
191
276k
    _tablet_reader->set_preferred_block_size_bytes(_state->preferred_block_size_bytes());
192
276k
    {
193
276k
        TOlapScanNode& olap_scan_node = local_state->olap_scan_node();
194
276k
        TabletSchemaSPtr source_tablet_schema =
195
276k
                _tablet_reader_params.reader_type == ReaderType::READER_BINLOG
196
276k
                        ? tablet->row_binlog_tablet_schema()
197
276k
                        : tablet->tablet_schema();
198
199
276k
        tablet_schema = std::make_shared<TabletSchema>();
200
276k
        tablet_schema->copy_from(*source_tablet_schema);
201
276k
        if (olap_scan_node.__isset.columns_desc && !olap_scan_node.columns_desc.empty() &&
202
276k
            olap_scan_node.columns_desc[0].col_unique_id >= 0) {
203
276k
            tablet_schema->clear_columns();
204
4.11M
            for (const auto& column_desc : olap_scan_node.columns_desc) {
205
4.11M
                tablet_schema->append_column(TabletColumn(column_desc));
206
4.11M
            }
207
276k
            if (olap_scan_node.__isset.schema_version) {
208
276k
                tablet_schema->set_schema_version(olap_scan_node.schema_version);
209
276k
            }
210
276k
        }
211
276k
        if (olap_scan_node.__isset.indexes_desc) {
212
276k
            tablet_schema->update_indexes_from_thrift(olap_scan_node.indexes_desc);
213
276k
        }
214
215
276k
        if (_tablet_reader_params.rs_splits.empty()) {
216
            // Non-pipeline mode, Tablet : Scanner = 1 : 1
217
            // acquire tablet rowset readers at the beginning of the scan node
218
            // to prevent this case: when there are lots of olap scanners to run for example 10000
219
            // the rowsets maybe compacted when the last olap scanner starts
220
0
            ReadSource read_source;
221
222
0
            if (config::is_cloud_mode()) {
223
                // FIXME(plat1ko): Avoid pointer cast
224
0
                ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(*tablet);
225
0
            }
226
227
0
            auto maybe_read_source = tablet->capture_read_source(
228
0
                    _tablet_reader_params.version,
229
0
                    {
230
0
                            .skip_missing_versions = _state->skip_missing_version(),
231
0
                            .enable_fetch_rowsets_from_peers =
232
0
                                    config::enable_fetch_rowsets_from_peer_replicas,
233
0
                            .capture_row_binlog =
234
0
                                    _tablet_reader_params.reader_type == ReaderType::READER_BINLOG,
235
0
                            .enable_prefer_cached_rowset =
236
0
                                    config::is_cloud_mode() ? _state->enable_prefer_cached_rowset()
237
0
                                                            : false,
238
0
                            .query_freshness_tolerance_ms =
239
0
                                    config::is_cloud_mode() ? _state->query_freshness_tolerance_ms()
240
0
                                                            : -1,
241
0
                    });
242
0
            if (!maybe_read_source) {
243
0
                LOG(WARNING) << "fail to init reader. res=" << maybe_read_source.error();
244
0
                return maybe_read_source.error();
245
0
            }
246
0
            read_source = std::move(maybe_read_source.value());
247
248
0
            if (config::enable_mow_verbose_log && tablet->enable_unique_key_merge_on_write()) {
249
0
                LOG_INFO("finish capture_rs_readers for tablet={}, query_id={}",
250
0
                         tablet->tablet_id(), print_id(_state->query_id()));
251
0
            }
252
253
0
            if (!_state->skip_delete_predicate()) {
254
0
                read_source.fill_delete_predicates();
255
0
            }
256
0
            _tablet_reader_params.set_read_source(std::move(read_source));
257
0
        }
258
259
        // Initialize tablet_reader_params
260
276k
        RETURN_IF_ERROR(_init_tablet_reader_params(
261
276k
                local_state->_parent->cast<OlapScanOperatorX>()._slot_id_to_slot_desc, _key_ranges,
262
276k
                local_state->_slot_id_to_predicates, local_state->_push_down_functions));
263
276k
    }
264
265
    // add read columns in profile
266
276k
    if (_state->enable_profile()) {
267
54
        _profile->add_info_string("ReadColumns",
268
54
                                  read_columns_to_string(tablet_schema, _return_columns));
269
54
    }
270
271
276k
    if (_tablet_reader_params.score_runtime) {
272
0
        SCOPED_TIMER(local_state->_statistics_collect_timer);
273
0
        _tablet_reader_params.collection_statistics = std::make_shared<CollectionStatistics>();
274
275
0
        io::IOContext io_ctx {
276
0
                .reader_type = _tablet_reader_params.reader_type,
277
0
                .expiration_time = tablet->ttl_seconds(),
278
0
                .query_id = &_state->query_id(),
279
0
                .file_cache_stats = &_tablet_reader->mutable_stats()->file_cache_stats,
280
0
                .is_inverted_index = true,
281
0
        };
282
283
0
        RETURN_IF_ERROR(_tablet_reader_params.collection_statistics->collect(
284
0
                _state, _tablet_reader_params.rs_splits, _tablet_reader_params.tablet_schema,
285
0
                _tablet_reader_params.common_expr_ctxs_push_down, &io_ctx));
286
0
    }
287
288
276k
    _has_prepared = true;
289
276k
    return Status::OK();
290
276k
}
291
292
276k
Status OlapScanner::_open_impl(RuntimeState* state) {
293
276k
    RETURN_IF_ERROR(Scanner::_open_impl(state));
294
276k
    SCOPED_TIMER(_local_state->cast<OlapScanLocalState>()._reader_init_timer);
295
296
276k
    auto res = _tablet_reader->init(_tablet_reader_params);
297
276k
    if (!res.ok()) {
298
9
        res.append("failed to initialize storage reader. tablet=" +
299
9
                   std::to_string(_tablet_reader_params.tablet->tablet_id()) +
300
9
                   ", backend=" + BackendOptions::get_localhost());
301
9
        return res;
302
9
    }
303
304
    // Do not hold rs_splits any more to release memory.
305
276k
    _tablet_reader_params.rs_splits.clear();
306
307
276k
    return Status::OK();
308
276k
}
309
310
// it will be called under tablet read lock because capture rs readers need
311
Status OlapScanner::_init_tablet_reader_params(
312
        const phmap::flat_hash_map<int, SlotDescriptor*>& slot_id_to_slot_desc,
313
        const std::vector<OlapScanRange*>& key_ranges,
314
        const phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>>&
315
                slot_to_predicates,
316
276k
        const std::vector<FunctionFilter>& function_filters) {
317
    // if the table with rowset [0-x] or [0-1] [2-y], and [0-1] is empty
318
276k
    const bool single_version = _tablet_reader_params.has_single_version();
319
320
276k
    auto* olap_local_state = static_cast<OlapScanLocalState*>(_local_state);
321
276k
    bool read_mor_as_dup = olap_local_state->olap_scan_node().__isset.read_mor_as_dup &&
322
276k
                           olap_local_state->olap_scan_node().read_mor_as_dup;
323
276k
    if (_state->skip_storage_engine_merge() || read_mor_as_dup) {
324
34
        _tablet_reader_params.direct_mode = true;
325
34
        _tablet_reader_params.aggregation = true;
326
276k
    } else {
327
276k
        auto push_down_agg_type = _local_state->get_push_down_agg_type();
328
276k
        _tablet_reader_params.direct_mode = _tablet_reader_params.aggregation || single_version ||
329
276k
                                            (push_down_agg_type != TPushAggOp::NONE &&
330
3.67k
                                             push_down_agg_type != TPushAggOp::COUNT_ON_INDEX);
331
276k
    }
332
333
276k
    RETURN_IF_ERROR(_init_variant_columns());
334
276k
    RETURN_IF_ERROR(_init_return_columns());
335
336
276k
    _tablet_reader_params.push_down_agg_type_opt = _local_state->get_push_down_agg_type();
337
338
276k
    _tablet_reader_params.common_expr_ctxs_push_down = _common_expr_ctxs_push_down;
339
276k
    _tablet_reader_params.virtual_column_exprs = _virtual_column_exprs;
340
276k
    _tablet_reader_params.vir_cid_to_idx_in_block = _vir_cid_to_idx_in_block;
341
276k
    _tablet_reader_params.vir_col_idx_to_type = _vir_col_idx_to_type;
342
276k
    _tablet_reader_params.score_runtime = _score_runtime;
343
276k
    _tablet_reader_params.output_columns = ((OlapScanLocalState*)_local_state)->_output_column_ids;
344
276k
    _tablet_reader_params.ann_topn_runtime = _ann_topn_runtime;
345
276k
    for (const auto& ele : ((OlapScanLocalState*)_local_state)->_cast_types_for_variants) {
346
1.03k
        _tablet_reader_params.target_cast_type_for_variants[ele.first] = ele.second;
347
1.03k
    };
348
276k
    auto& tablet_schema = _tablet_reader_params.tablet_schema;
349
2.47M
    for (auto& predicates : slot_to_predicates) {
350
2.47M
        const int sid = predicates.first;
351
2.47M
        DCHECK(slot_id_to_slot_desc.contains(sid));
352
2.47M
        int32_t index =
353
2.47M
                tablet_schema->field_index(slot_id_to_slot_desc.find(sid)->second->col_name());
354
2.47M
        if (index < 0) {
355
0
            throw Exception(
356
0
                    Status::InternalError("Column {} not found in tablet schema",
357
0
                                          slot_id_to_slot_desc.find(sid)->second->col_name()));
358
0
        }
359
2.47M
        for (auto& predicate : predicates.second) {
360
275k
            _tablet_reader_params.predicates.push_back(predicate->clone(index));
361
275k
        }
362
2.47M
    }
363
364
276k
    std::copy(function_filters.cbegin(), function_filters.cend(),
365
276k
              std::inserter(_tablet_reader_params.function_filters,
366
276k
                            _tablet_reader_params.function_filters.begin()));
367
368
    // Merge the columns in delete predicate that not in latest schema in to current tablet schema
369
276k
    for (auto& del_pred : _tablet_reader_params.delete_predicates) {
370
646
        tablet_schema->merge_dropped_columns(*del_pred->tablet_schema());
371
646
    }
372
373
    // Push key ranges to the tablet reader.
374
    // Skip the "full scan" placeholder (has_lower_bound == false) — when no key
375
    // predicates exist, start_key/end_key remain empty and the reader does a full scan.
376
380k
    for (auto* key_range : key_ranges) {
377
380k
        if (!key_range->has_lower_bound) {
378
31.7k
            continue;
379
31.7k
        }
380
381
349k
        _tablet_reader_params.start_key_include = key_range->begin_include;
382
349k
        _tablet_reader_params.end_key_include = key_range->end_include;
383
384
349k
        _tablet_reader_params.start_key.push_back(key_range->begin_scan_range);
385
349k
        _tablet_reader_params.end_key.push_back(key_range->end_scan_range);
386
349k
    }
387
388
276k
    _tablet_reader_params.profile = _local_state->custom_profile();
389
276k
    _tablet_reader_params.runtime_state = _state;
390
391
276k
    _tablet_reader_params.origin_return_columns = &_return_columns;
392
276k
    _tablet_reader_params.tablet_columns_convert_to_null_set = &_tablet_columns_convert_to_null_set;
393
394
276k
    if (_tablet_reader_params.direct_mode) {
395
272k
        _tablet_reader_params.return_columns = _return_columns;
396
272k
    } else {
397
        // we need to fetch all key columns to do the right aggregation on storage engine side.
398
15.4k
        for (size_t i = 0; i < tablet_schema->num_key_columns(); ++i) {
399
11.3k
            _tablet_reader_params.return_columns.push_back(i);
400
11.3k
        }
401
22.0k
        for (auto index : _return_columns) {
402
22.0k
            if (tablet_schema->column(index).is_key()) {
403
7.65k
                continue;
404
7.65k
            }
405
14.3k
            _tablet_reader_params.return_columns.push_back(index);
406
14.3k
        }
407
        // expand the sequence column
408
4.02k
        if (tablet_schema->has_sequence_col() || tablet_schema->has_seq_map()) {
409
2
            bool has_replace_col = false;
410
6
            for (auto col : _return_columns) {
411
6
                if (tablet_schema->column(col).aggregation() ==
412
6
                    FieldAggregationMethod::OLAP_FIELD_AGGREGATION_REPLACE) {
413
2
                    has_replace_col = true;
414
2
                    break;
415
2
                }
416
6
            }
417
2
            if (auto sequence_col_idx = tablet_schema->sequence_col_idx();
418
2
                has_replace_col && tablet_schema->has_sequence_col() &&
419
2
                std::find(_return_columns.begin(), _return_columns.end(), sequence_col_idx) ==
420
0
                        _return_columns.end()) {
421
0
                _tablet_reader_params.return_columns.push_back(sequence_col_idx);
422
0
            }
423
2
            if (has_replace_col) {
424
2
                const auto& val_to_seq = tablet_schema->value_col_idx_to_seq_col_idx();
425
2
                std::set<uint32_t> return_seq_columns;
426
427
12
                for (auto col : _tablet_reader_params.return_columns) {
428
                    // we need to add the necessary sequence column in _return_columns, and
429
                    // Avoid adding the same seq column twice
430
12
                    const auto val_iter = val_to_seq.find(col);
431
12
                    if (val_iter != val_to_seq.end()) {
432
8
                        auto seq = val_iter->second;
433
8
                        if (std::find(_tablet_reader_params.return_columns.begin(),
434
8
                                      _tablet_reader_params.return_columns.end(),
435
8
                                      seq) == _tablet_reader_params.return_columns.end()) {
436
0
                            return_seq_columns.insert(seq);
437
0
                        }
438
8
                    }
439
12
                }
440
2
                _tablet_reader_params.return_columns.insert(
441
2
                        std::end(_tablet_reader_params.return_columns),
442
2
                        std::begin(return_seq_columns), std::end(return_seq_columns));
443
2
            }
444
2
        }
445
4.02k
    }
446
447
276k
    _tablet_reader_params.use_page_cache = _state->enable_page_cache();
448
449
276k
    DBUG_EXECUTE_IF("NewOlapScanner::_init_tablet_reader_params.block", DBUG_BLOCK);
450
451
276k
    if (!_state->skip_storage_engine_merge()) {
452
276k
        auto* olap_scan_local_state = (OlapScanLocalState*)_local_state;
453
276k
        TOlapScanNode& olap_scan_node = olap_scan_local_state->olap_scan_node();
454
455
        // Set MOR value predicate pushdown flag
456
276k
        if (olap_scan_node.__isset.enable_mor_value_predicate_pushdown &&
457
276k
            olap_scan_node.enable_mor_value_predicate_pushdown) {
458
0
            _tablet_reader_params.enable_mor_value_predicate_pushdown = true;
459
0
        }
460
461
276k
        const bool has_key_topn =
462
276k
                olap_scan_node.__isset.sort_info && !olap_scan_node.sort_info.is_asc_order.empty();
463
276k
        if (has_key_topn) {
464
845
            _limit = _local_state->limit_per_scanner();
465
845
        }
466
467
276k
        const bool no_runtime_filters = _total_rf_num == 0;
468
276k
        const bool segment_limit_enabled = _state->enable_segment_limit_pushdown();
469
276k
        const bool storage_no_merge = olap_scan_local_state->_storage_no_merge();
470
471
276k
        if (_limit > 0 && no_runtime_filters && segment_limit_enabled && storage_no_merge) {
472
1.17k
            for (const auto& conjunct : _conjuncts) {
473
0
                DORIS_CHECK(!olap_scan_local_state->_check_expr_storage_filter(
474
0
                        conjunct->root(), OlapScanLocalState::ExprStorageFilterCheckMode::
475
0
                                                  HAS_SEGMENT_EVALUABLE_EXPR));
476
0
            }
477
1.17k
        }
478
479
        // Segment LIMIT has only two legal states: completely disabled, or enabled after every
480
        // row-filtering conjunct has become a storage predicate or SegmentIterator common expr.
481
276k
        const bool can_push_down_segment_limit = _limit > 0 && no_runtime_filters &&
482
276k
                                                 _conjuncts.empty() && segment_limit_enabled &&
483
276k
                                                 storage_no_merge;
484
276k
        if (can_push_down_segment_limit) {
485
1.17k
            if (has_key_topn) {
486
763
                _tablet_reader_params.read_orderby_key = true;
487
763
                if (!olap_scan_node.sort_info.is_asc_order[0]) {
488
291
                    _tablet_reader_params.read_orderby_key_reverse = true;
489
291
                }
490
763
                _tablet_reader_params.read_orderby_key_num_prefix_columns =
491
763
                        olap_scan_node.sort_info.is_asc_order.size();
492
763
                _tablet_reader_params.read_orderby_key_limit = _limit;
493
763
            } else {
494
415
                _tablet_reader_params.general_read_limit = _limit;
495
415
            }
496
1.17k
        }
497
498
276k
        if (_tablet_reader_params.read_orderby_key_limit > 0 ||
499
276k
            _tablet_reader_params.general_read_limit > 0) {
500
1.17k
            DORIS_CHECK(can_push_down_segment_limit);
501
1.17k
            DORIS_CHECK(_conjuncts.empty());
502
1.17k
        }
503
504
        // A key TopN scan cannot share the plain LIMIT early-stop counter. If
505
        // storage TopN is pushed down, each scanner must produce its full local
506
        // candidates. If it is not pushed down for any reason, the upper TopN
507
        // still needs all rows from the scan.
508
276k
        if (has_key_topn) {
509
845
            _shared_scan_limit = nullptr;
510
845
            if (_tablet_reader_params.read_orderby_key_limit == 0) {
511
83
                _limit = -1;
512
83
            }
513
845
        }
514
        // Note: _shared_scan_limit is intentionally not pushed into the
515
        // storage layer. SegmentIterator's _process_eof() is irreversible,
516
        // so a concurrently-decremented atomic could reach 0 while a segment
517
        // still has data needed by other scanners.
518
519
        // set push down topn filter
520
276k
        _tablet_reader_params.topn_filter_source_node_ids =
521
276k
                olap_scan_local_state->get_topn_filter_source_node_ids(_state, true);
522
276k
        if (!_tablet_reader_params.topn_filter_source_node_ids.empty()) {
523
4.39k
            _tablet_reader_params.topn_filter_target_node_id =
524
4.39k
                    olap_scan_local_state->parent()->node_id();
525
4.39k
        }
526
276k
    }
527
528
    // If this is a Two-Phase read query, and we need to delay the release of Rowset
529
    // by rowset->update_delayed_expired_timestamp().This could expand the lifespan of Rowset
530
276k
    if (tablet_schema->field_index(BeConsts::ROWID_COL) >= 0) {
531
0
        constexpr static int delayed_s = 60;
532
0
        for (auto rs_reader : _tablet_reader_params.rs_splits) {
533
0
            uint64_t delayed_expired_timestamp =
534
0
                    UnixSeconds() + _tablet_reader_params.runtime_state->execution_timeout() +
535
0
                    delayed_s;
536
0
            rs_reader.rs_reader->rowset()->update_delayed_expired_timestamp(
537
0
                    delayed_expired_timestamp);
538
0
            ExecEnv::GetInstance()->storage_engine().add_quering_rowset(
539
0
                    rs_reader.rs_reader->rowset());
540
0
        }
541
0
    }
542
543
276k
    if (tablet_schema->has_global_row_id()) {
544
6.23k
        auto& id_file_map = _state->get_id_file_map();
545
11.5k
        for (auto rs_reader : _tablet_reader_params.rs_splits) {
546
11.5k
            id_file_map->add_temp_rowset(rs_reader.rs_reader->rowset());
547
11.5k
        }
548
6.23k
    }
549
550
276k
    return Status::OK();
551
276k
}
552
553
275k
Status OlapScanner::_init_variant_columns() {
554
275k
    auto& tablet_schema = _tablet_reader_params.tablet_schema;
555
275k
    if (tablet_schema->num_variant_columns() == 0) {
556
273k
        return Status::OK();
557
273k
    }
558
    // Parent column has path info to distinction from each other
559
7.10k
    for (auto* slot : _output_tuple_desc->slots()) {
560
7.10k
        if (slot->type()->get_primitive_type() == PrimitiveType::TYPE_VARIANT) {
561
            // Such columns are not exist in frontend schema info, so we need to
562
            // add them into tablet_schema for later column indexing.
563
4.07k
            const auto& dt_variant =
564
4.07k
                    assert_cast<const DataTypeVariant&>(*remove_nullable(slot->type()));
565
4.07k
            TabletColumn subcol = TabletColumn::create_materialized_variant_column(
566
4.07k
                    tablet_schema->column_by_uid(slot->col_unique_id()).name_lower_case(),
567
4.07k
                    slot->column_paths(), slot->col_unique_id(),
568
4.07k
                    dt_variant.variant_max_subcolumns_count(), dt_variant.enable_doc_mode());
569
4.07k
            if (tablet_schema->field_index(*subcol.path_info_ptr()) < 0) {
570
3.60k
                tablet_schema->append_column(subcol, TabletSchema::ColumnType::VARIANT);
571
3.60k
            }
572
4.07k
        }
573
7.10k
    }
574
2.42k
    variant_util::inherit_column_attributes(tablet_schema);
575
2.42k
    return Status::OK();
576
275k
}
577
578
276k
Status OlapScanner::_init_return_columns() {
579
3.23M
    for (auto* slot : _output_tuple_desc->slots()) {
580
        // variant column using path to index a column
581
3.23M
        int32_t index = 0;
582
3.23M
        auto& tablet_schema = _tablet_reader_params.tablet_schema;
583
3.23M
        if (slot->type()->get_primitive_type() == PrimitiveType::TYPE_VARIANT) {
584
4.08k
            index = tablet_schema->field_index(PathInData(
585
4.08k
                    tablet_schema->column_by_uid(slot->col_unique_id()).name_lower_case(),
586
4.08k
                    slot->column_paths()));
587
3.23M
        } else {
588
3.23M
            index = slot->col_unique_id() >= 0 ? tablet_schema->field_index(slot->col_unique_id())
589
18.4E
                                               : tablet_schema->field_index(slot->col_name());
590
3.23M
        }
591
592
3.23M
        if (index < 0) {
593
0
            return Status::InternalError(
594
0
                    "field name is invalid. field={}, field_name_to_index={}, col_unique_id={}",
595
0
                    slot->col_name(), tablet_schema->get_all_field_names(), slot->col_unique_id());
596
0
        }
597
598
3.23M
        if (slot->get_virtual_column_expr()) {
599
275
            ColumnId virtual_column_cid = index;
600
275
            _virtual_column_exprs[virtual_column_cid] = _slot_id_to_virtual_column_expr[slot->id()];
601
275
            size_t idx_in_block = _slot_id_to_index_in_block[slot->id()];
602
275
            _vir_cid_to_idx_in_block[virtual_column_cid] = idx_in_block;
603
275
            _vir_col_idx_to_type[idx_in_block] = _slot_id_to_col_type[slot->id()];
604
605
275
            VLOG_DEBUG << fmt::format(
606
6
                    "Virtual column, slot id: {}, cid {}, column index: {}, type: {}", slot->id(),
607
6
                    virtual_column_cid, _vir_cid_to_idx_in_block[virtual_column_cid],
608
6
                    _vir_col_idx_to_type[idx_in_block]->get_name());
609
275
        }
610
611
3.23M
        const auto& column = tablet_schema->column(index);
612
3.23M
        int32_t unique_id =
613
3.23M
                column.unique_id() >= 0 ? column.unique_id() : column.parent_unique_id();
614
3.23M
        if (!slot->all_access_paths().empty()) {
615
14.7k
            _tablet_reader_params.all_access_paths.insert({unique_id, slot->all_access_paths()});
616
14.7k
        }
617
618
3.23M
        if (!slot->predicate_access_paths().empty()) {
619
3.78k
            _tablet_reader_params.predicate_access_paths.insert(
620
3.78k
                    {unique_id, slot->predicate_access_paths()});
621
3.78k
        }
622
623
3.23M
        if ((slot->type()->get_primitive_type() == PrimitiveType::TYPE_STRUCT ||
624
3.23M
             slot->type()->get_primitive_type() == PrimitiveType::TYPE_MAP ||
625
3.23M
             slot->type()->get_primitive_type() == PrimitiveType::TYPE_ARRAY) &&
626
3.23M
            !slot->all_access_paths().empty()) {
627
7.80k
            tablet_schema->add_pruned_columns_data_type(column.unique_id(), slot->type());
628
7.80k
        }
629
630
3.23M
        _return_columns.push_back(index);
631
3.23M
        if (slot->is_nullable() && !tablet_schema->column(index).is_nullable()) {
632
0
            _tablet_columns_convert_to_null_set.emplace(index);
633
3.23M
        } else if (!slot->is_nullable() && tablet_schema->column(index).is_nullable()) {
634
0
            return Status::Error<ErrorCode::INVALID_SCHEMA>(
635
0
                    "slot(id: {}, name: {})'s nullable does not match "
636
0
                    "column(tablet id: {}, index: {}, name: {}) ",
637
0
                    slot->id(), slot->col_name(), tablet_schema->table_id(), index,
638
0
                    tablet_schema->column(index).name());
639
0
        }
640
3.23M
    }
641
642
276k
    if (_return_columns.empty()) {
643
0
        return Status::InternalError("failed to build storage scanner, no materialized slot!");
644
0
    }
645
646
276k
    return Status::OK();
647
276k
}
648
649
580k
bool OlapScanner::check_partition_pruned() const {
650
580k
    if (!_local_state) {
651
0
        return false;
652
0
    }
653
580k
    return _local_state->is_partition_pruned(_tablet_reader_params.tablet->partition_id());
654
580k
}
655
656
290k
doris::TabletStorageType OlapScanner::get_storage_type() {
657
290k
    if (config::is_cloud_mode()) {
658
        // we don't have cold storage in cloud mode, all storage is treated as local
659
285k
        return doris::TabletStorageType::STORAGE_TYPE_LOCAL;
660
285k
    }
661
5.22k
    int local_reader = 0;
662
18.1k
    for (const auto& reader : _tablet_reader_params.rs_splits) {
663
18.1k
        local_reader += reader.rs_reader->rowset()->is_local();
664
18.1k
    }
665
5.22k
    int total_reader = _tablet_reader_params.rs_splits.size();
666
667
5.22k
    if (local_reader == total_reader) {
668
5.22k
        return doris::TabletStorageType::STORAGE_TYPE_LOCAL;
669
5.22k
    } else if (local_reader == 0) {
670
0
        return doris::TabletStorageType::STORAGE_TYPE_REMOTE;
671
0
    }
672
0
    return doris::TabletStorageType::STORAGE_TYPE_REMOTE_AND_LOCAL;
673
5.22k
}
674
675
344k
Status OlapScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
676
    // Read one block from block reader
677
    // ATTN: Here we need to let the _get_block_impl method guarantee the semantics of the interface,
678
    // that is, eof can be set to true only when the returned block is empty.
679
344k
    RETURN_IF_ERROR(_tablet_reader->next_block_with_aggregation(block, eof));
680
344k
    if (block->rows() > 0) {
681
68.5k
        _tablet_reader_params.tablet->read_block_count.fetch_add(1, std::memory_order_relaxed);
682
68.5k
        *eof = false;
683
68.5k
    }
684
344k
#ifndef NDEBUG
685
344k
    RETURN_IF_ERROR(_check_ann_cache_hit_debug_points(_tablet_reader->stats()));
686
344k
#endif
687
344k
    return Status::OK();
688
344k
}
689
690
278k
Status OlapScanner::close(RuntimeState* state) {
691
278k
    if (!_try_close()) {
692
150
        return Status::OK();
693
150
    }
694
278k
    RETURN_IF_ERROR(Scanner::close(state));
695
278k
    return Status::OK();
696
278k
}
697
698
289k
void OlapScanner::update_realtime_counters() {
699
289k
    if (!_has_prepared) {
700
        // Counter update need prepare successfully, or it maybe core. For example, olap scanner
701
        // will open tablet reader during prepare, if not prepare successfully, tablet reader == nullptr.
702
0
        return;
703
0
    }
704
289k
    OlapScanLocalState* local_state = static_cast<OlapScanLocalState*>(_local_state);
705
289k
    const OlapReaderStatistics& stats = _tablet_reader->stats();
706
289k
    COUNTER_UPDATE(local_state->_read_compressed_counter, stats.compressed_bytes_read);
707
289k
    COUNTER_UPDATE(local_state->_read_uncompressed_counter, stats.uncompressed_bytes_read);
708
289k
    COUNTER_UPDATE(local_state->_scan_bytes, stats.uncompressed_bytes_read);
709
289k
    COUNTER_UPDATE(local_state->_scan_rows, stats.raw_rows_read);
710
711
    // Make sure the scan bytes and scan rows counter in audit log is the same as the counter in
712
    // doris metrics.
713
    // ScanBytes is the uncompressed bytes read from local + remote
714
    // bytes_read_from_local is the compressed bytes read from local
715
    // bytes_read_from_remote is the compressed bytes read from remote
716
    // scan bytes > bytes_read_from_local + bytes_read_from_remote
717
289k
    _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_rows(stats.raw_rows_read);
718
289k
    _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes(
719
289k
            stats.uncompressed_bytes_read);
720
721
    // In case of no cache, we still need to update the IO stats. uncompressed bytes read == local + remote
722
289k
    if (stats.file_cache_stats.bytes_read_from_local == 0 &&
723
289k
        stats.file_cache_stats.bytes_read_from_remote == 0) {
724
268k
        _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_local_storage(
725
268k
                stats.compressed_bytes_read);
726
268k
        DorisMetrics::instance()->query_scan_bytes_from_local->increment(
727
268k
                stats.compressed_bytes_read);
728
268k
    } else {
729
21.4k
        _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_local_storage(
730
21.4k
                stats.file_cache_stats.bytes_read_from_local);
731
21.4k
        _state->get_query_ctx()
732
21.4k
                ->resource_ctx()
733
21.4k
                ->io_context()
734
21.4k
                ->update_scan_bytes_from_remote_storage(
735
21.4k
                        stats.file_cache_stats.bytes_read_from_remote);
736
737
21.4k
        DorisMetrics::instance()->query_scan_bytes_from_local->increment(
738
21.4k
                stats.file_cache_stats.bytes_read_from_local);
739
21.4k
        DorisMetrics::instance()->query_scan_bytes_from_remote->increment(
740
21.4k
                stats.file_cache_stats.bytes_read_from_remote);
741
21.4k
    }
742
743
289k
    if (has_file_cache_statistics(stats.file_cache_stats)) {
744
21.9k
        io::FileCacheProfileReporter cache_profile(local_state->_segment_profile.get());
745
21.9k
        cache_profile.update(&stats.file_cache_stats);
746
21.9k
        _state->get_query_ctx()->resource_ctx()->io_context()->update_bytes_write_into_cache(
747
21.9k
                stats.file_cache_stats.bytes_write_into_cache);
748
21.9k
    }
749
750
289k
    _tablet_reader->mutable_stats()->compressed_bytes_read = 0;
751
289k
    _tablet_reader->mutable_stats()->uncompressed_bytes_read = 0;
752
289k
    _tablet_reader->mutable_stats()->raw_rows_read = 0;
753
289k
    _tablet_reader->mutable_stats()->file_cache_stats = {};
754
289k
}
755
756
275k
void OlapScanner::_collect_profile_before_close() {
757
    //  Please don't directly enable the profile here, we need to set QueryStatistics using the counter inside.
758
275k
    if (_has_updated_counter) {
759
0
        return;
760
0
    }
761
275k
    _has_updated_counter = true;
762
275k
    _tablet_reader->update_profile(_profile);
763
764
275k
    Scanner::_collect_profile_before_close();
765
766
    // Update counters for OlapScanner
767
    // Update counters from tablet reader's stats
768
275k
    auto& stats = _tablet_reader->stats();
769
275k
    auto* local_state = (OlapScanLocalState*)_local_state;
770
275k
    COUNTER_UPDATE(local_state->_io_timer, stats.io_ns);
771
275k
    COUNTER_UPDATE(local_state->_read_compressed_counter, stats.compressed_bytes_read);
772
275k
    COUNTER_UPDATE(local_state->_scan_bytes, stats.uncompressed_bytes_read);
773
275k
    COUNTER_UPDATE(local_state->_decompressor_timer, stats.decompress_ns);
774
275k
    COUNTER_UPDATE(local_state->_read_uncompressed_counter, stats.uncompressed_bytes_read);
775
275k
    COUNTER_UPDATE(local_state->_block_load_timer, stats.block_load_ns);
776
275k
    COUNTER_UPDATE(local_state->_block_load_counter, stats.blocks_load);
777
275k
    COUNTER_UPDATE(local_state->_block_fetch_timer, stats.block_fetch_ns);
778
275k
    COUNTER_UPDATE(local_state->_delete_bitmap_get_agg_timer, stats.delete_bitmap_get_agg_ns);
779
275k
    COUNTER_UPDATE(local_state->_scan_rows, stats.raw_rows_read);
780
275k
    COUNTER_UPDATE(local_state->_vec_cond_timer, stats.vec_cond_ns);
781
275k
    COUNTER_UPDATE(local_state->_short_cond_timer, stats.short_cond_ns);
782
275k
    COUNTER_UPDATE(local_state->_expr_filter_timer, stats.expr_filter_ns);
783
275k
    COUNTER_UPDATE(local_state->_block_init_timer, stats.block_init_ns);
784
275k
    COUNTER_UPDATE(local_state->_block_init_seek_timer, stats.block_init_seek_ns);
785
275k
    COUNTER_UPDATE(local_state->_block_init_seek_counter, stats.block_init_seek_num);
786
275k
    COUNTER_UPDATE(local_state->_segment_generate_row_range_by_keys_timer,
787
275k
                   stats.generate_row_ranges_by_keys_ns);
788
275k
    COUNTER_UPDATE(local_state->_segment_generate_row_range_by_column_conditions_timer,
789
275k
                   stats.generate_row_ranges_by_column_conditions_ns);
790
275k
    COUNTER_UPDATE(local_state->_segment_generate_row_range_by_bf_timer,
791
275k
                   stats.generate_row_ranges_by_bf_ns);
792
275k
    COUNTER_UPDATE(local_state->_collect_iterator_merge_next_timer,
793
275k
                   stats.collect_iterator_merge_next_timer);
794
275k
    COUNTER_UPDATE(local_state->_segment_generate_row_range_by_zonemap_timer,
795
275k
                   stats.generate_row_ranges_by_zonemap_ns);
796
275k
    COUNTER_UPDATE(local_state->_segment_generate_row_range_by_dict_timer,
797
275k
                   stats.generate_row_ranges_by_dict_ns);
798
275k
    COUNTER_UPDATE(local_state->_predicate_column_read_timer, stats.predicate_column_read_ns);
799
275k
    COUNTER_UPDATE(local_state->_non_predicate_column_read_timer, stats.non_predicate_read_ns);
800
275k
    COUNTER_UPDATE(local_state->_predicate_column_read_seek_timer,
801
275k
                   stats.predicate_column_read_seek_ns);
802
275k
    COUNTER_UPDATE(local_state->_predicate_column_read_seek_counter,
803
275k
                   stats.predicate_column_read_seek_num);
804
275k
    COUNTER_UPDATE(local_state->_lazy_read_timer, stats.lazy_read_ns);
805
275k
    COUNTER_UPDATE(local_state->_lazy_read_seek_timer, stats.block_lazy_read_seek_ns);
806
275k
    COUNTER_UPDATE(local_state->_lazy_read_seek_counter, stats.block_lazy_read_seek_num);
807
275k
    COUNTER_UPDATE(local_state->_output_col_timer, stats.output_col_ns);
808
275k
    COUNTER_UPDATE(local_state->_rows_vec_cond_filtered_counter, stats.rows_vec_cond_filtered);
809
275k
    COUNTER_UPDATE(local_state->_rows_short_circuit_cond_filtered_counter,
810
275k
                   stats.rows_short_circuit_cond_filtered);
811
275k
    COUNTER_UPDATE(local_state->_rows_expr_cond_filtered_counter, stats.rows_expr_cond_filtered);
812
275k
    COUNTER_UPDATE(local_state->_rows_vec_cond_input_counter, stats.vec_cond_input_rows);
813
275k
    COUNTER_UPDATE(local_state->_rows_short_circuit_cond_input_counter,
814
275k
                   stats.short_circuit_cond_input_rows);
815
275k
    COUNTER_UPDATE(local_state->_rows_expr_cond_input_counter, stats.expr_cond_input_rows);
816
275k
    COUNTER_UPDATE(local_state->_stats_filtered_counter, stats.rows_stats_filtered);
817
275k
    COUNTER_UPDATE(local_state->_stats_rp_filtered_counter, stats.rows_stats_rp_filtered);
818
275k
    COUNTER_UPDATE(local_state->_expr_zonemap_filtered_segment_counter,
819
275k
                   stats.expr_zonemap_filtered_segments);
820
275k
    COUNTER_UPDATE(local_state->_expr_zonemap_filtered_page_counter,
821
275k
                   stats.expr_zonemap_filtered_pages);
822
275k
    COUNTER_UPDATE(local_state->_expr_zonemap_unusable_counter, stats.expr_zonemap_unusable_evals);
823
275k
    COUNTER_UPDATE(local_state->_in_zonemap_point_check_counter,
824
275k
                   stats.in_zonemap_point_check_count);
825
275k
    COUNTER_UPDATE(local_state->_in_zonemap_range_only_counter, stats.in_zonemap_range_only_count);
826
275k
    COUNTER_UPDATE(local_state->_dict_filtered_counter, stats.segment_dict_filtered);
827
275k
    COUNTER_UPDATE(local_state->_bf_filtered_counter, stats.rows_bf_filtered);
828
275k
    COUNTER_UPDATE(local_state->_del_filtered_counter, stats.rows_del_filtered);
829
275k
    COUNTER_UPDATE(local_state->_del_filtered_counter, stats.rows_del_by_bitmap);
830
275k
    COUNTER_UPDATE(local_state->_del_filtered_counter, stats.rows_vec_del_cond_filtered);
831
275k
    COUNTER_UPDATE(local_state->_conditions_filtered_counter, stats.rows_conditions_filtered);
832
275k
    COUNTER_UPDATE(local_state->_key_range_filtered_counter, stats.rows_key_range_filtered);
833
275k
    COUNTER_UPDATE(local_state->_total_pages_num_counter, stats.total_pages_num);
834
275k
    COUNTER_UPDATE(local_state->_cached_pages_num_counter, stats.cached_pages_num);
835
275k
    COUNTER_UPDATE(local_state->_inverted_index_filter_counter, stats.rows_inverted_index_filtered);
836
275k
    COUNTER_UPDATE(local_state->_inverted_index_filter_timer, stats.inverted_index_filter_timer);
837
275k
    COUNTER_UPDATE(local_state->_inverted_index_query_cache_hit_counter,
838
275k
                   stats.inverted_index_query_cache_hit);
839
275k
    COUNTER_UPDATE(local_state->_inverted_index_query_cache_miss_counter,
840
275k
                   stats.inverted_index_query_cache_miss);
841
275k
    COUNTER_UPDATE(local_state->_inverted_index_query_timer, stats.inverted_index_query_timer);
842
275k
    COUNTER_UPDATE(local_state->_inverted_index_query_null_bitmap_timer,
843
275k
                   stats.inverted_index_query_null_bitmap_timer);
844
275k
    COUNTER_UPDATE(local_state->_inverted_index_query_bitmap_copy_timer,
845
275k
                   stats.inverted_index_query_bitmap_copy_timer);
846
275k
    COUNTER_UPDATE(local_state->_inverted_index_searcher_open_timer,
847
275k
                   stats.inverted_index_searcher_open_timer);
848
275k
    COUNTER_UPDATE(local_state->_inverted_index_searcher_search_timer,
849
275k
                   stats.inverted_index_searcher_search_timer);
850
275k
    COUNTER_UPDATE(local_state->_inverted_index_searcher_search_init_timer,
851
275k
                   stats.inverted_index_searcher_search_init_timer);
852
275k
    COUNTER_UPDATE(local_state->_inverted_index_searcher_search_exec_timer,
853
275k
                   stats.inverted_index_searcher_search_exec_timer);
854
275k
    COUNTER_UPDATE(local_state->_inverted_index_searcher_cache_hit_counter,
855
275k
                   stats.inverted_index_searcher_cache_hit);
856
275k
    COUNTER_UPDATE(local_state->_inverted_index_searcher_cache_miss_counter,
857
275k
                   stats.inverted_index_searcher_cache_miss);
858
275k
    COUNTER_UPDATE(local_state->_inverted_index_downgrade_count_counter,
859
275k
                   stats.inverted_index_downgrade_count);
860
275k
    COUNTER_UPDATE(local_state->_inverted_index_analyzer_timer,
861
275k
                   stats.inverted_index_analyzer_timer);
862
275k
    COUNTER_UPDATE(local_state->_inverted_index_lookup_timer, stats.inverted_index_lookup_timer);
863
275k
    COUNTER_UPDATE(local_state->_variant_scan_sparse_column_timer,
864
275k
                   stats.variant_scan_sparse_column_timer_ns);
865
275k
    COUNTER_UPDATE(local_state->_variant_scan_sparse_column_bytes,
866
275k
                   stats.variant_scan_sparse_column_bytes);
867
275k
    COUNTER_UPDATE(local_state->_variant_fill_path_from_sparse_column_timer,
868
275k
                   stats.variant_fill_path_from_sparse_column_timer_ns);
869
275k
    COUNTER_UPDATE(local_state->_variant_subtree_default_iter_count,
870
275k
                   stats.variant_subtree_default_iter_count);
871
275k
    COUNTER_UPDATE(local_state->_variant_subtree_leaf_iter_count,
872
275k
                   stats.variant_subtree_leaf_iter_count);
873
275k
    COUNTER_UPDATE(local_state->_variant_subtree_hierarchical_iter_count,
874
275k
                   stats.variant_subtree_hierarchical_iter_count);
875
275k
    COUNTER_UPDATE(local_state->_variant_subtree_sparse_iter_count,
876
275k
                   stats.variant_subtree_sparse_iter_count);
877
275k
    COUNTER_UPDATE(local_state->_variant_doc_value_column_iter_count,
878
275k
                   stats.variant_doc_value_column_iter_count);
879
880
275k
    if (stats.adaptive_batch_size_predict_max_rows > 0) {
881
235k
        local_state->_adaptive_batch_predict_min_rows_counter->set(
882
235k
                stats.adaptive_batch_size_predict_min_rows);
883
235k
        local_state->_adaptive_batch_predict_max_rows_counter->set(
884
235k
                stats.adaptive_batch_size_predict_max_rows);
885
235k
    }
886
887
275k
    InvertedIndexProfileReporter inverted_index_profile;
888
275k
    inverted_index_profile.update(local_state->_index_filter_profile.get(),
889
275k
                                  &stats.inverted_index_stats);
890
891
275k
    if (has_file_cache_statistics(stats.file_cache_stats)) {
892
0
        io::FileCacheProfileReporter cache_profile(local_state->_segment_profile.get());
893
0
        cache_profile.update(&stats.file_cache_stats);
894
0
        _state->get_query_ctx()->resource_ctx()->io_context()->update_bytes_write_into_cache(
895
0
                stats.file_cache_stats.bytes_write_into_cache);
896
0
    }
897
275k
    COUNTER_UPDATE(local_state->_output_index_result_column_timer,
898
275k
                   stats.output_index_result_column_timer);
899
275k
    COUNTER_UPDATE(local_state->_filtered_segment_counter, stats.filtered_segment_number);
900
275k
    COUNTER_UPDATE(local_state->_total_segment_counter, stats.total_segment_number);
901
275k
    COUNTER_UPDATE(local_state->_condition_cache_hit_counter, stats.condition_cache_hit_seg_nums);
902
275k
    COUNTER_UPDATE(local_state->_condition_cache_filtered_rows_counter,
903
275k
                   stats.condition_cache_filtered_rows);
904
905
275k
    COUNTER_UPDATE(local_state->_tablet_reader_init_timer, stats.tablet_reader_init_timer_ns);
906
275k
    COUNTER_UPDATE(local_state->_tablet_reader_capture_rs_readers_timer,
907
275k
                   stats.tablet_reader_capture_rs_readers_timer_ns);
908
275k
    COUNTER_UPDATE(local_state->_tablet_reader_init_return_columns_timer,
909
275k
                   stats.tablet_reader_init_return_columns_timer_ns);
910
275k
    COUNTER_UPDATE(local_state->_tablet_reader_init_keys_param_timer,
911
275k
                   stats.tablet_reader_init_keys_param_timer_ns);
912
275k
    COUNTER_UPDATE(local_state->_tablet_reader_init_orderby_keys_param_timer,
913
275k
                   stats.tablet_reader_init_orderby_keys_param_timer_ns);
914
275k
    COUNTER_UPDATE(local_state->_tablet_reader_init_conditions_param_timer,
915
275k
                   stats.tablet_reader_init_conditions_param_timer_ns);
916
275k
    COUNTER_UPDATE(local_state->_tablet_reader_init_delete_condition_param_timer,
917
275k
                   stats.tablet_reader_init_delete_condition_param_timer_ns);
918
275k
    COUNTER_UPDATE(local_state->_block_reader_vcollect_iter_init_timer,
919
275k
                   stats.block_reader_vcollect_iter_init_timer_ns);
920
275k
    COUNTER_UPDATE(local_state->_block_reader_rs_readers_init_timer,
921
275k
                   stats.block_reader_rs_readers_init_timer_ns);
922
275k
    COUNTER_UPDATE(local_state->_block_reader_build_heap_init_timer,
923
275k
                   stats.block_reader_build_heap_init_timer_ns);
924
925
275k
    COUNTER_UPDATE(local_state->_rowset_reader_get_segment_iterators_timer,
926
275k
                   stats.rowset_reader_get_segment_iterators_timer_ns);
927
275k
    COUNTER_UPDATE(local_state->_rowset_reader_create_iterators_timer,
928
275k
                   stats.rowset_reader_create_iterators_timer_ns);
929
275k
    COUNTER_UPDATE(local_state->_rowset_reader_init_iterators_timer,
930
275k
                   stats.rowset_reader_init_iterators_timer_ns);
931
275k
    COUNTER_UPDATE(local_state->_rowset_reader_load_segments_timer,
932
275k
                   stats.rowset_reader_load_segments_timer_ns);
933
934
275k
    COUNTER_UPDATE(local_state->_segment_iterator_init_timer, stats.segment_iterator_init_timer_ns);
935
275k
    COUNTER_UPDATE(local_state->_segment_iterator_init_return_column_iterators_timer,
936
275k
                   stats.segment_iterator_init_return_column_iterators_timer_ns);
937
275k
    COUNTER_UPDATE(local_state->_segment_iterator_init_index_iterators_timer,
938
275k
                   stats.segment_iterator_init_index_iterators_timer_ns);
939
275k
    COUNTER_UPDATE(local_state->_segment_iterator_init_segment_prefetchers_timer,
940
275k
                   stats.segment_iterator_init_segment_prefetchers_timer_ns);
941
942
275k
    COUNTER_UPDATE(local_state->_segment_create_column_readers_timer,
943
275k
                   stats.segment_create_column_readers_timer_ns);
944
275k
    COUNTER_UPDATE(local_state->_segment_load_index_timer, stats.segment_load_index_timer_ns);
945
946
    // Update metrics
947
275k
    DorisMetrics::instance()->query_scan_bytes->increment(
948
275k
            local_state->_read_uncompressed_counter->value());
949
275k
    DorisMetrics::instance()->query_scan_rows->increment(local_state->_scan_rows->value());
950
275k
    auto& tablet = _tablet_reader_params.tablet;
951
275k
    tablet->query_scan_bytes->increment(local_state->_read_uncompressed_counter->value());
952
275k
    tablet->query_scan_rows->increment(local_state->_scan_rows->value());
953
275k
    tablet->query_scan_count->increment(1);
954
955
275k
    COUNTER_UPDATE(local_state->_ann_range_search_filter_counter,
956
275k
                   stats.rows_ann_index_range_filtered);
957
275k
    COUNTER_UPDATE(local_state->_ann_topn_filter_counter, stats.rows_ann_index_topn_filtered);
958
275k
    COUNTER_UPDATE(local_state->_ann_index_load_costs, stats.ann_index_load_ns);
959
275k
    COUNTER_UPDATE(local_state->_ann_ivf_on_disk_load_costs, stats.ann_ivf_on_disk_load_ns);
960
275k
    COUNTER_UPDATE(local_state->_ann_ivf_on_disk_cache_hit_cnt,
961
275k
                   stats.ann_ivf_on_disk_cache_hit_cnt);
962
275k
    COUNTER_UPDATE(local_state->_ann_ivf_on_disk_cache_miss_cnt,
963
275k
                   stats.ann_ivf_on_disk_cache_miss_cnt);
964
275k
    COUNTER_UPDATE(local_state->_ann_range_search_costs, stats.ann_index_range_search_ns);
965
275k
    COUNTER_UPDATE(local_state->_ann_range_search_cnt, stats.ann_index_range_search_cnt);
966
275k
    COUNTER_UPDATE(local_state->_ann_range_engine_search_costs, stats.ann_range_engine_search_ns);
967
    // Engine prepare before search
968
275k
    COUNTER_UPDATE(local_state->_ann_range_pre_process_costs, stats.ann_range_pre_process_ns);
969
    // Post process parent: Doris result process + engine convert
970
275k
    COUNTER_UPDATE(local_state->_ann_range_post_process_costs,
971
275k
                   stats.ann_range_result_convert_ns + stats.ann_range_engine_convert_ns);
972
    // Engine convert (child under post-process)
973
275k
    COUNTER_UPDATE(local_state->_ann_range_engine_convert_costs, stats.ann_range_engine_convert_ns);
974
    // Doris-side result convert (child under post-process)
975
275k
    COUNTER_UPDATE(local_state->_ann_range_result_convert_costs, stats.ann_range_result_convert_ns);
976
977
275k
    COUNTER_UPDATE(local_state->_ann_topn_search_costs, stats.ann_topn_search_ns);
978
275k
    COUNTER_UPDATE(local_state->_ann_topn_search_cnt, stats.ann_index_topn_search_cnt);
979
275k
    COUNTER_UPDATE(local_state->_ann_cache_hit_cnt, stats.ann_index_cache_hits);
980
275k
    COUNTER_UPDATE(local_state->_ann_range_cache_hit_cnt, stats.ann_index_range_cache_hits);
981
982
    // Detailed ANN timers
983
    // ANN TopN timers with hierarchy
984
    // Engine search time (FAISS)
985
275k
    COUNTER_UPDATE(local_state->_ann_topn_engine_search_costs,
986
275k
                   stats.ann_index_topn_engine_search_ns);
987
    // Engine prepare time (allocations/buffer setup before search)
988
275k
    COUNTER_UPDATE(local_state->_ann_topn_pre_process_costs,
989
275k
                   stats.ann_index_topn_engine_prepare_ns);
990
    // Post process parent includes Doris result processing + engine convert
991
275k
    COUNTER_UPDATE(local_state->_ann_topn_post_process_costs,
992
275k
                   stats.ann_index_topn_result_process_ns + stats.ann_index_topn_engine_convert_ns);
993
    // Engine-side conversion time inside FAISS wrappers (child under post-process)
994
275k
    COUNTER_UPDATE(local_state->_ann_topn_engine_convert_costs,
995
275k
                   stats.ann_index_topn_engine_convert_ns);
996
997
    // Doris-side result convert costs (show separately as another child counter); use pure process time
998
275k
    COUNTER_UPDATE(local_state->_ann_topn_result_convert_costs,
999
275k
                   stats.ann_index_topn_result_process_ns);
1000
1001
275k
    COUNTER_UPDATE(local_state->_ann_fallback_brute_force_cnt, stats.ann_fall_back_brute_force_cnt);
1002
1003
    // Overhead counter removed; precise instrumentation is reported via engine_prepare above.
1004
275k
}
1005
1006
#ifndef NDEBUG
1007
343k
Status OlapScanner::_check_ann_cache_hit_debug_points(const OlapReaderStatistics& stats) {
1008
343k
    DBUG_EXECUTE_IF("olap_scanner.ann_topn_cache_hits", {
1009
343k
        auto expected_hits = dp->param<int32_t>("expected_hits", -1);
1010
343k
        auto min_hits = dp->param<int32_t>("min_hits", -1);
1011
343k
        if (expected_hits >= 0 && stats.ann_index_cache_hits != expected_hits) {
1012
343k
            return Status::Error<ErrorCode::INTERNAL_ERROR>(
1013
343k
                    "ann_index_cache_hits: {} not equal to expected: {}",
1014
343k
                    stats.ann_index_cache_hits, expected_hits);
1015
343k
        }
1016
343k
        if (min_hits >= 0 && stats.ann_index_cache_hits < min_hits) {
1017
343k
            return Status::Error<ErrorCode::INTERNAL_ERROR>(
1018
343k
                    "ann_index_cache_hits: {} less than expected min: {}",
1019
343k
                    stats.ann_index_cache_hits, min_hits);
1020
343k
        }
1021
343k
    })
1022
343k
    DBUG_EXECUTE_IF("olap_scanner.ann_range_cache_hits", {
1023
343k
        auto expected_hits = dp->param<int32_t>("expected_hits", -1);
1024
343k
        auto min_hits = dp->param<int32_t>("min_hits", -1);
1025
343k
        if (expected_hits >= 0 && stats.ann_index_range_cache_hits != expected_hits) {
1026
343k
            return Status::Error<ErrorCode::INTERNAL_ERROR>(
1027
343k
                    "ann_index_range_cache_hits: {} not equal to expected: {}",
1028
343k
                    stats.ann_index_range_cache_hits, expected_hits);
1029
343k
        }
1030
343k
        if (min_hits >= 0 && stats.ann_index_range_cache_hits < min_hits) {
1031
343k
            return Status::Error<ErrorCode::INTERNAL_ERROR>(
1032
343k
                    "ann_index_range_cache_hits: {} less than expected min: {}",
1033
343k
                    stats.ann_index_range_cache_hits, min_hits);
1034
343k
        }
1035
343k
    })
1036
343k
    return Status::OK();
1037
343k
}
1038
#endif
1039
1040
#include "common/compile_check_avoid_end.h"
1041
} // namespace doris