Coverage Report

Created: 2026-06-27 12:32

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/scanner.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <stdint.h>
21
22
#include <algorithm>
23
#include <atomic>
24
#include <vector>
25
26
#include "common/status.h"
27
#include "core/block/block.h"
28
#include "runtime/exec_env.h"
29
#include "runtime/runtime_state.h"
30
#include "storage/tablet/tablet.h"
31
#include "util/stopwatch.hpp"
32
33
namespace doris {
34
class RuntimeProfile;
35
class TupleDescriptor;
36
37
class VExprContext;
38
39
class ScanLocalStateBase;
40
} // namespace doris
41
42
namespace doris {
43
44
// Row counters kept by a scanner for load jobs. Both counters start at zero.
45
struct ScannerCounter {
46
460k
    ScannerCounter() : num_rows_filtered(0), num_rows_unselected(0) {}
47
48
    int64_t num_rows_filtered;   // unqualified rows (not matching the dest schema, or without a partition)
49
    int64_t num_rows_unselected; // rows filtered out by predicates
50
};
51
52
class Scanner {
53
public:
54
    Scanner(RuntimeState* state, ScanLocalStateBase* local_state, int64_t limit,
55
            RuntimeProfile* profile);
56
57
    // Only used by FileScanner to read a single line: _limit is fixed to 1 and
    // _local_state keeps its nullptr default. Increments the global scanner_cnt
    // metric; the destructor decrements it again.
58
    Scanner(RuntimeState* state, RuntimeProfile* profile)
59
3.23k
            : _state(state), _limit(1), _profile(profile), _total_rf_num(0), _has_prepared(false) {
60
3.23k
        DorisMetrics::instance()->scanner_cnt->increment(1);
61
3.23k
    };
62
63
460k
    // Clears the blocks and expression-context vectors owned by the scanner while the
    // query memory tracker is switched in (presumably so that the released memory is
    // attributed to the query -- confirm), then decrements the scanner_cnt metric.
    // NOTE(review): _state is dereferenced unconditionally here.
    virtual ~Scanner() {
64
460k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_state->query_mem_tracker());
65
460k
        _input_block.clear();
66
460k
        _conjuncts.clear();
67
460k
        _projections.clear();
68
460k
        _origin_block.clear();
69
460k
        _common_expr_ctxs_push_down.clear();
70
460k
        DorisMetrics::instance()->scanner_cnt->increment(-1);
71
460k
    }
72
73
    virtual Status init(RuntimeState* state, const VExprContextSPtrs& conjuncts);
74
456k
    // Non-virtual entry point for the prepare phase. Runs _prepare_impl() with
    // SCOPED_RAW_TIMER attached to both the overall per-scanner timer and the
    // prepare-specific timer, and returns the Status produced by _prepare_impl().
    Status prepare() {
75
456k
        SCOPED_RAW_TIMER(&_per_scanner_timer);
76
456k
        SCOPED_RAW_TIMER(&_per_scanner_prepare_timer);
77
456k
        return _prepare_impl();
78
456k
    }
79
80
456k
    // Non-virtual entry point for the open phase. Runs _open_impl(state) with
    // SCOPED_RAW_TIMER attached to both the overall per-scanner timer and the
    // open-specific timer, and returns the Status produced by _open_impl().
    // Note: this does not set _is_open; that is done through set_opened().
    Status open(RuntimeState* state) {
81
456k
        SCOPED_RAW_TIMER(&_per_scanner_timer);
82
456k
        SCOPED_RAW_TIMER(&_per_scanner_open_timer);
83
456k
        return _open_impl(state);
84
456k
    }
85
86
    Status get_block(RuntimeState* state, Block* block, bool* eos);
87
    Status get_block_after_projects(RuntimeState* state, Block* block, bool* eos);
88
89
    virtual Status close(RuntimeState* state);
90
91
    // Try to stop scanner, and all running readers.
92
457k
    // Base implementation only raises the _should_stop flag; subclasses may override.
    // NOTE(review): _should_stop is a plain bool -- if this is called from a thread
    // other than the one reading the flag, an atomic may be needed; confirm.
    virtual void try_stop() { _should_stop = true; }
93
94
0
    // Name of this scanner; the base implementation yields an empty string.
    virtual std::string get_name() { return std::string(); }
95
96
    // return the readable name of current scan range.
97
    // e.g., for a file scanner, return the current file path.
98
0
    // Base implementation returns a fixed placeholder string.
    virtual std::string get_current_scan_range_name() {
        return std::string("not implemented");
    }
99
100
protected:
101
89.1k
    // Default prepare step: records that prepare has run (see has_prepared()) and
    // reports success. Virtual, so subclasses may override it.
    virtual Status _prepare_impl() {
102
89.1k
        _has_prepared = true;
103
89.1k
        return Status::OK();
104
89.1k
    }
105
106
456k
    // Default open step: seeds _block_avg_bytes from the runtime state's batch size
    // and reports success. The value can later be replaced via update_block_avg_bytes().
    // NOTE(review): the factor 8 presumably assumes about 8 bytes per row -- confirm.
    virtual Status _open_impl(RuntimeState* state) {
107
456k
        _block_avg_bytes = state->batch_size() * 8;
108
456k
        return Status::OK();
109
456k
    }
110
111
    // Subclass should implement this to return data.
112
    virtual Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) = 0;
113
114
545k
    // Folds _origin_block into _padding_block.
    //  - If _padding_block is empty, the two blocks are swapped.
    //  - Otherwise, if _origin_block has rows, it is merged into _padding_block; a
    //    merge failure is returned to the caller via RETURN_IF_ERROR.
    //  - Otherwise nothing is changed.
    // Returns Status::OK() unless the merge fails.
    Status _merge_padding_block() {
115
545k
        if (_padding_block.empty()) {
116
412k
            _padding_block.swap(_origin_block);
117
412k
        } else if (_origin_block.rows()) {
118
74.5k
            // Scoped mutable view over _padding_block (presumably written back to
            // _padding_block when the scope ends -- confirm).
            ScopedMutableBlock scoped_mutable_block(&_padding_block);
119
74.5k
            auto& mutable_block = scoped_mutable_block.mutable_block();
120
74.5k
            RETURN_IF_ERROR(mutable_block.merge(_origin_block));
121
74.5k
        }
122
545k
        return Status::OK();
123
545k
    }
124
125
    // Update the counters before closing this scanner
126
    virtual void _collect_profile_before_close();
127
128
    // Whether rows filtered/unselected by this scanner should be reported to the load
129
    // counters in RuntimeState. Only the scanner reading the load source data should
130
    // report, otherwise rows filtered by query predicates (e.g. in INSERT INTO ... SELECT
131
    // or DELETE FROM ... WHERE) would be mixed into load counters and make
132
    // num_rows_load_success() negative.
133
369k
    virtual bool _should_update_load_counters() const { return _is_load; }
134
135
    // Check if scanner is already closed, if not, mark it as closed.
136
    // Returns true if the scanner was successfully marked as closed (first time).
137
    // Returns false if the scanner was already closed.
138
    bool _try_close();
139
140
    // Filter the output block finally.
141
    Status _filter_output_block(Block* block);
142
143
    Status _do_projections(Block* origin_block, Block* output_block);
144
145
private:
146
510k
    // Restarts the thread CPU stopwatch from zero; called from resume().
    void _start_scan_cpu_timer() {
147
510k
        _cpu_watch.reset();
148
510k
        _cpu_watch.start();
149
510k
    }
150
151
510k
    // Adds the time currently measured by _watch to the accumulated
    // wait-for-worker total.
    void _update_wait_worker_timer() {
        const auto waited = _watch.elapsed_time();
        _scanner_wait_worker_timer += waited;
    }
152
    void _update_scan_cpu_timer();
153
154
public:
155
    // Call start_wait_worker_timer() when the scanner is submitted to the thread pool,
156
    // and call update_wait_worker_timer() once it is actually being executed.
157
1.02M
    void start_wait_worker_timer() {
158
1.02M
        _watch.reset();
159
1.02M
        _watch.start();
160
1.02M
    }
161
162
510k
    // Marks the point where the scanner starts running: adds the time measured by
    // _watch to the wait-for-worker total, then restarts the CPU stopwatch.
    void resume() {
163
510k
        _update_wait_worker_timer();
164
510k
        _start_scan_cpu_timer();
165
510k
    }
166
510k
    // Counterpart of resume(): updates the scan CPU timer (_update_scan_cpu_timer()
    // is defined outside this header) and restarts the wait-for-worker stopwatch.
    void pause() {
167
510k
        _update_scan_cpu_timer();
168
510k
        start_wait_worker_timer();
169
510k
    }
170
440
    // Value accumulated by the timer shared by prepare() and open().
    int64_t get_time_cost_ns() const { return _per_scanner_timer; }
171
440
    // Value accumulated by the prepare()-specific timer.
    int64_t get_prepare_time_cost_ns() const { return _per_scanner_prepare_timer; }
172
440
    // Value accumulated by the open()-specific timer.
    int64_t get_open_time_cost_ns() const { return _per_scanner_open_timer; }
173
174
440
    // Current value of the projection timer.
    int64_t projection_time() const { return _projection_timer; }
175
440
    // Number of rows read from the scanner so far.
    int64_t get_rows_read() const { return _num_rows_read; }
176
177
510k
    bool has_prepared() const { return _has_prepared; }
178
179
    Status try_append_late_arrival_runtime_filter();
180
181
440
    int64_t get_scanner_wait_worker_timer() const { return _scanner_wait_worker_timer; }
182
183
    // Some counters need to be updated realtime, for example, workload group policy need
184
    // scan bytes to cancel the query exceed limit.
185
1.90k
    virtual void update_realtime_counters() {}
186
187
914k
    RuntimeState* runtime_state() { return _state; }
188
189
510k
    bool is_open() const { return _is_open; }
190
457k
    void set_opened() { _is_open = true; }
191
192
140k
    // Storage type backing this scanner. The base implementation reports
    // STORAGE_TYPE_REMOTE; virtual, so subclasses may report something else.
    virtual doris::TabletStorageType get_storage_type() {
193
140k
        return doris::TabletStorageType::STORAGE_TYPE_REMOTE;
194
140k
    }
195
196
    // Returns true if this scanner's partition has been pruned by a runtime filter.
197
    // Overridden by OlapScanner to check partition pruning state.
198
280k
    virtual bool check_partition_pruned() const { return false; }
199
200
0
    bool need_to_close() const { return _need_to_close; }
201
202
456k
    // Flags the scanner as needing to be closed (see need_to_close()). If the
    // scanner was opened, the profile counters are collected first.
    void mark_to_need_to_close() {
203
        // If the scanner failed during init or open, there is no need to update the
204
        // counters: the query has failed, so they are useless, and updating them may
205
        // crash. For example, the update depends on the scanner's tablet, which is
206
        // still null when init failed.
207
456k
        if (_is_open) {
208
456k
            _collect_profile_before_close();
209
456k
        }
210
456k
        _need_to_close = true;
211
456k
    }
212
213
0
    void set_status_on_failure(const Status& st) { _status = st; }
214
215
510k
    int64_t limit() const { return _limit; }
216
217
0
    auto get_block_avg_bytes() const { return _block_avg_bytes; }
218
219
111k
    void update_block_avg_bytes(size_t block_avg_bytes) { _block_avg_bytes = block_avg_bytes; }
220
221
protected:
222
    RuntimeState* _state = nullptr;
223
    ScanLocalStateBase* _local_state = nullptr;
224
225
    // Set if scan node has sort limit info
226
    int64_t _limit = -1;
227
228
    RuntimeProfile* _profile = nullptr;
229
230
    const TupleDescriptor* _output_tuple_desc = nullptr;
231
    const RowDescriptor* _output_row_descriptor = nullptr;
232
233
    // If _input_tuple_desc is set, the scanner will read data into
234
    // this _input_block first, then convert to the output block.
235
    Block _input_block;
236
237
    bool _is_open = false;
238
    std::atomic<bool> _is_closed {false};
239
    bool _need_to_close = false;
240
    Status _status;
241
242
    // If _applied_rf_num == _total_rf_num
243
    // means all runtime filters are arrived and applied.
244
    int _applied_rf_num = 0;
245
    int _total_rf_num = 0;
246
    // Cloned from _conjuncts of scan node.
247
    // It includes predicate in SQL and runtime filters.
248
    VExprContextSPtrs _conjuncts;
249
    VExprContextSPtrs _projections;
250
    // Used in common subexpression elimination to compute intermediate results.
251
    std::vector<VExprContextSPtrs> _intermediate_projections;
252
    Block _origin_block;
253
    Block _padding_block;
254
255
    VExprContextSPtrs _common_expr_ctxs_push_down;
256
257
    // num of rows read from scanner
258
    int64_t _num_rows_read = 0;
259
260
    int64_t _num_byte_read = 0;
261
262
    // num of rows return from scanner, after filter block
263
    int64_t _num_rows_return = 0;
264
265
    size_t _block_avg_bytes = 0;
266
267
    // Set true after counter is updated finally
268
    bool _has_updated_counter = false;
269
270
    // watch to count the time wait for scanner thread
271
    MonotonicStopWatch _watch;
272
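    // Thread CPU stopwatch, restarted by _start_scan_cpu_timer(). Do not use ScopedTimer.
    // NOTE(review): the original rationale is cut off after "There is no guarantee that,
    // the counter" -- complete the sentence once the intended reason is confirmed.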
273
    ThreadCpuStopWatch _cpu_watch;
274
    int64_t _scanner_wait_worker_timer = 0;
275
    int64_t _scan_cpu_timer = 0;
276
277
    bool _is_load = false;
278
279
    bool _has_prepared = false;
280
281
    ScannerCounter _counter;
282
    int64_t _per_scanner_timer = 0;
283
    int64_t _per_scanner_prepare_timer = 0;
284
    int64_t _per_scanner_open_timer = 0;
285
    int64_t _projection_timer = 0;
286
287
    bool _should_stop = false;
288
289
    // Cached pointer to ScanOperator's remaining-limit counter. Null when
290
    // this scanner is on the topn path or the query has no LIMIT.
291
    std::atomic<int64_t>* _shared_scan_limit = nullptr;
292
};
293
294
using ScannerSPtr = std::shared_ptr<Scanner>;
295
296
} // namespace doris