be/src/storage/segment/segment.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/segment/segment.h" |
19 | | |
20 | | #include <crc32c/crc32c.h> |
21 | | #include <gen_cpp/Descriptors_types.h> |
22 | | #include <gen_cpp/PlanNodes_types.h> |
23 | | #include <gen_cpp/olap_file.pb.h> |
24 | | #include <gen_cpp/segment_v2.pb.h> |
25 | | |
26 | | #include <algorithm> |
27 | | #include <cstring> |
28 | | #include <memory> |
29 | | #include <set> |
30 | | #include <sstream> |
31 | | #include <utility> |
32 | | |
33 | | #include "cloud/config.h" |
34 | | #include "common/config.h" |
35 | | #include "common/exception.h" |
36 | | #include "common/logging.h" |
37 | | #include "common/status.h" |
38 | | #include "core/column/column.h" |
39 | | #include "core/data_type/data_type.h" |
40 | | #include "core/data_type/data_type_factory.hpp" |
41 | | #include "core/data_type/data_type_nullable.h" |
42 | | #include "core/data_type/data_type_variant.h" |
43 | | #include "core/field.h" |
44 | | #include "core/string_ref.h" |
45 | | #include "cpp/sync_point.h" |
46 | | #include "exprs/expr_zonemap_filter.h" |
47 | | #include "exprs/vexpr_context.h" |
48 | | #include "io/cache/block_file_cache.h" |
49 | | #include "io/cache/block_file_cache_factory.h" |
50 | | #include "io/cache/cached_remote_file_reader.h" |
51 | | #include "io/fs/file_reader.h" |
52 | | #include "io/fs/file_system.h" |
53 | | #include "io/io_common.h" |
54 | | #include "runtime/exec_env.h" |
55 | | #include "runtime/query_context.h" |
56 | | #include "runtime/runtime_predicate.h" |
57 | | #include "runtime/runtime_state.h" |
58 | | #include "storage/index/index_file_reader.h" |
59 | | #include "storage/index/indexed_column_reader.h" |
60 | | #include "storage/index/primary_key_index.h" |
61 | | #include "storage/index/short_key_index.h" |
62 | | #include "storage/index/zone_map/zonemap_eval_context.h" |
63 | | #include "storage/iterator/vgeneric_iterators.h" |
64 | | #include "storage/iterators.h" |
65 | | #include "storage/key_coder.h" |
66 | | #include "storage/olap_common.h" |
67 | | #include "storage/predicate/block_column_predicate.h" |
68 | | #include "storage/predicate/column_predicate.h" |
69 | | #include "storage/rowset/rowset_reader_context.h" |
70 | | #include "storage/schema.h" |
71 | | #include "storage/segment/column_meta_accessor.h" |
72 | | #include "storage/segment/column_reader.h" |
73 | | #include "storage/segment/column_reader_cache.h" |
74 | | #include "storage/segment/empty_segment_iterator.h" |
75 | | #include "storage/segment/page_io.h" |
76 | | #include "storage/segment/page_pointer.h" |
77 | | #include "storage/segment/segment_iterator.h" |
78 | | #include "storage/segment/segment_writer.h" // k_segment_magic_length |
79 | | #include "storage/segment/stream_reader.h" |
80 | | #include "storage/segment/variant/variant_column_reader.h" |
81 | | #include "storage/tablet/tablet_schema.h" |
82 | | #include "storage/types.h" |
83 | | #include "storage/utils.h" |
84 | | #include "util/coding.h" |
85 | | #include "util/json/path_in_data.h" |
86 | | #include "util/slice.h" // Slice |
87 | | |
88 | | namespace doris::segment_v2 { |
89 | | |
90 | | class InvertedIndexIterator; |
91 | | |
92 | | namespace { |
93 | | |
94 | | Status build_segment_zonemap_context(Segment* segment, const Schema& schema, |
95 | | const StorageReadOptions& read_options, |
96 | 17.7k | const VExprContextSPtrs& conjuncts, ZoneMapEvalContext* ctx) { |
97 | 17.7k | DORIS_CHECK(segment != nullptr); |
98 | 17.7k | DORIS_CHECK(ctx != nullptr); |
99 | 17.7k | std::set<int> slot_indexes; |
100 | 20.0k | for (const auto& conjunct : conjuncts) { |
101 | 20.0k | DORIS_CHECK(conjunct != nullptr); |
102 | 20.0k | const auto& root = conjunct->root(); |
103 | 20.0k | DORIS_CHECK(root != nullptr); |
104 | 20.0k | if (!root->can_evaluate_zonemap_filter()) { |
105 | 18.2k | continue; |
106 | 18.2k | } |
107 | | // Segment zone maps have one min/max/null summary per column for the whole segment, so a |
108 | | // segment-level context can safely hold every slot referenced by a compound expression. |
109 | | // Page zone maps are page-aligned per column and still use single-slot filtering in |
110 | | // SegmentIterator. |
111 | 1.76k | root->collect_slot_column_ids(slot_indexes); |
112 | 1.76k | } |
113 | 17.7k | for (const int slot_index : slot_indexes) { |
114 | 3.05k | if (slot_index < 0 || cast_set<size_t>(slot_index) >= schema.num_column_ids()) { |
115 | 0 | continue; |
116 | 0 | } |
117 | 3.05k | const auto column_id = schema.column_id(cast_set<size_t>(slot_index)); |
118 | 3.05k | const auto* tablet_column = schema.column(column_id); |
119 | 3.05k | DORIS_CHECK(tablet_column != nullptr); |
120 | 3.05k | if (!segment->can_apply_predicate_safely( |
121 | 3.05k | column_id, schema, read_options.target_cast_type_for_variants, read_options)) { |
122 | 11 | continue; |
123 | 11 | } |
124 | 3.04k | auto data_type = segment->get_data_type_of(*tablet_column, read_options); |
125 | 3.04k | if (data_type == nullptr) { |
126 | 0 | continue; |
127 | 0 | } |
128 | 3.04k | ZoneMapEvalContext::SlotZoneMap slot_zone_map; |
129 | 3.04k | slot_zone_map.data_type = data_type; |
130 | 3.04k | std::shared_ptr<ColumnReader> reader; |
131 | 3.04k | Status st = segment->get_column_reader(*tablet_column, &reader, read_options.stats); |
132 | 3.04k | if (st.is<ErrorCode::NOT_FOUND>()) { |
133 | 30 | ctx->slots.emplace(slot_index, std::move(slot_zone_map)); |
134 | 30 | continue; |
135 | 30 | } |
136 | 3.01k | RETURN_IF_ERROR(st); |
137 | 3.02k | if (reader != nullptr && reader->has_zone_map()) { |
138 | 2.80k | ZoneMap zone_map; |
139 | 2.80k | RETURN_IF_ERROR(reader->get_segment_zone_map(&zone_map)); |
140 | 2.80k | slot_zone_map.zone_map = std::make_shared<ZoneMap>(std::move(zone_map)); |
141 | 2.80k | } |
142 | 3.01k | ctx->slots.emplace(slot_index, std::move(slot_zone_map)); |
143 | 3.01k | } |
144 | 17.7k | return Status::OK(); |
145 | 17.7k | } |
146 | | |
147 | | } // namespace |
148 | | |
149 | | Status Segment::open(io::FileSystemSPtr fs, const std::string& path, int64_t tablet_id, |
150 | | uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr tablet_schema, |
151 | | const io::FileReaderOptions& reader_options, std::shared_ptr<Segment>* output, |
152 | 886k | InvertedIndexFileInfo idx_file_info, OlapReaderStatistics* stats) { |
153 | | // Ensure tablet_id is available in reader_options for CachedRemoteFileReader peer read. |
154 | 886k | io::FileReaderOptions opts_with_tablet = reader_options; |
155 | 886k | opts_with_tablet.tablet_id = tablet_id; |
156 | | |
157 | 886k | auto s = _open(fs, path, segment_id, rowset_id, tablet_schema, opts_with_tablet, output, |
158 | 886k | idx_file_info, stats); |
159 | 888k | if (s.ok() && output && *output) { |
160 | 888k | (*output)->_tablet_id = tablet_id; |
161 | 888k | } |
162 | 886k | if (!s.ok()) { |
163 | 5 | if (!config::is_cloud_mode()) { |
164 | 5 | auto res = ExecEnv::get_tablet(tablet_id); |
165 | 5 | TabletSharedPtr tablet = |
166 | 5 | res.has_value() ? std::dynamic_pointer_cast<Tablet>(res.value()) : nullptr; |
167 | 5 | if (tablet) { |
168 | 0 | tablet->report_error(s); |
169 | 0 | } |
170 | 5 | } |
171 | 5 | } |
172 | | |
173 | 886k | return s; |
174 | 886k | } |
175 | | |
176 | | Status Segment::_open(io::FileSystemSPtr fs, const std::string& path, uint32_t segment_id, |
177 | | RowsetId rowset_id, TabletSchemaSPtr tablet_schema, |
178 | | const io::FileReaderOptions& reader_options, std::shared_ptr<Segment>* output, |
179 | 887k | InvertedIndexFileInfo idx_file_info, OlapReaderStatistics* stats) { |
180 | 887k | io::FileReaderSPtr file_reader; |
181 | 887k | auto st = fs->open_file(path, &file_reader, &reader_options); |
182 | 887k | TEST_INJECTION_POINT_CALLBACK("Segment::open:corruption", &st); |
183 | 887k | std::shared_ptr<Segment> segment( |
184 | 887k | new Segment(segment_id, rowset_id, std::move(tablet_schema), idx_file_info)); |
185 | 887k | segment->_seg_path = path; |
186 | 888k | if (st) { |
187 | 888k | segment->_fs = fs; |
188 | 888k | segment->_file_reader = std::move(file_reader); |
189 | 888k | st = segment->_open(stats); |
190 | 888k | } |
191 | | |
192 | | // Three-tier retry for CORRUPTION errors when file cache is enabled. |
193 | | // This handles CORRUPTION from both open_file() and _parse_footer() (via _open()). |
194 | 887k | if (st.is<ErrorCode::CORRUPTION>() && |
195 | 887k | reader_options.cache_type == io::FileCachePolicy::FILE_BLOCK_CACHE) { |
196 | | // Tier 1: Clear file cache and retry with cache support (re-downloads from remote). |
197 | 2 | LOG(WARNING) << "bad segment file may be read from file cache, try to read remote source " |
198 | 2 | "file directly, file path: " |
199 | 2 | << path << " cache_key: " << file_cache_key_str(path); |
200 | 2 | auto file_key = file_cache_key_from_path(path); |
201 | 2 | auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key); |
202 | 2 | file_cache->remove_if_cached(file_key); |
203 | | |
204 | 2 | st = fs->open_file(path, &file_reader, &reader_options); |
205 | 2 | if (st) { |
206 | 2 | segment->_fs = fs; |
207 | 2 | segment->_file_reader = std::move(file_reader); |
208 | 2 | st = segment->_open(stats); |
209 | 2 | } |
210 | 2 | TEST_INJECTION_POINT_CALLBACK("Segment::open:corruption1", &st); |
211 | 2 | if (st.is<ErrorCode::CORRUPTION>()) { // corrupt again |
212 | | // Tier 2: Bypass cache entirely and read directly from remote storage. |
213 | 0 | LOG(WARNING) << "failed to try to read remote source file again with cache support," |
214 | 0 | << " try to read from remote directly, " |
215 | 0 | << " file path: " << path << " cache_key: " << file_cache_key_str(path); |
216 | 0 | file_cache = io::FileCacheFactory::instance()->get_by_path(file_key); |
217 | 0 | file_cache->remove_if_cached(file_key); |
218 | |
|
219 | 0 | io::FileReaderOptions opt = reader_options; |
220 | 0 | opt.cache_type = io::FileCachePolicy::NO_CACHE; // skip cache |
221 | 0 | RETURN_IF_ERROR(fs->open_file(path, &file_reader, &opt)); |
222 | 0 | segment->_fs = fs; |
223 | 0 | segment->_file_reader = std::move(file_reader); |
224 | 0 | st = segment->_open(stats); |
225 | 0 | if (!st.ok()) { |
226 | | // Tier 3: Remote source itself is corrupt. |
227 | 0 | LOG(WARNING) << "failed to try to read remote source file directly," |
228 | 0 | << " file path: " << path |
229 | 0 | << " cache_key: " << file_cache_key_str(path); |
230 | 0 | } |
231 | 0 | } |
232 | 2 | } |
233 | 887k | RETURN_IF_ERROR(st); |
234 | 18.4E | DCHECK(segment->_fs != nullptr) << "file system is nullptr after segment open"; |
235 | 887k | *output = std::move(segment); |
236 | 887k | return Status::OK(); |
237 | 887k | } |
238 | | |
// Records the segment's identity (segment id, rowset id), its tablet schema and the
// inverted-index file info, and starts the metadata memory estimate at zero.
// No file is touched here; the file reader is attached by the static `_open` helper.
Segment::Segment(uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr tablet_schema,
                 InvertedIndexFileInfo idx_file_info)
        : _segment_id(segment_id),
          _meta_mem_usage(0),
          _rowset_id(rowset_id),
          _tablet_schema(std::move(tablet_schema)),
          _idx_file_info(std::move(idx_file_info)) {}
246 | | |
// Subtracts the bytes this segment last published from g_segment_estimate_mem_bytes.
Segment::~Segment() {
    g_segment_estimate_mem_bytes << -_tracked_meta_mem_usage;
    // If this check fails, `_tracked_meta_mem_usage` has drifted from meta_mem_usage() and the
    // accounting that keeps them in sync needs to be fixed.
    DCHECK(_tracked_meta_mem_usage == meta_mem_usage());
}
252 | | |
253 | 89.3k | io::UInt128Wrapper Segment::file_cache_key(std::string_view rowset_id, uint32_t seg_id) { |
254 | 89.3k | return io::BlockFileCache::hash(fmt::format("{}_{}.dat", rowset_id, seg_id)); |
255 | 89.3k | } |
256 | | |
257 | 885k | int64_t Segment::get_metadata_size() const { |
258 | 885k | std::shared_ptr<SegmentFooterPB> footer_pb_shared = _footer_pb.lock(); |
259 | 885k | return sizeof(Segment) + (_pk_index_meta ? _pk_index_meta->ByteSizeLong() : 0) + |
260 | 18.4E | (footer_pb_shared ? footer_pb_shared->ByteSizeLong() : 0); |
261 | 885k | } |
262 | | |
263 | 886k | void Segment::update_metadata_size() { |
264 | 886k | MetadataAdder::update_metadata_size(); |
265 | 886k | g_segment_estimate_mem_bytes << _meta_mem_usage - _tracked_meta_mem_usage; |
266 | 886k | _tracked_meta_mem_usage = _meta_mem_usage; |
267 | 886k | } |
268 | | |
269 | 887k | Status Segment::_open(OlapReaderStatistics* stats) { |
270 | 887k | std::shared_ptr<SegmentFooterPB> footer_pb_shared; |
271 | 887k | RETURN_IF_ERROR(_get_segment_footer(footer_pb_shared, stats)); |
272 | | |
273 | 887k | _pk_index_meta.reset( |
274 | 887k | footer_pb_shared->has_primary_key_index_meta() |
275 | 887k | ? new PrimaryKeyIndexMetaPB(footer_pb_shared->primary_key_index_meta()) |
276 | 887k | : nullptr); |
277 | | // delete_bitmap_calculator_test.cpp |
278 | | // DCHECK(footer.has_short_key_index_page()); |
279 | 887k | _sk_index_page = footer_pb_shared->short_key_index_page(); |
280 | 887k | _num_rows = footer_pb_shared->num_rows(); |
281 | | |
282 | | // An estimated memory usage of a segment |
283 | | // Footer is seperated to StoragePageCache so we don't need to add it to _meta_mem_usage |
284 | | // _meta_mem_usage += footer_pb_shared->ByteSizeLong(); |
285 | 887k | if (_pk_index_meta != nullptr) { |
286 | 785k | _meta_mem_usage += _pk_index_meta->ByteSizeLong(); |
287 | 785k | } |
288 | | |
289 | 887k | _meta_mem_usage += sizeof(*this); |
290 | 887k | _meta_mem_usage += std::min(static_cast<int>(_tablet_schema->num_columns()), |
291 | 887k | config::max_segment_partial_column_cache_size) * |
292 | 887k | config::estimated_mem_per_column_reader; |
293 | | |
294 | | // 1024 comes from SegmentWriterOptions |
295 | 887k | _meta_mem_usage += (_num_rows + 1023) / 1024 * (36 + 4); |
296 | | // 0.01 comes from PrimaryKeyIndexBuilder::init |
297 | 887k | _meta_mem_usage += BloomFilter::optimal_bit_num(_num_rows, 0.01) / 8; |
298 | | |
299 | 887k | update_metadata_size(); |
300 | | |
301 | 887k | return Status::OK(); |
302 | 887k | } |
303 | | |
304 | 5.62k | Status Segment::_open_index_file_reader() { |
305 | | // Derive the index path from `_seg_path`, not `_file_reader->path()`: remote FS normalizes the |
306 | | // latter to an absolute path that won't match the relative keys in PackedFileSystem's index map. |
307 | 5.62k | _index_file_reader = std::make_shared<IndexFileReader>( |
308 | 5.62k | _fs, std::string {InvertedIndexDescriptor::get_index_file_path_prefix(_seg_path)}, |
309 | 5.62k | _tablet_schema->get_inverted_index_storage_format(), _idx_file_info, _tablet_id); |
310 | 5.62k | return Status::OK(); |
311 | 5.62k | } |
312 | | |
313 | | bool Segment::is_tso_placeholder_col(int cid, const Schema& schema, |
314 | 1.99M | const StorageReadOptions& read_options) const { |
315 | 1.99M | if (read_options.version.first != read_options.version.second) { |
316 | 1.11M | return false; |
317 | 1.11M | } |
318 | 878k | if (read_options.io_ctx.reader_type != ReaderType::READER_BINLOG && |
319 | 879k | read_options.io_ctx.reader_type != ReaderType::READER_BINLOG_COMPACTION) { |
320 | 879k | return false; |
321 | 879k | } |
322 | | // tso_col_idx() is -1 for non-binlog schemas, so this returns false there. |
323 | 18.4E | return cid == schema.tso_col_idx(); |
324 | 878k | } |
325 | | |
326 | | Status Segment::new_iterator(SchemaSPtr schema, const StorageReadOptions& read_options, |
327 | 2.17M | std::unique_ptr<RowwiseIterator>* iter) { |
328 | 2.17M | if (read_options.runtime_state != nullptr) { |
329 | 2.05M | _be_exec_version = read_options.runtime_state->be_exec_version(); |
330 | 2.05M | } |
331 | 2.17M | RETURN_IF_ERROR(_create_column_meta_once(read_options.stats)); |
332 | | |
333 | 2.17M | read_options.stats->total_segment_number++; |
334 | | // trying to prune the current segment by segment-level zone map |
335 | 2.17M | for (const auto& entry : read_options.col_id_to_predicates) { |
336 | 1.87M | int32_t column_id = entry.first; |
337 | | // schema change |
338 | 1.87M | if (_tablet_schema->num_columns() <= column_id) { |
339 | 1.42k | continue; |
340 | 1.42k | } |
341 | 1.87M | const TabletColumn& col = read_options.tablet_schema->column(column_id); |
342 | 1.87M | std::shared_ptr<ColumnReader> reader; |
343 | | // __DORIS_COMMIT_TSO_COL__ on a single-version segment stores a 0 placeholder on disk |
344 | | // (replaced with the rowset's real commit_tso at read time). Its on-disk zonemap [0,0] |
345 | | // must not drive segment-level pruning, so build a ConstantColumnReader carrying the real |
346 | | // commit_tso to prune against the real value instead. |
347 | 1.87M | std::optional<Field> const_value; |
348 | 1.87M | if (read_options.version.first == read_options.version.second && |
349 | 1.87M | column_id == schema->commit_tso_col_idx() && read_options.commit_tso.end_tso() != -1) { |
350 | 0 | const_value = Field::create_field<TYPE_BIGINT>(read_options.commit_tso.end_tso()); |
351 | 0 | } |
352 | 1.87M | Status st = get_column_reader(col, &reader, read_options.stats, std::move(const_value)); |
353 | | // not found in this segment, skip |
354 | 1.87M | if (st.is<ErrorCode::NOT_FOUND>()) { |
355 | 102 | continue; |
356 | 102 | } |
357 | 1.86M | RETURN_IF_ERROR(st); |
358 | | // should be OK |
359 | 1.86M | DCHECK(reader != nullptr); |
360 | 1.86M | if (!reader->has_zone_map()) { |
361 | 0 | continue; |
362 | 0 | } |
363 | | // Placeholder tso column on a single-version binlog segment: its zonemap reflects the |
364 | | // NULL placeholder (replaced with commit_tso at read time), so skip pruning by |
365 | | // zonemap (min == max == commit_tso) and reuse the predicate's own zonemap matching: |
366 | | // evaluate_and() returns false iff no value in [min, max] can satisfy the predicates, |
367 | | // i.e. commit_tso fails them and the whole segment can be pruned. Predicates that don't |
368 | | // support zonemap return true (conservative: not pruned, row-level eval handles them). |
369 | 1.86M | if (read_options.col_id_to_predicates.contains(column_id) && |
370 | 1.87M | is_tso_placeholder_col(column_id, *schema, read_options)) { |
371 | 0 | const Int64 commit_tso = |
372 | 0 | read_options.commit_tso.end_tso() == -1 ? 0 : read_options.commit_tso.end_tso(); |
373 | 0 | ZoneMap zone_map; |
374 | 0 | zone_map.min_value = Field::create_field<TYPE_BIGINT>(commit_tso); |
375 | 0 | zone_map.max_value = Field::create_field<TYPE_BIGINT>(commit_tso); |
376 | 0 | zone_map.has_not_null = true; |
377 | 0 | if (!entry.second->evaluate_and(zone_map)) { |
378 | | // any condition not satisfied, return. |
379 | 0 | *iter = std::make_unique<EmptySegmentIterator>(*schema); |
380 | 0 | read_options.stats->filtered_segment_number++; |
381 | 0 | return Status::OK(); |
382 | 0 | } |
383 | 0 | continue; |
384 | 0 | } |
385 | 1.86M | if (read_options.col_id_to_predicates.contains(column_id) && |
386 | 1.86M | can_apply_predicate_safely(column_id, *schema, |
387 | 1.86M | read_options.target_cast_type_for_variants, read_options)) { |
388 | 1.86M | bool matched = true; |
389 | 1.86M | RETURN_IF_ERROR(reader->match_condition(entry.second.get(), &matched)); |
390 | 1.86M | if (!matched) { |
391 | | // any condition not satisfied, return. |
392 | 93.9k | *iter = std::make_unique<EmptySegmentIterator>(*schema); |
393 | 93.9k | read_options.stats->filtered_segment_number++; |
394 | 93.9k | read_options.stats->rows_stats_filtered += num_rows(); |
395 | 93.9k | return Status::OK(); |
396 | 93.9k | } |
397 | 1.86M | } |
398 | 1.86M | } |
399 | | |
400 | | // Segment-level expr-zonemap runs before SegmentIterator can rebind storage expressions to |
401 | | // the reader schema. Only apply it when scan tuple slot ordinals already match this schema. |
402 | 2.08M | if (expr_zonemap::is_expr_zonemap_filter_enabled(read_options.runtime_state) && |
403 | 2.08M | !read_options.common_expr_ctxs_push_down.empty()) { |
404 | 17.8k | ZoneMapEvalContext ctx; |
405 | 17.8k | RETURN_IF_ERROR(build_segment_zonemap_context( |
406 | 17.8k | this, *schema, read_options, read_options.common_expr_ctxs_push_down, &ctx)); |
407 | 17.8k | const auto result = |
408 | 17.8k | VExprContext::evaluate_zonemap_filter(read_options.common_expr_ctxs_push_down, ctx); |
409 | 17.8k | ctx.stats.accumulate_to(read_options.stats); |
410 | 17.8k | if (result == ZoneMapFilterResult::kNoMatch) { |
411 | 359 | *iter = std::make_unique<EmptySegmentIterator>(*schema); |
412 | 359 | read_options.stats->filtered_segment_number++; |
413 | 359 | read_options.stats->expr_zonemap_filtered_segments++; |
414 | 359 | return Status::OK(); |
415 | 359 | } |
416 | 17.8k | } |
417 | | |
418 | 2.08M | { |
419 | 2.08M | SCOPED_RAW_TIMER(&read_options.stats->segment_load_index_timer_ns); |
420 | 2.08M | RETURN_IF_ERROR(load_index(read_options.stats)); |
421 | 2.08M | } |
422 | | |
423 | 2.08M | if (read_options.delete_condition_predicates->num_of_column_predicate() == 0 && |
424 | 2.08M | read_options.push_down_agg_type_opt != TPushAggOp::NONE && |
425 | 2.08M | read_options.push_down_agg_type_opt != TPushAggOp::COUNT_ON_INDEX) { |
426 | 32.2k | iter->reset(new_vstatistics_iterator(this->shared_from_this(), *schema)); |
427 | 2.04M | } else { |
428 | 2.04M | *iter = std::make_unique<SegmentIterator>(this->shared_from_this(), schema); |
429 | 2.04M | } |
430 | | |
431 | | // TODO: Valid the opt not only in ReaderType::READER_QUERY |
432 | 2.08M | if (read_options.io_ctx.reader_type == ReaderType::READER_QUERY && |
433 | 2.08M | !read_options.column_predicates.empty()) { |
434 | 1.73M | auto pruned_predicates = read_options.column_predicates; |
435 | 1.73M | auto pruned = false; |
436 | 18.3M | for (auto& it : _column_reader_cache->get_available_readers(false)) { |
437 | 18.3M | const auto uid = it.first; |
438 | 18.3M | const auto column_id = read_options.tablet_schema->field_index(uid); |
439 | 18.3M | bool tmp_pruned = false; |
440 | 18.3M | RETURN_IF_ERROR(it.second->prune_predicates_by_zone_map(pruned_predicates, column_id, |
441 | 18.3M | &tmp_pruned)); |
442 | 18.3M | pruned |= tmp_pruned; |
443 | 18.3M | } |
444 | | |
445 | 1.73M | if (pruned) { |
446 | 7.53k | auto options_with_pruned_predicates = read_options; |
447 | 7.53k | options_with_pruned_predicates.column_predicates = pruned_predicates; |
448 | | //because column_predicates is changed, we need to rebuild col_id_to_predicates so that inverted index will not go through it. |
449 | 7.53k | options_with_pruned_predicates.col_id_to_predicates.clear(); |
450 | 12.6k | for (auto pred : options_with_pruned_predicates.column_predicates) { |
451 | 12.6k | if (!options_with_pruned_predicates.col_id_to_predicates.contains( |
452 | 12.6k | pred->column_id())) { |
453 | 8.71k | options_with_pruned_predicates.col_id_to_predicates.insert( |
454 | 8.71k | {pred->column_id(), AndBlockColumnPredicate::create_shared()}); |
455 | 8.71k | } |
456 | 12.6k | options_with_pruned_predicates.col_id_to_predicates[pred->column_id()] |
457 | 12.6k | ->add_column_predicate(SingleColumnBlockPredicate::create_unique(pred)); |
458 | 12.6k | } |
459 | 7.53k | return iter->get()->init(options_with_pruned_predicates); |
460 | 7.53k | } |
461 | 1.73M | } |
462 | 2.07M | return iter->get()->init(read_options); |
463 | 2.08M | } |
464 | | |
465 | | Status Segment::_write_error_file(size_t file_size, size_t offset, size_t bytes_read, char* data, |
466 | 1 | io::IOContext& io_ctx) { |
467 | 1 | if (!config::enbale_dump_error_file || !doris::config::is_cloud_mode()) { |
468 | 1 | return Status::OK(); |
469 | 1 | } |
470 | | |
471 | 0 | std::string file_name = _rowset_id.to_string() + "_" + std::to_string(_segment_id) + ".dat"; |
472 | 0 | std::string dir_path = io::FileCacheFactory::instance()->get_base_paths()[0] + "/error_file/"; |
473 | 0 | Status create_st = io::global_local_filesystem()->create_directory(dir_path, true); |
474 | 0 | if (!create_st.ok() && !create_st.is<ErrorCode::ALREADY_EXIST>()) { |
475 | 0 | LOG(WARNING) << "failed to create error file dir: " << create_st.to_string(); |
476 | 0 | return create_st; |
477 | 0 | } |
478 | 0 | size_t dir_size = 0; |
479 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(dir_path, &dir_size)); |
480 | 0 | if (dir_size > config::file_cache_error_log_limit_bytes) { |
481 | 0 | LOG(WARNING) << "error file dir size is too large: " << dir_size; |
482 | 0 | return Status::OK(); |
483 | 0 | } |
484 | | |
485 | 0 | std::string error_part; |
486 | 0 | error_part.resize(bytes_read); |
487 | 0 | std::string part_path = dir_path + file_name + ".part_offset_" + std::to_string(offset); |
488 | 0 | LOG(WARNING) << "writer error part to " << part_path; |
489 | 0 | bool is_part_exist = false; |
490 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->exists(part_path, &is_part_exist)); |
491 | 0 | if (is_part_exist) { |
492 | 0 | LOG(WARNING) << "error part already exists: " << part_path; |
493 | 0 | } else { |
494 | 0 | std::unique_ptr<io::FileWriter> part_writer; |
495 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->create_file(part_path, &part_writer)); |
496 | 0 | RETURN_IF_ERROR(part_writer->append(Slice(data, bytes_read))); |
497 | 0 | RETURN_IF_ERROR(part_writer->close()); |
498 | 0 | } |
499 | | |
500 | 0 | std::string error_file; |
501 | 0 | error_file.resize(file_size); |
502 | 0 | auto* cached_reader = dynamic_cast<io::CachedRemoteFileReader*>(_file_reader.get()); |
503 | 0 | if (cached_reader == nullptr) { |
504 | 0 | return Status::InternalError("file reader is not CachedRemoteFileReader"); |
505 | 0 | } |
506 | 0 | size_t error_file_bytes_read = 0; |
507 | 0 | RETURN_IF_ERROR(cached_reader->get_remote_reader()->read_at( |
508 | 0 | 0, Slice(error_file.data(), file_size), &error_file_bytes_read, &io_ctx)); |
509 | 0 | DCHECK(error_file_bytes_read == file_size); |
510 | | //std::string file_path = dir_path + std::to_string(cur_time) + "_" + ss.str(); |
511 | 0 | std::string file_path = dir_path + file_name; |
512 | 0 | LOG(WARNING) << "writer error file to " << file_path; |
513 | 0 | bool is_file_exist = false; |
514 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->exists(file_path, &is_file_exist)); |
515 | 0 | if (is_file_exist) { |
516 | 0 | LOG(WARNING) << "error file already exists: " << part_path; |
517 | 0 | } else { |
518 | 0 | std::unique_ptr<io::FileWriter> writer; |
519 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->create_file(file_path, &writer)); |
520 | 0 | RETURN_IF_ERROR(writer->append(Slice(error_file.data(), file_size))); |
521 | 0 | RETURN_IF_ERROR(writer->close()); |
522 | 0 | } |
523 | 0 | return Status::OK(); // already exists |
524 | 0 | }; |
525 | | |
526 | | Status Segment::_parse_footer(std::shared_ptr<SegmentFooterPB>& footer, |
527 | 95.1k | OlapReaderStatistics* stats) { |
528 | | // Footer := SegmentFooterPB, FooterPBSize(4), FooterPBChecksum(4), MagicNumber(4) |
529 | 95.1k | auto file_size = _file_reader->size(); |
530 | 95.1k | if (file_size < 12) { |
531 | 0 | return Status::Corruption("Bad segment file {}: file size {} < 12, cache_key: {}", |
532 | 0 | _file_reader->path().native(), file_size, |
533 | 0 | file_cache_key_str(_file_reader->path().native())); |
534 | 0 | } |
535 | | |
536 | 95.1k | uint8_t fixed_buf[12]; |
537 | 95.1k | size_t bytes_read = 0; |
538 | | // TODO(plat1ko): Support session variable `enable_file_cache` |
539 | 95.1k | io::IOContext io_ctx {.is_index_data = true, |
540 | 95.1k | .file_cache_stats = stats ? &stats->file_cache_stats : nullptr}; |
541 | 95.1k | RETURN_IF_ERROR( |
542 | 95.1k | _file_reader->read_at(file_size - 12, Slice(fixed_buf, 12), &bytes_read, &io_ctx)); |
543 | 95.1k | DCHECK_EQ(bytes_read, 12); |
544 | 95.1k | TEST_SYNC_POINT_CALLBACK("Segment::parse_footer:magic_number_corruption", fixed_buf); |
545 | 95.1k | TEST_INJECTION_POINT_CALLBACK("Segment::parse_footer:magic_number_corruption_inj", fixed_buf); |
546 | 95.1k | if (memcmp(fixed_buf + 8, k_segment_magic, k_segment_magic_length) != 0) { |
547 | 1 | Status st = |
548 | 1 | _write_error_file(file_size, file_size - 12, bytes_read, (char*)fixed_buf, io_ctx); |
549 | 1 | if (!st.ok()) { |
550 | 0 | LOG(WARNING) << "failed to write error file: " << st.to_string(); |
551 | 0 | } |
552 | 1 | return Status::Corruption( |
553 | 1 | "Bad segment file {}: file_size: {}, magic number not match, cache_key: {}", |
554 | 1 | _file_reader->path().native(), file_size, |
555 | 1 | file_cache_key_str(_file_reader->path().native())); |
556 | 1 | } |
557 | | |
558 | | // read footer PB |
559 | 95.1k | uint32_t footer_length = decode_fixed32_le(fixed_buf); |
560 | 95.1k | if (file_size < 12 + footer_length) { |
561 | 0 | Status st = |
562 | 0 | _write_error_file(file_size, file_size - 12, bytes_read, (char*)fixed_buf, io_ctx); |
563 | 0 | if (!st.ok()) { |
564 | 0 | LOG(WARNING) << "failed to write error file: " << st.to_string(); |
565 | 0 | } |
566 | 0 | return Status::Corruption("Bad segment file {}: file size {} < {}, cache_key: {}", |
567 | 0 | _file_reader->path().native(), file_size, 12 + footer_length, |
568 | 0 | file_cache_key_str(_file_reader->path().native())); |
569 | 0 | } |
570 | | |
571 | 95.1k | std::string footer_buf; |
572 | 95.1k | footer_buf.resize(footer_length); |
573 | 95.1k | RETURN_IF_ERROR(_file_reader->read_at(file_size - 12 - footer_length, footer_buf, &bytes_read, |
574 | 95.1k | &io_ctx)); |
575 | 95.1k | DCHECK_EQ(bytes_read, footer_length); |
576 | | |
577 | | // validate footer PB's checksum |
578 | 95.1k | uint32_t expect_checksum = decode_fixed32_le(fixed_buf + 4); |
579 | 95.1k | uint32_t actual_checksum = crc32c::Crc32c(footer_buf.data(), footer_buf.size()); |
580 | 95.1k | if (actual_checksum != expect_checksum) { |
581 | 0 | Status st = _write_error_file(file_size, file_size - 12 - footer_length, bytes_read, |
582 | 0 | footer_buf.data(), io_ctx); |
583 | 0 | if (!st.ok()) { |
584 | 0 | LOG(WARNING) << "failed to write error file: " << st.to_string(); |
585 | 0 | } |
586 | 0 | return Status::Corruption( |
587 | 0 | "Bad segment file {}: file_size = {}, footer checksum not match, actual={} " |
588 | 0 | "vs expect={}, cache_key: {}", |
589 | 0 | _file_reader->path().native(), file_size, actual_checksum, expect_checksum, |
590 | 0 | file_cache_key_str(_file_reader->path().native())); |
591 | 0 | } |
592 | | |
593 | | // deserialize footer PB |
594 | 95.1k | footer = std::make_shared<SegmentFooterPB>(); |
595 | 95.1k | if (!footer->ParseFromString(footer_buf)) { |
596 | 0 | Status st = _write_error_file(file_size, file_size - 12 - footer_length, bytes_read, |
597 | 0 | footer_buf.data(), io_ctx); |
598 | 0 | if (!st.ok()) { |
599 | 0 | LOG(WARNING) << "failed to write error file: " << st.to_string(); |
600 | 0 | } |
601 | 0 | return Status::Corruption( |
602 | 0 | "Bad segment file {}: file_size = {}, failed to parse SegmentFooterPB, " |
603 | 0 | "cache_key: ", |
604 | 0 | _file_reader->path().native(), file_size, |
605 | 0 | file_cache_key_str(_file_reader->path().native())); |
606 | 0 | } |
607 | | |
608 | 18.4E | VLOG_DEBUG << fmt::format("Loading segment footer from {} finished", |
609 | 18.4E | _file_reader->path().native()); |
610 | 95.1k | return Status::OK(); |
611 | 95.1k | } |
612 | | |
613 | 3.90M | Status Segment::_load_pk_bloom_filter(OlapReaderStatistics* stats) { |
614 | | #ifdef BE_TEST |
615 | | if (_pk_index_meta == nullptr) { |
616 | | // for BE UT "segment_cache_test" |
617 | | return _load_pk_bf_once.call([this] { |
618 | | _meta_mem_usage += 100; |
619 | | update_metadata_size(); |
620 | | return Status::OK(); |
621 | | }); |
622 | | } |
623 | | #endif |
624 | 3.90M | DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS); |
625 | 3.90M | DCHECK(_pk_index_meta != nullptr); |
626 | 3.90M | DCHECK(_pk_index_reader != nullptr); |
627 | | |
628 | 3.90M | return _load_pk_bf_once.call([this, stats] { |
629 | 72.5k | RETURN_IF_ERROR(_pk_index_reader->parse_bf(_file_reader, *_pk_index_meta, stats)); |
630 | | // _meta_mem_usage += _pk_index_reader->get_bf_memory_size(); |
631 | 72.5k | return Status::OK(); |
632 | 72.5k | }); |
633 | 3.90M | } |
634 | | |
635 | 3.90M | Status Segment::load_pk_index_and_bf(OlapReaderStatistics* index_load_stats) { |
636 | | // `DorisCallOnce` may catch exception in calling stack A and re-throw it in |
637 | | // a different calling stack B which doesn't have catch block. So we add catch block here |
638 | | // to prevent coreudmp |
639 | 3.90M | RETURN_IF_CATCH_EXCEPTION({ |
640 | 3.90M | RETURN_IF_ERROR(load_index(index_load_stats)); |
641 | 3.90M | RETURN_IF_ERROR(_load_pk_bloom_filter(index_load_stats)); |
642 | 3.90M | }); |
643 | 3.92M | return Status::OK(); |
644 | 3.90M | } |
645 | | |
646 | 6.01M | Status Segment::load_index(OlapReaderStatistics* stats) { |
647 | 6.01M | return _load_index_once.call([this, stats] { |
648 | 749k | if (_tablet_schema->keys_type() == UNIQUE_KEYS && _pk_index_meta != nullptr) { |
649 | 674k | _pk_index_reader = std::make_unique<PrimaryKeyIndexReader>(); |
650 | 674k | RETURN_IF_ERROR(_pk_index_reader->parse_index(_file_reader, *_pk_index_meta, stats)); |
651 | | // _meta_mem_usage += _pk_index_reader->get_memory_size(); |
652 | 674k | return Status::OK(); |
653 | 674k | } else { |
654 | | // read and parse short key index page |
655 | 74.7k | OlapReaderStatistics tmp_stats; |
656 | 74.7k | OlapReaderStatistics* stats_ptr = stats != nullptr ? stats : &tmp_stats; |
657 | 74.7k | PageReadOptions opts(io::IOContext {.is_index_data = true, |
658 | 74.7k | .file_cache_stats = &stats_ptr->file_cache_stats}); |
659 | 74.7k | opts.use_page_cache = true; |
660 | 74.7k | opts.type = INDEX_PAGE; |
661 | 74.7k | opts.file_reader = _file_reader.get(); |
662 | 74.7k | opts.page_pointer = PagePointer(_sk_index_page); |
663 | | // short key index page uses NO_COMPRESSION for now |
664 | 74.7k | opts.codec = nullptr; |
665 | 74.7k | opts.stats = &tmp_stats; |
666 | | |
667 | 74.7k | Slice body; |
668 | 74.7k | PageFooterPB footer; |
669 | 74.7k | RETURN_IF_ERROR( |
670 | 74.7k | PageIO::read_and_decompress_page(opts, &_sk_index_handle, &body, &footer)); |
671 | 74.7k | DCHECK_EQ(footer.type(), SHORT_KEY_PAGE); |
672 | 74.7k | DCHECK(footer.has_short_key_page_footer()); |
673 | | |
674 | | // _meta_mem_usage += body.get_size(); |
675 | 74.7k | _sk_index_decoder = std::make_unique<ShortKeyIndexDecoder>(); |
676 | 74.7k | return _sk_index_decoder->parse(body, footer.short_key_page_footer()); |
677 | 74.7k | } |
678 | 749k | }); |
679 | 6.01M | } |
680 | | |
681 | 1.58M | Status Segment::healthy_status() { |
682 | 1.58M | try { |
683 | 1.58M | if (_load_index_once.has_called()) { |
684 | 1.54M | RETURN_IF_ERROR(_load_index_once.stored_result()); |
685 | 1.54M | } |
686 | 1.58M | if (_load_pk_bf_once.has_called()) { |
687 | 889k | RETURN_IF_ERROR(_load_pk_bf_once.stored_result()); |
688 | 889k | } |
689 | 1.58M | if (_create_column_meta_once_call.has_called()) { |
690 | 1.52M | RETURN_IF_ERROR(_create_column_meta_once_call.stored_result()); |
691 | 1.52M | } |
692 | 1.58M | if (_index_file_reader_open.has_called()) { |
693 | 22.8k | RETURN_IF_ERROR(_index_file_reader_open.stored_result()); |
694 | 22.8k | } |
695 | | // This status is set by running time, for example, if there is something wrong during read segment iterator. |
696 | 1.58M | return _healthy_status.status(); |
697 | 1.58M | } catch (const doris::Exception& e) { |
698 | | // If there is an exception during load_xxx, should not throw exception directly because |
699 | | // the caller may not exception safe. |
700 | 0 | return e.to_status(); |
701 | 0 | } catch (const std::exception& e) { |
702 | | // The exception is not thrown by doris code. |
703 | 0 | return Status::InternalError("Unexcepted error during load segment: {}", e.what()); |
704 | 0 | } |
705 | 1.58M | } |
706 | | |
707 | | // Return the storage datatype of related column to field. |
708 | | DataTypePtr Segment::get_data_type_of(const TabletColumn& column, |
709 | 31.2M | const StorageReadOptions& read_options) { |
710 | 31.2M | const PathInDataPtr path = column.path_info_ptr(); |
711 | | |
712 | | // none variant column |
713 | 31.2M | if (path == nullptr || path->empty()) { |
714 | 31.2M | return DataTypeFactory::instance().create_data_type(column); |
715 | 31.2M | } |
716 | | |
717 | | // Path exists, proceed with variant logic. |
718 | 18.4E | PathInData relative_path = path->copy_pop_front(); |
719 | 18.4E | int32_t unique_id = column.unique_id() >= 0 ? column.unique_id() : column.parent_unique_id(); |
720 | | |
721 | | // If this uid does not exist in segment meta, fallback to schema type. |
722 | 18.4E | if (!_column_meta_accessor->has_column_uid(unique_id)) { |
723 | 601 | return DataTypeFactory::instance().create_data_type(column); |
724 | 601 | } |
725 | | |
726 | 18.4E | std::shared_ptr<ColumnReader> v_reader; |
727 | | |
728 | | // Get the parent variant column reader |
729 | 18.4E | OlapReaderStatistics stats; |
730 | | // If status is not ok, it will throw exception(data corruption) |
731 | 18.4E | THROW_IF_ERROR(get_column_reader(unique_id, &v_reader, &stats)); |
732 | 18.4E | DCHECK(v_reader != nullptr); |
733 | 18.4E | auto* variant_reader = static_cast<VariantColumnReader*>(v_reader.get()); |
734 | | // Delegate type inference for variant paths to VariantColumnReader. |
735 | 18.4E | DataTypePtr type; |
736 | 18.4E | THROW_IF_ERROR(variant_reader->infer_data_type_for_path(&type, column, read_options, |
737 | 18.4E | _column_reader_cache.get())); |
738 | 18.4E | return type; |
739 | 18.4E | } |
740 | | |
741 | 58.3M | Status Segment::_create_column_meta_once(OlapReaderStatistics* stats) { |
742 | 58.3M | SCOPED_RAW_TIMER(&stats->segment_create_column_readers_timer_ns); |
743 | 58.3M | return _create_column_meta_once_call.call([&] { |
744 | 758k | std::shared_ptr<SegmentFooterPB> footer_pb_shared; |
745 | 758k | RETURN_IF_ERROR(_get_segment_footer(footer_pb_shared, stats)); |
746 | 758k | return _create_column_meta(*footer_pb_shared); |
747 | 758k | }); |
748 | 58.3M | } |
749 | | |
750 | 757k | Status Segment::_create_column_meta(const SegmentFooterPB& footer) { |
751 | | // Initialize column meta accessor which internally maintains uid -> column_ordinal mapping. |
752 | 757k | _column_meta_accessor = std::make_unique<ColumnMetaAccessor>(); |
753 | 757k | RETURN_IF_ERROR(_column_meta_accessor->init(footer, _file_reader)); |
754 | | |
755 | 757k | if (config::enable_adaptive_batch_size) { |
756 | | // Cache raw_data_bytes per column uid for adaptive batch size prediction. |
757 | | // This runs under call_once, so no thread-safety concerns. |
758 | 10.8M | auto st = _column_meta_accessor->traverse_metas(footer, [this](const ColumnMetaPB& meta) { |
759 | 10.8M | if (meta.has_unique_id() && meta.unique_id() != -1 && meta.has_raw_data_bytes()) { |
760 | 10.6M | _column_uid_to_raw_bytes[meta.unique_id()] = meta.raw_data_bytes(); |
761 | 10.6M | } |
762 | 10.8M | }); |
763 | | |
764 | 753k | if (!st.ok()) { |
765 | 0 | LOG(WARNING) << "Failed to traverse column metas to cache raw_data_bytes, error: " |
766 | 0 | << st.to_string(); |
767 | 0 | } |
768 | 753k | } |
769 | | |
770 | 757k | _column_reader_cache = std::make_unique<ColumnReaderCache>( |
771 | 757k | _column_meta_accessor.get(), _tablet_schema, _file_reader, _num_rows, |
772 | 9.95M | [this](std::shared_ptr<SegmentFooterPB>& footer_pb, OlapReaderStatistics* stats) { |
773 | 9.95M | return _get_segment_footer(footer_pb, stats); |
774 | 9.95M | }); |
775 | 757k | return Status::OK(); |
776 | 757k | } |
777 | | |
778 | | Status Segment::new_default_iterator(const TabletColumn& tablet_column, |
779 | 9.29k | std::unique_ptr<ColumnIterator>* iter) { |
780 | 9.29k | if (!tablet_column.has_default_value() && !tablet_column.is_nullable()) { |
781 | 0 | return Status::InternalError( |
782 | 0 | "invalid nonexistent column without default value. column_uid={}, " |
783 | 0 | "column_name={}, " |
784 | 0 | "column_type={}", |
785 | 0 | tablet_column.unique_id(), tablet_column.name(), tablet_column.type()); |
786 | 0 | } |
787 | 9.29k | std::unique_ptr<DefaultValueColumnIterator> default_value_iter(new DefaultValueColumnIterator( |
788 | 9.29k | tablet_column.has_default_value(), tablet_column.default_value(), |
789 | 9.29k | tablet_column.is_nullable(), tablet_column.type(), tablet_column.precision(), |
790 | 9.29k | tablet_column.frac(), tablet_column.length())); |
791 | 9.29k | ColumnIteratorOptions iter_opts; |
792 | | |
793 | 9.29k | RETURN_IF_ERROR(default_value_iter->init(iter_opts)); |
794 | 9.29k | *iter = std::move(default_value_iter); |
795 | 9.29k | return Status::OK(); |
796 | 9.29k | } |
797 | | |
798 | | // Not use cid anymore, for example original table schema is colA int, then user do following actions |
799 | | // 1.add column b |
800 | | // 2. drop column b |
801 | | // 3. add column c |
802 | | // in the new schema column c's cid == 2 |
803 | | // but in the old schema column b's cid == 2 |
804 | | // but they are not the same column |
805 | | Status Segment::new_column_iterator(const TabletColumn& tablet_column, |
806 | | std::unique_ptr<ColumnIterator>* iter, |
807 | | const StorageReadOptions* opt, |
808 | | const std::unordered_map<int32_t, PathToBinaryColumnCacheUPtr>* |
809 | 27.1M | variant_sparse_column_cache) { |
810 | 27.1M | if (opt->runtime_state != nullptr) { |
811 | 26.6M | _be_exec_version = opt->runtime_state->be_exec_version(); |
812 | 26.6M | } |
813 | 27.1M | RETURN_IF_ERROR(_create_column_meta_once(opt->stats)); |
814 | | |
815 | | // For compability reason unique_id may less than 0 for variant extracted column |
816 | 27.1M | int32_t unique_id = tablet_column.unique_id() >= 0 ? tablet_column.unique_id() |
817 | 18.4E | : tablet_column.parent_unique_id(); |
818 | | |
819 | | // If column meta for this uid is not found in this segment, use default iterator. |
820 | 27.1M | if (!_column_meta_accessor->has_column_uid(unique_id)) { |
821 | 3.40k | RETURN_IF_ERROR(new_default_iterator(tablet_column, iter)); |
822 | 3.40k | return Status::OK(); |
823 | 3.40k | } |
824 | | |
825 | | // __DORIS_COMMIT_TSO_COL__ on a single-version segment stores a 0 placeholder on disk (its |
826 | | // real value is the rowset's commit_tso, filled at read time). Pass the real commit_tso as a |
827 | | // const value so the cache returns a ConstantColumnReader, whose iterator yields the real value |
828 | | // on every read path (projection / predicate / MIN-MAX zone-map) instead of the placeholder 0. |
829 | | // commit_tso == -1 means it is not assigned yet (before publish); keep the on-disk value then. |
830 | | // The value is constant per segment (a segment belongs to a single rowset), so caching the |
831 | | // ConstantColumnReader does not cross-pollute other queries. Some internal read paths (e.g. MOW |
832 | | // partial-update row fetch) build a bare StorageReadOptions without tablet_schema, so guard it. |
833 | 27.1M | std::optional<Field> const_value; |
834 | 27.1M | if (opt->tablet_schema != nullptr && opt->version.first == opt->version.second && |
835 | 27.1M | opt->commit_tso.end_tso() != -1) { |
836 | 0 | int32_t tso_idx = opt->tablet_schema->commit_tso_col_idx(); |
837 | 0 | if (tso_idx != -1 && opt->tablet_schema->column(tso_idx).unique_id() == unique_id) { |
838 | 0 | const_value = Field::create_field<TYPE_BIGINT>(opt->commit_tso.end_tso()); |
839 | 0 | } |
840 | 0 | } |
841 | | |
842 | | // init iterator by unique id |
843 | 27.1M | std::shared_ptr<ColumnReader> reader; |
844 | 27.1M | RETURN_IF_ERROR(get_column_reader(unique_id, &reader, opt->stats, std::move(const_value))); |
845 | 27.1M | if (reader == nullptr) { |
846 | 0 | return Status::InternalError("column reader is nullptr, unique_id={}", unique_id); |
847 | 0 | } |
848 | 27.1M | if (reader->get_meta_type() == FieldType::OLAP_FIELD_TYPE_VARIANT) { |
849 | | // if sparse_column_cache_ptr is nullptr, means the sparse column cache is not used |
850 | 29.7k | PathToBinaryColumnCache* sparse_column_cache_ptr = nullptr; |
851 | 29.7k | if (variant_sparse_column_cache) { |
852 | 29.3k | auto it = variant_sparse_column_cache->find(unique_id); |
853 | 29.3k | if (it != variant_sparse_column_cache->end()) { |
854 | 29.3k | sparse_column_cache_ptr = it->second.get(); |
855 | 18.4E | } else { |
856 | 18.4E | DCHECK(false) << "sparse column cache is not found, unique_id=" << unique_id; |
857 | 18.4E | } |
858 | 29.3k | } |
859 | | // use _column_reader_cache to get variant subcolumn(path column) reader |
860 | 29.7k | RETURN_IF_ERROR(assert_cast<VariantColumnReader*>(reader.get()) |
861 | 29.7k | ->new_iterator(iter, &tablet_column, opt, |
862 | 29.7k | _column_reader_cache.get(), |
863 | 29.7k | sparse_column_cache_ptr)); |
864 | 27.1M | } else { |
865 | 27.1M | RETURN_IF_ERROR(reader->new_iterator(iter, &tablet_column, opt)); |
866 | 27.1M | if (opt->all_access_paths.contains(unique_id) || |
867 | 27.1M | opt->predicate_access_paths.contains(unique_id)) { |
868 | 61.5k | const auto& all_access_paths = opt->all_access_paths.contains(unique_id) |
869 | 61.5k | ? opt->all_access_paths.at(unique_id) |
870 | 61.5k | : TColumnAccessPaths {}; |
871 | 61.5k | const auto& predicate_access_paths = opt->predicate_access_paths.contains(unique_id) |
872 | 61.5k | ? opt->predicate_access_paths.at(unique_id) |
873 | 61.5k | : TColumnAccessPaths {}; |
874 | | |
875 | | // set column name to apply access paths. |
876 | 61.5k | (*iter)->set_column_name(tablet_column.name()); |
877 | 61.5k | RETURN_IF_ERROR((*iter)->set_access_paths(all_access_paths, predicate_access_paths)); |
878 | 61.5k | (*iter)->remove_pruned_sub_iterators(); |
879 | 61.5k | } |
880 | 27.1M | } |
881 | | |
882 | 27.1M | if (config::enable_column_type_check && !tablet_column.has_path_info() && |
883 | 27.1M | !tablet_column.is_agg_state_type() && tablet_column.type() != reader->get_meta_type()) { |
884 | 0 | LOG(WARNING) << "different type between schema and column reader," |
885 | 0 | << " column schema name: " << tablet_column.name() |
886 | 0 | << " column schema type: " << int(tablet_column.type()) |
887 | 0 | << " column reader meta type: " << int(reader->get_meta_type()); |
888 | 0 | return Status::InternalError("different type between schema and column reader"); |
889 | 0 | } |
890 | 27.1M | return Status::OK(); |
891 | 27.1M | } |
892 | | |
893 | | Status Segment::get_column_reader(int32_t col_uid, std::shared_ptr<ColumnReader>* column_reader, |
894 | 27.2M | OlapReaderStatistics* stats, std::optional<Field> const_value) { |
895 | 27.2M | RETURN_IF_ERROR(_create_column_meta_once(stats)); |
896 | 27.2M | SCOPED_RAW_TIMER(&stats->segment_create_column_readers_timer_ns); |
897 | | // The column is not in this segment, return nullptr |
898 | 27.2M | if (!_tablet_schema->has_column_unique_id(col_uid)) { |
899 | 0 | *column_reader = nullptr; |
900 | 0 | return Status::Error<ErrorCode::NOT_FOUND, false>("column not found in segment, col_uid={}", |
901 | 0 | col_uid); |
902 | 0 | } |
903 | 27.2M | return _column_reader_cache->get_column_reader(col_uid, column_reader, stats, |
904 | 27.2M | std::move(const_value)); |
905 | 27.2M | } |
906 | | |
907 | 45.0k | Status Segment::traverse_column_meta_pbs(const std::function<void(const ColumnMetaPB&)>& visitor) { |
908 | | // Ensure column meta accessor and reader cache are initialized once. |
909 | 45.0k | OlapReaderStatistics dummy_stats; |
910 | 45.0k | RETURN_IF_ERROR(_create_column_meta_once(&dummy_stats)); |
911 | 45.0k | std::shared_ptr<SegmentFooterPB> footer_pb_shared; |
912 | 45.0k | RETURN_IF_ERROR(_get_segment_footer(footer_pb_shared, &dummy_stats)); |
913 | 45.0k | return _column_meta_accessor->traverse_metas(*footer_pb_shared, visitor); |
914 | 45.0k | } |
915 | | |
916 | | Status Segment::get_column_reader(const TabletColumn& col, |
917 | | std::shared_ptr<ColumnReader>* column_reader, |
918 | 1.94M | OlapReaderStatistics* stats, std::optional<Field> const_value) { |
919 | 1.94M | RETURN_IF_ERROR(_create_column_meta_once(stats)); |
920 | 1.94M | SCOPED_RAW_TIMER(&stats->segment_create_column_readers_timer_ns); |
921 | 18.4E | int col_uid = col.unique_id() >= 0 ? col.unique_id() : col.parent_unique_id(); |
922 | | // The column is not in this segment, return nullptr |
923 | 1.94M | if (!_tablet_schema->has_column_unique_id(col_uid)) { |
924 | 40 | *column_reader = nullptr; |
925 | 40 | return Status::Error<ErrorCode::NOT_FOUND, false>("column not found in segment, col_uid={}", |
926 | 40 | col_uid); |
927 | 40 | } |
928 | 1.94M | if (col.has_path_info()) { |
929 | 2.12k | PathInData relative_path = col.path_info_ptr()->copy_pop_front(); |
930 | 2.12k | return _column_reader_cache->get_path_column_reader(col_uid, relative_path, column_reader, |
931 | 2.12k | stats); |
932 | 2.12k | } |
933 | 1.94M | return _column_reader_cache->get_column_reader(col_uid, column_reader, stats, |
934 | 1.94M | std::move(const_value)); |
935 | 1.94M | } |
936 | | |
937 | | Status Segment::new_index_iterator(const TabletColumn& tablet_column, const TabletIndex* index_meta, |
938 | | const StorageReadOptions& read_options, |
939 | 73.0k | std::unique_ptr<IndexIterator>* iter) { |
940 | 73.0k | if (read_options.runtime_state != nullptr) { |
941 | 64.2k | _be_exec_version = read_options.runtime_state->be_exec_version(); |
942 | 64.2k | } |
943 | 73.0k | RETURN_IF_ERROR(_create_column_meta_once(read_options.stats)); |
944 | 73.0k | std::shared_ptr<ColumnReader> reader; |
945 | 73.0k | auto st = get_column_reader(tablet_column, &reader, read_options.stats); |
946 | 73.0k | if (st.is<ErrorCode::NOT_FOUND>()) { |
947 | 504 | return Status::OK(); |
948 | 504 | } |
949 | 72.5k | RETURN_IF_ERROR(st); |
950 | 72.5k | DCHECK(reader != nullptr); |
951 | 72.8k | if (index_meta) { |
952 | | // call DorisCallOnce.call without check if _index_file_reader is nullptr |
953 | | // to avoid data race during parallel method calls |
954 | 72.8k | RETURN_IF_ERROR(_index_file_reader_open.call([&] { return _open_index_file_reader(); })); |
955 | | // after DorisCallOnce.call, _index_file_reader is guaranteed to be not nullptr |
956 | 72.8k | const std::string rowset_id = |
957 | 72.8k | index_meta->index_type() == IndexType::ANN ? _rowset_id.to_string() : ""; |
958 | 72.8k | const bool need_binding_diagnostic = tablet_column.is_variant_type() || |
959 | 72.8k | tablet_column.is_extracted_column() || |
960 | 72.8k | !index_meta->get_index_suffix().empty(); |
961 | 72.8k | bool index_file_exists = false; |
962 | 72.8k | Status probe_status; |
963 | 72.8k | if (need_binding_diagnostic) { |
964 | 1.32k | probe_status = _index_file_reader->init(config::inverted_index_read_buffer_size, |
965 | 1.32k | &read_options.io_ctx); |
966 | 1.32k | if (probe_status.ok()) { |
967 | 1.32k | probe_status = _index_file_reader->index_file_exist(index_meta, &index_file_exists); |
968 | 1.32k | } |
969 | 1.32k | const auto diagnostic = fmt::format( |
970 | 1.32k | "[VariantSearchBinding] phase=index_file_probe tablet_id={} rowset_id={} " |
971 | 1.32k | "segment_id={} column={} logical_path={} index_id={} suffix={} exists={} " |
972 | 1.32k | "status={}", |
973 | 1.32k | read_options.tablet_id, _rowset_id.to_string(), _segment_id, |
974 | 1.32k | tablet_column.name(), |
975 | 1.32k | tablet_column.has_path_info() ? tablet_column.path_info_ptr()->get_path() |
976 | 1.32k | : tablet_column.name(), |
977 | 1.32k | index_meta->index_id(), index_meta->get_index_suffix(), index_file_exists, |
978 | 1.32k | probe_status.ok() ? "OK" : probe_status.to_string()); |
979 | 1.32k | VLOG_DEBUG << diagnostic; |
980 | 1.32k | if (read_options.stats != nullptr) { |
981 | 1.32k | read_options.stats->inverted_index_stats.add_binding_diagnostic(diagnostic); |
982 | 1.32k | } |
983 | 1.32k | } |
984 | 72.8k | Status iter_status = reader->new_index_iterator(_index_file_reader, index_meta, rowset_id, |
985 | 72.8k | _segment_id, _num_rows, iter); |
986 | 72.8k | if (!iter_status.ok()) { |
987 | 0 | if (need_binding_diagnostic) { |
988 | 0 | const auto diagnostic = fmt::format( |
989 | 0 | "[VariantSearchBinding] phase=index_iterator_create result=reject " |
990 | 0 | "tablet_id={} rowset_id={} segment_id={} column={} logical_path={} " |
991 | 0 | "index_id={} suffix={} reason={}", |
992 | 0 | read_options.tablet_id, _rowset_id.to_string(), _segment_id, |
993 | 0 | tablet_column.name(), |
994 | 0 | tablet_column.has_path_info() ? tablet_column.path_info_ptr()->get_path() |
995 | 0 | : tablet_column.name(), |
996 | 0 | index_meta->index_id(), index_meta->get_index_suffix(), |
997 | 0 | iter_status.to_string()); |
998 | 0 | VLOG_DEBUG << diagnostic; |
999 | 0 | if (read_options.stats != nullptr) { |
1000 | 0 | read_options.stats->inverted_index_stats.add_binding_diagnostic(diagnostic); |
1001 | 0 | } |
1002 | 0 | } |
1003 | 0 | return iter_status; |
1004 | 0 | } |
1005 | 72.8k | return Status::OK(); |
1006 | 72.8k | } |
1007 | 18.4E | return Status::OK(); |
1008 | 72.5k | } |
1009 | | |
1010 | | Status Segment::lookup_row_key(const Slice& key, const TabletSchema* latest_schema, |
1011 | | bool with_seq_col, bool with_rowid, RowLocation* row_location, |
1012 | 3.85M | OlapReaderStatistics* stats, std::string* encoded_seq_value) { |
1013 | 3.85M | RETURN_IF_ERROR(load_pk_index_and_bf(stats)); |
1014 | 3.85M | bool has_seq_col = latest_schema->has_sequence_col(); |
1015 | 3.85M | bool has_rowid = !latest_schema->cluster_key_uids().empty(); |
1016 | 3.85M | size_t seq_col_length = 0; |
1017 | 3.85M | if (has_seq_col) { |
1018 | 25.2k | seq_col_length = latest_schema->column(latest_schema->sequence_col_idx()).length() + 1; |
1019 | 25.2k | } |
1020 | 3.85M | size_t rowid_length = has_rowid ? PrimaryKeyIndexReader::ROW_ID_LENGTH : 0; |
1021 | | |
1022 | 3.85M | Slice key_without_seq = |
1023 | 3.85M | Slice(key.get_data(), key.get_size() - (with_seq_col ? seq_col_length : 0) - |
1024 | 3.85M | (with_rowid ? rowid_length : 0)); |
1025 | | |
1026 | 3.85M | DCHECK(_pk_index_reader != nullptr); |
1027 | 3.85M | if (!_pk_index_reader->check_present(key_without_seq)) { |
1028 | 188k | return Status::Error<ErrorCode::KEY_NOT_FOUND, false>(""); |
1029 | 188k | } |
1030 | 3.67M | bool exact_match = false; |
1031 | 3.67M | std::unique_ptr<segment_v2::IndexedColumnIterator> index_iterator; |
1032 | 3.67M | RETURN_IF_ERROR(_pk_index_reader->new_iterator(&index_iterator, stats)); |
1033 | 3.67M | auto st = index_iterator->seek_at_or_after(&key_without_seq, &exact_match); |
1034 | 3.67M | if (!st.ok() && !st.is<ErrorCode::ENTRY_NOT_FOUND>()) { |
1035 | 0 | return st; |
1036 | 0 | } |
1037 | 3.68M | if (st.is<ErrorCode::ENTRY_NOT_FOUND>() || (!has_seq_col && !has_rowid && !exact_match)) { |
1038 | 104 | return Status::Error<ErrorCode::KEY_NOT_FOUND, false>(""); |
1039 | 104 | } |
1040 | 3.67M | row_location->row_id = cast_set<uint32_t>(index_iterator->get_current_ordinal()); |
1041 | 3.67M | row_location->segment_id = _segment_id; |
1042 | 3.67M | row_location->rowset_id = _rowset_id; |
1043 | | |
1044 | 3.67M | size_t num_to_read = 1; |
1045 | 3.67M | auto index_type = DataTypeFactory::instance().create_data_type(_pk_index_reader->type(), 1, 0); |
1046 | 3.67M | auto index_column = index_type->create_column(); |
1047 | 3.67M | size_t num_read = num_to_read; |
1048 | 3.67M | RETURN_IF_ERROR(index_iterator->next_batch(&num_read, index_column)); |
1049 | 3.67M | DCHECK(num_to_read == num_read); |
1050 | | |
1051 | 3.67M | Slice sought_key = Slice(index_column->get_data_at(0).data, index_column->get_data_at(0).size); |
1052 | | |
1053 | | // user may use "ALTER TABLE tbl ENABLE FEATURE "SEQUENCE_LOAD" WITH ..." to add a hidden sequence column |
1054 | | // for a merge-on-write table which doesn't have sequence column, so `has_seq_col == true` doesn't mean |
1055 | | // data in segment has sequence column value |
1056 | 3.67M | bool segment_has_seq_col = _tablet_schema->has_sequence_col(); |
1057 | 3.67M | Slice sought_key_without_seq = Slice( |
1058 | 3.67M | sought_key.get_data(), |
1059 | 3.67M | sought_key.get_size() - (segment_has_seq_col ? seq_col_length : 0) - rowid_length); |
1060 | | |
1061 | 3.67M | if (has_seq_col) { |
1062 | | // compare key |
1063 | 25.2k | if (key_without_seq.compare(sought_key_without_seq) != 0) { |
1064 | 0 | return Status::Error<ErrorCode::KEY_NOT_FOUND, false>(""); |
1065 | 0 | } |
1066 | | |
1067 | 25.2k | if (with_seq_col && segment_has_seq_col) { |
1068 | | // compare sequence id |
1069 | 23.3k | Slice sequence_id = |
1070 | 23.3k | Slice(key.get_data() + key_without_seq.get_size() + 1, seq_col_length - 1); |
1071 | 23.3k | Slice previous_sequence_id = |
1072 | 23.3k | Slice(sought_key.get_data() + sought_key_without_seq.get_size() + 1, |
1073 | 23.3k | seq_col_length - 1); |
1074 | 23.3k | if (sequence_id.compare(previous_sequence_id) < 0) { |
1075 | 338 | return Status::Error<ErrorCode::KEY_ALREADY_EXISTS>( |
1076 | 338 | "key with higher sequence id exists"); |
1077 | 338 | } |
1078 | 23.3k | } |
1079 | 3.64M | } else if (has_rowid) { |
1080 | 59.0k | Slice sought_key_without_rowid = |
1081 | 59.0k | Slice(sought_key.get_data(), sought_key.get_size() - rowid_length); |
1082 | | // compare key |
1083 | 59.0k | if (key_without_seq.compare(sought_key_without_rowid) != 0) { |
1084 | 0 | return Status::Error<ErrorCode::KEY_NOT_FOUND, false>(""); |
1085 | 0 | } |
1086 | 59.0k | } |
1087 | | // found the key, use rowid in pk index if necessary. |
1088 | 3.67M | if (has_rowid) { |
1089 | 71.0k | Slice rowid_slice = Slice(sought_key.get_data() + sought_key_without_seq.get_size() + |
1090 | 71.0k | (segment_has_seq_col ? seq_col_length : 0) + 1, |
1091 | 71.0k | rowid_length - 1); |
1092 | 71.0k | const auto* rowid_coder = get_key_coder(FieldType::OLAP_FIELD_TYPE_UNSIGNED_INT); |
1093 | 71.0k | RETURN_IF_ERROR(rowid_coder->decode_ascending(&rowid_slice, rowid_length, |
1094 | 71.0k | (uint8_t*)&row_location->row_id)); |
1095 | 71.0k | } |
1096 | | |
1097 | 3.67M | if (encoded_seq_value) { |
1098 | 78 | if (!segment_has_seq_col) { |
1099 | 0 | *encoded_seq_value = std::string {}; |
1100 | 78 | } else { |
1101 | | // include marker |
1102 | 78 | *encoded_seq_value = |
1103 | 78 | Slice(sought_key.get_data() + sought_key_without_seq.get_size(), seq_col_length) |
1104 | 78 | .to_string(); |
1105 | 78 | } |
1106 | 78 | } |
1107 | 3.67M | return Status::OK(); |
1108 | 3.67M | } |
1109 | | |
1110 | 0 | Status Segment::read_key_by_rowid(uint32_t row_id, std::string* key) { |
1111 | 0 | OlapReaderStatistics* null_stat = nullptr; |
1112 | 0 | RETURN_IF_ERROR(load_pk_index_and_bf(null_stat)); |
1113 | 0 | std::unique_ptr<segment_v2::IndexedColumnIterator> iter; |
1114 | 0 | RETURN_IF_ERROR(_pk_index_reader->new_iterator(&iter, null_stat)); |
1115 | | |
1116 | 0 | auto index_type = DataTypeFactory::instance().create_data_type(_pk_index_reader->type(), 1, 0); |
1117 | 0 | auto index_column = index_type->create_column(); |
1118 | 0 | RETURN_IF_ERROR(iter->seek_to_ordinal(row_id)); |
1119 | 0 | size_t num_read = 1; |
1120 | 0 | RETURN_IF_ERROR(iter->next_batch(&num_read, index_column)); |
1121 | 0 | CHECK(num_read == 1); |
1122 | | // trim row id |
1123 | 0 | if (_tablet_schema->cluster_key_uids().empty()) { |
1124 | 0 | *key = index_column->get_data_at(0).to_string(); |
1125 | 0 | } else { |
1126 | 0 | Slice sought_key = |
1127 | 0 | Slice(index_column->get_data_at(0).data, index_column->get_data_at(0).size); |
1128 | 0 | Slice sought_key_without_rowid = |
1129 | 0 | Slice(sought_key.get_data(), |
1130 | 0 | sought_key.get_size() - PrimaryKeyIndexReader::ROW_ID_LENGTH); |
1131 | 0 | *key = sought_key_without_rowid.to_string(); |
1132 | 0 | } |
1133 | 0 | return Status::OK(); |
1134 | 0 | } |
1135 | | |
1136 | | Status Segment::seek_and_read_by_rowid(const TabletSchema& schema, SlotDescriptor* slot, |
1137 | | const std::vector<uint32_t>& row_ids, |
1138 | | MutableColumnPtr& result, |
1139 | | StorageReadOptions& storage_read_options, |
1140 | 13.0k | std::unique_ptr<ColumnIterator>& iterator_hint) { |
1141 | 13.0k | if (row_ids.empty()) { |
1142 | 0 | return Status::OK(); |
1143 | 0 | } |
1144 | 13.0k | DORIS_CHECK(std::is_sorted(row_ids.begin(), row_ids.end())); |
1145 | 13.0k | DORIS_CHECK(std::adjacent_find(row_ids.begin(), row_ids.end()) == row_ids.end()); |
1146 | | // ColumnIterator::seek_and_read expects monotonically increasing row_ids without |
1147 | | // duplicates for correct ordinal scanning. Enforce this contract at the entry point. |
1148 | 13.0k | segment_v2::ColumnIteratorOptions opt { |
1149 | 13.0k | .use_page_cache = !config::disable_storage_page_cache, |
1150 | 13.0k | .file_reader = file_reader().get(), |
1151 | 13.0k | .stats = storage_read_options.stats, |
1152 | 13.0k | .io_ctx = io::IOContext {.reader_type = ReaderType::READER_QUERY, |
1153 | 13.0k | .file_cache_stats = |
1154 | 13.0k | &storage_read_options.stats->file_cache_stats}, |
1155 | 13.0k | }; |
1156 | | |
1157 | 13.0k | if (!slot->column_paths().empty()) { |
1158 | | // here need create column readers to make sure column reader is created before seek_and_read_by_rowid |
1159 | | // if segment cache miss, column reader will be created to make sure the variant column result not coredump |
1160 | 254 | RETURN_IF_ERROR(_create_column_meta_once(storage_read_options.stats)); |
1161 | | |
1162 | 254 | const auto& dt_variant = |
1163 | 254 | assert_cast<const DataTypeVariant&>(*remove_nullable(slot->type())); |
1164 | 254 | TabletColumn column = TabletColumn::create_materialized_variant_column( |
1165 | 254 | schema.column_by_uid(slot->col_unique_id()).name_lower_case(), slot->column_paths(), |
1166 | 254 | slot->col_unique_id(), dt_variant.variant_max_subcolumns_count(), |
1167 | 254 | dt_variant.enable_doc_mode()); |
1168 | 254 | auto storage_type = get_data_type_of(column, storage_read_options); |
1169 | 254 | MutableColumnPtr file_storage_column = storage_type->create_column(); |
1170 | 254 | DCHECK(storage_type != nullptr); |
1171 | | |
1172 | 254 | if (iterator_hint == nullptr) { |
1173 | 254 | RETURN_IF_ERROR(new_column_iterator(column, &iterator_hint, &storage_read_options)); |
1174 | 254 | RETURN_IF_ERROR(iterator_hint->init(opt)); |
1175 | 254 | } |
1176 | 254 | RETURN_IF_ERROR( |
1177 | 254 | iterator_hint->read_by_rowids(row_ids.data(), row_ids.size(), file_storage_column)); |
1178 | 254 | ColumnPtr source_ptr; |
1179 | | // storage may have different type with schema, so we need to cast the column |
1180 | 254 | RETURN_IF_ERROR(variant_util::cast_column( |
1181 | 254 | ColumnWithTypeAndName(file_storage_column->get_ptr(), storage_type, column.name()), |
1182 | 254 | slot->type(), &source_ptr)); |
1183 | 254 | RETURN_IF_CATCH_EXCEPTION(result->insert_range_from(*source_ptr, 0, row_ids.size())); |
1184 | 12.7k | } else { |
1185 | 12.7k | int index = (slot->col_unique_id() >= 0) ? schema.field_index(slot->col_unique_id()) |
1186 | 12.7k | : schema.field_index(slot->col_name()); |
1187 | 12.7k | if (index < 0) { |
1188 | 0 | std::stringstream ss; |
1189 | 0 | ss << "field name is invalid. field=" << slot->col_name() |
1190 | 0 | << ", field_name_to_index=" << schema.get_all_field_names(); |
1191 | 0 | return Status::InternalError(ss.str()); |
1192 | 0 | } |
1193 | 12.7k | if (iterator_hint == nullptr) { |
1194 | 12.7k | RETURN_IF_ERROR(new_column_iterator(schema.column(index), &iterator_hint, |
1195 | 12.7k | &storage_read_options)); |
1196 | 12.7k | RETURN_IF_ERROR(iterator_hint->init(opt)); |
1197 | 12.7k | } |
1198 | 12.7k | RETURN_IF_ERROR(iterator_hint->read_by_rowids(row_ids.data(), row_ids.size(), result)); |
1199 | 12.7k | } |
1200 | 13.0k | return Status::OK(); |
1201 | 13.0k | } |
1202 | | |
1203 | | Status Segment::_get_segment_footer(std::shared_ptr<SegmentFooterPB>& footer_pb, |
1204 | 11.6M | OlapReaderStatistics* stats) { |
1205 | 11.6M | std::shared_ptr<SegmentFooterPB> footer_pb_shared = _footer_pb.lock(); |
1206 | 11.6M | if (footer_pb_shared != nullptr) { |
1207 | 10.7M | footer_pb = footer_pb_shared; |
1208 | 10.7M | return Status::OK(); |
1209 | 10.7M | } |
1210 | | |
1211 | 18.4E | VLOG_DEBUG << fmt::format("Segment footer of {}:{}:{} is missing, try to load it", |
1212 | 18.4E | _file_reader->path().native(), _file_reader->size(), |
1213 | 18.4E | _file_reader->size() - 12); |
1214 | | |
1215 | 860k | StoragePageCache* segment_footer_cache = ExecEnv::GetInstance()->get_storage_page_cache(); |
1216 | 860k | DCHECK(segment_footer_cache != nullptr); |
1217 | | |
1218 | 860k | auto cache_key = get_segment_footer_cache_key(); |
1219 | | |
1220 | 860k | PageCacheHandle cache_handle; |
1221 | | |
1222 | | // Put segment footer into index page cache. |
1223 | | // Rationale: |
1224 | | // - Footer is metadata (small, parsed with indexes), not data page payload. |
1225 | | // - Using PageTypePB::INDEX_PAGE keeps it under the same eviction policy/shards |
1226 | | // as other index/metadata pages and avoids competing with DATA_PAGE budget. |
1227 | 860k | if (!segment_footer_cache->lookup(cache_key, &cache_handle, |
1228 | 860k | segment_v2::PageTypePB::INDEX_PAGE)) { |
1229 | 95.0k | RETURN_IF_ERROR(_parse_footer(footer_pb_shared, stats)); |
1230 | 95.0k | segment_footer_cache->insert(cache_key, footer_pb_shared, footer_pb_shared->ByteSizeLong(), |
1231 | 95.0k | &cache_handle, segment_v2::PageTypePB::INDEX_PAGE); |
1232 | 765k | } else { |
1233 | 18.4E | VLOG_DEBUG << fmt::format("Segment footer of {}:{}:{} is found in cache", |
1234 | 18.4E | _file_reader->path().native(), _file_reader->size(), |
1235 | 18.4E | _file_reader->size() - 12); |
1236 | 765k | } |
1237 | 860k | footer_pb_shared = cache_handle.get<std::shared_ptr<SegmentFooterPB>>(); |
1238 | 860k | _footer_pb = footer_pb_shared; |
1239 | 860k | footer_pb = footer_pb_shared; |
1240 | 860k | return Status::OK(); |
1241 | 860k | } |
1242 | | |
1243 | 894k | StoragePageCache::CacheKey Segment::get_segment_footer_cache_key() const { |
1244 | 894k | DCHECK(_file_reader != nullptr); |
1245 | | // The footer is always at the end of the segment file. |
1246 | | // The size of footer is 12. |
1247 | | // So we use the size of file minus 12 as the cache key, which is unique for each segment file. |
1248 | 894k | return get_segment_footer_cache_key(_file_reader); |
1249 | 894k | } |
1250 | | |
1251 | | StoragePageCache::CacheKey Segment::get_segment_footer_cache_key( |
1252 | 893k | const io::FileReaderSPtr& file_reader) { |
1253 | 893k | return {file_reader->path().native(), file_reader->size(), |
1254 | 893k | static_cast<int64_t>(file_reader->size() - 12)}; |
1255 | 893k | } |
1256 | | |
1257 | | } // namespace doris::segment_v2 |