Coverage Report

Created: 2026-06-29 16:39

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format_v2/parquet/parquet_scan.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format_v2/parquet/parquet_scan.h"
19
20
#include <algorithm>
21
#include <limits>
22
#include <memory>
23
#include <utility>
24
25
#include "common/exception.h"
26
#include "common/status.h"
27
#include "core/assert_cast.h"
28
#include "core/block/block.h"
29
#include "core/column/column_vector.h"
30
#include "exprs/vexpr_context.h"
31
#include "format_v2/parquet/parquet_column_schema.h"
32
#include "format_v2/parquet/parquet_file_context.h"
33
#include "format_v2/parquet/parquet_statistics.h"
34
35
namespace doris::format::parquet {
36
37
namespace {
38
39
60
// Returns the file offset at which a column chunk starts: the dictionary page offset when the
// chunk reports a dictionary page, otherwise the offset of its first data page.
int64_t column_start_offset(const ::parquet::ColumnChunkMetaData& column_metadata) {
    if (column_metadata.has_dictionary_page()) {
        return cast_set<int64_t>(column_metadata.dictionary_page_offset());
    }
    return cast_set<int64_t>(column_metadata.data_page_offset());
}
44
45
// 判断 RG 是否在 scan_range 的 offset 范围之外。
46
//
47
// 策略:取 RG 第一个和最后一个 column chunk 的起始 offset 的中点,
48
// 如果中点不在 [range_start, range_end) 内则该 RG 不属于当前 split。
49
// 特殊处理:当 scan_range 覆盖整个文件(start=0, size>=file_size)时直接返回 false。
50
bool is_row_group_outside_range(const ::parquet::FileMetaData& metadata,
51
200
                                const ParquetScanRange& scan_range, int row_group_idx) {
52
    // size < 0 表示不限制范围(读整个文件)
53
200
    if (scan_range.size < 0) {
54
170
        return false;
55
170
    }
56
30
    const int64_t range_start_offset = scan_range.start_offset;
57
30
    const int64_t range_end_offset = range_start_offset + scan_range.size;
58
30
    DORIS_CHECK(range_start_offset >= 0);
59
30
    DORIS_CHECK(range_end_offset >= range_start_offset);
60
    // 覆盖整个文件 → 不过滤
61
30
    if (range_start_offset == 0 &&
62
30
        (scan_range.file_size < 0 || range_end_offset >= scan_range.file_size)) {
63
0
        return false;
64
0
    }
65
66
30
    auto row_group_metadata = metadata.RowGroup(row_group_idx);
67
30
    DORIS_CHECK(row_group_metadata != nullptr);
68
30
    DORIS_CHECK(row_group_metadata->num_columns() > 0);
69
30
    const auto first_column = row_group_metadata->ColumnChunk(0);
70
30
    const auto last_column = row_group_metadata->ColumnChunk(row_group_metadata->num_columns() - 1);
71
30
    DORIS_CHECK(first_column != nullptr);
72
30
    DORIS_CHECK(last_column != nullptr);
73
    // RG 的 offset 范围 = [第一个 column chunk 起始, 最后一个 column chunk 结束)
74
30
    const int64_t row_group_start_offset = column_start_offset(*first_column);
75
30
    const int64_t row_group_end_offset =
76
30
            column_start_offset(*last_column) + last_column->total_compressed_size();
77
    // 用 RGB 的中点判断归属 — 中点在哪个 split 的范围就属于哪个 split
78
30
    const int64_t row_group_mid_offset =
79
30
            row_group_start_offset + (row_group_end_offset - row_group_start_offset) / 2;
80
30
    return row_group_mid_offset < range_start_offset || row_group_mid_offset >= range_end_offset;
81
30
}
82
83
} // namespace
84
85
// 最外层裁剪入口:三级流水线(代价从低到高)→ 输出 RowGroupScanPlan。
86
//
87
// 1. 计算 first_file_row + 过滤 scan_range 外的 RG — O(1) 算术(is_row_group_outside_range)
88
// 2. select_row_groups_by_statistics() — RG 级裁剪 (min/max + dictionary + bloom filter),
89
//    仅对 scan_range 内的 RG 执行,避免对范围外的 RG 做昂贵的 bloom filter/dictionary 读取
90
// 3. select_row_group_ranges_by_page_index() — Page 级细粒度裁剪
91
Status plan_parquet_row_groups(const ::parquet::FileMetaData& metadata,
92
                               ::parquet::ParquetFileReader* file_reader,
93
                               const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema,
94
                               const format::FileScanRequest& request,
95
                               const ParquetScanRange& scan_range, bool enable_bloom_filter,
96
116
                               RowGroupScanPlan* plan, const cctz::time_zone* timezone) {
97
116
    DORIS_CHECK(plan != nullptr);
98
116
    plan->row_groups.clear();
99
116
    plan->pruning_stats = ParquetPruningStats {};
100
101
    // ① 计算 first_file_row + 过滤 scan_range(代价最低,先做)
102
116
    std::vector<int64_t> row_group_first_rows(metadata.num_row_groups());
103
116
    std::vector<int> scan_range_selected_row_groups;
104
116
    scan_range_selected_row_groups.reserve(metadata.num_row_groups());
105
116
    int64_t next_row_group_first_row = 0;
106
316
    for (int row_group_idx = 0; row_group_idx < metadata.num_row_groups(); ++row_group_idx) {
107
200
        row_group_first_rows[row_group_idx] = next_row_group_first_row;
108
200
        auto row_group_metadata = metadata.RowGroup(row_group_idx);
109
200
        DORIS_CHECK(row_group_metadata != nullptr);
110
200
        const int64_t row_group_rows = row_group_metadata->num_rows();
111
200
        if (row_group_rows < 0) {
112
0
            return Status::Corruption("Invalid negative row count in parquet row group {}",
113
0
                                      row_group_idx);
114
0
        }
115
200
        next_row_group_first_row += row_group_rows;
116
200
        if (!is_row_group_outside_range(metadata, scan_range, row_group_idx)) {
117
179
            scan_range_selected_row_groups.push_back(row_group_idx);
118
179
        }
119
200
    }
120
121
    // ② RG 级裁剪:仅对 scan_range 内的 RG 执行
122
116
    std::vector<int> statistics_selected_row_groups;
123
116
    RETURN_IF_ERROR(select_row_groups_by_statistics(
124
116
            metadata, file_reader, file_schema, request, &scan_range_selected_row_groups,
125
116
            &statistics_selected_row_groups, enable_bloom_filter, &plan->pruning_stats, timezone));
126
127
116
    plan->row_groups.reserve(statistics_selected_row_groups.size());
128
138
    for (const auto row_group_idx : statistics_selected_row_groups) {
129
138
        auto row_group_metadata = metadata.RowGroup(row_group_idx);
130
138
        DORIS_CHECK(row_group_metadata != nullptr);
131
138
        const int64_t row_group_rows = row_group_metadata->num_rows();
132
138
        if (row_group_rows == 0) {
133
0
            continue;
134
0
        }
135
136
138
        RowGroupReadPlan row_group_plan;
137
138
        row_group_plan.row_group_id = row_group_idx;
138
138
        row_group_plan.first_file_row = row_group_first_rows[row_group_idx];
139
138
        row_group_plan.row_group_rows = row_group_rows;
140
138
        RETURN_IF_ERROR(select_row_group_ranges_by_page_index(
141
138
                file_reader, file_schema, request, row_group_idx, row_group_rows,
142
138
                &row_group_plan.selected_ranges, &row_group_plan.page_skip_plans,
143
138
                &plan->pruning_stats, timezone));
144
138
        if (row_group_plan.selected_ranges.empty()) {
145
1
            continue;
146
1
        }
147
137
        plan->pruning_stats.selected_row_ranges += row_group_plan.selected_ranges.size();
148
137
        plan->row_groups.push_back(std::move(row_group_plan));
149
137
    }
150
116
    plan->pruning_stats.selected_row_groups = plan->row_groups.size();
151
116
    return Status::OK();
152
116
}
153
154
namespace {
155
156
// Compacts `selection` in place: of its first `selected_rows` entries, only those whose row
// index has a non-zero byte in `filter` are kept, preserving their order.
// Returns the number of entries kept.
uint16_t apply_filter_to_selection(const IColumn::Filter& filter, SelectionVector* selection,
                                   uint16_t selected_rows) {
    uint16_t kept = 0;
    for (uint16_t read_pos = 0; read_pos < selected_rows; ++read_pos) {
        const auto row_idx = selection->get_index(read_pos);
        if (filter[row_idx] == 0) {
            continue;
        }
        // The write position never overtakes the read position, so compaction is safe in place.
        selection->set_index(kept, static_cast<SelectionVector::Index>(row_idx));
        ++kept;
    }
    return kept;
}
167
168
// Applies every conjunct in `request.conjuncts` to `file_block`, narrowing `selection` and
// `*selected_rows` after each one. Stops early once no row remains selected.
Status execute_filter_conjuncts(const format::FileScanRequest& request, int64_t batch_rows,
                                Block* file_block, SelectionVector* selection,
                                uint16_t* selected_rows) {
    for (const auto& conjunct : request.conjuncts) {
        if (*selected_rows == 0) {
            break;
        }
        DORIS_CHECK(conjunct != nullptr);
        // Start from an all-pass filter covering the whole batch.
        IColumn::Filter row_filter(static_cast<size_t>(batch_rows), 1);
        bool can_filter_all = false;
        RETURN_IF_ERROR(conjunct->execute_filter(file_block, row_filter.data(),
                                                 static_cast<size_t>(batch_rows), false,
                                                 &can_filter_all));
        if (can_filter_all) {
            *selected_rows = 0;
        } else {
            *selected_rows = apply_filter_to_selection(row_filter, selection, *selected_rows);
        }
    }
    return Status::OK();
}
186
187
// Applies every conjunct in `request.delete_conjuncts`. Each one is executed on `file_block`
// and yields a UInt8 result column in which a non-zero value marks a row to drop. The result
// is inverted into a keep-filter, the result column is erased from the block again, and
// `selection` / `*selected_rows` are narrowed. Stops early once no row remains selected.
Status execute_delete_conjuncts(const format::FileScanRequest& request, int64_t batch_rows,
                                Block* file_block, SelectionVector* selection,
                                uint16_t* selected_rows) {
    for (const auto& delete_conjunct : request.delete_conjuncts) {
        if (*selected_rows == 0) {
            break;
        }
        DORIS_CHECK(delete_conjunct != nullptr);
        int result_column_id = -1;
        RETURN_IF_ERROR(delete_conjunct->root()->execute(delete_conjunct.get(), file_block,
                                                         &result_column_id));
        DORIS_CHECK(result_column_id >= 0 &&
                    result_column_id < static_cast<int>(file_block->columns()));
        const auto& delete_marks = assert_cast<const ColumnUInt8&>(
                                           *file_block->get_by_position(result_column_id).column)
                                           .get_data();
        DORIS_CHECK(delete_marks.size() == static_cast<size_t>(batch_rows));

        // Invert "delete" into "keep" and remember whether anything survives at all.
        IColumn::Filter keep_filter(static_cast<size_t>(batch_rows), 1);
        bool any_row_kept = false;
        for (size_t row = 0; row < static_cast<size_t>(batch_rows); ++row) {
            const bool keep = !delete_marks[row];
            keep_filter[row] = keep;
            any_row_kept |= keep;
        }
        // delete_marks refers into the result column; it is not used past this point.
        file_block->erase(result_column_id);
        if (any_row_kept) {
            *selected_rows = apply_filter_to_selection(keep_filter, selection, *selected_rows);
        } else {
            *selected_rows = 0;
        }
    }
    return Status::OK();
}
217
218
} // namespace
219
220
// Builds a filter of `batch_rows` entries that is 1 at every row index listed in the first
// `selected_rows` entries of `selection`, and 0 everywhere else.
IColumn::Filter selection_to_filter(const SelectionVector& selection, uint16_t selected_rows,
                                    int64_t batch_rows) {
    IColumn::Filter filter(static_cast<size_t>(batch_rows), 0);
    uint16_t pos = 0;
    while (pos < selected_rows) {
        filter[selection.get_index(pos)] = 1;
        ++pos;
    }
    return filter;
}
228
229
// Runs the filter conjuncts and then the delete conjuncts of `request` on one batch.
// When `conjunct_filtered_rows` is non-null it is increased by the number of rows removed by
// the filter conjuncts (rows removed by delete conjuncts are not counted there).
// Does nothing when the request has neither kind of conjunct.
Status execute_batch_filters(const format::FileScanRequest& request, int64_t batch_rows,
                             Block* file_block, SelectionVector* selection, uint16_t* selected_rows,
                             int64_t* conjunct_filtered_rows) {
    const bool has_any_conjunct = !request.conjuncts.empty() || !request.delete_conjuncts.empty();
    if (!has_any_conjunct) {
        return Status::OK();
    }

    const auto rows_before_filter = *selected_rows;
    RETURN_IF_ERROR(
            execute_filter_conjuncts(request, batch_rows, file_block, selection, selected_rows));
    if (conjunct_filtered_rows != nullptr) {
        const int64_t removed =
                static_cast<int64_t>(rows_before_filter) - static_cast<int64_t>(*selected_rows);
        *conjunct_filtered_rows += removed;
    }

    if (*selected_rows != 0) {
        return execute_delete_conjuncts(request, batch_rows, file_block, selection,
                                        selected_rows);
    }
    return Status::OK();
}
247
248
namespace {
249
2
int64_t count_range_rows(const std::vector<RowRange>& ranges) {
250
2
    int64_t rows = 0;
251
2
    for (const auto& range : ranges) {
252
2
        rows += range.length;
253
2
    }
254
2
    return rows;
255
2
}
256
257
void append_intersection(const RowRange& left, const RowRange& right,
258
1
                         std::vector<RowRange>* result) {
259
1
    const int64_t start = std::max(left.start, right.start);
260
1
    const int64_t end = std::min(left.start + left.length, right.start + right.length);
261
1
    if (start < end) {
262
1
        result->push_back(RowRange {.start = start, .length = end - start});
263
1
    }
264
1
}
265
266
std::vector<RowRange> filter_ranges_by_condition_cache(const std::vector<RowRange>& ranges,
267
                                                       const std::vector<bool>& cache,
268
                                                       int64_t row_group_first_row,
269
1
                                                       int64_t base_granule) {
270
1
    std::vector<RowRange> result;
271
1
    if (cache.empty()) {
272
0
        return ranges;
273
0
    }
274
275
    // Cache coordinates are file-global granules; RowRange coordinates are row-group-relative.
276
    // Walk every selected range in order and split it by granule. Granules covered by the bitmap
277
    // are kept only when the bit is true. Granules outside the bitmap are kept conservatively, so
278
    // an undersized or old-format cache entry cannot skip valid rows.
279
1
    for (const auto& range : ranges) {
280
1
        const int64_t global_start = row_group_first_row + range.start;
281
1
        const int64_t global_end = global_start + range.length;
282
1
        for (int64_t granule = global_start / ConditionCacheContext::GRANULE_SIZE;
283
3
             granule <= (global_end - 1) / ConditionCacheContext::GRANULE_SIZE; ++granule) {
284
2
            const int64_t cache_idx = granule - base_granule;
285
2
            const bool keep = cache_idx < 0 || static_cast<size_t>(cache_idx) >= cache.size() ||
286
2
                              cache[static_cast<size_t>(cache_idx)];
287
2
            if (!keep) {
288
1
                continue;
289
1
            }
290
1
            const int64_t granule_start = granule * ConditionCacheContext::GRANULE_SIZE;
291
1
            const int64_t granule_end = granule_start + ConditionCacheContext::GRANULE_SIZE;
292
1
            const RowRange file_granule_range {.start = granule_start - row_group_first_row,
293
1
                                               .length = granule_end - granule_start};
294
1
            append_intersection(range, file_granule_range, &result);
295
1
        }
296
1
    }
297
1
    return result;
298
1
}
299
300
} // namespace
301
302
104
// Installs a new scan plan: takes over its row-group plans, zeroes the filtered-row counters
// and rewinds the scheduler to the first row group.
void ParquetScanScheduler::set_plan(RowGroupScanPlan plan) {
    _predicate_filtered_rows = 0;
    _condition_cache_filtered_rows = 0;
    _row_group_plans = std::move(plan.row_groups);
    reset();
}
308
309
2
void ParquetScanScheduler::set_condition_cache_context(std::shared_ptr<ConditionCacheContext> ctx) {
310
2
    _condition_cache_ctx = std::move(ctx);
311
2
    if (!_condition_cache_ctx || !_condition_cache_ctx->filter_result || _row_group_plans.empty()) {
312
0
        return;
313
0
    }
314
315
2
    _condition_cache_ctx->base_granule =
316
2
            _row_group_plans.front().first_file_row / ConditionCacheContext::GRANULE_SIZE;
317
2
    if (!_condition_cache_ctx->is_hit) {
318
1
        return;
319
1
    }
320
321
1
    std::vector<RowGroupReadPlan> filtered_plans;
322
1
    filtered_plans.reserve(_row_group_plans.size());
323
1
    for (auto& plan : _row_group_plans) {
324
1
        const int64_t old_rows = count_range_rows(plan.selected_ranges);
325
1
        plan.selected_ranges = filter_ranges_by_condition_cache(
326
1
                plan.selected_ranges, *_condition_cache_ctx->filter_result, plan.first_file_row,
327
1
                _condition_cache_ctx->base_granule);
328
1
        const int64_t new_rows = count_range_rows(plan.selected_ranges);
329
1
        _condition_cache_filtered_rows += old_rows - new_rows;
330
1
        if (!plan.selected_ranges.empty()) {
331
1
            filtered_plans.push_back(std::move(plan));
332
1
        }
333
1
    }
334
1
    _row_group_plans = std::move(filtered_plans);
335
1
    reset();
336
1
}
337
338
105
void ParquetScanScheduler::reset() {
339
105
    _next_row_group_plan_idx = 0;
340
105
    reset_current_row_group();
341
105
}
342
343
156
void ParquetScanScheduler::reset_current_row_group() {
344
156
    _current_row_group.reset();
345
156
    _current_predicate_columns.clear();
346
156
    _current_non_predicate_columns.clear();
347
156
    _current_row_group_rows = 0;
348
156
    _current_row_group_rows_read = 0;
349
156
    _current_row_group_first_row = 0;
350
156
    _current_selected_ranges.clear();
351
156
    _current_range_idx = 0;
352
156
    _current_range_rows_read = 0;
353
156
}
354
355
// Opens the next planned row group and creates its column readers.
//
// `*has_row_group` is set to false when every row-group plan has already been consumed, and
// to true once the row group is open and all readers for the request's predicate and
// non-predicate columns exist. Returns a Corruption / InternalError status when the
// underlying reader throws while opening the row group.
Status ParquetScanScheduler::open_next_row_group(
        ParquetFileContext& file_context,
        const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema,
        const format::FileScanRequest& request, bool* has_row_group) {
    *has_row_group = false;
    if (_next_row_group_plan_idx >= _row_group_plans.size()) {
        return Status::OK();
    }
    const RowGroupReadPlan& row_group_plan = _row_group_plans[_next_row_group_plan_idx++];
    const int row_group_idx = row_group_plan.row_group_id;
    try {
        _current_row_group = file_context.file_reader->RowGroup(row_group_idx);
    } catch (const ::parquet::ParquetException& e) {
        return Status::Corruption("Failed to open parquet row group {}: {}", row_group_idx,
                                  e.what());
    } catch (const std::exception& e) {
        return Status::InternalError("Failed to open parquet row group {}: {}", row_group_idx,
                                     e.what());
    }

    // The plan must still agree with the file metadata.
    auto row_group_metadata = file_context.metadata->RowGroup(row_group_idx);
    DORIS_CHECK(row_group_metadata != nullptr);
    _current_row_group_rows = row_group_metadata->num_rows();
    DORIS_CHECK(_current_row_group_rows == row_group_plan.row_group_rows);
    DORIS_CHECK(_current_row_group_rows > 0);
    DORIS_CHECK(!row_group_plan.selected_ranges.empty());

    _current_row_group_first_row = row_group_plan.first_file_row;
    _current_row_group_rows_read = 0;
    _current_selected_ranges = row_group_plan.selected_ranges;
    _current_range_idx = 0;
    _current_range_rows_read = 0;
    _current_predicate_columns.clear();
    _current_non_predicate_columns.clear();

    ParquetColumnReaderFactory column_reader_factory(
            _current_row_group, file_context.schema->num_columns(), &row_group_plan.page_skip_plans,
            _page_skip_profile, _timezone, _enable_strict_mode,
            _scan_profile.column_reader_profile);

    // Creates one reader per requested column and stores it in `readers` under the column's
    // local id. The row-position and global-rowid columns get dedicated readers; every other
    // column is created from its schema entry.
    const auto create_readers = [&](const auto& columns, auto& readers) -> Status {
        for (const auto& col : columns) {
            const auto local_id = col.local_id();
            if (local_id == format::ROW_POSITION_COLUMN_ID) {
                readers[local_id] = column_reader_factory.create_row_position_column_reader(
                        _current_row_group_first_row);
                continue;
            }
            if (local_id == format::GLOBAL_ROWID_COLUMN_ID) {
                DORIS_CHECK(_global_rowid_context.has_value());
                readers[local_id] = column_reader_factory.create_global_rowid_column_reader(
                        *_global_rowid_context, _current_row_group_first_row);
                continue;
            }

            DORIS_CHECK(local_id >= 0 && local_id < static_cast<int32_t>(file_schema.size()));
            const auto& column_schema = file_schema[local_id];
            DORIS_CHECK(column_schema != nullptr);
            std::unique_ptr<ParquetColumnReader> column_reader;
            RETURN_IF_ERROR(column_reader_factory.create(*column_schema, &col, &column_reader));
            readers[local_id] = std::move(column_reader);
        }
        return Status::OK();
    };

    RETURN_IF_ERROR(create_readers(request.predicate_columns, _current_predicate_columns));
    RETURN_IF_ERROR(create_readers(request.non_predicate_columns, _current_non_predicate_columns));

    *has_row_group = true;
    return Status::OK();
}
441
442
3
// Skips `rows` rows in every column reader of the currently open row group and advances the
// row-group read position accordingly. `rows` must be non-negative; zero is a no-op.
// The skipped rows are added to the range_gap_skipped_rows counter when it is configured.
Status ParquetScanScheduler::skip_current_row_group_rows(int64_t rows) {
    DORIS_CHECK(rows >= 0);
    if (rows == 0) {
        return Status::OK();
    }
    if (_scan_profile.range_gap_skipped_rows != nullptr) {
        COUNTER_UPDATE(_scan_profile.range_gap_skipped_rows, rows);
    }
    // Iterate with structured bindings, as the other readers of these maps in this file do,
    // instead of std::views::values: this file does not include <ranges>.
    for (const auto& [column_id, column_reader] : _current_predicate_columns) {
        RETURN_IF_ERROR(column_reader->skip(rows));
    }
    for (const auto& [column_id, column_reader] : _current_non_predicate_columns) {
        RETURN_IF_ERROR(column_reader->skip(rows));
    }
    _current_row_group_rows_read += rows;
    return Status::OK();
}
459
460
// Reads `batch_rows` rows of every predicate column into `file_block` and then evaluates the
// request's conjuncts on the batch, narrowing `selection` / `*selected_rows`.
// Returns Corruption when a column reader delivers a row count other than `batch_rows`.
Status ParquetScanScheduler::read_filter_columns(int64_t batch_rows,
                                                 const format::FileScanRequest& request,
                                                 Block* file_block, SelectionVector* selection,
                                                 uint16_t* selected_rows,
                                                 int64_t* conjunct_filtered_rows) {
    // The selection vector is only sized when some conjunct is going to use it.
    const bool has_any_conjunct = !request.conjuncts.empty() || !request.delete_conjuncts.empty();
    if (has_any_conjunct) {
        selection->resize(static_cast<size_t>(batch_rows));
    }

    for (const auto& [fid, column_reader] : _current_predicate_columns) {
        const auto position_it = request.local_positions.find(format::LocalColumnId(fid));
        DORIS_CHECK(position_it != request.local_positions.end());
        const auto block_position = position_it->second.value();
        DCHECK(remove_nullable(column_reader->type())
                       ->equals(*remove_nullable(file_block->get_by_position(block_position).type)))
                << column_reader->type()->get_name() << " "
                << file_block->get_by_position(block_position).type->get_name() << " "
                << column_reader->name() << " " << file_block->get_by_position(block_position).name;

        auto column = file_block->get_by_position(block_position).column->assert_mutable();
        int64_t rows_read = 0;
        {
            SCOPED_TIMER(_scan_profile.column_read_time);
            RETURN_IF_ERROR(column_reader->read(batch_rows, column, &rows_read));
        }
        if (rows_read != batch_rows) {
            return Status::Corruption("Parquet filter column {} returned {} rows, expected {} rows",
                                      column_reader->name(), rows_read, batch_rows);
        }
        file_block->replace_by_position(block_position, std::move(column));
    }

    // Time the predicate evaluation only when the timer is configured.
    if (_scan_profile.predicate_filter_time != nullptr) {
        SCOPED_TIMER(_scan_profile.predicate_filter_time);
        return execute_batch_filters(request, batch_rows, file_block, selection, selected_rows,
                                     conjunct_filtered_rows);
    }
    return execute_batch_filters(request, batch_rows, file_block, selection, selected_rows,
                                 conjunct_filtered_rows);
}
497
498
// Reads one batch of `batch_rows` physical rows from the open row group into `file_block`.
//
// Steps: read the predicate columns and evaluate the conjuncts; record surviving granules for
// the condition cache; if rows were filtered out, shrink the predicate columns to the
// selection; finally read the non-predicate columns, using select() when rows were filtered
// out and a plain read() otherwise. `*rows` receives the number of rows left in the block for
// this batch. When there are no column readers at all, `*rows` is simply `batch_rows`.
Status ParquetScanScheduler::read_current_row_group_batch(int64_t batch_rows,
                                                          const format::FileScanRequest& request,
                                                          int64_t batch_first_file_row,
                                                          Block* file_block, size_t* rows) {
    if (_scan_profile.total_batches != nullptr) {
        COUNTER_UPDATE(_scan_profile.total_batches, 1);
    }
    if (_scan_profile.raw_rows_read != nullptr) {
        COUNTER_UPDATE(_scan_profile.raw_rows_read, batch_rows);
    }

    // No column readers: only the row count is reported.
    const bool has_no_readers =
            _current_predicate_columns.empty() && _current_non_predicate_columns.empty();
    if (has_no_readers) {
        *rows = static_cast<size_t>(batch_rows);
        if (_scan_profile.selected_rows != nullptr) {
            COUNTER_UPDATE(_scan_profile.selected_rows, batch_rows);
        }
        return Status::OK();
    }

    SelectionVector selection;
    // Selection indices are 16-bit, so a batch must fit into uint16_t.
    DORIS_CHECK(batch_rows <= std::numeric_limits<uint16_t>::max());
    uint16_t selected_rows = static_cast<uint16_t>(batch_rows);
    int64_t conjunct_filtered_rows = 0;
    RETURN_IF_ERROR(read_filter_columns(batch_rows, request, file_block, &selection, &selected_rows,
                                        &conjunct_filtered_rows));
    _predicate_filtered_rows += conjunct_filtered_rows;
    mark_condition_cache_granules(selection, selected_rows, batch_first_file_row);

    const bool need_filter_output = selected_rows != batch_rows;

    // Profile counters; each one is optional.
    if (_scan_profile.selected_rows != nullptr) {
        COUNTER_UPDATE(_scan_profile.selected_rows, selected_rows);
    }
    if (_scan_profile.rows_filtered_by_conjunct != nullptr) {
        COUNTER_UPDATE(_scan_profile.rows_filtered_by_conjunct, conjunct_filtered_rows);
    }
    if (!_current_non_predicate_columns.empty() &&
        _scan_profile.lazy_read_filtered_rows != nullptr) {
        COUNTER_UPDATE(_scan_profile.lazy_read_filtered_rows, batch_rows - selected_rows);
    }
    if (selected_rows == 0 && _scan_profile.empty_selection_batches != nullptr) {
        COUNTER_UPDATE(_scan_profile.empty_selection_batches, 1);
    }

    // Predicate columns were read in full; cut them down to the selected rows.
    if (need_filter_output) {
        IColumn::Filter output_filter = selection_to_filter(selection, selected_rows, batch_rows);
        for (const auto& col : request.predicate_columns) {
            const auto position_it = request.local_positions.find(col.column_id());
            DORIS_CHECK(position_it != request.local_positions.end());
            const auto block_position = position_it->second.value();
            RETURN_IF_CATCH_EXCEPTION(file_block->replace_by_position(
                    block_position, file_block->get_by_position(block_position)
                                            .column->filter(output_filter, selected_rows)));
        }
    }

    {
        SCOPED_TIMER(_scan_profile.column_read_time);
        for (const auto& [fid, column_reader] : _current_non_predicate_columns) {
            const auto position_it = request.local_positions.find(format::LocalColumnId(fid));
            DORIS_CHECK(position_it != request.local_positions.end());
            const auto block_position = position_it->second.value();
            auto column = file_block->get_by_position(block_position).column->assert_mutable();
            DCHECK_EQ(file_block->get_by_position(block_position).type->get_primitive_type(),
                      column_reader->type()->get_primitive_type())
                    << type_to_string(file_block->get_by_position(block_position)
                                              .type->get_primitive_type())
                    << " " << type_to_string(column_reader->type()->get_primitive_type()) << " "
                    << column_reader->name() << " " << fid << " " << block_position;

            if (!need_filter_output) {
                // Every row survived: plain sequential read.
                int64_t rows_read = 0;
                RETURN_IF_ERROR(column_reader->read(batch_rows, column, &rows_read));
                if (rows_read != batch_rows) {
                    return Status::Corruption(
                            "Parquet output column {} returned {} rows, expected {} rows",
                            column_reader->name(), rows_read, batch_rows);
                }
            } else {
                // Some rows were filtered out: let the reader emit only the selected rows.
                const auto size_before = column->size();
                RETURN_IF_ERROR(
                        column_reader->select(selection, selected_rows, batch_rows, column));
                if (column->size() != size_before + selected_rows) {
                    return Status::Corruption(
                            "Parquet selected output column {} returned {} rows, expected {} rows",
                            column_reader->name(), column->size(), size_before + selected_rows);
                }
            }
            file_block->replace_by_position(block_position, std::move(column));
        }
    }

    *rows = static_cast<size_t>(selected_rows);
    return Status::OK();
}
587
588
void ParquetScanScheduler::mark_condition_cache_granules(const SelectionVector& selection,
589
                                                         uint16_t selected_rows,
590
92
                                                         int64_t batch_first_file_row) {
591
92
    if (!_condition_cache_ctx || _condition_cache_ctx->is_hit ||
592
92
        !_condition_cache_ctx->filter_result) {
593
91
        return;
594
91
    }
595
1
    auto& cache = *_condition_cache_ctx->filter_result;
596
2.04k
    for (uint16_t selection_idx = 0; selection_idx < selected_rows; ++selection_idx) {
597
2.04k
        const int64_t file_row = batch_first_file_row + selection.get_index(selection_idx);
598
2.04k
        const int64_t granule = file_row / ConditionCacheContext::GRANULE_SIZE;
599
2.04k
        const int64_t cache_idx = granule - _condition_cache_ctx->base_granule;
600
2.04k
        if (cache_idx >= 0 && static_cast<size_t>(cache_idx) < cache.size()) {
601
2.04k
            cache[static_cast<size_t>(cache_idx)] = true;
602
2.04k
        }
603
2.04k
    }
604
1
}
605
606
// Produces the next non-empty batch of the scan.
//
// Walks the planned row groups and, inside each, the selected row ranges: opens the next row
// group when none is open, skips the gap in front of a range, and reads at most `_batch_size`
// rows of the current range. Batches whose rows were all filtered out are not returned; the
// loop moves on to the next batch instead. On return either `*eof` is true (no row group is
// left) or `*eof` is false and `*rows` is the non-zero number of rows in `file_block`.
Status ParquetScanScheduler::read_next_batch(
        ParquetFileContext& file_context,
        const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema,
        const format::FileScanRequest& request, Block* file_block, size_t* rows, bool* eof) {
    *rows = 0;
    for (;;) {
        if (_current_row_group == nullptr) {
            bool opened = false;
            RETURN_IF_ERROR(open_next_row_group(file_context, file_schema, request, &opened));
            if (!opened) {
                *eof = true;
                return Status::OK();
            }
        }

        if (_current_range_idx >= _current_selected_ranges.size()) {
            // Every range of this row group has been consumed; move on to the next row group.
            reset_current_row_group();
            continue;
        }

        const RowRange& range = _current_selected_ranges[_current_range_idx];
        DORIS_CHECK(range.start >= 0);
        DORIS_CHECK(range.length > 0);
        DORIS_CHECK(range.start + range.length <= _current_row_group_rows);

        if (_current_row_group_rows_read < range.start) {
            // Skip the rows in front of this range that pruning removed.
            RETURN_IF_ERROR(
                    skip_current_row_group_rows(range.start - _current_row_group_rows_read));
        }
        DORIS_CHECK(_current_row_group_rows_read == range.start + _current_range_rows_read);

        const int64_t rows_left_in_range = range.length - _current_range_rows_read;
        if (rows_left_in_range <= 0) {
            // This range is exhausted; continue with the next range of the same row group.
            ++_current_range_idx;
            _current_range_rows_read = 0;
            continue;
        }

        const int64_t batch_rows = std::min<int64_t>(_batch_size, rows_left_in_range);
        const int64_t batch_first_file_row =
                _current_row_group_first_row + _current_row_group_rows_read;
        RETURN_IF_ERROR(read_current_row_group_batch(batch_rows, request, batch_first_file_row,
                                                     file_block, rows));

        // Positions advance by the physical row count, not by the number of selected rows.
        _current_row_group_rows_read += batch_rows;
        _current_range_rows_read += batch_rows;
        if (_current_range_rows_read >= range.length) {
            ++_current_range_idx;
            _current_range_rows_read = 0;
        }

        if (*rows != 0) {
            *eof = false;
            return Status::OK();
        }
        // Everything in this batch was filtered out; try the next one.
    }
}
666
667
} // namespace doris::format::parquet