be/src/format_v2/parquet/parquet_scan.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "format_v2/parquet/parquet_scan.h" |
19 | | |
20 | | #include <algorithm> |
21 | | #include <limits> |
22 | | #include <memory> |
23 | | #include <utility> |
24 | | |
25 | | #include "common/exception.h" |
26 | | #include "common/status.h" |
27 | | #include "core/assert_cast.h" |
28 | | #include "core/block/block.h" |
29 | | #include "core/column/column_vector.h" |
30 | | #include "exprs/vexpr_context.h" |
31 | | #include "format_v2/parquet/parquet_column_schema.h" |
32 | | #include "format_v2/parquet/parquet_file_context.h" |
33 | | #include "format_v2/parquet/parquet_statistics.h" |
34 | | |
35 | | namespace doris::format::parquet { |
36 | | |
37 | | namespace { |
38 | | |
39 | 60 | int64_t column_start_offset(const ::parquet::ColumnChunkMetaData& column_metadata) { |
40 | 60 | return column_metadata.has_dictionary_page() |
41 | 60 | ? cast_set<int64_t>(column_metadata.dictionary_page_offset()) |
42 | 60 | : cast_set<int64_t>(column_metadata.data_page_offset()); |
43 | 60 | } |
44 | | |
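45 | | // Decide whether a row group lies outside the file-offset window of scan_range. |
46 | | // |
47 | | // Strategy: take the midpoint of the row group's offset span, which runs from the start of its first column chunk |
48 | | // to the end of its last column chunk; if that midpoint is not in [range_start, range_end) the row group does not belong to this split. |
49 | | // Special case: when scan_range covers the whole file (start == 0 and either file_size < 0 or range_end >= file_size), return false directly. |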
50 | | bool is_row_group_outside_range(const ::parquet::FileMetaData& metadata, |
51 | 194 | const ParquetScanRange& scan_range, int row_group_idx) { |
52 | | // size < 0 means the range is unrestricted (read the whole file) |
53 | 194 | if (scan_range.size < 0) { |
54 | 164 | return false; |
55 | 164 | } |
56 | 30 | const int64_t range_start_offset = scan_range.start_offset; |
57 | 30 | const int64_t range_end_offset = range_start_offset + scan_range.size; |
58 | 30 | DORIS_CHECK(range_start_offset >= 0); |
59 | 30 | DORIS_CHECK(range_end_offset >= range_start_offset); |
60 | | // The range covers the whole file -> nothing is filtered out |
61 | 30 | if (range_start_offset == 0 && |
62 | 30 | (scan_range.file_size < 0 || range_end_offset >= scan_range.file_size)) { |
63 | 0 | return false; |
64 | 0 | } |
65 | | |
66 | 30 | auto row_group_metadata = metadata.RowGroup(row_group_idx); |
67 | 30 | DORIS_CHECK(row_group_metadata != nullptr); |
68 | 30 | DORIS_CHECK(row_group_metadata->num_columns() > 0); |
69 | 30 | const auto first_column = row_group_metadata->ColumnChunk(0); |
70 | 30 | const auto last_column = row_group_metadata->ColumnChunk(row_group_metadata->num_columns() - 1); |
71 | 30 | DORIS_CHECK(first_column != nullptr); |
72 | 30 | DORIS_CHECK(last_column != nullptr); |
73 | | // Row group offset span = [start of the first column chunk, end of the last column chunk) |
74 | 30 | const int64_t row_group_start_offset = column_start_offset(*first_column); |
75 | 30 | const int64_t row_group_end_offset = |
76 | 30 | column_start_offset(*last_column) + last_column->total_compressed_size(); |
77 | | // Assign the row group by its midpoint: it belongs to whichever split's range contains the midpoint |
78 | 30 | const int64_t row_group_mid_offset = |
79 | 30 | row_group_start_offset + (row_group_end_offset - row_group_start_offset) / 2; |
80 | 30 | return row_group_mid_offset < range_start_offset || row_group_mid_offset >= range_end_offset; |
81 | 30 | } |
82 | | |
83 | | } // namespace |
84 | | |
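85 | | // Top-level pruning entry point: a three-stage pipeline (ordered from cheapest to most expensive) that produces a RowGroupScanPlan. |
86 | | // |
87 | | // 1. Compute first_file_row and drop row groups outside scan_range - O(1) arithmetic (is_row_group_outside_range) |
88 | | // 2. select_row_groups_by_statistics() - row-group-level pruning (min/max + dictionary + bloom filter), |
89 | | //    run only on row groups inside scan_range, so no expensive bloom filter/dictionary reads are spent on out-of-range row groups |
90 | | // 3. select_row_group_ranges_by_page_index() - fine-grained page-level pruning |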
91 | | Status plan_parquet_row_groups(const ::parquet::FileMetaData& metadata, |
92 | | ::parquet::ParquetFileReader* file_reader, |
93 | | const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema, |
94 | | const format::FileScanRequest& request, |
95 | | const ParquetScanRange& scan_range, bool enable_bloom_filter, |
96 | 110 | RowGroupScanPlan* plan, const cctz::time_zone* timezone) { |
97 | 110 | DORIS_CHECK(plan != nullptr); |
98 | 110 | plan->row_groups.clear(); |
99 | 110 | plan->pruning_stats = ParquetPruningStats {}; |
100 | | |
101 | | // Step 1: compute first_file_row and filter by scan_range (cheapest, so done first) |
102 | 110 | std::vector<int64_t> row_group_first_rows(metadata.num_row_groups()); |
103 | 110 | std::vector<int> scan_range_selected_row_groups; |
104 | 110 | scan_range_selected_row_groups.reserve(metadata.num_row_groups()); |
105 | 110 | int64_t next_row_group_first_row = 0; |
106 | 304 | for (int row_group_idx = 0; row_group_idx < metadata.num_row_groups(); ++row_group_idx) { |
107 | 194 | row_group_first_rows[row_group_idx] = next_row_group_first_row; |
108 | 194 | auto row_group_metadata = metadata.RowGroup(row_group_idx); |
109 | 194 | DORIS_CHECK(row_group_metadata != nullptr); |
110 | 194 | const int64_t row_group_rows = row_group_metadata->num_rows(); |
111 | 194 | if (row_group_rows < 0) { |
112 | 0 | return Status::Corruption("Invalid negative row count in parquet row group {}", |
113 | 0 | row_group_idx); |
114 | 0 | } |
115 | 194 | next_row_group_first_row += row_group_rows; |
116 | 194 | if (!is_row_group_outside_range(metadata, scan_range, row_group_idx)) { |
117 | 173 | scan_range_selected_row_groups.push_back(row_group_idx); |
118 | 173 | } |
119 | 194 | } |
120 | | |
121 | | // Step 2: row-group-level pruning, run only on the row groups inside scan_range |
122 | 110 | std::vector<int> statistics_selected_row_groups; |
123 | 110 | RETURN_IF_ERROR(select_row_groups_by_statistics( |
124 | 110 | metadata, file_reader, file_schema, request, &scan_range_selected_row_groups, |
125 | 110 | &statistics_selected_row_groups, enable_bloom_filter, &plan->pruning_stats, timezone)); |
126 | | |
127 | 110 | plan->row_groups.reserve(statistics_selected_row_groups.size()); |
128 | 132 | for (const auto row_group_idx : statistics_selected_row_groups) { |
129 | 132 | auto row_group_metadata = metadata.RowGroup(row_group_idx); |
130 | 132 | DORIS_CHECK(row_group_metadata != nullptr); |
131 | 132 | const int64_t row_group_rows = row_group_metadata->num_rows(); |
132 | 132 | if (row_group_rows == 0) { |
133 | 0 | continue; |
134 | 0 | } |
135 | | |
136 | 132 | RowGroupReadPlan row_group_plan; |
137 | 132 | row_group_plan.row_group_id = row_group_idx; |
138 | 132 | row_group_plan.first_file_row = row_group_first_rows[row_group_idx]; |
139 | 132 | row_group_plan.row_group_rows = row_group_rows; |
140 | 132 | RETURN_IF_ERROR(select_row_group_ranges_by_page_index( |
141 | 132 | file_reader, file_schema, request, row_group_idx, row_group_rows, |
142 | 132 | &row_group_plan.selected_ranges, &row_group_plan.page_skip_plans, |
143 | 132 | &plan->pruning_stats, timezone)); |
144 | 132 | if (row_group_plan.selected_ranges.empty()) { |
145 | 1 | continue; |
146 | 1 | } |
147 | 131 | plan->pruning_stats.selected_row_ranges += row_group_plan.selected_ranges.size(); |
148 | 131 | plan->row_groups.push_back(std::move(row_group_plan)); |
149 | 131 | } |
150 | 110 | plan->pruning_stats.selected_row_groups = plan->row_groups.size(); |
151 | 110 | return Status::OK(); |
152 | 110 | } |
153 | | |
154 | | namespace { |
155 | | |
156 | | uint16_t apply_filter_to_selection(const IColumn::Filter& filter, SelectionVector* selection, |
157 | 42 | uint16_t selected_rows) { |
158 | 42 | uint16_t new_selected_rows = 0; |
159 | 6.36k | for (uint16_t selection_idx = 0; selection_idx < selected_rows; ++selection_idx) { |
160 | 6.32k | const auto row_idx = selection->get_index(selection_idx); |
161 | 6.32k | if (filter[row_idx] != 0) { |
162 | 4.23k | selection->set_index(new_selected_rows++, static_cast<SelectionVector::Index>(row_idx)); |
163 | 4.23k | } |
164 | 6.32k | } |
165 | 42 | return new_selected_rows; |
166 | 42 | } |
167 | | |
168 | | Status execute_filter_conjuncts(const format::FileScanRequest& request, int64_t batch_rows, |
169 | | Block* file_block, SelectionVector* selection, |
170 | 43 | uint16_t* selected_rows) { |
171 | 43 | for (const auto& conjunct : request.conjuncts) { |
172 | 31 | if (*selected_rows == 0) { |
173 | 0 | break; |
174 | 0 | } |
175 | 31 | DORIS_CHECK(conjunct != nullptr); |
176 | 31 | IColumn::Filter filter(static_cast<size_t>(batch_rows), 1); |
177 | 31 | bool can_filter_all = false; |
178 | 31 | RETURN_IF_ERROR(conjunct->execute_filter(file_block, filter.data(), |
179 | 31 | static_cast<size_t>(batch_rows), false, |
180 | 31 | &can_filter_all)); |
181 | 31 | *selected_rows = |
182 | 31 | can_filter_all ? 0 : apply_filter_to_selection(filter, selection, *selected_rows); |
183 | 31 | } |
184 | 43 | return Status::OK(); |
185 | 43 | } |
186 | | |
187 | | Status execute_delete_conjuncts(const format::FileScanRequest& request, int64_t batch_rows, |
188 | | Block* file_block, SelectionVector* selection, |
189 | 41 | uint16_t* selected_rows) { |
190 | 41 | for (const auto& delete_conjunct : request.delete_conjuncts) { |
191 | 13 | if (*selected_rows == 0) { |
192 | 0 | break; |
193 | 0 | } |
194 | 13 | DORIS_CHECK(delete_conjunct != nullptr); |
195 | 13 | int result_column_id = -1; |
196 | 13 | RETURN_IF_ERROR(delete_conjunct->root()->execute(delete_conjunct.get(), file_block, |
197 | 13 | &result_column_id)); |
198 | 13 | DORIS_CHECK(result_column_id >= 0 && |
199 | 13 | result_column_id < static_cast<int>(file_block->columns())); |
200 | 13 | const auto& delete_filter = assert_cast<const ColumnUInt8&>( |
201 | 13 | *file_block->get_by_position(result_column_id).column) |
202 | 13 | .get_data(); |
203 | 13 | DORIS_CHECK(delete_filter.size() == static_cast<size_t>(batch_rows)); |
204 | 13 | IColumn::Filter keep_filter(static_cast<size_t>(batch_rows), 1); |
205 | 13 | bool has_kept_row = false; |
206 | 64 | for (size_t row = 0; row < static_cast<size_t>(batch_rows); ++row) { |
207 | 51 | keep_filter[row] = !delete_filter[row]; |
208 | 51 | has_kept_row |= keep_filter[row] != 0; |
209 | 51 | } |
210 | 13 | file_block->erase(result_column_id); |
211 | 13 | *selected_rows = |
212 | 13 | !has_kept_row ? 0 |
213 | 13 | : apply_filter_to_selection(keep_filter, selection, *selected_rows); |
214 | 13 | } |
215 | 41 | return Status::OK(); |
216 | 41 | } |
217 | | |
218 | | } // namespace |
219 | | |
220 | | IColumn::Filter selection_to_filter(const SelectionVector& selection, uint16_t selected_rows, |
221 | 29 | int64_t batch_rows) { |
222 | 29 | IColumn::Filter filter(static_cast<size_t>(batch_rows), 0); |
223 | 2.13k | for (uint16_t selection_idx = 0; selection_idx < selected_rows; ++selection_idx) { |
224 | 2.10k | filter[selection.get_index(selection_idx)] = 1; |
225 | 2.10k | } |
226 | 29 | return filter; |
227 | 29 | } |
228 | | |
229 | | Status execute_batch_filters(const format::FileScanRequest& request, int64_t batch_rows, |
230 | | Block* file_block, SelectionVector* selection, uint16_t* selected_rows, |
231 | 85 | int64_t* conjunct_filtered_rows) { |
232 | 85 | if (request.conjuncts.empty() && request.delete_conjuncts.empty()) { |
233 | 42 | return Status::OK(); |
234 | 42 | } |
235 | 43 | const auto selected_rows_before_conjunct = *selected_rows; |
236 | 43 | RETURN_IF_ERROR( |
237 | 43 | execute_filter_conjuncts(request, batch_rows, file_block, selection, selected_rows)); |
238 | 43 | if (conjunct_filtered_rows != nullptr) { |
239 | 43 | *conjunct_filtered_rows += static_cast<int64_t>(selected_rows_before_conjunct) - |
240 | 43 | static_cast<int64_t>(*selected_rows); |
241 | 43 | } |
242 | 43 | if (*selected_rows == 0) { |
243 | 2 | return Status::OK(); |
244 | 2 | } |
245 | 41 | return execute_delete_conjuncts(request, batch_rows, file_block, selection, selected_rows); |
246 | 43 | } |
247 | | |
248 | | namespace { |
249 | | // TODO: batch size in SessionVariable |
250 | | constexpr int64_t DEFAULT_PARQUET_READ_BATCH_SIZE = 4096; |
251 | | |
252 | 2 | int64_t count_range_rows(const std::vector<RowRange>& ranges) { |
253 | 2 | int64_t rows = 0; |
254 | 2 | for (const auto& range : ranges) { |
255 | 2 | rows += range.length; |
256 | 2 | } |
257 | 2 | return rows; |
258 | 2 | } |
259 | | |
260 | | void append_intersection(const RowRange& left, const RowRange& right, |
261 | 1 | std::vector<RowRange>* result) { |
262 | 1 | const int64_t start = std::max(left.start, right.start); |
263 | 1 | const int64_t end = std::min(left.start + left.length, right.start + right.length); |
264 | 1 | if (start < end) { |
265 | 1 | result->push_back(RowRange {.start = start, .length = end - start}); |
266 | 1 | } |
267 | 1 | } |
268 | | |
269 | | std::vector<RowRange> filter_ranges_by_condition_cache(const std::vector<RowRange>& ranges, |
270 | | const std::vector<bool>& cache, |
271 | | int64_t row_group_first_row, |
272 | 1 | int64_t base_granule) { |
273 | 1 | std::vector<RowRange> result; |
274 | 1 | if (cache.empty()) { |
275 | 0 | return ranges; |
276 | 0 | } |
277 | | |
278 | | // Cache coordinates are file-global granules; RowRange coordinates are row-group-relative. |
279 | | // Walk every selected range in order and split it by granule. Granules covered by the bitmap |
280 | | // are kept only when the bit is true. Granules outside the bitmap are kept conservatively, so |
281 | | // an undersized or old-format cache entry cannot skip valid rows. |
282 | 1 | for (const auto& range : ranges) { |
283 | 1 | const int64_t global_start = row_group_first_row + range.start; |
284 | 1 | const int64_t global_end = global_start + range.length; |
285 | 1 | for (int64_t granule = global_start / ConditionCacheContext::GRANULE_SIZE; |
286 | 3 | granule <= (global_end - 1) / ConditionCacheContext::GRANULE_SIZE; ++granule) { |
287 | 2 | const int64_t cache_idx = granule - base_granule; |
288 | 2 | const bool keep = cache_idx < 0 || static_cast<size_t>(cache_idx) >= cache.size() || |
289 | 2 | cache[static_cast<size_t>(cache_idx)]; |
290 | 2 | if (!keep) { |
291 | 1 | continue; |
292 | 1 | } |
293 | 1 | const int64_t granule_start = granule * ConditionCacheContext::GRANULE_SIZE; |
294 | 1 | const int64_t granule_end = granule_start + ConditionCacheContext::GRANULE_SIZE; |
295 | 1 | const RowRange file_granule_range {.start = granule_start - row_group_first_row, |
296 | 1 | .length = granule_end - granule_start}; |
297 | 1 | append_intersection(range, file_granule_range, &result); |
298 | 1 | } |
299 | 1 | } |
300 | 1 | return result; |
301 | 1 | } |
302 | | |
303 | | } // namespace |
304 | | |
305 | 98 | void ParquetScanScheduler::set_plan(RowGroupScanPlan plan) { |
306 | 98 | _row_group_plans = std::move(plan.row_groups); |
307 | 98 | _condition_cache_filtered_rows = 0; |
308 | 98 | _predicate_filtered_rows = 0; |
309 | 98 | reset(); |
310 | 98 | } |
311 | | |
312 | 2 | void ParquetScanScheduler::set_condition_cache_context(std::shared_ptr<ConditionCacheContext> ctx) { |
313 | 2 | _condition_cache_ctx = std::move(ctx); |
314 | 2 | if (!_condition_cache_ctx || !_condition_cache_ctx->filter_result || _row_group_plans.empty()) { |
315 | 0 | return; |
316 | 0 | } |
317 | | |
318 | 2 | _condition_cache_ctx->base_granule = |
319 | 2 | _row_group_plans.front().first_file_row / ConditionCacheContext::GRANULE_SIZE; |
320 | 2 | if (!_condition_cache_ctx->is_hit) { |
321 | 1 | return; |
322 | 1 | } |
323 | | |
324 | 1 | std::vector<RowGroupReadPlan> filtered_plans; |
325 | 1 | filtered_plans.reserve(_row_group_plans.size()); |
326 | 1 | for (auto& plan : _row_group_plans) { |
327 | 1 | const int64_t old_rows = count_range_rows(plan.selected_ranges); |
328 | 1 | plan.selected_ranges = filter_ranges_by_condition_cache( |
329 | 1 | plan.selected_ranges, *_condition_cache_ctx->filter_result, plan.first_file_row, |
330 | 1 | _condition_cache_ctx->base_granule); |
331 | 1 | const int64_t new_rows = count_range_rows(plan.selected_ranges); |
332 | 1 | _condition_cache_filtered_rows += old_rows - new_rows; |
333 | 1 | if (!plan.selected_ranges.empty()) { |
334 | 1 | filtered_plans.push_back(std::move(plan)); |
335 | 1 | } |
336 | 1 | } |
337 | 1 | _row_group_plans = std::move(filtered_plans); |
338 | 1 | reset(); |
339 | 1 | } |
340 | | |
341 | 99 | void ParquetScanScheduler::reset() { |
342 | 99 | _next_row_group_plan_idx = 0; |
343 | 99 | reset_current_row_group(); |
344 | 99 | } |
345 | | |
346 | 147 | void ParquetScanScheduler::reset_current_row_group() { |
347 | 147 | _current_row_group.reset(); |
348 | 147 | _current_predicate_columns.clear(); |
349 | 147 | _current_non_predicate_columns.clear(); |
350 | 147 | _current_row_group_rows = 0; |
351 | 147 | _current_row_group_rows_read = 0; |
352 | 147 | _current_row_group_first_row = 0; |
353 | 147 | _current_selected_ranges.clear(); |
354 | 147 | _current_range_idx = 0; |
355 | 147 | _current_range_rows_read = 0; |
356 | 147 | } |
357 | | |
358 | | Status ParquetScanScheduler::open_next_row_group( |
359 | | ParquetFileContext& file_context, |
360 | | const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema, |
361 | 133 | const format::FileScanRequest& request, bool* has_row_group) { |
362 | 133 | *has_row_group = false; |
363 | 133 | if (_next_row_group_plan_idx >= _row_group_plans.size()) { |
364 | 37 | return Status::OK(); |
365 | 37 | } |
366 | 96 | const RowGroupReadPlan& row_group_plan = _row_group_plans[_next_row_group_plan_idx++]; |
367 | 96 | const int row_group_idx = row_group_plan.row_group_id; |
368 | 96 | try { |
369 | 96 | _current_row_group = file_context.file_reader->RowGroup(row_group_idx); |
370 | 96 | } catch (const ::parquet::ParquetException& e) { |
371 | 0 | return Status::Corruption("Failed to open parquet row group {}: {}", row_group_idx, |
372 | 0 | e.what()); |
373 | 0 | } catch (const std::exception& e) { |
374 | 0 | return Status::InternalError("Failed to open parquet row group {}: {}", row_group_idx, |
375 | 0 | e.what()); |
376 | 0 | } |
377 | | |
378 | 96 | auto row_group_metadata = file_context.metadata->RowGroup(row_group_idx); |
379 | 96 | DORIS_CHECK(row_group_metadata != nullptr); |
380 | 96 | _current_row_group_rows = row_group_metadata->num_rows(); |
381 | 96 | DORIS_CHECK(_current_row_group_rows == row_group_plan.row_group_rows); |
382 | 96 | DORIS_CHECK(_current_row_group_rows > 0); |
383 | 96 | DORIS_CHECK(!row_group_plan.selected_ranges.empty()); |
384 | 96 | _current_row_group_first_row = row_group_plan.first_file_row; |
385 | 96 | _current_row_group_rows_read = 0; |
386 | 96 | _current_selected_ranges = row_group_plan.selected_ranges; |
387 | 96 | _current_range_idx = 0; |
388 | 96 | _current_range_rows_read = 0; |
389 | 96 | _current_predicate_columns.clear(); |
390 | 96 | _current_non_predicate_columns.clear(); |
391 | | |
392 | 96 | ParquetColumnReaderFactory column_reader_factory( |
393 | 96 | _current_row_group, file_context.schema->num_columns(), &row_group_plan.page_skip_plans, |
394 | 96 | _page_skip_profile, _timezone, _enable_strict_mode, |
395 | 96 | _scan_profile.column_reader_profile); |
396 | 96 | for (const auto& col : request.predicate_columns) { |
397 | 47 | const auto local_id = col.local_id(); |
398 | 47 | if (local_id == format::ROW_POSITION_COLUMN_ID) { |
399 | 11 | _current_predicate_columns[local_id] = |
400 | 11 | column_reader_factory.create_row_position_column_reader( |
401 | 11 | _current_row_group_first_row); |
402 | 11 | continue; |
403 | 11 | } |
404 | 36 | if (local_id == format::GLOBAL_ROWID_COLUMN_ID) { |
405 | 0 | DORIS_CHECK(_global_rowid_context.has_value()); |
406 | 0 | _current_predicate_columns[local_id] = |
407 | 0 | column_reader_factory.create_global_rowid_column_reader( |
408 | 0 | *_global_rowid_context, _current_row_group_first_row); |
409 | 0 | continue; |
410 | 0 | } |
411 | | |
412 | 36 | DORIS_CHECK(local_id >= 0 && local_id < static_cast<int32_t>(file_schema.size())); |
413 | 36 | const auto& column_schema = file_schema[local_id]; |
414 | 36 | DORIS_CHECK(column_schema != nullptr); |
415 | 36 | std::unique_ptr<ParquetColumnReader> column_reader; |
416 | 36 | RETURN_IF_ERROR(column_reader_factory.create(*column_schema, &col, &column_reader)); |
417 | 36 | _current_predicate_columns[local_id] = std::move(column_reader); |
418 | 36 | } |
419 | 111 | for (const auto& col : request.non_predicate_columns) { |
420 | 111 | const auto local_id = col.local_id(); |
421 | 111 | if (local_id == format::ROW_POSITION_COLUMN_ID) { |
422 | 14 | _current_non_predicate_columns[local_id] = |
423 | 14 | column_reader_factory.create_row_position_column_reader( |
424 | 14 | _current_row_group_first_row); |
425 | 14 | continue; |
426 | 14 | } |
427 | 97 | if (local_id == format::GLOBAL_ROWID_COLUMN_ID) { |
428 | 2 | DORIS_CHECK(_global_rowid_context.has_value()); |
429 | 2 | _current_non_predicate_columns[local_id] = |
430 | 2 | column_reader_factory.create_global_rowid_column_reader( |
431 | 2 | *_global_rowid_context, _current_row_group_first_row); |
432 | 2 | continue; |
433 | 2 | } |
434 | 95 | DORIS_CHECK(local_id >= 0 && local_id < static_cast<int32_t>(file_schema.size())); |
435 | 95 | const auto& column_schema = file_schema[local_id]; |
436 | 95 | DORIS_CHECK(column_schema != nullptr); |
437 | 95 | std::unique_ptr<ParquetColumnReader> column_reader; |
438 | 95 | RETURN_IF_ERROR(column_reader_factory.create(*column_schema, &col, &column_reader)); |
439 | 95 | _current_non_predicate_columns[local_id] = std::move(column_reader); |
440 | 95 | } |
441 | 96 | *has_row_group = true; |
442 | 96 | return Status::OK(); |
443 | 96 | } |
444 | | |
445 | 3 | Status ParquetScanScheduler::skip_current_row_group_rows(int64_t rows) { |
446 | 3 | DORIS_CHECK(rows >= 0); |
447 | 3 | if (rows == 0) { |
448 | 0 | return Status::OK(); |
449 | 0 | } |
450 | 3 | if (_scan_profile.range_gap_skipped_rows != nullptr) { |
451 | 2 | COUNTER_UPDATE(_scan_profile.range_gap_skipped_rows, rows); |
452 | 2 | } |
453 | 3 | for (const auto& column_reader : _current_predicate_columns | std::views::values) { |
454 | 2 | RETURN_IF_ERROR(column_reader->skip(rows)); |
455 | 2 | } |
456 | 3 | for (const auto& column_reader : _current_non_predicate_columns | std::views::values) { |
457 | 2 | RETURN_IF_ERROR(column_reader->skip(rows)); |
458 | 2 | } |
459 | 3 | _current_row_group_rows_read += rows; |
460 | 3 | return Status::OK(); |
461 | 3 | } |
462 | | |
463 | | Status ParquetScanScheduler::read_filter_columns(int64_t batch_rows, |
464 | | const format::FileScanRequest& request, |
465 | | Block* file_block, SelectionVector* selection, |
466 | | uint16_t* selected_rows, |
467 | 85 | int64_t* conjunct_filtered_rows) { |
468 | 85 | if (!request.conjuncts.empty() || !request.delete_conjuncts.empty()) { |
469 | 43 | selection->resize(static_cast<size_t>(batch_rows)); |
470 | 43 | } |
471 | 85 | for (const auto& [fid, column_reader] : _current_predicate_columns) { |
472 | 47 | auto position_it = request.local_positions.find(format::LocalColumnId(fid)); |
473 | 47 | DORIS_CHECK(position_it != request.local_positions.end()); |
474 | 47 | const auto block_position = position_it->second.value(); |
475 | 47 | DCHECK(remove_nullable(column_reader->type()) |
476 | 0 | ->equals(*remove_nullable(file_block->get_by_position(block_position).type))) |
477 | 0 | << column_reader->type()->get_name() << " " |
478 | 0 | << file_block->get_by_position(block_position).type->get_name() << " " |
479 | 0 | << column_reader->name() << " " << file_block->get_by_position(block_position).name; |
480 | 47 | auto column = file_block->get_by_position(block_position).column->assert_mutable(); |
481 | 47 | int64_t column_rows = 0; |
482 | 47 | { |
483 | 47 | SCOPED_TIMER(_scan_profile.column_read_time); |
484 | 47 | RETURN_IF_ERROR(column_reader->read(batch_rows, column, &column_rows)); |
485 | 47 | } |
486 | 47 | if (column_rows != batch_rows) { |
487 | 0 | return Status::Corruption("Parquet filter column {} returned {} rows, expected {} rows", |
488 | 0 | column_reader->name(), column_rows, batch_rows); |
489 | 0 | } |
490 | 47 | file_block->replace_by_position(block_position, std::move(column)); |
491 | 47 | } |
492 | 85 | if (_scan_profile.predicate_filter_time == nullptr) { |
493 | 62 | return execute_batch_filters(request, batch_rows, file_block, selection, selected_rows, |
494 | 62 | conjunct_filtered_rows); |
495 | 62 | } |
496 | 23 | SCOPED_TIMER(_scan_profile.predicate_filter_time); |
497 | 23 | return execute_batch_filters(request, batch_rows, file_block, selection, selected_rows, |
498 | 23 | conjunct_filtered_rows); |
499 | 85 | } |
500 | | |
501 | | Status ParquetScanScheduler::read_current_row_group_batch(int64_t batch_rows, |
502 | | const format::FileScanRequest& request, |
503 | | int64_t batch_first_file_row, |
504 | 96 | Block* file_block, size_t* rows) { |
505 | 96 | if (_scan_profile.total_batches != nullptr) { |
506 | 23 | COUNTER_UPDATE(_scan_profile.total_batches, 1); |
507 | 23 | } |
508 | 96 | if (_scan_profile.raw_rows_read != nullptr) { |
509 | 23 | COUNTER_UPDATE(_scan_profile.raw_rows_read, batch_rows); |
510 | 23 | } |
511 | 96 | if (_current_predicate_columns.empty() && _current_non_predicate_columns.empty()) { |
512 | 11 | *rows = static_cast<size_t>(batch_rows); |
513 | 11 | if (_scan_profile.selected_rows != nullptr) { |
514 | 0 | COUNTER_UPDATE(_scan_profile.selected_rows, batch_rows); |
515 | 0 | } |
516 | 11 | return Status::OK(); |
517 | 11 | } |
518 | 85 | SelectionVector selection; |
519 | 85 | DORIS_CHECK(batch_rows <= std::numeric_limits<uint16_t>::max()); |
520 | 85 | uint16_t selected_rows = static_cast<uint16_t>(batch_rows); |
521 | 85 | int64_t conjunct_filtered_rows = 0; |
522 | 85 | RETURN_IF_ERROR(read_filter_columns(batch_rows, request, file_block, &selection, &selected_rows, |
523 | 85 | &conjunct_filtered_rows)); |
524 | 85 | _predicate_filtered_rows += conjunct_filtered_rows; |
525 | 85 | mark_condition_cache_granules(selection, selected_rows, batch_first_file_row); |
526 | | |
527 | 85 | const bool need_filter_output = selected_rows != batch_rows; |
528 | 85 | if (_scan_profile.selected_rows != nullptr) { |
529 | 23 | COUNTER_UPDATE(_scan_profile.selected_rows, selected_rows); |
530 | 23 | } |
531 | 85 | if (_scan_profile.rows_filtered_by_conjunct != nullptr) { |
532 | 23 | COUNTER_UPDATE(_scan_profile.rows_filtered_by_conjunct, conjunct_filtered_rows); |
533 | 23 | } |
534 | 85 | if (!_current_non_predicate_columns.empty() && |
535 | 85 | _scan_profile.lazy_read_filtered_rows != nullptr) { |
536 | 21 | COUNTER_UPDATE(_scan_profile.lazy_read_filtered_rows, batch_rows - selected_rows); |
537 | 21 | } |
538 | 85 | if (selected_rows == 0 && _scan_profile.empty_selection_batches != nullptr) { |
539 | 1 | COUNTER_UPDATE(_scan_profile.empty_selection_batches, 1); |
540 | 1 | } |
541 | 85 | if (need_filter_output) { |
542 | 29 | IColumn::Filter output_filter = selection_to_filter(selection, selected_rows, batch_rows); |
543 | 32 | for (const auto& col : request.predicate_columns) { |
544 | 32 | auto position_it = request.local_positions.find(col.column_id()); |
545 | 32 | DORIS_CHECK(position_it != request.local_positions.end()); |
546 | 32 | const auto block_position = position_it->second.value(); |
547 | 32 | RETURN_IF_CATCH_EXCEPTION(file_block->replace_by_position( |
548 | 32 | block_position, file_block->get_by_position(block_position) |
549 | 32 | .column->filter(output_filter, selected_rows))); |
550 | 32 | } |
551 | 29 | } |
552 | | |
553 | 85 | { |
554 | 85 | SCOPED_TIMER(_scan_profile.column_read_time); |
555 | 111 | for (const auto& [fid, column_reader] : _current_non_predicate_columns) { |
556 | 111 | auto position_it = request.local_positions.find(format::LocalColumnId(fid)); |
557 | 111 | DORIS_CHECK(position_it != request.local_positions.end()); |
558 | 111 | const auto block_position = position_it->second.value(); |
559 | 111 | auto column = file_block->get_by_position(block_position).column->assert_mutable(); |
560 | 111 | DCHECK_EQ(file_block->get_by_position(block_position).type->get_primitive_type(), |
561 | 0 | column_reader->type()->get_primitive_type()) |
562 | 0 | << type_to_string(file_block->get_by_position(block_position) |
563 | 0 | .type->get_primitive_type()) |
564 | 0 | << " " << type_to_string(column_reader->type()->get_primitive_type()) << " " |
565 | 0 | << column_reader->name() << " " << fid << " " << block_position; |
566 | 111 | if (need_filter_output) { |
567 | 24 | [[maybe_unused]] auto old_size = column->size(); |
568 | 24 | RETURN_IF_ERROR( |
569 | 24 | column_reader->select(selection, selected_rows, batch_rows, column)); |
570 | 24 | if (column->size() != old_size + selected_rows) { |
571 | 0 | return Status::Corruption( |
572 | 0 | "Parquet selected output column {} returned {} rows, expected {} rows", |
573 | 0 | column_reader->name(), column->size(), old_size + selected_rows); |
574 | 0 | } |
575 | 87 | } else { |
576 | 87 | int64_t column_rows = 0; |
577 | 87 | RETURN_IF_ERROR(column_reader->read(batch_rows, column, &column_rows)); |
578 | 87 | if (column_rows != batch_rows) { |
579 | 0 | return Status::Corruption( |
580 | 0 | "Parquet output column {} returned {} rows, expected {} rows", |
581 | 0 | column_reader->name(), column_rows, batch_rows); |
582 | 0 | } |
583 | 87 | } |
584 | 111 | file_block->replace_by_position(block_position, std::move(column)); |
585 | 111 | } |
586 | 85 | } |
587 | 85 | *rows = static_cast<size_t>(selected_rows); |
588 | 85 | return Status::OK(); |
589 | 85 | } |
590 | | |
591 | | void ParquetScanScheduler::mark_condition_cache_granules(const SelectionVector& selection, |
592 | | uint16_t selected_rows, |
593 | 85 | int64_t batch_first_file_row) { |
594 | 85 | if (!_condition_cache_ctx || _condition_cache_ctx->is_hit || |
595 | 85 | !_condition_cache_ctx->filter_result) { |
596 | 84 | return; |
597 | 84 | } |
598 | 1 | auto& cache = *_condition_cache_ctx->filter_result; |
599 | 2.04k | for (uint16_t selection_idx = 0; selection_idx < selected_rows; ++selection_idx) { |
600 | 2.04k | const int64_t file_row = batch_first_file_row + selection.get_index(selection_idx); |
601 | 2.04k | const int64_t granule = file_row / ConditionCacheContext::GRANULE_SIZE; |
602 | 2.04k | const int64_t cache_idx = granule - _condition_cache_ctx->base_granule; |
603 | 2.04k | if (cache_idx >= 0 && static_cast<size_t>(cache_idx) < cache.size()) { |
604 | 2.04k | cache[static_cast<size_t>(cache_idx)] = true; |
605 | 2.04k | } |
606 | 2.04k | } |
607 | 1 | } |
608 | | |
609 | | Status ParquetScanScheduler::read_next_batch( |
610 | | ParquetFileContext& file_context, |
611 | | const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema, |
612 | 131 | const format::FileScanRequest& request, Block* file_block, size_t* rows, bool* eof) { |
613 | 131 | *rows = 0; |
614 | 181 | while (true) { |
615 | 181 | if (_current_row_group == nullptr) { |
616 | 133 | bool has_row_group = false; |
617 | 133 | RETURN_IF_ERROR( |
618 | 133 | open_next_row_group(file_context, file_schema, request, &has_row_group)); |
619 | 133 | if (!has_row_group) { |
620 | 37 | *eof = true; |
621 | 37 | return Status::OK(); |
622 | 37 | } |
623 | 133 | } |
624 | | |
625 | 144 | if (_current_range_idx >= _current_selected_ranges.size()) { |
626 | | // Current row group finished, try next row group. |
627 | 48 | reset_current_row_group(); |
628 | 48 | continue; |
629 | 48 | } |
630 | | |
631 | 96 | const RowRange& current_range = _current_selected_ranges[_current_range_idx]; |
632 | 96 | DORIS_CHECK(current_range.start >= 0); |
633 | 96 | DORIS_CHECK(current_range.length > 0); |
634 | 96 | DORIS_CHECK(current_range.start + current_range.length <= _current_row_group_rows); |
635 | | |
636 | 96 | if (_current_row_group_rows_read < current_range.start) { |
637 | | // Skip the rows between the current read position and the start of the next selected range. |
638 | 3 | RETURN_IF_ERROR(skip_current_row_group_rows(current_range.start - |
639 | 3 | _current_row_group_rows_read)); |
640 | 3 | } |
641 | 96 | DORIS_CHECK(_current_row_group_rows_read == current_range.start + _current_range_rows_read); |
642 | 96 | const int64_t remaining_rows = current_range.length - _current_range_rows_read; |
643 | 96 | if (remaining_rows <= 0) { |
644 | | // Current range finished, try next range in the same row group. |
645 | 0 | ++_current_range_idx; |
646 | 0 | _current_range_rows_read = 0; |
647 | 0 | continue; |
648 | 0 | } |
649 | | |
650 | 96 | const int64_t batch_rows = |
651 | 96 | std::min<int64_t>(DEFAULT_PARQUET_READ_BATCH_SIZE, remaining_rows); |
652 | 96 | const int64_t physical_rows_read = batch_rows; |
653 | 96 | const int64_t batch_first_file_row = |
654 | 96 | _current_row_group_first_row + _current_row_group_rows_read; |
655 | 96 | RETURN_IF_ERROR(read_current_row_group_batch(batch_rows, request, batch_first_file_row, |
656 | 96 | file_block, rows)); |
657 | 96 | _current_row_group_rows_read += physical_rows_read; |
658 | 96 | _current_range_rows_read += physical_rows_read; |
659 | 96 | if (_current_range_rows_read >= current_range.length) { |
660 | 96 | ++_current_range_idx; |
661 | 96 | _current_range_rows_read = 0; |
662 | 96 | } |
663 | 96 | if (*rows == 0) { |
664 | 2 | continue; |
665 | 2 | } |
666 | 94 | *eof = false; |
667 | 94 | return Status::OK(); |
668 | 96 | } |
669 | 131 | } |
670 | | |
671 | | } // namespace doris::format::parquet |