Coverage Report

Created: 2026-03-12 17:15

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/segment/segment.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/segment/segment.h"
19
20
#include <crc32c/crc32c.h>
21
#include <gen_cpp/Descriptors_types.h>
22
#include <gen_cpp/PlanNodes_types.h>
23
#include <gen_cpp/olap_file.pb.h>
24
#include <gen_cpp/segment_v2.pb.h>
25
26
#include <cstring>
27
#include <memory>
28
#include <sstream>
29
#include <utility>
30
31
#include "cloud/config.h"
32
#include "common/exception.h"
33
#include "common/logging.h"
34
#include "common/status.h"
35
#include "core/column/column.h"
36
#include "core/data_type/data_type.h"
37
#include "core/data_type/data_type_factory.hpp"
38
#include "core/data_type/data_type_nullable.h"
39
#include "core/data_type/data_type_variant.h"
40
#include "core/field.h"
41
#include "core/string_ref.h"
42
#include "cpp/sync_point.h"
43
#include "io/cache/block_file_cache.h"
44
#include "io/cache/block_file_cache_factory.h"
45
#include "io/cache/cached_remote_file_reader.h"
46
#include "io/fs/file_reader.h"
47
#include "io/fs/file_system.h"
48
#include "io/io_common.h"
49
#include "runtime/exec_env.h"
50
#include "runtime/query_context.h"
51
#include "runtime/runtime_predicate.h"
52
#include "runtime/runtime_state.h"
53
#include "storage/index/index_file_reader.h"
54
#include "storage/index/indexed_column_reader.h"
55
#include "storage/index/primary_key_index.h"
56
#include "storage/index/short_key_index.h"
57
#include "storage/iterator/vgeneric_iterators.h"
58
#include "storage/iterators.h"
59
#include "storage/olap_common.h"
60
#include "storage/predicate/block_column_predicate.h"
61
#include "storage/predicate/column_predicate.h"
62
#include "storage/rowset/rowset_reader_context.h"
63
#include "storage/schema.h"
64
#include "storage/segment/column_meta_accessor.h"
65
#include "storage/segment/column_reader.h"
66
#include "storage/segment/column_reader_cache.h"
67
#include "storage/segment/empty_segment_iterator.h"
68
#include "storage/segment/page_io.h"
69
#include "storage/segment/page_pointer.h"
70
#include "storage/segment/segment_iterator.h"
71
#include "storage/segment/segment_writer.h" // k_segment_magic_length
72
#include "storage/segment/stream_reader.h"
73
#include "storage/segment/variant/variant_column_reader.h"
74
#include "storage/tablet/tablet_schema.h"
75
#include "storage/types.h"
76
#include "storage/utils.h"
77
#include "util/coding.h"
78
#include "util/json/path_in_data.h"
79
#include "util/slice.h" // Slice
80
81
namespace doris::segment_v2 {
82
#include "common/compile_check_begin.h"
83
84
class InvertedIndexIterator;
85
86
// Opens segment `segment_id` of rowset `rowset_id` located at `path`; on success the segment is
// returned through `output`.
//
// When opening fails and the BE is not running in cloud mode, the error is additionally reported
// to the tablet identified by `tablet_id`, provided that tablet can be found locally.
Status Segment::open(io::FileSystemSPtr fs, const std::string& path, int64_t tablet_id,
                     uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr tablet_schema,
                     const io::FileReaderOptions& reader_options, std::shared_ptr<Segment>* output,
                     InvertedIndexFileInfo idx_file_info, OlapReaderStatistics* stats) {
    Status open_status = _open(fs, path, segment_id, rowset_id, tablet_schema, reader_options,
                               output, idx_file_info, stats);
    // Nothing to report on success, and cloud mode has no local tablet to notify.
    if (open_status.ok() || config::is_cloud_mode()) {
        return open_status;
    }

    auto tablet_result = ExecEnv::get_tablet(tablet_id);
    if (tablet_result.has_value()) {
        TabletSharedPtr owner = std::dynamic_pointer_cast<Tablet>(tablet_result.value());
        if (owner != nullptr) {
            owner->report_error(open_status);
        }
    }
    return open_status;
}
105
106
// Opens the segment file at `path` and parses its footer. On success the new Segment is stored
// in `*output`; on failure `*output` is left untouched.
//
// A CORRUPTION status obtained while reading through the block file cache may come from a bad
// cached copy. In that case the cached blocks are evicted and the open is retried. If the data
// is still corrupted, a final attempt is made with the file cache disabled (NO_CACHE).
Status Segment::_open(io::FileSystemSPtr fs, const std::string& path, uint32_t segment_id,
                      RowsetId rowset_id, TabletSchemaSPtr tablet_schema,
                      const io::FileReaderOptions& reader_options, std::shared_ptr<Segment>* output,
                      InvertedIndexFileInfo idx_file_info, OlapReaderStatistics* stats) {
    io::FileReaderSPtr file_reader;
    auto st = fs->open_file(path, &file_reader, &reader_options);
    TEST_INJECTION_POINT_CALLBACK("Segment::open:corruption", &st);
    std::shared_ptr<Segment> segment(
            new Segment(segment_id, rowset_id, std::move(tablet_schema), idx_file_info));
    if (st) {
        segment->_fs = fs;
        segment->_file_reader = std::move(file_reader);
        st = segment->_open(stats);
    }
    // Deliberately not an `else if`: corruption is normally detected while parsing the footer
    // inside `segment->_open` (after `open_file` itself succeeded), and that case must also
    // trigger the cache-eviction retry below.
    if (st.is<ErrorCode::CORRUPTION>() &&
        reader_options.cache_type == io::FileCachePolicy::FILE_BLOCK_CACHE) {
        LOG(WARNING) << "bad segment file may be read from file cache, try to read remote source "
                        "file directly, file path: "
                     << path << " cache_key: " << file_cache_key_str(path);
        auto file_key = file_cache_key_from_path(path);
        auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
        if (file_cache != nullptr) {
            file_cache->remove_if_cached(file_key);
        }

        st = fs->open_file(path, &file_reader, &reader_options);
        if (st) {
            segment->_fs = fs;
            segment->_file_reader = std::move(file_reader);
            st = segment->_open(stats);
        }
        TEST_INJECTION_POINT_CALLBACK("Segment::open:corruption1", &st);
        if (st.is<ErrorCode::CORRUPTION>()) { // corrupt again
            LOG(WARNING) << "failed to try to read remote source file again with cache support,"
                         << " try to read from remote directly, "
                         << " file path: " << path << " cache_key: " << file_cache_key_str(path);
            file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
            if (file_cache != nullptr) {
                file_cache->remove_if_cached(file_key);
            }

            io::FileReaderOptions opt = reader_options;
            opt.cache_type = io::FileCachePolicy::NO_CACHE; // skip cache
            RETURN_IF_ERROR(fs->open_file(path, &file_reader, &opt));
            segment->_fs = fs;
            segment->_file_reader = std::move(file_reader);
            st = segment->_open(stats);
            if (!st.ok()) {
                LOG(WARNING) << "failed to try to read remote source file directly,"
                             << " file path: " << path
                             << " cache_key: " << file_cache_key_str(path);
            }
        }
    }
    RETURN_IF_ERROR(st);
    DCHECK(segment->_fs != nullptr) << "file system is nullptr after segment open";
    *output = std::move(segment);
    return Status::OK();
}
160
161
Segment::Segment(uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr tablet_schema,
162
                 InvertedIndexFileInfo idx_file_info)
163
539k
        : _segment_id(segment_id),
164
539k
          _meta_mem_usage(0),
165
539k
          _rowset_id(rowset_id),
166
539k
          _tablet_schema(std::move(tablet_schema)),
167
539k
          _idx_file_info(std::move(idx_file_info)) {}
168
169
540k
// Withdraws this segment's tracked metadata estimate from `g_segment_estimate_mem_bytes`.
Segment::~Segment() {
    // `_tracked_meta_mem_usage` is expected to equal meta_mem_usage() here; if this fires,
    // fix the accuracy of `_tracked_meta_mem_usage`.
    DCHECK(_tracked_meta_mem_usage == meta_mem_usage());
    g_segment_estimate_mem_bytes << -_tracked_meta_mem_usage;
}
174
175
6.50k
// Returns the block file cache key for segment `seg_id` of `rowset_id`, i.e. the hash of the
// file name "<rowset_id>_<seg_id>.dat".
io::UInt128Wrapper Segment::file_cache_key(std::string_view rowset_id, uint32_t seg_id) {
    const std::string segment_file_name = fmt::format("{}_{}.dat", rowset_id, seg_id);
    return io::BlockFileCache::hash(segment_file_name);
}
178
179
536k
int64_t Segment::get_metadata_size() const {
180
536k
    std::shared_ptr<SegmentFooterPB> footer_pb_shared = _footer_pb.lock();
181
536k
    return sizeof(Segment) + (_pk_index_meta ? _pk_index_meta->ByteSizeLong() : 0) +
182
18.4E
           (footer_pb_shared ? footer_pb_shared->ByteSizeLong() : 0);
183
536k
}
184
185
537k
void Segment::update_metadata_size() {
186
537k
    MetadataAdder::update_metadata_size();
187
537k
    g_segment_estimate_mem_bytes << _meta_mem_usage - _tracked_meta_mem_usage;
188
537k
    _tracked_meta_mem_usage = _meta_mem_usage;
189
537k
}
190
191
535k
// Loads the segment footer and initializes the state derived from it (primary key index meta,
// short key index page pointer, row count). Then records an estimate of this segment's metadata
// memory via update_metadata_size().
Status Segment::_open(OlapReaderStatistics* stats) {
    std::shared_ptr<SegmentFooterPB> footer;
    RETURN_IF_ERROR(_get_segment_footer(footer, stats));

    if (footer->has_primary_key_index_meta()) {
        _pk_index_meta.reset(new PrimaryKeyIndexMetaPB(footer->primary_key_index_meta()));
    } else {
        _pk_index_meta.reset();
    }
    // DCHECK(footer.has_short_key_index_page()) is intentionally left disabled; the original note
    // refers to delete_bitmap_calculator_test.cpp (presumably that test lacks such a page).
    _sk_index_page = footer->short_key_index_page();
    _num_rows = footer->num_rows();

    // Estimated metadata memory. The footer is kept in StoragePageCache, so it is not counted.
    if (_pk_index_meta != nullptr) {
        _meta_mem_usage += _pk_index_meta->ByteSizeLong();
    }
    _meta_mem_usage += sizeof(*this);
    const auto estimated_reader_count =
            std::min(static_cast<int>(_tablet_schema->num_columns()),
                     config::max_segment_partial_column_cache_size);
    _meta_mem_usage += estimated_reader_count * config::estimated_mem_per_column_reader;
    // Per-1024-row index estimate; the 1024 comes from SegmentWriterOptions.
    _meta_mem_usage += (_num_rows + 1023) / 1024 * (36 + 4);
    // Bloom filter estimate; the 0.01 argument matches PrimaryKeyIndexBuilder::init.
    _meta_mem_usage += BloomFilter::optimal_bit_num(_num_rows, 0.01) / 8;

    update_metadata_size();
    return Status::OK();
}
225
226
652
// Creates the IndexFileReader for this segment. Its path prefix is derived from the segment
// file path, and it uses the tablet schema's inverted index storage format.
Status Segment::_open_index_file_reader() {
    std::string index_path_prefix {
            InvertedIndexDescriptor::get_index_file_path_prefix(_file_reader->path().native())};
    _index_file_reader = std::make_shared<IndexFileReader>(
            _fs, std::move(index_path_prefix), _tablet_schema->get_inverted_index_storage_format(),
            _idx_file_info);
    return Status::OK();
}
234
235
// Creates a row iterator over this segment into `*iter` and initializes it with `read_options`.
//
// Steps:
//  1. Segment-level zone map pruning: if any applicable predicate cannot match this segment's
//     zone map, an EmptySegmentIterator is returned and indexes are never loaded.
//  2. Load the key index, then pick a statistics iterator (aggregate push-down without delete
//     predicates) or a regular SegmentIterator.
//  3. For query readers with column predicates, predicates that each column reader's zone map
//     can prune are removed, and the iterator is initialized with the pruned options.
Status Segment::new_iterator(SchemaSPtr schema, const StorageReadOptions& read_options,
                             std::unique_ptr<RowwiseIterator>* iter) {
    if (read_options.runtime_state != nullptr) {
        _be_exec_version = read_options.runtime_state->be_exec_version();
    }
    RETURN_IF_ERROR(_create_column_meta_once(read_options.stats));

    read_options.stats->total_segment_number++;

    for (const auto& [predicate_cid, block_predicate] : read_options.col_id_to_predicates) {
        int32_t column_id = predicate_cid;
        // Column is beyond this segment's schema (schema change).
        if (_tablet_schema->num_columns() <= column_id) {
            continue;
        }
        const TabletColumn& col = read_options.tablet_schema->column(column_id);
        std::shared_ptr<ColumnReader> reader;
        Status reader_status = get_column_reader(col, &reader, read_options.stats);
        if (reader_status.is<ErrorCode::NOT_FOUND>()) {
            continue; // column not present in this segment
        }
        RETURN_IF_ERROR(reader_status);
        DCHECK(reader != nullptr);
        if (!reader->has_zone_map()) {
            continue;
        }
        const bool zone_map_applicable =
                read_options.col_id_to_predicates.contains(column_id) &&
                can_apply_predicate_safely(column_id, *schema,
                                           read_options.target_cast_type_for_variants,
                                           read_options);
        if (!zone_map_applicable) {
            continue;
        }
        bool matched = true;
        RETURN_IF_ERROR(reader->match_condition(block_predicate.get(), &matched));
        if (!matched) {
            // Some condition can never hold in this segment: nothing to read.
            *iter = std::make_unique<EmptySegmentIterator>(*schema);
            read_options.stats->filtered_segment_number++;
            return Status::OK();
        }
    }

    {
        SCOPED_RAW_TIMER(&read_options.stats->segment_load_index_timer_ns);
        RETURN_IF_ERROR(load_index(read_options.stats));
    }

    const bool use_statistics_iterator =
            read_options.delete_condition_predicates->num_of_column_predicate() == 0 &&
            read_options.push_down_agg_type_opt != TPushAggOp::NONE &&
            read_options.push_down_agg_type_opt != TPushAggOp::COUNT_ON_INDEX;
    if (use_statistics_iterator) {
        iter->reset(new_vstatistics_iterator(this->shared_from_this(), *schema));
    } else {
        *iter = std::make_unique<SegmentIterator>(this->shared_from_this(), schema);
    }

    // TODO: Valid the opt not only in ReaderType::READER_QUERY
    const bool try_zone_map_pruning = read_options.io_ctx.reader_type == ReaderType::READER_QUERY &&
                                      !read_options.column_predicates.empty();
    if (!try_zone_map_pruning) {
        return iter->get()->init(read_options);
    }

    auto pruned_predicates = read_options.column_predicates;
    bool any_pruned = false;
    for (auto& [uid, column_reader] : _column_reader_cache->get_available_readers(false)) {
        const auto column_id = read_options.tablet_schema->field_index(uid);
        bool pruned_here = false;
        RETURN_IF_ERROR(column_reader->prune_predicates_by_zone_map(pruned_predicates, column_id,
                                                                    &pruned_here));
        any_pruned |= pruned_here;
    }
    if (!any_pruned) {
        return iter->get()->init(read_options);
    }

    auto options_with_pruned_predicates = read_options;
    options_with_pruned_predicates.column_predicates = std::move(pruned_predicates);
    // column_predicates changed, so col_id_to_predicates is rebuilt from the remaining
    // predicates; otherwise the inverted index would still evaluate the pruned ones.
    auto& rebuilt = options_with_pruned_predicates.col_id_to_predicates;
    rebuilt.clear();
    for (auto pred : options_with_pruned_predicates.column_predicates) {
        const auto pred_cid = pred->column_id();
        if (!rebuilt.contains(pred_cid)) {
            rebuilt.insert({pred_cid, AndBlockColumnPredicate::create_shared()});
        }
        rebuilt[pred_cid]->add_column_predicate(SingleColumnBlockPredicate::create_unique(pred));
    }
    return iter->get()->init(options_with_pruned_predicates);
}
323
324
// Best-effort debugging aid used when the segment footer looks corrupted. It dumps:
//  * the suspicious bytes (`data`, `bytes_read` bytes, originally read at `offset`) to
//    "<dir>/<rowset_id>_<segment_id>.dat.part_offset_<offset>", and
//  * the whole file (`file_size` bytes), re-read through the cached reader's remote reader,
//    to "<dir>/<rowset_id>_<segment_id>.dat",
// where <dir> is "<first file cache base path>/error_file/".
//
// Only active in cloud mode with `config::enbale_dump_error_file` enabled. Nothing is written
// once the directory exceeds `config::file_cache_error_log_limit_bytes`, and existing dumps are
// never overwritten. Callers only log a non-OK result.
Status Segment::_write_error_file(size_t file_size, size_t offset, size_t bytes_read, char* data,
                                  io::IOContext& io_ctx) {
    if (!config::enbale_dump_error_file || !doris::config::is_cloud_mode()) {
        return Status::OK();
    }

    const auto& base_paths = io::FileCacheFactory::instance()->get_base_paths();
    if (base_paths.empty()) {
        return Status::InternalError("no file cache base path to write error file to");
    }

    std::string file_name = _rowset_id.to_string() + "_" + std::to_string(_segment_id) + ".dat";
    std::string dir_path = base_paths[0] + "/error_file/";
    Status create_st = io::global_local_filesystem()->create_directory(dir_path, true);
    if (!create_st.ok() && !create_st.is<ErrorCode::ALREADY_EXIST>()) {
        LOG(WARNING) << "failed to create error file dir: " << create_st.to_string();
        return create_st;
    }
    size_t dir_size = 0;
    RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(dir_path, &dir_size));
    if (dir_size > config::file_cache_error_log_limit_bytes) {
        LOG(WARNING) << "error file dir size is too large: " << dir_size;
        return Status::OK();
    }

    // Dump the suspicious part.
    std::string part_path = dir_path + file_name + ".part_offset_" + std::to_string(offset);
    LOG(WARNING) << "writer error part to " << part_path;
    bool is_part_exist = false;
    RETURN_IF_ERROR(io::global_local_filesystem()->exists(part_path, &is_part_exist));
    if (is_part_exist) {
        LOG(WARNING) << "error part already exists: " << part_path;
    } else {
        std::unique_ptr<io::FileWriter> part_writer;
        RETURN_IF_ERROR(io::global_local_filesystem()->create_file(part_path, &part_writer));
        RETURN_IF_ERROR(part_writer->append(Slice(data, bytes_read)));
        RETURN_IF_ERROR(part_writer->close());
    }

    // Dump the whole file, re-read through the underlying remote reader of the cached reader.
    auto* cached_reader = dynamic_cast<io::CachedRemoteFileReader*>(_file_reader.get());
    if (cached_reader == nullptr) {
        return Status::InternalError("file reader is not CachedRemoteFileReader");
    }
    std::string error_file;
    error_file.resize(file_size);
    size_t error_file_bytes_read = 0;
    RETURN_IF_ERROR(cached_reader->get_remote_reader()->read_at(
            0, Slice(error_file.data(), file_size), &error_file_bytes_read, &io_ctx));
    DCHECK(error_file_bytes_read == file_size);
    std::string file_path = dir_path + file_name;
    LOG(WARNING) << "writer error file to " << file_path;
    bool is_file_exist = false;
    RETURN_IF_ERROR(io::global_local_filesystem()->exists(file_path, &is_file_exist));
    if (is_file_exist) {
        LOG(WARNING) << "error file already exists: " << file_path;
    } else {
        std::unique_ptr<io::FileWriter> writer;
        RETURN_IF_ERROR(io::global_local_filesystem()->create_file(file_path, &writer));
        // Persist only the bytes that were actually read.
        RETURN_IF_ERROR(writer->append(Slice(error_file.data(), error_file_bytes_read)));
        RETURN_IF_ERROR(writer->close());
    }
    return Status::OK();
}
384
385
// Reads, validates, and deserializes the segment footer stored at the end of the file.
//
// Footer := SegmentFooterPB, FooterPBSize(4), FooterPBChecksum(4), MagicNumber(4)
// The trailing 12 bytes hold the little-endian footer length, the CRC32C of the serialized
// footer, and the segment magic number.
//
// Returns CORRUPTION if the file is too small, the magic number or checksum does not match, or
// the protobuf cannot be parsed. Before returning, a best-effort dump is attempted via
// `_write_error_file`. On success `footer` holds the parsed SegmentFooterPB.
Status Segment::_parse_footer(std::shared_ptr<SegmentFooterPB>& footer,
                              OlapReaderStatistics* stats) {
    auto file_size = _file_reader->size();
    if (file_size < 12) {
        return Status::Corruption("Bad segment file {}: file size {} < 12, cache_key: {}",
                                  _file_reader->path().native(), file_size,
                                  file_cache_key_str(_file_reader->path().native()));
    }

    uint8_t fixed_buf[12];
    size_t bytes_read = 0;
    // TODO(plat1ko): Support session variable `enable_file_cache`
    io::IOContext io_ctx {.is_index_data = true,
                          .file_cache_stats = stats ? &stats->file_cache_stats : nullptr};
    RETURN_IF_ERROR(
            _file_reader->read_at(file_size - 12, Slice(fixed_buf, 12), &bytes_read, &io_ctx));
    DCHECK_EQ(bytes_read, 12);
    TEST_SYNC_POINT_CALLBACK("Segment::parse_footer:magic_number_corruption", fixed_buf);
    TEST_INJECTION_POINT_CALLBACK("Segment::parse_footer:magic_number_corruption_inj", fixed_buf);
    if (memcmp(fixed_buf + 8, k_segment_magic, k_segment_magic_length) != 0) {
        Status st =
                _write_error_file(file_size, file_size - 12, bytes_read, (char*)fixed_buf, io_ctx);
        if (!st.ok()) {
            LOG(WARNING) << "failed to write error file: " << st.to_string();
        }
        return Status::Corruption(
                "Bad segment file {}: file_size: {}, magic number not match, cache_key: {}",
                _file_reader->path().native(), file_size,
                file_cache_key_str(_file_reader->path().native()));
    }

    // read footer PB
    uint32_t footer_length = decode_fixed32_le(fixed_buf);
    // Compare in the width of `file_size` (which is >= 12 here). `12 + footer_length` would be
    // evaluated in 32-bit unsigned arithmetic and wrap for a corrupted length near UINT32_MAX.
    // The check would then pass and `file_size - 12 - footer_length` below would underflow.
    if (file_size - 12 < footer_length) {
        Status st =
                _write_error_file(file_size, file_size - 12, bytes_read, (char*)fixed_buf, io_ctx);
        if (!st.ok()) {
            LOG(WARNING) << "failed to write error file: " << st.to_string();
        }
        return Status::Corruption("Bad segment file {}: file size {} < {}, cache_key: {}",
                                  _file_reader->path().native(), file_size,
                                  static_cast<size_t>(footer_length) + 12,
                                  file_cache_key_str(_file_reader->path().native()));
    }

    std::string footer_buf;
    footer_buf.resize(footer_length);
    RETURN_IF_ERROR(_file_reader->read_at(file_size - 12 - footer_length, footer_buf, &bytes_read,
                                          &io_ctx));
    DCHECK_EQ(bytes_read, footer_length);

    // validate footer PB's checksum
    uint32_t expect_checksum = decode_fixed32_le(fixed_buf + 4);
    uint32_t actual_checksum = crc32c::Crc32c(footer_buf.data(), footer_buf.size());
    if (actual_checksum != expect_checksum) {
        Status st = _write_error_file(file_size, file_size - 12 - footer_length, bytes_read,
                                      footer_buf.data(), io_ctx);
        if (!st.ok()) {
            LOG(WARNING) << "failed to write error file: " << st.to_string();
        }
        return Status::Corruption(
                "Bad segment file {}: file_size = {}, footer checksum not match, actual={} "
                "vs expect={}, cache_key: {}",
                _file_reader->path().native(), file_size, actual_checksum, expect_checksum,
                file_cache_key_str(_file_reader->path().native()));
    }

    // deserialize footer PB
    footer = std::make_shared<SegmentFooterPB>();
    if (!footer->ParseFromString(footer_buf)) {
        Status st = _write_error_file(file_size, file_size - 12 - footer_length, bytes_read,
                                      footer_buf.data(), io_ctx);
        if (!st.ok()) {
            LOG(WARNING) << "failed to write error file: " << st.to_string();
        }
        return Status::Corruption(
                "Bad segment file {}: file_size = {}, failed to parse SegmentFooterPB, "
                "cache_key: {}",
                _file_reader->path().native(), file_size,
                file_cache_key_str(_file_reader->path().native()));
    }

    VLOG_DEBUG << fmt::format("Loading segment footer from {} finished",
                              _file_reader->path().native());
    return Status::OK();
}
471
472
9.32k
// Parses the primary key bloom filter exactly once (guarded by `_load_pk_bf_once`).
// Expects a unique-key segment whose primary key index has already been loaded.
Status Segment::_load_pk_bloom_filter(OlapReaderStatistics* stats) {
#ifdef BE_TEST
    if (_pk_index_meta == nullptr) {
        // for BE UT "segment_cache_test"
        return _load_pk_bf_once.call([this] {
            _meta_mem_usage += 100;
            update_metadata_size();
            return Status::OK();
        });
    }
#endif
    DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS);
    DCHECK(_pk_index_meta != nullptr);
    DCHECK(_pk_index_reader != nullptr);

    auto parse_bloom_filter = [this, stats]() -> Status {
        RETURN_IF_ERROR(_pk_index_reader->parse_bf(_file_reader, *_pk_index_meta, stats));
        // _meta_mem_usage += _pk_index_reader->get_bf_memory_size();
        return Status::OK();
    };
    return _load_pk_bf_once.call(parse_bloom_filter);
}
493
494
9.32k
// Loads the key index (see load_index) and then the primary key bloom filter.
//
// `DorisCallOnce` may capture an exception on one call stack and re-throw it on a different one
// that has no catch block. Both steps are therefore wrapped in RETURN_IF_CATCH_EXCEPTION here so
// such an exception does not cause a coredump.
Status Segment::load_pk_index_and_bf(OlapReaderStatistics* index_load_stats) {
    RETURN_IF_CATCH_EXCEPTION({
        RETURN_IF_ERROR(load_index(index_load_stats));
        RETURN_IF_ERROR(_load_pk_bloom_filter(index_load_stats));
    });
    return Status::OK();
}
504
505
531k
// Loads this segment's key index exactly once (guarded by `_load_index_once`).
//
// Unique-key segments that carry primary key index meta load the primary key index. All other
// segments read and decode the short key index page. In the short key path `stats` may be null,
// in which case statistics are collected into a local OlapReaderStatistics that is discarded.
Status Segment::load_index(OlapReaderStatistics* stats) {
    return _load_index_once.call([this, stats] {
        if (_tablet_schema->keys_type() == UNIQUE_KEYS && _pk_index_meta != nullptr) {
            _pk_index_reader = std::make_unique<PrimaryKeyIndexReader>();
            RETURN_IF_ERROR(_pk_index_reader->parse_index(_file_reader, *_pk_index_meta, stats));
            // _meta_mem_usage += _pk_index_reader->get_memory_size();
            return Status::OK();
        } else {
            // read and parse short key index page
            OlapReaderStatistics tmp_stats;
            OlapReaderStatistics* stats_ptr = stats != nullptr ? stats : &tmp_stats;
            PageReadOptions opts(io::IOContext {.is_index_data = true,
                                                .file_cache_stats = &stats_ptr->file_cache_stats});
            opts.use_page_cache = true;
            opts.type = INDEX_PAGE;
            opts.file_reader = _file_reader.get();
            opts.page_pointer = PagePointer(_sk_index_page);
            // short key index page uses NO_COMPRESSION for now
            opts.codec = nullptr;
            // Report page-read statistics to the caller's stats (like the file cache stats
            // above) instead of always discarding them into `tmp_stats`.
            opts.stats = stats_ptr;

            Slice body;
            PageFooterPB footer;
            RETURN_IF_ERROR(
                    PageIO::read_and_decompress_page(opts, &_sk_index_handle, &body, &footer));
            DCHECK_EQ(footer.type(), SHORT_KEY_PAGE);
            DCHECK(footer.has_short_key_page_footer());

            // _meta_mem_usage += body.get_size();
            _sk_index_decoder = std::make_unique<ShortKeyIndexDecoder>();
            return _sk_index_decoder->parse(body, footer.short_key_page_footer());
        }
    });
}
539
540
11.5k
// Returns the segment's health. If a lazy loading step has already run (key index, primary key
// bloom filter, column meta, index file reader) and stored an error, that error is returned.
// Otherwise the runtime status kept in `_healthy_status` is returned.
// Exceptions are converted to a Status instead of propagating, because callers may not be
// exception safe.
Status Segment::healthy_status() {
    try {
        if (_load_index_once.has_called()) {
            RETURN_IF_ERROR(_load_index_once.stored_result());
        }
        if (_load_pk_bf_once.has_called()) {
            RETURN_IF_ERROR(_load_pk_bf_once.stored_result());
        }
        if (_create_column_meta_once_call.has_called()) {
            RETURN_IF_ERROR(_create_column_meta_once_call.stored_result());
        }
        if (_index_file_reader_open.has_called()) {
            RETURN_IF_ERROR(_index_file_reader_open.stored_result());
        }
        // Set while running, e.g. when reading through a segment iterator goes wrong.
        return _healthy_status.status();
    } catch (const doris::Exception& e) {
        return e.to_status();
    } catch (const std::exception& e) {
        // Raised by non-doris code.
        return Status::InternalError("Unexcepted error during load segment: {}", e.what());
    }
}
565
566
// Return the storage datatype of related column to field.
567
DataTypePtr Segment::get_data_type_of(const TabletColumn& column,
568
9.02M
                                      const StorageReadOptions& read_options) {
569
9.02M
    const PathInDataPtr path = column.path_info_ptr();
570
571
    // none variant column
572
9.03M
    if (path == nullptr || path->empty()) {
573
9.03M
        return DataTypeFactory::instance().create_data_type(column);
574
9.03M
    }
575
576
    // Path exists, proceed with variant logic.
577
18.4E
    PathInData relative_path = path->copy_pop_front();
578
18.4E
    int32_t unique_id = column.unique_id() >= 0 ? column.unique_id() : column.parent_unique_id();
579
580
    // If this uid does not exist in segment meta, fallback to schema type.
581
18.4E
    if (!_column_meta_accessor->has_column_uid(unique_id)) {
582
0
        return DataTypeFactory::instance().create_data_type(column);
583
0
    }
584
585
18.4E
    std::shared_ptr<ColumnReader> v_reader;
586
587
    // Get the parent variant column reader
588
18.4E
    OlapReaderStatistics stats;
589
    // If status is not ok, it will throw exception(data corruption)
590
18.4E
    THROW_IF_ERROR(get_column_reader(unique_id, &v_reader, &stats));
591
18.4E
    DCHECK(v_reader != nullptr);
592
18.4E
    auto* variant_reader = static_cast<VariantColumnReader*>(v_reader.get());
593
    // Delegate type inference for variant paths to VariantColumnReader.
594
18.4E
    DataTypePtr type;
595
18.4E
    THROW_IF_ERROR(variant_reader->infer_data_type_for_path(&type, column, read_options,
596
18.4E
                                                            _column_reader_cache.get()));
597
18.4E
    return type;
598
18.4E
}
599
600
16.9M
// Builds the column meta accessor and column reader cache from the footer, at most once
// (guarded by `_create_column_meta_once_call`). The time spent is added to
// `stats->segment_create_column_readers_timer_ns`.
Status Segment::_create_column_meta_once(OlapReaderStatistics* stats) {
    SCOPED_RAW_TIMER(&stats->segment_create_column_readers_timer_ns);
    auto build_from_footer = [&]() -> Status {
        std::shared_ptr<SegmentFooterPB> footer;
        RETURN_IF_ERROR(_get_segment_footer(footer, stats));
        return _create_column_meta(*footer);
    };
    return _create_column_meta_once_call.call(build_from_footer);
}
608
609
526k
// Initializes the column meta accessor (which maintains the uid -> column ordinal mapping) from
// `footer`, then creates the column reader cache on top of it.
Status Segment::_create_column_meta(const SegmentFooterPB& footer) {
    _column_meta_accessor = std::make_unique<ColumnMetaAccessor>();
    RETURN_IF_ERROR(_column_meta_accessor->init(footer, _file_reader));

    // The cache is handed a callback that loads the footer through this segment.
    auto footer_loader = [this](std::shared_ptr<SegmentFooterPB>& footer_pb,
                                OlapReaderStatistics* stats) {
        return _get_segment_footer(footer_pb, stats);
    };
    _column_reader_cache = std::make_unique<ColumnReaderCache>(
            _column_meta_accessor.get(), _tablet_schema, _file_reader, _num_rows,
            std::move(footer_loader));
    return Status::OK();
}
620
621
// Creates an iterator that yields `tablet_column`'s default value (or NULL), for columns that
// have no data in this segment. Fails with InternalError when the column is neither nullable
// nor has a default value.
Status Segment::new_default_iterator(const TabletColumn& tablet_column,
                                     std::unique_ptr<ColumnIterator>* iter) {
    const bool has_default = tablet_column.has_default_value();
    const bool nullable = tablet_column.is_nullable();
    if (!has_default && !nullable) {
        return Status::InternalError(
                "invalid nonexistent column without default value. column_uid={}, "
                "column_name={}, "
                "column_type={}",
                tablet_column.unique_id(), tablet_column.name(), tablet_column.type());
    }
    auto default_value_iter = std::make_unique<DefaultValueColumnIterator>(
            has_default, tablet_column.default_value(), nullable, get_type_info(&tablet_column),
            tablet_column.precision(), tablet_column.frac(), tablet_column.length());
    ColumnIteratorOptions iter_opts;
    RETURN_IF_ERROR(default_value_iter->init(iter_opts));
    *iter = std::move(default_value_iter);
    return Status::OK();
}
641
642
// Not use cid anymore, for example original table schema is colA int, then user do following actions
643
// 1.add column b
644
// 2. drop column b
645
// 3. add column c
646
// in the new schema column c's cid == 2
647
// but in the old schema column b's cid == 2
648
// but they are not the same column
649
Status Segment::new_column_iterator(const TabletColumn& tablet_column,
650
                                    std::unique_ptr<ColumnIterator>* iter,
651
                                    const StorageReadOptions* opt,
652
                                    const std::unordered_map<int32_t, PathToBinaryColumnCacheUPtr>*
653
7.98M
                                            variant_sparse_column_cache) {
654
7.98M
    if (opt->runtime_state != nullptr) {
655
7.93M
        _be_exec_version = opt->runtime_state->be_exec_version();
656
7.93M
    }
657
7.98M
    RETURN_IF_ERROR(_create_column_meta_once(opt->stats));
658
659
    // For compability reason unique_id may less than 0 for variant extracted column
660
8.02M
    int32_t unique_id = tablet_column.unique_id() >= 0 ? tablet_column.unique_id()
661
18.4E
                                                       : tablet_column.parent_unique_id();
662
663
    // If column meta for this uid is not found in this segment, use default iterator.
664
7.98M
    if (!_column_meta_accessor->has_column_uid(unique_id)) {
665
76
        RETURN_IF_ERROR(new_default_iterator(tablet_column, iter));
666
76
        return Status::OK();
667
76
    }
668
669
    // init iterator by unique id
670
7.98M
    std::shared_ptr<ColumnReader> reader;
671
7.98M
    RETURN_IF_ERROR(get_column_reader(unique_id, &reader, opt->stats));
672
7.98M
    if (reader == nullptr) {
673
0
        return Status::InternalError("column reader is nullptr, unique_id={}", unique_id);
674
0
    }
675
7.98M
    if (reader->get_meta_type() == FieldType::OLAP_FIELD_TYPE_VARIANT) {
676
        // if sparse_column_cache_ptr is nullptr, means the sparse column cache is not used
677
130
        PathToBinaryColumnCache* sparse_column_cache_ptr = nullptr;
678
130
        if (variant_sparse_column_cache) {
679
130
            auto it = variant_sparse_column_cache->find(unique_id);
680
130
            if (it != variant_sparse_column_cache->end()) {
681
130
                sparse_column_cache_ptr = it->second.get();
682
130
            } else {
683
0
                DCHECK(false) << "sparse column cache is not found, unique_id=" << unique_id;
684
0
            }
685
130
        }
686
        // use _column_reader_cache to get variant subcolumn(path column) reader
687
130
        RETURN_IF_ERROR(assert_cast<VariantColumnReader*>(reader.get())
688
130
                                ->new_iterator(iter, &tablet_column, opt,
689
130
                                               _column_reader_cache.get(),
690
130
                                               sparse_column_cache_ptr));
691
7.98M
    } else {
692
7.98M
        RETURN_IF_ERROR(reader->new_iterator(iter, &tablet_column, opt));
693
7.98M
        if (opt->all_access_paths.contains(unique_id) ||
694
8.00M
            opt->predicate_access_paths.contains(unique_id)) {
695
1.99k
            const auto& all_access_paths = opt->all_access_paths.contains(unique_id)
696
1.99k
                                                   ? opt->all_access_paths.at(unique_id)
697
1.99k
                                                   : TColumnAccessPaths {};
698
1.99k
            const auto& predicate_access_paths = opt->predicate_access_paths.contains(unique_id)
699
1.99k
                                                         ? opt->predicate_access_paths.at(unique_id)
700
1.99k
                                                         : TColumnAccessPaths {};
701
702
            // set column name to apply access paths.
703
1.99k
            (*iter)->set_column_name(tablet_column.name());
704
1.99k
            RETURN_IF_ERROR((*iter)->set_access_paths(all_access_paths, predicate_access_paths));
705
1.99k
            (*iter)->remove_pruned_sub_iterators();
706
1.99k
        }
707
7.98M
    }
708
709
8.00M
    if (config::enable_column_type_check && !tablet_column.has_path_info() &&
710
8.00M
        !tablet_column.is_agg_state_type() && tablet_column.type() != reader->get_meta_type()) {
711
0
        LOG(WARNING) << "different type between schema and column reader,"
712
0
                     << " column schema name: " << tablet_column.name()
713
0
                     << " column schema type: " << int(tablet_column.type())
714
0
                     << " column reader meta type: " << int(reader->get_meta_type());
715
0
        return Status::InternalError("different type between schema and column reader");
716
0
    }
717
7.98M
    return Status::OK();
718
7.98M
}
719
720
// Returns (via `column_reader`) the reader for the column with unique id `col_uid`.
// Lazily initializes column meta first; time spent afterwards is accounted to
// stats->segment_create_column_readers_timer_ns.
// If `col_uid` is not part of this segment's tablet schema, `*column_reader` is set to
// nullptr and a NOT_FOUND status (without stack trace) is returned.
Status Segment::get_column_reader(int32_t col_uid, std::shared_ptr<ColumnReader>* column_reader,
                                  OlapReaderStatistics* stats) {
    RETURN_IF_ERROR(_create_column_meta_once(stats));
    SCOPED_RAW_TIMER(&stats->segment_create_column_readers_timer_ns);
    if (_tablet_schema->has_column_unique_id(col_uid)) {
        return _column_reader_cache->get_column_reader(col_uid, column_reader, stats);
    }
    // Unknown uid: clear the output before reporting NOT_FOUND.
    *column_reader = nullptr;
    return Status::Error<ErrorCode::NOT_FOUND, false>("column not found in segment, col_uid={}",
                                                      col_uid);
}
732
733
0
// Invokes `visitor` on the ColumnMetaPB entries exposed by the column meta accessor for
// this segment's footer. Lazily initializes column meta and loads the footer first.
// No caller statistics are available here, so I/O stats go to a local scratch object.
Status Segment::traverse_column_meta_pbs(const std::function<void(const ColumnMetaPB&)>& visitor) {
    OlapReaderStatistics scratch_stats;
    RETURN_IF_ERROR(_create_column_meta_once(&scratch_stats));

    std::shared_ptr<SegmentFooterPB> footer;
    RETURN_IF_ERROR(_get_segment_footer(footer, &scratch_stats));
    return _column_meta_accessor->traverse_metas(*footer, visitor);
}
741
742
// Returns (via `column_reader`) the reader for `col`.
// Columns with a negative unique id are resolved through their parent's unique id.
// Columns carrying path info are served by the path (sub-column) reader for the path
// with its first component removed; all others use the plain column reader.
// If the resolved uid is not in this segment's tablet schema, `*column_reader` is set to
// nullptr and a NOT_FOUND status (without stack trace) is returned.
Status Segment::get_column_reader(const TabletColumn& col,
                                  std::shared_ptr<ColumnReader>* column_reader,
                                  OlapReaderStatistics* stats) {
    RETURN_IF_ERROR(_create_column_meta_once(stats));
    SCOPED_RAW_TIMER(&stats->segment_create_column_readers_timer_ns);
    const int col_uid = col.unique_id() < 0 ? col.parent_unique_id() : col.unique_id();
    if (!_tablet_schema->has_column_unique_id(col_uid)) {
        *column_reader = nullptr;
        return Status::Error<ErrorCode::NOT_FOUND, false>("column not found in segment, col_uid={}",
                                                          col_uid);
    }
    if (!col.has_path_info()) {
        return _column_reader_cache->get_column_reader(col_uid, column_reader, stats);
    }
    // Path relative to the root column (presumably the first path component names the root).
    PathInData relative_path = col.path_info_ptr()->copy_pop_front();
    return _column_reader_cache->get_path_column_reader(col_uid, relative_path, column_reader,
                                                        stats);
}
761
762
// Creates an index iterator for `tablet_column` using `index_meta`.
// - If the column is not found in this segment (NOT_FOUND), returns OK and leaves
//   `*iter` untouched.
// - If `index_meta` is null, returns OK without creating an iterator.
// Otherwise opens the segment's index file reader exactly once and asks the column
// reader for an iterator over it.
Status Segment::new_index_iterator(const TabletColumn& tablet_column, const TabletIndex* index_meta,
                                   const StorageReadOptions& read_options,
                                   std::unique_ptr<IndexIterator>* iter) {
    if (auto* state = read_options.runtime_state; state != nullptr) {
        _be_exec_version = state->be_exec_version();
    }
    RETURN_IF_ERROR(_create_column_meta_once(read_options.stats));

    std::shared_ptr<ColumnReader> column_reader;
    Status lookup_status = get_column_reader(tablet_column, &column_reader, read_options.stats);
    if (lookup_status.is<ErrorCode::NOT_FOUND>()) {
        // Column absent from this segment: nothing to index, not an error.
        return Status::OK();
    }
    RETURN_IF_ERROR(lookup_status);
    DCHECK(column_reader != nullptr);

    if (index_meta == nullptr) {
        return Status::OK();
    }
    // Always go through DorisCallOnce.call (no pre-check of _index_file_reader) so that
    // concurrent callers do not race on the lazy open; afterwards the reader is non-null.
    RETURN_IF_ERROR(_index_file_reader_open.call([&] { return _open_index_file_reader(); }));
    RETURN_IF_ERROR(column_reader->new_index_iterator(_index_file_reader, index_meta, iter));
    return Status::OK();
}
786
787
// Looks up `key` in this segment's primary key index.
//
// Index entries (and `key`) may carry, after the key bytes, a sequence part of
// seq_col_length bytes (a 1-byte marker followed by the value) and/or a row-id part of
// rowid_length bytes. `with_seq_col` / `with_rowid` say whether `key` includes those parts.
//
// On success fills `row_location` (segment id, rowset id, row id). When the latest schema
// has cluster keys, the row id is decoded from the index entry. If `encoded_seq_value` is
// non-null it receives the entry's sequence part including the marker (empty when the
// segment has no sequence column).
// Returns KEY_NOT_FOUND when the key is absent, or KEY_ALREADY_EXISTS when a row with a
// higher sequence id is already stored.
Status Segment::lookup_row_key(const Slice& key, const TabletSchema* latest_schema,
                               bool with_seq_col, bool with_rowid, RowLocation* row_location,
                               OlapReaderStatistics* stats, std::string* encoded_seq_value) {
    RETURN_IF_ERROR(load_pk_index_and_bf(stats));
    const bool has_seq_col = latest_schema->has_sequence_col();
    const bool has_rowid = !latest_schema->cluster_key_uids().empty();
    const size_t seq_col_length =
            has_seq_col ? latest_schema->column(latest_schema->sequence_col_idx()).length() + 1
                        : 0;
    const size_t rowid_length = has_rowid ? PrimaryKeyIndexReader::ROW_ID_LENGTH : 0;

    // Strip the trailing parts the caller says are present in `key`.
    size_t key_trailer_length = 0;
    if (with_seq_col) {
        key_trailer_length += seq_col_length;
    }
    if (with_rowid) {
        key_trailer_length += rowid_length;
    }
    Slice key_without_seq = Slice(key.get_data(), key.get_size() - key_trailer_length);

    DCHECK(_pk_index_reader != nullptr);
    if (!_pk_index_reader->check_present(key_without_seq)) {
        return Status::Error<ErrorCode::KEY_NOT_FOUND, false>("");
    }

    std::unique_ptr<segment_v2::IndexedColumnIterator> index_iterator;
    RETURN_IF_ERROR(_pk_index_reader->new_iterator(&index_iterator, stats));
    bool exact_match = false;
    auto seek_status = index_iterator->seek_at_or_after(&key_without_seq, &exact_match);
    const bool entry_not_found = seek_status.is<ErrorCode::ENTRY_NOT_FOUND>();
    if (!seek_status.ok() && !entry_not_found) {
        return seek_status;
    }
    // Without seq/rowid parts the index holds bare keys, so only an exact match is a hit.
    if (entry_not_found || (!has_seq_col && !has_rowid && !exact_match)) {
        return Status::Error<ErrorCode::KEY_NOT_FOUND, false>("");
    }
    row_location->row_id = cast_set<uint32_t>(index_iterator->get_current_ordinal());
    row_location->segment_id = _segment_id;
    row_location->rowset_id = _rowset_id;

    // Read the index entry the iterator is positioned on.
    auto index_type = DataTypeFactory::instance().create_data_type(
            _pk_index_reader->type_info()->type(), 1, 0);
    auto index_column = index_type->create_column();
    const size_t rows_wanted = 1;
    size_t rows_read = rows_wanted;
    RETURN_IF_ERROR(index_iterator->next_batch(&rows_read, index_column));
    DCHECK(rows_wanted == rows_read);

    const auto found_entry = index_column->get_data_at(0);
    Slice sought_key = Slice(found_entry.data, found_entry.size);

    // user may use "ALTER TABLE tbl ENABLE FEATURE "SEQUENCE_LOAD" WITH ..." to add a hidden
    // sequence column to a merge-on-write table that had none, so `has_seq_col == true`
    // (latest schema) does not mean the data in this segment carries sequence values.
    const bool segment_has_seq_col = _tablet_schema->has_sequence_col();
    const size_t segment_seq_length = segment_has_seq_col ? seq_col_length : 0;
    Slice sought_key_without_seq =
            Slice(sought_key.get_data(),
                  sought_key.get_size() - segment_seq_length - rowid_length);

    if (has_seq_col) {
        if (key_without_seq.compare(sought_key_without_seq) != 0) {
            return Status::Error<ErrorCode::KEY_NOT_FOUND, false>("");
        }
        if (with_seq_col && segment_has_seq_col) {
            // Compare sequence values, skipping the 1-byte marker in front of each.
            Slice incoming_seq =
                    Slice(key.get_data() + key_without_seq.get_size() + 1, seq_col_length - 1);
            Slice stored_seq =
                    Slice(sought_key.get_data() + sought_key_without_seq.get_size() + 1,
                          seq_col_length - 1);
            if (incoming_seq.compare(stored_seq) < 0) {
                return Status::Error<ErrorCode::KEY_ALREADY_EXISTS>(
                        "key with higher sequence id exists");
            }
        }
    } else if (has_rowid) {
        Slice sought_key_without_rowid =
                Slice(sought_key.get_data(), sought_key.get_size() - rowid_length);
        if (key_without_seq.compare(sought_key_without_rowid) != 0) {
            return Status::Error<ErrorCode::KEY_NOT_FOUND, false>("");
        }
    }

    // Key found: with cluster keys, the row id comes from the index entry itself.
    if (has_rowid) {
        Slice rowid_slice = Slice(sought_key.get_data() + sought_key_without_seq.get_size() +
                                          segment_seq_length + 1,
                                  rowid_length - 1);
        const auto* type_info = get_scalar_type_info<FieldType::OLAP_FIELD_TYPE_UNSIGNED_INT>();
        const auto* rowid_coder = get_key_coder(type_info->type());
        RETURN_IF_ERROR(rowid_coder->decode_ascending(
                &rowid_slice, rowid_length, reinterpret_cast<uint8_t*>(&row_location->row_id)));
    }

    if (encoded_seq_value != nullptr) {
        // Sequence part as stored, marker included; empty if the segment has none.
        *encoded_seq_value =
                segment_has_seq_col
                        ? Slice(sought_key.get_data() + sought_key_without_seq.get_size(),
                                seq_col_length)
                                  .to_string()
                        : std::string {};
    }
    return Status::OK();
}
888
889
0
// Reads the primary key stored at ordinal `row_id` of this segment's primary key index
// into `*key`. When the tablet schema has cluster keys, index entries end with a row id
// (PrimaryKeyIndexReader::ROW_ID_LENGTH bytes), which is trimmed from the returned key.
// Returns an error status if the index cannot be loaded/read, if the read yields no row,
// or if a cluster-key entry is too short to contain a row id.
Status Segment::read_key_by_rowid(uint32_t row_id, std::string* key) {
    OlapReaderStatistics* null_stat = nullptr;
    RETURN_IF_ERROR(load_pk_index_and_bf(null_stat));
    std::unique_ptr<segment_v2::IndexedColumnIterator> iter;
    RETURN_IF_ERROR(_pk_index_reader->new_iterator(&iter, null_stat));

    auto index_type = DataTypeFactory::instance().create_data_type(
            _pk_index_reader->type_info()->type(), 1, 0);
    auto index_column = index_type->create_column();
    RETURN_IF_ERROR(iter->seek_to_ordinal(row_id));
    size_t num_read = 1;
    RETURN_IF_ERROR(iter->next_batch(&num_read, index_column));
    // Surface a short read as an error rather than aborting the whole process.
    if (num_read != 1) {
        return Status::InternalError(
                "failed to read primary key from index, row_id={}, num_read={}", row_id,
                num_read);
    }
    const auto entry = index_column->get_data_at(0);
    if (_tablet_schema->cluster_key_uids().empty()) {
        *key = entry.to_string();
        return Status::OK();
    }
    // trim row id; guard against underflowing the length on a malformed entry
    if (entry.size < PrimaryKeyIndexReader::ROW_ID_LENGTH) {
        return Status::InternalError(
                "primary key entry too short to contain a row id, row_id={}, entry_size={}",
                row_id, entry.size);
    }
    *key = std::string(entry.data, entry.size - PrimaryKeyIndexReader::ROW_ID_LENGTH);
    return Status::OK();
}
915
916
// Reads the single row `row_id` of the column described by `slot` and appends it to
// `result`.
// - Slots with column paths (variant sub-columns) are read through a materialized variant
//   column, then cast from the storage type to the slot's type.
// - Other slots are resolved in `schema` by unique id (or by name if the id is negative).
// `iterator_hint` is reused when non-null; otherwise a new iterator is created, initialized
// and stored into it for subsequent calls.
Status Segment::seek_and_read_by_rowid(const TabletSchema& schema, SlotDescriptor* slot,
                                       uint32_t row_id, MutableColumnPtr& result,
                                       StorageReadOptions& storage_read_options,
                                       std::unique_ptr<ColumnIterator>& iterator_hint) {
    segment_v2::ColumnIteratorOptions opt {
            .use_page_cache = !config::disable_storage_page_cache,
            .file_reader = file_reader().get(),
            .stats = storage_read_options.stats,
            .io_ctx = io::IOContext {.reader_type = ReaderType::READER_QUERY,
                                     .file_cache_stats =
                                             &storage_read_options.stats->file_cache_stats},
    };

    // read_by_rowids() takes a row-id array; one local avoids a heap allocation per lookup.
    segment_v2::rowid_t single_row_loc = row_id;
    if (!slot->column_paths().empty()) {
        // here need create column readers to make sure column reader is created before seek_and_read_by_rowid
        // if segment cache miss, column reader will be created to make sure the variant column result not coredump
        RETURN_IF_ERROR(_create_column_meta_once(storage_read_options.stats));

        TabletColumn column = TabletColumn::create_materialized_variant_column(
                schema.column_by_uid(slot->col_unique_id()).name_lower_case(), slot->column_paths(),
                slot->col_unique_id(),
                assert_cast<const DataTypeVariant&>(*remove_nullable(slot->type()))
                        .variant_max_subcolumns_count());
        auto storage_type = get_data_type_of(column, storage_read_options);
        // Check before the first dereference below.
        DCHECK(storage_type != nullptr);
        MutableColumnPtr file_storage_column = storage_type->create_column();

        if (iterator_hint == nullptr) {
            RETURN_IF_ERROR(new_column_iterator(column, &iterator_hint, &storage_read_options));
            RETURN_IF_ERROR(iterator_hint->init(opt));
        }
        RETURN_IF_ERROR(iterator_hint->read_by_rowids(&single_row_loc, 1, file_storage_column));
        ColumnPtr source_ptr;
        // storage may have different type with schema, so we need to cast the column
        RETURN_IF_ERROR(variant_util::cast_column(
                ColumnWithTypeAndName(file_storage_column->get_ptr(), storage_type, column.name()),
                slot->type(), &source_ptr));
        RETURN_IF_CATCH_EXCEPTION(result->insert_range_from(*source_ptr, 0, 1));
    } else {
        int index = (slot->col_unique_id() >= 0) ? schema.field_index(slot->col_unique_id())
                                                 : schema.field_index(slot->col_name());
        if (index < 0) {
            std::stringstream ss;
            ss << "field name is invalid. field=" << slot->col_name()
               << ", field_name_to_index=" << schema.get_all_field_names();
            return Status::InternalError(ss.str());
        }
        if (iterator_hint == nullptr) {
            RETURN_IF_ERROR(new_column_iterator(schema.column(index), &iterator_hint,
                                                &storage_read_options));
            RETURN_IF_ERROR(iterator_hint->init(opt));
        }
        RETURN_IF_ERROR(iterator_hint->read_by_rowids(&single_row_loc, 1, result));
    }
    return Status::OK();
}
974
975
// Returns this segment's parsed footer in `footer_pb`.
// Lookup order:
//  1. the weak reference `_footer_pb` (still alive if someone else holds the footer);
//  2. the storage page cache, keyed by get_segment_footer_cache_key();
//  3. parsing the footer from the file (_parse_footer), then inserting it into the cache.
// The result is remembered weakly in `_footer_pb` so the page cache keeps ownership.
// NOTE(review): `_footer_pb` is read (lock) and written here without a lock; if this can
// run concurrently on one Segment that is a data race on the weak_ptr — confirm callers
// serialize, or guard it.
Status Segment::_get_segment_footer(std::shared_ptr<SegmentFooterPB>& footer_pb,
                                    OlapReaderStatistics* stats) {
    if (auto alive_footer = _footer_pb.lock(); alive_footer != nullptr) {
        footer_pb = std::move(alive_footer);
        return Status::OK();
    }

    VLOG_DEBUG << fmt::format("Segment footer of {}:{}:{} is missing, try to load it",
                              _file_reader->path().native(), _file_reader->size(),
                              _file_reader->size() - 12);

    StoragePageCache* footer_cache = ExecEnv::GetInstance()->get_storage_page_cache();
    DCHECK(footer_cache != nullptr);
    auto cache_key = get_segment_footer_cache_key();
    PageCacheHandle cache_handle;

    // The footer is cached as an INDEX_PAGE: it is small metadata parsed alongside
    // indexes, so it shares the index-page eviction policy/shards and does not compete
    // with the DATA_PAGE budget.
    const bool cache_hit =
            footer_cache->lookup(cache_key, &cache_handle, segment_v2::PageTypePB::INDEX_PAGE);
    if (cache_hit) {
        VLOG_DEBUG << fmt::format("Segment footer of {}:{}:{} is found in cache",
                                  _file_reader->path().native(), _file_reader->size(),
                                  _file_reader->size() - 12);
    } else {
        std::shared_ptr<SegmentFooterPB> parsed_footer;
        RETURN_IF_ERROR(_parse_footer(parsed_footer, stats));
        footer_cache->insert(cache_key, parsed_footer, parsed_footer->ByteSizeLong(),
                             &cache_handle, segment_v2::PageTypePB::INDEX_PAGE);
    }

    auto cached_footer = cache_handle.get<std::shared_ptr<SegmentFooterPB>>();
    _footer_pb = cached_footer;
    footer_pb = std::move(cached_footer);
    return Status::OK();
}
1014
1015
537k
// Page-cache key for this segment's footer; delegates to the overload taking a file
// reader, using this segment's `_file_reader` (must be non-null).
StoragePageCache::CacheKey Segment::get_segment_footer_cache_key() const {
    DCHECK(_file_reader != nullptr);
    return Segment::get_segment_footer_cache_key(_file_reader);
}
1022
1023
// Builds the page-cache key for the footer of the segment file behind `file_reader`:
// (file path, file size, file size - 12). The footer lives at the end of the file, so an
// offset counted back from the end identifies it for that file.
StoragePageCache::CacheKey Segment::get_segment_footer_cache_key(
        const io::FileReaderSPtr& file_reader) {
    const auto file_size = file_reader->size();
    return {file_reader->path().native(), file_size, static_cast<int64_t>(file_size - 12)};
}
1028
1029
#include "common/compile_check_end.h"
1030
} // namespace doris::segment_v2