Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "format_v2/table_reader.h" |
19 | | |
20 | | #include <gen_cpp/ExternalTableSchema_types.h> |
21 | | #include <gen_cpp/PlanNodes_types.h> |
22 | | #include <gen_cpp/Types_types.h> |
23 | | |
24 | | #include <algorithm> |
25 | | #include <cstring> |
26 | | #include <ranges> |
27 | | #include <set> |
28 | | #include <sstream> |
29 | | #include <stdexcept> |
30 | | #include <utility> |
31 | | #include <vector> |
32 | | |
33 | | #include "common/cast_set.h" |
34 | | #include "common/status.h" |
35 | | #include "core/assert_cast.h" |
36 | | #include "core/data_type/data_type_array.h" |
37 | | #include "core/data_type/data_type_map.h" |
38 | | #include "core/data_type/data_type_struct.h" |
39 | | #include "exec/common/endian.h" |
40 | | #include "exprs/vexpr_context.h" |
41 | | #include "exprs/vslot_ref.h" |
42 | | #include "format/table/deletion_vector_reader.h" |
43 | | #include "format_v2/column_mapper.h" |
44 | | #include "format_v2/delimited_text/csv_reader.h" |
45 | | #include "format_v2/delimited_text/text_reader.h" |
46 | | #include "format_v2/json/json_reader.h" |
47 | | #include "format_v2/native/native_reader.h" |
48 | | #include "format_v2/parquet/parquet_reader.h" |
49 | | #include "roaring/roaring64map.hh" |
50 | | #include "storage/segment/condition_cache.h" |
51 | | #include "util/string_util.h" |
52 | | |
53 | | namespace doris::format { |
54 | | namespace { |
55 | | |
// Renders `values` as a bracketed, comma-separated list ("[a, b, c]").
// Each element is converted by calling `formatter(element)` and streaming the result.
// An empty vector renders as "[]".
template <typename T, typename Formatter>
std::string join_table_reader_debug_strings(const std::vector<T>& values, Formatter formatter) {
    std::ostringstream joined;
    joined << "[";
    // The separator is empty before the first element and ", " before every later one.
    const char* separator = "";
    for (const auto& value : values) {
        joined << separator;
        joined << formatter(value);
        separator = ", ";
    }
    joined << "]";
    return joined.str();
}
69 | | |
70 | 0 | std::string file_format_to_string(FileFormat format) { |
71 | 0 | switch (format) { |
72 | 0 | case FileFormat::PARQUET: |
73 | 0 | return "PARQUET"; |
74 | 0 | case FileFormat::ORC: |
75 | 0 | return "ORC"; |
76 | 0 | case FileFormat::CSV: |
77 | 0 | return "CSV"; |
78 | 0 | case FileFormat::JSON: |
79 | 0 | return "JSON"; |
80 | 0 | case FileFormat::TEXT: |
81 | 0 | return "TEXT"; |
82 | 0 | case FileFormat::JNI: |
83 | 0 | return "JNI"; |
84 | 0 | case FileFormat::NATIVE: |
85 | 0 | return "NATIVE"; |
86 | 0 | case FileFormat::ARROW: |
87 | 0 | return "ARROW"; |
88 | 0 | } |
89 | 0 | return "UNKNOWN"; |
90 | 0 | } |
91 | | |
92 | 0 | std::string push_down_agg_to_string(TPushAggOp::type op) { |
93 | 0 | switch (op) { |
94 | 0 | case TPushAggOp::NONE: |
95 | 0 | return "NONE"; |
96 | 0 | case TPushAggOp::COUNT: |
97 | 0 | return "COUNT"; |
98 | 0 | case TPushAggOp::MINMAX: |
99 | 0 | return "MINMAX"; |
100 | 0 | case TPushAggOp::MIX: |
101 | 0 | return "MIX"; |
102 | 0 | case TPushAggOp::COUNT_ON_INDEX: |
103 | 0 | return "COUNT_ON_INDEX"; |
104 | 0 | } |
105 | 0 | return "UNKNOWN"; |
106 | 0 | } |
107 | | |
108 | 0 | std::string current_file_debug_string(const std::unique_ptr<ScanTask>& task) { |
109 | 0 | if (task == nullptr || task->data_file == nullptr) { |
110 | 0 | return "null"; |
111 | 0 | } |
112 | 0 | const auto& file = *task->data_file; |
113 | 0 | std::ostringstream out; |
114 | 0 | out << "FileDescription{path=" << file.path << ", file_size=" << file.file_size |
115 | 0 | << ", range_start_offset=" << file.range_start_offset << ", range_size=" << file.range_size |
116 | 0 | << ", mtime=" << file.mtime << ", fs_name=" << file.fs_name |
117 | 0 | << ", file_cache_admission=" << file.file_cache_admission << "}"; |
118 | 0 | return out.str(); |
119 | 0 | } |
120 | | |
121 | 0 | std::string partition_values_debug_string(const std::map<std::string, Field>& partition_values) { |
122 | 0 | std::ostringstream out; |
123 | 0 | out << "{"; |
124 | 0 | size_t idx = 0; |
125 | 0 | for (const auto& [key, _] : partition_values) { |
126 | 0 | if (idx++ > 0) { |
127 | 0 | out << ", "; |
128 | 0 | } |
129 | 0 | out << key; |
130 | 0 | } |
131 | 0 | out << "}"; |
132 | 0 | return out.str(); |
133 | 0 | } |
134 | | |
135 | 0 | const schema::external::TField* get_field_ptr(const schema::external::TFieldPtr& field_ptr) { |
136 | 0 | if (!field_ptr.__isset.field_ptr || field_ptr.field_ptr == nullptr) { |
137 | 0 | return nullptr; |
138 | 0 | } |
139 | 0 | return field_ptr.field_ptr.get(); |
140 | 0 | } |
141 | | |
142 | 0 | bool external_field_matches_name(const schema::external::TField& field, const std::string& name) { |
143 | 0 | if (field.__isset.name && to_lower(field.name) == to_lower(name)) { |
144 | 0 | return true; |
145 | 0 | } |
146 | 0 | return field.__isset.name_mapping && |
147 | 0 | std::ranges::any_of(field.name_mapping, [&](const std::string& alias) { |
148 | 0 | return to_lower(alias) == to_lower(name); |
149 | 0 | }); |
150 | 0 | } |
151 | | |
152 | | DataTypePtr find_struct_child_type_by_name(const DataTypeStruct& struct_type, |
153 | 0 | const std::string& field_name) { |
154 | 0 | for (size_t field_idx = 0; field_idx < struct_type.get_elements().size(); ++field_idx) { |
155 | 0 | if (to_lower(struct_type.get_element_name(field_idx)) == to_lower(field_name)) { |
156 | 0 | return struct_type.get_element(field_idx); |
157 | 0 | } |
158 | 0 | } |
159 | 0 | return nullptr; |
160 | 0 | } |
161 | | |
162 | | ColumnDefinition build_schema_column_from_external_field(const schema::external::TField& field, |
163 | 0 | DataTypePtr type) { |
164 | 0 | ColumnDefinition column { |
165 | 0 | .identifier = field.__isset.id ? Field::create_field<TYPE_INT>(field.id) : Field {}, |
166 | 0 | .name = field.__isset.name ? field.name : "", |
167 | 0 | .name_mapping = |
168 | 0 | field.__isset.name_mapping ? field.name_mapping : std::vector<std::string> {}, |
169 | 0 | .type = std::move(type), |
170 | 0 | .children = {}, |
171 | 0 | .default_expr = nullptr, |
172 | 0 | .is_partition_key = false, |
173 | 0 | }; |
174 | 0 | if (column.type == nullptr || !field.__isset.nestedField) { |
175 | 0 | return column; |
176 | 0 | } |
177 | | |
178 | 0 | const auto nested_type = remove_nullable(column.type); |
179 | 0 | switch (nested_type->get_primitive_type()) { |
180 | 0 | case TYPE_STRUCT: { |
181 | 0 | if (!field.nestedField.__isset.struct_field || |
182 | 0 | !field.nestedField.struct_field.__isset.fields) { |
183 | 0 | return column; |
184 | 0 | } |
185 | 0 | const auto& struct_type = assert_cast<const DataTypeStruct&>(*nested_type); |
186 | 0 | for (const auto& child_ptr : field.nestedField.struct_field.fields) { |
187 | 0 | const auto* child_field = get_field_ptr(child_ptr); |
188 | 0 | if (child_field == nullptr || !child_field->__isset.name) { |
189 | 0 | continue; |
190 | 0 | } |
191 | 0 | auto child_type = find_struct_child_type_by_name(struct_type, child_field->name); |
192 | 0 | if (child_type == nullptr) { |
193 | 0 | continue; |
194 | 0 | } |
195 | 0 | column.children.push_back( |
196 | 0 | build_schema_column_from_external_field(*child_field, child_type)); |
197 | 0 | } |
198 | 0 | break; |
199 | 0 | } |
200 | 0 | case TYPE_ARRAY: { |
201 | 0 | if (!field.nestedField.__isset.array_field || |
202 | 0 | !field.nestedField.array_field.__isset.item_field) { |
203 | 0 | return column; |
204 | 0 | } |
205 | 0 | const auto* item_field = get_field_ptr(field.nestedField.array_field.item_field); |
206 | 0 | if (item_field == nullptr) { |
207 | 0 | return column; |
208 | 0 | } |
209 | 0 | const auto& array_type = assert_cast<const DataTypeArray&>(*nested_type); |
210 | 0 | auto child = |
211 | 0 | build_schema_column_from_external_field(*item_field, array_type.get_nested_type()); |
212 | 0 | child.name = "element"; |
213 | 0 | if (child.has_identifier_name()) { |
214 | 0 | child.identifier = Field::create_field<TYPE_STRING>(child.name); |
215 | 0 | } |
216 | 0 | column.children.push_back(std::move(child)); |
217 | 0 | break; |
218 | 0 | } |
219 | 0 | case TYPE_MAP: { |
220 | 0 | if (!field.nestedField.__isset.map_field || |
221 | 0 | !field.nestedField.map_field.__isset.key_field || |
222 | 0 | !field.nestedField.map_field.__isset.value_field) { |
223 | 0 | return column; |
224 | 0 | } |
225 | 0 | const auto& map_type = assert_cast<const DataTypeMap&>(*nested_type); |
226 | 0 | const auto* key_field = get_field_ptr(field.nestedField.map_field.key_field); |
227 | 0 | if (key_field != nullptr) { |
228 | 0 | auto child = |
229 | 0 | build_schema_column_from_external_field(*key_field, map_type.get_key_type()); |
230 | 0 | child.name = "key"; |
231 | 0 | if (child.has_identifier_name()) { |
232 | 0 | child.identifier = Field::create_field<TYPE_STRING>(child.name); |
233 | 0 | } |
234 | 0 | column.children.push_back(std::move(child)); |
235 | 0 | } |
236 | 0 | const auto* value_field = get_field_ptr(field.nestedField.map_field.value_field); |
237 | 0 | if (value_field != nullptr) { |
238 | 0 | auto child = build_schema_column_from_external_field(*value_field, |
239 | 0 | map_type.get_value_type()); |
240 | 0 | child.name = "value"; |
241 | 0 | if (child.has_identifier_name()) { |
242 | 0 | child.identifier = Field::create_field<TYPE_STRING>(child.name); |
243 | 0 | } |
244 | 0 | column.children.push_back(std::move(child)); |
245 | 0 | } |
246 | 0 | break; |
247 | 0 | } |
248 | 0 | default: |
249 | 0 | break; |
250 | 0 | } |
251 | 0 | return column; |
252 | 0 | } |
253 | | |
254 | | const schema::external::TField* find_external_root_field(const TFileScanRangeParams* params, |
255 | 6 | const ColumnDefinition& column) { |
256 | 6 | if (params == nullptr || !params->__isset.history_schema_info || |
257 | 6 | params->history_schema_info.empty()) { |
258 | 6 | return nullptr; |
259 | 6 | } |
260 | 0 | const auto* schema = ¶ms->history_schema_info.front(); |
261 | 0 | if (params->__isset.current_schema_id) { |
262 | 0 | for (const auto& candidate_schema : params->history_schema_info) { |
263 | 0 | if (candidate_schema.__isset.schema_id && |
264 | 0 | candidate_schema.schema_id == params->current_schema_id) { |
265 | 0 | schema = &candidate_schema; |
266 | 0 | break; |
267 | 0 | } |
268 | 0 | } |
269 | 0 | } |
270 | 0 | if (!schema->__isset.root_field || !schema->root_field.__isset.fields) { |
271 | 0 | return nullptr; |
272 | 0 | } |
273 | 0 | for (const auto& field_ptr : schema->root_field.fields) { |
274 | 0 | const auto* field = get_field_ptr(field_ptr); |
275 | 0 | if (field == nullptr) { |
276 | 0 | continue; |
277 | 0 | } |
278 | 0 | if (external_field_matches_name(*field, column.name)) { |
279 | 0 | return field; |
280 | 0 | } |
281 | 0 | } |
282 | 0 | return nullptr; |
283 | 0 | } |
284 | | |
285 | 0 | std::string expr_context_debug_string(const VExprContextSPtr& context) { |
286 | 0 | if (context == nullptr) { |
287 | 0 | return "null"; |
288 | 0 | } |
289 | 0 | const auto root = context->root(); |
290 | 0 | if (root == nullptr) { |
291 | 0 | return "VExprContext{root=null}"; |
292 | 0 | } |
293 | 0 | std::ostringstream out; |
294 | 0 | out << "VExprContext{root_name=" << root->expr_name() << ", root_debug=" << root->debug_string() |
295 | 0 | << "}"; |
296 | 0 | return out.str(); |
297 | 0 | } |
298 | | |
299 | 0 | std::string table_filter_debug_string(const TableFilter& filter) { |
300 | 0 | std::ostringstream out; |
301 | 0 | out << "TableFilter{conjunct=" << expr_context_debug_string(filter.conjunct) |
302 | 0 | << ", global_indices=" |
303 | 0 | << join_table_reader_debug_strings( |
304 | 0 | filter.global_indices, |
305 | 0 | [](GlobalIndex global_index) { return std::to_string(global_index.value()); }) |
306 | 0 | << "}"; |
307 | 0 | return out.str(); |
308 | 0 | } |
309 | | |
310 | 0 | std::string table_column_predicates_debug_string(const TableColumnPredicates& predicates) { |
311 | 0 | std::ostringstream out; |
312 | 0 | out << "{"; |
313 | 0 | size_t idx = 0; |
314 | 0 | for (const auto& [global_index, column_predicates] : predicates) { |
315 | 0 | if (idx++ > 0) { |
316 | 0 | out << ", "; |
317 | 0 | } |
318 | 0 | out << global_index.value() << ":{predicate_count=" << column_predicates.size() << "}"; |
319 | 0 | } |
320 | 0 | out << "}"; |
321 | 0 | return out.str(); |
322 | 0 | } |
323 | | |
324 | 3 | bool contains_runtime_filter(const VExprContextSPtrs& conjuncts) { |
325 | 3 | return std::ranges::any_of(conjuncts, [](const auto& conjunct) { |
326 | 3 | return conjunct != nullptr && conjunct->root() != nullptr && |
327 | 3 | conjunct->root()->is_rf_wrapper(); |
328 | 3 | }); |
329 | 3 | } |
330 | | |
331 | 61 | void collect_global_indices(const VExprSPtr& expr, std::set<GlobalIndex>* global_indices) { |
332 | 61 | if (expr == nullptr) { |
333 | 0 | return; |
334 | 0 | } |
335 | 61 | if (expr->is_rf_wrapper()) { |
336 | | // RuntimeFilterExpr wraps a real predicate expression but its own thrift node can still |
337 | | // look like SLOT_REF. Collect indices from the wrapped predicate; do not cast the wrapper |
338 | | // itself to VSlotRef. |
339 | 2 | collect_global_indices(expr->get_impl(), global_indices); |
340 | 2 | return; |
341 | 2 | } |
342 | 59 | if (expr->is_slot_ref()) { |
343 | 20 | const auto* slot_ref = assert_cast<const VSlotRef*>(expr.get()); |
344 | 20 | DORIS_CHECK(slot_ref->column_id() >= 0); |
345 | 20 | global_indices->insert(GlobalIndex(cast_set<size_t>(slot_ref->column_id()))); |
346 | 20 | } |
347 | 59 | for (const auto& child : expr->children()) { |
348 | 40 | collect_global_indices(child, global_indices); |
349 | 40 | } |
350 | 59 | } |
351 | | |
352 | | Status build_table_filters_from_conjunct(const VExprContextSPtr& conjunct, RuntimeState* state, |
353 | 19 | std::vector<TableFilter>* table_filters) { |
354 | 19 | if (conjunct == nullptr) { |
355 | 0 | return Status::OK(); |
356 | 0 | } |
357 | 19 | std::set<GlobalIndex> global_indices; |
358 | 19 | collect_global_indices(conjunct->root(), &global_indices); |
359 | 19 | if (!global_indices.empty()) { |
360 | 19 | TableFilter table_filter; |
361 | 19 | VExprSPtr filter_root; |
362 | 19 | RETURN_IF_ERROR(clone_table_expr_tree(conjunct->root(), &filter_root)); |
363 | 19 | table_filter.conjunct = VExprContext::create_shared(std::move(filter_root)); |
364 | 20 | for (const auto global_index : global_indices) { |
365 | 20 | table_filter.global_indices.push_back(global_index); |
366 | 20 | } |
367 | 19 | table_filters->push_back(std::move(table_filter)); |
368 | 19 | } |
369 | 19 | return Status::OK(); |
370 | 19 | } |
371 | | |
372 | | Status parse_deletion_vector(const char* buf, size_t buffer_size, DeleteFileDesc::Format format, |
373 | 4 | DeleteRows* delete_rows) { |
374 | 4 | DORIS_CHECK(buf != nullptr); |
375 | 4 | DORIS_CHECK(delete_rows != nullptr); |
376 | 4 | DORIS_CHECK(format == DeleteFileDesc::Format::PAIMON || |
377 | 4 | format == DeleteFileDesc::Format::ICEBERG); |
378 | | |
379 | 4 | const size_t checksum_size = format == DeleteFileDesc::Format::ICEBERG ? 4 : 0; |
380 | 4 | if (buffer_size < 8 + checksum_size) [[unlikely]] { |
381 | 0 | return Status::DataQualityError("Deletion vector file size too small: {}", buffer_size); |
382 | 0 | } |
383 | | |
384 | 4 | auto total_length = BigEndian::Load32(buf); |
385 | 4 | if (total_length + 4 + checksum_size != buffer_size) [[unlikely]] { |
386 | 0 | return Status::DataQualityError("Deletion vector length mismatch, expected: {}, actual: {}", |
387 | 0 | total_length + 4 + checksum_size, buffer_size); |
388 | 0 | } |
389 | | |
390 | 4 | const char* bitmap_buf = buf + 8; |
391 | 4 | const size_t bitmap_size = buffer_size - 8 - checksum_size; |
392 | 4 | if (format == DeleteFileDesc::Format::PAIMON) { |
393 | | // Paimon BitmapDeletionVector stores: |
394 | | // [4-byte big-endian length][4-byte magic 0x5E43F2D0][32-bit roaring bitmap] |
395 | | // The length covers magic + bitmap, and does not include the leading length field. |
396 | 1 | constexpr static char PAIMON_BITMAP_MAGIC[] = {'\x5E', '\x43', '\xF2', '\xD0'}; |
397 | 1 | if (memcmp(buf + sizeof(total_length), PAIMON_BITMAP_MAGIC, 4) != 0) [[unlikely]] { |
398 | 0 | return Status::DataQualityError( |
399 | 0 | "Paimon deletion vector magic number mismatch, expected: {}, actual: {}", |
400 | 0 | BigEndian::Load32(PAIMON_BITMAP_MAGIC), |
401 | 0 | BigEndian::Load32(buf + sizeof(total_length))); |
402 | 0 | } |
403 | | |
404 | 1 | roaring::Roaring bitmap; |
405 | 1 | try { |
406 | 1 | bitmap = roaring::Roaring::readSafe(bitmap_buf, bitmap_size); |
407 | 1 | } catch (const std::runtime_error& e) { |
408 | 0 | return Status::DataQualityError("Decode roaring bitmap failed, {}", e.what()); |
409 | 0 | } |
410 | | |
411 | 1 | delete_rows->reserve(bitmap.cardinality()); |
412 | 3 | for (auto it = bitmap.begin(); it != bitmap.end(); it++) { |
413 | 2 | delete_rows->push_back(*it); |
414 | 2 | } |
415 | 1 | return Status::OK(); |
416 | 1 | } |
417 | | |
418 | 3 | constexpr static char ICEBERG_DV_MAGIC[] = {'\xD1', '\xD3', '\x39', '\x64'}; |
419 | 3 | if (memcmp(buf + sizeof(total_length), ICEBERG_DV_MAGIC, 4) != 0) [[unlikely]] { |
420 | 0 | return Status::DataQualityError( |
421 | 0 | "Iceberg deletion vector magic number mismatch, expected: {}, actual: {}", |
422 | 0 | BigEndian::Load32(ICEBERG_DV_MAGIC), BigEndian::Load32(buf + sizeof(total_length))); |
423 | 0 | } |
424 | | |
425 | 3 | roaring::Roaring64Map bitmap; |
426 | 3 | try { |
427 | 3 | bitmap = roaring::Roaring64Map::readSafe(bitmap_buf, bitmap_size); |
428 | 3 | } catch (const std::runtime_error& e) { |
429 | 0 | return Status::DataQualityError("Decode roaring bitmap failed, {}", e.what()); |
430 | 0 | } |
431 | | |
432 | 3 | delete_rows->reserve(bitmap.cardinality()); |
433 | 7 | for (auto it = bitmap.begin(); it != bitmap.end(); it++) { |
434 | 4 | delete_rows->push_back(cast_set<int64_t>(*it)); |
435 | 4 | } |
436 | 3 | return Status::OK(); |
437 | 3 | } |
438 | | |
439 | | } // namespace |
440 | | |
441 | | std::shared_ptr<io::FileSystemProperties> create_system_properties( |
442 | 70 | const TFileScanRangeParams* scan_params) { |
443 | 70 | auto system_properties = std::make_shared<io::FileSystemProperties>(); |
444 | 70 | if (scan_params == nullptr || !scan_params->__isset.file_type) { |
445 | 56 | system_properties->system_type = TFileType::FILE_LOCAL; |
446 | 56 | return system_properties; |
447 | 56 | } |
448 | 14 | system_properties->system_type = scan_params->file_type; |
449 | 14 | system_properties->properties = scan_params->properties; |
450 | 14 | system_properties->hdfs_params = scan_params->hdfs_params; |
451 | 14 | if (scan_params->__isset.broker_addresses) { |
452 | 0 | system_properties->broker_addresses.assign(scan_params->broker_addresses.begin(), |
453 | 0 | scan_params->broker_addresses.end()); |
454 | 0 | } |
455 | 14 | return system_properties; |
456 | 70 | } |
457 | | |
458 | 0 | std::string TableReader::debug_string() const { |
459 | 0 | std::ostringstream out; |
460 | 0 | out << "TableReader{format=" << file_format_to_string(_format) |
461 | 0 | << ", push_down_agg_type=" << push_down_agg_to_string(_push_down_agg_type) |
462 | 0 | << ", aggregate_pushdown_tried=" << _aggregate_pushdown_tried |
463 | 0 | << ", has_current_reader=" << (_data_reader.reader != nullptr) |
464 | 0 | << ", has_current_task=" << (_current_task != nullptr) |
465 | 0 | << ", current_file=" << current_file_debug_string(_current_task) |
466 | 0 | << ", has_delete_rows=" << (_delete_rows != nullptr) |
467 | 0 | << ", delete_row_count=" << (_delete_rows == nullptr ? 0 : _delete_rows->size()) |
468 | 0 | << ", has_system_properties=" << (_system_properties != nullptr) << ", system_type=" |
469 | 0 | << (_system_properties == nullptr ? static_cast<int>(TFileType::FILE_LOCAL) |
470 | 0 | : static_cast<int>(_system_properties->system_type)) |
471 | 0 | << ", has_scan_params=" << (_scan_params != nullptr) |
472 | 0 | << ", has_io_ctx=" << (_io_ctx != nullptr) |
473 | 0 | << ", has_runtime_state=" << (_runtime_state != nullptr) |
474 | 0 | << ", has_scanner_profile=" << (_scanner_profile != nullptr) |
475 | 0 | << ", mapper_options=" << _mapper_options.debug_string() << ", projected_columns=" |
476 | 0 | << join_table_reader_debug_strings( |
477 | 0 | _projected_columns, |
478 | 0 | [](const ColumnDefinition& column) { return column.debug_string(); }) |
479 | 0 | << ", partition_values=" << partition_values_debug_string(_partition_values) |
480 | 0 | << ", table_filters=" |
481 | 0 | << join_table_reader_debug_strings( |
482 | 0 | _table_filters, |
483 | 0 | [](const TableFilter& filter) { return table_filter_debug_string(filter); }) |
484 | 0 | << ", table_column_predicates=" |
485 | 0 | << table_column_predicates_debug_string(_table_column_predicates) |
486 | 0 | << ", conjunct_count=" << _conjuncts.size() << ", conjuncts=" |
487 | 0 | << join_table_reader_debug_strings(_conjuncts, |
488 | 0 | [](const VExprContextSPtr& conjunct) { |
489 | 0 | return expr_context_debug_string(conjunct); |
490 | 0 | }) |
491 | 0 | << ", file_schema=" |
492 | 0 | << join_table_reader_debug_strings( |
493 | 0 | _data_reader.file_schema, |
494 | 0 | [](const ColumnDefinition& field) { return field.debug_string(); }) |
495 | 0 | << ", file_block_layout=" |
496 | 0 | << join_table_reader_debug_strings( |
497 | 0 | _data_reader.file_block_layout, |
498 | 0 | [](const FileBlockColumn& column) { |
499 | 0 | std::ostringstream column_out; |
500 | 0 | column_out << "FileBlockColumn{file_column_id=" << column.file_column_id |
501 | 0 | << ", name=" << column.name << ", type=" |
502 | 0 | << (column.type == nullptr ? "null" : column.type->get_name()) |
503 | 0 | << "}"; |
504 | 0 | return column_out.str(); |
505 | 0 | }) |
506 | 0 | << ", block_template_columns=" << _data_reader.block_template.columns() |
507 | 0 | << ", column_mapper=" |
508 | 0 | << (_data_reader.column_mapper == nullptr ? "null" |
509 | 0 | : _data_reader.column_mapper->debug_string()) |
510 | 0 | << "}"; |
511 | 0 | return out.str(); |
512 | 0 | } |
513 | | |
514 | | Status TableReader::annotate_projected_column(const TFileScanSlotInfo& slot_info, |
515 | | ProjectedColumnBuildContext* context, |
516 | 6 | ColumnDefinition* column) const { |
517 | 6 | (void)slot_info; |
518 | 6 | DORIS_CHECK(context != nullptr); |
519 | 6 | DORIS_CHECK(column != nullptr); |
520 | 6 | context->schema_column.reset(); |
521 | 6 | const auto* schema_field = find_external_root_field(context->scan_params, *column); |
522 | 6 | if (schema_field == nullptr) { |
523 | 6 | return Status::OK(); |
524 | 6 | } |
525 | 0 | context->schema_column = build_schema_column_from_external_field(*schema_field, column->type); |
526 | 0 | column->identifier = context->schema_column->identifier; |
527 | 0 | column->name_mapping = context->schema_column->name_mapping; |
528 | 0 | return Status::OK(); |
529 | 6 | } |
530 | | |
531 | 70 | Status TableReader::init(TableReadOptions&& options) { |
532 | 70 | _scan_params = options.scan_params; |
533 | 70 | _format = options.format; |
534 | 70 | _io_ctx = options.io_ctx; |
535 | 70 | _runtime_state = options.runtime_state; |
536 | 70 | _scanner_profile = options.scanner_profile; |
537 | 70 | _file_slot_descs = options.file_slot_descs; |
538 | 70 | _push_down_agg_type = options.push_down_agg_type; |
539 | 70 | _condition_cache_digest = options.condition_cache_digest; |
540 | 70 | _projected_columns = std::move(options.projected_columns); |
541 | 70 | _system_properties = create_system_properties(_scan_params); |
542 | 70 | _mapper_options.mode = TableColumnMappingMode::BY_NAME; |
543 | 70 | _conjuncts = std::move(options.conjuncts); |
544 | 70 | _table_column_predicates = std::move(options.column_predicates); |
545 | | |
546 | 70 | if (_scanner_profile != nullptr) { |
547 | 17 | static const char* table_profile = "TableReader"; |
548 | 17 | ADD_TIMER_WITH_LEVEL(_scanner_profile, table_profile, 1); |
549 | 17 | _profile.num_delete_files = ADD_CHILD_COUNTER_WITH_LEVEL(_scanner_profile, "NumDeleteFiles", |
550 | 17 | TUnit::UNIT, table_profile, 1); |
551 | 17 | _profile.num_delete_rows = ADD_CHILD_COUNTER_WITH_LEVEL(_scanner_profile, "NumDeleteRows", |
552 | 17 | TUnit::UNIT, table_profile, 1); |
553 | 17 | _profile.parse_delete_file_time = ADD_CHILD_TIMER_WITH_LEVEL( |
554 | 17 | _scanner_profile, "ParseDeleteFileTime", table_profile, 1); |
555 | 17 | _profile.exec_timer = |
556 | 17 | ADD_CHILD_TIMER_WITH_LEVEL(_scanner_profile, "GetBlockTime", table_profile, 1); |
557 | 17 | _profile.prepare_split_timer = |
558 | 17 | ADD_CHILD_TIMER_WITH_LEVEL(_scanner_profile, "PrepareSplitTime", table_profile, 1); |
559 | 17 | _profile.finalize_timer = |
560 | 17 | ADD_CHILD_TIMER_WITH_LEVEL(_scanner_profile, "FinalizeBlockTime", table_profile, 1); |
561 | 17 | _profile.create_reader_timer = |
562 | 17 | ADD_CHILD_TIMER_WITH_LEVEL(_scanner_profile, "CreateReaderTime", table_profile, 1); |
563 | 17 | _profile.pushdown_agg_timer = |
564 | 17 | ADD_CHILD_TIMER_WITH_LEVEL(_scanner_profile, "PushDownAggTime", table_profile, 1); |
565 | 17 | _profile.open_reader_timer = |
566 | 17 | ADD_CHILD_TIMER_WITH_LEVEL(_scanner_profile, "OpenReaderTime", table_profile, 1); |
567 | 17 | } |
568 | 70 | return Status::OK(); |
569 | 70 | } |
570 | | |
571 | 64 | Status TableReader::_build_table_filters_from_conjuncts() { |
572 | 64 | _table_filters.clear(); |
573 | 64 | for (const auto& conjunct : _conjuncts) { |
574 | 19 | RETURN_IF_ERROR( |
575 | 19 | build_table_filters_from_conjunct(conjunct, _runtime_state, &_table_filters)); |
576 | 19 | } |
577 | 64 | return Status::OK(); |
578 | 64 | } |
579 | | |
580 | 63 | Status TableReader::_open_local_filter_exprs(const FileScanRequest& file_request) { |
581 | 63 | RowDescriptor row_desc; |
582 | 63 | for (const auto& conjunct : file_request.conjuncts) { |
583 | 14 | RETURN_IF_ERROR(conjunct->prepare(_runtime_state, row_desc)); |
584 | 14 | RETURN_IF_ERROR(conjunct->open(_runtime_state)); |
585 | 14 | } |
586 | 63 | for (const auto& delete_conjunct : file_request.delete_conjuncts) { |
587 | 12 | RETURN_IF_ERROR(delete_conjunct->prepare(_runtime_state, row_desc)); |
588 | 12 | RETURN_IF_ERROR(delete_conjunct->open(_runtime_state)); |
589 | 12 | } |
590 | 63 | return Status::OK(); |
591 | 63 | } |
592 | | |
593 | 63 | bool TableReader::_should_enable_condition_cache(const FileScanRequest& file_request) const { |
594 | 63 | if (_condition_cache_digest == 0 || _push_down_agg_type == TPushAggOp::type::COUNT || |
595 | 63 | _current_file_description == std::nullopt || _data_reader.reader == nullptr) { |
596 | 58 | return false; |
597 | 58 | } |
598 | | // Condition cache is populated by file readers after evaluating file-local row-level |
599 | | // conjuncts. ColumnPredicate-only scans can prune row groups/pages, but they do not produce a |
600 | | // per-row survivor bitmap that can safely populate the cache. |
601 | 5 | if (file_request.conjuncts.empty()) { |
602 | 1 | return false; |
603 | 1 | } |
604 | | // Delete files/deletion vectors are table-format state. They may change independently of the |
605 | | // data file path/mtime/size used by the external cache key, so caching their result can become |
606 | | // stale. Keep delete filtering enabled, but do not read or write condition cache. |
607 | 4 | if (_delete_rows != nullptr || !file_request.delete_conjuncts.empty()) { |
608 | 1 | return false; |
609 | 1 | } |
610 | | // Runtime filters can arrive late and their payload is not guaranteed to be represented by the |
611 | | // scan-local digest. Without a read-only mode, a MISS could insert a bitmap for P AND RF under |
612 | | // the digest for only P. This mirrors the old FileScanner guard. |
613 | 3 | return !contains_runtime_filter(file_request.conjuncts); |
614 | 4 | } |
615 | | |
616 | 63 | Status TableReader::_init_reader_condition_cache(const FileScanRequest& file_request) { |
617 | 63 | _condition_cache = nullptr; |
618 | 63 | _condition_cache_ctx = nullptr; |
619 | 63 | if (!_should_enable_condition_cache(file_request)) { |
620 | 61 | return Status::OK(); |
621 | 61 | } |
622 | | |
623 | 2 | auto* cache = segment_v2::ConditionCache::instance(); |
624 | 2 | if (cache == nullptr) { |
625 | 0 | return Status::OK(); |
626 | 0 | } |
627 | 2 | const auto& file = *_current_file_description; |
628 | 2 | _condition_cache_key = segment_v2::ConditionCache::ExternalCacheKey( |
629 | 2 | file.path, file.mtime, file.file_size, _condition_cache_digest, file.range_start_offset, |
630 | 2 | file.range_size); |
631 | | |
632 | 2 | segment_v2::ConditionCacheHandle handle; |
633 | 2 | const bool condition_cache_hit = cache->lookup(_condition_cache_key, &handle); |
634 | 2 | if (condition_cache_hit) { |
635 | 0 | _condition_cache = handle.get_filter_result(); |
636 | 0 | ++_condition_cache_hit_count; |
637 | 2 | } else { |
638 | 2 | const int64_t total_rows = _data_reader.reader->get_total_rows(); |
639 | 2 | if (total_rows <= 0) { |
640 | 0 | return Status::OK(); |
641 | 0 | } |
642 | | // Add one guard granule for split ranges that start in the middle of a granule. A guard |
643 | | // false bit beyond the real range never overlaps real rows, but avoids boundary overflow |
644 | | // when a reader marks the last partial granule. |
645 | 2 | const size_t num_granules = (total_rows + ConditionCacheContext::GRANULE_SIZE - 1) / |
646 | 2 | ConditionCacheContext::GRANULE_SIZE; |
647 | 2 | _condition_cache = std::make_shared<std::vector<bool>>(num_granules + 1, false); |
648 | 2 | } |
649 | | |
650 | 2 | if (_condition_cache != nullptr) { |
651 | 2 | _condition_cache_ctx = std::make_shared<ConditionCacheContext>(); |
652 | 2 | _condition_cache_ctx->is_hit = condition_cache_hit; |
653 | 2 | _condition_cache_ctx->filter_result = _condition_cache; |
654 | 2 | _data_reader.reader->set_condition_cache_context(_condition_cache_ctx); |
655 | 2 | } |
656 | 2 | return Status::OK(); |
657 | 2 | } |
658 | | |
659 | 64 | void TableReader::_finalize_reader_condition_cache() { |
660 | 64 | if (_condition_cache_ctx == nullptr || _condition_cache_ctx->is_hit) { |
661 | 62 | _condition_cache = nullptr; |
662 | 62 | _condition_cache_ctx = nullptr; |
663 | 62 | return; |
664 | 62 | } |
665 | | // LIMIT or scanner cancellation may close a reader before all selected row ranges are visited. |
666 | | // Unvisited granules remain false in a MISS bitmap, so inserting a partial bitmap would make a |
667 | | // later HIT skip valid rows. Only publish cache entries after the physical reader reaches EOF. |
668 | 2 | if (!_current_reader_reached_eof) { |
669 | 1 | _condition_cache = nullptr; |
670 | 1 | _condition_cache_ctx = nullptr; |
671 | 1 | return; |
672 | 1 | } |
673 | 1 | segment_v2::ConditionCache::instance()->insert(_condition_cache_key, |
674 | 1 | std::move(_condition_cache)); |
675 | 1 | _condition_cache = nullptr; |
676 | 1 | _condition_cache_ctx = nullptr; |
677 | 1 | } |
678 | | |
679 | 75 | Status TableReader::create_next_reader(bool* eos) { |
680 | 75 | SCOPED_TIMER(_profile.create_reader_timer); |
681 | 75 | DCHECK(_data_reader.reader == nullptr); |
682 | 75 | if (_current_task == nullptr) { |
683 | 11 | *eos = true; |
684 | 11 | return Status::OK(); |
685 | 11 | } |
686 | | |
687 | 64 | RETURN_IF_ERROR(create_file_reader(&_data_reader.reader)); |
688 | 64 | DORIS_CHECK(_data_reader.reader != nullptr); |
689 | 64 | if (_batch_size > 0) { |
690 | 0 | _data_reader.reader->set_batch_size(_batch_size); |
691 | 0 | } |
692 | 64 | RETURN_IF_ERROR(_data_reader.reader->init(_runtime_state)); |
693 | 64 | RETURN_IF_ERROR(open_reader()); |
694 | 64 | if (_data_reader.reader == nullptr) { |
695 | 1 | *eos = _current_task == nullptr; |
696 | 1 | return Status::OK(); |
697 | 1 | } |
698 | 63 | *eos = false; |
699 | 63 | return Status::OK(); |
700 | 64 | } |
701 | | |
702 | 57 | Status TableReader::create_file_reader(std::unique_ptr<FileReader>* reader) { |
703 | 57 | DORIS_CHECK(reader != nullptr); |
704 | 57 | if (_format == FileFormat::PARQUET) { |
705 | 57 | const bool enable_mapping_timestamp_tz = |
706 | 57 | _scan_params != nullptr && _scan_params->__isset.enable_mapping_timestamp_tz && |
707 | 57 | _scan_params->enable_mapping_timestamp_tz; |
708 | 57 | *reader = std::make_unique<format::parquet::ParquetReader>( |
709 | 57 | _system_properties, _current_task->data_file, _io_ctx, _scanner_profile, |
710 | 57 | _global_rowid_context, enable_mapping_timestamp_tz); |
711 | 57 | return Status::OK(); |
712 | 57 | } |
713 | 0 | if (_format == FileFormat::CSV) { |
714 | 0 | if (_file_slot_descs == nullptr) { |
715 | 0 | return Status::InvalidArgument("CSV reader requires file slot descriptors"); |
716 | 0 | } |
717 | | // CSV has no embedded schema. TableReader owns table-level mapping, while CsvReader needs |
718 | | // only the physical file slots plus scan text parameters to build a file-local schema. |
719 | | // Non-file columns such as partitions/defaults/virtual row ids are intentionally excluded |
720 | | // from `_file_slot_descs` and are materialized during finalize_chunk(). |
721 | 0 | *reader = std::make_unique<format::csv::CsvReader>( |
722 | 0 | _system_properties, _current_task->data_file, _io_ctx, _scanner_profile, |
723 | 0 | _scan_params, *_file_slot_descs, _current_range_compress_type, |
724 | 0 | _current_range_load_id); |
725 | 0 | return Status::OK(); |
726 | 0 | } |
727 | 0 | if (_format == FileFormat::TEXT) { |
728 | 0 | if (_file_slot_descs == nullptr) { |
729 | 0 | return Status::InvalidArgument("Text reader requires file slot descriptors"); |
730 | 0 | } |
731 | | // Text files have no embedded schema. As with CSV, TableReader handles table-level mapping |
732 | | // and only passes physical file slots to the v2 TextReader. |
733 | 0 | *reader = std::make_unique<format::text::TextReader>( |
734 | 0 | _system_properties, _current_task->data_file, _io_ctx, _scanner_profile, |
735 | 0 | _scan_params, *_file_slot_descs, _current_range_compress_type, |
736 | 0 | _current_range_load_id); |
737 | 0 | return Status::OK(); |
738 | 0 | } |
739 | 0 | if (_format == FileFormat::JSON) { |
740 | 0 | if (_file_slot_descs == nullptr) { |
741 | 0 | return Status::InvalidArgument("JSON reader requires file slot descriptors"); |
742 | 0 | } |
743 | 0 | *reader = std::make_unique<format::json::JsonReader>( |
744 | 0 | _system_properties, _current_task->data_file, _io_ctx, _scanner_profile, |
745 | 0 | _scan_params, _current_file_range_desc, *_file_slot_descs, |
746 | 0 | _current_range_compress_type, _current_range_load_id); |
747 | 0 | return Status::OK(); |
748 | 0 | } |
749 | 0 | if (_format == FileFormat::NATIVE) { |
750 | 0 | *reader = std::make_unique<format::native::NativeReader>( |
751 | 0 | _system_properties, _current_task->data_file, _io_ctx, _scanner_profile); |
752 | 0 | return Status::OK(); |
753 | 0 | } |
754 | 0 | return Status::NotSupported("TableReader does not support file format {}", |
755 | 0 | file_format_to_string(_format)); |
756 | 0 | } |
757 | | |
758 | 76 | std::unique_ptr<io::FileDescription> create_file_description(const TFileRangeDesc& range) { |
759 | 76 | auto file_description = std::make_unique<io::FileDescription>(); |
760 | 76 | file_description->path = range.path; |
761 | 76 | file_description->file_size = range.__isset.file_size ? range.file_size : -1; |
762 | 76 | file_description->mtime = range.__isset.modification_time ? range.modification_time : 0; |
763 | 76 | file_description->range_start_offset = range.__isset.start_offset ? range.start_offset : 0; |
764 | 76 | file_description->range_size = range.__isset.size ? range.size : -1; |
765 | 76 | if (range.__isset.fs_name) { |
766 | 0 | file_description->fs_name = range.fs_name; |
767 | 0 | } |
768 | 76 | if (range.__isset.file_cache_admission) { |
769 | 0 | file_description->file_cache_admission = range.file_cache_admission; |
770 | 0 | } |
771 | 76 | return file_description; |
772 | 76 | } |
773 | | |
774 | 76 | Status TableReader::prepare_split(const SplitReadOptions& options) { |
775 | 76 | SCOPED_TIMER(_profile.prepare_split_timer); |
776 | 76 | _partition_values = std::move(options.partition_values); |
777 | 76 | _current_task = std::make_unique<ScanTask>(); |
778 | 76 | _current_task->data_file = create_file_description(options.current_range); |
779 | 76 | _current_file_description = *_current_task->data_file; |
780 | 76 | _current_file_range_desc = options.current_range; |
781 | 76 | _current_range_compress_type = options.current_range.__isset.compress_type |
782 | 76 | ? options.current_range.compress_type |
783 | 76 | : TFileCompressType::UNKNOWN; |
784 | 76 | _current_range_load_id = options.current_range.__isset.load_id |
785 | 76 | ? std::make_optional(options.current_range.load_id) |
786 | 76 | : std::nullopt; |
787 | 76 | _global_rowid_context = options.global_rowid_context; |
788 | 76 | _delete_rows = nullptr; |
789 | 76 | _aggregate_pushdown_tried = false; |
790 | 76 | _remaining_table_level_count = -1; |
791 | 76 | _current_reader_reached_eof = false; |
792 | 76 | if (_push_down_agg_type == TPushAggOp::type::COUNT && |
793 | 76 | options.current_range.__isset.table_format_params && |
794 | 76 | options.current_range.table_format_params.__isset.table_level_row_count) { |
795 | 6 | DORIS_CHECK(options.current_range.table_format_params.table_level_row_count >= -1); |
796 | 6 | _remaining_table_level_count = |
797 | 6 | options.current_range.table_format_params.table_level_row_count; |
798 | 6 | } |
799 | 76 | if (_is_table_level_count_active()) { |
800 | 2 | return Status::OK(); |
801 | 2 | } |
802 | 74 | return _parse_delete_predicates(options); |
803 | 76 | } |
804 | | |
805 | 74 | Status TableReader::_parse_delete_predicates(const SplitReadOptions& options) { |
806 | 74 | DeleteFileDesc desc {.fs_name = options.current_range.fs_name}; |
807 | 74 | bool has_delete_file = false; |
808 | 74 | RETURN_IF_ERROR(_parse_deletion_vector_file(options.current_range.table_format_params, &desc, |
809 | 74 | &has_delete_file)); |
810 | 74 | if (has_delete_file) { |
811 | 4 | DORIS_CHECK(options.cache != nullptr); |
812 | 4 | Status create_status = Status::OK(); |
813 | | |
814 | 4 | _delete_rows = options.cache->get<DeleteRows>(desc.key, [&]() -> DeleteRows* { |
815 | 4 | auto* delete_rows = new DeleteRows; |
816 | | |
817 | 4 | DeletionVectorReader dv_reader(_runtime_state, _scanner_profile, *_scan_params, desc, |
818 | 4 | _io_ctx.get()); |
819 | 4 | create_status = dv_reader.open(); |
820 | 4 | if (!create_status.ok()) [[unlikely]] { |
821 | 0 | return nullptr; |
822 | 0 | } |
823 | | |
824 | 4 | size_t bytes_read = desc.size; |
825 | 4 | std::vector<char> buffer(bytes_read); |
826 | 4 | create_status = dv_reader.read_at(desc.start_offset, {buffer.data(), bytes_read}); |
827 | 4 | if (!create_status.ok()) [[unlikely]] { |
828 | 0 | return nullptr; |
829 | 0 | } |
830 | | |
831 | 4 | const char* buf = buffer.data(); |
832 | 4 | SCOPED_TIMER(_profile.parse_delete_file_time); |
833 | 4 | create_status = parse_deletion_vector(buf, bytes_read, desc.format, delete_rows); |
834 | 4 | if (!create_status.ok()) [[unlikely]] { |
835 | 0 | return nullptr; |
836 | 0 | } |
837 | 4 | COUNTER_UPDATE(_profile.num_delete_rows, delete_rows->size()); |
838 | 4 | return delete_rows; |
839 | 4 | }); |
840 | 4 | RETURN_IF_ERROR(create_status); |
841 | 4 | } |
842 | | |
843 | 74 | return Status::OK(); |
844 | 74 | } |
845 | | } // namespace doris::format |