Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "format_v2/table_reader.h" |
19 | | |
20 | | #include <gen_cpp/ExternalTableSchema_types.h> |
21 | | #include <gen_cpp/PlanNodes_types.h> |
22 | | #include <gen_cpp/Types_types.h> |
23 | | |
24 | | #include <algorithm> |
25 | | #include <cstring> |
26 | | #include <ranges> |
27 | | #include <set> |
28 | | #include <sstream> |
29 | | #include <stdexcept> |
30 | | #include <utility> |
31 | | #include <vector> |
32 | | |
33 | | #include "common/cast_set.h" |
34 | | #include "common/status.h" |
35 | | #include "core/assert_cast.h" |
36 | | #include "core/data_type/data_type_array.h" |
37 | | #include "core/data_type/data_type_map.h" |
38 | | #include "core/data_type/data_type_struct.h" |
39 | | #include "exec/common/endian.h" |
40 | | #include "exprs/vexpr_context.h" |
41 | | #include "exprs/vslot_ref.h" |
42 | | #include "format/table/deletion_vector_reader.h" |
43 | | #include "format_v2/column_mapper.h" |
44 | | #include "format_v2/delimited_text/csv_reader.h" |
45 | | #include "format_v2/delimited_text/text_reader.h" |
46 | | #include "format_v2/json/json_reader.h" |
47 | | #include "format_v2/parquet/parquet_reader.h" |
48 | | #include "roaring/roaring64map.hh" |
49 | | #include "storage/segment/condition_cache.h" |
50 | | #include "util/string_util.h" |
51 | | |
52 | | namespace doris::format { |
53 | | namespace { |
54 | | |
// Renders `values` as "[a, b, c]". `formatter` is invoked once per element, in order, and its
// result is streamed into the output; elements are separated by ", ".
template <typename T, typename Formatter>
std::string join_table_reader_debug_strings(const std::vector<T>& values, Formatter formatter) {
    std::ostringstream joined;
    joined << "[";
    // Empty before the first element, ", " before every later one.
    const char* separator = "";
    for (const auto& value : values) {
        joined << separator << formatter(value);
        separator = ", ";
    }
    joined << "]";
    return joined.str();
}
68 | | |
69 | 0 | std::string file_format_to_string(FileFormat format) { |
70 | 0 | switch (format) { |
71 | 0 | case FileFormat::PARQUET: |
72 | 0 | return "PARQUET"; |
73 | 0 | case FileFormat::ORC: |
74 | 0 | return "ORC"; |
75 | 0 | case FileFormat::CSV: |
76 | 0 | return "CSV"; |
77 | 0 | case FileFormat::JSON: |
78 | 0 | return "JSON"; |
79 | 0 | case FileFormat::TEXT: |
80 | 0 | return "TEXT"; |
81 | 0 | case FileFormat::JNI: |
82 | 0 | return "JNI"; |
83 | 0 | } |
84 | 0 | return "UNKNOWN"; |
85 | 0 | } |
86 | | |
87 | 0 | std::string push_down_agg_to_string(TPushAggOp::type op) { |
88 | 0 | switch (op) { |
89 | 0 | case TPushAggOp::NONE: |
90 | 0 | return "NONE"; |
91 | 0 | case TPushAggOp::COUNT: |
92 | 0 | return "COUNT"; |
93 | 0 | case TPushAggOp::MINMAX: |
94 | 0 | return "MINMAX"; |
95 | 0 | case TPushAggOp::MIX: |
96 | 0 | return "MIX"; |
97 | 0 | case TPushAggOp::COUNT_ON_INDEX: |
98 | 0 | return "COUNT_ON_INDEX"; |
99 | 0 | } |
100 | 0 | return "UNKNOWN"; |
101 | 0 | } |
102 | | |
103 | 0 | std::string current_file_debug_string(const std::unique_ptr<ScanTask>& task) { |
104 | 0 | if (task == nullptr || task->data_file == nullptr) { |
105 | 0 | return "null"; |
106 | 0 | } |
107 | 0 | const auto& file = *task->data_file; |
108 | 0 | std::ostringstream out; |
109 | 0 | out << "FileDescription{path=" << file.path << ", file_size=" << file.file_size |
110 | 0 | << ", range_start_offset=" << file.range_start_offset << ", range_size=" << file.range_size |
111 | 0 | << ", mtime=" << file.mtime << ", fs_name=" << file.fs_name |
112 | 0 | << ", file_cache_admission=" << file.file_cache_admission << "}"; |
113 | 0 | return out.str(); |
114 | 0 | } |
115 | | |
116 | 0 | std::string partition_values_debug_string(const std::map<std::string, Field>& partition_values) { |
117 | 0 | std::ostringstream out; |
118 | 0 | out << "{"; |
119 | 0 | size_t idx = 0; |
120 | 0 | for (const auto& [key, _] : partition_values) { |
121 | 0 | if (idx++ > 0) { |
122 | 0 | out << ", "; |
123 | 0 | } |
124 | 0 | out << key; |
125 | 0 | } |
126 | 0 | out << "}"; |
127 | 0 | return out.str(); |
128 | 0 | } |
129 | | |
130 | 0 | const schema::external::TField* get_field_ptr(const schema::external::TFieldPtr& field_ptr) { |
131 | 0 | if (!field_ptr.__isset.field_ptr || field_ptr.field_ptr == nullptr) { |
132 | 0 | return nullptr; |
133 | 0 | } |
134 | 0 | return field_ptr.field_ptr.get(); |
135 | 0 | } |
136 | | |
137 | 0 | bool external_field_matches_name(const schema::external::TField& field, const std::string& name) { |
138 | 0 | if (field.__isset.name && to_lower(field.name) == to_lower(name)) { |
139 | 0 | return true; |
140 | 0 | } |
141 | 0 | return field.__isset.name_mapping && |
142 | 0 | std::ranges::any_of(field.name_mapping, [&](const std::string& alias) { |
143 | 0 | return to_lower(alias) == to_lower(name); |
144 | 0 | }); |
145 | 0 | } |
146 | | |
147 | | DataTypePtr find_struct_child_type_by_name(const DataTypeStruct& struct_type, |
148 | 0 | const std::string& field_name) { |
149 | 0 | for (size_t field_idx = 0; field_idx < struct_type.get_elements().size(); ++field_idx) { |
150 | 0 | if (to_lower(struct_type.get_element_name(field_idx)) == to_lower(field_name)) { |
151 | 0 | return struct_type.get_element(field_idx); |
152 | 0 | } |
153 | 0 | } |
154 | 0 | return nullptr; |
155 | 0 | } |
156 | | |
157 | | ColumnDefinition build_schema_column_from_external_field(const schema::external::TField& field, |
158 | 0 | DataTypePtr type) { |
159 | 0 | ColumnDefinition column { |
160 | 0 | .identifier = field.__isset.id ? Field::create_field<TYPE_INT>(field.id) : Field {}, |
161 | 0 | .name = field.__isset.name ? field.name : "", |
162 | 0 | .name_mapping = |
163 | 0 | field.__isset.name_mapping ? field.name_mapping : std::vector<std::string> {}, |
164 | 0 | .type = std::move(type), |
165 | 0 | .children = {}, |
166 | 0 | .default_expr = nullptr, |
167 | 0 | .is_partition_key = false, |
168 | 0 | }; |
169 | 0 | if (column.type == nullptr || !field.__isset.nestedField) { |
170 | 0 | return column; |
171 | 0 | } |
172 | | |
173 | 0 | const auto nested_type = remove_nullable(column.type); |
174 | 0 | switch (nested_type->get_primitive_type()) { |
175 | 0 | case TYPE_STRUCT: { |
176 | 0 | if (!field.nestedField.__isset.struct_field || |
177 | 0 | !field.nestedField.struct_field.__isset.fields) { |
178 | 0 | return column; |
179 | 0 | } |
180 | 0 | const auto& struct_type = assert_cast<const DataTypeStruct&>(*nested_type); |
181 | 0 | for (const auto& child_ptr : field.nestedField.struct_field.fields) { |
182 | 0 | const auto* child_field = get_field_ptr(child_ptr); |
183 | 0 | if (child_field == nullptr || !child_field->__isset.name) { |
184 | 0 | continue; |
185 | 0 | } |
186 | 0 | auto child_type = find_struct_child_type_by_name(struct_type, child_field->name); |
187 | 0 | if (child_type == nullptr) { |
188 | 0 | continue; |
189 | 0 | } |
190 | 0 | column.children.push_back( |
191 | 0 | build_schema_column_from_external_field(*child_field, child_type)); |
192 | 0 | } |
193 | 0 | break; |
194 | 0 | } |
195 | 0 | case TYPE_ARRAY: { |
196 | 0 | if (!field.nestedField.__isset.array_field || |
197 | 0 | !field.nestedField.array_field.__isset.item_field) { |
198 | 0 | return column; |
199 | 0 | } |
200 | 0 | const auto* item_field = get_field_ptr(field.nestedField.array_field.item_field); |
201 | 0 | if (item_field == nullptr) { |
202 | 0 | return column; |
203 | 0 | } |
204 | 0 | const auto& array_type = assert_cast<const DataTypeArray&>(*nested_type); |
205 | 0 | auto child = |
206 | 0 | build_schema_column_from_external_field(*item_field, array_type.get_nested_type()); |
207 | 0 | child.name = "element"; |
208 | 0 | if (child.has_identifier_name()) { |
209 | 0 | child.identifier = Field::create_field<TYPE_STRING>(child.name); |
210 | 0 | } |
211 | 0 | column.children.push_back(std::move(child)); |
212 | 0 | break; |
213 | 0 | } |
214 | 0 | case TYPE_MAP: { |
215 | 0 | if (!field.nestedField.__isset.map_field || |
216 | 0 | !field.nestedField.map_field.__isset.key_field || |
217 | 0 | !field.nestedField.map_field.__isset.value_field) { |
218 | 0 | return column; |
219 | 0 | } |
220 | 0 | const auto& map_type = assert_cast<const DataTypeMap&>(*nested_type); |
221 | 0 | const auto* key_field = get_field_ptr(field.nestedField.map_field.key_field); |
222 | 0 | if (key_field != nullptr) { |
223 | 0 | auto child = |
224 | 0 | build_schema_column_from_external_field(*key_field, map_type.get_key_type()); |
225 | 0 | child.name = "key"; |
226 | 0 | if (child.has_identifier_name()) { |
227 | 0 | child.identifier = Field::create_field<TYPE_STRING>(child.name); |
228 | 0 | } |
229 | 0 | column.children.push_back(std::move(child)); |
230 | 0 | } |
231 | 0 | const auto* value_field = get_field_ptr(field.nestedField.map_field.value_field); |
232 | 0 | if (value_field != nullptr) { |
233 | 0 | auto child = build_schema_column_from_external_field(*value_field, |
234 | 0 | map_type.get_value_type()); |
235 | 0 | child.name = "value"; |
236 | 0 | if (child.has_identifier_name()) { |
237 | 0 | child.identifier = Field::create_field<TYPE_STRING>(child.name); |
238 | 0 | } |
239 | 0 | column.children.push_back(std::move(child)); |
240 | 0 | } |
241 | 0 | break; |
242 | 0 | } |
243 | 0 | default: |
244 | 0 | break; |
245 | 0 | } |
246 | 0 | return column; |
247 | 0 | } |
248 | | |
249 | | const schema::external::TField* find_external_root_field(const TFileScanRangeParams* params, |
250 | 6 | const ColumnDefinition& column) { |
251 | 6 | if (params == nullptr || !params->__isset.history_schema_info || |
252 | 6 | params->history_schema_info.empty()) { |
253 | 6 | return nullptr; |
254 | 6 | } |
255 | 0 | const auto* schema = ¶ms->history_schema_info.front(); |
256 | 0 | if (params->__isset.current_schema_id) { |
257 | 0 | for (const auto& candidate_schema : params->history_schema_info) { |
258 | 0 | if (candidate_schema.__isset.schema_id && |
259 | 0 | candidate_schema.schema_id == params->current_schema_id) { |
260 | 0 | schema = &candidate_schema; |
261 | 0 | break; |
262 | 0 | } |
263 | 0 | } |
264 | 0 | } |
265 | 0 | if (!schema->__isset.root_field || !schema->root_field.__isset.fields) { |
266 | 0 | return nullptr; |
267 | 0 | } |
268 | 0 | for (const auto& field_ptr : schema->root_field.fields) { |
269 | 0 | const auto* field = get_field_ptr(field_ptr); |
270 | 0 | if (field == nullptr) { |
271 | 0 | continue; |
272 | 0 | } |
273 | 0 | if (external_field_matches_name(*field, column.name)) { |
274 | 0 | return field; |
275 | 0 | } |
276 | 0 | } |
277 | 0 | return nullptr; |
278 | 0 | } |
279 | | |
280 | 0 | std::string expr_context_debug_string(const VExprContextSPtr& context) { |
281 | 0 | if (context == nullptr) { |
282 | 0 | return "null"; |
283 | 0 | } |
284 | 0 | const auto root = context->root(); |
285 | 0 | if (root == nullptr) { |
286 | 0 | return "VExprContext{root=null}"; |
287 | 0 | } |
288 | 0 | std::ostringstream out; |
289 | 0 | out << "VExprContext{root_name=" << root->expr_name() << ", root_debug=" << root->debug_string() |
290 | 0 | << "}"; |
291 | 0 | return out.str(); |
292 | 0 | } |
293 | | |
294 | 0 | std::string table_filter_debug_string(const TableFilter& filter) { |
295 | 0 | std::ostringstream out; |
296 | 0 | out << "TableFilter{conjunct=" << expr_context_debug_string(filter.conjunct) |
297 | 0 | << ", global_indices=" |
298 | 0 | << join_table_reader_debug_strings( |
299 | 0 | filter.global_indices, |
300 | 0 | [](GlobalIndex global_index) { return std::to_string(global_index.value()); }) |
301 | 0 | << "}"; |
302 | 0 | return out.str(); |
303 | 0 | } |
304 | | |
305 | 0 | std::string table_column_predicates_debug_string(const TableColumnPredicates& predicates) { |
306 | 0 | std::ostringstream out; |
307 | 0 | out << "{"; |
308 | 0 | size_t idx = 0; |
309 | 0 | for (const auto& [global_index, column_predicates] : predicates) { |
310 | 0 | if (idx++ > 0) { |
311 | 0 | out << ", "; |
312 | 0 | } |
313 | 0 | out << global_index.value() << ":{predicate_count=" << column_predicates.size() << "}"; |
314 | 0 | } |
315 | 0 | out << "}"; |
316 | 0 | return out.str(); |
317 | 0 | } |
318 | | |
319 | 3 | bool contains_runtime_filter(const VExprContextSPtrs& conjuncts) { |
320 | 3 | return std::ranges::any_of(conjuncts, [](const auto& conjunct) { |
321 | 3 | return conjunct != nullptr && conjunct->root() != nullptr && |
322 | 3 | conjunct->root()->is_rf_wrapper(); |
323 | 3 | }); |
324 | 3 | } |
325 | | |
326 | 61 | void collect_global_indices(const VExprSPtr& expr, std::set<GlobalIndex>* global_indices) { |
327 | 61 | if (expr == nullptr) { |
328 | 0 | return; |
329 | 0 | } |
330 | 61 | if (expr->is_rf_wrapper()) { |
331 | | // RuntimeFilterExpr wraps a real predicate expression but its own thrift node can still |
332 | | // look like SLOT_REF. Collect indices from the wrapped predicate; do not cast the wrapper |
333 | | // itself to VSlotRef. |
334 | 2 | collect_global_indices(expr->get_impl(), global_indices); |
335 | 2 | return; |
336 | 2 | } |
337 | 59 | if (expr->is_slot_ref()) { |
338 | 20 | const auto* slot_ref = assert_cast<const VSlotRef*>(expr.get()); |
339 | 20 | DORIS_CHECK(slot_ref->column_id() >= 0); |
340 | 20 | global_indices->insert(GlobalIndex(cast_set<size_t>(slot_ref->column_id()))); |
341 | 20 | } |
342 | 59 | for (const auto& child : expr->children()) { |
343 | 40 | collect_global_indices(child, global_indices); |
344 | 40 | } |
345 | 59 | } |
346 | | |
347 | | Status build_table_filters_from_conjunct(const VExprContextSPtr& conjunct, RuntimeState* state, |
348 | 19 | std::vector<TableFilter>* table_filters) { |
349 | 19 | if (conjunct == nullptr) { |
350 | 0 | return Status::OK(); |
351 | 0 | } |
352 | 19 | std::set<GlobalIndex> global_indices; |
353 | 19 | collect_global_indices(conjunct->root(), &global_indices); |
354 | 19 | if (!global_indices.empty()) { |
355 | 19 | TableFilter table_filter; |
356 | 19 | VExprSPtr filter_root; |
357 | 19 | RETURN_IF_ERROR(clone_table_expr_tree(conjunct->root(), &filter_root)); |
358 | 19 | table_filter.conjunct = VExprContext::create_shared(std::move(filter_root)); |
359 | 20 | for (const auto global_index : global_indices) { |
360 | 20 | table_filter.global_indices.push_back(global_index); |
361 | 20 | } |
362 | 19 | table_filters->push_back(std::move(table_filter)); |
363 | 19 | } |
364 | 19 | return Status::OK(); |
365 | 19 | } |
366 | | |
367 | | Status parse_deletion_vector(const char* buf, size_t buffer_size, DeleteFileDesc::Format format, |
368 | 4 | DeleteRows* delete_rows) { |
369 | 4 | DORIS_CHECK(buf != nullptr); |
370 | 4 | DORIS_CHECK(delete_rows != nullptr); |
371 | 4 | DORIS_CHECK(format == DeleteFileDesc::Format::PAIMON || |
372 | 4 | format == DeleteFileDesc::Format::ICEBERG); |
373 | | |
374 | 4 | const size_t checksum_size = format == DeleteFileDesc::Format::ICEBERG ? 4 : 0; |
375 | 4 | if (buffer_size < 8 + checksum_size) [[unlikely]] { |
376 | 0 | return Status::DataQualityError("Deletion vector file size too small: {}", buffer_size); |
377 | 0 | } |
378 | | |
379 | 4 | auto total_length = BigEndian::Load32(buf); |
380 | 4 | if (total_length + 4 + checksum_size != buffer_size) [[unlikely]] { |
381 | 0 | return Status::DataQualityError("Deletion vector length mismatch, expected: {}, actual: {}", |
382 | 0 | total_length + 4 + checksum_size, buffer_size); |
383 | 0 | } |
384 | | |
385 | 4 | const char* bitmap_buf = buf + 8; |
386 | 4 | const size_t bitmap_size = buffer_size - 8 - checksum_size; |
387 | 4 | if (format == DeleteFileDesc::Format::PAIMON) { |
388 | | // Paimon BitmapDeletionVector stores: |
389 | | // [4-byte big-endian length][4-byte magic 0x5E43F2D0][32-bit roaring bitmap] |
390 | | // The length covers magic + bitmap, and does not include the leading length field. |
391 | 1 | constexpr static char PAIMON_BITMAP_MAGIC[] = {'\x5E', '\x43', '\xF2', '\xD0'}; |
392 | 1 | if (memcmp(buf + sizeof(total_length), PAIMON_BITMAP_MAGIC, 4) != 0) [[unlikely]] { |
393 | 0 | return Status::DataQualityError( |
394 | 0 | "Paimon deletion vector magic number mismatch, expected: {}, actual: {}", |
395 | 0 | BigEndian::Load32(PAIMON_BITMAP_MAGIC), |
396 | 0 | BigEndian::Load32(buf + sizeof(total_length))); |
397 | 0 | } |
398 | | |
399 | 1 | roaring::Roaring bitmap; |
400 | 1 | try { |
401 | 1 | bitmap = roaring::Roaring::readSafe(bitmap_buf, bitmap_size); |
402 | 1 | } catch (const std::runtime_error& e) { |
403 | 0 | return Status::DataQualityError("Decode roaring bitmap failed, {}", e.what()); |
404 | 0 | } |
405 | | |
406 | 1 | delete_rows->reserve(bitmap.cardinality()); |
407 | 3 | for (auto it = bitmap.begin(); it != bitmap.end(); it++) { |
408 | 2 | delete_rows->push_back(*it); |
409 | 2 | } |
410 | 1 | return Status::OK(); |
411 | 1 | } |
412 | | |
413 | 3 | constexpr static char ICEBERG_DV_MAGIC[] = {'\xD1', '\xD3', '\x39', '\x64'}; |
414 | 3 | if (memcmp(buf + sizeof(total_length), ICEBERG_DV_MAGIC, 4) != 0) [[unlikely]] { |
415 | 0 | return Status::DataQualityError( |
416 | 0 | "Iceberg deletion vector magic number mismatch, expected: {}, actual: {}", |
417 | 0 | BigEndian::Load32(ICEBERG_DV_MAGIC), BigEndian::Load32(buf + sizeof(total_length))); |
418 | 0 | } |
419 | | |
420 | 3 | roaring::Roaring64Map bitmap; |
421 | 3 | try { |
422 | 3 | bitmap = roaring::Roaring64Map::readSafe(bitmap_buf, bitmap_size); |
423 | 3 | } catch (const std::runtime_error& e) { |
424 | 0 | return Status::DataQualityError("Decode roaring bitmap failed, {}", e.what()); |
425 | 0 | } |
426 | | |
427 | 3 | delete_rows->reserve(bitmap.cardinality()); |
428 | 7 | for (auto it = bitmap.begin(); it != bitmap.end(); it++) { |
429 | 4 | delete_rows->push_back(cast_set<int64_t>(*it)); |
430 | 4 | } |
431 | 3 | return Status::OK(); |
432 | 3 | } |
433 | | |
434 | | } // namespace |
435 | | |
436 | | std::shared_ptr<io::FileSystemProperties> create_system_properties( |
437 | 70 | const TFileScanRangeParams* scan_params) { |
438 | 70 | auto system_properties = std::make_shared<io::FileSystemProperties>(); |
439 | 70 | if (scan_params == nullptr || !scan_params->__isset.file_type) { |
440 | 56 | system_properties->system_type = TFileType::FILE_LOCAL; |
441 | 56 | return system_properties; |
442 | 56 | } |
443 | 14 | system_properties->system_type = scan_params->file_type; |
444 | 14 | system_properties->properties = scan_params->properties; |
445 | 14 | system_properties->hdfs_params = scan_params->hdfs_params; |
446 | 14 | if (scan_params->__isset.broker_addresses) { |
447 | 0 | system_properties->broker_addresses.assign(scan_params->broker_addresses.begin(), |
448 | 0 | scan_params->broker_addresses.end()); |
449 | 0 | } |
450 | 14 | return system_properties; |
451 | 70 | } |
452 | | |
453 | 0 | std::string TableReader::debug_string() const { |
454 | 0 | std::ostringstream out; |
455 | 0 | out << "TableReader{format=" << file_format_to_string(_format) |
456 | 0 | << ", push_down_agg_type=" << push_down_agg_to_string(_push_down_agg_type) |
457 | 0 | << ", aggregate_pushdown_tried=" << _aggregate_pushdown_tried |
458 | 0 | << ", has_current_reader=" << (_data_reader.reader != nullptr) |
459 | 0 | << ", has_current_task=" << (_current_task != nullptr) |
460 | 0 | << ", current_file=" << current_file_debug_string(_current_task) |
461 | 0 | << ", has_delete_rows=" << (_delete_rows != nullptr) |
462 | 0 | << ", delete_row_count=" << (_delete_rows == nullptr ? 0 : _delete_rows->size()) |
463 | 0 | << ", has_system_properties=" << (_system_properties != nullptr) << ", system_type=" |
464 | 0 | << (_system_properties == nullptr ? static_cast<int>(TFileType::FILE_LOCAL) |
465 | 0 | : static_cast<int>(_system_properties->system_type)) |
466 | 0 | << ", has_scan_params=" << (_scan_params != nullptr) |
467 | 0 | << ", has_io_ctx=" << (_io_ctx != nullptr) |
468 | 0 | << ", has_runtime_state=" << (_runtime_state != nullptr) |
469 | 0 | << ", has_scanner_profile=" << (_scanner_profile != nullptr) |
470 | 0 | << ", mapper_options=" << _mapper_options.debug_string() << ", projected_columns=" |
471 | 0 | << join_table_reader_debug_strings( |
472 | 0 | _projected_columns, |
473 | 0 | [](const ColumnDefinition& column) { return column.debug_string(); }) |
474 | 0 | << ", partition_values=" << partition_values_debug_string(_partition_values) |
475 | 0 | << ", table_filters=" |
476 | 0 | << join_table_reader_debug_strings( |
477 | 0 | _table_filters, |
478 | 0 | [](const TableFilter& filter) { return table_filter_debug_string(filter); }) |
479 | 0 | << ", table_column_predicates=" |
480 | 0 | << table_column_predicates_debug_string(_table_column_predicates) |
481 | 0 | << ", conjunct_count=" << _conjuncts.size() << ", conjuncts=" |
482 | 0 | << join_table_reader_debug_strings(_conjuncts, |
483 | 0 | [](const VExprContextSPtr& conjunct) { |
484 | 0 | return expr_context_debug_string(conjunct); |
485 | 0 | }) |
486 | 0 | << ", file_schema=" |
487 | 0 | << join_table_reader_debug_strings( |
488 | 0 | _data_reader.file_schema, |
489 | 0 | [](const ColumnDefinition& field) { return field.debug_string(); }) |
490 | 0 | << ", file_block_layout=" |
491 | 0 | << join_table_reader_debug_strings( |
492 | 0 | _data_reader.file_block_layout, |
493 | 0 | [](const FileBlockColumn& column) { |
494 | 0 | std::ostringstream column_out; |
495 | 0 | column_out << "FileBlockColumn{file_column_id=" << column.file_column_id |
496 | 0 | << ", name=" << column.name << ", type=" |
497 | 0 | << (column.type == nullptr ? "null" : column.type->get_name()) |
498 | 0 | << "}"; |
499 | 0 | return column_out.str(); |
500 | 0 | }) |
501 | 0 | << ", block_template_columns=" << _data_reader.block_template.columns() |
502 | 0 | << ", column_mapper=" |
503 | 0 | << (_data_reader.column_mapper == nullptr ? "null" |
504 | 0 | : _data_reader.column_mapper->debug_string()) |
505 | 0 | << "}"; |
506 | 0 | return out.str(); |
507 | 0 | } |
508 | | |
509 | | Status TableReader::annotate_projected_column(const TFileScanSlotInfo& slot_info, |
510 | | ProjectedColumnBuildContext* context, |
511 | 6 | ColumnDefinition* column) const { |
512 | 6 | (void)slot_info; |
513 | 6 | DORIS_CHECK(context != nullptr); |
514 | 6 | DORIS_CHECK(column != nullptr); |
515 | 6 | context->schema_column.reset(); |
516 | 6 | const auto* schema_field = find_external_root_field(context->scan_params, *column); |
517 | 6 | if (schema_field == nullptr) { |
518 | 6 | return Status::OK(); |
519 | 6 | } |
520 | 0 | context->schema_column = build_schema_column_from_external_field(*schema_field, column->type); |
521 | 0 | column->identifier = context->schema_column->identifier; |
522 | 0 | column->name_mapping = context->schema_column->name_mapping; |
523 | 0 | return Status::OK(); |
524 | 6 | } |
525 | | |
526 | 70 | Status TableReader::init(TableReadOptions&& options) { |
527 | 70 | _scan_params = options.scan_params; |
528 | 70 | _format = options.format; |
529 | 70 | _io_ctx = options.io_ctx; |
530 | 70 | _runtime_state = options.runtime_state; |
531 | 70 | _scanner_profile = options.scanner_profile; |
532 | 70 | _file_slot_descs = options.file_slot_descs; |
533 | 70 | _push_down_agg_type = options.push_down_agg_type; |
534 | 70 | _condition_cache_digest = options.condition_cache_digest; |
535 | 70 | _projected_columns = std::move(options.projected_columns); |
536 | 70 | _system_properties = create_system_properties(_scan_params); |
537 | 70 | _mapper_options.mode = TableColumnMappingMode::BY_NAME; |
538 | 70 | _conjuncts = std::move(options.conjuncts); |
539 | 70 | _table_column_predicates = std::move(options.column_predicates); |
540 | | |
541 | 70 | if (_scanner_profile != nullptr) { |
542 | 17 | static const char* table_profile = "TableReader"; |
543 | 17 | ADD_TIMER_WITH_LEVEL(_scanner_profile, table_profile, 1); |
544 | 17 | _profile.num_delete_files = ADD_CHILD_COUNTER_WITH_LEVEL(_scanner_profile, "NumDeleteFiles", |
545 | 17 | TUnit::UNIT, table_profile, 1); |
546 | 17 | _profile.num_delete_rows = ADD_CHILD_COUNTER_WITH_LEVEL(_scanner_profile, "NumDeleteRows", |
547 | 17 | TUnit::UNIT, table_profile, 1); |
548 | 17 | _profile.parse_delete_file_time = ADD_CHILD_TIMER_WITH_LEVEL( |
549 | 17 | _scanner_profile, "ParseDeleteFileTime", table_profile, 1); |
550 | 17 | _profile.exec_timer = |
551 | 17 | ADD_CHILD_TIMER_WITH_LEVEL(_scanner_profile, "GetBlockTime", table_profile, 1); |
552 | 17 | _profile.prepare_split_timer = |
553 | 17 | ADD_CHILD_TIMER_WITH_LEVEL(_scanner_profile, "PrepareSplitTime", table_profile, 1); |
554 | 17 | _profile.finalize_timer = |
555 | 17 | ADD_CHILD_TIMER_WITH_LEVEL(_scanner_profile, "FinalizeBlockTime", table_profile, 1); |
556 | 17 | _profile.create_reader_timer = |
557 | 17 | ADD_CHILD_TIMER_WITH_LEVEL(_scanner_profile, "CreateReaderTime", table_profile, 1); |
558 | 17 | _profile.pushdown_agg_timer = |
559 | 17 | ADD_CHILD_TIMER_WITH_LEVEL(_scanner_profile, "PushDownAggTime", table_profile, 1); |
560 | 17 | _profile.open_reader_timer = |
561 | 17 | ADD_CHILD_TIMER_WITH_LEVEL(_scanner_profile, "OpenReaderTime", table_profile, 1); |
562 | 17 | } |
563 | 70 | return Status::OK(); |
564 | 70 | } |
565 | | |
566 | 64 | Status TableReader::_build_table_filters_from_conjuncts() { |
567 | 64 | _table_filters.clear(); |
568 | 64 | for (const auto& conjunct : _conjuncts) { |
569 | 19 | RETURN_IF_ERROR( |
570 | 19 | build_table_filters_from_conjunct(conjunct, _runtime_state, &_table_filters)); |
571 | 19 | } |
572 | 64 | return Status::OK(); |
573 | 64 | } |
574 | | |
575 | 63 | Status TableReader::_open_local_filter_exprs(const FileScanRequest& file_request) { |
576 | 63 | RowDescriptor row_desc; |
577 | 63 | for (const auto& conjunct : file_request.conjuncts) { |
578 | 14 | RETURN_IF_ERROR(conjunct->prepare(_runtime_state, row_desc)); |
579 | 14 | RETURN_IF_ERROR(conjunct->open(_runtime_state)); |
580 | 14 | } |
581 | 63 | for (const auto& delete_conjunct : file_request.delete_conjuncts) { |
582 | 12 | RETURN_IF_ERROR(delete_conjunct->prepare(_runtime_state, row_desc)); |
583 | 12 | RETURN_IF_ERROR(delete_conjunct->open(_runtime_state)); |
584 | 12 | } |
585 | 63 | return Status::OK(); |
586 | 63 | } |
587 | | |
588 | 63 | bool TableReader::_should_enable_condition_cache(const FileScanRequest& file_request) const { |
589 | 63 | if (_condition_cache_digest == 0 || _push_down_agg_type == TPushAggOp::type::COUNT || |
590 | 63 | _current_file_description == std::nullopt || _data_reader.reader == nullptr) { |
591 | 58 | return false; |
592 | 58 | } |
593 | | // Condition cache is populated by file readers after evaluating file-local row-level |
594 | | // conjuncts. ColumnPredicate-only scans can prune row groups/pages, but they do not produce a |
595 | | // per-row survivor bitmap that can safely populate the cache. |
596 | 5 | if (file_request.conjuncts.empty()) { |
597 | 1 | return false; |
598 | 1 | } |
599 | | // Delete files/deletion vectors are table-format state. They may change independently of the |
600 | | // data file path/mtime/size used by the external cache key, so caching their result can become |
601 | | // stale. Keep delete filtering enabled, but do not read or write condition cache. |
602 | 4 | if (_delete_rows != nullptr || !file_request.delete_conjuncts.empty()) { |
603 | 1 | return false; |
604 | 1 | } |
605 | | // Runtime filters can arrive late and their payload is not guaranteed to be represented by the |
606 | | // scan-local digest. Without a read-only mode, a MISS could insert a bitmap for P AND RF under |
607 | | // the digest for only P. This mirrors the old FileScanner guard. |
608 | 3 | return !contains_runtime_filter(file_request.conjuncts); |
609 | 4 | } |
610 | | |
611 | 63 | Status TableReader::_init_reader_condition_cache(const FileScanRequest& file_request) { |
612 | 63 | _condition_cache = nullptr; |
613 | 63 | _condition_cache_ctx = nullptr; |
614 | 63 | if (!_should_enable_condition_cache(file_request)) { |
615 | 61 | return Status::OK(); |
616 | 61 | } |
617 | | |
618 | 2 | auto* cache = segment_v2::ConditionCache::instance(); |
619 | 2 | if (cache == nullptr) { |
620 | 0 | return Status::OK(); |
621 | 0 | } |
622 | 2 | const auto& file = *_current_file_description; |
623 | 2 | _condition_cache_key = segment_v2::ConditionCache::ExternalCacheKey( |
624 | 2 | file.path, file.mtime, file.file_size, _condition_cache_digest, file.range_start_offset, |
625 | 2 | file.range_size); |
626 | | |
627 | 2 | segment_v2::ConditionCacheHandle handle; |
628 | 2 | const bool condition_cache_hit = cache->lookup(_condition_cache_key, &handle); |
629 | 2 | if (condition_cache_hit) { |
630 | 0 | _condition_cache = handle.get_filter_result(); |
631 | 0 | ++_condition_cache_hit_count; |
632 | 2 | } else { |
633 | 2 | const int64_t total_rows = _data_reader.reader->get_total_rows(); |
634 | 2 | if (total_rows <= 0) { |
635 | 0 | return Status::OK(); |
636 | 0 | } |
637 | | // Add one guard granule for split ranges that start in the middle of a granule. A guard |
638 | | // false bit beyond the real range never overlaps real rows, but avoids boundary overflow |
639 | | // when a reader marks the last partial granule. |
640 | 2 | const size_t num_granules = (total_rows + ConditionCacheContext::GRANULE_SIZE - 1) / |
641 | 2 | ConditionCacheContext::GRANULE_SIZE; |
642 | 2 | _condition_cache = std::make_shared<std::vector<bool>>(num_granules + 1, false); |
643 | 2 | } |
644 | | |
645 | 2 | if (_condition_cache != nullptr) { |
646 | 2 | _condition_cache_ctx = std::make_shared<ConditionCacheContext>(); |
647 | 2 | _condition_cache_ctx->is_hit = condition_cache_hit; |
648 | 2 | _condition_cache_ctx->filter_result = _condition_cache; |
649 | 2 | _data_reader.reader->set_condition_cache_context(_condition_cache_ctx); |
650 | 2 | } |
651 | 2 | return Status::OK(); |
652 | 2 | } |
653 | | |
654 | 64 | void TableReader::_finalize_reader_condition_cache() { |
655 | 64 | if (_condition_cache_ctx == nullptr || _condition_cache_ctx->is_hit) { |
656 | 62 | _condition_cache = nullptr; |
657 | 62 | _condition_cache_ctx = nullptr; |
658 | 62 | return; |
659 | 62 | } |
660 | | // LIMIT or scanner cancellation may close a reader before all selected row ranges are visited. |
661 | | // Unvisited granules remain false in a MISS bitmap, so inserting a partial bitmap would make a |
662 | | // later HIT skip valid rows. Only publish cache entries after the physical reader reaches EOF. |
663 | 2 | if (!_current_reader_reached_eof) { |
664 | 1 | _condition_cache = nullptr; |
665 | 1 | _condition_cache_ctx = nullptr; |
666 | 1 | return; |
667 | 1 | } |
668 | 1 | segment_v2::ConditionCache::instance()->insert(_condition_cache_key, |
669 | 1 | std::move(_condition_cache)); |
670 | 1 | _condition_cache = nullptr; |
671 | 1 | _condition_cache_ctx = nullptr; |
672 | 1 | } |
673 | | |
674 | 75 | Status TableReader::create_next_reader(bool* eos) { |
675 | 75 | SCOPED_TIMER(_profile.create_reader_timer); |
676 | 75 | DCHECK(_data_reader.reader == nullptr); |
677 | 75 | if (_current_task == nullptr) { |
678 | 11 | *eos = true; |
679 | 11 | return Status::OK(); |
680 | 11 | } |
681 | | |
682 | 64 | RETURN_IF_ERROR(create_file_reader(&_data_reader.reader)); |
683 | 64 | DORIS_CHECK(_data_reader.reader != nullptr); |
684 | 64 | RETURN_IF_ERROR(_data_reader.reader->init(_runtime_state)); |
685 | 64 | RETURN_IF_ERROR(open_reader()); |
686 | 64 | if (_data_reader.reader == nullptr) { |
687 | 1 | *eos = _current_task == nullptr; |
688 | 1 | return Status::OK(); |
689 | 1 | } |
690 | 63 | *eos = false; |
691 | 63 | return Status::OK(); |
692 | 64 | } |
693 | | |
694 | 57 | Status TableReader::create_file_reader(std::unique_ptr<FileReader>* reader) { |
695 | 57 | DORIS_CHECK(reader != nullptr); |
696 | 57 | if (_format == FileFormat::PARQUET) { |
697 | 57 | const bool enable_mapping_timestamp_tz = |
698 | 57 | _scan_params != nullptr && _scan_params->__isset.enable_mapping_timestamp_tz && |
699 | 57 | _scan_params->enable_mapping_timestamp_tz; |
700 | 57 | *reader = std::make_unique<format::parquet::ParquetReader>( |
701 | 57 | _system_properties, _current_task->data_file, _io_ctx, _scanner_profile, |
702 | 57 | _global_rowid_context, enable_mapping_timestamp_tz); |
703 | 57 | return Status::OK(); |
704 | 57 | } |
705 | 0 | if (_format == FileFormat::CSV) { |
706 | 0 | if (_file_slot_descs == nullptr) { |
707 | 0 | return Status::InvalidArgument("CSV reader requires file slot descriptors"); |
708 | 0 | } |
709 | | // CSV has no embedded schema. TableReader owns table-level mapping, while CsvReader needs |
710 | | // only the physical file slots plus scan text parameters to build a file-local schema. |
711 | | // Non-file columns such as partitions/defaults/virtual row ids are intentionally excluded |
712 | | // from `_file_slot_descs` and are materialized during finalize_chunk(). |
713 | 0 | *reader = std::make_unique<format::csv::CsvReader>( |
714 | 0 | _system_properties, _current_task->data_file, _io_ctx, _scanner_profile, |
715 | 0 | _scan_params, *_file_slot_descs, _current_range_compress_type, |
716 | 0 | _current_range_load_id); |
717 | 0 | return Status::OK(); |
718 | 0 | } |
719 | 0 | if (_format == FileFormat::TEXT) { |
720 | 0 | if (_file_slot_descs == nullptr) { |
721 | 0 | return Status::InvalidArgument("Text reader requires file slot descriptors"); |
722 | 0 | } |
723 | | // Text files have no embedded schema. As with CSV, TableReader handles table-level mapping |
724 | | // and only passes physical file slots to the v2 TextReader. |
725 | 0 | *reader = std::make_unique<format::text::TextReader>( |
726 | 0 | _system_properties, _current_task->data_file, _io_ctx, _scanner_profile, |
727 | 0 | _scan_params, *_file_slot_descs, _current_range_compress_type, |
728 | 0 | _current_range_load_id); |
729 | 0 | return Status::OK(); |
730 | 0 | } |
731 | 0 | if (_format == FileFormat::JSON) { |
732 | 0 | if (_file_slot_descs == nullptr) { |
733 | 0 | return Status::InvalidArgument("JSON reader requires file slot descriptors"); |
734 | 0 | } |
735 | 0 | *reader = std::make_unique<format::json::JsonReader>( |
736 | 0 | _system_properties, _current_task->data_file, _io_ctx, _scanner_profile, |
737 | 0 | _scan_params, _current_file_range_desc, *_file_slot_descs, |
738 | 0 | _current_range_compress_type, _current_range_load_id); |
739 | 0 | return Status::OK(); |
740 | 0 | } |
741 | 0 | return Status::NotSupported("TableReader does not support file format {}", |
742 | 0 | file_format_to_string(_format)); |
743 | 0 | } |
744 | | |
745 | 76 | std::unique_ptr<io::FileDescription> create_file_description(const TFileRangeDesc& range) { |
746 | 76 | auto file_description = std::make_unique<io::FileDescription>(); |
747 | 76 | file_description->path = range.path; |
748 | 76 | file_description->file_size = range.__isset.file_size ? range.file_size : -1; |
749 | 76 | file_description->mtime = range.__isset.modification_time ? range.modification_time : 0; |
750 | 76 | file_description->range_start_offset = range.__isset.start_offset ? range.start_offset : 0; |
751 | 76 | file_description->range_size = range.__isset.size ? range.size : -1; |
752 | 76 | if (range.__isset.fs_name) { |
753 | 0 | file_description->fs_name = range.fs_name; |
754 | 0 | } |
755 | 76 | if (range.__isset.file_cache_admission) { |
756 | 0 | file_description->file_cache_admission = range.file_cache_admission; |
757 | 0 | } |
758 | 76 | return file_description; |
759 | 76 | } |
760 | | |
761 | 76 | Status TableReader::prepare_split(const SplitReadOptions& options) { |
762 | 76 | SCOPED_TIMER(_profile.prepare_split_timer); |
763 | 76 | _partition_values = std::move(options.partition_values); |
764 | 76 | _current_task = std::make_unique<ScanTask>(); |
765 | 76 | _current_task->data_file = create_file_description(options.current_range); |
766 | 76 | _current_file_description = *_current_task->data_file; |
767 | 76 | _current_file_range_desc = options.current_range; |
768 | 76 | _current_range_compress_type = options.current_range.__isset.compress_type |
769 | 76 | ? options.current_range.compress_type |
770 | 76 | : TFileCompressType::UNKNOWN; |
771 | 76 | _current_range_load_id = options.current_range.__isset.load_id |
772 | 76 | ? std::make_optional(options.current_range.load_id) |
773 | 76 | : std::nullopt; |
774 | 76 | _global_rowid_context = options.global_rowid_context; |
775 | 76 | _delete_rows = nullptr; |
776 | 76 | _aggregate_pushdown_tried = false; |
777 | 76 | _remaining_table_level_count = -1; |
778 | 76 | _current_reader_reached_eof = false; |
779 | 76 | if (_push_down_agg_type == TPushAggOp::type::COUNT && |
780 | 76 | options.current_range.__isset.table_format_params && |
781 | 76 | options.current_range.table_format_params.__isset.table_level_row_count) { |
782 | 6 | DORIS_CHECK(options.current_range.table_format_params.table_level_row_count >= -1); |
783 | 6 | _remaining_table_level_count = |
784 | 6 | options.current_range.table_format_params.table_level_row_count; |
785 | 6 | } |
786 | 76 | if (_is_table_level_count_active()) { |
787 | 2 | return Status::OK(); |
788 | 2 | } |
789 | 74 | return _parse_delete_predicates(options); |
790 | 76 | } |
791 | | |
792 | 74 | Status TableReader::_parse_delete_predicates(const SplitReadOptions& options) { |
793 | 74 | DeleteFileDesc desc {.fs_name = options.current_range.fs_name}; |
794 | 74 | bool has_delete_file = false; |
795 | 74 | RETURN_IF_ERROR(_parse_deletion_vector_file(options.current_range.table_format_params, &desc, |
796 | 74 | &has_delete_file)); |
797 | 74 | if (has_delete_file) { |
798 | 4 | DORIS_CHECK(options.cache != nullptr); |
799 | 4 | Status create_status = Status::OK(); |
800 | | |
801 | 4 | _delete_rows = options.cache->get<DeleteRows>(desc.key, [&]() -> DeleteRows* { |
802 | 4 | auto* delete_rows = new DeleteRows; |
803 | | |
804 | 4 | DeletionVectorReader dv_reader(_runtime_state, _scanner_profile, *_scan_params, desc, |
805 | 4 | _io_ctx.get()); |
806 | 4 | create_status = dv_reader.open(); |
807 | 4 | if (!create_status.ok()) [[unlikely]] { |
808 | 0 | return nullptr; |
809 | 0 | } |
810 | | |
811 | 4 | size_t bytes_read = desc.size; |
812 | 4 | std::vector<char> buffer(bytes_read); |
813 | 4 | create_status = dv_reader.read_at(desc.start_offset, {buffer.data(), bytes_read}); |
814 | 4 | if (!create_status.ok()) [[unlikely]] { |
815 | 0 | return nullptr; |
816 | 0 | } |
817 | | |
818 | 4 | const char* buf = buffer.data(); |
819 | 4 | SCOPED_TIMER(_profile.parse_delete_file_time); |
820 | 4 | create_status = parse_deletion_vector(buf, bytes_read, desc.format, delete_rows); |
821 | 4 | if (!create_status.ok()) [[unlikely]] { |
822 | 0 | return nullptr; |
823 | 0 | } |
824 | 4 | COUNTER_UPDATE(_profile.num_delete_rows, delete_rows->size()); |
825 | 4 | return delete_rows; |
826 | 4 | }); |
827 | 4 | RETURN_IF_ERROR(create_status); |
828 | 4 | } |
829 | | |
830 | 74 | return Status::OK(); |
831 | 74 | } |
832 | | } // namespace doris::format |