be/src/format_v2/file_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "format_v2/file_reader.h" |
19 | | |
20 | | #include <sstream> |
21 | | |
22 | | #include "format_v2/column_mapper.h" |
23 | | #include "io/fs/buffered_reader.h" |
24 | | #include "io/fs/tracing_file_reader.h" |
25 | | #include "runtime/runtime_state.h" |
26 | | |
27 | | namespace doris::format { |
28 | | namespace { |
29 | | |
30 | | std::unique_ptr<FileStructPredicateTarget> clone_struct_predicate_target( |
31 | 770 | const std::unique_ptr<FileStructPredicateTarget>& target) { |
32 | 770 | return target == nullptr ? nullptr : std::make_unique<FileStructPredicateTarget>(*target); |
33 | 770 | } |
34 | | |
// Renders `values` as a bracketed, comma-separated list such as "[a, b, c]".
//
// `formatter` is called once per element, in order, and whatever it returns is streamed with
// operator<<. An empty vector produces "[]".
template <typename T, typename Formatter>
std::string join_debug_strings(const std::vector<T>& values, Formatter formatter) {
    std::ostringstream rendered;
    rendered << "[";
    // Empty before the first element, ", " before every later one.
    const char* separator = "";
    for (const T& value : values) {
        rendered << separator;
        rendered << formatter(value);
        separator = ", ";
    }
    rendered << "]";
    return rendered.str();
}
48 | | |
// Renders a vector of 32-bit integers as "[v0, v1, ...]"; an empty vector yields "[]".
std::string int_vector_debug_string(const std::vector<int32_t>& values) {
    std::ostringstream rendered;
    rendered << "[";
    bool is_first = true;
    for (const int32_t value : values) {
        if (!is_first) {
            rendered << ", ";
        }
        is_first = false;
        rendered << value;
    }
    rendered << "]";
    return rendered.str();
}
61 | | |
62 | | void append_struct_predicate_path(const FileStructPredicateTarget* target, |
63 | 21.6k | std::vector<int32_t>* path) { |
64 | 21.6k | DORIS_CHECK(path != nullptr); |
65 | 22.4k | for (const auto* current = target; current != nullptr; current = current->child.get()) { |
66 | 743 | path->push_back(current->file_local_id); |
67 | 743 | } |
68 | 21.6k | } |
69 | | |
70 | 0 | std::string struct_predicate_target_debug_string(const FileStructPredicateTarget* target) { |
71 | 0 | if (target == nullptr) { |
72 | 0 | return "null"; |
73 | 0 | } |
74 | 0 | std::ostringstream out; |
75 | 0 | out << "{file_local_id=" << target->file_local_id |
76 | 0 | << ", file_child_name=" << target->file_child_name |
77 | 0 | << ", child=" << struct_predicate_target_debug_string(target->child.get()) << "}"; |
78 | 0 | return out.str(); |
79 | 0 | } |
80 | | |
81 | | bool struct_predicate_targets_equal(const FileStructPredicateTarget* lhs, |
82 | 37 | const FileStructPredicateTarget* rhs) { |
83 | 42 | while (lhs != nullptr && rhs != nullptr) { |
84 | 37 | if (lhs->file_local_id != rhs->file_local_id) { |
85 | 32 | return false; |
86 | 32 | } |
87 | 5 | lhs = lhs->child.get(); |
88 | 5 | rhs = rhs->child.get(); |
89 | 5 | } |
90 | 5 | return lhs == nullptr && rhs == nullptr; |
91 | 37 | } |
92 | | |
93 | | } // namespace |
94 | | |
95 | | FileStructPredicateTarget::FileStructPredicateTarget(const FileStructPredicateTarget& other) |
96 | 408 | : file_local_id(other.file_local_id), |
97 | 408 | file_child_name(other.file_child_name), |
98 | 408 | child(clone_struct_predicate_target(other.child)) {} |
99 | | |
100 | | FileStructPredicateTarget& FileStructPredicateTarget::operator=( |
101 | 0 | const FileStructPredicateTarget& other) { |
102 | 0 | if (this == &other) { |
103 | 0 | return *this; |
104 | 0 | } |
105 | 0 | file_local_id = other.file_local_id; |
106 | 0 | file_child_name = other.file_child_name; |
107 | 0 | child = clone_struct_predicate_target(other.child); |
108 | 0 | return *this; |
109 | 0 | } |
110 | | |
111 | | FileNestedPredicateTarget::FileNestedPredicateTarget(const FileNestedPredicateTarget& other) |
112 | 8 | : file_column_id(other.file_column_id), |
113 | 8 | struct_target(clone_struct_predicate_target(other.struct_target)) {} |
114 | | |
115 | | FileNestedPredicateTarget& FileNestedPredicateTarget::operator=( |
116 | 354 | const FileNestedPredicateTarget& other) { |
117 | 354 | if (this == &other) { |
118 | 0 | return *this; |
119 | 0 | } |
120 | 354 | file_column_id = other.file_column_id; |
121 | 354 | struct_target = clone_struct_predicate_target(other.struct_target); |
122 | 354 | return *this; |
123 | 354 | } |
124 | | |
125 | 27.3k | LocalColumnId FileColumnPredicateFilter::effective_file_column_id() const { |
126 | 27.3k | return target.is_valid() ? target.file_column_id : file_column_id; |
127 | 27.3k | } |
128 | | |
129 | 21.8k | std::vector<int32_t> FileColumnPredicateFilter::effective_file_child_id_path() const { |
130 | 21.8k | if (!target.is_valid()) { |
131 | 135 | return file_child_id_path; |
132 | 135 | } |
133 | 21.6k | std::vector<int32_t> path; |
134 | 21.6k | append_struct_predicate_path(target.struct_target.get(), &path); |
135 | 21.6k | return path; |
136 | 21.8k | } |
137 | | |
138 | 37 | bool FileColumnPredicateFilter::same_target_as(const FileColumnPredicateFilter& other) const { |
139 | 37 | if (target.is_valid() && other.target.is_valid()) { |
140 | 37 | return target.file_column_id == other.target.file_column_id && |
141 | 37 | struct_predicate_targets_equal(target.struct_target.get(), |
142 | 37 | other.target.struct_target.get()); |
143 | 37 | } |
144 | 0 | return effective_file_column_id() == other.effective_file_column_id() && |
145 | 0 | effective_file_child_id_path() == other.effective_file_child_id_path(); |
146 | 37 | } |
147 | | |
148 | 0 | std::string FileColumnPredicateFilter::debug_string() const { |
149 | 0 | std::ostringstream out; |
150 | 0 | out << "FileColumnPredicateFilter{target={file_column_id=" << effective_file_column_id() |
151 | 0 | << ", struct_target=" << struct_predicate_target_debug_string(target.struct_target.get()) |
152 | 0 | << "}, file_child_id_path=" << int_vector_debug_string(effective_file_child_id_path()) |
153 | 0 | << ", predicate_count=" << predicates.size() << "}"; |
154 | 0 | return out.str(); |
155 | 0 | } |
156 | | |
157 | 0 | std::string FileScanRequest::debug_string() const { |
158 | 0 | std::ostringstream out; |
159 | 0 | out << "FileScanRequest{predicate_columns=" |
160 | 0 | << join_debug_strings( |
161 | 0 | predicate_columns, |
162 | 0 | [](const LocalColumnIndex& projection) { return projection.debug_string(); }) |
163 | 0 | << ", non_predicate_columns=" |
164 | 0 | << join_debug_strings( |
165 | 0 | non_predicate_columns, |
166 | 0 | [](const LocalColumnIndex& projection) { return projection.debug_string(); }) |
167 | 0 | << ", local_positions={"; |
168 | 0 | size_t position_idx = 0; |
169 | 0 | for (const auto& [column_id, block_position] : local_positions) { |
170 | 0 | if (position_idx++ > 0) { |
171 | 0 | out << ", "; |
172 | 0 | } |
173 | 0 | out << column_id << ":" << block_position; |
174 | 0 | } |
175 | 0 | out << "}, conjunct_count=" << conjuncts.size() |
176 | 0 | << ", delete_conjunct_count=" << delete_conjuncts.size() << ", column_predicate_filters=" |
177 | 0 | << join_debug_strings( |
178 | 0 | column_predicate_filters, |
179 | 0 | [](const FileColumnPredicateFilter& filter) { return filter.debug_string(); }) |
180 | 0 | << "}"; |
181 | 0 | return out.str(); |
182 | 0 | } |
183 | | |
184 | 35.2k | Status FileReader::init(RuntimeState* state) { |
185 | 35.2k | _init_profile(); |
186 | 35.2k | SCOPED_RAW_TIMER(&_reader_statistics.file_reader_create_time); |
187 | 35.2k | ++_reader_statistics.open_file_num; |
188 | 35.2k | io::FileReaderOptions reader_options = |
189 | 35.2k | FileFactory::get_reader_options(state->query_options(), *_file_description); |
190 | 35.2k | _file_reader = DORIS_TRY(io::DelegateReader::create_file_reader( |
191 | 35.2k | _profile, *_system_properties, *_file_description, reader_options, |
192 | 35.2k | io::DelegateReader::AccessMode::RANDOM, _io_ctx)); |
193 | | // IOContext can be present without file_reader_stats in standalone tests or callers that only |
194 | | // need extra IO state. TracingFileReader dereferences the stats pointer on every read, so only |
195 | | // wrap the physical reader when stats collection is actually available. |
196 | 35.2k | _tracing_file_reader = _io_ctx && _io_ctx->file_reader_stats |
197 | 35.2k | ? std::make_shared<io::TracingFileReader>( |
198 | 35.2k | _file_reader, _io_ctx->file_reader_stats) |
199 | 35.2k | : _file_reader; |
200 | 35.2k | _eof = false; |
201 | 35.2k | return Status::OK(); |
202 | 35.2k | } |
203 | | |
204 | | std::unique_ptr<TableColumnMapper> FileReader::create_column_mapper( |
205 | 7 | TableColumnMapperOptions options) const { |
206 | 7 | return std::make_unique<TableColumnMapper>(std::move(options)); |
207 | 7 | } |
208 | | |
209 | | } // namespace doris::format |