be/src/format_v2/parquet/parquet_scan.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // http://www.apache.org/licenses/LICENSE-2.0 |
9 | | // Unless required by applicable law or agreed to in writing, |
10 | | // software distributed under the License is distributed on an |
11 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
12 | | // KIND, either express or implied. See the License for the |
13 | | // specific language governing permissions and limitations |
14 | | // under the License. |
15 | | |
16 | | #include "format_v2/parquet/parquet_scan.h" |
17 | | |
18 | | #include <algorithm> |
19 | | #include <limits> |
20 | | #include <memory> |
21 | | #include <ranges> |
22 | | #include <unordered_set> |
23 | | #include <utility> |
24 | | |
25 | | #include "common/exception.h" |
26 | | #include "common/status.h" |
27 | | #include "core/assert_cast.h" |
28 | | #include "core/block/block.h" |
29 | | #include "core/column/column_vector.h" |
30 | | #include "exprs/vexpr_context.h" |
31 | | #include "format_v2/parquet/parquet_column_schema.h" |
32 | | #include "format_v2/parquet/parquet_file_context.h" |
33 | | #include "format_v2/parquet/parquet_statistics.h" |
34 | | |
35 | | namespace doris::format::parquet { |
36 | | |
37 | | namespace { |
38 | | |
39 | 202 | int64_t column_start_offset(const ::parquet::ColumnChunkMetaData& column_metadata) { |
40 | 202 | return column_metadata.has_dictionary_page() |
41 | 202 | ? cast_set<int64_t>(column_metadata.dictionary_page_offset()) |
42 | 202 | : cast_set<int64_t>(column_metadata.data_page_offset()); |
43 | 202 | } |
44 | | |
45 | | void collect_all_leaf_column_ids(const ParquetColumnSchema& column_schema, |
46 | 150 | std::unordered_set<int>* leaf_column_ids) { |
47 | 150 | DORIS_CHECK(leaf_column_ids != nullptr); |
48 | 150 | if (column_schema.kind == ParquetColumnSchemaKind::PRIMITIVE) { |
49 | 142 | if (column_schema.leaf_column_id >= 0) { |
50 | 142 | leaf_column_ids->insert(column_schema.leaf_column_id); |
51 | 142 | } |
52 | 142 | return; |
53 | 142 | } |
54 | 12 | for (const auto& child : column_schema.children) { |
55 | 12 | DORIS_CHECK(child != nullptr); |
56 | 12 | collect_all_leaf_column_ids(*child, leaf_column_ids); |
57 | 12 | } |
58 | 8 | } |
59 | | |
60 | | void collect_projected_leaf_column_ids(const ParquetColumnSchema& column_schema, |
61 | | const format::LocalColumnIndex& projection, |
62 | 144 | std::unordered_set<int>* leaf_column_ids) { |
63 | 144 | DORIS_CHECK(leaf_column_ids != nullptr); |
64 | 144 | if (projection.project_all_children || projection.children.empty()) { |
65 | 138 | collect_all_leaf_column_ids(column_schema, leaf_column_ids); |
66 | 138 | return; |
67 | 138 | } |
68 | 7 | for (const auto& child_projection : projection.children) { |
69 | 7 | const auto child_it = |
70 | 11 | std::ranges::find_if(column_schema.children, [&](const auto& child_schema) { |
71 | 11 | return child_schema->local_id == child_projection.local_id(); |
72 | 11 | }); |
73 | 7 | DORIS_CHECK(child_it != column_schema.children.end()); |
74 | 7 | collect_projected_leaf_column_ids(**child_it, child_projection, leaf_column_ids); |
75 | 7 | } |
76 | 6 | } |
77 | | |
78 | | bool is_row_group_outside_range(const ::parquet::FileMetaData& metadata, |
79 | 200 | const ParquetScanRange& scan_range, int row_group_idx) { |
80 | 200 | if (scan_range.size < 0) { |
81 | 170 | return false; |
82 | 170 | } |
83 | 30 | const int64_t range_start_offset = scan_range.start_offset; |
84 | 30 | const int64_t range_end_offset = range_start_offset + scan_range.size; |
85 | 30 | DORIS_CHECK(range_start_offset >= 0); |
86 | 30 | DORIS_CHECK(range_end_offset >= range_start_offset); |
87 | 30 | if (range_start_offset == 0 && |
88 | 30 | (scan_range.file_size < 0 || range_end_offset >= scan_range.file_size)) { |
89 | 0 | return false; |
90 | 0 | } |
91 | | |
92 | 30 | auto row_group_metadata = metadata.RowGroup(row_group_idx); |
93 | 30 | DORIS_CHECK(row_group_metadata != nullptr); |
94 | 30 | DORIS_CHECK(row_group_metadata->num_columns() > 0); |
95 | 30 | const auto first_column = row_group_metadata->ColumnChunk(0); |
96 | 30 | const auto last_column = row_group_metadata->ColumnChunk(row_group_metadata->num_columns() - 1); |
97 | 30 | DORIS_CHECK(first_column != nullptr); |
98 | 30 | DORIS_CHECK(last_column != nullptr); |
99 | 30 | const int64_t row_group_start_offset = column_start_offset(*first_column); |
100 | 30 | const int64_t row_group_end_offset = |
101 | 30 | column_start_offset(*last_column) + last_column->total_compressed_size(); |
102 | 30 | const int64_t row_group_mid_offset = |
103 | 30 | row_group_start_offset + (row_group_end_offset - row_group_start_offset) / 2; |
104 | 30 | return row_group_mid_offset < range_start_offset || row_group_mid_offset >= range_end_offset; |
105 | 30 | } |
106 | | |
107 | 99 | std::vector<format::LocalColumnIndex> request_scan_columns(const format::FileScanRequest& request) { |
108 | 99 | std::vector<format::LocalColumnIndex> scan_columns; |
109 | 99 | scan_columns.reserve(request.predicate_columns.size() + request.non_predicate_columns.size()); |
110 | 99 | scan_columns.insert(scan_columns.end(), request.predicate_columns.begin(), |
111 | 99 | request.predicate_columns.end()); |
112 | 99 | scan_columns.insert(scan_columns.end(), request.non_predicate_columns.begin(), |
113 | 99 | request.non_predicate_columns.end()); |
114 | 99 | return scan_columns; |
115 | 99 | } |
116 | | |
117 | | std::vector<ParquetPageCacheRange> build_row_group_prefetch_ranges( |
118 | | const ::parquet::FileMetaData& metadata, |
119 | | const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema, |
120 | 99 | const std::vector<format::LocalColumnIndex>& scan_columns, int row_group_idx) { |
121 | 99 | std::unordered_set<int> leaf_column_ids; |
122 | 164 | for (const auto& projection : scan_columns) { |
123 | 164 | const auto local_id = projection.local_id(); |
124 | 164 | if (local_id == format::ROW_POSITION_COLUMN_ID || |
125 | 164 | local_id == format::GLOBAL_ROWID_COLUMN_ID) { |
126 | 27 | continue; |
127 | 27 | } |
128 | 137 | DORIS_CHECK(local_id >= 0 && local_id < static_cast<int32_t>(file_schema.size())); |
129 | 137 | DORIS_CHECK(file_schema[local_id] != nullptr); |
130 | | // Prefetch and merge-reader ranges must be physical leaf chunks, not Doris logical slots. |
131 | | // Example: for a struct column s<a:int,b:string>, projecting only s.a should include only |
132 | | // the Parquet leaf chunk of a. Projecting the whole struct includes both a and b. |
133 | 137 | collect_projected_leaf_column_ids(*file_schema[local_id], projection, &leaf_column_ids); |
134 | 137 | } |
135 | | |
136 | 99 | auto row_group_metadata = metadata.RowGroup(row_group_idx); |
137 | 99 | DORIS_CHECK(row_group_metadata != nullptr); |
138 | 99 | std::vector<int> ordered_leaf_column_ids(leaf_column_ids.begin(), leaf_column_ids.end()); |
139 | 99 | std::ranges::sort(ordered_leaf_column_ids); |
140 | | |
141 | 99 | std::vector<ParquetPageCacheRange> ranges; |
142 | 99 | ranges.reserve(ordered_leaf_column_ids.size()); |
143 | 142 | for (const auto leaf_column_id : ordered_leaf_column_ids) { |
144 | 142 | DORIS_CHECK(leaf_column_id >= 0 && leaf_column_id < row_group_metadata->num_columns()); |
145 | 142 | auto column_metadata = row_group_metadata->ColumnChunk(leaf_column_id); |
146 | 142 | DORIS_CHECK(column_metadata != nullptr); |
147 | 142 | const int64_t offset = column_start_offset(*column_metadata); |
148 | 142 | const int64_t size = column_metadata->total_compressed_size(); |
149 | 142 | DORIS_CHECK(offset >= 0); |
150 | 142 | if (size > 0) { |
151 | 142 | ranges.push_back(ParquetPageCacheRange {.offset = offset, .size = size}); |
152 | 142 | } |
153 | 142 | } |
154 | 99 | return ranges; |
155 | 99 | } |
156 | | |
157 | | } // namespace |
158 | | |
159 | | Status plan_parquet_row_groups(const ::parquet::FileMetaData& metadata, |
160 | | ::parquet::ParquetFileReader* file_reader, |
161 | | const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema, |
162 | | const format::FileScanRequest& request, |
163 | | const ParquetScanRange& scan_range, bool enable_bloom_filter, |
164 | 116 | RowGroupScanPlan* plan, const cctz::time_zone* timezone) { |
165 | 116 | DORIS_CHECK(plan != nullptr); |
166 | 116 | plan->row_groups.clear(); |
167 | 116 | plan->pruning_stats = ParquetPruningStats {}; |
168 | | |
169 | 116 | std::vector<int64_t> row_group_first_rows(metadata.num_row_groups()); |
170 | 116 | std::vector<int> scan_range_selected_row_groups; |
171 | 116 | scan_range_selected_row_groups.reserve(metadata.num_row_groups()); |
172 | 116 | int64_t next_row_group_first_row = 0; |
173 | 316 | for (int row_group_idx = 0; row_group_idx < metadata.num_row_groups(); ++row_group_idx) { |
174 | 200 | row_group_first_rows[row_group_idx] = next_row_group_first_row; |
175 | 200 | auto row_group_metadata = metadata.RowGroup(row_group_idx); |
176 | 200 | DORIS_CHECK(row_group_metadata != nullptr); |
177 | 200 | const int64_t row_group_rows = row_group_metadata->num_rows(); |
178 | 200 | if (row_group_rows < 0) { |
179 | 0 | return Status::Corruption("Invalid negative row count in parquet row group {}", |
180 | 0 | row_group_idx); |
181 | 0 | } |
182 | 200 | next_row_group_first_row += row_group_rows; |
183 | 200 | if (!is_row_group_outside_range(metadata, scan_range, row_group_idx)) { |
184 | 179 | scan_range_selected_row_groups.push_back(row_group_idx); |
185 | 179 | } |
186 | 200 | } |
187 | | |
188 | 116 | std::vector<int> statistics_selected_row_groups; |
189 | 116 | RETURN_IF_ERROR(select_row_groups_by_statistics( |
190 | 116 | metadata, file_reader, file_schema, request, &scan_range_selected_row_groups, |
191 | 116 | &statistics_selected_row_groups, enable_bloom_filter, &plan->pruning_stats, timezone)); |
192 | | |
193 | 116 | plan->row_groups.reserve(statistics_selected_row_groups.size()); |
194 | 144 | for (const auto row_group_idx : statistics_selected_row_groups) { |
195 | 144 | auto row_group_metadata = metadata.RowGroup(row_group_idx); |
196 | 144 | DORIS_CHECK(row_group_metadata != nullptr); |
197 | 144 | const int64_t row_group_rows = row_group_metadata->num_rows(); |
198 | 144 | if (row_group_rows == 0) { |
199 | 0 | continue; |
200 | 0 | } |
201 | | |
202 | 144 | RowGroupReadPlan row_group_plan; |
203 | 144 | row_group_plan.row_group_id = row_group_idx; |
204 | 144 | row_group_plan.first_file_row = row_group_first_rows[row_group_idx]; |
205 | 144 | row_group_plan.row_group_rows = row_group_rows; |
206 | 144 | RETURN_IF_ERROR(select_row_group_ranges_by_page_index( |
207 | 144 | file_reader, file_schema, request, row_group_idx, row_group_rows, |
208 | 144 | &row_group_plan.selected_ranges, &row_group_plan.page_skip_plans, |
209 | 144 | &plan->pruning_stats, timezone)); |
210 | 144 | if (row_group_plan.selected_ranges.empty()) { |
211 | 1 | continue; |
212 | 1 | } |
213 | 143 | plan->pruning_stats.selected_row_ranges += row_group_plan.selected_ranges.size(); |
214 | 143 | plan->row_groups.push_back(std::move(row_group_plan)); |
215 | 143 | } |
216 | 116 | plan->pruning_stats.selected_row_groups = plan->row_groups.size(); |
217 | 116 | return Status::OK(); |
218 | 116 | } |
219 | | |
220 | | namespace { |
221 | | |
222 | | uint16_t apply_filter_to_selection(const IColumn::Filter& filter, SelectionVector* selection, |
223 | 42 | uint16_t selected_rows) { |
224 | 42 | uint16_t new_selected_rows = 0; |
225 | 6.36k | for (uint16_t selection_idx = 0; selection_idx < selected_rows; ++selection_idx) { |
226 | 6.32k | const auto row_idx = selection->get_index(selection_idx); |
227 | 6.32k | if (filter[row_idx] != 0) { |
228 | 4.23k | selection->set_index(new_selected_rows++, static_cast<SelectionVector::Index>(row_idx)); |
229 | 4.23k | } |
230 | 6.32k | } |
231 | 42 | return new_selected_rows; |
232 | 42 | } |
233 | | |
234 | | Status execute_filter_conjuncts(const format::FileScanRequest& request, int64_t batch_rows, |
235 | | Block* file_block, SelectionVector* selection, |
236 | 43 | uint16_t* selected_rows) { |
237 | 43 | for (const auto& conjunct : request.conjuncts) { |
238 | 31 | if (*selected_rows == 0) { |
239 | 0 | break; |
240 | 0 | } |
241 | 31 | DORIS_CHECK(conjunct != nullptr); |
242 | 31 | IColumn::Filter filter(static_cast<size_t>(batch_rows), 1); |
243 | 31 | bool can_filter_all = false; |
244 | 31 | RETURN_IF_ERROR(conjunct->execute_filter(file_block, filter.data(), |
245 | 31 | static_cast<size_t>(batch_rows), false, |
246 | 31 | &can_filter_all)); |
247 | 31 | *selected_rows = |
248 | 31 | can_filter_all ? 0 : apply_filter_to_selection(filter, selection, *selected_rows); |
249 | 31 | } |
250 | 43 | return Status::OK(); |
251 | 43 | } |
252 | | |
253 | | Status execute_delete_conjuncts(const format::FileScanRequest& request, int64_t batch_rows, |
254 | | Block* file_block, SelectionVector* selection, |
255 | 41 | uint16_t* selected_rows) { |
256 | 41 | for (const auto& delete_conjunct : request.delete_conjuncts) { |
257 | 13 | if (*selected_rows == 0) { |
258 | 0 | break; |
259 | 0 | } |
260 | 13 | DORIS_CHECK(delete_conjunct != nullptr); |
261 | 13 | int result_column_id = -1; |
262 | 13 | RETURN_IF_ERROR(delete_conjunct->root()->execute(delete_conjunct.get(), file_block, |
263 | 13 | &result_column_id)); |
264 | 13 | DORIS_CHECK(result_column_id >= 0 && |
265 | 13 | result_column_id < static_cast<int>(file_block->columns())); |
266 | 13 | const auto& delete_filter = assert_cast<const ColumnUInt8&>( |
267 | 13 | *file_block->get_by_position(result_column_id).column) |
268 | 13 | .get_data(); |
269 | 13 | DORIS_CHECK(delete_filter.size() == static_cast<size_t>(batch_rows)); |
270 | 13 | IColumn::Filter keep_filter(static_cast<size_t>(batch_rows), 1); |
271 | 13 | bool has_kept_row = false; |
272 | 64 | for (size_t row = 0; row < static_cast<size_t>(batch_rows); ++row) { |
273 | 51 | keep_filter[row] = !delete_filter[row]; |
274 | 51 | has_kept_row |= keep_filter[row] != 0; |
275 | 51 | } |
276 | 13 | file_block->erase(result_column_id); |
277 | 13 | *selected_rows = |
278 | 13 | !has_kept_row ? 0 |
279 | 13 | : apply_filter_to_selection(keep_filter, selection, *selected_rows); |
280 | 13 | } |
281 | 41 | return Status::OK(); |
282 | 41 | } |
283 | | |
284 | | } // namespace |
285 | | |
286 | | IColumn::Filter selection_to_filter(const SelectionVector& selection, uint16_t selected_rows, |
287 | 29 | int64_t batch_rows) { |
288 | 29 | IColumn::Filter filter(static_cast<size_t>(batch_rows), 0); |
289 | 2.13k | for (uint16_t selection_idx = 0; selection_idx < selected_rows; ++selection_idx) { |
290 | 2.10k | filter[selection.get_index(selection_idx)] = 1; |
291 | 2.10k | } |
292 | 29 | return filter; |
293 | 29 | } |
294 | | |
295 | | Status execute_batch_filters(const format::FileScanRequest& request, int64_t batch_rows, |
296 | | Block* file_block, SelectionVector* selection, uint16_t* selected_rows, |
297 | 92 | int64_t* conjunct_filtered_rows) { |
298 | 92 | if (request.conjuncts.empty() && request.delete_conjuncts.empty()) { |
299 | 49 | return Status::OK(); |
300 | 49 | } |
301 | 43 | const auto selected_rows_before_conjunct = *selected_rows; |
302 | 43 | RETURN_IF_ERROR( |
303 | 43 | execute_filter_conjuncts(request, batch_rows, file_block, selection, selected_rows)); |
304 | 43 | if (conjunct_filtered_rows != nullptr) { |
305 | 43 | *conjunct_filtered_rows += static_cast<int64_t>(selected_rows_before_conjunct) - |
306 | 43 | static_cast<int64_t>(*selected_rows); |
307 | 43 | } |
308 | 43 | if (*selected_rows == 0) { |
309 | 2 | return Status::OK(); |
310 | 2 | } |
311 | 41 | return execute_delete_conjuncts(request, batch_rows, file_block, selection, selected_rows); |
312 | 43 | } |
313 | | |
314 | | namespace { |
315 | 2 | int64_t count_range_rows(const std::vector<RowRange>& ranges) { |
316 | 2 | int64_t rows = 0; |
317 | 2 | for (const auto& range : ranges) { |
318 | 2 | rows += range.length; |
319 | 2 | } |
320 | 2 | return rows; |
321 | 2 | } |
322 | | |
323 | | void append_intersection(const RowRange& left, const RowRange& right, |
324 | 1 | std::vector<RowRange>* result) { |
325 | 1 | const int64_t start = std::max(left.start, right.start); |
326 | 1 | const int64_t end = std::min(left.start + left.length, right.start + right.length); |
327 | 1 | if (start < end) { |
328 | 1 | result->push_back(RowRange {.start = start, .length = end - start}); |
329 | 1 | } |
330 | 1 | } |
331 | | |
332 | | std::vector<RowRange> filter_ranges_by_condition_cache(const std::vector<RowRange>& ranges, |
333 | | const std::vector<bool>& cache, |
334 | | int64_t row_group_first_row, |
335 | 1 | int64_t base_granule) { |
336 | 1 | std::vector<RowRange> result; |
337 | 1 | if (cache.empty()) { |
338 | 0 | return ranges; |
339 | 0 | } |
340 | | |
341 | | // Cache coordinates are file-global granules; RowRange coordinates are row-group-relative. |
342 | | // Walk every selected range in order and split it by granule. Granules covered by the bitmap |
343 | | // are kept only when the bit is true. Granules outside the bitmap are kept conservatively, so |
344 | | // an undersized or old-format cache entry cannot skip valid rows. |
345 | 1 | for (const auto& range : ranges) { |
346 | 1 | const int64_t global_start = row_group_first_row + range.start; |
347 | 1 | const int64_t global_end = global_start + range.length; |
348 | 1 | for (int64_t granule = global_start / ConditionCacheContext::GRANULE_SIZE; |
349 | 3 | granule <= (global_end - 1) / ConditionCacheContext::GRANULE_SIZE; ++granule) { |
350 | 2 | const int64_t cache_idx = granule - base_granule; |
351 | 2 | const bool keep = cache_idx < 0 || static_cast<size_t>(cache_idx) >= cache.size() || |
352 | 2 | cache[static_cast<size_t>(cache_idx)]; |
353 | 2 | if (!keep) { |
354 | 1 | continue; |
355 | 1 | } |
356 | 1 | const int64_t granule_start = granule * ConditionCacheContext::GRANULE_SIZE; |
357 | 1 | const int64_t granule_end = granule_start + ConditionCacheContext::GRANULE_SIZE; |
358 | 1 | const RowRange file_granule_range {.start = granule_start - row_group_first_row, |
359 | 1 | .length = granule_end - granule_start}; |
360 | 1 | append_intersection(range, file_granule_range, &result); |
361 | 1 | } |
362 | 1 | } |
363 | 1 | return result; |
364 | 1 | } |
365 | | |
366 | | } // namespace |
367 | | |
368 | 104 | void ParquetScanScheduler::set_plan(RowGroupScanPlan plan) { |
369 | 104 | _row_group_plans = std::move(plan.row_groups); |
370 | 104 | _condition_cache_filtered_rows = 0; |
371 | 104 | _predicate_filtered_rows = 0; |
372 | 104 | reset(); |
373 | 104 | } |
374 | | |
375 | 2 | void ParquetScanScheduler::set_condition_cache_context(std::shared_ptr<ConditionCacheContext> ctx) { |
376 | 2 | _condition_cache_ctx = std::move(ctx); |
377 | 2 | if (!_condition_cache_ctx || !_condition_cache_ctx->filter_result || _row_group_plans.empty()) { |
378 | 0 | return; |
379 | 0 | } |
380 | | |
381 | 2 | _condition_cache_ctx->base_granule = |
382 | 2 | _row_group_plans.front().first_file_row / ConditionCacheContext::GRANULE_SIZE; |
383 | 2 | if (!_condition_cache_ctx->is_hit) { |
384 | 1 | return; |
385 | 1 | } |
386 | | |
387 | 1 | std::vector<RowGroupReadPlan> filtered_plans; |
388 | 1 | filtered_plans.reserve(_row_group_plans.size()); |
389 | 1 | for (auto& plan : _row_group_plans) { |
390 | 1 | const int64_t old_rows = count_range_rows(plan.selected_ranges); |
391 | 1 | plan.selected_ranges = filter_ranges_by_condition_cache( |
392 | 1 | plan.selected_ranges, *_condition_cache_ctx->filter_result, plan.first_file_row, |
393 | 1 | _condition_cache_ctx->base_granule); |
394 | 1 | const int64_t new_rows = count_range_rows(plan.selected_ranges); |
395 | 1 | _condition_cache_filtered_rows += old_rows - new_rows; |
396 | 1 | if (!plan.selected_ranges.empty()) { |
397 | 1 | filtered_plans.push_back(std::move(plan)); |
398 | 1 | } |
399 | 1 | } |
400 | 1 | _row_group_plans = std::move(filtered_plans); |
401 | 1 | reset(); |
402 | 1 | } |
403 | | |
404 | 105 | void ParquetScanScheduler::reset() { |
405 | 105 | _next_row_group_plan_idx = 0; |
406 | 105 | reset_current_row_group(); |
407 | 105 | } |
408 | | |
409 | 156 | void ParquetScanScheduler::reset_current_row_group() { |
410 | 156 | _current_row_group.reset(); |
411 | 156 | _current_predicate_columns.clear(); |
412 | 156 | _current_non_predicate_columns.clear(); |
413 | 156 | _current_row_group_rows = 0; |
414 | 156 | _current_row_group_id = -1; |
415 | 156 | _current_row_group_rows_read = 0; |
416 | 156 | _current_row_group_first_row = 0; |
417 | 156 | _current_selected_ranges.clear(); |
418 | 156 | _current_range_idx = 0; |
419 | 156 | _current_range_rows_read = 0; |
420 | 156 | _current_predicate_prefetched = false; |
421 | 156 | _current_non_predicate_prefetched = false; |
422 | 156 | _current_merge_range_active = false; |
423 | 156 | } |
424 | | |
425 | | Status ParquetScanScheduler::open_next_row_group( |
426 | | ParquetFileContext& file_context, |
427 | | const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema, |
428 | 139 | const format::FileScanRequest& request, bool* has_row_group) { |
429 | 139 | *has_row_group = false; |
430 | 139 | if (_next_row_group_plan_idx >= _row_group_plans.size()) { |
431 | 40 | return Status::OK(); |
432 | 40 | } |
433 | 99 | const RowGroupReadPlan& row_group_plan = _row_group_plans[_next_row_group_plan_idx++]; |
434 | 99 | const int row_group_idx = row_group_plan.row_group_id; |
435 | 99 | _current_merge_range_active = |
436 | 99 | prepare_current_row_group_reader(file_context, file_schema, request, row_group_idx); |
437 | 99 | try { |
438 | 99 | _current_row_group = file_context.file_reader->RowGroup(row_group_idx); |
439 | 99 | } catch (const ::parquet::ParquetException& e) { |
440 | 0 | return Status::Corruption("Failed to open parquet row group {}: {}", row_group_idx, |
441 | 0 | e.what()); |
442 | 0 | } catch (const std::exception& e) { |
443 | 0 | return Status::InternalError("Failed to open parquet row group {}: {}", row_group_idx, |
444 | 0 | e.what()); |
445 | 0 | } |
446 | | |
447 | 99 | auto row_group_metadata = file_context.metadata->RowGroup(row_group_idx); |
448 | 99 | DORIS_CHECK(row_group_metadata != nullptr); |
449 | 99 | _current_row_group_rows = row_group_metadata->num_rows(); |
450 | 99 | DORIS_CHECK(_current_row_group_rows == row_group_plan.row_group_rows); |
451 | 99 | DORIS_CHECK(_current_row_group_rows > 0); |
452 | 99 | _current_row_group_id = row_group_idx; |
453 | 99 | DORIS_CHECK(!row_group_plan.selected_ranges.empty()); |
454 | 99 | _current_row_group_first_row = row_group_plan.first_file_row; |
455 | 99 | _current_row_group_rows_read = 0; |
456 | 99 | _current_selected_ranges = row_group_plan.selected_ranges; |
457 | 99 | _current_range_idx = 0; |
458 | 99 | _current_range_rows_read = 0; |
459 | 99 | _current_predicate_columns.clear(); |
460 | 99 | _current_non_predicate_columns.clear(); |
461 | | |
462 | 99 | ParquetColumnReaderFactory column_reader_factory( |
463 | 99 | _current_row_group, file_context.schema->num_columns(), &row_group_plan.page_skip_plans, |
464 | 99 | _page_skip_profile, _timezone, _enable_strict_mode, |
465 | 99 | _scan_profile.column_reader_profile); |
466 | 99 | for (const auto& col : request.predicate_columns) { |
467 | 47 | const auto local_id = col.local_id(); |
468 | 47 | if (local_id == format::ROW_POSITION_COLUMN_ID) { |
469 | 11 | _current_predicate_columns[local_id] = |
470 | 11 | column_reader_factory.create_row_position_column_reader( |
471 | 11 | _current_row_group_first_row); |
472 | 11 | continue; |
473 | 11 | } |
474 | 36 | if (local_id == format::GLOBAL_ROWID_COLUMN_ID) { |
475 | 0 | DORIS_CHECK(_global_rowid_context.has_value()); |
476 | 0 | _current_predicate_columns[local_id] = |
477 | 0 | column_reader_factory.create_global_rowid_column_reader( |
478 | 0 | *_global_rowid_context, _current_row_group_first_row); |
479 | 0 | continue; |
480 | 0 | } |
481 | | |
482 | 36 | DORIS_CHECK(local_id >= 0 && local_id < static_cast<int32_t>(file_schema.size())); |
483 | 36 | const auto& column_schema = file_schema[local_id]; |
484 | 36 | DORIS_CHECK(column_schema != nullptr); |
485 | 36 | std::unique_ptr<ParquetColumnReader> column_reader; |
486 | 36 | RETURN_IF_ERROR(column_reader_factory.create(*column_schema, &col, &column_reader)); |
487 | 36 | _current_predicate_columns[local_id] = std::move(column_reader); |
488 | 36 | } |
489 | | // Start warming filter-column chunks as soon as their row group is selected. Parquet v2 still |
490 | | // reads through Arrow's random-access reader; this prefetch only warms Doris file cache blocks |
491 | | // in the background and never changes the row/column materialization order. |
492 | 99 | if (!_current_merge_range_active) { |
493 | 11 | prefetch_current_row_group_columns(file_context, file_schema, request.predicate_columns, |
494 | 11 | &_current_predicate_prefetched); |
495 | 11 | } |
496 | 117 | for (const auto& col : request.non_predicate_columns) { |
497 | 117 | const auto local_id = col.local_id(); |
498 | 117 | if (local_id == format::ROW_POSITION_COLUMN_ID) { |
499 | 14 | _current_non_predicate_columns[local_id] = |
500 | 14 | column_reader_factory.create_row_position_column_reader( |
501 | 14 | _current_row_group_first_row); |
502 | 14 | continue; |
503 | 14 | } |
504 | 103 | if (local_id == format::GLOBAL_ROWID_COLUMN_ID) { |
505 | 2 | DORIS_CHECK(_global_rowid_context.has_value()); |
506 | 2 | _current_non_predicate_columns[local_id] = |
507 | 2 | column_reader_factory.create_global_rowid_column_reader( |
508 | 2 | *_global_rowid_context, _current_row_group_first_row); |
509 | 2 | continue; |
510 | 2 | } |
511 | 101 | DORIS_CHECK(local_id >= 0 && local_id < static_cast<int32_t>(file_schema.size())); |
512 | 101 | const auto& column_schema = file_schema[local_id]; |
513 | 101 | DORIS_CHECK(column_schema != nullptr); |
514 | 101 | std::unique_ptr<ParquetColumnReader> column_reader; |
515 | 101 | RETURN_IF_ERROR(column_reader_factory.create(*column_schema, &col, &column_reader)); |
516 | 101 | _current_non_predicate_columns[local_id] = std::move(column_reader); |
517 | 101 | } |
518 | 99 | if (!_current_merge_range_active && request.conjuncts.empty() && |
519 | 99 | request.delete_conjuncts.empty()) { |
520 | | // With no row-level filters there is no lazy-read decision to wait for, so start warming |
521 | | // output chunks immediately after their readers are created. Filtered scans still defer |
522 | | // this until at least one row survives the predicate phase. |
523 | 11 | prefetch_current_row_group_columns(file_context, file_schema, request.non_predicate_columns, |
524 | 11 | &_current_non_predicate_prefetched); |
525 | 11 | } |
526 | 99 | *has_row_group = true; |
527 | 99 | return Status::OK(); |
528 | 99 | } |
529 | | |
530 | 3 | Status ParquetScanScheduler::skip_current_row_group_rows(int64_t rows) { |
531 | 3 | DORIS_CHECK(rows >= 0); |
532 | 3 | if (rows == 0) { |
533 | 0 | return Status::OK(); |
534 | 0 | } |
535 | 3 | if (_scan_profile.range_gap_skipped_rows != nullptr) { |
536 | 2 | COUNTER_UPDATE(_scan_profile.range_gap_skipped_rows, rows); |
537 | 2 | } |
538 | 3 | for (const auto& column_reader : _current_predicate_columns | std::views::values) { |
539 | 2 | RETURN_IF_ERROR(column_reader->skip(rows)); |
540 | 2 | } |
541 | 3 | for (const auto& column_reader : _current_non_predicate_columns | std::views::values) { |
542 | 2 | RETURN_IF_ERROR(column_reader->skip(rows)); |
543 | 2 | } |
544 | 3 | _current_row_group_rows_read += rows; |
545 | 3 | return Status::OK(); |
546 | 3 | } |
547 | | |
548 | | Status ParquetScanScheduler::read_filter_columns(int64_t batch_rows, |
549 | | const format::FileScanRequest& request, |
550 | | Block* file_block, SelectionVector* selection, |
551 | | uint16_t* selected_rows, |
552 | 92 | int64_t* conjunct_filtered_rows) { |
553 | 92 | if (!request.conjuncts.empty() || !request.delete_conjuncts.empty()) { |
554 | 43 | selection->resize(static_cast<size_t>(batch_rows)); |
555 | 43 | } |
556 | 92 | for (const auto& [fid, column_reader] : _current_predicate_columns) { |
557 | 47 | auto position_it = request.local_positions.find(format::LocalColumnId(fid)); |
558 | 47 | DORIS_CHECK(position_it != request.local_positions.end()); |
559 | 47 | const auto block_position = position_it->second.value(); |
560 | 47 | DCHECK(remove_nullable(column_reader->type()) |
561 | 0 | ->equals(*remove_nullable(file_block->get_by_position(block_position).type))) |
562 | 0 | << column_reader->type()->get_name() << " " |
563 | 0 | << file_block->get_by_position(block_position).type->get_name() << " " |
564 | 0 | << column_reader->name() << " " << file_block->get_by_position(block_position).name; |
565 | 47 | auto column = file_block->get_by_position(block_position).column->assert_mutable(); |
566 | 47 | int64_t column_rows = 0; |
567 | 47 | { |
568 | 47 | SCOPED_TIMER(_scan_profile.column_read_time); |
569 | 47 | RETURN_IF_ERROR(column_reader->read(batch_rows, column, &column_rows)); |
570 | 47 | } |
571 | 47 | if (column_rows != batch_rows) { |
572 | 0 | return Status::Corruption("Parquet filter column {} returned {} rows, expected {} rows", |
573 | 0 | column_reader->name(), column_rows, batch_rows); |
574 | 0 | } |
575 | 47 | file_block->replace_by_position(block_position, std::move(column)); |
576 | 47 | } |
577 | 92 | if (_scan_profile.predicate_filter_time == nullptr) { |
578 | 67 | return execute_batch_filters(request, batch_rows, file_block, selection, selected_rows, |
579 | 67 | conjunct_filtered_rows); |
580 | 67 | } |
581 | 25 | SCOPED_TIMER(_scan_profile.predicate_filter_time); |
582 | 25 | return execute_batch_filters(request, batch_rows, file_block, selection, selected_rows, |
583 | 25 | conjunct_filtered_rows); |
584 | 92 | } |
585 | | |
586 | | bool ParquetScanScheduler::prepare_current_row_group_reader( |
587 | | ParquetFileContext& file_context, |
588 | | const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema, |
589 | 99 | const format::FileScanRequest& request, int row_group_idx) { |
590 | 99 | if (file_context.metadata == nullptr) { |
591 | 0 | return false; |
592 | 0 | } |
593 | 99 | const auto ranges = build_row_group_prefetch_ranges( |
594 | 99 | *file_context.metadata, file_schema, request_scan_columns(request), row_group_idx); |
595 | 99 | const size_t avg_io_size = detail::average_prefetch_range_size(ranges); |
596 | 99 | return file_context.set_random_access_ranges(ranges, avg_io_size, _profile, |
597 | 99 | _merge_read_slice_size); |
598 | 99 | } |
599 | | |
600 | | void ParquetScanScheduler::prefetch_current_row_group_columns( |
601 | | ParquetFileContext& file_context, |
602 | | const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema, |
603 | 22 | const std::vector<format::LocalColumnIndex>& scan_columns, bool* prefetched) { |
604 | 22 | DORIS_CHECK(prefetched != nullptr); |
605 | 22 | if (_current_merge_range_active || *prefetched || scan_columns.empty() || |
606 | 22 | _current_row_group_id < 0 || file_context.metadata == nullptr) { |
607 | 22 | return; |
608 | 22 | } |
609 | 0 | *prefetched = true; |
610 | | // The scanner request separates predicate and non-predicate columns so Parquet can read |
611 | | // predicate columns first and lazily materialize the rest. Keep the same contract for |
612 | | // prefetch: callers decide which side to warm, and this helper only translates that selected |
613 | | // projection into physical column-chunk byte ranges for the current row group. |
614 | 0 | file_context.prefetch_ranges( |
615 | 0 | build_row_group_prefetch_ranges(*file_context.metadata, file_schema, scan_columns, |
616 | 0 | _current_row_group_id), |
617 | 0 | nullptr); |
618 | 0 | } |
619 | | |
620 | | Status ParquetScanScheduler::read_current_row_group_batch( |
621 | | ParquetFileContext& file_context, |
622 | | const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema, int64_t batch_rows, |
623 | | const format::FileScanRequest& request, int64_t batch_first_file_row, Block* file_block, |
624 | 103 | size_t* rows) { |
625 | 103 | if (_scan_profile.total_batches != nullptr) { |
626 | 25 | COUNTER_UPDATE(_scan_profile.total_batches, 1); |
627 | 25 | } |
628 | 103 | if (_scan_profile.raw_rows_read != nullptr) { |
629 | 25 | COUNTER_UPDATE(_scan_profile.raw_rows_read, batch_rows); |
630 | 25 | } |
631 | 103 | if (_current_predicate_columns.empty() && _current_non_predicate_columns.empty()) { |
632 | 11 | *rows = static_cast<size_t>(batch_rows); |
633 | 11 | if (_scan_profile.selected_rows != nullptr) { |
634 | 0 | COUNTER_UPDATE(_scan_profile.selected_rows, batch_rows); |
635 | 0 | } |
636 | 11 | return Status::OK(); |
637 | 11 | } |
638 | 92 | SelectionVector selection; |
639 | 92 | DORIS_CHECK(batch_rows <= std::numeric_limits<uint16_t>::max()); |
640 | 92 | uint16_t selected_rows = static_cast<uint16_t>(batch_rows); |
641 | 92 | int64_t conjunct_filtered_rows = 0; |
642 | 92 | RETURN_IF_ERROR(read_filter_columns(batch_rows, request, file_block, &selection, &selected_rows, |
643 | 92 | &conjunct_filtered_rows)); |
644 | 92 | _predicate_filtered_rows += conjunct_filtered_rows; |
645 | 92 | mark_condition_cache_granules(selection, selected_rows, batch_first_file_row); |
646 | | |
647 | 92 | const bool need_filter_output = selected_rows != batch_rows; |
648 | 92 | if (_scan_profile.selected_rows != nullptr) { |
649 | 25 | COUNTER_UPDATE(_scan_profile.selected_rows, selected_rows); |
650 | 25 | } |
651 | 92 | if (_scan_profile.rows_filtered_by_conjunct != nullptr) { |
652 | 25 | COUNTER_UPDATE(_scan_profile.rows_filtered_by_conjunct, conjunct_filtered_rows); |
653 | 25 | } |
654 | 92 | if (!_current_non_predicate_columns.empty() && |
655 | 92 | _scan_profile.lazy_read_filtered_rows != nullptr) { |
656 | 23 | COUNTER_UPDATE(_scan_profile.lazy_read_filtered_rows, batch_rows - selected_rows); |
657 | 23 | } |
658 | 92 | if (selected_rows == 0 && _scan_profile.empty_selection_batches != nullptr) { |
659 | 1 | COUNTER_UPDATE(_scan_profile.empty_selection_batches, 1); |
660 | 1 | } |
661 | 92 | if (need_filter_output) { |
662 | 29 | IColumn::Filter output_filter = selection_to_filter(selection, selected_rows, batch_rows); |
663 | 32 | for (const auto& col : request.predicate_columns) { |
664 | 32 | auto position_it = request.local_positions.find(col.column_id()); |
665 | 32 | DORIS_CHECK(position_it != request.local_positions.end()); |
666 | 32 | const auto block_position = position_it->second.value(); |
667 | 32 | RETURN_IF_CATCH_EXCEPTION(file_block->replace_by_position( |
668 | 32 | block_position, file_block->get_by_position(block_position) |
669 | 32 | .column->filter(output_filter, selected_rows))); |
670 | 32 | } |
671 | 29 | } |
672 | 92 | if (!_current_merge_range_active && selected_rows > 0 && |
673 | 92 | !_current_non_predicate_columns.empty()) { |
674 | | // Do not prefetch lazy output columns until at least one row survives filtering. This is |
675 | | // the same decision point where the v2 reader switches from predicate-only reads to |
676 | | // materializing non-predicate columns, so fully filtered batches avoid unnecessary IO. |
677 | 0 | prefetch_current_row_group_columns(file_context, file_schema, request.non_predicate_columns, |
678 | 0 | &_current_non_predicate_prefetched); |
679 | 0 | } |
680 | | |
681 | 92 | { |
682 | 92 | SCOPED_TIMER(_scan_profile.column_read_time); |
683 | 125 | for (const auto& [fid, column_reader] : _current_non_predicate_columns) { |
684 | 125 | auto position_it = request.local_positions.find(format::LocalColumnId(fid)); |
685 | 125 | DORIS_CHECK(position_it != request.local_positions.end()); |
686 | 125 | const auto block_position = position_it->second.value(); |
687 | 125 | auto column = file_block->get_by_position(block_position).column->assert_mutable(); |
688 | 125 | DCHECK_EQ(file_block->get_by_position(block_position).type->get_primitive_type(), |
689 | 0 | column_reader->type()->get_primitive_type()) |
690 | 0 | << type_to_string(file_block->get_by_position(block_position) |
691 | 0 | .type->get_primitive_type()) |
692 | 0 | << " " << type_to_string(column_reader->type()->get_primitive_type()) << " " |
693 | 0 | << column_reader->name() << " " << fid << " " << block_position; |
694 | 125 | if (need_filter_output) { |
695 | 24 | [[maybe_unused]] auto old_size = column->size(); |
696 | 24 | RETURN_IF_ERROR( |
697 | 24 | column_reader->select(selection, selected_rows, batch_rows, column)); |
698 | 24 | if (column->size() != old_size + selected_rows) { |
699 | 0 | return Status::Corruption( |
700 | 0 | "Parquet selected output column {} returned {} rows, expected {} rows", |
701 | 0 | column_reader->name(), column->size(), old_size + selected_rows); |
702 | 0 | } |
703 | 101 | } else { |
704 | 101 | int64_t column_rows = 0; |
705 | 101 | RETURN_IF_ERROR(column_reader->read(batch_rows, column, &column_rows)); |
706 | 101 | if (column_rows != batch_rows) { |
707 | 0 | return Status::Corruption( |
708 | 0 | "Parquet output column {} returned {} rows, expected {} rows", |
709 | 0 | column_reader->name(), column_rows, batch_rows); |
710 | 0 | } |
711 | 101 | } |
712 | 125 | file_block->replace_by_position(block_position, std::move(column)); |
713 | 125 | } |
714 | 92 | } |
715 | 92 | *rows = static_cast<size_t>(selected_rows); |
716 | 92 | return Status::OK(); |
717 | 92 | } |
718 | | |
719 | | void ParquetScanScheduler::mark_condition_cache_granules(const SelectionVector& selection, |
720 | | uint16_t selected_rows, |
721 | 92 | int64_t batch_first_file_row) { |
722 | 92 | if (!_condition_cache_ctx || _condition_cache_ctx->is_hit || |
723 | 92 | !_condition_cache_ctx->filter_result) { |
724 | 91 | return; |
725 | 91 | } |
726 | 1 | auto& cache = *_condition_cache_ctx->filter_result; |
727 | 2.04k | for (uint16_t selection_idx = 0; selection_idx < selected_rows; ++selection_idx) { |
728 | 2.04k | const int64_t file_row = batch_first_file_row + selection.get_index(selection_idx); |
729 | 2.04k | const int64_t granule = file_row / ConditionCacheContext::GRANULE_SIZE; |
730 | 2.04k | const int64_t cache_idx = granule - _condition_cache_ctx->base_granule; |
731 | 2.04k | if (cache_idx >= 0 && static_cast<size_t>(cache_idx) < cache.size()) { |
732 | 2.04k | cache[static_cast<size_t>(cache_idx)] = true; |
733 | 2.04k | } |
734 | 2.04k | } |
735 | 1 | } |
736 | | |
737 | | Status ParquetScanScheduler::read_next_batch( |
738 | | ParquetFileContext& file_context, |
739 | | const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema, |
740 | 141 | const format::FileScanRequest& request, Block* file_block, size_t* rows, bool* eof) { |
741 | 141 | *rows = 0; |
742 | 194 | while (true) { |
743 | 194 | if (_current_row_group == nullptr) { |
744 | 139 | bool has_row_group = false; |
745 | 139 | RETURN_IF_ERROR( |
746 | 139 | open_next_row_group(file_context, file_schema, request, &has_row_group)); |
747 | 139 | if (!has_row_group) { |
748 | 40 | *eof = true; |
749 | 40 | return Status::OK(); |
750 | 40 | } |
751 | 139 | } |
752 | | |
753 | 154 | if (_current_range_idx >= _current_selected_ranges.size()) { |
754 | | // Current row group finished, try next row group. |
755 | 51 | reset_current_row_group(); |
756 | 51 | continue; |
757 | 51 | } |
758 | | |
759 | 103 | const RowRange& current_range = _current_selected_ranges[_current_range_idx]; |
760 | 103 | DORIS_CHECK(current_range.start >= 0); |
761 | 103 | DORIS_CHECK(current_range.length > 0); |
762 | 103 | DORIS_CHECK(current_range.start + current_range.length <= _current_row_group_rows); |
763 | | |
764 | 103 | if (_current_row_group_rows_read < current_range.start) { |
765 | | // Skip filtered rows according to row group level pruning. |
766 | 3 | RETURN_IF_ERROR(skip_current_row_group_rows(current_range.start - |
767 | 3 | _current_row_group_rows_read)); |
768 | 3 | } |
769 | 103 | DORIS_CHECK(_current_row_group_rows_read == current_range.start + _current_range_rows_read); |
770 | 103 | const int64_t remaining_rows = current_range.length - _current_range_rows_read; |
771 | 103 | if (remaining_rows <= 0) { |
772 | | // Current range finished, try next range in the same row group. |
773 | 0 | ++_current_range_idx; |
774 | 0 | _current_range_rows_read = 0; |
775 | 0 | continue; |
776 | 0 | } |
777 | | |
778 | 103 | const int64_t batch_rows = std::min<int64_t>(_batch_size, remaining_rows); |
779 | 103 | const int64_t physical_rows_read = batch_rows; |
780 | 103 | const int64_t batch_first_file_row = |
781 | 103 | _current_row_group_first_row + _current_row_group_rows_read; |
782 | 103 | RETURN_IF_ERROR(read_current_row_group_batch(file_context, file_schema, batch_rows, request, |
783 | 103 | batch_first_file_row, file_block, rows)); |
784 | 103 | _current_row_group_rows_read += physical_rows_read; |
785 | 103 | _current_range_rows_read += physical_rows_read; |
786 | 103 | if (_current_range_rows_read >= current_range.length) { |
787 | 99 | ++_current_range_idx; |
788 | 99 | _current_range_rows_read = 0; |
789 | 99 | } |
790 | 103 | if (*rows == 0) { |
791 | 2 | continue; |
792 | 2 | } |
793 | 101 | *eof = false; |
794 | 101 | return Status::OK(); |
795 | 103 | } |
796 | 141 | } |
797 | | |
798 | | } // namespace doris::format::parquet |