Coverage Report

Created: 2026-06-25 12:50

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format_v2/parquet/parquet_scan.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format_v2/parquet/parquet_scan.h"
19
20
#include <algorithm>
21
#include <limits>
22
#include <memory>
23
#include <utility>
24
25
#include "common/exception.h"
26
#include "common/status.h"
27
#include "core/assert_cast.h"
28
#include "core/block/block.h"
29
#include "core/column/column_vector.h"
30
#include "exprs/vexpr_context.h"
31
#include "format_v2/parquet/parquet_column_schema.h"
32
#include "format_v2/parquet/parquet_file_context.h"
33
#include "format_v2/parquet/parquet_statistics.h"
34
35
namespace doris::format::parquet {
36
37
namespace {
38
39
60
int64_t column_start_offset(const ::parquet::ColumnChunkMetaData& column_metadata) {
40
60
    return column_metadata.has_dictionary_page()
41
60
                   ? cast_set<int64_t>(column_metadata.dictionary_page_offset())
42
60
                   : cast_set<int64_t>(column_metadata.data_page_offset());
43
60
}
44
45
// 判断 RG 是否在 scan_range 的 offset 范围之外。
46
//
47
// 策略:取 RG 第一个和最后一个 column chunk 的起始 offset 的中点,
48
// 如果中点不在 [range_start, range_end) 内则该 RG 不属于当前 split。
49
// 特殊处理:当 scan_range 覆盖整个文件(start=0, size>=file_size)时直接返回 false。
50
bool is_row_group_outside_range(const ::parquet::FileMetaData& metadata,
51
194
                                const ParquetScanRange& scan_range, int row_group_idx) {
52
    // size < 0 表示不限制范围(读整个文件)
53
194
    if (scan_range.size < 0) {
54
164
        return false;
55
164
    }
56
30
    const int64_t range_start_offset = scan_range.start_offset;
57
30
    const int64_t range_end_offset = range_start_offset + scan_range.size;
58
30
    DORIS_CHECK(range_start_offset >= 0);
59
30
    DORIS_CHECK(range_end_offset >= range_start_offset);
60
    // 覆盖整个文件 → 不过滤
61
30
    if (range_start_offset == 0 &&
62
30
        (scan_range.file_size < 0 || range_end_offset >= scan_range.file_size)) {
63
0
        return false;
64
0
    }
65
66
30
    auto row_group_metadata = metadata.RowGroup(row_group_idx);
67
30
    DORIS_CHECK(row_group_metadata != nullptr);
68
30
    DORIS_CHECK(row_group_metadata->num_columns() > 0);
69
30
    const auto first_column = row_group_metadata->ColumnChunk(0);
70
30
    const auto last_column = row_group_metadata->ColumnChunk(row_group_metadata->num_columns() - 1);
71
30
    DORIS_CHECK(first_column != nullptr);
72
30
    DORIS_CHECK(last_column != nullptr);
73
    // RG 的 offset 范围 = [第一个 column chunk 起始, 最后一个 column chunk 结束)
74
30
    const int64_t row_group_start_offset = column_start_offset(*first_column);
75
30
    const int64_t row_group_end_offset =
76
30
            column_start_offset(*last_column) + last_column->total_compressed_size();
77
    // 用 RGB 的中点判断归属 — 中点在哪个 split 的范围就属于哪个 split
78
30
    const int64_t row_group_mid_offset =
79
30
            row_group_start_offset + (row_group_end_offset - row_group_start_offset) / 2;
80
30
    return row_group_mid_offset < range_start_offset || row_group_mid_offset >= range_end_offset;
81
30
}
82
83
} // namespace
84
85
// 最外层裁剪入口:三级流水线(代价从低到高)→ 输出 RowGroupScanPlan。
86
//
87
// 1. 计算 first_file_row + 过滤 scan_range 外的 RG — O(1) 算术(is_row_group_outside_range)
88
// 2. select_row_groups_by_statistics() — RG 级裁剪 (min/max + dictionary + bloom filter),
89
//    仅对 scan_range 内的 RG 执行,避免对范围外的 RG 做昂贵的 bloom filter/dictionary 读取
90
// 3. select_row_group_ranges_by_page_index() — Page 级细粒度裁剪
91
Status plan_parquet_row_groups(const ::parquet::FileMetaData& metadata,
92
                               ::parquet::ParquetFileReader* file_reader,
93
                               const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema,
94
                               const format::FileScanRequest& request,
95
                               const ParquetScanRange& scan_range, bool enable_bloom_filter,
96
110
                               RowGroupScanPlan* plan, const cctz::time_zone* timezone) {
97
110
    DORIS_CHECK(plan != nullptr);
98
110
    plan->row_groups.clear();
99
110
    plan->pruning_stats = ParquetPruningStats {};
100
101
    // ① 计算 first_file_row + 过滤 scan_range(代价最低,先做)
102
110
    std::vector<int64_t> row_group_first_rows(metadata.num_row_groups());
103
110
    std::vector<int> scan_range_selected_row_groups;
104
110
    scan_range_selected_row_groups.reserve(metadata.num_row_groups());
105
110
    int64_t next_row_group_first_row = 0;
106
304
    for (int row_group_idx = 0; row_group_idx < metadata.num_row_groups(); ++row_group_idx) {
107
194
        row_group_first_rows[row_group_idx] = next_row_group_first_row;
108
194
        auto row_group_metadata = metadata.RowGroup(row_group_idx);
109
194
        DORIS_CHECK(row_group_metadata != nullptr);
110
194
        const int64_t row_group_rows = row_group_metadata->num_rows();
111
194
        if (row_group_rows < 0) {
112
0
            return Status::Corruption("Invalid negative row count in parquet row group {}",
113
0
                                      row_group_idx);
114
0
        }
115
194
        next_row_group_first_row += row_group_rows;
116
194
        if (!is_row_group_outside_range(metadata, scan_range, row_group_idx)) {
117
173
            scan_range_selected_row_groups.push_back(row_group_idx);
118
173
        }
119
194
    }
120
121
    // ② RG 级裁剪:仅对 scan_range 内的 RG 执行
122
110
    std::vector<int> statistics_selected_row_groups;
123
110
    RETURN_IF_ERROR(select_row_groups_by_statistics(
124
110
            metadata, file_reader, file_schema, request, &scan_range_selected_row_groups,
125
110
            &statistics_selected_row_groups, enable_bloom_filter, &plan->pruning_stats, timezone));
126
127
110
    plan->row_groups.reserve(statistics_selected_row_groups.size());
128
132
    for (const auto row_group_idx : statistics_selected_row_groups) {
129
132
        auto row_group_metadata = metadata.RowGroup(row_group_idx);
130
132
        DORIS_CHECK(row_group_metadata != nullptr);
131
132
        const int64_t row_group_rows = row_group_metadata->num_rows();
132
132
        if (row_group_rows == 0) {
133
0
            continue;
134
0
        }
135
136
132
        RowGroupReadPlan row_group_plan;
137
132
        row_group_plan.row_group_id = row_group_idx;
138
132
        row_group_plan.first_file_row = row_group_first_rows[row_group_idx];
139
132
        row_group_plan.row_group_rows = row_group_rows;
140
132
        RETURN_IF_ERROR(select_row_group_ranges_by_page_index(
141
132
                file_reader, file_schema, request, row_group_idx, row_group_rows,
142
132
                &row_group_plan.selected_ranges, &row_group_plan.page_skip_plans,
143
132
                &plan->pruning_stats, timezone));
144
132
        if (row_group_plan.selected_ranges.empty()) {
145
1
            continue;
146
1
        }
147
131
        plan->pruning_stats.selected_row_ranges += row_group_plan.selected_ranges.size();
148
131
        plan->row_groups.push_back(std::move(row_group_plan));
149
131
    }
150
110
    plan->pruning_stats.selected_row_groups = plan->row_groups.size();
151
110
    return Status::OK();
152
110
}
153
154
namespace {
155
156
uint16_t apply_filter_to_selection(const IColumn::Filter& filter, SelectionVector* selection,
157
42
                                   uint16_t selected_rows) {
158
42
    uint16_t new_selected_rows = 0;
159
6.36k
    for (uint16_t selection_idx = 0; selection_idx < selected_rows; ++selection_idx) {
160
6.32k
        const auto row_idx = selection->get_index(selection_idx);
161
6.32k
        if (filter[row_idx] != 0) {
162
4.23k
            selection->set_index(new_selected_rows++, static_cast<SelectionVector::Index>(row_idx));
163
4.23k
        }
164
6.32k
    }
165
42
    return new_selected_rows;
166
42
}
167
168
Status execute_filter_conjuncts(const format::FileScanRequest& request, int64_t batch_rows,
169
                                Block* file_block, SelectionVector* selection,
170
43
                                uint16_t* selected_rows) {
171
43
    for (const auto& conjunct : request.conjuncts) {
172
31
        if (*selected_rows == 0) {
173
0
            break;
174
0
        }
175
31
        DORIS_CHECK(conjunct != nullptr);
176
31
        IColumn::Filter filter(static_cast<size_t>(batch_rows), 1);
177
31
        bool can_filter_all = false;
178
31
        RETURN_IF_ERROR(conjunct->execute_filter(file_block, filter.data(),
179
31
                                                 static_cast<size_t>(batch_rows), false,
180
31
                                                 &can_filter_all));
181
31
        *selected_rows =
182
31
                can_filter_all ? 0 : apply_filter_to_selection(filter, selection, *selected_rows);
183
31
    }
184
43
    return Status::OK();
185
43
}
186
187
Status execute_delete_conjuncts(const format::FileScanRequest& request, int64_t batch_rows,
188
                                Block* file_block, SelectionVector* selection,
189
41
                                uint16_t* selected_rows) {
190
41
    for (const auto& delete_conjunct : request.delete_conjuncts) {
191
13
        if (*selected_rows == 0) {
192
0
            break;
193
0
        }
194
13
        DORIS_CHECK(delete_conjunct != nullptr);
195
13
        int result_column_id = -1;
196
13
        RETURN_IF_ERROR(delete_conjunct->root()->execute(delete_conjunct.get(), file_block,
197
13
                                                         &result_column_id));
198
13
        DORIS_CHECK(result_column_id >= 0 &&
199
13
                    result_column_id < static_cast<int>(file_block->columns()));
200
13
        const auto& delete_filter = assert_cast<const ColumnUInt8&>(
201
13
                                            *file_block->get_by_position(result_column_id).column)
202
13
                                            .get_data();
203
13
        DORIS_CHECK(delete_filter.size() == static_cast<size_t>(batch_rows));
204
13
        IColumn::Filter keep_filter(static_cast<size_t>(batch_rows), 1);
205
13
        bool has_kept_row = false;
206
64
        for (size_t row = 0; row < static_cast<size_t>(batch_rows); ++row) {
207
51
            keep_filter[row] = !delete_filter[row];
208
51
            has_kept_row |= keep_filter[row] != 0;
209
51
        }
210
13
        file_block->erase(result_column_id);
211
13
        *selected_rows =
212
13
                !has_kept_row ? 0
213
13
                              : apply_filter_to_selection(keep_filter, selection, *selected_rows);
214
13
    }
215
41
    return Status::OK();
216
41
}
217
218
} // namespace
219
220
// Converts the first `selected_rows` entries of `selection` into a dense byte mask of
// `batch_rows` entries: 1 for each selected row index, 0 for every other row.
IColumn::Filter selection_to_filter(const SelectionVector& selection, uint16_t selected_rows,
                                    int64_t batch_rows) {
    // Start with every row rejected, then switch on the rows the selection kept.
    IColumn::Filter keep_mask(static_cast<size_t>(batch_rows), 0);
    for (uint16_t i = 0; i != selected_rows; ++i) {
        const auto row = selection.get_index(i);
        keep_mask[row] = 1;
    }
    return keep_mask;
}
228
229
// Runs the request's filter conjuncts, then its delete conjuncts, over one batch, and narrows
// `selection` / `*selected_rows` to the surviving rows. Delete conjuncts are skipped when the
// filter conjuncts already removed every row.
//
// When `conjunct_filtered_rows` is non-null, it is increased by the number of rows removed by the
// filter conjuncts only. Rows removed by delete conjuncts are not counted there.
Status execute_batch_filters(const format::FileScanRequest& request, int64_t batch_rows,
                             Block* file_block, SelectionVector* selection, uint16_t* selected_rows,
                             int64_t* conjunct_filtered_rows) {
    const bool nothing_to_evaluate = request.conjuncts.empty() && request.delete_conjuncts.empty();
    if (nothing_to_evaluate) {
        return Status::OK();
    }

    const int64_t rows_before_filtering = *selected_rows;
    RETURN_IF_ERROR(
            execute_filter_conjuncts(request, batch_rows, file_block, selection, selected_rows));
    if (conjunct_filtered_rows != nullptr) {
        *conjunct_filtered_rows += rows_before_filtering - static_cast<int64_t>(*selected_rows);
    }

    return *selected_rows == 0
                   ? Status::OK()
                   : execute_delete_conjuncts(request, batch_rows, file_block, selection,
                                              selected_rows);
}
247
248
namespace {
249
// TODO: batch size in SessionVariable
250
constexpr int64_t DEFAULT_PARQUET_READ_BATCH_SIZE = 4096;
251
252
2
int64_t count_range_rows(const std::vector<RowRange>& ranges) {
253
2
    int64_t rows = 0;
254
2
    for (const auto& range : ranges) {
255
2
        rows += range.length;
256
2
    }
257
2
    return rows;
258
2
}
259
260
void append_intersection(const RowRange& left, const RowRange& right,
261
1
                         std::vector<RowRange>* result) {
262
1
    const int64_t start = std::max(left.start, right.start);
263
1
    const int64_t end = std::min(left.start + left.length, right.start + right.length);
264
1
    if (start < end) {
265
1
        result->push_back(RowRange {.start = start, .length = end - start});
266
1
    }
267
1
}
268
269
std::vector<RowRange> filter_ranges_by_condition_cache(const std::vector<RowRange>& ranges,
270
                                                       const std::vector<bool>& cache,
271
                                                       int64_t row_group_first_row,
272
1
                                                       int64_t base_granule) {
273
1
    std::vector<RowRange> result;
274
1
    if (cache.empty()) {
275
0
        return ranges;
276
0
    }
277
278
    // Cache coordinates are file-global granules; RowRange coordinates are row-group-relative.
279
    // Walk every selected range in order and split it by granule. Granules covered by the bitmap
280
    // are kept only when the bit is true. Granules outside the bitmap are kept conservatively, so
281
    // an undersized or old-format cache entry cannot skip valid rows.
282
1
    for (const auto& range : ranges) {
283
1
        const int64_t global_start = row_group_first_row + range.start;
284
1
        const int64_t global_end = global_start + range.length;
285
1
        for (int64_t granule = global_start / ConditionCacheContext::GRANULE_SIZE;
286
3
             granule <= (global_end - 1) / ConditionCacheContext::GRANULE_SIZE; ++granule) {
287
2
            const int64_t cache_idx = granule - base_granule;
288
2
            const bool keep = cache_idx < 0 || static_cast<size_t>(cache_idx) >= cache.size() ||
289
2
                              cache[static_cast<size_t>(cache_idx)];
290
2
            if (!keep) {
291
1
                continue;
292
1
            }
293
1
            const int64_t granule_start = granule * ConditionCacheContext::GRANULE_SIZE;
294
1
            const int64_t granule_end = granule_start + ConditionCacheContext::GRANULE_SIZE;
295
1
            const RowRange file_granule_range {.start = granule_start - row_group_first_row,
296
1
                                               .length = granule_end - granule_start};
297
1
            append_intersection(range, file_granule_range, &result);
298
1
        }
299
1
    }
300
1
    return result;
301
1
}
302
303
} // namespace
304
305
98
// Installs a new scan plan. It takes the plan's per-row-group read plans, restarts the
// filtered-row counters, and rewinds to the first planned row group.
void ParquetScanScheduler::set_plan(RowGroupScanPlan plan) {
    _row_group_plans = std::move(plan.row_groups);
    // The counters describe the installed plan, so they restart along with it.
    _predicate_filtered_rows = 0;
    _condition_cache_filtered_rows = 0;
    reset();
}
311
312
2
void ParquetScanScheduler::set_condition_cache_context(std::shared_ptr<ConditionCacheContext> ctx) {
313
2
    _condition_cache_ctx = std::move(ctx);
314
2
    if (!_condition_cache_ctx || !_condition_cache_ctx->filter_result || _row_group_plans.empty()) {
315
0
        return;
316
0
    }
317
318
2
    _condition_cache_ctx->base_granule =
319
2
            _row_group_plans.front().first_file_row / ConditionCacheContext::GRANULE_SIZE;
320
2
    if (!_condition_cache_ctx->is_hit) {
321
1
        return;
322
1
    }
323
324
1
    std::vector<RowGroupReadPlan> filtered_plans;
325
1
    filtered_plans.reserve(_row_group_plans.size());
326
1
    for (auto& plan : _row_group_plans) {
327
1
        const int64_t old_rows = count_range_rows(plan.selected_ranges);
328
1
        plan.selected_ranges = filter_ranges_by_condition_cache(
329
1
                plan.selected_ranges, *_condition_cache_ctx->filter_result, plan.first_file_row,
330
1
                _condition_cache_ctx->base_granule);
331
1
        const int64_t new_rows = count_range_rows(plan.selected_ranges);
332
1
        _condition_cache_filtered_rows += old_rows - new_rows;
333
1
        if (!plan.selected_ranges.empty()) {
334
1
            filtered_plans.push_back(std::move(plan));
335
1
        }
336
1
    }
337
1
    _row_group_plans = std::move(filtered_plans);
338
1
    reset();
339
1
}
340
341
99
void ParquetScanScheduler::reset() {
342
99
    _next_row_group_plan_idx = 0;
343
99
    reset_current_row_group();
344
99
}
345
346
147
void ParquetScanScheduler::reset_current_row_group() {
347
147
    _current_row_group.reset();
348
147
    _current_predicate_columns.clear();
349
147
    _current_non_predicate_columns.clear();
350
147
    _current_row_group_rows = 0;
351
147
    _current_row_group_rows_read = 0;
352
147
    _current_row_group_first_row = 0;
353
147
    _current_selected_ranges.clear();
354
147
    _current_range_idx = 0;
355
147
    _current_range_rows_read = 0;
356
147
}
357
358
// Opens the next planned row group and creates its column readers.
//
// When every planned row group has been consumed, sets `*has_row_group` to false and returns OK.
// Otherwise it:
// - advances to the next plan and opens the row group through `file_context.file_reader`;
// - positions the per-row-group cursor at the start of the plan's selected ranges;
// - creates one reader per requested predicate and non-predicate column. Synthetic columns (row
//   position, global row id) get dedicated readers; every other column id must index
//   `file_schema`.
//
// Exceptions thrown while opening the row group are converted: ParquetException becomes
// Status::Corruption and any other std::exception becomes Status::InternalError.
Status ParquetScanScheduler::open_next_row_group(
        ParquetFileContext& file_context,
        const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema,
        const format::FileScanRequest& request, bool* has_row_group) {
    *has_row_group = false;
    if (_next_row_group_plan_idx >= _row_group_plans.size()) {
        return Status::OK();
    }
    const RowGroupReadPlan& row_group_plan = _row_group_plans[_next_row_group_plan_idx++];
    const int row_group_idx = row_group_plan.row_group_id;
    try {
        _current_row_group = file_context.file_reader->RowGroup(row_group_idx);
    } catch (const ::parquet::ParquetException& e) {
        return Status::Corruption("Failed to open parquet row group {}: {}", row_group_idx,
                                  e.what());
    } catch (const std::exception& e) {
        return Status::InternalError("Failed to open parquet row group {}: {}", row_group_idx,
                                     e.what());
    }

    auto row_group_metadata = file_context.metadata->RowGroup(row_group_idx);
    DORIS_CHECK(row_group_metadata != nullptr);
    _current_row_group_rows = row_group_metadata->num_rows();
    // The planner skips empty row groups and row groups without selected ranges.
    DORIS_CHECK(_current_row_group_rows == row_group_plan.row_group_rows);
    DORIS_CHECK(_current_row_group_rows > 0);
    DORIS_CHECK(!row_group_plan.selected_ranges.empty());
    _current_row_group_first_row = row_group_plan.first_file_row;
    _current_row_group_rows_read = 0;
    _current_selected_ranges = row_group_plan.selected_ranges;
    _current_range_idx = 0;
    _current_range_rows_read = 0;
    _current_predicate_columns.clear();
    _current_non_predicate_columns.clear();

    ParquetColumnReaderFactory column_reader_factory(
            _current_row_group, file_context.schema->num_columns(), &row_group_plan.page_skip_plans,
            _page_skip_profile, _timezone, _enable_strict_mode,
            _scan_profile.column_reader_profile);

    // Creates a reader for every column in `columns` and stores it in `readers`, keyed by the
    // column's local id. The same logic serves both predicate and non-predicate columns.
    auto create_column_readers = [&](const auto& columns, auto& readers) -> Status {
        for (const auto& col : columns) {
            const auto local_id = col.local_id();
            if (local_id == format::ROW_POSITION_COLUMN_ID) {
                readers[local_id] = column_reader_factory.create_row_position_column_reader(
                        _current_row_group_first_row);
                continue;
            }
            if (local_id == format::GLOBAL_ROWID_COLUMN_ID) {
                DORIS_CHECK(_global_rowid_context.has_value());
                readers[local_id] = column_reader_factory.create_global_rowid_column_reader(
                        *_global_rowid_context, _current_row_group_first_row);
                continue;
            }

            DORIS_CHECK(local_id >= 0 && local_id < static_cast<int32_t>(file_schema.size()));
            const auto& column_schema = file_schema[local_id];
            DORIS_CHECK(column_schema != nullptr);
            std::unique_ptr<ParquetColumnReader> column_reader;
            RETURN_IF_ERROR(column_reader_factory.create(*column_schema, &col, &column_reader));
            readers[local_id] = std::move(column_reader);
        }
        return Status::OK();
    };
    RETURN_IF_ERROR(create_column_readers(request.predicate_columns, _current_predicate_columns));
    RETURN_IF_ERROR(
            create_column_readers(request.non_predicate_columns, _current_non_predicate_columns));

    *has_row_group = true;
    return Status::OK();
}
444
445
3
// Skips `rows` rows (must be >= 0) in every column reader of the current row group and advances
// _current_row_group_rows_read by the same amount. Skipping 0 rows is a no-op. The skip is
// recorded in the range_gap_skipped_rows counter when that counter exists.
Status ParquetScanScheduler::skip_current_row_group_rows(int64_t rows) {
    DORIS_CHECK(rows >= 0);
    if (rows > 0) {
        if (_scan_profile.range_gap_skipped_rows != nullptr) {
            COUNTER_UPDATE(_scan_profile.range_gap_skipped_rows, rows);
        }
        auto skip_readers = [rows](const auto& readers) -> Status {
            for (const auto& reader : readers | std::views::values) {
                RETURN_IF_ERROR(reader->skip(rows));
            }
            return Status::OK();
        };
        RETURN_IF_ERROR(skip_readers(_current_predicate_columns));
        RETURN_IF_ERROR(skip_readers(_current_non_predicate_columns));
        _current_row_group_rows_read += rows;
    }
    return Status::OK();
}
462
463
// Reads every predicate column of the current row group for `batch_rows` rows into `file_block`,
// then evaluates the request's filter and delete conjuncts on them (execute_batch_filters). This
// narrows `selection` / `*selected_rows` to the surviving rows; `*conjunct_filtered_rows`
// accumulates the rows removed by the filter conjuncts.
// Returns Corruption when a reader yields a row count other than `batch_rows`.
// NOTE(review): when there are no conjuncts, `selection` is left unsized while *selected_rows
// stays equal to `batch_rows`. Callers that iterate the selection in that case presumably rely on
// it not happening — confirm.
Status ParquetScanScheduler::read_filter_columns(int64_t batch_rows,
                                                 const format::FileScanRequest& request,
                                                 Block* file_block, SelectionVector* selection,
                                                 uint16_t* selected_rows,
                                                 int64_t* conjunct_filtered_rows) {
    const bool has_any_conjunct = !request.conjuncts.empty() || !request.delete_conjuncts.empty();
    if (has_any_conjunct) {
        selection->resize(static_cast<size_t>(batch_rows));
    }

    for (const auto& [fid, column_reader] : _current_predicate_columns) {
        const auto position_it = request.local_positions.find(format::LocalColumnId(fid));
        DORIS_CHECK(position_it != request.local_positions.end());
        const auto block_position = position_it->second.value();
        auto& target = file_block->get_by_position(block_position);
        DCHECK(remove_nullable(column_reader->type())->equals(*remove_nullable(target.type)))
                << column_reader->type()->get_name() << " " << target.type->get_name() << " "
                << column_reader->name() << " " << target.name;
        auto column = target.column->assert_mutable();
        int64_t rows_read = 0;
        {
            SCOPED_TIMER(_scan_profile.column_read_time);
            RETURN_IF_ERROR(column_reader->read(batch_rows, column, &rows_read));
        }
        if (rows_read != batch_rows) {
            return Status::Corruption("Parquet filter column {} returned {} rows, expected {} rows",
                                      column_reader->name(), rows_read, batch_rows);
        }
        file_block->replace_by_position(block_position, std::move(column));
    }

    // Time the conjunct evaluation only when the timer is registered.
    if (_scan_profile.predicate_filter_time != nullptr) {
        SCOPED_TIMER(_scan_profile.predicate_filter_time);
        return execute_batch_filters(request, batch_rows, file_block, selection, selected_rows,
                                     conjunct_filtered_rows);
    }
    return execute_batch_filters(request, batch_rows, file_block, selection, selected_rows,
                                 conjunct_filtered_rows);
}
500
501
// Reads one batch of `batch_rows` rows from the current row group into `file_block` and stores
// the number of surviving rows in `*rows`.
//
// Reading is lazy:
// 1. Predicate columns are read in full and the conjuncts are evaluated on them
//    (read_filter_columns).
// 2. If any row was filtered out, the predicate columns are compacted to the surviving rows and
//    the non-predicate columns materialize only those rows via select().
// 3. Otherwise the non-predicate columns are read in full.
//
// `batch_first_file_row` is the file-global row number of the batch's first row; it is used to
// record condition-cache granules. When the row group has no column readers at all, every row of
// the batch is reported as selected without reading anything.
Status ParquetScanScheduler::read_current_row_group_batch(int64_t batch_rows,
                                                          const format::FileScanRequest& request,
                                                          int64_t batch_first_file_row,
                                                          Block* file_block, size_t* rows) {
    // Profile counters are optional; skip any that were not registered.
    const auto add_to_counter = [](auto counter, int64_t delta) {
        if (counter != nullptr) {
            COUNTER_UPDATE(counter, delta);
        }
    };
    add_to_counter(_scan_profile.total_batches, 1);
    add_to_counter(_scan_profile.raw_rows_read, batch_rows);

    if (_current_predicate_columns.empty() && _current_non_predicate_columns.empty()) {
        *rows = static_cast<size_t>(batch_rows);
        add_to_counter(_scan_profile.selected_rows, batch_rows);
        return Status::OK();
    }

    DORIS_CHECK(batch_rows <= std::numeric_limits<uint16_t>::max());
    SelectionVector selection;
    auto selected_rows = static_cast<uint16_t>(batch_rows);
    int64_t conjunct_filtered_rows = 0;
    RETURN_IF_ERROR(read_filter_columns(batch_rows, request, file_block, &selection, &selected_rows,
                                        &conjunct_filtered_rows));
    _predicate_filtered_rows += conjunct_filtered_rows;
    mark_condition_cache_granules(selection, selected_rows, batch_first_file_row);

    add_to_counter(_scan_profile.selected_rows, selected_rows);
    add_to_counter(_scan_profile.rows_filtered_by_conjunct, conjunct_filtered_rows);
    if (!_current_non_predicate_columns.empty()) {
        add_to_counter(_scan_profile.lazy_read_filtered_rows, batch_rows - selected_rows);
    }
    if (selected_rows == 0) {
        add_to_counter(_scan_profile.empty_selection_batches, 1);
    }

    const bool batch_was_filtered = selected_rows != batch_rows;
    if (batch_was_filtered) {
        // Predicate columns were read in full; shrink them to the surviving rows.
        IColumn::Filter output_filter = selection_to_filter(selection, selected_rows, batch_rows);
        for (const auto& col : request.predicate_columns) {
            const auto position_it = request.local_positions.find(col.column_id());
            DORIS_CHECK(position_it != request.local_positions.end());
            const auto block_position = position_it->second.value();
            RETURN_IF_CATCH_EXCEPTION(file_block->replace_by_position(
                    block_position, file_block->get_by_position(block_position)
                                            .column->filter(output_filter, selected_rows)));
        }
    }

    {
        SCOPED_TIMER(_scan_profile.column_read_time);
        for (const auto& [fid, column_reader] : _current_non_predicate_columns) {
            const auto position_it = request.local_positions.find(format::LocalColumnId(fid));
            DORIS_CHECK(position_it != request.local_positions.end());
            const auto block_position = position_it->second.value();
            auto& target = file_block->get_by_position(block_position);
            auto column = target.column->assert_mutable();
            DCHECK_EQ(target.type->get_primitive_type(),
                      column_reader->type()->get_primitive_type())
                    << type_to_string(target.type->get_primitive_type()) << " "
                    << type_to_string(column_reader->type()->get_primitive_type()) << " "
                    << column_reader->name() << " " << fid << " " << block_position;

            if (!batch_was_filtered) {
                int64_t column_rows = 0;
                RETURN_IF_ERROR(column_reader->read(batch_rows, column, &column_rows));
                if (column_rows != batch_rows) {
                    return Status::Corruption(
                            "Parquet output column {} returned {} rows, expected {} rows",
                            column_reader->name(), column_rows, batch_rows);
                }
            } else {
                // Materialize only the surviving rows.
                const size_t rows_before = column->size();
                RETURN_IF_ERROR(
                        column_reader->select(selection, selected_rows, batch_rows, column));
                if (column->size() != rows_before + selected_rows) {
                    return Status::Corruption(
                            "Parquet selected output column {} returned {} rows, expected {} rows",
                            column_reader->name(), column->size(), rows_before + selected_rows);
                }
            }
            file_block->replace_by_position(block_position, std::move(column));
        }
    }
    *rows = static_cast<size_t>(selected_rows);
    return Status::OK();
}
590
591
void ParquetScanScheduler::mark_condition_cache_granules(const SelectionVector& selection,
592
                                                         uint16_t selected_rows,
593
85
                                                         int64_t batch_first_file_row) {
594
85
    if (!_condition_cache_ctx || _condition_cache_ctx->is_hit ||
595
85
        !_condition_cache_ctx->filter_result) {
596
84
        return;
597
84
    }
598
1
    auto& cache = *_condition_cache_ctx->filter_result;
599
2.04k
    for (uint16_t selection_idx = 0; selection_idx < selected_rows; ++selection_idx) {
600
2.04k
        const int64_t file_row = batch_first_file_row + selection.get_index(selection_idx);
601
2.04k
        const int64_t granule = file_row / ConditionCacheContext::GRANULE_SIZE;
602
2.04k
        const int64_t cache_idx = granule - _condition_cache_ctx->base_granule;
603
2.04k
        if (cache_idx >= 0 && static_cast<size_t>(cache_idx) < cache.size()) {
604
2.04k
            cache[static_cast<size_t>(cache_idx)] = true;
605
2.04k
        }
606
2.04k
    }
607
1
}
608
609
// Produces the next non-empty batch of rows into `file_block`.
//
// The scheduler walks the row groups opened by open_next_row_group() and, within each one, the
// selected row ranges in `_current_selected_ranges`. Rows between ranges are skipped via
// skip_current_row_group_rows(). Each iteration reads at most DEFAULT_PARQUET_READ_BATCH_SIZE
// rows from the current range.
//
// On success:
//   - *rows is the number of rows written by read_current_row_group_batch() (always > 0 when
//     *eof is false);
//   - *eof is true (and *rows is 0) once no further row group can be opened.
// Batches whose rows were all filtered out are not returned; reading continues instead.
// Errors from opening, skipping or reading are propagated unchanged.
Status ParquetScanScheduler::read_next_batch(
        ParquetFileContext& file_context,
        const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema,
        const format::FileScanRequest& request, Block* file_block, size_t* rows, bool* eof) {
    *rows = 0;
    for (;;) {
        // Lazily open the next row group once the previous one has been released.
        if (_current_row_group == nullptr) {
            bool opened = false;
            RETURN_IF_ERROR(open_next_row_group(file_context, file_schema, request, &opened));
            if (!opened) {
                *eof = true;
                return Status::OK();
            }
        }

        // All selected ranges of this row group were consumed: release it and try the next one.
        if (_current_range_idx >= _current_selected_ranges.size()) {
            reset_current_row_group();
            continue;
        }

        const RowRange& range = _current_selected_ranges[_current_range_idx];
        DORIS_CHECK(range.start >= 0);
        DORIS_CHECK(range.length > 0);
        DORIS_CHECK(range.start + range.length <= _current_row_group_rows);

        // Rows before the start of this range were pruned at row-group level; move the
        // readers past them before reading.
        if (_current_row_group_rows_read < range.start) {
            RETURN_IF_ERROR(skip_current_row_group_rows(range.start -
                                                        _current_row_group_rows_read));
        }
        DORIS_CHECK(_current_row_group_rows_read == range.start + _current_range_rows_read);

        const int64_t rows_left_in_range = range.length - _current_range_rows_read;
        if (rows_left_in_range <= 0) {
            // Defensive: the range is already exhausted, advance to the next one.
            ++_current_range_idx;
            _current_range_rows_read = 0;
            continue;
        }

        const int64_t batch_rows =
                std::min<int64_t>(DEFAULT_PARQUET_READ_BATCH_SIZE, rows_left_in_range);
        const int64_t batch_first_file_row =
                _current_row_group_first_row + _current_row_group_rows_read;
        RETURN_IF_ERROR(read_current_row_group_batch(batch_rows, request, batch_first_file_row,
                                                     file_block, rows));

        // Read positions advance by the physical rows consumed (batch_rows), not by *rows,
        // which only counts the rows that survived filtering.
        _current_row_group_rows_read += batch_rows;
        _current_range_rows_read += batch_rows;
        if (_current_range_rows_read >= range.length) {
            ++_current_range_idx;
            _current_range_rows_read = 0;
        }

        if (*rows > 0) {
            *eof = false;
            return Status::OK();
        }
        // Every row of this batch was filtered out; keep reading.
    }
}
670
671
} // namespace doris::format::parquet