Coverage Report

Created: 2026-07-04 04:47

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format_v2/parquet/parquet_scan.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//   http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing,
10
// software distributed under the License is distributed on an
11
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
12
// KIND, either express or implied.  See the License for the
13
// specific language governing permissions and limitations
14
// under the License.
15
16
#include "format_v2/parquet/parquet_scan.h"
17
18
#include <algorithm>
19
#include <limits>
20
#include <memory>
21
#include <ranges>
22
#include <unordered_set>
23
#include <utility>
24
25
#include "common/exception.h"
26
#include "common/status.h"
27
#include "core/assert_cast.h"
28
#include "core/block/block.h"
29
#include "core/column/column_vector.h"
30
#include "exprs/vexpr_context.h"
31
#include "format_v2/parquet/parquet_column_schema.h"
32
#include "format_v2/parquet/parquet_file_context.h"
33
#include "format_v2/parquet/parquet_statistics.h"
34
35
namespace doris::format::parquet {
36
37
namespace {
38
39
202
// Returns the file offset at which a column chunk begins: its dictionary page when it has a
// usable one, otherwise its first data page.
//
// Some writers set dictionary_page_offset to a bogus value (e.g. 0, or an offset that does not
// precede the data pages) while still marking the field as present. Offset 0 can never be a page
// because every Parquet file starts with the "PAR1" magic. Mirroring Arrow's own column-chunk
// range computation, the dictionary offset is only trusted when it is positive and lies before
// data_page_offset; otherwise the data page offset is used. This keeps row-group midpoints and
// prefetch ranges from starting at an invalid position.
int64_t column_start_offset(const ::parquet::ColumnChunkMetaData& column_metadata) {
    const int64_t data_page_offset = cast_set<int64_t>(column_metadata.data_page_offset());
    if (!column_metadata.has_dictionary_page()) {
        return data_page_offset;
    }
    const int64_t dictionary_page_offset =
            cast_set<int64_t>(column_metadata.dictionary_page_offset());
    return dictionary_page_offset > 0 && dictionary_page_offset < data_page_offset
                   ? dictionary_page_offset
                   : data_page_offset;
}
44
45
void collect_all_leaf_column_ids(const ParquetColumnSchema& column_schema,
46
150
                                 std::unordered_set<int>* leaf_column_ids) {
47
150
    DORIS_CHECK(leaf_column_ids != nullptr);
48
150
    if (column_schema.kind == ParquetColumnSchemaKind::PRIMITIVE) {
49
142
        if (column_schema.leaf_column_id >= 0) {
50
142
            leaf_column_ids->insert(column_schema.leaf_column_id);
51
142
        }
52
142
        return;
53
142
    }
54
12
    for (const auto& child : column_schema.children) {
55
12
        DORIS_CHECK(child != nullptr);
56
12
        collect_all_leaf_column_ids(*child, leaf_column_ids);
57
12
    }
58
8
}
59
60
void collect_projected_leaf_column_ids(const ParquetColumnSchema& column_schema,
61
                                       const format::LocalColumnIndex& projection,
62
144
                                       std::unordered_set<int>* leaf_column_ids) {
63
144
    DORIS_CHECK(leaf_column_ids != nullptr);
64
144
    if (projection.project_all_children || projection.children.empty()) {
65
138
        collect_all_leaf_column_ids(column_schema, leaf_column_ids);
66
138
        return;
67
138
    }
68
7
    for (const auto& child_projection : projection.children) {
69
7
        const auto child_it =
70
11
                std::ranges::find_if(column_schema.children, [&](const auto& child_schema) {
71
11
                    return child_schema->local_id == child_projection.local_id();
72
11
                });
73
7
        DORIS_CHECK(child_it != column_schema.children.end());
74
7
        collect_projected_leaf_column_ids(**child_it, child_projection, leaf_column_ids);
75
7
    }
76
6
}
77
78
// Decides whether row group row_group_idx falls outside this file split.
//
// A scan range with a negative size is not a split, so every row group is kept. A range that
// starts at offset 0 and reaches the end of the file (or whose file size is unknown) also keeps
// every row group. Otherwise the row group is assigned by the midpoint of its byte span
// [start of first column chunk, end of last column chunk): it belongs to this range only when
// the midpoint lies in [start_offset, start_offset + size). Presumably this is so that each row
// group is read by exactly one of the splits over a file.
//
// Returns true when the row group should be skipped by this scan range.
bool is_row_group_outside_range(const ::parquet::FileMetaData& metadata,
                                const ParquetScanRange& scan_range, int row_group_idx) {
    if (scan_range.size < 0) {
        return false;
    }
    const int64_t range_start_offset = scan_range.start_offset;
    DORIS_CHECK(range_start_offset >= 0);
    // Validate before adding: signed overflow is undefined behavior, so checking
    // `end >= start` after the addition cannot reliably detect it.
    DORIS_CHECK(scan_range.size <= std::numeric_limits<int64_t>::max() - range_start_offset);
    const int64_t range_end_offset = range_start_offset + scan_range.size;
    if (range_start_offset == 0 &&
        (scan_range.file_size < 0 || range_end_offset >= scan_range.file_size)) {
        return false;
    }

    auto row_group_metadata = metadata.RowGroup(row_group_idx);
    DORIS_CHECK(row_group_metadata != nullptr);
    DORIS_CHECK(row_group_metadata->num_columns() > 0);
    const auto first_column = row_group_metadata->ColumnChunk(0);
    const auto last_column = row_group_metadata->ColumnChunk(row_group_metadata->num_columns() - 1);
    DORIS_CHECK(first_column != nullptr);
    DORIS_CHECK(last_column != nullptr);
    const int64_t row_group_start_offset = column_start_offset(*first_column);
    const int64_t row_group_end_offset =
            column_start_offset(*last_column) + last_column->total_compressed_size();
    // Written as start + half-width rather than (start + end) / 2 to avoid overflowing the sum.
    const int64_t row_group_mid_offset =
            row_group_start_offset + (row_group_end_offset - row_group_start_offset) / 2;
    return row_group_mid_offset < range_start_offset || row_group_mid_offset >= range_end_offset;
}
106
107
99
// Returns every projected column of request in scan order: all predicate columns first,
// followed by all non-predicate columns.
std::vector<format::LocalColumnIndex> request_scan_columns(const format::FileScanRequest& request) {
    const auto& filter_columns = request.predicate_columns;
    const auto& output_columns = request.non_predicate_columns;

    std::vector<format::LocalColumnIndex> ordered_columns;
    ordered_columns.reserve(filter_columns.size() + output_columns.size());
    ordered_columns.insert(ordered_columns.end(), filter_columns.begin(), filter_columns.end());
    ordered_columns.insert(ordered_columns.end(), output_columns.begin(), output_columns.end());
    return ordered_columns;
}
116
117
std::vector<ParquetPageCacheRange> build_row_group_prefetch_ranges(
118
        const ::parquet::FileMetaData& metadata,
119
        const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema,
120
99
        const std::vector<format::LocalColumnIndex>& scan_columns, int row_group_idx) {
121
99
    std::unordered_set<int> leaf_column_ids;
122
164
    for (const auto& projection : scan_columns) {
123
164
        const auto local_id = projection.local_id();
124
164
        if (local_id == format::ROW_POSITION_COLUMN_ID ||
125
164
            local_id == format::GLOBAL_ROWID_COLUMN_ID) {
126
27
            continue;
127
27
        }
128
137
        DORIS_CHECK(local_id >= 0 && local_id < static_cast<int32_t>(file_schema.size()));
129
137
        DORIS_CHECK(file_schema[local_id] != nullptr);
130
        // Prefetch and merge-reader ranges must be physical leaf chunks, not Doris logical slots.
131
        // Example: for a struct column s<a:int,b:string>, projecting only s.a should include only
132
        // the Parquet leaf chunk of a. Projecting the whole struct includes both a and b.
133
137
        collect_projected_leaf_column_ids(*file_schema[local_id], projection, &leaf_column_ids);
134
137
    }
135
136
99
    auto row_group_metadata = metadata.RowGroup(row_group_idx);
137
99
    DORIS_CHECK(row_group_metadata != nullptr);
138
99
    std::vector<int> ordered_leaf_column_ids(leaf_column_ids.begin(), leaf_column_ids.end());
139
99
    std::ranges::sort(ordered_leaf_column_ids);
140
141
99
    std::vector<ParquetPageCacheRange> ranges;
142
99
    ranges.reserve(ordered_leaf_column_ids.size());
143
142
    for (const auto leaf_column_id : ordered_leaf_column_ids) {
144
142
        DORIS_CHECK(leaf_column_id >= 0 && leaf_column_id < row_group_metadata->num_columns());
145
142
        auto column_metadata = row_group_metadata->ColumnChunk(leaf_column_id);
146
142
        DORIS_CHECK(column_metadata != nullptr);
147
142
        const int64_t offset = column_start_offset(*column_metadata);
148
142
        const int64_t size = column_metadata->total_compressed_size();
149
142
        DORIS_CHECK(offset >= 0);
150
142
        if (size > 0) {
151
142
            ranges.push_back(ParquetPageCacheRange {.offset = offset, .size = size});
152
142
        }
153
142
    }
154
99
    return ranges;
155
99
}
156
157
} // namespace
158
159
// Builds the row-group read plan for one Parquet file split. Any previous content of plan
// (row groups and pruning stats) is discarded.
//
// Steps:
//   1. Compute each row group's first file-global row, and keep the row groups that
//      is_row_group_outside_range assigns to scan_range.
//   2. Narrow that list with select_row_groups_by_statistics (which also receives
//      enable_bloom_filter).
//   3. For each surviving row group with at least one row, ask
//      select_row_group_ranges_by_page_index for its selected row ranges and page skip plans.
//      Row groups left with no selected range are dropped.
//
// Returns Corruption when a row group reports a negative row count. Errors from the pruning
// helpers are propagated.
Status plan_parquet_row_groups(const ::parquet::FileMetaData& metadata,
                               ::parquet::ParquetFileReader* file_reader,
                               const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema,
                               const format::FileScanRequest& request,
                               const ParquetScanRange& scan_range, bool enable_bloom_filter,
                               RowGroupScanPlan* plan, const cctz::time_zone* timezone) {
    DORIS_CHECK(plan != nullptr);
    plan->row_groups.clear();
    plan->pruning_stats = ParquetPruningStats {};

    const int num_row_groups = metadata.num_row_groups();
    std::vector<int64_t> row_group_first_rows(num_row_groups);
    std::vector<int> in_range_row_groups;
    in_range_row_groups.reserve(num_row_groups);

    int64_t running_row_count = 0;
    for (int rg = 0; rg < num_row_groups; ++rg) {
        const auto rg_metadata = metadata.RowGroup(rg);
        DORIS_CHECK(rg_metadata != nullptr);
        const int64_t rg_rows = rg_metadata->num_rows();
        if (rg_rows < 0) {
            return Status::Corruption("Invalid negative row count in parquet row group {}", rg);
        }
        row_group_first_rows[rg] = running_row_count;
        running_row_count += rg_rows;
        if (!is_row_group_outside_range(metadata, scan_range, rg)) {
            in_range_row_groups.push_back(rg);
        }
    }

    std::vector<int> stats_selected_row_groups;
    RETURN_IF_ERROR(select_row_groups_by_statistics(
            metadata, file_reader, file_schema, request, &in_range_row_groups,
            &stats_selected_row_groups, enable_bloom_filter, &plan->pruning_stats, timezone));

    plan->row_groups.reserve(stats_selected_row_groups.size());
    for (const int rg : stats_selected_row_groups) {
        const auto rg_metadata = metadata.RowGroup(rg);
        DORIS_CHECK(rg_metadata != nullptr);
        const int64_t rg_rows = rg_metadata->num_rows();
        if (rg_rows == 0) {
            // Nothing to read in an empty row group.
            continue;
        }

        RowGroupReadPlan rg_plan;
        rg_plan.row_group_id = rg;
        rg_plan.first_file_row = row_group_first_rows[rg];
        rg_plan.row_group_rows = rg_rows;
        RETURN_IF_ERROR(select_row_group_ranges_by_page_index(
                file_reader, file_schema, request, rg, rg_rows, &rg_plan.selected_ranges,
                &rg_plan.page_skip_plans, &plan->pruning_stats, timezone));
        if (!rg_plan.selected_ranges.empty()) {
            plan->pruning_stats.selected_row_ranges += rg_plan.selected_ranges.size();
            plan->row_groups.push_back(std::move(rg_plan));
        }
    }
    plan->pruning_stats.selected_row_groups = plan->row_groups.size();
    return Status::OK();
}
219
220
namespace {
221
222
// Compacts selection in place, keeping only the selected row indexes whose filter byte is
// non-zero. Only the first selected_rows entries are examined, and their relative order is
// preserved. Returns the number of entries kept.
uint16_t apply_filter_to_selection(const IColumn::Filter& filter, SelectionVector* selection,
                                   uint16_t selected_rows) {
    uint16_t write_pos = 0;
    for (uint16_t read_pos = 0; read_pos != selected_rows; ++read_pos) {
        const auto row_idx = selection->get_index(read_pos);
        if (filter[row_idx] == 0) {
            continue;
        }
        selection->set_index(write_pos, static_cast<SelectionVector::Index>(row_idx));
        ++write_pos;
    }
    return write_pos;
}
233
234
// Evaluates each row-level conjunct over the whole batch and narrows selection to the rows that
// pass. Evaluation stops as soon as no row remains selected. When a conjunct reports
// can_filter_all, the selection is cleared without consulting the filter.
Status execute_filter_conjuncts(const format::FileScanRequest& request, int64_t batch_rows,
                                Block* file_block, SelectionVector* selection,
                                uint16_t* selected_rows) {
    const auto num_rows = static_cast<size_t>(batch_rows);
    for (const auto& conjunct : request.conjuncts) {
        if (*selected_rows == 0) {
            break;
        }
        DORIS_CHECK(conjunct != nullptr);
        // Starts as all-pass. Presumably execute_filter clears the bytes of failing rows.
        IColumn::Filter pass_filter(num_rows, 1);
        bool filters_everything = false;
        RETURN_IF_ERROR(conjunct->execute_filter(file_block, pass_filter.data(), num_rows, false,
                                                 &filters_everything));
        if (filters_everything) {
            *selected_rows = 0;
        } else {
            *selected_rows = apply_filter_to_selection(pass_filter, selection, *selected_rows);
        }
    }
    return Status::OK();
}
252
253
// Evaluates each delete conjunct and removes deleted rows from selection. A non-zero byte in the
// conjunct's UInt8 result column marks a row as deleted. Each result column is erased from
// file_block once it has been turned into a keep filter. Stops early once no row is selected.
Status execute_delete_conjuncts(const format::FileScanRequest& request, int64_t batch_rows,
                                Block* file_block, SelectionVector* selection,
                                uint16_t* selected_rows) {
    const auto num_rows = static_cast<size_t>(batch_rows);
    for (const auto& delete_conjunct : request.delete_conjuncts) {
        if (*selected_rows == 0) {
            break;
        }
        DORIS_CHECK(delete_conjunct != nullptr);
        int result_column_id = -1;
        RETURN_IF_ERROR(delete_conjunct->root()->execute(delete_conjunct.get(), file_block,
                                                         &result_column_id));
        DORIS_CHECK(result_column_id >= 0 &&
                    result_column_id < static_cast<int>(file_block->columns()));
        const auto& delete_flags = assert_cast<const ColumnUInt8&>(
                                           *file_block->get_by_position(result_column_id).column)
                                           .get_data();
        DORIS_CHECK(delete_flags.size() == num_rows);

        // Invert the delete flags into a keep filter and note whether any row survives.
        IColumn::Filter keep_filter(num_rows, 1);
        bool any_row_kept = false;
        for (size_t row = 0; row < num_rows; ++row) {
            const bool keep = !delete_flags[row];
            keep_filter[row] = keep;
            any_row_kept = any_row_kept || keep;
        }
        // delete_flags refers into the result column; it is not used past this point.
        file_block->erase(result_column_id);
        *selected_rows = any_row_kept
                                 ? apply_filter_to_selection(keep_filter, selection, *selected_rows)
                                 : 0;
    }
    return Status::OK();
}
283
284
} // namespace
285
286
// Expands the first selected_rows entries of selection into a dense filter of batch_rows bytes:
// 1 at every selected row index and 0 everywhere else.
IColumn::Filter selection_to_filter(const SelectionVector& selection, uint16_t selected_rows,
                                    int64_t batch_rows) {
    IColumn::Filter dense_filter(static_cast<size_t>(batch_rows), 0);
    uint16_t pos = 0;
    while (pos < selected_rows) {
        dense_filter[selection.get_index(pos)] = 1;
        ++pos;
    }
    return dense_filter;
}
294
295
// Applies the request's row-level filters to one batch: ordinary conjuncts first, then delete
// conjuncts on whatever survives. selection and selected_rows are narrowed in place.
// When conjunct_filtered_rows is non-null, it is incremented by the number of rows removed by the
// ordinary conjuncts only (rows removed by delete conjuncts are not added to it).
Status execute_batch_filters(const format::FileScanRequest& request, int64_t batch_rows,
                             Block* file_block, SelectionVector* selection, uint16_t* selected_rows,
                             int64_t* conjunct_filtered_rows) {
    const bool has_row_filters = !request.conjuncts.empty() || !request.delete_conjuncts.empty();
    if (!has_row_filters) {
        return Status::OK();
    }
    const int64_t rows_before_conjuncts = *selected_rows;
    RETURN_IF_ERROR(
            execute_filter_conjuncts(request, batch_rows, file_block, selection, selected_rows));
    if (conjunct_filtered_rows != nullptr) {
        *conjunct_filtered_rows += rows_before_conjuncts - static_cast<int64_t>(*selected_rows);
    }
    if (*selected_rows == 0) {
        return Status::OK();
    }
    return execute_delete_conjuncts(request, batch_rows, file_block, selection, selected_rows);
}
313
314
namespace {
315
2
int64_t count_range_rows(const std::vector<RowRange>& ranges) {
316
2
    int64_t rows = 0;
317
2
    for (const auto& range : ranges) {
318
2
        rows += range.length;
319
2
    }
320
2
    return rows;
321
2
}
322
323
void append_intersection(const RowRange& left, const RowRange& right,
324
1
                         std::vector<RowRange>* result) {
325
1
    const int64_t start = std::max(left.start, right.start);
326
1
    const int64_t end = std::min(left.start + left.length, right.start + right.length);
327
1
    if (start < end) {
328
1
        result->push_back(RowRange {.start = start, .length = end - start});
329
1
    }
330
1
}
331
332
std::vector<RowRange> filter_ranges_by_condition_cache(const std::vector<RowRange>& ranges,
333
                                                       const std::vector<bool>& cache,
334
                                                       int64_t row_group_first_row,
335
1
                                                       int64_t base_granule) {
336
1
    std::vector<RowRange> result;
337
1
    if (cache.empty()) {
338
0
        return ranges;
339
0
    }
340
341
    // Cache coordinates are file-global granules; RowRange coordinates are row-group-relative.
342
    // Walk every selected range in order and split it by granule. Granules covered by the bitmap
343
    // are kept only when the bit is true. Granules outside the bitmap are kept conservatively, so
344
    // an undersized or old-format cache entry cannot skip valid rows.
345
1
    for (const auto& range : ranges) {
346
1
        const int64_t global_start = row_group_first_row + range.start;
347
1
        const int64_t global_end = global_start + range.length;
348
1
        for (int64_t granule = global_start / ConditionCacheContext::GRANULE_SIZE;
349
3
             granule <= (global_end - 1) / ConditionCacheContext::GRANULE_SIZE; ++granule) {
350
2
            const int64_t cache_idx = granule - base_granule;
351
2
            const bool keep = cache_idx < 0 || static_cast<size_t>(cache_idx) >= cache.size() ||
352
2
                              cache[static_cast<size_t>(cache_idx)];
353
2
            if (!keep) {
354
1
                continue;
355
1
            }
356
1
            const int64_t granule_start = granule * ConditionCacheContext::GRANULE_SIZE;
357
1
            const int64_t granule_end = granule_start + ConditionCacheContext::GRANULE_SIZE;
358
1
            const RowRange file_granule_range {.start = granule_start - row_group_first_row,
359
1
                                               .length = granule_end - granule_start};
360
1
            append_intersection(range, file_granule_range, &result);
361
1
        }
362
1
    }
363
1
    return result;
364
1
}
365
366
} // namespace
367
368
104
// Installs a new row-group plan, zeroes the condition-cache and predicate filtered-row counters,
// and rewinds the scheduler to the first planned row group.
void ParquetScanScheduler::set_plan(RowGroupScanPlan plan) {
    _predicate_filtered_rows = 0;
    _condition_cache_filtered_rows = 0;
    _row_group_plans = std::move(plan.row_groups);
    reset();
}
374
375
2
void ParquetScanScheduler::set_condition_cache_context(std::shared_ptr<ConditionCacheContext> ctx) {
376
2
    _condition_cache_ctx = std::move(ctx);
377
2
    if (!_condition_cache_ctx || !_condition_cache_ctx->filter_result || _row_group_plans.empty()) {
378
0
        return;
379
0
    }
380
381
2
    _condition_cache_ctx->base_granule =
382
2
            _row_group_plans.front().first_file_row / ConditionCacheContext::GRANULE_SIZE;
383
2
    if (!_condition_cache_ctx->is_hit) {
384
1
        return;
385
1
    }
386
387
1
    std::vector<RowGroupReadPlan> filtered_plans;
388
1
    filtered_plans.reserve(_row_group_plans.size());
389
1
    for (auto& plan : _row_group_plans) {
390
1
        const int64_t old_rows = count_range_rows(plan.selected_ranges);
391
1
        plan.selected_ranges = filter_ranges_by_condition_cache(
392
1
                plan.selected_ranges, *_condition_cache_ctx->filter_result, plan.first_file_row,
393
1
                _condition_cache_ctx->base_granule);
394
1
        const int64_t new_rows = count_range_rows(plan.selected_ranges);
395
1
        _condition_cache_filtered_rows += old_rows - new_rows;
396
1
        if (!plan.selected_ranges.empty()) {
397
1
            filtered_plans.push_back(std::move(plan));
398
1
        }
399
1
    }
400
1
    _row_group_plans = std::move(filtered_plans);
401
1
    reset();
402
1
}
403
404
105
void ParquetScanScheduler::reset() {
405
105
    _next_row_group_plan_idx = 0;
406
105
    reset_current_row_group();
407
105
}
408
409
156
void ParquetScanScheduler::reset_current_row_group() {
410
156
    _current_row_group.reset();
411
156
    _current_predicate_columns.clear();
412
156
    _current_non_predicate_columns.clear();
413
156
    _current_row_group_rows = 0;
414
156
    _current_row_group_id = -1;
415
156
    _current_row_group_rows_read = 0;
416
156
    _current_row_group_first_row = 0;
417
156
    _current_selected_ranges.clear();
418
156
    _current_range_idx = 0;
419
156
    _current_range_rows_read = 0;
420
156
    _current_predicate_prefetched = false;
421
156
    _current_non_predicate_prefetched = false;
422
156
    _current_merge_range_active = false;
423
156
}
424
425
// Opens the next planned row group and creates column readers for the request.
//
// Sets *has_row_group to false and returns OK once every planned row group has been consumed.
// Otherwise it:
//   1. configures merged random-access reads via prepare_current_row_group_reader (its result
//      is stored in _current_merge_range_active),
//   2. opens the Arrow row group reader,
//   3. initializes all per-row-group cursor state from the plan,
//   4. creates predicate column readers, starts predicate-chunk prefetch when merge ranges are
//      not active, then creates non-predicate column readers.
// Returns Corruption (ParquetException) or InternalError (other std::exception) if opening the
// row group throws. Errors from column reader creation are propagated.
Status ParquetScanScheduler::open_next_row_group(
        ParquetFileContext& file_context,
        const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema,
        const format::FileScanRequest& request, bool* has_row_group) {
    *has_row_group = false;
    if (_next_row_group_plan_idx >= _row_group_plans.size()) {
        return Status::OK();
    }
    const RowGroupReadPlan& row_group_plan = _row_group_plans[_next_row_group_plan_idx++];
    const int row_group_idx = row_group_plan.row_group_id;
    _current_merge_range_active =
            prepare_current_row_group_reader(file_context, file_schema, request, row_group_idx);
    try {
        _current_row_group = file_context.file_reader->RowGroup(row_group_idx);
    } catch (const ::parquet::ParquetException& e) {
        return Status::Corruption("Failed to open parquet row group {}: {}", row_group_idx,
                                  e.what());
    } catch (const std::exception& e) {
        return Status::InternalError("Failed to open parquet row group {}: {}", row_group_idx,
                                     e.what());
    }

    auto row_group_metadata = file_context.metadata->RowGroup(row_group_idx);
    DORIS_CHECK(row_group_metadata != nullptr);
    _current_row_group_rows = row_group_metadata->num_rows();
    DORIS_CHECK(_current_row_group_rows == row_group_plan.row_group_rows);
    DORIS_CHECK(_current_row_group_rows > 0);
    _current_row_group_id = row_group_idx;
    DORIS_CHECK(!row_group_plan.selected_ranges.empty());
    _current_row_group_first_row = row_group_plan.first_file_row;
    _current_row_group_rows_read = 0;
    _current_selected_ranges = row_group_plan.selected_ranges;
    _current_range_idx = 0;
    _current_range_rows_read = 0;
    _current_predicate_columns.clear();
    _current_non_predicate_columns.clear();
    // Prefetch bookkeeping belongs to the row group being opened. A stale `true` left over from
    // the previous row group would make prefetch_current_row_group_columns skip this one.
    _current_predicate_prefetched = false;
    _current_non_predicate_prefetched = false;

    ParquetColumnReaderFactory column_reader_factory(
            _current_row_group, file_context.schema->num_columns(), &row_group_plan.page_skip_plans,
            _page_skip_profile, _timezone, _enable_strict_mode,
            _scan_profile.column_reader_profile);

    // Creates one reader per requested column into readers, keyed by local column id.
    // Row-position and global-row-id columns get dedicated synthetic readers. Every other column
    // must map to a file schema entry. `columns` is taken as `const auto&` so it binds directly
    // to the request's container: `&col` handed to the factory must point into the request.
    auto create_column_readers = [&](const auto& columns, auto& readers) -> Status {
        for (const auto& col : columns) {
            const auto local_id = col.local_id();
            if (local_id == format::ROW_POSITION_COLUMN_ID) {
                readers[local_id] = column_reader_factory.create_row_position_column_reader(
                        _current_row_group_first_row);
                continue;
            }
            if (local_id == format::GLOBAL_ROWID_COLUMN_ID) {
                DORIS_CHECK(_global_rowid_context.has_value());
                readers[local_id] = column_reader_factory.create_global_rowid_column_reader(
                        *_global_rowid_context, _current_row_group_first_row);
                continue;
            }
            DORIS_CHECK(local_id >= 0 && local_id < static_cast<int32_t>(file_schema.size()));
            const auto& column_schema = file_schema[local_id];
            DORIS_CHECK(column_schema != nullptr);
            std::unique_ptr<ParquetColumnReader> column_reader;
            RETURN_IF_ERROR(column_reader_factory.create(*column_schema, &col, &column_reader));
            readers[local_id] = std::move(column_reader);
        }
        return Status::OK();
    };

    RETURN_IF_ERROR(create_column_readers(request.predicate_columns, _current_predicate_columns));
    // Start warming filter-column chunks as soon as their row group is selected. Parquet v2 still
    // reads through Arrow's random-access reader; this prefetch only warms Doris file cache blocks
    // in the background and never changes the row/column materialization order.
    if (!_current_merge_range_active) {
        prefetch_current_row_group_columns(file_context, file_schema, request.predicate_columns,
                                           &_current_predicate_prefetched);
    }
    RETURN_IF_ERROR(
            create_column_readers(request.non_predicate_columns, _current_non_predicate_columns));
    if (!_current_merge_range_active && request.conjuncts.empty() &&
        request.delete_conjuncts.empty()) {
        // With no row-level filters there is no lazy-read decision to wait for, so start warming
        // output chunks immediately after their readers are created. Filtered scans still defer
        // this until at least one row survives the predicate phase.
        prefetch_current_row_group_columns(file_context, file_schema, request.non_predicate_columns,
                                           &_current_non_predicate_prefetched);
    }
    *has_row_group = true;
    return Status::OK();
}
529
530
3
// Skips `rows` rows in every open column reader (predicate readers first, then non-predicate
// readers) and advances _current_row_group_rows_read. rows must be non-negative, and zero is a
// no-op. When the range_gap_skipped_rows counter exists, the skipped rows are added to it.
Status ParquetScanScheduler::skip_current_row_group_rows(int64_t rows) {
    DORIS_CHECK(rows >= 0);
    if (rows == 0) {
        return Status::OK();
    }
    if (_scan_profile.range_gap_skipped_rows != nullptr) {
        COUNTER_UPDATE(_scan_profile.range_gap_skipped_rows, rows);
    }
    for (const auto& entry : _current_predicate_columns) {
        RETURN_IF_ERROR(entry.second->skip(rows));
    }
    for (const auto& entry : _current_non_predicate_columns) {
        RETURN_IF_ERROR(entry.second->skip(rows));
    }
    _current_row_group_rows_read += rows;
    return Status::OK();
}
547
548
// Reads the next batch_rows rows of every predicate column into that column's slot in
// file_block, then runs execute_batch_filters to narrow selection.
//
// selection is resized to batch_rows only when the request has conjuncts or delete conjuncts.
// Column reads are timed under column_read_time. Filter evaluation is timed under
// predicate_filter_time when that counter exists. Returns Corruption if a predicate column
// yields a row count other than batch_rows.
Status ParquetScanScheduler::read_filter_columns(int64_t batch_rows,
                                                 const format::FileScanRequest& request,
                                                 Block* file_block, SelectionVector* selection,
                                                 uint16_t* selected_rows,
                                                 int64_t* conjunct_filtered_rows) {
    const bool has_row_filters = !request.conjuncts.empty() || !request.delete_conjuncts.empty();
    if (has_row_filters) {
        selection->resize(static_cast<size_t>(batch_rows));
    }
    for (const auto& [fid, column_reader] : _current_predicate_columns) {
        const auto position_it = request.local_positions.find(format::LocalColumnId(fid));
        DORIS_CHECK(position_it != request.local_positions.end());
        const auto block_position = position_it->second.value();
        const auto& block_column = file_block->get_by_position(block_position);
        DCHECK(remove_nullable(column_reader->type())
                       ->equals(*remove_nullable(block_column.type)))
                << column_reader->type()->get_name() << " " << block_column.type->get_name() << " "
                << column_reader->name() << " " << block_column.name;
        auto column = block_column.column->assert_mutable();
        int64_t rows_read = 0;
        {
            SCOPED_TIMER(_scan_profile.column_read_time);
            RETURN_IF_ERROR(column_reader->read(batch_rows, column, &rows_read));
        }
        if (rows_read != batch_rows) {
            return Status::Corruption("Parquet filter column {} returned {} rows, expected {} rows",
                                      column_reader->name(), rows_read, batch_rows);
        }
        file_block->replace_by_position(block_position, std::move(column));
    }
    if (_scan_profile.predicate_filter_time != nullptr) {
        SCOPED_TIMER(_scan_profile.predicate_filter_time);
        return execute_batch_filters(request, batch_rows, file_block, selection, selected_rows,
                                     conjunct_filtered_rows);
    }
    return execute_batch_filters(request, batch_rows, file_block, selection, selected_rows,
                                 conjunct_filtered_rows);
}
585
586
bool ParquetScanScheduler::prepare_current_row_group_reader(
587
        ParquetFileContext& file_context,
588
        const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema,
589
99
        const format::FileScanRequest& request, int row_group_idx) {
590
99
    if (file_context.metadata == nullptr) {
591
0
        return false;
592
0
    }
593
99
    const auto ranges = build_row_group_prefetch_ranges(
594
99
            *file_context.metadata, file_schema, request_scan_columns(request), row_group_idx);
595
99
    const size_t avg_io_size = detail::average_prefetch_range_size(ranges);
596
99
    return file_context.set_random_access_ranges(ranges, avg_io_size, _profile,
597
99
                                                 _merge_read_slice_size);
598
99
}
599
600
void ParquetScanScheduler::prefetch_current_row_group_columns(
601
        ParquetFileContext& file_context,
602
        const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema,
603
22
        const std::vector<format::LocalColumnIndex>& scan_columns, bool* prefetched) {
604
22
    DORIS_CHECK(prefetched != nullptr);
605
22
    if (_current_merge_range_active || *prefetched || scan_columns.empty() ||
606
22
        _current_row_group_id < 0 || file_context.metadata == nullptr) {
607
22
        return;
608
22
    }
609
0
    *prefetched = true;
610
    // The scanner request separates predicate and non-predicate columns so Parquet can read
611
    // predicate columns first and lazily materialize the rest. Keep the same contract for
612
    // prefetch: callers decide which side to warm, and this helper only translates that selected
613
    // projection into physical column-chunk byte ranges for the current row group.
614
0
    file_context.prefetch_ranges(
615
0
            build_row_group_prefetch_ranges(*file_context.metadata, file_schema, scan_columns,
616
0
                                            _current_row_group_id),
617
0
            nullptr);
618
0
}
619
620
// Reads one batch of `batch_rows` rows from the current row group into `file_block`.
// The number of rows that survive filtering is stored in `*rows`.
//
// Steps:
//   1. read_filter_columns() reads the predicate columns and evaluates the filters. This
//      produces `selection` (surviving row indices) and `selected_rows`.
//   2. Condition-cache granules that contain surviving rows are marked.
//   3. If any row was filtered out, predicate columns are compacted with the same filter.
//   4. Non-predicate (lazy) columns are materialized:
//      - with select() when a filter applies,
//      - otherwise with a plain read() of the whole batch.
//
// When no predicate or non-predicate column is open, no column data is read. All
// `batch_rows` rows are reported as selected, and the condition-cache granules spanned by
// the batch are marked.
//
// `batch_first_file_row` is the file-level ordinal of the batch's first row. It is used
// only to locate condition-cache granules.
//
// Returns Corruption when a column reader produces a row count different from the one
// requested.
Status ParquetScanScheduler::read_current_row_group_batch(
        ParquetFileContext& file_context,
        const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema, int64_t batch_rows,
        const format::FileScanRequest& request, int64_t batch_first_file_row, Block* file_block,
        size_t* rows) {
    if (_scan_profile.total_batches != nullptr) {
        COUNTER_UPDATE(_scan_profile.total_batches, 1);
    }
    if (_scan_profile.raw_rows_read != nullptr) {
        COUNTER_UPDATE(_scan_profile.raw_rows_read, batch_rows);
    }
    if (_current_predicate_columns.empty() && _current_non_predicate_columns.empty()) {
        // Every row of the batch is returned. Mark the granules spanned by the batch, as
        // mark_condition_cache_granules() does for surviving rows, so that a recording
        // condition cache never stores a granule that produced output as empty. Extra
        // marks only disable pruning; they can never drop rows.
        if (batch_rows > 0 && _condition_cache_ctx && !_condition_cache_ctx->is_hit &&
            _condition_cache_ctx->filter_result) {
            auto& cache = *_condition_cache_ctx->filter_result;
            const int64_t first_granule = batch_first_file_row / ConditionCacheContext::GRANULE_SIZE;
            const int64_t last_granule =
                    (batch_first_file_row + batch_rows - 1) / ConditionCacheContext::GRANULE_SIZE;
            const int64_t first_idx = first_granule - _condition_cache_ctx->base_granule;
            const int64_t last_idx = last_granule - _condition_cache_ctx->base_granule;
            for (int64_t cache_idx = std::max<int64_t>(first_idx, 0);
                 cache_idx <= last_idx && static_cast<size_t>(cache_idx) < cache.size();
                 ++cache_idx) {
                cache[static_cast<size_t>(cache_idx)] = true;
            }
        }
        *rows = static_cast<size_t>(batch_rows);
        if (_scan_profile.selected_rows != nullptr) {
            COUNTER_UPDATE(_scan_profile.selected_rows, batch_rows);
        }
        return Status::OK();
    }

    SelectionVector selection;
    // `selected_rows` is a uint16_t, so a batch must fit in that range.
    DORIS_CHECK(batch_rows <= std::numeric_limits<uint16_t>::max());
    uint16_t selected_rows = static_cast<uint16_t>(batch_rows);
    int64_t conjunct_filtered_rows = 0;
    RETURN_IF_ERROR(read_filter_columns(batch_rows, request, file_block, &selection, &selected_rows,
                                        &conjunct_filtered_rows));
    _predicate_filtered_rows += conjunct_filtered_rows;
    mark_condition_cache_granules(selection, selected_rows, batch_first_file_row);

    // Output columns need compaction only when at least one row was filtered out.
    const bool need_filter_output = selected_rows != batch_rows;
    if (_scan_profile.selected_rows != nullptr) {
        COUNTER_UPDATE(_scan_profile.selected_rows, selected_rows);
    }
    if (_scan_profile.rows_filtered_by_conjunct != nullptr) {
        COUNTER_UPDATE(_scan_profile.rows_filtered_by_conjunct, conjunct_filtered_rows);
    }
    if (!_current_non_predicate_columns.empty() &&
        _scan_profile.lazy_read_filtered_rows != nullptr) {
        COUNTER_UPDATE(_scan_profile.lazy_read_filtered_rows, batch_rows - selected_rows);
    }
    if (selected_rows == 0 && _scan_profile.empty_selection_batches != nullptr) {
        COUNTER_UPDATE(_scan_profile.empty_selection_batches, 1);
    }
    if (need_filter_output) {
        // Predicate columns hold the full batch; keep only the surviving rows.
        IColumn::Filter output_filter = selection_to_filter(selection, selected_rows, batch_rows);
        for (const auto& col : request.predicate_columns) {
            auto position_it = request.local_positions.find(col.column_id());
            DORIS_CHECK(position_it != request.local_positions.end());
            const auto block_position = position_it->second.value();
            RETURN_IF_CATCH_EXCEPTION(file_block->replace_by_position(
                    block_position, file_block->get_by_position(block_position)
                                            .column->filter(output_filter, selected_rows)));
        }
    }
    if (!_current_merge_range_active && selected_rows > 0 &&
        !_current_non_predicate_columns.empty()) {
        // Do not prefetch lazy output columns until at least one row survives filtering.
        // This is where the reader switches from predicate-only reads to materializing
        // non-predicate columns, so fully filtered batches avoid unnecessary IO.
        prefetch_current_row_group_columns(file_context, file_schema, request.non_predicate_columns,
                                           &_current_non_predicate_prefetched);
    }

    {
        SCOPED_TIMER(_scan_profile.column_read_time);
        for (const auto& [fid, column_reader] : _current_non_predicate_columns) {
            auto position_it = request.local_positions.find(format::LocalColumnId(fid));
            DORIS_CHECK(position_it != request.local_positions.end());
            const auto block_position = position_it->second.value();
            auto column = file_block->get_by_position(block_position).column->assert_mutable();
            DCHECK_EQ(file_block->get_by_position(block_position).type->get_primitive_type(),
                      column_reader->type()->get_primitive_type())
                    << type_to_string(file_block->get_by_position(block_position)
                                              .type->get_primitive_type())
                    << " " << type_to_string(column_reader->type()->get_primitive_type()) << " "
                    << column_reader->name() << " " << fid << " " << block_position;
            if (need_filter_output) {
                // select() is given the full batch size plus the selection. It must append
                // exactly `selected_rows` values; it is called even when none survive.
                const auto old_size = column->size();
                RETURN_IF_ERROR(
                        column_reader->select(selection, selected_rows, batch_rows, column));
                if (column->size() != old_size + selected_rows) {
                    return Status::Corruption(
                            "Parquet selected output column {} returned {} rows, expected {} rows",
                            column_reader->name(), column->size(), old_size + selected_rows);
                }
            } else {
                int64_t column_rows = 0;
                RETURN_IF_ERROR(column_reader->read(batch_rows, column, &column_rows));
                if (column_rows != batch_rows) {
                    return Status::Corruption(
                            "Parquet output column {} returned {} rows, expected {} rows",
                            column_reader->name(), column_rows, batch_rows);
                }
            }
            file_block->replace_by_position(block_position, std::move(column));
        }
    }
    *rows = static_cast<size_t>(selected_rows);
    return Status::OK();
}
718
719
void ParquetScanScheduler::mark_condition_cache_granules(const SelectionVector& selection,
720
                                                         uint16_t selected_rows,
721
92
                                                         int64_t batch_first_file_row) {
722
92
    if (!_condition_cache_ctx || _condition_cache_ctx->is_hit ||
723
92
        !_condition_cache_ctx->filter_result) {
724
91
        return;
725
91
    }
726
1
    auto& cache = *_condition_cache_ctx->filter_result;
727
2.04k
    for (uint16_t selection_idx = 0; selection_idx < selected_rows; ++selection_idx) {
728
2.04k
        const int64_t file_row = batch_first_file_row + selection.get_index(selection_idx);
729
2.04k
        const int64_t granule = file_row / ConditionCacheContext::GRANULE_SIZE;
730
2.04k
        const int64_t cache_idx = granule - _condition_cache_ctx->base_granule;
731
2.04k
        if (cache_idx >= 0 && static_cast<size_t>(cache_idx) < cache.size()) {
732
2.04k
            cache[static_cast<size_t>(cache_idx)] = true;
733
2.04k
        }
734
2.04k
    }
735
1
}
736
737
// Produces the next batch that has at least one surviving row.
//
// Row groups are opened lazily via open_next_row_group(). Within a row group, only the
// selected row ranges are read; rows before a range start are skipped. A batch whose rows
// are all filtered out is not returned, and the loop continues with the next batch.
//
// `*eof` is set to true once no further row group can be opened. Otherwise `*eof` is false
// and `*rows` holds the surviving row count from read_current_row_group_batch().
Status ParquetScanScheduler::read_next_batch(
        ParquetFileContext& file_context,
        const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema,
        const format::FileScanRequest& request, Block* file_block, size_t* rows, bool* eof) {
    *rows = 0;
    for (;;) {
        if (_current_row_group == nullptr) {
            bool opened = false;
            RETURN_IF_ERROR(open_next_row_group(file_context, file_schema, request, &opened));
            if (!opened) {
                *eof = true;
                return Status::OK();
            }
        }

        if (_current_range_idx >= _current_selected_ranges.size()) {
            // All selected ranges of this row group are consumed; advance to the next group.
            reset_current_row_group();
            continue;
        }

        const RowRange& range = _current_selected_ranges[_current_range_idx];
        DORIS_CHECK(range.start >= 0);
        DORIS_CHECK(range.length > 0);
        DORIS_CHECK(range.start + range.length <= _current_row_group_rows);

        if (_current_row_group_rows_read < range.start) {
            // Rows before this range were pruned at row-group level; skip over them.
            RETURN_IF_ERROR(skip_current_row_group_rows(range.start - _current_row_group_rows_read));
        }
        DORIS_CHECK(_current_row_group_rows_read == range.start + _current_range_rows_read);

        const int64_t rows_left_in_range = range.length - _current_range_rows_read;
        if (rows_left_in_range <= 0) {
            // Defensive: the range is exhausted, move to the next range of this row group.
            ++_current_range_idx;
            _current_range_rows_read = 0;
            continue;
        }

        const int64_t batch_rows = std::min<int64_t>(_batch_size, rows_left_in_range);
        const int64_t first_file_row = _current_row_group_first_row + _current_row_group_rows_read;
        RETURN_IF_ERROR(read_current_row_group_batch(file_context, file_schema, batch_rows, request,
                                                     first_file_row, file_block, rows));

        // Positions advance by the physical rows read, regardless of how many survived.
        _current_row_group_rows_read += batch_rows;
        _current_range_rows_read += batch_rows;
        if (_current_range_rows_read >= range.length) {
            ++_current_range_idx;
            _current_range_rows_read = 0;
        }

        if (*rows != 0) {
            *eof = false;
            return Status::OK();
        }
    }
}
797
798
} // namespace doris::format::parquet