Coverage Report

Created: 2026-04-23 13:07

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/olap_scanner.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/scan/olap_scanner.h"
19
20
#include <gen_cpp/Descriptors_types.h>
21
#include <gen_cpp/PlanNodes_types.h>
22
#include <gen_cpp/Types_types.h>
23
#include <glog/logging.h>
24
#include <stdlib.h>
25
#include <thrift/protocol/TDebugProtocol.h>
26
27
#include <algorithm>
28
#include <atomic>
29
#include <iterator>
30
#include <ostream>
31
#include <set>
32
33
#include "cloud/cloud_storage_engine.h"
34
#include "cloud/cloud_tablet_hotspot.h"
35
#include "cloud/config.h"
36
#include "common/config.h"
37
#include "common/consts.h"
38
#include "common/logging.h"
39
#include "common/metrics/doris_metrics.h"
40
#include "core/block/block.h"
41
#include "exec/common/variant_util.h"
42
#include "exec/operator/olap_scan_operator.h"
43
#include "exec/scan/scan_node.h"
44
#include "exprs/function_filter.h"
45
#include "exprs/vexpr.h"
46
#include "exprs/vexpr_context.h"
47
#include "io/cache/block_file_cache_profile.h"
48
#include "io/io_common.h"
49
#include "runtime/descriptors.h"
50
#include "runtime/exec_env.h"
51
#include "runtime/runtime_profile.h"
52
#include "runtime/runtime_state.h"
53
#include "service/backend_options.h"
54
#include "storage/id_manager.h"
55
#include "storage/index/inverted/inverted_index_profile.h"
56
#include "storage/iterator/block_reader.h"
57
#include "storage/olap_common.h"
58
#include "storage/olap_tuple.h"
59
#include "storage/olap_utils.h"
60
#include "storage/storage_engine.h"
61
#include "storage/tablet/tablet_schema.h"
62
#include "util/json/path_in_data.h"
63
64
namespace doris {
65
#include "common/compile_check_avoid_begin.h"
66
67
using ReadSource = TabletReadSource;
68
69
// Builds a scanner for a single tablet. The reader parameters are assembled up front from
// `params`; every field whose value is only known later is explicitly brace-initialized here
// and filled in by prepare() / _init_tablet_reader_params().
OlapScanner::OlapScanner(ScanLocalStateBase* parent, OlapScanner::Params&& params)
        : Scanner(params.state, parent, params.limit, params.profile),
          _key_ranges(std::move(params.key_ranges)),
          _tablet_reader_params({.tablet = std::move(params.tablet),
                                 .tablet_schema {},
                                 .aggregation = params.aggregation,
                                 .version = {0, params.version},
                                 .start_key {},
                                 .end_key {},
                                 .predicates {},
                                 .function_filters {},
                                 .delete_predicates {},
                                 .target_cast_type_for_variants {},
                                 .all_access_paths {},
                                 .predicate_access_paths {},
                                 .rs_splits {},
                                 .return_columns {},
                                 .output_columns {},
                                 .remaining_conjunct_roots {},
                                 .common_expr_ctxs_push_down {},
                                 .topn_filter_source_node_ids {},
                                 .filter_block_conjuncts {},
                                 .key_group_cluster_key_idxes {},
                                 .virtual_column_exprs {},
                                 .vir_cid_to_idx_in_block {},
                                 .vir_col_idx_to_type {},
                                 .score_runtime {},
                                 .collection_statistics {},
                                 .ann_topn_runtime {},
                                 .condition_cache_digest = parent->get_condition_cache_digest()}) {
    // The scanner counts as prepared only after prepare() succeeds;
    // update_realtime_counters() relies on this flag before touching the tablet reader.
    _has_prepared = false;
    _vector_search_params = params.state->get_vector_search_params();
    // Hand the caller-supplied read source to the reader params. prepare() treats an empty
    // rs_splits afterwards as "nothing captured yet" and captures one itself.
    _tablet_reader_params.set_read_source(std::move(params.read_source),
                                          _state->skip_delete_bitmap());
}
104
105
static std::string read_columns_to_string(TabletSchemaSPtr tablet_schema,
106
3.17k
                                          const std::vector<uint32_t>& read_columns) {
107
    // avoid too long for one line,
108
    // it is hard to display in `show profile` stmt if one line is too long.
109
3.17k
    const int col_per_line = 10;
110
3.17k
    int i = 0;
111
3.17k
    std::string read_columns_string;
112
3.17k
    read_columns_string += "[";
113
16.8k
    for (auto it = read_columns.cbegin(); it != read_columns.cend(); it++) {
114
13.7k
        if (it != read_columns.cbegin()) {
115
10.5k
            read_columns_string += ", ";
116
10.5k
        }
117
13.7k
        read_columns_string += tablet_schema->columns().at(*it)->name();
118
13.7k
        if (i >= col_per_line) {
119
13
            read_columns_string += "\n";
120
13
            i = 0;
121
13.7k
        } else {
122
13.7k
            ++i;
123
13.7k
        }
124
13.7k
    }
125
3.17k
    read_columns_string += "]";
126
3.17k
    return read_columns_string;
127
3.17k
}
128
129
840k
// Prepares the scanner before it is opened. It:
//  - clones the pushed-down and virtual-column expression contexts (the scanner runs on a
//    different thread than the local state that owns the originals);
//  - builds a private TabletSchema from the scan node's column/index descriptors;
//  - captures the tablet read source if none was supplied (rs_splits empty);
//  - fills _tablet_reader_params via _init_tablet_reader_params();
//  - optionally collects scoring statistics.
// _has_prepared is set only when every step succeeds.
Status OlapScanner::prepare() {
    auto* local_state = static_cast<OlapScanLocalState*>(_local_state);
    auto& tablet = _tablet_reader_params.tablet;
    auto& tablet_schema = _tablet_reader_params.tablet_schema;
    DBUG_EXECUTE_IF("CloudTablet.capture_rs_readers.return.e-230", {
        LOG_WARNING("CloudTablet.capture_rs_readers.return e-230 init")
                .tag("tablet_id", tablet->tablet_id());
        return Status::Error<false>(-230, "injected error");
    });

    for (const auto& ctx : local_state->_common_expr_ctxs_push_down) {
        VExprContextSPtr context;
        RETURN_IF_ERROR(ctx->clone(_state, context));
        _common_expr_ctxs_push_down.emplace_back(context);
        context->prepare_ann_range_search(_vector_search_params);
    }

    // Scanner will be executed in a different thread, so we need to clone the context.
    // Bind entries by reference: iterating by value copied each shared_ptr (atomic refcount).
    for (const auto& [slot_id, expr_ctx] : local_state->_slot_id_to_virtual_column_expr) {
        VExprContextSPtr context;
        RETURN_IF_ERROR(expr_ctx->clone(_state, context));
        _slot_id_to_virtual_column_expr[slot_id] = context;
    }

    _slot_id_to_index_in_block = local_state->_slot_id_to_index_in_block;
    _slot_id_to_col_type = local_state->_slot_id_to_col_type;
    _score_runtime = local_state->_score_runtime;
    // All scanners share the same ann_topn_runtime.
    _ann_topn_runtime = local_state->_ann_topn_runtime;

    // set limit to reduce end of rowset and segment mem use
    _tablet_reader = std::make_unique<BlockReader>();
    // batch size is passed down to segment iterator, use _state->batch_size()
    // instead of _parent->limit(), because if _parent->limit() is a very small
    // value (e.g. select a from t where a .. and b ... limit 1),
    // it will be very slow when reading data in segment iterator
    _tablet_reader->set_batch_size(_state->batch_size());
    {
        TOlapScanNode& olap_scan_node = local_state->olap_scan_node();

        // Each scanner builds its own TabletSchema to avoid concurrent modification.
        tablet_schema = std::make_shared<TabletSchema>();
        tablet_schema->copy_from(*tablet->tablet_schema());
        if (olap_scan_node.__isset.columns_desc && !olap_scan_node.columns_desc.empty() &&
            olap_scan_node.columns_desc[0].col_unique_id >= 0) {
            tablet_schema->clear_columns();
            for (const auto& column_desc : olap_scan_node.columns_desc) {
                tablet_schema->append_column(TabletColumn(column_desc));
            }
            if (olap_scan_node.__isset.schema_version) {
                tablet_schema->set_schema_version(olap_scan_node.schema_version);
            }
        }
        if (olap_scan_node.__isset.indexes_desc) {
            tablet_schema->update_indexes_from_thrift(olap_scan_node.indexes_desc);
        }

        if (_tablet_reader_params.rs_splits.empty()) {
            // Non-pipeline mode, Tablet : Scanner = 1 : 1
            // acquire tablet rowset readers at the beginning of the scan node
            // to prevent this case: when there are lots of olap scanners to run for example 10000
            // the rowsets maybe compacted when the last olap scanner starts
            if (config::is_cloud_mode()) {
                // FIXME(plat1ko): Avoid pointer cast
                ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(*tablet);
            }

            auto maybe_read_source = tablet->capture_read_source(
                    _tablet_reader_params.version,
                    {
                            .skip_missing_versions = _state->skip_missing_version(),
                            .enable_fetch_rowsets_from_peers =
                                    config::enable_fetch_rowsets_from_peer_replicas,
                            .enable_prefer_cached_rowset =
                                    config::is_cloud_mode() ? _state->enable_prefer_cached_rowset()
                                                            : false,
                            .query_freshness_tolerance_ms =
                                    config::is_cloud_mode() ? _state->query_freshness_tolerance_ms()
                                                            : -1,
                    });
            if (!maybe_read_source) {
                LOG(WARNING) << "fail to init reader. res=" << maybe_read_source.error();
                return maybe_read_source.error();
            }

            // Construct directly from the captured value instead of default-construct + assign.
            ReadSource read_source = std::move(maybe_read_source.value());

            if (config::enable_mow_verbose_log && tablet->enable_unique_key_merge_on_write()) {
                LOG_INFO("finish capture_rs_readers for tablet={}, query_id={}",
                         tablet->tablet_id(), print_id(_state->query_id()));
            }

            if (!_state->skip_delete_predicate()) {
                read_source.fill_delete_predicates();
            }
            _tablet_reader_params.set_read_source(std::move(read_source));
        }

        // Initialize tablet_reader_params
        RETURN_IF_ERROR(_init_tablet_reader_params(
                local_state->_parent->cast<OlapScanOperatorX>()._slot_id_to_slot_desc, _key_ranges,
                local_state->_slot_id_to_predicates, local_state->_push_down_functions));
    }

    // add read columns in profile
    if (_state->enable_profile()) {
        _profile->add_info_string("ReadColumns",
                                  read_columns_to_string(tablet_schema, _return_columns));
    }

    if (_tablet_reader_params.score_runtime) {
        SCOPED_TIMER(local_state->_statistics_collect_timer);
        _tablet_reader_params.collection_statistics = std::make_shared<CollectionStatistics>();

        io::IOContext io_ctx {
                .reader_type = ReaderType::READER_QUERY,
                .expiration_time = tablet->ttl_seconds(),
                .query_id = &_state->query_id(),
                .file_cache_stats = &_tablet_reader->mutable_stats()->file_cache_stats,
                .is_inverted_index = true,
        };

        RETURN_IF_ERROR(_tablet_reader_params.collection_statistics->collect(
                _state, _tablet_reader_params.rs_splits, _tablet_reader_params.tablet_schema,
                _tablet_reader_params.common_expr_ctxs_push_down, &io_ctx));
    }

    _has_prepared = true;
    return Status::OK();
}
261
262
840k
// Opens the scanner: runs the generic Scanner open, then initializes the BlockReader with
// the parameters assembled in prepare(). On failure, the returned status is annotated with
// the tablet id and this backend's address. On success, the rowset splits are dropped from
// the params to release memory.
Status OlapScanner::_open_impl(RuntimeState* state) {
    RETURN_IF_ERROR(Scanner::_open_impl(state));
    SCOPED_TIMER(_local_state->cast<OlapScanLocalState>()._reader_init_timer);

    if (auto init_status = _tablet_reader->init(_tablet_reader_params); !init_status.ok()) {
        const std::string context = "failed to initialize storage reader. tablet=" +
                                    std::to_string(_tablet_reader_params.tablet->tablet_id()) +
                                    ", backend=" + BackendOptions::get_localhost();
        init_status.append(context);
        return init_status;
    }

    // Do not hold rs_splits any more to release memory.
    _tablet_reader_params.rs_splits.clear();
    return Status::OK();
}
279
280
// it will be called under tablet read lock because capture rs readers need
281
Status OlapScanner::_init_tablet_reader_params(
282
        const phmap::flat_hash_map<int, SlotDescriptor*>& slot_id_to_slot_desc,
283
        const std::vector<OlapScanRange*>& key_ranges,
284
        const phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>>&
285
                slot_to_predicates,
286
840k
        const std::vector<FunctionFilter>& function_filters) {
287
    // if the table with rowset [0-x] or [0-1] [2-y], and [0-1] is empty
288
840k
    const bool single_version = _tablet_reader_params.has_single_version();
289
290
840k
    auto* olap_local_state = static_cast<OlapScanLocalState*>(_local_state);
291
840k
    bool read_mor_as_dup = olap_local_state->olap_scan_node().__isset.read_mor_as_dup &&
292
840k
                           olap_local_state->olap_scan_node().read_mor_as_dup;
293
840k
    if (_state->skip_storage_engine_merge() || read_mor_as_dup) {
294
43
        _tablet_reader_params.direct_mode = true;
295
43
        _tablet_reader_params.aggregation = true;
296
840k
    } else {
297
840k
        auto push_down_agg_type = _local_state->get_push_down_agg_type();
298
840k
        _tablet_reader_params.direct_mode = _tablet_reader_params.aggregation || single_version ||
299
840k
                                            (push_down_agg_type != TPushAggOp::NONE &&
300
10.2k
                                             push_down_agg_type != TPushAggOp::COUNT_ON_INDEX);
301
840k
    }
302
303
840k
    RETURN_IF_ERROR(_init_variant_columns());
304
840k
    RETURN_IF_ERROR(_init_return_columns());
305
306
840k
    _tablet_reader_params.reader_type = ReaderType::READER_QUERY;
307
840k
    _tablet_reader_params.push_down_agg_type_opt = _local_state->get_push_down_agg_type();
308
309
    // TODO: If a new runtime filter arrives after `_conjuncts` move to `_common_expr_ctxs_push_down`,
310
840k
    if (_common_expr_ctxs_push_down.empty()) {
311
825k
        for (auto& conjunct : _conjuncts) {
312
11.4k
            _tablet_reader_params.remaining_conjunct_roots.emplace_back(conjunct->root());
313
11.4k
        }
314
825k
    } else {
315
15.5k
        for (auto& ctx : _common_expr_ctxs_push_down) {
316
15.5k
            _tablet_reader_params.remaining_conjunct_roots.emplace_back(ctx->root());
317
15.5k
        }
318
14.8k
    }
319
320
840k
    _tablet_reader_params.common_expr_ctxs_push_down = _common_expr_ctxs_push_down;
321
840k
    _tablet_reader_params.virtual_column_exprs = _virtual_column_exprs;
322
840k
    _tablet_reader_params.vir_cid_to_idx_in_block = _vir_cid_to_idx_in_block;
323
840k
    _tablet_reader_params.vir_col_idx_to_type = _vir_col_idx_to_type;
324
840k
    _tablet_reader_params.score_runtime = _score_runtime;
325
840k
    _tablet_reader_params.output_columns = ((OlapScanLocalState*)_local_state)->_output_column_ids;
326
840k
    _tablet_reader_params.ann_topn_runtime = _ann_topn_runtime;
327
840k
    for (const auto& ele : ((OlapScanLocalState*)_local_state)->_cast_types_for_variants) {
328
1.36k
        _tablet_reader_params.target_cast_type_for_variants[ele.first] = ele.second;
329
1.36k
    };
330
840k
    auto& tablet_schema = _tablet_reader_params.tablet_schema;
331
6.12M
    for (auto& predicates : slot_to_predicates) {
332
6.12M
        const int sid = predicates.first;
333
6.12M
        DCHECK(slot_id_to_slot_desc.contains(sid));
334
6.12M
        int32_t index =
335
6.12M
                tablet_schema->field_index(slot_id_to_slot_desc.find(sid)->second->col_name());
336
6.12M
        if (index < 0) {
337
0
            throw Exception(
338
0
                    Status::InternalError("Column {} not found in tablet schema",
339
0
                                          slot_id_to_slot_desc.find(sid)->second->col_name()));
340
0
        }
341
6.12M
        for (auto& predicate : predicates.second) {
342
671k
            _tablet_reader_params.predicates.push_back(predicate->clone(index));
343
671k
        }
344
6.12M
    }
345
346
840k
    std::copy(function_filters.cbegin(), function_filters.cend(),
347
840k
              std::inserter(_tablet_reader_params.function_filters,
348
840k
                            _tablet_reader_params.function_filters.begin()));
349
350
    // Merge the columns in delete predicate that not in latest schema in to current tablet schema
351
840k
    for (auto& del_pred : _tablet_reader_params.delete_predicates) {
352
7.01k
        tablet_schema->merge_dropped_columns(*del_pred->tablet_schema());
353
7.01k
    }
354
355
    // Push key ranges to the tablet reader.
356
    // Skip the "full scan" placeholder (has_lower_bound == false) — when no key
357
    // predicates exist, start_key/end_key remain empty and the reader does a full scan.
358
1.48M
    for (auto* key_range : key_ranges) {
359
1.48M
        if (!key_range->has_lower_bound) {
360
131k
            continue;
361
131k
        }
362
363
1.35M
        _tablet_reader_params.start_key_include = key_range->begin_include;
364
1.35M
        _tablet_reader_params.end_key_include = key_range->end_include;
365
366
1.35M
        _tablet_reader_params.start_key.push_back(key_range->begin_scan_range);
367
1.35M
        _tablet_reader_params.end_key.push_back(key_range->end_scan_range);
368
1.35M
    }
369
370
840k
    _tablet_reader_params.profile = _local_state->custom_profile();
371
840k
    _tablet_reader_params.runtime_state = _state;
372
373
840k
    _tablet_reader_params.origin_return_columns = &_return_columns;
374
840k
    _tablet_reader_params.tablet_columns_convert_to_null_set = &_tablet_columns_convert_to_null_set;
375
376
840k
    if (_tablet_reader_params.direct_mode) {
377
829k
        _tablet_reader_params.return_columns = _return_columns;
378
829k
    } else {
379
        // we need to fetch all key columns to do the right aggregation on storage engine side.
380
37.4k
        for (size_t i = 0; i < tablet_schema->num_key_columns(); ++i) {
381
26.9k
            _tablet_reader_params.return_columns.push_back(i);
382
26.9k
        }
383
49.2k
        for (auto index : _return_columns) {
384
49.2k
            if (tablet_schema->column(index).is_key()) {
385
17.9k
                continue;
386
17.9k
            }
387
31.2k
            _tablet_reader_params.return_columns.push_back(index);
388
31.2k
        }
389
        // expand the sequence column
390
10.4k
        if (tablet_schema->has_sequence_col() || tablet_schema->has_seq_map()) {
391
40
            bool has_replace_col = false;
392
90
            for (auto col : _return_columns) {
393
90
                if (tablet_schema->column(col).aggregation() ==
394
90
                    FieldAggregationMethod::OLAP_FIELD_AGGREGATION_REPLACE) {
395
39
                    has_replace_col = true;
396
39
                    break;
397
39
                }
398
90
            }
399
40
            if (auto sequence_col_idx = tablet_schema->sequence_col_idx();
400
40
                has_replace_col && tablet_schema->has_sequence_col() &&
401
40
                std::find(_return_columns.begin(), _return_columns.end(), sequence_col_idx) ==
402
27
                        _return_columns.end()) {
403
15
                _tablet_reader_params.return_columns.push_back(sequence_col_idx);
404
15
            }
405
40
            if (has_replace_col) {
406
40
                const auto& val_to_seq = tablet_schema->value_col_idx_to_seq_col_idx();
407
40
                std::set<uint32_t> return_seq_columns;
408
409
245
                for (auto col : _tablet_reader_params.return_columns) {
410
                    // we need to add the necessary sequence column in _return_columns, and
411
                    // Avoid adding the same seq column twice
412
245
                    const auto val_iter = val_to_seq.find(col);
413
245
                    if (val_iter != val_to_seq.end()) {
414
42
                        auto seq = val_iter->second;
415
42
                        if (std::find(_tablet_reader_params.return_columns.begin(),
416
42
                                      _tablet_reader_params.return_columns.end(),
417
42
                                      seq) == _tablet_reader_params.return_columns.end()) {
418
4
                            return_seq_columns.insert(seq);
419
4
                        }
420
42
                    }
421
245
                }
422
40
                _tablet_reader_params.return_columns.insert(
423
40
                        std::end(_tablet_reader_params.return_columns),
424
40
                        std::begin(return_seq_columns), std::end(return_seq_columns));
425
40
            }
426
40
        }
427
10.4k
    }
428
429
840k
    _tablet_reader_params.use_page_cache = _state->enable_page_cache();
430
431
840k
    DBUG_EXECUTE_IF("NewOlapScanner::_init_tablet_reader_params.block", DBUG_BLOCK);
432
433
840k
    if (!_state->skip_storage_engine_merge()) {
434
840k
        auto* olap_scan_local_state = (OlapScanLocalState*)_local_state;
435
840k
        TOlapScanNode& olap_scan_node = olap_scan_local_state->olap_scan_node();
436
437
        // Set MOR value predicate pushdown flag
438
840k
        if (olap_scan_node.__isset.enable_mor_value_predicate_pushdown &&
439
840k
            olap_scan_node.enable_mor_value_predicate_pushdown) {
440
25
            _tablet_reader_params.enable_mor_value_predicate_pushdown = true;
441
25
        }
442
443
        // Skip topn / general-limit storage-layer optimizations when runtime
444
        // filters exist.  Late-arriving filters would re-populate _conjuncts
445
        // at the scanner level while the storage layer has already committed
446
        // to a row budget counted before those filters, causing the scan to
447
        // return fewer rows than the limit requires.
448
840k
        if (_total_rf_num == 0) {
449
            // order by table keys optimization for topn
450
            // will only read head/tail of data file since it's already sorted by keys
451
808k
            if (olap_scan_node.__isset.sort_info &&
452
808k
                !olap_scan_node.sort_info.is_asc_order.empty()) {
453
3.30k
                _limit = _local_state->limit_per_scanner();
454
3.30k
                _tablet_reader_params.read_orderby_key = true;
455
3.30k
                if (!olap_scan_node.sort_info.is_asc_order[0]) {
456
411
                    _tablet_reader_params.read_orderby_key_reverse = true;
457
411
                }
458
3.30k
                _tablet_reader_params.read_orderby_key_num_prefix_columns =
459
3.30k
                        olap_scan_node.sort_info.is_asc_order.size();
460
3.30k
                _tablet_reader_params.read_orderby_key_limit = _limit;
461
462
3.30k
                if (_tablet_reader_params.read_orderby_key_limit > 0 &&
463
3.30k
                    olap_scan_local_state->_storage_no_merge()) {
464
3.20k
                    _tablet_reader_params.filter_block_conjuncts = _conjuncts;
465
3.20k
                    _conjuncts.clear();
466
3.20k
                }
467
805k
            } else if (_limit > 0 && olap_scan_local_state->_storage_no_merge()) {
468
                // General limit pushdown for DUP_KEYS and UNIQUE_KEYS with MOW
469
                // (non-merge path). Only when topn optimization is NOT active.
470
                // NOTE: _limit is the global query limit (TPlanNode.limit), not a
471
                // per-scanner budget. With N scanners each scanner may read up to
472
                // _limit rows, so up to N * _limit rows are read in total before
473
                // the _shared_scan_limit coordinator stops them. This is
474
                // acceptable because _shared_scan_limit guarantees correctness,
475
                // and the over-read is bounded by (N-1) * _limit which is small
476
                // for typical LIMIT values.
477
1.81k
                _tablet_reader_params.general_read_limit = _limit;
478
1.81k
                _tablet_reader_params.filter_block_conjuncts = _conjuncts;
479
1.81k
                _conjuncts.clear();
480
1.81k
            }
481
808k
        }
482
483
        // set push down topn filter
484
840k
        _tablet_reader_params.topn_filter_source_node_ids =
485
840k
                olap_scan_local_state->get_topn_filter_source_node_ids(_state, true);
486
840k
        if (!_tablet_reader_params.topn_filter_source_node_ids.empty()) {
487
4.88k
            _tablet_reader_params.topn_filter_target_node_id =
488
4.88k
                    olap_scan_local_state->parent()->node_id();
489
4.88k
        }
490
840k
    }
491
492
    // If this is a Two-Phase read query, and we need to delay the release of Rowset
493
    // by rowset->update_delayed_expired_timestamp().This could expand the lifespan of Rowset
494
840k
    if (tablet_schema->field_index(BeConsts::ROWID_COL) >= 0) {
495
12
        constexpr static int delayed_s = 60;
496
18
        for (auto rs_reader : _tablet_reader_params.rs_splits) {
497
18
            uint64_t delayed_expired_timestamp =
498
18
                    UnixSeconds() + _tablet_reader_params.runtime_state->execution_timeout() +
499
18
                    delayed_s;
500
18
            rs_reader.rs_reader->rowset()->update_delayed_expired_timestamp(
501
18
                    delayed_expired_timestamp);
502
18
            ExecEnv::GetInstance()->storage_engine().add_quering_rowset(
503
18
                    rs_reader.rs_reader->rowset());
504
18
        }
505
12
    }
506
507
840k
    if (tablet_schema->has_global_row_id()) {
508
6.66k
        auto& id_file_map = _state->get_id_file_map();
509
12.5k
        for (auto rs_reader : _tablet_reader_params.rs_splits) {
510
12.5k
            id_file_map->add_temp_rowset(rs_reader.rs_reader->rowset());
511
12.5k
        }
512
6.66k
    }
513
514
840k
    return Status::OK();
515
840k
}
516
517
840k
// Registers VARIANT sub-columns referenced by output slots in the scanner's private schema.
// Each such slot addresses a sub-path of a variant column. Those sub-columns do not exist
// in the frontend schema info, so a materialized column is appended for every path the
// schema does not already know; later column indexing can then resolve it.
// Afterwards the variant column attributes are propagated via
// variant_util::inherit_column_attributes. This is a no-op when the schema has no variant
// columns.
Status OlapScanner::_init_variant_columns() {
    auto& schema = _tablet_reader_params.tablet_schema;
    if (schema->num_variant_columns() > 0) {
        for (auto* slot : _output_tuple_desc->slots()) {
            if (slot->type()->get_primitive_type() != PrimitiveType::TYPE_VARIANT) {
                continue;
            }
            const auto& variant_type =
                    assert_cast<const DataTypeVariant&>(*remove_nullable(slot->type()));
            const auto& parent_name =
                    schema->column_by_uid(slot->col_unique_id()).name_lower_case();
            TabletColumn materialized = TabletColumn::create_materialized_variant_column(
                    parent_name, slot->column_paths(), slot->col_unique_id(),
                    variant_type.variant_max_subcolumns_count(), variant_type.enable_doc_mode());
            const bool already_present = schema->field_index(*materialized.path_info_ptr()) >= 0;
            if (!already_present) {
                schema->append_column(materialized, TabletSchema::ColumnType::VARIANT);
            }
        }
        variant_util::inherit_column_attributes(schema);
    }
    return Status::OK();
}
541
542
840k
// Resolves every output slot to a column index in the scanner's tablet schema and appends it
// to _return_columns. Along the way it:
//  - wires virtual-column expressions to their column ids and block positions;
//  - records access paths (all / predicate) keyed by the column's unique id, or by its
//    parent's unique id when the column has none;
//  - registers pruned data types for STRUCT/MAP/ARRAY slots that carry access paths;
//  - records columns that must be converted to nullable.
// Returns InternalError if a slot cannot be resolved or no slot is materialized.
// Returns INVALID_SCHEMA if a non-nullable slot maps to a nullable column.
Status OlapScanner::_init_return_columns() {
    // Loop-invariant: the schema pointer does not change while slots are resolved.
    auto& tablet_schema = _tablet_reader_params.tablet_schema;
    for (auto* slot : _output_tuple_desc->slots()) {
        const auto primitive_type = slot->type()->get_primitive_type();
        // variant column using path to index a column
        int32_t index = 0;
        if (primitive_type == PrimitiveType::TYPE_VARIANT) {
            index = tablet_schema->field_index(PathInData(
                    tablet_schema->column_by_uid(slot->col_unique_id()).name_lower_case(),
                    slot->column_paths()));
        } else {
            index = slot->col_unique_id() >= 0 ? tablet_schema->field_index(slot->col_unique_id())
                                               : tablet_schema->field_index(slot->col_name());
        }

        if (index < 0) {
            return Status::InternalError(
                    "field name is invalid. field={}, field_name_to_index={}, col_unique_id={}",
                    slot->col_name(), tablet_schema->get_all_field_names(), slot->col_unique_id());
        }

        if (slot->get_virtual_column_expr()) {
            ColumnId virtual_column_cid = index;
            _virtual_column_exprs[virtual_column_cid] = _slot_id_to_virtual_column_expr[slot->id()];
            size_t idx_in_block = _slot_id_to_index_in_block[slot->id()];
            _vir_cid_to_idx_in_block[virtual_column_cid] = idx_in_block;
            _vir_col_idx_to_type[idx_in_block] = _slot_id_to_col_type[slot->id()];

            VLOG_DEBUG << fmt::format(
                    "Virtual column, slot id: {}, cid {}, column index: {}, type: {}", slot->id(),
                    virtual_column_cid, _vir_cid_to_idx_in_block[virtual_column_cid],
                    _vir_col_idx_to_type[idx_in_block]->get_name());
        }

        const auto& column = tablet_schema->column(index);
        int32_t unique_id =
                column.unique_id() >= 0 ? column.unique_id() : column.parent_unique_id();
        if (!slot->all_access_paths().empty()) {
            _tablet_reader_params.all_access_paths.insert({unique_id, slot->all_access_paths()});
        }

        if (!slot->predicate_access_paths().empty()) {
            _tablet_reader_params.predicate_access_paths.insert(
                    {unique_id, slot->predicate_access_paths()});
        }

        if ((primitive_type == PrimitiveType::TYPE_STRUCT ||
             primitive_type == PrimitiveType::TYPE_MAP ||
             primitive_type == PrimitiveType::TYPE_ARRAY) &&
            !slot->all_access_paths().empty()) {
            tablet_schema->add_pruned_columns_data_type(column.unique_id(), slot->type());
        }

        _return_columns.push_back(index);
        if (slot->is_nullable() && !tablet_schema->column(index).is_nullable()) {
            _tablet_columns_convert_to_null_set.emplace(index);
        } else if (!slot->is_nullable() && tablet_schema->column(index).is_nullable()) {
            // The message labels this value "tablet id"; it used to print
            // tablet_schema->table_id() (the table id), so report the real tablet id.
            return Status::Error<ErrorCode::INVALID_SCHEMA>(
                    "slot(id: {}, name: {})'s nullable does not match "
                    "column(tablet id: {}, index: {}, name: {}) ",
                    slot->id(), slot->col_name(), _tablet_reader_params.tablet->tablet_id(), index,
                    tablet_schema->column(index).name());
        }
    }

    if (_return_columns.empty()) {
        return Status::InternalError("failed to build storage scanner, no materialized slot!");
    }

    return Status::OK();
}
612
613
884k
// Classifies where the scanner's rowsets are stored:
//  - cloud mode: always LOCAL (there is no cold storage);
//  - otherwise LOCAL when every rowset is local (including when there are no rowsets),
//    REMOTE when none is, and REMOTE_AND_LOCAL for a mix.
doris::TabletStorageType OlapScanner::get_storage_type() {
    if (config::is_cloud_mode()) {
        // we don't have cold storage in cloud mode, all storage is treated as local
        return doris::TabletStorageType::STORAGE_TYPE_LOCAL;
    }

    const auto& splits = _tablet_reader_params.rs_splits;
    const auto local_count = std::count_if(splits.begin(), splits.end(), [](const auto& split) {
        return split.rs_reader->rowset()->is_local();
    });

    if (local_count == static_cast<decltype(local_count)>(splits.size())) {
        return doris::TabletStorageType::STORAGE_TYPE_LOCAL;
    }
    return local_count == 0 ? doris::TabletStorageType::STORAGE_TYPE_REMOTE
                            : doris::TabletStorageType::STORAGE_TYPE_REMOTE_AND_LOCAL;
}
631
632
1.09M
// Reads the next block from the storage reader.
// Interface contract: *eof may be true only when the returned block is empty. Any block
// that carries rows therefore forces *eof back to false and bumps the tablet's
// read_block_count.
Status OlapScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
    RETURN_IF_ERROR(_tablet_reader->next_block_with_aggregation(block, eof));
    const bool has_rows = block->rows() > 0;
    if (!has_rows) {
        return Status::OK();
    }
    _tablet_reader_params.tablet->read_block_count.fetch_add(1, std::memory_order_relaxed);
    *eof = false;
    return Status::OK();
}
643
644
843k
// Closes the scanner. When _try_close() returns false (presumably the scanner has already
// been closed — confirm against Scanner), this is a no-op that returns OK. Otherwise it
// delegates to Scanner::close() and propagates any error from it.
Status OlapScanner::close(RuntimeState* state) {
    if (_try_close()) {
        RETURN_IF_ERROR(Scanner::close(state));
    }
    return Status::OK();
}
651
652
883k
void OlapScanner::update_realtime_counters() {
653
883k
    if (!_has_prepared) {
654
        // Counter update need prepare successfully, or it maybe core. For example, olap scanner
655
        // will open tablet reader during prepare, if not prepare successfully, tablet reader == nullptr.
656
0
        return;
657
0
    }
658
883k
    OlapScanLocalState* local_state = static_cast<OlapScanLocalState*>(_local_state);
659
883k
    const OlapReaderStatistics& stats = _tablet_reader->stats();
660
883k
    COUNTER_UPDATE(local_state->_read_compressed_counter, stats.compressed_bytes_read);
661
883k
    COUNTER_UPDATE(local_state->_read_uncompressed_counter, stats.uncompressed_bytes_read);
662
883k
    COUNTER_UPDATE(local_state->_scan_bytes, stats.uncompressed_bytes_read);
663
883k
    COUNTER_UPDATE(local_state->_scan_rows, stats.raw_rows_read);
664
665
    // Make sure the scan bytes and scan rows counter in audit log is the same as the counter in
666
    // doris metrics.
667
    // ScanBytes is the uncompressed bytes read from local + remote
668
    // bytes_read_from_local is the compressed bytes read from local
669
    // bytes_read_from_remote is the compressed bytes read from remote
670
    // scan bytes > bytes_read_from_local + bytes_read_from_remote
671
883k
    _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_rows(stats.raw_rows_read);
672
883k
    _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes(
673
883k
            stats.uncompressed_bytes_read);
674
675
    // In case of no cache, we still need to update the IO stats. uncompressed bytes read == local + remote
676
883k
    if (stats.file_cache_stats.bytes_read_from_local == 0 &&
677
883k
        stats.file_cache_stats.bytes_read_from_remote == 0) {
678
629k
        _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_local_storage(
679
629k
                stats.compressed_bytes_read);
680
629k
        DorisMetrics::instance()->query_scan_bytes_from_local->increment(
681
629k
                stats.compressed_bytes_read);
682
629k
    } else {
683
254k
        _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_local_storage(
684
254k
                stats.file_cache_stats.bytes_read_from_local - _bytes_read_from_local);
685
254k
        _state->get_query_ctx()
686
254k
                ->resource_ctx()
687
254k
                ->io_context()
688
254k
                ->update_scan_bytes_from_remote_storage(
689
254k
                        stats.file_cache_stats.bytes_read_from_remote - _bytes_read_from_remote);
690
691
254k
        DorisMetrics::instance()->query_scan_bytes_from_local->increment(
692
254k
                stats.file_cache_stats.bytes_read_from_local - _bytes_read_from_local);
693
254k
        DorisMetrics::instance()->query_scan_bytes_from_remote->increment(
694
254k
                stats.file_cache_stats.bytes_read_from_remote - _bytes_read_from_remote);
695
254k
    }
696
697
883k
    _tablet_reader->mutable_stats()->compressed_bytes_read = 0;
698
883k
    _tablet_reader->mutable_stats()->uncompressed_bytes_read = 0;
699
883k
    _tablet_reader->mutable_stats()->raw_rows_read = 0;
700
701
883k
    _bytes_read_from_local = _tablet_reader->stats().file_cache_stats.bytes_read_from_local;
702
883k
    _bytes_read_from_remote = _tablet_reader->stats().file_cache_stats.bytes_read_from_remote;
703
883k
}
704
705
840k
void OlapScanner::_collect_profile_before_close() {
706
    //  Please don't directly enable the profile here, we need to set QueryStatistics using the counter inside.
707
840k
    if (_has_updated_counter) {
708
0
        return;
709
0
    }
710
840k
    _has_updated_counter = true;
711
840k
    _tablet_reader->update_profile(_profile);
712
713
840k
    Scanner::_collect_profile_before_close();
714
715
    // Update counters for OlapScanner
716
    // Update counters from tablet reader's stats
717
840k
    auto& stats = _tablet_reader->stats();
718
840k
    auto* local_state = (OlapScanLocalState*)_local_state;
719
840k
    COUNTER_UPDATE(local_state->_io_timer, stats.io_ns);
720
840k
    COUNTER_UPDATE(local_state->_read_compressed_counter, stats.compressed_bytes_read);
721
840k
    COUNTER_UPDATE(local_state->_scan_bytes, stats.uncompressed_bytes_read);
722
840k
    COUNTER_UPDATE(local_state->_decompressor_timer, stats.decompress_ns);
723
840k
    COUNTER_UPDATE(local_state->_read_uncompressed_counter, stats.uncompressed_bytes_read);
724
840k
    COUNTER_UPDATE(local_state->_block_load_timer, stats.block_load_ns);
725
840k
    COUNTER_UPDATE(local_state->_block_load_counter, stats.blocks_load);
726
840k
    COUNTER_UPDATE(local_state->_block_fetch_timer, stats.block_fetch_ns);
727
840k
    COUNTER_UPDATE(local_state->_delete_bitmap_get_agg_timer, stats.delete_bitmap_get_agg_ns);
728
840k
    COUNTER_UPDATE(local_state->_scan_rows, stats.raw_rows_read);
729
840k
    COUNTER_UPDATE(local_state->_vec_cond_timer, stats.vec_cond_ns);
730
840k
    COUNTER_UPDATE(local_state->_short_cond_timer, stats.short_cond_ns);
731
840k
    COUNTER_UPDATE(local_state->_expr_filter_timer, stats.expr_filter_ns);
732
840k
    COUNTER_UPDATE(local_state->_block_init_timer, stats.block_init_ns);
733
840k
    COUNTER_UPDATE(local_state->_block_init_seek_timer, stats.block_init_seek_ns);
734
840k
    COUNTER_UPDATE(local_state->_block_init_seek_counter, stats.block_init_seek_num);
735
840k
    COUNTER_UPDATE(local_state->_segment_generate_row_range_by_keys_timer,
736
840k
                   stats.generate_row_ranges_by_keys_ns);
737
840k
    COUNTER_UPDATE(local_state->_segment_generate_row_range_by_column_conditions_timer,
738
840k
                   stats.generate_row_ranges_by_column_conditions_ns);
739
840k
    COUNTER_UPDATE(local_state->_segment_generate_row_range_by_bf_timer,
740
840k
                   stats.generate_row_ranges_by_bf_ns);
741
840k
    COUNTER_UPDATE(local_state->_collect_iterator_merge_next_timer,
742
840k
                   stats.collect_iterator_merge_next_timer);
743
840k
    COUNTER_UPDATE(local_state->_segment_generate_row_range_by_zonemap_timer,
744
840k
                   stats.generate_row_ranges_by_zonemap_ns);
745
840k
    COUNTER_UPDATE(local_state->_segment_generate_row_range_by_dict_timer,
746
840k
                   stats.generate_row_ranges_by_dict_ns);
747
840k
    COUNTER_UPDATE(local_state->_predicate_column_read_timer, stats.predicate_column_read_ns);
748
840k
    COUNTER_UPDATE(local_state->_non_predicate_column_read_timer, stats.non_predicate_read_ns);
749
840k
    COUNTER_UPDATE(local_state->_predicate_column_read_seek_timer,
750
840k
                   stats.predicate_column_read_seek_ns);
751
840k
    COUNTER_UPDATE(local_state->_predicate_column_read_seek_counter,
752
840k
                   stats.predicate_column_read_seek_num);
753
840k
    COUNTER_UPDATE(local_state->_lazy_read_timer, stats.lazy_read_ns);
754
840k
    COUNTER_UPDATE(local_state->_lazy_read_seek_timer, stats.block_lazy_read_seek_ns);
755
840k
    COUNTER_UPDATE(local_state->_lazy_read_seek_counter, stats.block_lazy_read_seek_num);
756
840k
    COUNTER_UPDATE(local_state->_output_col_timer, stats.output_col_ns);
757
840k
    COUNTER_UPDATE(local_state->_rows_vec_cond_filtered_counter, stats.rows_vec_cond_filtered);
758
840k
    COUNTER_UPDATE(local_state->_rows_short_circuit_cond_filtered_counter,
759
840k
                   stats.rows_short_circuit_cond_filtered);
760
840k
    COUNTER_UPDATE(local_state->_rows_expr_cond_filtered_counter, stats.rows_expr_cond_filtered);
761
840k
    COUNTER_UPDATE(local_state->_rows_vec_cond_input_counter, stats.vec_cond_input_rows);
762
840k
    COUNTER_UPDATE(local_state->_rows_short_circuit_cond_input_counter,
763
840k
                   stats.short_circuit_cond_input_rows);
764
840k
    COUNTER_UPDATE(local_state->_rows_expr_cond_input_counter, stats.expr_cond_input_rows);
765
840k
    COUNTER_UPDATE(local_state->_stats_filtered_counter, stats.rows_stats_filtered);
766
840k
    COUNTER_UPDATE(local_state->_stats_rp_filtered_counter, stats.rows_stats_rp_filtered);
767
840k
    COUNTER_UPDATE(local_state->_dict_filtered_counter, stats.segment_dict_filtered);
768
840k
    COUNTER_UPDATE(local_state->_bf_filtered_counter, stats.rows_bf_filtered);
769
840k
    COUNTER_UPDATE(local_state->_del_filtered_counter, stats.rows_del_filtered);
770
840k
    COUNTER_UPDATE(local_state->_del_filtered_counter, stats.rows_del_by_bitmap);
771
840k
    COUNTER_UPDATE(local_state->_del_filtered_counter, stats.rows_vec_del_cond_filtered);
772
840k
    COUNTER_UPDATE(local_state->_conditions_filtered_counter, stats.rows_conditions_filtered);
773
840k
    COUNTER_UPDATE(local_state->_key_range_filtered_counter, stats.rows_key_range_filtered);
774
840k
    COUNTER_UPDATE(local_state->_total_pages_num_counter, stats.total_pages_num);
775
840k
    COUNTER_UPDATE(local_state->_cached_pages_num_counter, stats.cached_pages_num);
776
840k
    COUNTER_UPDATE(local_state->_inverted_index_filter_counter, stats.rows_inverted_index_filtered);
777
840k
    COUNTER_UPDATE(local_state->_inverted_index_filter_timer, stats.inverted_index_filter_timer);
778
840k
    COUNTER_UPDATE(local_state->_inverted_index_query_cache_hit_counter,
779
840k
                   stats.inverted_index_query_cache_hit);
780
840k
    COUNTER_UPDATE(local_state->_inverted_index_query_cache_miss_counter,
781
840k
                   stats.inverted_index_query_cache_miss);
782
840k
    COUNTER_UPDATE(local_state->_inverted_index_query_timer, stats.inverted_index_query_timer);
783
840k
    COUNTER_UPDATE(local_state->_inverted_index_query_null_bitmap_timer,
784
840k
                   stats.inverted_index_query_null_bitmap_timer);
785
840k
    COUNTER_UPDATE(local_state->_inverted_index_query_bitmap_copy_timer,
786
840k
                   stats.inverted_index_query_bitmap_copy_timer);
787
840k
    COUNTER_UPDATE(local_state->_inverted_index_searcher_open_timer,
788
840k
                   stats.inverted_index_searcher_open_timer);
789
840k
    COUNTER_UPDATE(local_state->_inverted_index_searcher_search_timer,
790
840k
                   stats.inverted_index_searcher_search_timer);
791
840k
    COUNTER_UPDATE(local_state->_inverted_index_searcher_search_init_timer,
792
840k
                   stats.inverted_index_searcher_search_init_timer);
793
840k
    COUNTER_UPDATE(local_state->_inverted_index_searcher_search_exec_timer,
794
840k
                   stats.inverted_index_searcher_search_exec_timer);
795
840k
    COUNTER_UPDATE(local_state->_inverted_index_searcher_cache_hit_counter,
796
840k
                   stats.inverted_index_searcher_cache_hit);
797
840k
    COUNTER_UPDATE(local_state->_inverted_index_searcher_cache_miss_counter,
798
840k
                   stats.inverted_index_searcher_cache_miss);
799
840k
    COUNTER_UPDATE(local_state->_inverted_index_downgrade_count_counter,
800
840k
                   stats.inverted_index_downgrade_count);
801
840k
    COUNTER_UPDATE(local_state->_inverted_index_analyzer_timer,
802
840k
                   stats.inverted_index_analyzer_timer);
803
840k
    COUNTER_UPDATE(local_state->_inverted_index_lookup_timer, stats.inverted_index_lookup_timer);
804
840k
    COUNTER_UPDATE(local_state->_variant_scan_sparse_column_timer,
805
840k
                   stats.variant_scan_sparse_column_timer_ns);
806
840k
    COUNTER_UPDATE(local_state->_variant_scan_sparse_column_bytes,
807
840k
                   stats.variant_scan_sparse_column_bytes);
808
840k
    COUNTER_UPDATE(local_state->_variant_fill_path_from_sparse_column_timer,
809
840k
                   stats.variant_fill_path_from_sparse_column_timer_ns);
810
840k
    COUNTER_UPDATE(local_state->_variant_subtree_default_iter_count,
811
840k
                   stats.variant_subtree_default_iter_count);
812
840k
    COUNTER_UPDATE(local_state->_variant_subtree_leaf_iter_count,
813
840k
                   stats.variant_subtree_leaf_iter_count);
814
840k
    COUNTER_UPDATE(local_state->_variant_subtree_hierarchical_iter_count,
815
840k
                   stats.variant_subtree_hierarchical_iter_count);
816
840k
    COUNTER_UPDATE(local_state->_variant_subtree_sparse_iter_count,
817
840k
                   stats.variant_subtree_sparse_iter_count);
818
840k
    COUNTER_UPDATE(local_state->_variant_doc_value_column_iter_count,
819
840k
                   stats.variant_doc_value_column_iter_count);
820
821
840k
    InvertedIndexProfileReporter inverted_index_profile;
822
840k
    inverted_index_profile.update(local_state->_index_filter_profile.get(),
823
840k
                                  &stats.inverted_index_stats);
824
825
    // only cloud deploy mode will use file cache.
826
840k
    if (config::is_cloud_mode() && config::enable_file_cache) {
827
835k
        io::FileCacheProfileReporter cache_profile(local_state->_segment_profile.get());
828
835k
        cache_profile.update(&stats.file_cache_stats);
829
835k
        _state->get_query_ctx()->resource_ctx()->io_context()->update_bytes_write_into_cache(
830
835k
                stats.file_cache_stats.bytes_write_into_cache);
831
835k
    }
832
840k
    COUNTER_UPDATE(local_state->_output_index_result_column_timer,
833
840k
                   stats.output_index_result_column_timer);
834
840k
    COUNTER_UPDATE(local_state->_filtered_segment_counter, stats.filtered_segment_number);
835
840k
    COUNTER_UPDATE(local_state->_total_segment_counter, stats.total_segment_number);
836
840k
    COUNTER_UPDATE(local_state->_condition_cache_hit_counter, stats.condition_cache_hit_seg_nums);
837
840k
    COUNTER_UPDATE(local_state->_condition_cache_filtered_rows_counter,
838
840k
                   stats.condition_cache_filtered_rows);
839
840
840k
    COUNTER_UPDATE(local_state->_tablet_reader_init_timer, stats.tablet_reader_init_timer_ns);
841
840k
    COUNTER_UPDATE(local_state->_tablet_reader_capture_rs_readers_timer,
842
840k
                   stats.tablet_reader_capture_rs_readers_timer_ns);
843
840k
    COUNTER_UPDATE(local_state->_tablet_reader_init_return_columns_timer,
844
840k
                   stats.tablet_reader_init_return_columns_timer_ns);
845
840k
    COUNTER_UPDATE(local_state->_tablet_reader_init_keys_param_timer,
846
840k
                   stats.tablet_reader_init_keys_param_timer_ns);
847
840k
    COUNTER_UPDATE(local_state->_tablet_reader_init_orderby_keys_param_timer,
848
840k
                   stats.tablet_reader_init_orderby_keys_param_timer_ns);
849
840k
    COUNTER_UPDATE(local_state->_tablet_reader_init_conditions_param_timer,
850
840k
                   stats.tablet_reader_init_conditions_param_timer_ns);
851
840k
    COUNTER_UPDATE(local_state->_tablet_reader_init_delete_condition_param_timer,
852
840k
                   stats.tablet_reader_init_delete_condition_param_timer_ns);
853
840k
    COUNTER_UPDATE(local_state->_block_reader_vcollect_iter_init_timer,
854
840k
                   stats.block_reader_vcollect_iter_init_timer_ns);
855
840k
    COUNTER_UPDATE(local_state->_block_reader_rs_readers_init_timer,
856
840k
                   stats.block_reader_rs_readers_init_timer_ns);
857
840k
    COUNTER_UPDATE(local_state->_block_reader_build_heap_init_timer,
858
840k
                   stats.block_reader_build_heap_init_timer_ns);
859
860
840k
    COUNTER_UPDATE(local_state->_rowset_reader_get_segment_iterators_timer,
861
840k
                   stats.rowset_reader_get_segment_iterators_timer_ns);
862
840k
    COUNTER_UPDATE(local_state->_rowset_reader_create_iterators_timer,
863
840k
                   stats.rowset_reader_create_iterators_timer_ns);
864
840k
    COUNTER_UPDATE(local_state->_rowset_reader_init_iterators_timer,
865
840k
                   stats.rowset_reader_init_iterators_timer_ns);
866
840k
    COUNTER_UPDATE(local_state->_rowset_reader_load_segments_timer,
867
840k
                   stats.rowset_reader_load_segments_timer_ns);
868
869
840k
    COUNTER_UPDATE(local_state->_segment_iterator_init_timer, stats.segment_iterator_init_timer_ns);
870
840k
    COUNTER_UPDATE(local_state->_segment_iterator_init_return_column_iterators_timer,
871
840k
                   stats.segment_iterator_init_return_column_iterators_timer_ns);
872
840k
    COUNTER_UPDATE(local_state->_segment_iterator_init_index_iterators_timer,
873
840k
                   stats.segment_iterator_init_index_iterators_timer_ns);
874
875
840k
    COUNTER_UPDATE(local_state->_segment_create_column_readers_timer,
876
840k
                   stats.segment_create_column_readers_timer_ns);
877
840k
    COUNTER_UPDATE(local_state->_segment_load_index_timer, stats.segment_load_index_timer_ns);
878
879
    // Update metrics
880
840k
    DorisMetrics::instance()->query_scan_bytes->increment(
881
840k
            local_state->_read_uncompressed_counter->value());
882
840k
    DorisMetrics::instance()->query_scan_rows->increment(local_state->_scan_rows->value());
883
840k
    auto& tablet = _tablet_reader_params.tablet;
884
840k
    tablet->query_scan_bytes->increment(local_state->_read_uncompressed_counter->value());
885
840k
    tablet->query_scan_rows->increment(local_state->_scan_rows->value());
886
840k
    tablet->query_scan_count->increment(1);
887
888
840k
    COUNTER_UPDATE(local_state->_ann_range_search_filter_counter,
889
840k
                   stats.rows_ann_index_range_filtered);
890
840k
    COUNTER_UPDATE(local_state->_ann_topn_filter_counter, stats.rows_ann_index_topn_filtered);
891
840k
    COUNTER_UPDATE(local_state->_ann_index_load_costs, stats.ann_index_load_ns);
892
840k
    COUNTER_UPDATE(local_state->_ann_ivf_on_disk_load_costs, stats.ann_ivf_on_disk_load_ns);
893
840k
    COUNTER_UPDATE(local_state->_ann_ivf_on_disk_cache_hit_cnt,
894
840k
                   stats.ann_ivf_on_disk_cache_hit_cnt);
895
840k
    COUNTER_UPDATE(local_state->_ann_ivf_on_disk_cache_miss_cnt,
896
840k
                   stats.ann_ivf_on_disk_cache_miss_cnt);
897
840k
    COUNTER_UPDATE(local_state->_ann_range_search_costs, stats.ann_index_range_search_ns);
898
840k
    COUNTER_UPDATE(local_state->_ann_range_search_cnt, stats.ann_index_range_search_cnt);
899
840k
    COUNTER_UPDATE(local_state->_ann_range_engine_search_costs, stats.ann_range_engine_search_ns);
900
    // Engine prepare before search
901
840k
    COUNTER_UPDATE(local_state->_ann_range_pre_process_costs, stats.ann_range_pre_process_ns);
902
    // Post process parent: Doris result process + engine convert
903
840k
    COUNTER_UPDATE(local_state->_ann_range_post_process_costs,
904
840k
                   stats.ann_range_result_convert_ns + stats.ann_range_engine_convert_ns);
905
    // Engine convert (child under post-process)
906
840k
    COUNTER_UPDATE(local_state->_ann_range_engine_convert_costs, stats.ann_range_engine_convert_ns);
907
    // Doris-side result convert (child under post-process)
908
840k
    COUNTER_UPDATE(local_state->_ann_range_result_convert_costs, stats.ann_range_result_convert_ns);
909
910
840k
    COUNTER_UPDATE(local_state->_ann_topn_search_costs, stats.ann_topn_search_ns);
911
840k
    COUNTER_UPDATE(local_state->_ann_topn_search_cnt, stats.ann_index_topn_search_cnt);
912
913
    // Detailed ANN timers
914
    // ANN TopN timers with hierarchy
915
    // Engine search time (FAISS)
916
840k
    COUNTER_UPDATE(local_state->_ann_topn_engine_search_costs,
917
840k
                   stats.ann_index_topn_engine_search_ns);
918
    // Engine prepare time (allocations/buffer setup before search)
919
840k
    COUNTER_UPDATE(local_state->_ann_topn_pre_process_costs,
920
840k
                   stats.ann_index_topn_engine_prepare_ns);
921
    // Post process parent includes Doris result processing + engine convert
922
840k
    COUNTER_UPDATE(local_state->_ann_topn_post_process_costs,
923
840k
                   stats.ann_index_topn_result_process_ns + stats.ann_index_topn_engine_convert_ns);
924
    // Engine-side conversion time inside FAISS wrappers (child under post-process)
925
840k
    COUNTER_UPDATE(local_state->_ann_topn_engine_convert_costs,
926
840k
                   stats.ann_index_topn_engine_convert_ns);
927
928
    // Doris-side result convert costs (show separately as another child counter); use pure process time
929
840k
    COUNTER_UPDATE(local_state->_ann_topn_result_convert_costs,
930
840k
                   stats.ann_index_topn_result_process_ns);
931
932
840k
    COUNTER_UPDATE(local_state->_ann_fallback_brute_force_cnt, stats.ann_fall_back_brute_force_cnt);
933
934
    // Overhead counter removed; precise instrumentation is reported via engine_prepare above.
935
840k
}
936
937
#include "common/compile_check_avoid_end.h"
938
} // namespace doris