Coverage Report

Created: 2026-04-09 13:36

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/generic_reader.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/PlanNodes_types.h>
21
22
#include <functional>
23
#include <memory>
24
#include <set>
25
#include <string>
26
#include <tuple>
27
#include <unordered_map>
28
#include <vector>
29
30
#include "common/status.h"
31
#include "core/column/column.h"
32
#include "core/column/column_nullable.h"
33
#include "core/data_type/data_type.h"
34
#include "exprs/vexpr.h"
35
#include "exprs/vexpr_context.h"
36
#include "exprs/vexpr_fwd.h"
37
#include "format/column_descriptor.h"
38
#include "format/table/table_schema_change_helper.h"
39
#include "runtime/descriptors.h"
40
#include "runtime/runtime_state.h"
41
#include "storage/predicate/block_column_predicate.h"
42
#include "storage/segment/common.h"
43
#include "util/profile_collector.h"
44
45
namespace doris {
46
class ColumnPredicate;
47
} // namespace doris
48
49
namespace doris {
50
#include "common/compile_check_begin.h"
51
52
class Block;
53
class VSlotRef;
54
55
// Context passed from FileScanner to readers for condition cache integration.
56
// On MISS: readers populate filter_result per-granule during predicate evaluation.
57
// On HIT: readers skip granules where filter_result[granule] == false.
58
struct ConditionCacheContext {
59
    bool is_hit = false;
60
    std::shared_ptr<std::vector<bool>> filter_result; // per-granule: true = has surviving rows
61
    int64_t base_granule = 0; // global granule index of the first granule in filter_result
62
    static constexpr int GRANULE_SIZE = 2048;
63
};
64
65
/// Base context for the unified init_reader(ReaderInitContext*) template method.
66
/// Contains fields shared by ALL reader types. Format-specific readers define
67
/// subclasses (ParquetInitContext, OrcInitContext, etc.) with extra fields.
68
/// FileScanner allocates the appropriate subclass and populates the shared fields
69
/// before calling init_reader().
70
struct ReaderInitContext {
71
79.3k
    virtual ~ReaderInitContext() = default;
72
73
    // ---- Owned by FileScanner, shared by all readers ----
74
    std::vector<ColumnDescriptor>* column_descs = nullptr;
75
    std::unordered_map<std::string, uint32_t>* col_name_to_block_idx = nullptr;
76
    RuntimeState* state = nullptr;
77
    const TupleDescriptor* tuple_descriptor = nullptr;
78
    const RowDescriptor* row_descriptor = nullptr;
79
    const TFileScanRangeParams* params = nullptr;
80
    const TFileRangeDesc* range = nullptr;
81
    TPushAggOp::type push_down_agg_type = TPushAggOp::type::NONE;
82
83
    // ---- Output slots (filled by on_before_init_reader) ----
84
    std::vector<std::string> column_names;
85
    std::shared_ptr<TableSchemaChangeHelper::Node> table_info_node =
86
            TableSchemaChangeHelper::ConstNode::get_instance();
87
    std::set<uint64_t> column_ids;
88
    std::set<uint64_t> filter_column_ids;
89
};
90
91
/// Safe downcast for ReaderInitContext subclasses.
92
/// Uses dynamic_cast + DORIS_CHECK: crashes on type mismatch (per Doris coding standards).
93
template <typename To, typename From>
94
73.6k
To* checked_context_cast(From* ptr) {
95
73.6k
    auto* result = dynamic_cast<To*>(ptr);
96
73.6k
    DORIS_CHECK(result != nullptr);
97
73.6k
    return result;
98
73.6k
}
_ZN5doris20checked_context_castINS_14CsvInitContextENS_17ReaderInitContextEEEPT_PT0_
Line
Count
Source
94
7.45k
To* checked_context_cast(From* ptr) {
95
7.45k
    auto* result = dynamic_cast<To*>(ptr);
96
7.45k
    DORIS_CHECK(result != nullptr);
97
7.45k
    return result;
98
7.45k
}
_ZN5doris20checked_context_castINS_14OrcInitContextENS_17ReaderInitContextEEEPT_PT0_
Line
Count
Source
94
28.8k
To* checked_context_cast(From* ptr) {
95
28.8k
    auto* result = dynamic_cast<To*>(ptr);
96
28.8k
    DORIS_CHECK(result != nullptr);
97
28.8k
    return result;
98
28.8k
}
_ZN5doris20checked_context_castINS_15JsonInitContextENS_17ReaderInitContextEEEPT_PT0_
Line
Count
Source
94
759
To* checked_context_cast(From* ptr) {
95
759
    auto* result = dynamic_cast<To*>(ptr);
96
759
    DORIS_CHECK(result != nullptr);
97
759
    return result;
98
759
}
_ZN5doris20checked_context_castINS_18ParquetInitContextENS_17ReaderInitContextEEEPT_PT0_
Line
Count
Source
94
36.5k
To* checked_context_cast(From* ptr) {
95
36.5k
    auto* result = dynamic_cast<To*>(ptr);
96
36.5k
    DORIS_CHECK(result != nullptr);
97
36.5k
    return result;
98
36.5k
}
Unexecuted instantiation: _ZN5doris20checked_context_castINS_14WalInitContextENS_17ReaderInitContextEEEPT_PT0_
99
100
/// Base reader interface for all file readers.
101
/// A GenericReader is responsible for reading a file and returning
102
/// a set of blocks with specified schema.
103
///
104
/// Provides hook virtual methods that implement the Template Method pattern:
105
///   init_reader:      _open_file_reader → on_before_init_reader → _do_init_reader → on_after_init_reader
106
///   get_next_block:   on_before_read_block → _do_get_next_block → on_after_read_block
107
///
108
/// Column-filling logic (partition/missing/synthesized) lives in TableFormatReader.
109
class GenericReader : public ProfileCollector {
110
public:
111
87.4k
    GenericReader() : _push_down_agg_type(TPushAggOp::type::NONE) {}
112
85.7k
    void set_push_down_agg_type(TPushAggOp::type push_down_agg_type) {
113
18.4E
        DCHECK(!_reader_initialized)
114
18.4E
                << "set_push_down_agg_type must not be called after init_reader completes";
115
85.7k
        _push_down_agg_type = push_down_agg_type;
116
85.7k
    }
117
70.8k
    TPushAggOp::type get_push_down_agg_type() const { return _push_down_agg_type; }
118
119
    /// Template method for reading blocks.
120
    /// Calls: on_before_read_block → _do_get_next_block → on_after_read_block
121
177k
    Status get_next_block(Block* block, size_t* read_rows, bool* eof) {
122
177k
        RETURN_IF_ERROR(on_before_read_block(block));
123
177k
        RETURN_IF_ERROR(_do_get_next_block(block, read_rows, eof));
124
177k
        RETURN_IF_ERROR(on_after_read_block(block, read_rows));
125
177k
        return Status::OK();
126
177k
    }
127
128
    // Type is always nullable to process illegal values.
129
    // Results are cached after the first successful call.
130
82.2k
    Status get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type) {
131
82.2k
        if (_get_columns_cached) {
132
8.31k
            *name_to_type = _cached_name_to_type;
133
8.31k
            return Status::OK();
134
8.31k
        }
135
73.9k
        RETURN_IF_ERROR(_get_columns_impl(name_to_type));
136
73.9k
        _cached_name_to_type = *name_to_type;
137
73.9k
        _get_columns_cached = true;
138
139
73.9k
        return Status::OK();
140
73.9k
    }
141
142
0
    virtual Status _get_columns_impl(std::unordered_map<std::string, DataTypePtr>* name_to_type) {
143
0
        return Status::NotSupported("get_columns is not implemented");
144
0
    }
145
146
    // This method is responsible for initializing the resources needed to parse the schema.
147
    // It will be called before `get_parsed_schema`.
148
0
    virtual Status init_schema_reader() {
149
0
        return Status::NotSupported("init_schema_reader is not implemented for this reader.");
150
0
    }
151
    // `col_types` is always nullable to process illegal values.
152
    virtual Status get_parsed_schema(std::vector<std::string>* col_names,
153
0
                                     std::vector<DataTypePtr>* col_types) {
154
0
        return Status::NotSupported("get_parsed_schema is not implemented for this reader.");
155
0
    }
156
87.5k
    ~GenericReader() override = default;
157
158
26.9k
    virtual Status close() { return Status::OK(); }
159
160
3.27k
    Status read_by_rows(const std::list<int64_t>& row_ids) {
161
3.27k
        _read_by_rows = true;
162
3.27k
        _row_ids = row_ids;
163
3.27k
        return _set_read_one_line_impl();
164
3.27k
    }
165
166
    /// The reader is responsible for counting the number of rows read,
167
    /// because some readers, such as parquet/orc,
168
    /// can skip some pages/rowgroups through indexes.
169
17.8k
    virtual bool count_read_rows() { return false; }
170
171
    /// Returns true if on_before_init_reader has already set _column_descs.
172
65.4k
    bool has_column_descs() const { return _column_descs != nullptr; }
173
174
    /// Unified initialization entry point (NVI pattern).
175
    /// Enforces the template method sequence for ALL readers:
176
    ///   _open_file_reader → on_before_init_reader → _do_init_reader → on_after_init_reader
177
    /// Subclasses implement _open_file_reader and _do_init_reader(ReaderInitContext*).
178
    /// FileScanner constructs the appropriate ReaderInitContext subclass and calls this.
179
    ///
180
    /// NOTE: During migration, readers not yet ported to this API still use their
181
    /// format-specific init_reader(...) methods. This method is non-virtual so it
182
    /// cannot be accidentally overridden.
183
79.3k
    Status init_reader(ReaderInitContext* ctx) {
184
        // Apply push_down_agg_type early so _open_file_reader and _do_init_reader
185
        // can use it (e.g., PaimonCppReader skips full init on COUNT pushdown).
186
        // on_after_init_reader may reset this (e.g., Iceberg with equality deletes).
187
79.3k
        set_push_down_agg_type(ctx->push_down_agg_type);
188
189
79.3k
        RETURN_IF_ERROR(_open_file_reader(ctx));
190
191
        // Standalone readers (delete file readers, push handler) set column_descs=nullptr
192
        // and pre-populate column_names directly. Skip hooks for them.
193
79.3k
        if (ctx->column_descs != nullptr) {
194
74.1k
            RETURN_IF_ERROR(on_before_init_reader(ctx));
195
74.1k
        }
196
197
79.2k
        RETURN_IF_ERROR(_do_init_reader(ctx));
198
199
79.2k
        if (ctx->column_descs != nullptr) {
200
74.1k
            RETURN_IF_ERROR(on_after_init_reader(ctx));
201
74.1k
        }
202
203
79.2k
        _reader_initialized = true;
204
79.2k
        return Status::OK();
205
79.2k
    }
206
207
    /// Hook called before core init. Default just sets _column_descs.
208
    /// TableFormatReader overrides with partition/missing column computation.
209
    /// ORC/Parquet/Hive/Iceberg further override with format-specific schema matching.
210
5.50k
    virtual Status on_before_init_reader(ReaderInitContext* ctx) {
211
5.50k
        _column_descs = ctx->column_descs;
212
5.50k
        return Status::OK();
213
5.50k
    }
214
215
protected:
216
    // ---- Init-time hooks (Template Method for init_reader) ----
217
218
    /// Opens the file and prepares I/O resources before hooks run. Override in
219
    /// subclasses to open files, read metadata, set up decompressors, etc.
220
    /// For Parquet/ORC, opens the file and reads footer metadata.
221
    /// For CSV/JSON, opens the file, creates decompressors, and sets up line readers.
222
    /// Default is no-op (for JNI, Native, Arrow readers).
223
5.61k
    virtual Status _open_file_reader(ReaderInitContext* /*ctx*/) { return Status::OK(); }
224
225
    /// Core initialization (format-specific). Subclasses override to perform
226
    /// their actual parsing engine setup. The context should be downcast to
227
    /// the appropriate subclass using checked_context_cast<T>.
228
    /// Default returns NotSupported — readers not yet migrated to the unified
229
    /// init_reader(ReaderInitContext*) API still use their old init methods.
230
0
    virtual Status _do_init_reader(ReaderInitContext* /*ctx*/) {
231
0
        return Status::NotSupported(
232
0
                "_do_init_reader(ReaderInitContext*) not yet implemented for this reader");
233
0
    }
234
235
    // ---- Existing init-time hooks ----
236
237
    /// Called after core init completes. Subclasses override to process
238
    /// delete files, deletion vectors, etc.
239
69.1k
    virtual Status on_after_init_reader(ReaderInitContext* /*ctx*/) { return Status::OK(); }
240
241
    // ---- Read-time hooks ----
242
243
    /// Called before reading a block. Subclasses override to modify block
244
    /// structure (e.g. add ACID columns, expand for equality delete).
245
145k
    virtual Status on_before_read_block(Block* block) { return Status::OK(); }
246
247
    /// Core block reading. Subclasses must override with actual read logic.
248
    virtual Status _do_get_next_block(Block* block, size_t* read_rows, bool* eof) = 0;
249
250
    /// Called after reading a block. Subclasses override to post-process
251
    /// (e.g. remove ACID columns, apply equality delete filter).
252
14.0k
    virtual Status on_after_read_block(Block* block, size_t* read_rows) { return Status::OK(); }
253
254
0
    virtual Status _set_read_one_line_impl() {
255
0
        return Status::NotSupported("read_by_rows is not implemented for this reader.");
256
0
    }
257
258
    const size_t _MIN_BATCH_SIZE = 4064; // 4096 - 32(padding)
259
260
    TPushAggOp::type _push_down_agg_type {};
261
    bool _reader_initialized = false;
262
263
public:
264
    // Pass condition cache context to the reader for HIT/MISS tracking.
265
0
    virtual void set_condition_cache_context(std::shared_ptr<ConditionCacheContext> ctx) {}
266
267
    // Returns true if this reader can produce an accurate total row count from metadata
268
    // without reading actual data. Used to determine if CountReader decorator can be applied.
269
    // Only ORC and Parquet readers support this (via file footer metadata).
270
1.02k
    virtual bool supports_count_pushdown() const { return false; }
271
272
    // Returns the total number of rows the reader will produce.
273
    // Used to pre-allocate condition cache with the correct number of granules.
274
1.64k
    virtual int64_t get_total_rows() const { return 0; }
275
276
    // Returns true if this reader has delete operations (e.g. Iceberg position/equality deletes,
277
    // Hive ACID deletes). Used to disable condition cache when deletes are present, since cached
278
    // granule results may become stale if delete files change between queries.
279
1.64k
    virtual bool has_delete_operations() const { return false; }
280
281
protected:
282
    bool _read_by_rows = false;
283
    std::list<int64_t> _row_ids;
284
285
    // Cache for commonly reused file metadata, such as the file footer.
286
    // May be null if not used.
287
    FileMetaCache* _meta_cache = nullptr;
288
289
    // ---- Column descriptors (set by on_before_init_reader during init_reader; owned by FileScanner) ----
290
    const std::vector<ColumnDescriptor>* _column_descs = nullptr;
291
292
    // ---- get_columns cache ----
293
    bool _get_columns_cached = false;
294
    std::unordered_map<std::string, DataTypePtr> _cached_name_to_type;
295
};
296
297
/// Provides an accessor for the current batch's row positions within the file.
298
/// Implemented by RowGroupReader (Parquet) and OrcReader.
299
class RowPositionProvider {
300
public:
301
70.1k
    virtual ~RowPositionProvider() = default;
302
    virtual const std::vector<segment_v2::rowid_t>& current_batch_row_positions() const = 0;
303
};
304
305
#include "common/compile_check_end.h"
306
} // namespace doris