Coverage Report

Created: 2026-05-19 18:11

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/segment/segment.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/segment/segment.h"
19
20
#include <crc32c/crc32c.h>
21
#include <gen_cpp/Descriptors_types.h>
22
#include <gen_cpp/PlanNodes_types.h>
23
#include <gen_cpp/olap_file.pb.h>
24
#include <gen_cpp/segment_v2.pb.h>
25
26
#include <cstring>
27
#include <memory>
28
#include <set>
29
#include <sstream>
30
#include <utility>
31
32
#include "cloud/config.h"
33
#include "common/config.h"
34
#include "common/exception.h"
35
#include "common/logging.h"
36
#include "common/status.h"
37
#include "core/column/column.h"
38
#include "core/data_type/data_type.h"
39
#include "core/data_type/data_type_factory.hpp"
40
#include "core/data_type/data_type_nullable.h"
41
#include "core/data_type/data_type_variant.h"
42
#include "core/field.h"
43
#include "core/string_ref.h"
44
#include "cpp/sync_point.h"
45
#include "exprs/vexpr_context.h"
46
#include "io/cache/block_file_cache.h"
47
#include "io/cache/block_file_cache_factory.h"
48
#include "io/cache/cached_remote_file_reader.h"
49
#include "io/fs/file_reader.h"
50
#include "io/fs/file_system.h"
51
#include "io/io_common.h"
52
#include "runtime/exec_env.h"
53
#include "runtime/query_context.h"
54
#include "runtime/runtime_predicate.h"
55
#include "runtime/runtime_state.h"
56
#include "storage/index/index_file_reader.h"
57
#include "storage/index/indexed_column_reader.h"
58
#include "storage/index/primary_key_index.h"
59
#include "storage/index/short_key_index.h"
60
#include "storage/index/zone_map/zonemap_eval_context.h"
61
#include "storage/iterator/vgeneric_iterators.h"
62
#include "storage/iterators.h"
63
#include "storage/olap_common.h"
64
#include "storage/predicate/block_column_predicate.h"
65
#include "storage/predicate/column_predicate.h"
66
#include "storage/rowset/rowset_reader_context.h"
67
#include "storage/schema.h"
68
#include "storage/segment/column_meta_accessor.h"
69
#include "storage/segment/column_reader.h"
70
#include "storage/segment/column_reader_cache.h"
71
#include "storage/segment/empty_segment_iterator.h"
72
#include "storage/segment/page_io.h"
73
#include "storage/segment/page_pointer.h"
74
#include "storage/segment/segment_iterator.h"
75
#include "storage/segment/segment_writer.h" // k_segment_magic_length
76
#include "storage/segment/stream_reader.h"
77
#include "storage/segment/variant/variant_column_reader.h"
78
#include "storage/tablet/tablet_schema.h"
79
#include "storage/types.h"
80
#include "storage/utils.h"
81
#include "util/coding.h"
82
#include "util/json/path_in_data.h"
83
#include "util/slice.h" // Slice
84
85
namespace doris::segment_v2 {
86
87
class InvertedIndexIterator;
88
89
namespace {
90
91
16.7k
void accumulate_expr_zonemap_stats(const ZoneMapEvalContext& ctx, OlapReaderStatistics* stats) {
92
16.7k
    if (stats == nullptr) {
93
0
        return;
94
0
    }
95
16.7k
    stats->expr_zonemap_unsupported_exprs += ctx.unsupported_expr_count;
96
16.7k
    stats->expr_zonemap_type_mismatch += ctx.type_mismatch_count;
97
16.7k
    stats->in_zonemap_point_check_count += ctx.in_zonemap_point_check_count;
98
16.7k
    stats->in_zonemap_range_only_count += ctx.in_zonemap_range_only_count;
99
16.7k
    stats->in_zonemap_point_check_skipped_due_to_threshold +=
100
16.7k
            ctx.in_zonemap_point_check_skipped_due_to_threshold;
101
16.7k
}
102
103
/// Fills `ctx->slots` with one entry per slot referenced by `conjuncts`.
///
/// A slot is skipped when its index is outside the schema, when the schema has no field for it,
/// when predicates cannot be applied safely on the column, or when no data type can be resolved.
/// Every remaining slot gets an entry keyed by the slot index that carries the data type; the
/// zone map is attached only when a column reader exists and reports having one. A NOT_FOUND
/// status from `get_column_reader` still yields an entry (without zone map).
///
/// @return the first error other than NOT_FOUND raised while fetching a reader or its zone map.
Status build_segment_zonemap_context(Segment* segment, const Schema& schema,
                                     const StorageReadOptions& read_options,
                                     const VExprContextSPtrs& conjuncts, ZoneMapEvalContext* ctx) {
    DORIS_CHECK(segment != nullptr);
    DORIS_CHECK(ctx != nullptr);

    // Gather the slot indexes referenced by every conjunct that has an expression tree.
    std::set<int> referenced_slots;
    for (const auto& conjunct : conjuncts) {
        if (conjunct->root() == nullptr) {
            continue;
        }
        conjunct->root()->collect_slot_column_ids(referenced_slots);
    }

    for (const int slot_index : referenced_slots) {
        // The sign test must come first so that cast_set never sees a negative value.
        const bool slot_in_schema =
                slot_index >= 0 && cast_set<size_t>(slot_index) < schema.num_column_ids();
        if (!slot_in_schema) {
            continue;
        }

        const auto column_id = schema.column_id(cast_set<size_t>(slot_index));
        const auto* field = schema.column(column_id);
        if (field == nullptr) {
            continue;
        }

        const bool predicate_is_safe = segment->can_apply_predicate_safely(
                column_id, schema, read_options.target_cast_type_for_variants, read_options);
        if (!predicate_is_safe) {
            continue;
        }

        auto data_type = segment->get_data_type_of(field->get_desc(), read_options);
        if (data_type == nullptr) {
            continue;
        }

        ZoneMapEvalContext::SlotZoneMap slot_zone_map;
        slot_zone_map.data_type = std::move(data_type);

        std::shared_ptr<ColumnReader> reader;
        Status st = segment->get_column_reader(field->get_desc(), &reader, read_options.stats);
        if (!st.is<ErrorCode::NOT_FOUND>()) {
            RETURN_IF_ERROR(st);
            if (reader != nullptr && reader->has_zone_map()) {
                ZoneMap zone_map;
                RETURN_IF_ERROR(reader->get_segment_zone_map(&zone_map));
                slot_zone_map.zone_map = std::move(zone_map);
            }
        }
        // NOT_FOUND falls through to here: the slot is registered without a zone map.
        ctx->slots.emplace(slot_index, std::move(slot_zone_map));
    }
    return Status::OK();
}
149
150
} // namespace
151
152
/// Opens the segment file at `path` and, on success, stores the new segment in `*output`.
///
/// A copy of `reader_options` with `tablet_id` filled in is handed to `_open`, so that
/// CachedRemoteFileReader peer reads can see the tablet id. When opening fails outside cloud
/// mode, the error is reported to the tablet if it can be resolved through ExecEnv.
///
/// @return the status produced by `_open`.
Status Segment::open(io::FileSystemSPtr fs, const std::string& path, int64_t tablet_id,
                     uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr tablet_schema,
                     const io::FileReaderOptions& reader_options, std::shared_ptr<Segment>* output,
                     InvertedIndexFileInfo idx_file_info, OlapReaderStatistics* stats) {
    // Ensure tablet_id is available in reader_options for CachedRemoteFileReader peer read.
    io::FileReaderOptions opts_with_tablet = reader_options;
    opts_with_tablet.tablet_id = tablet_id;

    // The by-value parameters are not used again here, so hand them over instead of copying.
    auto s = _open(std::move(fs), path, segment_id, rowset_id, std::move(tablet_schema),
                   opts_with_tablet, output, std::move(idx_file_info), stats);
    if (s.ok()) {
        if (output && *output) {
            (*output)->_tablet_id = tablet_id;
        }
        return s;
    }

    if (!config::is_cloud_mode()) {
        auto res = ExecEnv::get_tablet(tablet_id);
        TabletSharedPtr tablet =
                res.has_value() ? std::dynamic_pointer_cast<Tablet>(res.value()) : nullptr;
        if (tablet) {
            tablet->report_error(s);
        }
    }
    return s;
}
178
179
/// Opens the file, constructs the Segment and parses its metadata, retrying on CORRUPTION.
///
/// When the first attempt ends with CORRUPTION and the reader uses FILE_BLOCK_CACHE:
///   - Tier 1: drop the cached blocks for this file and reopen with the same options.
///   - Tier 2: if still CORRUPTION, drop the cache again and reopen with NO_CACHE.
///   - Tier 3: if that also fails, only a warning is logged and the error is returned.
/// On success the segment is moved into `*output`.
Status Segment::_open(io::FileSystemSPtr fs, const std::string& path, uint32_t segment_id,
                      RowsetId rowset_id, TabletSchemaSPtr tablet_schema,
                      const io::FileReaderOptions& reader_options, std::shared_ptr<Segment>* output,
                      InvertedIndexFileInfo idx_file_info, OlapReaderStatistics* stats) {
    io::FileReaderSPtr file_reader;
    auto st = fs->open_file(path, &file_reader, &reader_options);
    TEST_INJECTION_POINT_CALLBACK("Segment::open:corruption", &st);
    std::shared_ptr<Segment> segment(
            new Segment(segment_id, rowset_id, std::move(tablet_schema), idx_file_info));

    // Hands the freshly opened reader to the segment and parses the segment metadata.
    auto attach_reader_and_parse = [&]() -> Status {
        segment->_fs = fs;
        segment->_file_reader = std::move(file_reader);
        return segment->_open(stats);
    };

    if (st) {
        st = attach_reader_and_parse();
    }

    // Three-tier retry for CORRUPTION errors when file cache is enabled.
    // This handles CORRUPTION from both open_file() and _parse_footer() (via _open()).
    const bool retry_through_cache =
            st.is<ErrorCode::CORRUPTION>() &&
            reader_options.cache_type == io::FileCachePolicy::FILE_BLOCK_CACHE;
    if (retry_through_cache) {
        // Tier 1: Clear file cache and retry with cache support (re-downloads from remote).
        LOG(WARNING) << "bad segment file may be read from file cache, try to read remote source "
                        "file directly, file path: "
                     << path << " cache_key: " << file_cache_key_str(path);
        auto file_key = file_cache_key_from_path(path);
        auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
        file_cache->remove_if_cached(file_key);

        st = fs->open_file(path, &file_reader, &reader_options);
        if (st) {
            st = attach_reader_and_parse();
        }
        TEST_INJECTION_POINT_CALLBACK("Segment::open:corruption1", &st);

        if (st.is<ErrorCode::CORRUPTION>()) { // corrupt again
            // Tier 2: Bypass cache entirely and read directly from remote storage.
            LOG(WARNING) << "failed to try to read remote source file again with cache support,"
                         << " try to read from remote directly, "
                         << " file path: " << path << " cache_key: " << file_cache_key_str(path);
            file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
            file_cache->remove_if_cached(file_key);

            io::FileReaderOptions opt = reader_options;
            opt.cache_type = io::FileCachePolicy::NO_CACHE; // skip cache
            RETURN_IF_ERROR(fs->open_file(path, &file_reader, &opt));
            st = attach_reader_and_parse();
            if (!st.ok()) {
                // Tier 3: Remote source itself is corrupt.
                LOG(WARNING) << "failed to try to read remote source file directly,"
                             << " file path: " << path
                             << " cache_key: " << file_cache_key_str(path);
            }
        }
    }

    RETURN_IF_ERROR(st);
    DCHECK(segment->_fs != nullptr) << "file system is nullptr after segment open";
    *output = std::move(segment);
    return Status::OK();
}
240
241
/// Stores the identifying ids, takes over the tablet schema and inverted index file info, and
/// starts the metadata memory estimate at zero. No file access happens here.
Segment::Segment(uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr tablet_schema,
                 InvertedIndexFileInfo idx_file_info)
        : _segment_id(segment_id),
          _meta_mem_usage(0),
          _rowset_id(rowset_id),
          _tablet_schema(std::move(tablet_schema)),
          _idx_file_info(std::move(idx_file_info)) {
}
248
249
1.76M
/// Removes this segment's tracked contribution from the global memory estimate.
Segment::~Segment() {
    g_segment_estimate_mem_bytes << -_tracked_meta_mem_usage;
    // If this fires, `_tracked_meta_mem_usage` has drifted from the real value and needs fixing.
    DCHECK(_tracked_meta_mem_usage == meta_mem_usage());
}
254
255
49.1k
/// Hashes the segment file name "<rowset_id>_<seg_id>.dat" into a file cache key.
io::UInt128Wrapper Segment::file_cache_key(std::string_view rowset_id, uint32_t seg_id) {
    const auto segment_file_name = fmt::format("{}_{}.dat", rowset_id, seg_id);
    return io::BlockFileCache::hash(segment_file_name);
}
258
259
1.75M
int64_t Segment::get_metadata_size() const {
260
1.75M
    std::shared_ptr<SegmentFooterPB> footer_pb_shared = _footer_pb.lock();
261
1.75M
    return sizeof(Segment) + (_pk_index_meta ? _pk_index_meta->ByteSizeLong() : 0) +
262
18.4E
           (footer_pb_shared ? footer_pb_shared->ByteSizeLong() : 0);
263
1.75M
}
264
265
1.75M
void Segment::update_metadata_size() {
266
1.75M
    MetadataAdder::update_metadata_size();
267
1.75M
    g_segment_estimate_mem_bytes << _meta_mem_usage - _tracked_meta_mem_usage;
268
1.75M
    _tracked_meta_mem_usage = _meta_mem_usage;
269
1.75M
}
270
271
1.75M
/// Loads the segment footer, copies the pieces kept on the Segment (primary key index meta,
/// short key index page pointer, row count) and accumulates the estimated metadata memory.
Status Segment::_open(OlapReaderStatistics* stats) {
    std::shared_ptr<SegmentFooterPB> footer;
    RETURN_IF_ERROR(_get_segment_footer(footer, stats));

    if (footer->has_primary_key_index_meta()) {
        _pk_index_meta.reset(new PrimaryKeyIndexMetaPB(footer->primary_key_index_meta()));
    } else {
        _pk_index_meta.reset();
    }
    // delete_bitmap_calculator_test.cpp
    // DCHECK(footer.has_short_key_index_page());
    _sk_index_page = footer->short_key_index_page();
    _num_rows = footer->num_rows();

    // An estimated memory usage of a segment.
    // Footer is separated to StoragePageCache so we don't need to add it to _meta_mem_usage
    // _meta_mem_usage += footer->ByteSizeLong();
    if (_pk_index_meta != nullptr) {
        _meta_mem_usage += _pk_index_meta->ByteSizeLong();
    }
    _meta_mem_usage += sizeof(*this);

    // Column readers: capped by the configured partial column cache size.
    const auto estimated_cached_readers =
            std::min(static_cast<int>(_tablet_schema->num_columns()),
                     config::max_segment_partial_column_cache_size);
    _meta_mem_usage += estimated_cached_readers * config::estimated_mem_per_column_reader;

    // 1024 comes from SegmentWriterOptions
    _meta_mem_usage += (_num_rows + 1023) / 1024 * (36 + 4);
    // 0.01 comes from PrimaryKeyIndexBuilder::init
    _meta_mem_usage += BloomFilter::optimal_bit_num(_num_rows, 0.01) / 8;

    update_metadata_size();
    return Status::OK();
}
305
306
22.6k
/// Creates `_index_file_reader` for the index file whose path prefix is derived from this
/// segment's data file path. Always returns OK.
Status Segment::_open_index_file_reader() {
    std::string index_path_prefix {
            InvertedIndexDescriptor::get_index_file_path_prefix(_file_reader->path().native())};
    _index_file_reader = std::make_shared<IndexFileReader>(
            _fs, std::move(index_path_prefix),
            _tablet_schema->get_inverted_index_storage_format(), _idx_file_info, _tablet_id);
    return Status::OK();
}
314
315
/// Builds the row iterator used to read this segment under `read_options`.
///
/// Steps, in order:
///   1. Try to prune the whole segment with the segment-level zone map of every column that has
///      predicates; on a mismatch an EmptySegmentIterator is returned.
///   2. If enabled, evaluate the pushed-down common expressions against the segment zone maps;
///      a kNoMatch result also yields an EmptySegmentIterator.
///   3. Load the segment index.
///   4. Create either a statistics iterator (aggregation push-down without delete predicates)
///      or a regular SegmentIterator.
///   5. For query readers, prune column predicates by zone map and initialise the iterator with
///      the pruned predicate set when anything was pruned.
///
/// `read_options.stats` is dereferenced unconditionally and must not be null.
Status Segment::new_iterator(SchemaSPtr schema, const StorageReadOptions& read_options,
                             std::unique_ptr<RowwiseIterator>* iter) {
    if (read_options.runtime_state != nullptr) {
        _be_exec_version = read_options.runtime_state->be_exec_version();
    }
    RETURN_IF_ERROR(_create_column_meta_once(read_options.stats));

    read_options.stats->total_segment_number++;
    // trying to prune the current segment by segment-level zone map
    for (const auto& entry : read_options.col_id_to_predicates) {
        int32_t column_id = entry.first;
        // schema change
        if (_tablet_schema->num_columns() <= column_id) {
            continue;
        }
        const TabletColumn& col = read_options.tablet_schema->column(column_id);
        std::shared_ptr<ColumnReader> reader;
        Status st = get_column_reader(col, &reader, read_options.stats);
        // not found in this segment, skip
        if (st.is<ErrorCode::NOT_FOUND>()) {
            continue;
        }
        RETURN_IF_ERROR(st);
        // should be OK
        DCHECK(reader != nullptr);
        if (!reader->has_zone_map()) {
            continue;
        }
        // `entry` comes from col_id_to_predicates itself, so no membership re-check is needed.
        if (!can_apply_predicate_safely(column_id, *schema,
                                        read_options.target_cast_type_for_variants,
                                        read_options)) {
            continue;
        }
        bool matched = true;
        RETURN_IF_ERROR(reader->match_condition(entry.second.get(), &matched));
        if (!matched) {
            // any condition not satisfied, return.
            *iter = std::make_unique<EmptySegmentIterator>(*schema);
            read_options.stats->filtered_segment_number++;
            return Status::OK();
        }
    }

    if (config::enable_expr_zonemap_filter && !read_options.common_expr_ctxs_push_down.empty()) {
        ZoneMapEvalContext ctx;
        RETURN_IF_ERROR(build_segment_zonemap_context(
                this, *schema, read_options, read_options.common_expr_ctxs_push_down, &ctx));
        const auto result =
                VExprContext::evaluate_zonemap_filter(read_options.common_expr_ctxs_push_down, ctx);
        accumulate_expr_zonemap_stats(ctx, read_options.stats);
        if (result == ZoneMapFilterResult::kNoMatch) {
            *iter = std::make_unique<EmptySegmentIterator>(*schema);
            read_options.stats->filtered_segment_number++;
            read_options.stats->expr_zonemap_filtered_segments++;
            return Status::OK();
        }
    }

    {
        SCOPED_RAW_TIMER(&read_options.stats->segment_load_index_timer_ns);
        RETURN_IF_ERROR(load_index(read_options.stats));
    }

    if (read_options.delete_condition_predicates->num_of_column_predicate() == 0 &&
        read_options.push_down_agg_type_opt != TPushAggOp::NONE &&
        read_options.push_down_agg_type_opt != TPushAggOp::COUNT_ON_INDEX) {
        iter->reset(new_vstatistics_iterator(this->shared_from_this(), *schema));
    } else {
        *iter = std::make_unique<SegmentIterator>(this->shared_from_this(), schema);
    }

    // TODO: Valid the opt not only in ReaderType::READER_QUERY
    if (read_options.io_ctx.reader_type == ReaderType::READER_QUERY &&
        !read_options.column_predicates.empty()) {
        auto pruned_predicates = read_options.column_predicates;
        auto pruned = false;
        for (auto& it : _column_reader_cache->get_available_readers(false)) {
            const auto uid = it.first;
            const auto column_id = read_options.tablet_schema->field_index(uid);
            bool tmp_pruned = false;
            RETURN_IF_ERROR(it.second->prune_predicates_by_zone_map(pruned_predicates, column_id,
                                                                    &tmp_pruned));
            pruned |= tmp_pruned;
        }

        if (pruned) {
            auto options_with_pruned_predicates = read_options;
            // pruned_predicates is not used afterwards, so move it instead of copying.
            options_with_pruned_predicates.column_predicates = std::move(pruned_predicates);
            // Because column_predicates changed, col_id_to_predicates must be rebuilt so that
            // the inverted index does not go through the pruned predicates.
            auto& rebuilt_predicates = options_with_pruned_predicates.col_id_to_predicates;
            rebuilt_predicates.clear();
            for (const auto& pred : options_with_pruned_predicates.column_predicates) {
                // operator[] default-inserts an empty slot on first sight of a column id; the
                // map was just cleared, so an empty slot always means "not created yet".
                auto& block_predicate = rebuilt_predicates[pred->column_id()];
                if (block_predicate == nullptr) {
                    block_predicate = AndBlockColumnPredicate::create_shared();
                }
                block_predicate->add_column_predicate(
                        SingleColumnBlockPredicate::create_unique(pred));
            }
            return iter->get()->init(options_with_pruned_predicates);
        }
    }
    return iter->get()->init(read_options);
}
418
419
/// Dumps diagnostic copies of a corrupted segment under "<first cache base path>/error_file/".
///
/// Two files are written when they do not already exist:
///   - "<rowset>_<segment>.dat.part_offset_<offset>": the `bytes_read` bytes at `data`.
///   - "<rowset>_<segment>.dat": the whole file re-read through the remote reader of the
///     CachedRemoteFileReader.
/// Nothing is done (and OK is returned) unless error-file dumping is enabled and the process
/// runs in cloud mode, or when the dump directory already exceeds the configured size limit.
///
/// @param file_size  size of the segment file in bytes.
/// @param offset     file offset the bytes in `data` were read from.
/// @param bytes_read number of valid bytes in `data`.
/// @param data       buffer holding the suspicious bytes.
/// @param io_ctx     IO context used for the remote re-read.
/// @return OK on success or when skipped; otherwise the first filesystem/read error, or
///         InternalError when the file reader is not a CachedRemoteFileReader.
Status Segment::_write_error_file(size_t file_size, size_t offset, size_t bytes_read, char* data,
                                  io::IOContext& io_ctx) {
    if (!config::enbale_dump_error_file || !doris::config::is_cloud_mode()) {
        return Status::OK();
    }

    std::string file_name = _rowset_id.to_string() + "_" + std::to_string(_segment_id) + ".dat";
    std::string dir_path = io::FileCacheFactory::instance()->get_base_paths()[0] + "/error_file/";
    Status create_st = io::global_local_filesystem()->create_directory(dir_path, true);
    if (!create_st.ok() && !create_st.is<ErrorCode::ALREADY_EXIST>()) {
        LOG(WARNING) << "failed to create error file dir: " << create_st.to_string();
        return create_st;
    }
    size_t dir_size = 0;
    RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(dir_path, &dir_size));
    if (dir_size > config::file_cache_error_log_limit_bytes) {
        LOG(WARNING) << "error file dir size is too large: " << dir_size;
        return Status::OK();
    }

    // Part file: only the bytes that were read when the corruption was detected.
    std::string part_path = dir_path + file_name + ".part_offset_" + std::to_string(offset);
    LOG(WARNING) << "writer error part to " << part_path;
    bool is_part_exist = false;
    RETURN_IF_ERROR(io::global_local_filesystem()->exists(part_path, &is_part_exist));
    if (is_part_exist) {
        LOG(WARNING) << "error part already exists: " << part_path;
    } else {
        std::unique_ptr<io::FileWriter> part_writer;
        RETURN_IF_ERROR(io::global_local_filesystem()->create_file(part_path, &part_writer));
        RETURN_IF_ERROR(part_writer->append(Slice(data, bytes_read)));
        RETURN_IF_ERROR(part_writer->close());
    }

    // Full file: re-read everything through the remote reader behind the cached reader.
    auto* cached_reader = dynamic_cast<io::CachedRemoteFileReader*>(_file_reader.get());
    if (cached_reader == nullptr) {
        return Status::InternalError("file reader is not CachedRemoteFileReader");
    }
    std::string error_file;
    error_file.resize(file_size);
    size_t error_file_bytes_read = 0;
    RETURN_IF_ERROR(cached_reader->get_remote_reader()->read_at(
            0, Slice(error_file.data(), file_size), &error_file_bytes_read, &io_ctx));
    DCHECK(error_file_bytes_read == file_size);

    std::string file_path = dir_path + file_name;
    LOG(WARNING) << "writer error file to " << file_path;
    bool is_file_exist = false;
    RETURN_IF_ERROR(io::global_local_filesystem()->exists(file_path, &is_file_exist));
    if (is_file_exist) {
        // Report the full-file path here; the part path was already handled above.
        LOG(WARNING) << "error file already exists: " << file_path;
    } else {
        std::unique_ptr<io::FileWriter> writer;
        RETURN_IF_ERROR(io::global_local_filesystem()->create_file(file_path, &writer));
        RETURN_IF_ERROR(writer->append(Slice(error_file.data(), file_size)));
        RETURN_IF_ERROR(writer->close());
    }
    return Status::OK();
}
479
480
/// Reads, validates and deserializes the segment footer from the end of the file.
///
/// File tail layout: SegmentFooterPB, FooterPBSize(4), FooterPBChecksum(4), MagicNumber(4).
/// The 12 fixed bytes are read first; the magic number, the footer length and the CRC32C
/// checksum are verified before the protobuf is parsed. On every corruption path a diagnostic
/// dump is attempted through `_write_error_file` (failures of the dump are only logged).
///
/// @param footer receives the parsed footer on success.
/// @param stats  optional reader statistics; only its file cache stats are used.
/// @return OK, a read error from the file reader, or Corruption describing the failed check.
Status Segment::_parse_footer(std::shared_ptr<SegmentFooterPB>& footer,
                              OlapReaderStatistics* stats) {
    // Footer := SegmentFooterPB, FooterPBSize(4), FooterPBChecksum(4), MagicNumber(4)
    auto file_size = _file_reader->size();
    if (file_size < 12) {
        return Status::Corruption("Bad segment file {}: file size {} < 12, cache_key: {}",
                                  _file_reader->path().native(), file_size,
                                  file_cache_key_str(_file_reader->path().native()));
    }

    uint8_t fixed_buf[12];
    size_t bytes_read = 0;
    // TODO(plat1ko): Support session variable `enable_file_cache`
    io::IOContext io_ctx {.is_index_data = true,
                          .file_cache_stats = stats ? &stats->file_cache_stats : nullptr};

    // Best-effort dump of the bytes that failed validation; a dump failure never masks the
    // corruption error returned to the caller. `bytes_read` is taken at call time.
    auto dump_corrupted_bytes = [&](size_t offset, char* data) {
        Status dump_st = _write_error_file(file_size, offset, bytes_read, data, io_ctx);
        if (!dump_st.ok()) {
            LOG(WARNING) << "failed to write error file: " << dump_st.to_string();
        }
    };

    RETURN_IF_ERROR(
            _file_reader->read_at(file_size - 12, Slice(fixed_buf, 12), &bytes_read, &io_ctx));
    DCHECK_EQ(bytes_read, 12);
    TEST_SYNC_POINT_CALLBACK("Segment::parse_footer:magic_number_corruption", fixed_buf);
    TEST_INJECTION_POINT_CALLBACK("Segment::parse_footer:magic_number_corruption_inj", fixed_buf);
    if (memcmp(fixed_buf + 8, k_segment_magic, k_segment_magic_length) != 0) {
        dump_corrupted_bytes(file_size - 12, reinterpret_cast<char*>(fixed_buf));
        return Status::Corruption(
                "Bad segment file {}: file_size: {}, magic number not match, cache_key: {}",
                _file_reader->path().native(), file_size,
                file_cache_key_str(_file_reader->path().native()));
    }

    // read footer PB
    uint32_t footer_length = decode_fixed32_le(fixed_buf);
    // Compare against `file_size - 12` (known to be non-negative) rather than computing
    // `12 + footer_length`, which wraps around in 32 bits for a corrupt, near-maximal length
    // and would let the check pass.
    if (file_size - 12 < footer_length) {
        dump_corrupted_bytes(file_size - 12, reinterpret_cast<char*>(fixed_buf));
        return Status::Corruption("Bad segment file {}: file size {} < {}, cache_key: {}",
                                  _file_reader->path().native(), file_size,
                                  12 + static_cast<uint64_t>(footer_length),
                                  file_cache_key_str(_file_reader->path().native()));
    }

    std::string footer_buf;
    footer_buf.resize(footer_length);
    RETURN_IF_ERROR(_file_reader->read_at(file_size - 12 - footer_length, footer_buf, &bytes_read,
                                          &io_ctx));
    DCHECK_EQ(bytes_read, footer_length);

    // validate footer PB's checksum
    uint32_t expect_checksum = decode_fixed32_le(fixed_buf + 4);
    uint32_t actual_checksum = crc32c::Crc32c(footer_buf.data(), footer_buf.size());
    if (actual_checksum != expect_checksum) {
        dump_corrupted_bytes(file_size - 12 - footer_length, footer_buf.data());
        return Status::Corruption(
                "Bad segment file {}: file_size = {}, footer checksum not match, actual={} "
                "vs expect={}, cache_key: {}",
                _file_reader->path().native(), file_size, actual_checksum, expect_checksum,
                file_cache_key_str(_file_reader->path().native()));
    }

    // deserialize footer PB
    footer = std::make_shared<SegmentFooterPB>();
    if (!footer->ParseFromString(footer_buf)) {
        dump_corrupted_bytes(file_size - 12 - footer_length, footer_buf.data());
        // The trailing "{}" makes the cache key actually appear in the message.
        return Status::Corruption(
                "Bad segment file {}: file_size = {}, failed to parse SegmentFooterPB, "
                "cache_key: {}",
                _file_reader->path().native(), file_size,
                file_cache_key_str(_file_reader->path().native()));
    }

    VLOG_DEBUG << fmt::format("Loading segment footer from {} finished",
                              _file_reader->path().native());
    return Status::OK();
}
566
567
3.82M
/// Parses the primary key bloom filter through `_load_pk_bf_once`.
///
/// The debug checks require a UNIQUE_KEYS tablet schema and that both the primary key index
/// meta and the primary key index reader have already been set.
Status Segment::_load_pk_bloom_filter(OlapReaderStatistics* stats) {
#ifdef BE_TEST
    // for BE UT "segment_cache_test": without primary key index meta only the memory
    // accounting is exercised.
    if (_pk_index_meta == nullptr) {
        return _load_pk_bf_once.call([this] {
            _meta_mem_usage += 100;
            update_metadata_size();
            return Status::OK();
        });
    }
#endif
    DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS);
    DCHECK(_pk_index_meta != nullptr);
    DCHECK(_pk_index_reader != nullptr);

    return _load_pk_bf_once.call([this, stats] {
        RETURN_IF_ERROR(_pk_index_reader->parse_bf(_file_reader, *_pk_index_meta, stats));
        // The bloom filter memory is intentionally not added to _meta_mem_usage here:
        // _meta_mem_usage += _pk_index_reader->get_bf_memory_size();
        return Status::OK();
    });
}
588
589
3.82M
// Loads the key index first and the primary key bloom filter second.
//
// Returns the first error produced by either step, or OK when both succeed.
Status Segment::load_pk_index_and_bf(OlapReaderStatistics* index_load_stats) {
    // `DorisCallOnce` may catch an exception on calling stack A and re-throw it later on a
    // different calling stack B that has no catch block. Both loads are therefore wrapped in
    // a catch block here to prevent a core dump.
    RETURN_IF_CATCH_EXCEPTION({
        RETURN_IF_ERROR(load_index(index_load_stats));
        RETURN_IF_ERROR(_load_pk_bloom_filter(index_load_stats));
    });
    return Status::OK();
}
599
600
5.35M
// Loads the key index of this segment, at most once (guarded by `_load_index_once`).
//
// - UNIQUE_KEYS tablet with a primary key index meta: creates `_pk_index_reader` and parses
//   the primary key index.
// - Otherwise: reads the short key index page and builds `_sk_index_decoder` from it.
//
// `stats` may be nullptr; a local statistics object is used in its place where needed.
Status Segment::load_index(OlapReaderStatistics* stats) {
    auto load_once = [this, stats] {
        const bool use_pk_index =
                _tablet_schema->keys_type() == UNIQUE_KEYS && _pk_index_meta != nullptr;
        if (use_pk_index) {
            _pk_index_reader = std::make_unique<PrimaryKeyIndexReader>();
            RETURN_IF_ERROR(_pk_index_reader->parse_index(_file_reader, *_pk_index_meta, stats));
            // _meta_mem_usage += _pk_index_reader->get_memory_size();
            return Status::OK();
        }

        // Read and parse the short key index page.
        OlapReaderStatistics tmp_stats;
        OlapReaderStatistics* stats_ptr = (stats == nullptr) ? &tmp_stats : stats;
        PageReadOptions opts(io::IOContext {.is_index_data = true,
                                            .file_cache_stats = &stats_ptr->file_cache_stats});
        opts.use_page_cache = true;
        opts.type = INDEX_PAGE;
        opts.file_reader = _file_reader.get();
        opts.page_pointer = PagePointer(_sk_index_page);
        // The short key index page uses NO_COMPRESSION for now.
        opts.codec = nullptr;
        // NOTE(review): page-level stats always go to the local object, even when the caller
        // supplied `stats`; only the file cache stats above honor it. Confirm this is intended.
        opts.stats = &tmp_stats;

        Slice page_body;
        PageFooterPB page_footer;
        RETURN_IF_ERROR(PageIO::read_and_decompress_page(opts, &_sk_index_handle, &page_body,
                                                         &page_footer));
        DCHECK_EQ(page_footer.type(), SHORT_KEY_PAGE);
        DCHECK(page_footer.has_short_key_page_footer());

        // _meta_mem_usage += body.get_size();
        _sk_index_decoder = std::make_unique<ShortKeyIndexDecoder>();
        return _sk_index_decoder->parse(page_body, page_footer.short_key_page_footer());
    };
    return _load_index_once.call(load_once);
}
634
635
632
// Reports whether this segment is still usable.
//
// For each one-time loader that has already been invoked (key index, primary key bloom
// filter, column meta, index file reader) the stored result is checked, and the first error
// found is returned. When none of them failed, the runtime status `_healthy_status` is
// returned instead.
//
// This function does not let exceptions escape: they are converted to a Status because the
// caller may not be exception safe.
Status Segment::healthy_status() {
    try {
        if (_load_index_once.has_called()) {
            RETURN_IF_ERROR(_load_index_once.stored_result());
        }
        if (_load_pk_bf_once.has_called()) {
            RETURN_IF_ERROR(_load_pk_bf_once.stored_result());
        }
        if (_create_column_meta_once_call.has_called()) {
            RETURN_IF_ERROR(_create_column_meta_once_call.stored_result());
        }
        if (_index_file_reader_open.has_called()) {
            RETURN_IF_ERROR(_index_file_reader_open.stored_result());
        }
        // This status is set at run time, for example when something goes wrong while a
        // segment iterator is reading.
        return _healthy_status.status();
    } catch (const doris::Exception& e) {
        // An exception raised during load_xxx must not be thrown onwards from here.
        return e.to_status();
    } catch (const std::exception& e) {
        // The exception was not thrown by doris code.
        return Status::InternalError("Unexpected error during load segment: {}", e.what());
    }
}
660
661
// Return the storage datatype of related column to field.
662
DataTypePtr Segment::get_data_type_of(const TabletColumn& column,
663
22.8M
                                      const StorageReadOptions& read_options) {
664
22.8M
    const PathInDataPtr path = column.path_info_ptr();
665
666
    // none variant column
667
22.9M
    if (path == nullptr || path->empty()) {
668
22.9M
        return DataTypeFactory::instance().create_data_type(column);
669
22.9M
    }
670
671
    // Path exists, proceed with variant logic.
672
18.4E
    PathInData relative_path = path->copy_pop_front();
673
18.4E
    int32_t unique_id = column.unique_id() >= 0 ? column.unique_id() : column.parent_unique_id();
674
675
    // If this uid does not exist in segment meta, fallback to schema type.
676
18.4E
    if (!_column_meta_accessor->has_column_uid(unique_id)) {
677
819
        return DataTypeFactory::instance().create_data_type(column);
678
819
    }
679
680
18.4E
    std::shared_ptr<ColumnReader> v_reader;
681
682
    // Get the parent variant column reader
683
18.4E
    OlapReaderStatistics stats;
684
    // If status is not ok, it will throw exception(data corruption)
685
18.4E
    THROW_IF_ERROR(get_column_reader(unique_id, &v_reader, &stats));
686
18.4E
    DCHECK(v_reader != nullptr);
687
18.4E
    auto* variant_reader = static_cast<VariantColumnReader*>(v_reader.get());
688
    // Delegate type inference for variant paths to VariantColumnReader.
689
18.4E
    DataTypePtr type;
690
18.4E
    THROW_IF_ERROR(variant_reader->infer_data_type_for_path(&type, column, read_options,
691
18.4E
                                                            _column_reader_cache.get()));
692
18.4E
    return type;
693
18.4E
}
694
695
42.8M
// Builds the column meta accessor and the column reader cache, at most once per segment.
//
// Time spent in every call (including calls that find the work already done) is added to
// `stats->segment_create_column_readers_timer_ns`, so `stats` must not be nullptr.
Status Segment::_create_column_meta_once(OlapReaderStatistics* stats) {
    SCOPED_RAW_TIMER(&stats->segment_create_column_readers_timer_ns);
    auto build_column_meta = [this, stats] {
        std::shared_ptr<SegmentFooterPB> footer;
        RETURN_IF_ERROR(_get_segment_footer(footer, stats));
        return _create_column_meta(*footer);
    };
    return _create_column_meta_once_call.call(build_column_meta);
}
703
704
1.64M
// Initializes the per-segment column metadata structures from `footer`:
//   1. `_column_meta_accessor`, which internally maintains the uid -> column_ordinal mapping;
//   2. optionally `_column_uid_to_raw_bytes`, when adaptive batch size is enabled;
//   3. `_column_reader_cache`, which is handed a callback for fetching the segment footer.
//
// A failure to collect raw_data_bytes is only logged; it does not fail the call.
Status Segment::_create_column_meta(const SegmentFooterPB& footer) {
    _column_meta_accessor = std::make_unique<ColumnMetaAccessor>();
    RETURN_IF_ERROR(_column_meta_accessor->init(footer, _file_reader));

    if (config::enable_adaptive_batch_size) {
        // Cache raw_data_bytes per column uid for adaptive batch size prediction.
        // This runs under call_once, so there are no thread-safety concerns.
        auto record_raw_bytes = [this](const ColumnMetaPB& meta) {
            const bool has_valid_uid = meta.has_unique_id() && meta.unique_id() != -1;
            if (has_valid_uid && meta.has_raw_data_bytes()) {
                _column_uid_to_raw_bytes[meta.unique_id()] = meta.raw_data_bytes();
            }
        };
        auto traverse_st = _column_meta_accessor->traverse_metas(footer, record_raw_bytes);
        if (!traverse_st.ok()) {
            LOG(WARNING) << "Failed to traverse column metas to cache raw_data_bytes, error: "
                         << traverse_st.to_string();
        }
    }

    _column_reader_cache = std::make_unique<ColumnReaderCache>(
            _column_meta_accessor.get(), _tablet_schema, _file_reader, _num_rows,
            [this](std::shared_ptr<SegmentFooterPB>& loaded_footer,
                   OlapReaderStatistics* reader_stats) {
                return _get_segment_footer(loaded_footer, reader_stats);
            });
    return Status::OK();
}
731
732
// Creates an iterator for a column that has no data in this segment.
//
// The iterator is a DefaultValueColumnIterator built from the column's default value,
// nullability, type, precision, scale and length as declared in `tablet_column`.
//
// Returns InternalError when the column has neither a default value nor is nullable, and
// propagates any error from initializing the iterator. On success `*iter` owns the iterator.
Status Segment::new_default_iterator(const TabletColumn& tablet_column,
                                     std::unique_ptr<ColumnIterator>* iter) {
    if (!tablet_column.has_default_value() && !tablet_column.is_nullable()) {
        return Status::InternalError(
                "invalid nonexistent column without default value. column_uid={}, "
                "column_name={}, "
                "column_type={}",
                tablet_column.unique_id(), tablet_column.name(), tablet_column.type());
    }
    auto default_value_iter = std::make_unique<DefaultValueColumnIterator>(
            tablet_column.has_default_value(), tablet_column.default_value(),
            tablet_column.is_nullable(), tablet_column.type(), tablet_column.precision(),
            tablet_column.frac(), tablet_column.length());
    ColumnIteratorOptions iter_opts;

    RETURN_IF_ERROR(default_value_iter->init(iter_opts));
    *iter = std::move(default_value_iter);
    return Status::OK();
}
751
752
// Not use cid anymore, for example original table schema is colA int, then user do following actions
753
// 1.add column b
754
// 2. drop column b
755
// 3. add column c
756
// in the new schema column c's cid == 2
757
// but in the old schema column b's cid == 2
758
// but they are not the same column
759
Status Segment::new_column_iterator(const TabletColumn& tablet_column,
760
                                    std::unique_ptr<ColumnIterator>* iter,
761
                                    const StorageReadOptions* opt,
762
                                    const std::unordered_map<int32_t, PathToBinaryColumnCacheUPtr>*
763
19.8M
                                            variant_sparse_column_cache) {
764
19.8M
    if (opt->runtime_state != nullptr) {
765
19.5M
        _be_exec_version = opt->runtime_state->be_exec_version();
766
19.5M
    }
767
19.8M
    RETURN_IF_ERROR(_create_column_meta_once(opt->stats));
768
769
    // For compability reason unique_id may less than 0 for variant extracted column
770
19.9M
    int32_t unique_id = tablet_column.unique_id() >= 0 ? tablet_column.unique_id()
771
18.4E
                                                       : tablet_column.parent_unique_id();
772
773
    // If column meta for this uid is not found in this segment, use default iterator.
774
19.8M
    if (!_column_meta_accessor->has_column_uid(unique_id)) {
775
3.43k
        RETURN_IF_ERROR(new_default_iterator(tablet_column, iter));
776
3.43k
        return Status::OK();
777
3.43k
    }
778
779
    // init iterator by unique id
780
19.8M
    std::shared_ptr<ColumnReader> reader;
781
19.8M
    RETURN_IF_ERROR(get_column_reader(unique_id, &reader, opt->stats));
782
19.8M
    if (reader == nullptr) {
783
0
        return Status::InternalError("column reader is nullptr, unique_id={}", unique_id);
784
0
    }
785
19.8M
    if (reader->get_meta_type() == FieldType::OLAP_FIELD_TYPE_VARIANT) {
786
        // if sparse_column_cache_ptr is nullptr, means the sparse column cache is not used
787
30.8k
        PathToBinaryColumnCache* sparse_column_cache_ptr = nullptr;
788
30.8k
        if (variant_sparse_column_cache) {
789
30.4k
            auto it = variant_sparse_column_cache->find(unique_id);
790
30.4k
            if (it != variant_sparse_column_cache->end()) {
791
30.4k
                sparse_column_cache_ptr = it->second.get();
792
18.4E
            } else {
793
18.4E
                DCHECK(false) << "sparse column cache is not found, unique_id=" << unique_id;
794
18.4E
            }
795
30.4k
        }
796
        // use _column_reader_cache to get variant subcolumn(path column) reader
797
30.8k
        RETURN_IF_ERROR(assert_cast<VariantColumnReader*>(reader.get())
798
30.8k
                                ->new_iterator(iter, &tablet_column, opt,
799
30.8k
                                               _column_reader_cache.get(),
800
30.8k
                                               sparse_column_cache_ptr));
801
19.8M
    } else {
802
19.8M
        RETURN_IF_ERROR(reader->new_iterator(iter, &tablet_column, opt));
803
19.8M
        if (opt->all_access_paths.contains(unique_id) ||
804
19.8M
            opt->predicate_access_paths.contains(unique_id)) {
805
71.9k
            const auto& all_access_paths = opt->all_access_paths.contains(unique_id)
806
71.9k
                                                   ? opt->all_access_paths.at(unique_id)
807
71.9k
                                                   : TColumnAccessPaths {};
808
71.9k
            const auto& predicate_access_paths = opt->predicate_access_paths.contains(unique_id)
809
71.9k
                                                         ? opt->predicate_access_paths.at(unique_id)
810
71.9k
                                                         : TColumnAccessPaths {};
811
812
            // set column name to apply access paths.
813
71.9k
            (*iter)->set_column_name(tablet_column.name());
814
71.9k
            RETURN_IF_ERROR((*iter)->set_access_paths(all_access_paths, predicate_access_paths));
815
71.9k
            (*iter)->remove_pruned_sub_iterators();
816
71.9k
        }
817
19.8M
    }
818
819
19.9M
    if (config::enable_column_type_check && !tablet_column.has_path_info() &&
820
19.9M
        !tablet_column.is_agg_state_type() && tablet_column.type() != reader->get_meta_type()) {
821
0
        LOG(WARNING) << "different type between schema and column reader,"
822
0
                     << " column schema name: " << tablet_column.name()
823
0
                     << " column schema type: " << int(tablet_column.type())
824
0
                     << " column reader meta type: " << int(reader->get_meta_type());
825
0
        return Status::InternalError("different type between schema and column reader");
826
0
    }
827
19.8M
    return Status::OK();
828
19.8M
}
829
830
// Fetches the column reader for the column with unique id `col_uid`.
//
// When the tablet schema has no column with this unique id, `*column_reader` is set to
// nullptr and a NOT_FOUND error is returned. Otherwise the lookup is served by
// `_column_reader_cache`. `stats` must not be nullptr: it is used for timing.
Status Segment::get_column_reader(int32_t col_uid, std::shared_ptr<ColumnReader>* column_reader,
                                  OlapReaderStatistics* stats) {
    RETURN_IF_ERROR(_create_column_meta_once(stats));
    SCOPED_RAW_TIMER(&stats->segment_create_column_readers_timer_ns);

    const bool known_to_schema = _tablet_schema->has_column_unique_id(col_uid);
    if (known_to_schema) {
        return _column_reader_cache->get_column_reader(col_uid, column_reader, stats);
    }

    // The column is not in this segment, hand back nullptr.
    *column_reader = nullptr;
    return Status::Error<ErrorCode::NOT_FOUND, false>("column not found in segment, col_uid={}",
                                                      col_uid);
}
842
843
43.5k
// Invokes `visitor` through the column meta accessor's traversal of this segment's footer.
//
// Statistics gathered while loading the metadata and the footer go to a local object and
// are discarded.
Status Segment::traverse_column_meta_pbs(const std::function<void(const ColumnMetaPB&)>& visitor) {
    OlapReaderStatistics scratch_stats;
    // Ensure the column meta accessor and the reader cache are initialized once.
    RETURN_IF_ERROR(_create_column_meta_once(&scratch_stats));

    std::shared_ptr<SegmentFooterPB> footer;
    RETURN_IF_ERROR(_get_segment_footer(footer, &scratch_stats));
    return _column_meta_accessor->traverse_metas(*footer, visitor);
}
851
852
// Fetches the column reader for `col`.
//
// The column is identified by its unique id, or by its parent's unique id when its own id
// is negative. When the tablet schema has no column with that id, `*column_reader` is set to
// nullptr and NOT_FOUND is returned. A column with path info is resolved to the reader of
// that path, relative to the owning column; any other column gets the plain reader.
Status Segment::get_column_reader(const TabletColumn& col,
                                  std::shared_ptr<ColumnReader>* column_reader,
                                  OlapReaderStatistics* stats) {
    RETURN_IF_ERROR(_create_column_meta_once(stats));
    SCOPED_RAW_TIMER(&stats->segment_create_column_readers_timer_ns);

    int col_uid = col.unique_id();
    if (col_uid < 0) {
        col_uid = col.parent_unique_id();
    }

    // The column is not in this segment, hand back nullptr.
    if (!_tablet_schema->has_column_unique_id(col_uid)) {
        *column_reader = nullptr;
        return Status::Error<ErrorCode::NOT_FOUND, false>("column not found in segment, col_uid={}",
                                                          col_uid);
    }

    if (!col.has_path_info()) {
        return _column_reader_cache->get_column_reader(col_uid, column_reader, stats);
    }

    // Drop the leading path element to get the path relative to the owning column.
    PathInData relative_path = col.path_info_ptr()->copy_pop_front();
    return _column_reader_cache->get_path_column_reader(col_uid, relative_path, column_reader,
                                                        stats);
}
871
872
// Creates an index iterator for `tablet_column` described by `index_meta`.
//
// Returns OK without touching `*iter` when the column reader lookup reports NOT_FOUND or
// when `index_meta` is nullptr. Any other lookup error is propagated.
Status Segment::new_index_iterator(const TabletColumn& tablet_column, const TabletIndex* index_meta,
                                   const StorageReadOptions& read_options,
                                   std::unique_ptr<IndexIterator>* iter) {
    if (read_options.runtime_state != nullptr) {
        _be_exec_version = read_options.runtime_state->be_exec_version();
    }
    RETURN_IF_ERROR(_create_column_meta_once(read_options.stats));

    std::shared_ptr<ColumnReader> reader;
    auto lookup_st = get_column_reader(tablet_column, &reader, read_options.stats);
    if (lookup_st.is<ErrorCode::NOT_FOUND>()) {
        return Status::OK();
    }
    RETURN_IF_ERROR(lookup_st);
    DCHECK(reader != nullptr);

    if (index_meta == nullptr) {
        return Status::OK();
    }

    // Call DorisCallOnce.call without checking whether _index_file_reader is nullptr,
    // to avoid a data race during parallel method calls.
    RETURN_IF_ERROR(_index_file_reader_open.call([this] { return _open_index_file_reader(); }));
    // After DorisCallOnce.call, _index_file_reader is guaranteed to be not nullptr.

    // The rowset id string is only passed for ANN indexes; other index types get "".
    const bool is_ann_index = index_meta->index_type() == IndexType::ANN;
    const std::string rowset_id = is_ann_index ? _rowset_id.to_string() : "";
    RETURN_IF_ERROR(reader->new_index_iterator(_index_file_reader, index_meta, rowset_id,
                                               _segment_id, _num_rows, iter));
    return Status::OK();
}
899
900
// Looks up `key` in the primary key index of this segment.
//
// Parameters:
//   key               - encoded key to search for.
//   latest_schema     - used to decide whether keys carry a sequence column and/or a row id.
//   with_seq_col      - whether `key` ends with an encoded sequence value.
//   with_rowid        - whether `key` ends with an encoded row id.
//   row_location      - receives rowset id, segment id and row id of the match.
//   stats             - forwarded to index loading and iterator creation.
//   encoded_seq_value - optional; receives the stored sequence bytes including the marker,
//                       or an empty string when the segment has no sequence column.
//
// Returns KEY_NOT_FOUND when the key is absent, KEY_ALREADY_EXISTS when the stored entry has
// a higher sequence id than `key`, another error on I/O failure, and OK on a match.
Status Segment::lookup_row_key(const Slice& key, const TabletSchema* latest_schema,
                               bool with_seq_col, bool with_rowid, RowLocation* row_location,
                               OlapReaderStatistics* stats, std::string* encoded_seq_value) {
    RETURN_IF_ERROR(load_pk_index_and_bf(stats));

    const bool has_seq_col = latest_schema->has_sequence_col();
    const bool has_rowid = !latest_schema->cluster_key_uids().empty();
    size_t seq_col_length = 0;
    if (has_seq_col) {
        // +1 for the marker byte that precedes the encoded sequence value.
        seq_col_length = latest_schema->column(latest_schema->sequence_col_idx()).length() + 1;
    }
    const size_t rowid_length = has_rowid ? PrimaryKeyIndexReader::ROW_ID_LENGTH : 0;

    // Strip the optional sequence / row id suffixes from the probe key.
    const size_t key_seq_suffix = with_seq_col ? seq_col_length : 0;
    const size_t key_rowid_suffix = with_rowid ? rowid_length : 0;
    Slice key_without_seq(key.get_data(), key.get_size() - key_seq_suffix - key_rowid_suffix);

    DCHECK(_pk_index_reader != nullptr);
    if (!_pk_index_reader->check_present(key_without_seq)) {
        return Status::Error<ErrorCode::KEY_NOT_FOUND, false>("");
    }

    bool exact_match = false;
    std::unique_ptr<segment_v2::IndexedColumnIterator> pk_iter;
    RETURN_IF_ERROR(_pk_index_reader->new_iterator(&pk_iter, stats));
    auto seek_st = pk_iter->seek_at_or_after(&key_without_seq, &exact_match);
    const bool entry_not_found = seek_st.is<ErrorCode::ENTRY_NOT_FOUND>();
    if (!seek_st.ok() && !entry_not_found) {
        return seek_st;
    }
    // Without any suffix in the stored keys, only an exact match counts as a hit.
    if (entry_not_found || !(has_seq_col || has_rowid || exact_match)) {
        return Status::Error<ErrorCode::KEY_NOT_FOUND, false>("");
    }
    row_location->row_id = cast_set<uint32_t>(pk_iter->get_current_ordinal());
    row_location->segment_id = _segment_id;
    row_location->rowset_id = _rowset_id;

    // Read the stored key the iterator is positioned at.
    const size_t num_to_read = 1;
    auto index_type = DataTypeFactory::instance().create_data_type(_pk_index_reader->type(), 1, 0);
    auto index_column = index_type->create_column();
    size_t num_read = num_to_read;
    RETURN_IF_ERROR(pk_iter->next_batch(&num_read, index_column));
    DCHECK(num_to_read == num_read);

    const auto stored_ref = index_column->get_data_at(0);
    Slice sought_key(stored_ref.data, stored_ref.size);

    // A user may run "ALTER TABLE tbl ENABLE FEATURE "SEQUENCE_LOAD" WITH ..." to add a hidden
    // sequence column to a merge-on-write table that had none, so `has_seq_col == true` does
    // not mean the data in this segment carries a sequence column value.
    const bool segment_has_seq_col = _tablet_schema->has_sequence_col();
    const size_t stored_seq_suffix = segment_has_seq_col ? seq_col_length : 0;
    Slice sought_key_without_seq(sought_key.get_data(),
                                 sought_key.get_size() - stored_seq_suffix - rowid_length);

    if (has_seq_col) {
        // Compare the key part.
        if (key_without_seq.compare(sought_key_without_seq) != 0) {
            return Status::Error<ErrorCode::KEY_NOT_FOUND, false>("");
        }

        if (with_seq_col && segment_has_seq_col) {
            // Compare the sequence ids; the +1 / -1 skip the marker byte.
            Slice sequence_id(key.get_data() + key_without_seq.get_size() + 1,
                              seq_col_length - 1);
            Slice previous_sequence_id(
                    sought_key.get_data() + sought_key_without_seq.get_size() + 1,
                    seq_col_length - 1);
            if (sequence_id.compare(previous_sequence_id) < 0) {
                return Status::Error<ErrorCode::KEY_ALREADY_EXISTS>(
                        "key with higher sequence id exists");
            }
        }
    } else if (has_rowid) {
        Slice sought_key_without_rowid(sought_key.get_data(),
                                       sought_key.get_size() - rowid_length);
        // Compare the key part.
        if (key_without_seq.compare(sought_key_without_rowid) != 0) {
            return Status::Error<ErrorCode::KEY_NOT_FOUND, false>("");
        }
    }

    // Found the key; use the row id stored in the pk index if necessary.
    if (has_rowid) {
        Slice rowid_slice(sought_key.get_data() + sought_key_without_seq.get_size() +
                                  stored_seq_suffix + 1,
                          rowid_length - 1);
        const auto* rowid_coder = get_key_coder(FieldType::OLAP_FIELD_TYPE_UNSIGNED_INT);
        RETURN_IF_ERROR(rowid_coder->decode_ascending(
                &rowid_slice, rowid_length, reinterpret_cast<uint8_t*>(&row_location->row_id)));
    }

    if (encoded_seq_value != nullptr) {
        if (segment_has_seq_col) {
            // Include the marker byte.
            *encoded_seq_value =
                    Slice(sought_key.get_data() + sought_key_without_seq.get_size(), seq_col_length)
                            .to_string();
        } else {
            *encoded_seq_value = std::string {};
        }
    }
    return Status::OK();
}
999
1000
0
// Reads the encoded primary key stored at ordinal `row_id` of the primary key index.
//
// For a tablet with cluster keys, the trailing row id (ROW_ID_LENGTH bytes) is trimmed from
// the stored entry before it is written to `*key`. No reader statistics are collected.
Status Segment::read_key_by_rowid(uint32_t row_id, std::string* key) {
    OlapReaderStatistics* null_stat = nullptr;
    RETURN_IF_ERROR(load_pk_index_and_bf(null_stat));

    std::unique_ptr<segment_v2::IndexedColumnIterator> pk_iter;
    RETURN_IF_ERROR(_pk_index_reader->new_iterator(&pk_iter, null_stat));

    auto index_type = DataTypeFactory::instance().create_data_type(_pk_index_reader->type(), 1, 0);
    auto index_column = index_type->create_column();
    RETURN_IF_ERROR(pk_iter->seek_to_ordinal(row_id));
    size_t num_read = 1;
    RETURN_IF_ERROR(pk_iter->next_batch(&num_read, index_column));
    CHECK(num_read == 1);

    if (_tablet_schema->cluster_key_uids().empty()) {
        // No row id suffix to trim.
        *key = index_column->get_data_at(0).to_string();
        return Status::OK();
    }

    // Trim the row id.
    const auto stored_ref = index_column->get_data_at(0);
    Slice sought_key(stored_ref.data, stored_ref.size);
    Slice sought_key_without_rowid(sought_key.get_data(),
                                   sought_key.get_size() - PrimaryKeyIndexReader::ROW_ID_LENGTH);
    *key = sought_key_without_rowid.to_string();
    return Status::OK();
}
1025
1026
// Reads the value of one column at row `row_id` and appends it to `result`.
//
// Parameters:
//   schema               - tablet schema used to resolve the slot to a column.
//   slot                 - slot to read; a non-empty `column_paths()` selects the variant
//                          sub-path branch.
//   row_id               - ordinal of the row inside this segment.
//   result               - destination column.
//   storage_read_options - read options; `stats` is dereferenced and must not be nullptr.
//   iterator_hint        - reusable iterator; created and initialized here when nullptr.
//
// For a variant sub-path, the value is read in its storage type and then cast to the slot
// type, because the storage may have a different type than the schema.
Status Segment::seek_and_read_by_rowid(const TabletSchema& schema, SlotDescriptor* slot,
                                       uint32_t row_id, MutableColumnPtr& result,
                                       StorageReadOptions& storage_read_options,
                                       std::unique_ptr<ColumnIterator>& iterator_hint) {
    segment_v2::ColumnIteratorOptions opt {
            .use_page_cache = !config::disable_storage_page_cache,
            .file_reader = file_reader().get(),
            .stats = storage_read_options.stats,
            .io_ctx = io::IOContext {.reader_type = ReaderType::READER_QUERY,
                                     .file_cache_stats =
                                             &storage_read_options.stats->file_cache_stats},
    };

    std::vector<segment_v2::rowid_t> single_row_loc {row_id};
    if (!slot->column_paths().empty()) {
        // Column readers must exist before reading by rowid: on a segment cache miss they
        // are created here so that reading the variant column does not coredump.
        RETURN_IF_ERROR(_create_column_meta_once(storage_read_options.stats));

        const auto& dt_variant =
                assert_cast<const DataTypeVariant&>(*remove_nullable(slot->type()));
        TabletColumn column = TabletColumn::create_materialized_variant_column(
                schema.column_by_uid(slot->col_unique_id()).name_lower_case(), slot->column_paths(),
                slot->col_unique_id(), dt_variant.variant_max_subcolumns_count(),
                dt_variant.enable_doc_mode());
        auto storage_type = get_data_type_of(column, storage_read_options);
        // Validate the type before it is dereferenced below.
        DCHECK(storage_type != nullptr);
        MutableColumnPtr file_storage_column = storage_type->create_column();

        if (iterator_hint == nullptr) {
            RETURN_IF_ERROR(new_column_iterator(column, &iterator_hint, &storage_read_options));
            RETURN_IF_ERROR(iterator_hint->init(opt));
        }
        RETURN_IF_ERROR(
                iterator_hint->read_by_rowids(single_row_loc.data(), 1, file_storage_column));
        ColumnPtr source_ptr;
        // Storage may have a different type than the schema, so the column must be cast.
        RETURN_IF_ERROR(variant_util::cast_column(
                ColumnWithTypeAndName(file_storage_column->get_ptr(), storage_type, column.name()),
                slot->type(), &source_ptr));
        RETURN_IF_CATCH_EXCEPTION(result->insert_range_from(*source_ptr, 0, 1));
    } else {
        // Resolve the slot by unique id when it has one, otherwise by column name.
        int index = (slot->col_unique_id() >= 0) ? schema.field_index(slot->col_unique_id())
                                                 : schema.field_index(slot->col_name());
        if (index < 0) {
            std::stringstream ss;
            ss << "field name is invalid. field=" << slot->col_name()
               << ", field_name_to_index=" << schema.get_all_field_names();
            return Status::InternalError(ss.str());
        }
        if (iterator_hint == nullptr) {
            RETURN_IF_ERROR(new_column_iterator(schema.column(index), &iterator_hint,
                                                &storage_read_options));
            RETURN_IF_ERROR(iterator_hint->init(opt));
        }
        RETURN_IF_ERROR(iterator_hint->read_by_rowids(single_row_loc.data(), 1, result));
    }
    return Status::OK();
}
1085
1086
// Returns the parsed segment footer through `footer_pb`.
//
// Lookup order:
//   1. `_footer_pb`, which is tried via lock();
//   2. the storage page cache, under the key from get_segment_footer_cache_key();
//   3. parsing the footer from the file, followed by inserting it into the page cache.
// After step 2 or 3, `_footer_pb` is refreshed with the footer held by the cache handle.
Status Segment::_get_segment_footer(std::shared_ptr<SegmentFooterPB>& footer_pb,
                                    OlapReaderStatistics* stats) {
    std::shared_ptr<SegmentFooterPB> footer_pb_shared = _footer_pb.lock();
    if (footer_pb_shared != nullptr) {
        footer_pb = footer_pb_shared;
        return Status::OK();
    }

    VLOG_DEBUG << fmt::format("Segment footer of {}:{}:{} is missing, try to load it",
                              _file_reader->path().native(), _file_reader->size(),
                              _file_reader->size() - 12);

    StoragePageCache* segment_footer_cache = ExecEnv::GetInstance()->get_storage_page_cache();
    DCHECK(segment_footer_cache != nullptr);

    auto cache_key = get_segment_footer_cache_key();
    PageCacheHandle cache_handle;

    // Put the segment footer into the index page cache.
    // Rationale:
    // - The footer is metadata (small, parsed with indexes), not data page payload.
    // - Using PageTypePB::INDEX_PAGE keeps it under the same eviction policy/shards
    //   as other index/metadata pages and avoids competing with the DATA_PAGE budget.
    const bool found_in_cache = segment_footer_cache->lookup(cache_key, &cache_handle,
                                                             segment_v2::PageTypePB::INDEX_PAGE);
    if (found_in_cache) {
        VLOG_DEBUG << fmt::format("Segment footer of {}:{}:{} is found in cache",
                                  _file_reader->path().native(), _file_reader->size(),
                                  _file_reader->size() - 12);
    } else {
        RETURN_IF_ERROR(_parse_footer(footer_pb_shared, stats));
        segment_footer_cache->insert(cache_key, footer_pb_shared, footer_pb_shared->ByteSizeLong(),
                                     &cache_handle, segment_v2::PageTypePB::INDEX_PAGE);
    }

    // Always hand out the instance referenced by the cache handle.
    footer_pb_shared = cache_handle.get<std::shared_ptr<SegmentFooterPB>>();
    _footer_pb = footer_pb_shared;
    footer_pb = footer_pb_shared;
    return Status::OK();
}
1125
1126
1.75M
// Builds the page cache key of this segment's footer from its own file reader.
//
// The key's last component is the file size minus 12.
// NOTE(review): 12 is presumably the fixed-size trailer at the end of a segment file
// (`_parse_footer` also computes `file_size - 12 - footer_length`) - confirm.
StoragePageCache::CacheKey Segment::get_segment_footer_cache_key() const {
    DCHECK(_file_reader != nullptr);
    return get_segment_footer_cache_key(_file_reader);
}
1133
1134
// Builds the page cache key of a segment footer for the file behind `file_reader`.
//
// The key consists of the file path, the file size, and the file size minus 12.
StoragePageCache::CacheKey Segment::get_segment_footer_cache_key(
        const io::FileReaderSPtr& file_reader) {
    const auto file_size = file_reader->size();
    return {file_reader->path().native(), file_size, static_cast<int64_t>(file_size - 12)};
}
1139
1140
} // namespace doris::segment_v2