Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "format_v2/table_reader.h" |
19 | | |
20 | | #include <gen_cpp/PlanNodes_types.h> |
21 | | #include <gen_cpp/Types_types.h> |
22 | | |
23 | | #include <cstring> |
24 | | #include <set> |
25 | | #include <sstream> |
26 | | #include <stdexcept> |
27 | | #include <vector> |
28 | | |
29 | | #include "common/cast_set.h" |
30 | | #include "common/status.h" |
31 | | #include "core/assert_cast.h" |
32 | | #include "exec/common/endian.h" |
33 | | #include "exprs/vexpr_context.h" |
34 | | #include "exprs/vslot_ref.h" |
35 | | #include "format_v2/parquet/parquet_reader.h" |
36 | | #include "format_v2/column_mapper.h" |
37 | | #include "format/table/deletion_vector_reader.h" |
38 | | #include "io/io_common.h" |
39 | | #include "roaring/roaring64map.hh" |
40 | | |
41 | | namespace doris::format { |
42 | | namespace { |
43 | | |
// Renders `values` as a bracketed, comma-separated list: "[a, b, c]".
// Each element is converted to text by calling `formatter(element)`; the
// result of that call is streamed into the output. An empty vector yields "[]".
template <typename T, typename Formatter>
std::string join_table_reader_debug_strings(const std::vector<T>& values, Formatter formatter) {
    std::ostringstream joined;
    joined << "[";
    // Empty before the first element, ", " before every following one.
    const char* separator = "";
    for (const auto& value : values) {
        joined << separator;
        joined << formatter(value);
        separator = ", ";
    }
    joined << "]";
    return joined.str();
}
57 | | |
58 | 0 | std::string file_format_to_string(FileFormat format) { |
59 | 0 | switch (format) { |
60 | 0 | case FileFormat::PARQUET: |
61 | 0 | return "PARQUET"; |
62 | 0 | case FileFormat::ORC: |
63 | 0 | return "ORC"; |
64 | 0 | case FileFormat::CSV: |
65 | 0 | return "CSV"; |
66 | 0 | } |
67 | 0 | return "UNKNOWN"; |
68 | 0 | } |
69 | | |
70 | 0 | std::string push_down_agg_to_string(TPushAggOp::type op) { |
71 | 0 | switch (op) { |
72 | 0 | case TPushAggOp::NONE: |
73 | 0 | return "NONE"; |
74 | 0 | case TPushAggOp::COUNT: |
75 | 0 | return "COUNT"; |
76 | 0 | case TPushAggOp::MINMAX: |
77 | 0 | return "MINMAX"; |
78 | 0 | case TPushAggOp::MIX: |
79 | 0 | return "MIX"; |
80 | 0 | case TPushAggOp::COUNT_ON_INDEX: |
81 | 0 | return "COUNT_ON_INDEX"; |
82 | 0 | } |
83 | 0 | return "UNKNOWN"; |
84 | 0 | } |
85 | | |
86 | 0 | std::string current_file_debug_string(const std::unique_ptr<ScanTask>& task) { |
87 | 0 | if (task == nullptr || task->data_file == nullptr) { |
88 | 0 | return "null"; |
89 | 0 | } |
90 | 0 | const auto& file = *task->data_file; |
91 | 0 | std::ostringstream out; |
92 | 0 | out << "FileDescription{path=" << file.path << ", file_size=" << file.file_size |
93 | 0 | << ", range_start_offset=" << file.range_start_offset << ", range_size=" << file.range_size |
94 | 0 | << ", mtime=" << file.mtime << ", fs_name=" << file.fs_name |
95 | 0 | << ", file_cache_admission=" << file.file_cache_admission << "}"; |
96 | 0 | return out.str(); |
97 | 0 | } |
98 | | |
99 | 0 | std::string partition_values_debug_string(const std::map<std::string, Field>& partition_values) { |
100 | 0 | std::ostringstream out; |
101 | 0 | out << "{"; |
102 | 0 | size_t idx = 0; |
103 | 0 | for (const auto& [key, _] : partition_values) { |
104 | 0 | if (idx++ > 0) { |
105 | 0 | out << ", "; |
106 | 0 | } |
107 | 0 | out << key; |
108 | 0 | } |
109 | 0 | out << "}"; |
110 | 0 | return out.str(); |
111 | 0 | } |
112 | | |
113 | 0 | std::string expr_context_debug_string(const VExprContextSPtr& context) { |
114 | 0 | if (context == nullptr) { |
115 | 0 | return "null"; |
116 | 0 | } |
117 | 0 | const auto root = context->root(); |
118 | 0 | if (root == nullptr) { |
119 | 0 | return "VExprContext{root=null}"; |
120 | 0 | } |
121 | 0 | std::ostringstream out; |
122 | 0 | out << "VExprContext{root_name=" << root->expr_name() << ", root_debug=" << root->debug_string() |
123 | 0 | << "}"; |
124 | 0 | return out.str(); |
125 | 0 | } |
126 | | |
127 | 0 | std::string table_filter_debug_string(const TableFilter& filter) { |
128 | 0 | std::ostringstream out; |
129 | 0 | out << "TableFilter{conjunct=" << expr_context_debug_string(filter.conjunct) |
130 | 0 | << ", global_indices=" |
131 | 0 | << join_table_reader_debug_strings( |
132 | 0 | filter.global_indices, |
133 | 0 | [](GlobalIndex global_index) { return std::to_string(global_index.value()); }) |
134 | 0 | << "}"; |
135 | 0 | return out.str(); |
136 | 0 | } |
137 | | |
138 | 0 | std::string table_column_predicates_debug_string(const TableColumnPredicates& predicates) { |
139 | 0 | std::ostringstream out; |
140 | 0 | out << "{"; |
141 | 0 | size_t idx = 0; |
142 | 0 | for (const auto& [global_index, column_predicates] : predicates) { |
143 | 0 | if (idx++ > 0) { |
144 | 0 | out << ", "; |
145 | 0 | } |
146 | 0 | out << global_index.value() << ":{predicate_count=" << column_predicates.size() << "}"; |
147 | 0 | } |
148 | 0 | out << "}"; |
149 | 0 | return out.str(); |
150 | 0 | } |
151 | | |
152 | 0 | void collect_global_indices(const VExprSPtr& expr, std::set<GlobalIndex>* global_indices) { |
153 | 0 | if (expr == nullptr) { |
154 | 0 | return; |
155 | 0 | } |
156 | 0 | if (expr->is_slot_ref()) { |
157 | 0 | const auto* slot_ref = assert_cast<const VSlotRef*>(expr.get()); |
158 | 0 | DORIS_CHECK(slot_ref->column_id() >= 0); |
159 | 0 | global_indices->insert(GlobalIndex(cast_set<size_t>(slot_ref->column_id()))); |
160 | 0 | } |
161 | 0 | for (const auto& child : expr->children()) { |
162 | 0 | collect_global_indices(child, global_indices); |
163 | 0 | } |
164 | 0 | } |
165 | | |
166 | | Status build_table_filters_from_conjunct(const VExprContextSPtr& conjunct, RuntimeState* state, |
167 | 0 | std::vector<TableFilter>* table_filters) { |
168 | 0 | if (conjunct == nullptr) { |
169 | 0 | return Status::OK(); |
170 | 0 | } |
171 | 0 | std::set<GlobalIndex> global_indices; |
172 | 0 | collect_global_indices(conjunct->root(), &global_indices); |
173 | 0 | if (!global_indices.empty()) { |
174 | 0 | TableFilter table_filter; |
175 | 0 | VExprSPtr filter_root; |
176 | 0 | RETURN_IF_ERROR(clone_table_expr_tree(conjunct->root(), &filter_root)); |
177 | 0 | table_filter.conjunct = VExprContext::create_shared(std::move(filter_root)); |
178 | 0 | for (const auto global_index : global_indices) { |
179 | 0 | table_filter.global_indices.push_back(global_index); |
180 | 0 | } |
181 | 0 | table_filters->push_back(std::move(table_filter)); |
182 | 0 | } |
183 | 0 | return Status::OK(); |
184 | 0 | } |
185 | | |
186 | | Status parse_deletion_vector(const char* buf, size_t buffer_size, DeleteFileDesc::Format format, |
187 | 0 | DeleteRows* delete_rows) { |
188 | 0 | DORIS_CHECK(buf != nullptr); |
189 | 0 | DORIS_CHECK(delete_rows != nullptr); |
190 | 0 | DORIS_CHECK(format == DeleteFileDesc::Format::PAIMON || |
191 | 0 | format == DeleteFileDesc::Format::ICEBERG); |
192 | |
|
193 | 0 | const size_t checksum_size = format == DeleteFileDesc::Format::ICEBERG ? 4 : 0; |
194 | 0 | if (buffer_size < 8 + checksum_size) [[unlikely]] { |
195 | 0 | return Status::DataQualityError("Deletion vector file size too small: {}", buffer_size); |
196 | 0 | } |
197 | | |
198 | 0 | auto total_length = BigEndian::Load32(buf); |
199 | 0 | if (total_length + 4 + checksum_size != buffer_size) [[unlikely]] { |
200 | 0 | return Status::DataQualityError("Deletion vector length mismatch, expected: {}, actual: {}", |
201 | 0 | total_length + 4 + checksum_size, buffer_size); |
202 | 0 | } |
203 | | |
204 | 0 | constexpr static char MAGIC_NUMBER[] = {'\xD1', '\xD3', '\x39', '\x64'}; |
205 | 0 | if (memcmp(buf + sizeof(total_length), MAGIC_NUMBER, 4) != 0) [[unlikely]] { |
206 | 0 | return Status::DataQualityError("Deletion vector magic number mismatch"); |
207 | 0 | } |
208 | | |
209 | 0 | const char* bitmap_buf = buf + 8; |
210 | 0 | const size_t bitmap_size = buffer_size - 8 - checksum_size; |
211 | 0 | if (format == DeleteFileDesc::Format::PAIMON) { |
212 | 0 | roaring::Roaring bitmap; |
213 | 0 | try { |
214 | 0 | bitmap = roaring::Roaring::readSafe(bitmap_buf, bitmap_size); |
215 | 0 | } catch (const std::runtime_error& e) { |
216 | 0 | return Status::DataQualityError("Decode roaring bitmap failed, {}", e.what()); |
217 | 0 | } |
218 | | |
219 | 0 | delete_rows->reserve(bitmap.cardinality()); |
220 | 0 | for (auto it = bitmap.begin(); it != bitmap.end(); it++) { |
221 | 0 | delete_rows->push_back(*it); |
222 | 0 | } |
223 | 0 | return Status::OK(); |
224 | 0 | } |
225 | | |
226 | 0 | roaring::Roaring64Map bitmap; |
227 | 0 | try { |
228 | 0 | bitmap = roaring::Roaring64Map::readSafe(bitmap_buf, bitmap_size); |
229 | 0 | } catch (const std::runtime_error& e) { |
230 | 0 | return Status::DataQualityError("Decode roaring bitmap failed, {}", e.what()); |
231 | 0 | } |
232 | | |
233 | 0 | delete_rows->reserve(bitmap.cardinality()); |
234 | 0 | for (auto it = bitmap.begin(); it != bitmap.end(); it++) { |
235 | 0 | delete_rows->push_back(cast_set<int64_t>(*it)); |
236 | 0 | } |
237 | 0 | return Status::OK(); |
238 | 0 | } |
239 | | |
240 | | } // namespace |
241 | | |
242 | | std::shared_ptr<io::FileSystemProperties> create_system_properties( |
243 | 0 | const TFileScanRangeParams* scan_params) { |
244 | 0 | auto system_properties = std::make_shared<io::FileSystemProperties>(); |
245 | 0 | if (scan_params == nullptr || !scan_params->__isset.file_type) { |
246 | 0 | system_properties->system_type = TFileType::FILE_LOCAL; |
247 | 0 | return system_properties; |
248 | 0 | } |
249 | 0 | system_properties->system_type = scan_params->file_type; |
250 | 0 | system_properties->properties = scan_params->properties; |
251 | 0 | system_properties->hdfs_params = scan_params->hdfs_params; |
252 | 0 | if (scan_params->__isset.broker_addresses) { |
253 | 0 | system_properties->broker_addresses.assign(scan_params->broker_addresses.begin(), |
254 | 0 | scan_params->broker_addresses.end()); |
255 | 0 | } |
256 | 0 | return system_properties; |
257 | 0 | } |
258 | | |
259 | 0 | std::string TableReader::debug_string() const { |
260 | 0 | std::ostringstream out; |
261 | 0 | out << "TableReader{format=" << file_format_to_string(_format) |
262 | 0 | << ", push_down_agg_type=" << push_down_agg_to_string(_push_down_agg_type) |
263 | 0 | << ", aggregate_pushdown_tried=" << _aggregate_pushdown_tried |
264 | 0 | << ", has_current_reader=" << (_data_reader.reader != nullptr) |
265 | 0 | << ", has_current_task=" << (_current_task != nullptr) |
266 | 0 | << ", current_file=" << current_file_debug_string(_current_task) |
267 | 0 | << ", has_delete_rows=" << (_delete_rows != nullptr) |
268 | 0 | << ", delete_row_count=" << (_delete_rows == nullptr ? 0 : _delete_rows->size()) |
269 | 0 | << ", has_profile=" << (_profile != nullptr) |
270 | 0 | << ", has_system_properties=" << (_system_properties != nullptr) << ", system_type=" |
271 | 0 | << (_system_properties == nullptr ? static_cast<int>(TFileType::FILE_LOCAL) |
272 | 0 | : static_cast<int>(_system_properties->system_type)) |
273 | 0 | << ", has_scan_params=" << (_scan_params != nullptr) |
274 | 0 | << ", has_io_ctx=" << (_io_ctx != nullptr) |
275 | 0 | << ", has_runtime_state=" << (_runtime_state != nullptr) |
276 | 0 | << ", has_scanner_profile=" << (_scanner_profile != nullptr) |
277 | 0 | << ", mapper_options=" << _mapper_options.debug_string() << ", projected_columns=" |
278 | 0 | << join_table_reader_debug_strings( |
279 | 0 | _projected_columns, |
280 | 0 | [](const ColumnDefinition& column) { return column.debug_string(); }) |
281 | 0 | << ", partition_values=" << partition_values_debug_string(_partition_values) |
282 | 0 | << ", table_filters=" |
283 | 0 | << join_table_reader_debug_strings( |
284 | 0 | _table_filters, |
285 | 0 | [](const TableFilter& filter) { return table_filter_debug_string(filter); }) |
286 | 0 | << ", table_column_predicates=" |
287 | 0 | << table_column_predicates_debug_string(_table_column_predicates) |
288 | 0 | << ", conjunct_count=" << _conjuncts.size() << ", conjuncts=" |
289 | 0 | << join_table_reader_debug_strings(_conjuncts, |
290 | 0 | [](const VExprContextSPtr& conjunct) { |
291 | 0 | return expr_context_debug_string(conjunct); |
292 | 0 | }) |
293 | 0 | << ", file_schema=" |
294 | 0 | << join_table_reader_debug_strings( |
295 | 0 | _data_reader.file_schema, |
296 | 0 | [](const ColumnDefinition& field) { return field.debug_string(); }) |
297 | 0 | << ", file_block_layout=" |
298 | 0 | << join_table_reader_debug_strings( |
299 | 0 | _data_reader.file_block_layout, |
300 | 0 | [](const FileBlockColumn& column) { |
301 | 0 | std::ostringstream column_out; |
302 | 0 | column_out << "FileBlockColumn{file_column_id=" << column.file_column_id |
303 | 0 | << ", name=" << column.name << ", type=" |
304 | 0 | << (column.type == nullptr ? "null" : column.type->get_name()) |
305 | 0 | << "}"; |
306 | 0 | return column_out.str(); |
307 | 0 | }) |
308 | 0 | << ", block_template_columns=" << _data_reader.block_template.columns() |
309 | 0 | << ", column_mapper=" << _data_reader.column_mapper.debug_string() << "}"; |
310 | 0 | return out.str(); |
311 | 0 | } |
312 | | |
313 | 0 | Status TableReader::init(TableReadOptions&& options) { |
314 | 0 | _scan_params = options.scan_params; |
315 | 0 | _format = options.format; |
316 | 0 | _io_ctx = options.io_ctx; |
317 | 0 | _runtime_state = options.runtime_state; |
318 | 0 | _scanner_profile = options.scanner_profile; |
319 | 0 | _push_down_agg_type = options.push_down_agg_type; |
320 | 0 | _projected_columns = std::move(options.projected_columns); |
321 | 0 | _system_properties = create_system_properties(_scan_params); |
322 | 0 | _profile = std::move(options.profile); |
323 | 0 | _mapper_options.mode = TableColumnMappingMode::BY_NAME; |
324 | 0 | _mapper_options.allow_missing_columns = options.allow_missing_columns; |
325 | 0 | _conjuncts = std::move(options.conjuncts); |
326 | 0 | _table_column_predicates = std::move(options.column_predicates); |
327 | 0 | return Status::OK(); |
328 | 0 | } |
329 | | |
330 | 0 | Status TableReader::_build_table_filters_from_conjuncts() { |
331 | 0 | _table_filters.clear(); |
332 | 0 | for (const auto& conjunct : _conjuncts) { |
333 | 0 | RETURN_IF_ERROR( |
334 | 0 | build_table_filters_from_conjunct(conjunct, _runtime_state, &_table_filters)); |
335 | 0 | } |
336 | 0 | return Status::OK(); |
337 | 0 | } |
338 | | |
339 | 0 | Status TableReader::_open_local_filter_exprs(const FileScanRequest& file_request) { |
340 | 0 | RowDescriptor row_desc; |
341 | 0 | for (const auto& conjunct : file_request.conjuncts) { |
342 | 0 | RETURN_IF_ERROR(conjunct->prepare(_runtime_state, row_desc)); |
343 | 0 | RETURN_IF_ERROR(conjunct->open(_runtime_state)); |
344 | 0 | } |
345 | 0 | for (const auto& delete_conjunct : file_request.delete_conjuncts) { |
346 | 0 | RETURN_IF_ERROR(delete_conjunct->prepare(_runtime_state, row_desc)); |
347 | 0 | RETURN_IF_ERROR(delete_conjunct->open(_runtime_state)); |
348 | 0 | } |
349 | 0 | return Status::OK(); |
350 | 0 | } |
351 | | |
352 | 0 | Status TableReader::create_next_reader(bool* eos) { |
353 | 0 | DCHECK(_data_reader.reader == nullptr); |
354 | 0 | if (_current_task == nullptr) { |
355 | 0 | *eos = true; |
356 | 0 | return Status::OK(); |
357 | 0 | } |
358 | | |
359 | 0 | switch (_format) { |
360 | 0 | case FileFormat::PARQUET: { |
361 | 0 | _data_reader.reader = std::make_unique<parquet::ParquetReader>( |
362 | 0 | _system_properties, _current_task->data_file, _io_ctx, _scanner_profile); |
363 | 0 | break; |
364 | 0 | } |
365 | 0 | case FileFormat::ORC: |
366 | 0 | case FileFormat::CSV: |
367 | 0 | return Status::NotSupported("TableReader does not support file format {}", |
368 | 0 | static_cast<int>(_format)); |
369 | 0 | } |
370 | | |
371 | 0 | RETURN_IF_ERROR(_data_reader.reader->init(_runtime_state)); |
372 | 0 | RETURN_IF_ERROR(open_reader()); |
373 | 0 | if (_data_reader.reader == nullptr) { |
374 | 0 | *eos = _current_task == nullptr; |
375 | 0 | return Status::OK(); |
376 | 0 | } |
377 | 0 | *eos = false; |
378 | 0 | return Status::OK(); |
379 | 0 | } |
380 | | |
381 | 0 | std::unique_ptr<io::FileDescription> create_file_description(const TFileRangeDesc& range) { |
382 | 0 | auto file_description = std::make_unique<io::FileDescription>(); |
383 | 0 | file_description->path = range.path; |
384 | 0 | file_description->file_size = range.__isset.file_size ? range.file_size : -1; |
385 | 0 | file_description->range_start_offset = range.__isset.start_offset ? range.start_offset : 0; |
386 | 0 | file_description->range_size = range.__isset.size ? range.size : -1; |
387 | 0 | if (range.__isset.fs_name) { |
388 | 0 | file_description->fs_name = range.fs_name; |
389 | 0 | } |
390 | 0 | if (range.__isset.file_cache_admission) { |
391 | 0 | file_description->file_cache_admission = range.file_cache_admission; |
392 | 0 | } |
393 | 0 | return file_description; |
394 | 0 | } |
395 | | |
396 | 0 | Status TableReader::prepare_split(const SplitReadOptions& options) { |
397 | 0 | _partition_values = std::move(options.partition_values); |
398 | 0 | _current_task = std::make_unique<ScanTask>(); |
399 | 0 | _current_task->data_file = create_file_description(options.current_range); |
400 | 0 | _delete_rows = nullptr; |
401 | 0 | _aggregate_pushdown_tried = false; |
402 | 0 | return _parse_delete_predicates(options); |
403 | 0 | } |
404 | | |
405 | 0 | Status TableReader::_parse_delete_predicates(const SplitReadOptions& options) { |
406 | 0 | DeleteFileDesc desc {.fs_name = options.current_range.fs_name}; |
407 | 0 | bool has_delete_file = false; |
408 | 0 | RETURN_IF_ERROR(_parse_deletion_vector_file(options.current_range.table_format_params, &desc, |
409 | 0 | &has_delete_file)); |
410 | 0 | if (has_delete_file) { |
411 | 0 | DORIS_CHECK(options.cache != nullptr); |
412 | 0 | Status create_status = Status::OK(); |
413 | |
|
414 | 0 | _delete_rows = options.cache->get<DeleteRows>(desc.key, [&]() -> DeleteRows* { |
415 | 0 | auto* delete_rows = new DeleteRows; |
416 | |
|
417 | 0 | DeletionVectorReader dv_reader(_runtime_state, _scanner_profile, *_scan_params, desc, |
418 | 0 | _io_ctx.get()); |
419 | 0 | create_status = dv_reader.open(); |
420 | 0 | if (!create_status.ok()) [[unlikely]] { |
421 | 0 | return nullptr; |
422 | 0 | } |
423 | | |
424 | 0 | size_t bytes_read = desc.size; |
425 | 0 | std::vector<char> buffer(bytes_read); |
426 | 0 | create_status = dv_reader.read_at(desc.start_offset, {buffer.data(), bytes_read}); |
427 | 0 | if (!create_status.ok()) [[unlikely]] { |
428 | 0 | return nullptr; |
429 | 0 | } |
430 | | |
431 | 0 | const char* buf = buffer.data(); |
432 | 0 | SCOPED_TIMER(_profile->parse_delete_file_time); |
433 | 0 | create_status = parse_deletion_vector(buf, bytes_read, desc.format, delete_rows); |
434 | 0 | if (!create_status.ok()) [[unlikely]] { |
435 | 0 | return nullptr; |
436 | 0 | } |
437 | 0 | COUNTER_UPDATE(_profile->num_delete_rows, delete_rows->size()); |
438 | 0 | return delete_rows; |
439 | 0 | }); |
440 | 0 | RETURN_IF_ERROR(create_status); |
441 | 0 | } |
442 | | |
443 | 0 | return Status::OK(); |
444 | 0 | } |
445 | | } // namespace doris::format |