Coverage Report

Created: 2026-07-02 01:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/segment/segment.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/segment/segment.h"
19
20
#include <crc32c/crc32c.h>
21
#include <gen_cpp/Descriptors_types.h>
22
#include <gen_cpp/PlanNodes_types.h>
23
#include <gen_cpp/olap_file.pb.h>
24
#include <gen_cpp/segment_v2.pb.h>
25
26
#include <algorithm>
27
#include <cstring>
28
#include <memory>
29
#include <set>
30
#include <sstream>
31
#include <utility>
32
33
#include "cloud/config.h"
34
#include "common/config.h"
35
#include "common/exception.h"
36
#include "common/logging.h"
37
#include "common/status.h"
38
#include "core/column/column.h"
39
#include "core/data_type/data_type.h"
40
#include "core/data_type/data_type_factory.hpp"
41
#include "core/data_type/data_type_nullable.h"
42
#include "core/data_type/data_type_variant.h"
43
#include "core/field.h"
44
#include "core/string_ref.h"
45
#include "cpp/sync_point.h"
46
#include "exprs/expr_zonemap_filter.h"
47
#include "exprs/vexpr_context.h"
48
#include "io/cache/block_file_cache.h"
49
#include "io/cache/block_file_cache_factory.h"
50
#include "io/cache/cached_remote_file_reader.h"
51
#include "io/fs/file_reader.h"
52
#include "io/fs/file_system.h"
53
#include "io/io_common.h"
54
#include "runtime/exec_env.h"
55
#include "runtime/query_context.h"
56
#include "runtime/runtime_predicate.h"
57
#include "runtime/runtime_state.h"
58
#include "storage/index/index_file_reader.h"
59
#include "storage/index/indexed_column_reader.h"
60
#include "storage/index/primary_key_index.h"
61
#include "storage/index/short_key_index.h"
62
#include "storage/index/zone_map/zonemap_eval_context.h"
63
#include "storage/iterator/vgeneric_iterators.h"
64
#include "storage/iterators.h"
65
#include "storage/key_coder.h"
66
#include "storage/olap_common.h"
67
#include "storage/predicate/block_column_predicate.h"
68
#include "storage/predicate/column_predicate.h"
69
#include "storage/rowset/rowset_reader_context.h"
70
#include "storage/schema.h"
71
#include "storage/segment/column_meta_accessor.h"
72
#include "storage/segment/column_reader.h"
73
#include "storage/segment/column_reader_cache.h"
74
#include "storage/segment/empty_segment_iterator.h"
75
#include "storage/segment/page_io.h"
76
#include "storage/segment/page_pointer.h"
77
#include "storage/segment/segment_iterator.h"
78
#include "storage/segment/segment_writer.h" // k_segment_magic_length
79
#include "storage/segment/stream_reader.h"
80
#include "storage/segment/variant/variant_column_reader.h"
81
#include "storage/tablet/tablet_schema.h"
82
#include "storage/types.h"
83
#include "storage/utils.h"
84
#include "util/coding.h"
85
#include "util/json/path_in_data.h"
86
#include "util/slice.h" // Slice
87
88
namespace doris::segment_v2 {
89
90
class InvertedIndexIterator;
91
92
namespace {
93
94
Status build_segment_zonemap_context(Segment* segment, const Schema& schema,
95
                                     const StorageReadOptions& read_options,
96
17.7k
                                     const VExprContextSPtrs& conjuncts, ZoneMapEvalContext* ctx) {
97
17.7k
    DORIS_CHECK(segment != nullptr);
98
17.7k
    DORIS_CHECK(ctx != nullptr);
99
17.7k
    std::set<int> slot_indexes;
100
20.0k
    for (const auto& conjunct : conjuncts) {
101
20.0k
        DORIS_CHECK(conjunct != nullptr);
102
20.0k
        const auto& root = conjunct->root();
103
20.0k
        DORIS_CHECK(root != nullptr);
104
20.0k
        if (!root->can_evaluate_zonemap_filter()) {
105
18.2k
            continue;
106
18.2k
        }
107
        // Segment zone maps have one min/max/null summary per column for the whole segment, so a
108
        // segment-level context can safely hold every slot referenced by a compound expression.
109
        // Page zone maps are page-aligned per column and still use single-slot filtering in
110
        // SegmentIterator.
111
1.76k
        root->collect_slot_column_ids(slot_indexes);
112
1.76k
    }
113
17.7k
    for (const int slot_index : slot_indexes) {
114
3.05k
        if (slot_index < 0 || cast_set<size_t>(slot_index) >= schema.num_column_ids()) {
115
0
            continue;
116
0
        }
117
3.05k
        const auto column_id = schema.column_id(cast_set<size_t>(slot_index));
118
3.05k
        const auto* tablet_column = schema.column(column_id);
119
3.05k
        DORIS_CHECK(tablet_column != nullptr);
120
3.05k
        if (!segment->can_apply_predicate_safely(
121
3.05k
                    column_id, schema, read_options.target_cast_type_for_variants, read_options)) {
122
11
            continue;
123
11
        }
124
3.04k
        auto data_type = segment->get_data_type_of(*tablet_column, read_options);
125
3.04k
        if (data_type == nullptr) {
126
0
            continue;
127
0
        }
128
3.04k
        ZoneMapEvalContext::SlotZoneMap slot_zone_map;
129
3.04k
        slot_zone_map.data_type = data_type;
130
3.04k
        std::shared_ptr<ColumnReader> reader;
131
3.04k
        Status st = segment->get_column_reader(*tablet_column, &reader, read_options.stats);
132
3.04k
        if (st.is<ErrorCode::NOT_FOUND>()) {
133
30
            ctx->slots.emplace(slot_index, std::move(slot_zone_map));
134
30
            continue;
135
30
        }
136
3.01k
        RETURN_IF_ERROR(st);
137
3.02k
        if (reader != nullptr && reader->has_zone_map()) {
138
2.80k
            ZoneMap zone_map;
139
2.80k
            RETURN_IF_ERROR(reader->get_segment_zone_map(&zone_map));
140
2.80k
            slot_zone_map.zone_map = std::make_shared<ZoneMap>(std::move(zone_map));
141
2.80k
        }
142
3.01k
        ctx->slots.emplace(slot_index, std::move(slot_zone_map));
143
3.01k
    }
144
17.7k
    return Status::OK();
145
17.7k
}
146
147
} // namespace
148
149
Status Segment::open(io::FileSystemSPtr fs, const std::string& path, int64_t tablet_id,
150
                     uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr tablet_schema,
151
                     const io::FileReaderOptions& reader_options, std::shared_ptr<Segment>* output,
152
886k
                     InvertedIndexFileInfo idx_file_info, OlapReaderStatistics* stats) {
153
    // Ensure tablet_id is available in reader_options for CachedRemoteFileReader peer read.
154
886k
    io::FileReaderOptions opts_with_tablet = reader_options;
155
886k
    opts_with_tablet.tablet_id = tablet_id;
156
157
886k
    auto s = _open(fs, path, segment_id, rowset_id, tablet_schema, opts_with_tablet, output,
158
886k
                   idx_file_info, stats);
159
888k
    if (s.ok() && output && *output) {
160
888k
        (*output)->_tablet_id = tablet_id;
161
888k
    }
162
886k
    if (!s.ok()) {
163
5
        if (!config::is_cloud_mode()) {
164
5
            auto res = ExecEnv::get_tablet(tablet_id);
165
5
            TabletSharedPtr tablet =
166
5
                    res.has_value() ? std::dynamic_pointer_cast<Tablet>(res.value()) : nullptr;
167
5
            if (tablet) {
168
0
                tablet->report_error(s);
169
0
            }
170
5
        }
171
5
    }
172
173
886k
    return s;
174
886k
}
175
176
Status Segment::_open(io::FileSystemSPtr fs, const std::string& path, uint32_t segment_id,
177
                      RowsetId rowset_id, TabletSchemaSPtr tablet_schema,
178
                      const io::FileReaderOptions& reader_options, std::shared_ptr<Segment>* output,
179
887k
                      InvertedIndexFileInfo idx_file_info, OlapReaderStatistics* stats) {
180
887k
    io::FileReaderSPtr file_reader;
181
887k
    auto st = fs->open_file(path, &file_reader, &reader_options);
182
887k
    TEST_INJECTION_POINT_CALLBACK("Segment::open:corruption", &st);
183
887k
    std::shared_ptr<Segment> segment(
184
887k
            new Segment(segment_id, rowset_id, std::move(tablet_schema), idx_file_info));
185
887k
    segment->_seg_path = path;
186
888k
    if (st) {
187
888k
        segment->_fs = fs;
188
888k
        segment->_file_reader = std::move(file_reader);
189
888k
        st = segment->_open(stats);
190
888k
    }
191
192
    // Three-tier retry for CORRUPTION errors when file cache is enabled.
193
    // This handles CORRUPTION from both open_file() and _parse_footer() (via _open()).
194
887k
    if (st.is<ErrorCode::CORRUPTION>() &&
195
887k
        reader_options.cache_type == io::FileCachePolicy::FILE_BLOCK_CACHE) {
196
        // Tier 1: Clear file cache and retry with cache support (re-downloads from remote).
197
2
        LOG(WARNING) << "bad segment file may be read from file cache, try to read remote source "
198
2
                        "file directly, file path: "
199
2
                     << path << " cache_key: " << file_cache_key_str(path);
200
2
        auto file_key = file_cache_key_from_path(path);
201
2
        auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
202
2
        file_cache->remove_if_cached(file_key);
203
204
2
        st = fs->open_file(path, &file_reader, &reader_options);
205
2
        if (st) {
206
2
            segment->_fs = fs;
207
2
            segment->_file_reader = std::move(file_reader);
208
2
            st = segment->_open(stats);
209
2
        }
210
2
        TEST_INJECTION_POINT_CALLBACK("Segment::open:corruption1", &st);
211
2
        if (st.is<ErrorCode::CORRUPTION>()) { // corrupt again
212
            // Tier 2: Bypass cache entirely and read directly from remote storage.
213
0
            LOG(WARNING) << "failed to try to read remote source file again with cache support,"
214
0
                         << " try to read from remote directly, "
215
0
                         << " file path: " << path << " cache_key: " << file_cache_key_str(path);
216
0
            file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
217
0
            file_cache->remove_if_cached(file_key);
218
219
0
            io::FileReaderOptions opt = reader_options;
220
0
            opt.cache_type = io::FileCachePolicy::NO_CACHE; // skip cache
221
0
            RETURN_IF_ERROR(fs->open_file(path, &file_reader, &opt));
222
0
            segment->_fs = fs;
223
0
            segment->_file_reader = std::move(file_reader);
224
0
            st = segment->_open(stats);
225
0
            if (!st.ok()) {
226
                // Tier 3: Remote source itself is corrupt.
227
0
                LOG(WARNING) << "failed to try to read remote source file directly,"
228
0
                             << " file path: " << path
229
0
                             << " cache_key: " << file_cache_key_str(path);
230
0
            }
231
0
        }
232
2
    }
233
887k
    RETURN_IF_ERROR(st);
234
18.4E
    DCHECK(segment->_fs != nullptr) << "file system is nullptr after segment open";
235
887k
    *output = std::move(segment);
236
887k
    return Status::OK();
237
887k
}
238
239
Segment::Segment(uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr tablet_schema,
240
                 InvertedIndexFileInfo idx_file_info)
241
888k
        : _segment_id(segment_id),
242
888k
          _meta_mem_usage(0),
243
888k
          _rowset_id(rowset_id),
244
888k
          _tablet_schema(std::move(tablet_schema)),
245
888k
          _idx_file_info(std::move(idx_file_info)) {}
246
247
854k
Segment::~Segment() {
248
854k
    g_segment_estimate_mem_bytes << -_tracked_meta_mem_usage;
249
    // if failed, fix `_tracked_meta_mem_usage` accuracy
250
854k
    DCHECK(_tracked_meta_mem_usage == meta_mem_usage());
251
854k
}
252
253
89.3k
io::UInt128Wrapper Segment::file_cache_key(std::string_view rowset_id, uint32_t seg_id) {
254
89.3k
    return io::BlockFileCache::hash(fmt::format("{}_{}.dat", rowset_id, seg_id));
255
89.3k
}
256
257
885k
int64_t Segment::get_metadata_size() const {
258
885k
    std::shared_ptr<SegmentFooterPB> footer_pb_shared = _footer_pb.lock();
259
885k
    return sizeof(Segment) + (_pk_index_meta ? _pk_index_meta->ByteSizeLong() : 0) +
260
18.4E
           (footer_pb_shared ? footer_pb_shared->ByteSizeLong() : 0);
261
885k
}
262
263
886k
void Segment::update_metadata_size() {
264
886k
    MetadataAdder::update_metadata_size();
265
886k
    g_segment_estimate_mem_bytes << _meta_mem_usage - _tracked_meta_mem_usage;
266
886k
    _tracked_meta_mem_usage = _meta_mem_usage;
267
886k
}
268
269
887k
Status Segment::_open(OlapReaderStatistics* stats) {
270
887k
    std::shared_ptr<SegmentFooterPB> footer_pb_shared;
271
887k
    RETURN_IF_ERROR(_get_segment_footer(footer_pb_shared, stats));
272
273
887k
    _pk_index_meta.reset(
274
887k
            footer_pb_shared->has_primary_key_index_meta()
275
887k
                    ? new PrimaryKeyIndexMetaPB(footer_pb_shared->primary_key_index_meta())
276
887k
                    : nullptr);
277
    // delete_bitmap_calculator_test.cpp
278
    // DCHECK(footer.has_short_key_index_page());
279
887k
    _sk_index_page = footer_pb_shared->short_key_index_page();
280
887k
    _num_rows = footer_pb_shared->num_rows();
281
282
    // An estimated memory usage of a segment
283
    // Footer is seperated to StoragePageCache so we don't need to add it to _meta_mem_usage
284
    // _meta_mem_usage += footer_pb_shared->ByteSizeLong();
285
887k
    if (_pk_index_meta != nullptr) {
286
785k
        _meta_mem_usage += _pk_index_meta->ByteSizeLong();
287
785k
    }
288
289
887k
    _meta_mem_usage += sizeof(*this);
290
887k
    _meta_mem_usage += std::min(static_cast<int>(_tablet_schema->num_columns()),
291
887k
                                config::max_segment_partial_column_cache_size) *
292
887k
                       config::estimated_mem_per_column_reader;
293
294
    // 1024 comes from SegmentWriterOptions
295
887k
    _meta_mem_usage += (_num_rows + 1023) / 1024 * (36 + 4);
296
    // 0.01 comes from PrimaryKeyIndexBuilder::init
297
887k
    _meta_mem_usage += BloomFilter::optimal_bit_num(_num_rows, 0.01) / 8;
298
299
887k
    update_metadata_size();
300
301
887k
    return Status::OK();
302
887k
}
303
304
5.62k
Status Segment::_open_index_file_reader() {
305
    // Derive the index path from `_seg_path`, not `_file_reader->path()`: remote FS normalizes the
306
    // latter to an absolute path that won't match the relative keys in PackedFileSystem's index map.
307
5.62k
    _index_file_reader = std::make_shared<IndexFileReader>(
308
5.62k
            _fs, std::string {InvertedIndexDescriptor::get_index_file_path_prefix(_seg_path)},
309
5.62k
            _tablet_schema->get_inverted_index_storage_format(), _idx_file_info, _tablet_id);
310
5.62k
    return Status::OK();
311
5.62k
}
312
313
bool Segment::is_tso_placeholder_col(int cid, const Schema& schema,
314
1.99M
                                     const StorageReadOptions& read_options) const {
315
1.99M
    if (read_options.version.first != read_options.version.second) {
316
1.11M
        return false;
317
1.11M
    }
318
878k
    if (read_options.io_ctx.reader_type != ReaderType::READER_BINLOG &&
319
879k
        read_options.io_ctx.reader_type != ReaderType::READER_BINLOG_COMPACTION) {
320
879k
        return false;
321
879k
    }
322
    // tso_col_idx() is -1 for non-binlog schemas, so this returns false there.
323
18.4E
    return cid == schema.tso_col_idx();
324
878k
}
325
326
Status Segment::new_iterator(SchemaSPtr schema, const StorageReadOptions& read_options,
327
2.17M
                             std::unique_ptr<RowwiseIterator>* iter) {
328
2.17M
    if (read_options.runtime_state != nullptr) {
329
2.05M
        _be_exec_version = read_options.runtime_state->be_exec_version();
330
2.05M
    }
331
2.17M
    RETURN_IF_ERROR(_create_column_meta_once(read_options.stats));
332
333
2.17M
    read_options.stats->total_segment_number++;
334
    // trying to prune the current segment by segment-level zone map
335
2.17M
    for (const auto& entry : read_options.col_id_to_predicates) {
336
1.87M
        int32_t column_id = entry.first;
337
        // schema change
338
1.87M
        if (_tablet_schema->num_columns() <= column_id) {
339
1.42k
            continue;
340
1.42k
        }
341
1.87M
        const TabletColumn& col = read_options.tablet_schema->column(column_id);
342
1.87M
        std::shared_ptr<ColumnReader> reader;
343
        // __DORIS_COMMIT_TSO_COL__ on a single-version segment stores a 0 placeholder on disk
344
        // (replaced with the rowset's real commit_tso at read time). Its on-disk zonemap [0,0]
345
        // must not drive segment-level pruning, so build a ConstantColumnReader carrying the real
346
        // commit_tso to prune against the real value instead.
347
1.87M
        std::optional<Field> const_value;
348
1.87M
        if (read_options.version.first == read_options.version.second &&
349
1.87M
            column_id == schema->commit_tso_col_idx() && read_options.commit_tso.end_tso() != -1) {
350
0
            const_value = Field::create_field<TYPE_BIGINT>(read_options.commit_tso.end_tso());
351
0
        }
352
1.87M
        Status st = get_column_reader(col, &reader, read_options.stats, std::move(const_value));
353
        // not found in this segment, skip
354
1.87M
        if (st.is<ErrorCode::NOT_FOUND>()) {
355
102
            continue;
356
102
        }
357
1.86M
        RETURN_IF_ERROR(st);
358
        // should be OK
359
1.86M
        DCHECK(reader != nullptr);
360
1.86M
        if (!reader->has_zone_map()) {
361
0
            continue;
362
0
        }
363
        // Placeholder tso column on a single-version binlog segment: its zonemap reflects the
364
        // NULL placeholder (replaced with commit_tso at read time), so skip pruning by
365
        // zonemap (min == max == commit_tso) and reuse the predicate's own zonemap matching:
366
        // evaluate_and() returns false iff no value in [min, max] can satisfy the predicates,
367
        // i.e. commit_tso fails them and the whole segment can be pruned. Predicates that don't
368
        // support zonemap return true (conservative: not pruned, row-level eval handles them).
369
1.86M
        if (read_options.col_id_to_predicates.contains(column_id) &&
370
1.87M
            is_tso_placeholder_col(column_id, *schema, read_options)) {
371
0
            const Int64 commit_tso =
372
0
                    read_options.commit_tso.end_tso() == -1 ? 0 : read_options.commit_tso.end_tso();
373
0
            ZoneMap zone_map;
374
0
            zone_map.min_value = Field::create_field<TYPE_BIGINT>(commit_tso);
375
0
            zone_map.max_value = Field::create_field<TYPE_BIGINT>(commit_tso);
376
0
            zone_map.has_not_null = true;
377
0
            if (!entry.second->evaluate_and(zone_map)) {
378
                // any condition not satisfied, return.
379
0
                *iter = std::make_unique<EmptySegmentIterator>(*schema);
380
0
                read_options.stats->filtered_segment_number++;
381
0
                return Status::OK();
382
0
            }
383
0
            continue;
384
0
        }
385
1.86M
        if (read_options.col_id_to_predicates.contains(column_id) &&
386
1.86M
            can_apply_predicate_safely(column_id, *schema,
387
1.86M
                                       read_options.target_cast_type_for_variants, read_options)) {
388
1.86M
            bool matched = true;
389
1.86M
            RETURN_IF_ERROR(reader->match_condition(entry.second.get(), &matched));
390
1.86M
            if (!matched) {
391
                // any condition not satisfied, return.
392
93.9k
                *iter = std::make_unique<EmptySegmentIterator>(*schema);
393
93.9k
                read_options.stats->filtered_segment_number++;
394
93.9k
                read_options.stats->rows_stats_filtered += num_rows();
395
93.9k
                return Status::OK();
396
93.9k
            }
397
1.86M
        }
398
1.86M
    }
399
400
    // Segment-level expr-zonemap runs before SegmentIterator can rebind storage expressions to
401
    // the reader schema. Only apply it when scan tuple slot ordinals already match this schema.
402
2.08M
    if (expr_zonemap::is_expr_zonemap_filter_enabled(read_options.runtime_state) &&
403
2.08M
        !read_options.common_expr_ctxs_push_down.empty()) {
404
17.8k
        ZoneMapEvalContext ctx;
405
17.8k
        RETURN_IF_ERROR(build_segment_zonemap_context(
406
17.8k
                this, *schema, read_options, read_options.common_expr_ctxs_push_down, &ctx));
407
17.8k
        const auto result =
408
17.8k
                VExprContext::evaluate_zonemap_filter(read_options.common_expr_ctxs_push_down, ctx);
409
17.8k
        ctx.stats.accumulate_to(read_options.stats);
410
17.8k
        if (result == ZoneMapFilterResult::kNoMatch) {
411
359
            *iter = std::make_unique<EmptySegmentIterator>(*schema);
412
359
            read_options.stats->filtered_segment_number++;
413
359
            read_options.stats->expr_zonemap_filtered_segments++;
414
359
            return Status::OK();
415
359
        }
416
17.8k
    }
417
418
2.08M
    {
419
2.08M
        SCOPED_RAW_TIMER(&read_options.stats->segment_load_index_timer_ns);
420
2.08M
        RETURN_IF_ERROR(load_index(read_options.stats));
421
2.08M
    }
422
423
2.08M
    if (read_options.delete_condition_predicates->num_of_column_predicate() == 0 &&
424
2.08M
        read_options.push_down_agg_type_opt != TPushAggOp::NONE &&
425
2.08M
        read_options.push_down_agg_type_opt != TPushAggOp::COUNT_ON_INDEX) {
426
32.2k
        iter->reset(new_vstatistics_iterator(this->shared_from_this(), *schema));
427
2.04M
    } else {
428
2.04M
        *iter = std::make_unique<SegmentIterator>(this->shared_from_this(), schema);
429
2.04M
    }
430
431
    // TODO: Valid the opt not only in ReaderType::READER_QUERY
432
2.08M
    if (read_options.io_ctx.reader_type == ReaderType::READER_QUERY &&
433
2.08M
        !read_options.column_predicates.empty()) {
434
1.73M
        auto pruned_predicates = read_options.column_predicates;
435
1.73M
        auto pruned = false;
436
18.3M
        for (auto& it : _column_reader_cache->get_available_readers(false)) {
437
18.3M
            const auto uid = it.first;
438
18.3M
            const auto column_id = read_options.tablet_schema->field_index(uid);
439
18.3M
            bool tmp_pruned = false;
440
18.3M
            RETURN_IF_ERROR(it.second->prune_predicates_by_zone_map(pruned_predicates, column_id,
441
18.3M
                                                                    &tmp_pruned));
442
18.3M
            pruned |= tmp_pruned;
443
18.3M
        }
444
445
1.73M
        if (pruned) {
446
7.53k
            auto options_with_pruned_predicates = read_options;
447
7.53k
            options_with_pruned_predicates.column_predicates = pruned_predicates;
448
            //because column_predicates is changed, we need to rebuild col_id_to_predicates so that inverted index will not go through it.
449
7.53k
            options_with_pruned_predicates.col_id_to_predicates.clear();
450
12.6k
            for (auto pred : options_with_pruned_predicates.column_predicates) {
451
12.6k
                if (!options_with_pruned_predicates.col_id_to_predicates.contains(
452
12.6k
                            pred->column_id())) {
453
8.71k
                    options_with_pruned_predicates.col_id_to_predicates.insert(
454
8.71k
                            {pred->column_id(), AndBlockColumnPredicate::create_shared()});
455
8.71k
                }
456
12.6k
                options_with_pruned_predicates.col_id_to_predicates[pred->column_id()]
457
12.6k
                        ->add_column_predicate(SingleColumnBlockPredicate::create_unique(pred));
458
12.6k
            }
459
7.53k
            return iter->get()->init(options_with_pruned_predicates);
460
7.53k
        }
461
1.73M
    }
462
2.07M
    return iter->get()->init(read_options);
463
2.08M
}
464
465
Status Segment::_write_error_file(size_t file_size, size_t offset, size_t bytes_read, char* data,
466
1
                                  io::IOContext& io_ctx) {
467
1
    if (!config::enbale_dump_error_file || !doris::config::is_cloud_mode()) {
468
1
        return Status::OK();
469
1
    }
470
471
0
    std::string file_name = _rowset_id.to_string() + "_" + std::to_string(_segment_id) + ".dat";
472
0
    std::string dir_path = io::FileCacheFactory::instance()->get_base_paths()[0] + "/error_file/";
473
0
    Status create_st = io::global_local_filesystem()->create_directory(dir_path, true);
474
0
    if (!create_st.ok() && !create_st.is<ErrorCode::ALREADY_EXIST>()) {
475
0
        LOG(WARNING) << "failed to create error file dir: " << create_st.to_string();
476
0
        return create_st;
477
0
    }
478
0
    size_t dir_size = 0;
479
0
    RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(dir_path, &dir_size));
480
0
    if (dir_size > config::file_cache_error_log_limit_bytes) {
481
0
        LOG(WARNING) << "error file dir size is too large: " << dir_size;
482
0
        return Status::OK();
483
0
    }
484
485
0
    std::string error_part;
486
0
    error_part.resize(bytes_read);
487
0
    std::string part_path = dir_path + file_name + ".part_offset_" + std::to_string(offset);
488
0
    LOG(WARNING) << "writer error part to " << part_path;
489
0
    bool is_part_exist = false;
490
0
    RETURN_IF_ERROR(io::global_local_filesystem()->exists(part_path, &is_part_exist));
491
0
    if (is_part_exist) {
492
0
        LOG(WARNING) << "error part already exists: " << part_path;
493
0
    } else {
494
0
        std::unique_ptr<io::FileWriter> part_writer;
495
0
        RETURN_IF_ERROR(io::global_local_filesystem()->create_file(part_path, &part_writer));
496
0
        RETURN_IF_ERROR(part_writer->append(Slice(data, bytes_read)));
497
0
        RETURN_IF_ERROR(part_writer->close());
498
0
    }
499
500
0
    std::string error_file;
501
0
    error_file.resize(file_size);
502
0
    auto* cached_reader = dynamic_cast<io::CachedRemoteFileReader*>(_file_reader.get());
503
0
    if (cached_reader == nullptr) {
504
0
        return Status::InternalError("file reader is not CachedRemoteFileReader");
505
0
    }
506
0
    size_t error_file_bytes_read = 0;
507
0
    RETURN_IF_ERROR(cached_reader->get_remote_reader()->read_at(
508
0
            0, Slice(error_file.data(), file_size), &error_file_bytes_read, &io_ctx));
509
0
    DCHECK(error_file_bytes_read == file_size);
510
    //std::string file_path = dir_path + std::to_string(cur_time) + "_" + ss.str();
511
0
    std::string file_path = dir_path + file_name;
512
0
    LOG(WARNING) << "writer error file to " << file_path;
513
0
    bool is_file_exist = false;
514
0
    RETURN_IF_ERROR(io::global_local_filesystem()->exists(file_path, &is_file_exist));
515
0
    if (is_file_exist) {
516
0
        LOG(WARNING) << "error file already exists: " << part_path;
517
0
    } else {
518
0
        std::unique_ptr<io::FileWriter> writer;
519
0
        RETURN_IF_ERROR(io::global_local_filesystem()->create_file(file_path, &writer));
520
0
        RETURN_IF_ERROR(writer->append(Slice(error_file.data(), file_size)));
521
0
        RETURN_IF_ERROR(writer->close());
522
0
    }
523
0
    return Status::OK(); // already exists
524
0
};
525
526
Status Segment::_parse_footer(std::shared_ptr<SegmentFooterPB>& footer,
527
95.1k
                              OlapReaderStatistics* stats) {
528
    // Footer := SegmentFooterPB, FooterPBSize(4), FooterPBChecksum(4), MagicNumber(4)
529
95.1k
    auto file_size = _file_reader->size();
530
95.1k
    if (file_size < 12) {
531
0
        return Status::Corruption("Bad segment file {}: file size {} < 12, cache_key: {}",
532
0
                                  _file_reader->path().native(), file_size,
533
0
                                  file_cache_key_str(_file_reader->path().native()));
534
0
    }
535
536
95.1k
    uint8_t fixed_buf[12];
537
95.1k
    size_t bytes_read = 0;
538
    // TODO(plat1ko): Support session variable `enable_file_cache`
539
95.1k
    io::IOContext io_ctx {.is_index_data = true,
540
95.1k
                          .file_cache_stats = stats ? &stats->file_cache_stats : nullptr};
541
95.1k
    RETURN_IF_ERROR(
542
95.1k
            _file_reader->read_at(file_size - 12, Slice(fixed_buf, 12), &bytes_read, &io_ctx));
543
95.1k
    DCHECK_EQ(bytes_read, 12);
544
95.1k
    TEST_SYNC_POINT_CALLBACK("Segment::parse_footer:magic_number_corruption", fixed_buf);
545
95.1k
    TEST_INJECTION_POINT_CALLBACK("Segment::parse_footer:magic_number_corruption_inj", fixed_buf);
546
95.1k
    if (memcmp(fixed_buf + 8, k_segment_magic, k_segment_magic_length) != 0) {
547
1
        Status st =
548
1
                _write_error_file(file_size, file_size - 12, bytes_read, (char*)fixed_buf, io_ctx);
549
1
        if (!st.ok()) {
550
0
            LOG(WARNING) << "failed to write error file: " << st.to_string();
551
0
        }
552
1
        return Status::Corruption(
553
1
                "Bad segment file {}: file_size: {}, magic number not match, cache_key: {}",
554
1
                _file_reader->path().native(), file_size,
555
1
                file_cache_key_str(_file_reader->path().native()));
556
1
    }
557
558
    // read footer PB
559
95.1k
    uint32_t footer_length = decode_fixed32_le(fixed_buf);
560
95.1k
    if (file_size < 12 + footer_length) {
561
0
        Status st =
562
0
                _write_error_file(file_size, file_size - 12, bytes_read, (char*)fixed_buf, io_ctx);
563
0
        if (!st.ok()) {
564
0
            LOG(WARNING) << "failed to write error file: " << st.to_string();
565
0
        }
566
0
        return Status::Corruption("Bad segment file {}: file size {} < {}, cache_key: {}",
567
0
                                  _file_reader->path().native(), file_size, 12 + footer_length,
568
0
                                  file_cache_key_str(_file_reader->path().native()));
569
0
    }
570
571
95.1k
    std::string footer_buf;
572
95.1k
    footer_buf.resize(footer_length);
573
95.1k
    RETURN_IF_ERROR(_file_reader->read_at(file_size - 12 - footer_length, footer_buf, &bytes_read,
574
95.1k
                                          &io_ctx));
575
95.1k
    DCHECK_EQ(bytes_read, footer_length);
576
577
    // validate footer PB's checksum
578
95.1k
    uint32_t expect_checksum = decode_fixed32_le(fixed_buf + 4);
579
95.1k
    uint32_t actual_checksum = crc32c::Crc32c(footer_buf.data(), footer_buf.size());
580
95.1k
    if (actual_checksum != expect_checksum) {
581
0
        Status st = _write_error_file(file_size, file_size - 12 - footer_length, bytes_read,
582
0
                                      footer_buf.data(), io_ctx);
583
0
        if (!st.ok()) {
584
0
            LOG(WARNING) << "failed to write error file: " << st.to_string();
585
0
        }
586
0
        return Status::Corruption(
587
0
                "Bad segment file {}: file_size = {}, footer checksum not match, actual={} "
588
0
                "vs expect={}, cache_key: {}",
589
0
                _file_reader->path().native(), file_size, actual_checksum, expect_checksum,
590
0
                file_cache_key_str(_file_reader->path().native()));
591
0
    }
592
593
    // deserialize footer PB
594
95.1k
    footer = std::make_shared<SegmentFooterPB>();
595
95.1k
    if (!footer->ParseFromString(footer_buf)) {
596
0
        Status st = _write_error_file(file_size, file_size - 12 - footer_length, bytes_read,
597
0
                                      footer_buf.data(), io_ctx);
598
0
        if (!st.ok()) {
599
0
            LOG(WARNING) << "failed to write error file: " << st.to_string();
600
0
        }
601
0
        return Status::Corruption(
602
0
                "Bad segment file {}: file_size = {}, failed to parse SegmentFooterPB, "
603
0
                "cache_key: ",
604
0
                _file_reader->path().native(), file_size,
605
0
                file_cache_key_str(_file_reader->path().native()));
606
0
    }
607
608
18.4E
    VLOG_DEBUG << fmt::format("Loading segment footer from {} finished",
609
18.4E
                              _file_reader->path().native());
610
95.1k
    return Status::OK();
611
95.1k
}
612
613
3.90M
Status Segment::_load_pk_bloom_filter(OlapReaderStatistics* stats) {
614
#ifdef BE_TEST
615
    if (_pk_index_meta == nullptr) {
616
        // for BE UT "segment_cache_test"
617
        return _load_pk_bf_once.call([this] {
618
            _meta_mem_usage += 100;
619
            update_metadata_size();
620
            return Status::OK();
621
        });
622
    }
623
#endif
624
3.90M
    DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS);
625
3.90M
    DCHECK(_pk_index_meta != nullptr);
626
3.90M
    DCHECK(_pk_index_reader != nullptr);
627
628
3.90M
    return _load_pk_bf_once.call([this, stats] {
629
72.5k
        RETURN_IF_ERROR(_pk_index_reader->parse_bf(_file_reader, *_pk_index_meta, stats));
630
        // _meta_mem_usage += _pk_index_reader->get_bf_memory_size();
631
72.5k
        return Status::OK();
632
72.5k
    });
633
3.90M
}
634
635
3.90M
Status Segment::load_pk_index_and_bf(OlapReaderStatistics* index_load_stats) {
636
    // `DorisCallOnce` may catch exception in calling stack A and re-throw it in
637
    // a different calling stack B which doesn't have catch block. So we add catch block here
638
    // to prevent coreudmp
639
3.90M
    RETURN_IF_CATCH_EXCEPTION({
640
3.90M
        RETURN_IF_ERROR(load_index(index_load_stats));
641
3.90M
        RETURN_IF_ERROR(_load_pk_bloom_filter(index_load_stats));
642
3.90M
    });
643
3.92M
    return Status::OK();
644
3.90M
}
645
646
6.01M
Status Segment::load_index(OlapReaderStatistics* stats) {
647
6.01M
    return _load_index_once.call([this, stats] {
648
749k
        if (_tablet_schema->keys_type() == UNIQUE_KEYS && _pk_index_meta != nullptr) {
649
674k
            _pk_index_reader = std::make_unique<PrimaryKeyIndexReader>();
650
674k
            RETURN_IF_ERROR(_pk_index_reader->parse_index(_file_reader, *_pk_index_meta, stats));
651
            // _meta_mem_usage += _pk_index_reader->get_memory_size();
652
674k
            return Status::OK();
653
674k
        } else {
654
            // read and parse short key index page
655
74.7k
            OlapReaderStatistics tmp_stats;
656
74.7k
            OlapReaderStatistics* stats_ptr = stats != nullptr ? stats : &tmp_stats;
657
74.7k
            PageReadOptions opts(io::IOContext {.is_index_data = true,
658
74.7k
                                                .file_cache_stats = &stats_ptr->file_cache_stats});
659
74.7k
            opts.use_page_cache = true;
660
74.7k
            opts.type = INDEX_PAGE;
661
74.7k
            opts.file_reader = _file_reader.get();
662
74.7k
            opts.page_pointer = PagePointer(_sk_index_page);
663
            // short key index page uses NO_COMPRESSION for now
664
74.7k
            opts.codec = nullptr;
665
74.7k
            opts.stats = &tmp_stats;
666
667
74.7k
            Slice body;
668
74.7k
            PageFooterPB footer;
669
74.7k
            RETURN_IF_ERROR(
670
74.7k
                    PageIO::read_and_decompress_page(opts, &_sk_index_handle, &body, &footer));
671
74.7k
            DCHECK_EQ(footer.type(), SHORT_KEY_PAGE);
672
74.7k
            DCHECK(footer.has_short_key_page_footer());
673
674
            // _meta_mem_usage += body.get_size();
675
74.7k
            _sk_index_decoder = std::make_unique<ShortKeyIndexDecoder>();
676
74.7k
            return _sk_index_decoder->parse(body, footer.short_key_page_footer());
677
74.7k
        }
678
749k
    });
679
6.01M
}
680
681
1.58M
Status Segment::healthy_status() {
682
1.58M
    try {
683
1.58M
        if (_load_index_once.has_called()) {
684
1.54M
            RETURN_IF_ERROR(_load_index_once.stored_result());
685
1.54M
        }
686
1.58M
        if (_load_pk_bf_once.has_called()) {
687
889k
            RETURN_IF_ERROR(_load_pk_bf_once.stored_result());
688
889k
        }
689
1.58M
        if (_create_column_meta_once_call.has_called()) {
690
1.52M
            RETURN_IF_ERROR(_create_column_meta_once_call.stored_result());
691
1.52M
        }
692
1.58M
        if (_index_file_reader_open.has_called()) {
693
22.8k
            RETURN_IF_ERROR(_index_file_reader_open.stored_result());
694
22.8k
        }
695
        // This status is set by running time, for example, if there is something wrong during read segment iterator.
696
1.58M
        return _healthy_status.status();
697
1.58M
    } catch (const doris::Exception& e) {
698
        // If there is an exception during load_xxx, should not throw exception directly because
699
        // the caller may not exception safe.
700
0
        return e.to_status();
701
0
    } catch (const std::exception& e) {
702
        // The exception is not thrown by doris code.
703
0
        return Status::InternalError("Unexcepted error during load segment: {}", e.what());
704
0
    }
705
1.58M
}
706
707
// Return the storage datatype of related column to field.
708
DataTypePtr Segment::get_data_type_of(const TabletColumn& column,
709
31.2M
                                      const StorageReadOptions& read_options) {
710
31.2M
    const PathInDataPtr path = column.path_info_ptr();
711
712
    // none variant column
713
31.2M
    if (path == nullptr || path->empty()) {
714
31.2M
        return DataTypeFactory::instance().create_data_type(column);
715
31.2M
    }
716
717
    // Path exists, proceed with variant logic.
718
18.4E
    PathInData relative_path = path->copy_pop_front();
719
18.4E
    int32_t unique_id = column.unique_id() >= 0 ? column.unique_id() : column.parent_unique_id();
720
721
    // If this uid does not exist in segment meta, fallback to schema type.
722
18.4E
    if (!_column_meta_accessor->has_column_uid(unique_id)) {
723
601
        return DataTypeFactory::instance().create_data_type(column);
724
601
    }
725
726
18.4E
    std::shared_ptr<ColumnReader> v_reader;
727
728
    // Get the parent variant column reader
729
18.4E
    OlapReaderStatistics stats;
730
    // If status is not ok, it will throw exception(data corruption)
731
18.4E
    THROW_IF_ERROR(get_column_reader(unique_id, &v_reader, &stats));
732
18.4E
    DCHECK(v_reader != nullptr);
733
18.4E
    auto* variant_reader = static_cast<VariantColumnReader*>(v_reader.get());
734
    // Delegate type inference for variant paths to VariantColumnReader.
735
18.4E
    DataTypePtr type;
736
18.4E
    THROW_IF_ERROR(variant_reader->infer_data_type_for_path(&type, column, read_options,
737
18.4E
                                                            _column_reader_cache.get()));
738
18.4E
    return type;
739
18.4E
}
740
741
58.3M
Status Segment::_create_column_meta_once(OlapReaderStatistics* stats) {
742
58.3M
    SCOPED_RAW_TIMER(&stats->segment_create_column_readers_timer_ns);
743
58.3M
    return _create_column_meta_once_call.call([&] {
744
758k
        std::shared_ptr<SegmentFooterPB> footer_pb_shared;
745
758k
        RETURN_IF_ERROR(_get_segment_footer(footer_pb_shared, stats));
746
758k
        return _create_column_meta(*footer_pb_shared);
747
758k
    });
748
58.3M
}
749
750
757k
Status Segment::_create_column_meta(const SegmentFooterPB& footer) {
751
    // Initialize column meta accessor which internally maintains uid -> column_ordinal mapping.
752
757k
    _column_meta_accessor = std::make_unique<ColumnMetaAccessor>();
753
757k
    RETURN_IF_ERROR(_column_meta_accessor->init(footer, _file_reader));
754
755
757k
    if (config::enable_adaptive_batch_size) {
756
        // Cache raw_data_bytes per column uid for adaptive batch size prediction.
757
        // This runs under call_once, so no thread-safety concerns.
758
10.8M
        auto st = _column_meta_accessor->traverse_metas(footer, [this](const ColumnMetaPB& meta) {
759
10.8M
            if (meta.has_unique_id() && meta.unique_id() != -1 && meta.has_raw_data_bytes()) {
760
10.6M
                _column_uid_to_raw_bytes[meta.unique_id()] = meta.raw_data_bytes();
761
10.6M
            }
762
10.8M
        });
763
764
753k
        if (!st.ok()) {
765
0
            LOG(WARNING) << "Failed to traverse column metas to cache raw_data_bytes, error: "
766
0
                         << st.to_string();
767
0
        }
768
753k
    }
769
770
757k
    _column_reader_cache = std::make_unique<ColumnReaderCache>(
771
757k
            _column_meta_accessor.get(), _tablet_schema, _file_reader, _num_rows,
772
9.95M
            [this](std::shared_ptr<SegmentFooterPB>& footer_pb, OlapReaderStatistics* stats) {
773
9.95M
                return _get_segment_footer(footer_pb, stats);
774
9.95M
            });
775
757k
    return Status::OK();
776
757k
}
777
778
Status Segment::new_default_iterator(const TabletColumn& tablet_column,
779
9.29k
                                     std::unique_ptr<ColumnIterator>* iter) {
780
9.29k
    if (!tablet_column.has_default_value() && !tablet_column.is_nullable()) {
781
0
        return Status::InternalError(
782
0
                "invalid nonexistent column without default value. column_uid={}, "
783
0
                "column_name={}, "
784
0
                "column_type={}",
785
0
                tablet_column.unique_id(), tablet_column.name(), tablet_column.type());
786
0
    }
787
9.29k
    std::unique_ptr<DefaultValueColumnIterator> default_value_iter(new DefaultValueColumnIterator(
788
9.29k
            tablet_column.has_default_value(), tablet_column.default_value(),
789
9.29k
            tablet_column.is_nullable(), tablet_column.type(), tablet_column.precision(),
790
9.29k
            tablet_column.frac(), tablet_column.length()));
791
9.29k
    ColumnIteratorOptions iter_opts;
792
793
9.29k
    RETURN_IF_ERROR(default_value_iter->init(iter_opts));
794
9.29k
    *iter = std::move(default_value_iter);
795
9.29k
    return Status::OK();
796
9.29k
}
797
798
// Not use cid anymore, for example original table schema is colA int, then user do following actions
799
// 1.add column b
800
// 2. drop column b
801
// 3. add column c
802
// in the new schema column c's cid == 2
803
// but in the old schema column b's cid == 2
804
// but they are not the same column
805
Status Segment::new_column_iterator(const TabletColumn& tablet_column,
806
                                    std::unique_ptr<ColumnIterator>* iter,
807
                                    const StorageReadOptions* opt,
808
                                    const std::unordered_map<int32_t, PathToBinaryColumnCacheUPtr>*
809
27.1M
                                            variant_sparse_column_cache) {
810
27.1M
    if (opt->runtime_state != nullptr) {
811
26.6M
        _be_exec_version = opt->runtime_state->be_exec_version();
812
26.6M
    }
813
27.1M
    RETURN_IF_ERROR(_create_column_meta_once(opt->stats));
814
815
    // For compability reason unique_id may less than 0 for variant extracted column
816
27.1M
    int32_t unique_id = tablet_column.unique_id() >= 0 ? tablet_column.unique_id()
817
18.4E
                                                       : tablet_column.parent_unique_id();
818
819
    // If column meta for this uid is not found in this segment, use default iterator.
820
27.1M
    if (!_column_meta_accessor->has_column_uid(unique_id)) {
821
3.40k
        RETURN_IF_ERROR(new_default_iterator(tablet_column, iter));
822
3.40k
        return Status::OK();
823
3.40k
    }
824
825
    // __DORIS_COMMIT_TSO_COL__ on a single-version segment stores a 0 placeholder on disk (its
826
    // real value is the rowset's commit_tso, filled at read time). Pass the real commit_tso as a
827
    // const value so the cache returns a ConstantColumnReader, whose iterator yields the real value
828
    // on every read path (projection / predicate / MIN-MAX zone-map) instead of the placeholder 0.
829
    // commit_tso == -1 means it is not assigned yet (before publish); keep the on-disk value then.
830
    // The value is constant per segment (a segment belongs to a single rowset), so caching the
831
    // ConstantColumnReader does not cross-pollute other queries. Some internal read paths (e.g. MOW
832
    // partial-update row fetch) build a bare StorageReadOptions without tablet_schema, so guard it.
833
27.1M
    std::optional<Field> const_value;
834
27.1M
    if (opt->tablet_schema != nullptr && opt->version.first == opt->version.second &&
835
27.1M
        opt->commit_tso.end_tso() != -1) {
836
0
        int32_t tso_idx = opt->tablet_schema->commit_tso_col_idx();
837
0
        if (tso_idx != -1 && opt->tablet_schema->column(tso_idx).unique_id() == unique_id) {
838
0
            const_value = Field::create_field<TYPE_BIGINT>(opt->commit_tso.end_tso());
839
0
        }
840
0
    }
841
842
    // init iterator by unique id
843
27.1M
    std::shared_ptr<ColumnReader> reader;
844
27.1M
    RETURN_IF_ERROR(get_column_reader(unique_id, &reader, opt->stats, std::move(const_value)));
845
27.1M
    if (reader == nullptr) {
846
0
        return Status::InternalError("column reader is nullptr, unique_id={}", unique_id);
847
0
    }
848
27.1M
    if (reader->get_meta_type() == FieldType::OLAP_FIELD_TYPE_VARIANT) {
849
        // if sparse_column_cache_ptr is nullptr, means the sparse column cache is not used
850
29.7k
        PathToBinaryColumnCache* sparse_column_cache_ptr = nullptr;
851
29.7k
        if (variant_sparse_column_cache) {
852
29.3k
            auto it = variant_sparse_column_cache->find(unique_id);
853
29.3k
            if (it != variant_sparse_column_cache->end()) {
854
29.3k
                sparse_column_cache_ptr = it->second.get();
855
18.4E
            } else {
856
18.4E
                DCHECK(false) << "sparse column cache is not found, unique_id=" << unique_id;
857
18.4E
            }
858
29.3k
        }
859
        // use _column_reader_cache to get variant subcolumn(path column) reader
860
29.7k
        RETURN_IF_ERROR(assert_cast<VariantColumnReader*>(reader.get())
861
29.7k
                                ->new_iterator(iter, &tablet_column, opt,
862
29.7k
                                               _column_reader_cache.get(),
863
29.7k
                                               sparse_column_cache_ptr));
864
27.1M
    } else {
865
27.1M
        RETURN_IF_ERROR(reader->new_iterator(iter, &tablet_column, opt));
866
27.1M
        if (opt->all_access_paths.contains(unique_id) ||
867
27.1M
            opt->predicate_access_paths.contains(unique_id)) {
868
61.5k
            const auto& all_access_paths = opt->all_access_paths.contains(unique_id)
869
61.5k
                                                   ? opt->all_access_paths.at(unique_id)
870
61.5k
                                                   : TColumnAccessPaths {};
871
61.5k
            const auto& predicate_access_paths = opt->predicate_access_paths.contains(unique_id)
872
61.5k
                                                         ? opt->predicate_access_paths.at(unique_id)
873
61.5k
                                                         : TColumnAccessPaths {};
874
875
            // set column name to apply access paths.
876
61.5k
            (*iter)->set_column_name(tablet_column.name());
877
61.5k
            RETURN_IF_ERROR((*iter)->set_access_paths(all_access_paths, predicate_access_paths));
878
61.5k
            (*iter)->remove_pruned_sub_iterators();
879
61.5k
        }
880
27.1M
    }
881
882
27.1M
    if (config::enable_column_type_check && !tablet_column.has_path_info() &&
883
27.1M
        !tablet_column.is_agg_state_type() && tablet_column.type() != reader->get_meta_type()) {
884
0
        LOG(WARNING) << "different type between schema and column reader,"
885
0
                     << " column schema name: " << tablet_column.name()
886
0
                     << " column schema type: " << int(tablet_column.type())
887
0
                     << " column reader meta type: " << int(reader->get_meta_type());
888
0
        return Status::InternalError("different type between schema and column reader");
889
0
    }
890
27.1M
    return Status::OK();
891
27.1M
}
892
893
Status Segment::get_column_reader(int32_t col_uid, std::shared_ptr<ColumnReader>* column_reader,
894
27.2M
                                  OlapReaderStatistics* stats, std::optional<Field> const_value) {
895
27.2M
    RETURN_IF_ERROR(_create_column_meta_once(stats));
896
27.2M
    SCOPED_RAW_TIMER(&stats->segment_create_column_readers_timer_ns);
897
    // The column is not in this segment, return nullptr
898
27.2M
    if (!_tablet_schema->has_column_unique_id(col_uid)) {
899
0
        *column_reader = nullptr;
900
0
        return Status::Error<ErrorCode::NOT_FOUND, false>("column not found in segment, col_uid={}",
901
0
                                                          col_uid);
902
0
    }
903
27.2M
    return _column_reader_cache->get_column_reader(col_uid, column_reader, stats,
904
27.2M
                                                   std::move(const_value));
905
27.2M
}
906
907
45.0k
Status Segment::traverse_column_meta_pbs(const std::function<void(const ColumnMetaPB&)>& visitor) {
908
    // Ensure column meta accessor and reader cache are initialized once.
909
45.0k
    OlapReaderStatistics dummy_stats;
910
45.0k
    RETURN_IF_ERROR(_create_column_meta_once(&dummy_stats));
911
45.0k
    std::shared_ptr<SegmentFooterPB> footer_pb_shared;
912
45.0k
    RETURN_IF_ERROR(_get_segment_footer(footer_pb_shared, &dummy_stats));
913
45.0k
    return _column_meta_accessor->traverse_metas(*footer_pb_shared, visitor);
914
45.0k
}
915
916
Status Segment::get_column_reader(const TabletColumn& col,
917
                                  std::shared_ptr<ColumnReader>* column_reader,
918
1.94M
                                  OlapReaderStatistics* stats, std::optional<Field> const_value) {
919
1.94M
    RETURN_IF_ERROR(_create_column_meta_once(stats));
920
1.94M
    SCOPED_RAW_TIMER(&stats->segment_create_column_readers_timer_ns);
921
18.4E
    int col_uid = col.unique_id() >= 0 ? col.unique_id() : col.parent_unique_id();
922
    // The column is not in this segment, return nullptr
923
1.94M
    if (!_tablet_schema->has_column_unique_id(col_uid)) {
924
40
        *column_reader = nullptr;
925
40
        return Status::Error<ErrorCode::NOT_FOUND, false>("column not found in segment, col_uid={}",
926
40
                                                          col_uid);
927
40
    }
928
1.94M
    if (col.has_path_info()) {
929
2.12k
        PathInData relative_path = col.path_info_ptr()->copy_pop_front();
930
2.12k
        return _column_reader_cache->get_path_column_reader(col_uid, relative_path, column_reader,
931
2.12k
                                                            stats);
932
2.12k
    }
933
1.94M
    return _column_reader_cache->get_column_reader(col_uid, column_reader, stats,
934
1.94M
                                                   std::move(const_value));
935
1.94M
}
936
937
Status Segment::new_index_iterator(const TabletColumn& tablet_column, const TabletIndex* index_meta,
938
                                   const StorageReadOptions& read_options,
939
73.0k
                                   std::unique_ptr<IndexIterator>* iter) {
940
73.0k
    if (read_options.runtime_state != nullptr) {
941
64.2k
        _be_exec_version = read_options.runtime_state->be_exec_version();
942
64.2k
    }
943
73.0k
    RETURN_IF_ERROR(_create_column_meta_once(read_options.stats));
944
73.0k
    std::shared_ptr<ColumnReader> reader;
945
73.0k
    auto st = get_column_reader(tablet_column, &reader, read_options.stats);
946
73.0k
    if (st.is<ErrorCode::NOT_FOUND>()) {
947
504
        return Status::OK();
948
504
    }
949
72.5k
    RETURN_IF_ERROR(st);
950
72.5k
    DCHECK(reader != nullptr);
951
72.8k
    if (index_meta) {
952
        // call DorisCallOnce.call without check if _index_file_reader is nullptr
953
        // to avoid data race during parallel method calls
954
72.8k
        RETURN_IF_ERROR(_index_file_reader_open.call([&] { return _open_index_file_reader(); }));
955
        // after DorisCallOnce.call, _index_file_reader is guaranteed to be not nullptr
956
72.8k
        const std::string rowset_id =
957
72.8k
                index_meta->index_type() == IndexType::ANN ? _rowset_id.to_string() : "";
958
72.8k
        const bool need_binding_diagnostic = tablet_column.is_variant_type() ||
959
72.8k
                                             tablet_column.is_extracted_column() ||
960
72.8k
                                             !index_meta->get_index_suffix().empty();
961
72.8k
        bool index_file_exists = false;
962
72.8k
        Status probe_status;
963
72.8k
        if (need_binding_diagnostic) {
964
1.32k
            probe_status = _index_file_reader->init(config::inverted_index_read_buffer_size,
965
1.32k
                                                    &read_options.io_ctx);
966
1.32k
            if (probe_status.ok()) {
967
1.32k
                probe_status = _index_file_reader->index_file_exist(index_meta, &index_file_exists);
968
1.32k
            }
969
1.32k
            const auto diagnostic = fmt::format(
970
1.32k
                    "[VariantSearchBinding] phase=index_file_probe tablet_id={} rowset_id={} "
971
1.32k
                    "segment_id={} column={} logical_path={} index_id={} suffix={} exists={} "
972
1.32k
                    "status={}",
973
1.32k
                    read_options.tablet_id, _rowset_id.to_string(), _segment_id,
974
1.32k
                    tablet_column.name(),
975
1.32k
                    tablet_column.has_path_info() ? tablet_column.path_info_ptr()->get_path()
976
1.32k
                                                  : tablet_column.name(),
977
1.32k
                    index_meta->index_id(), index_meta->get_index_suffix(), index_file_exists,
978
1.32k
                    probe_status.ok() ? "OK" : probe_status.to_string());
979
1.32k
            VLOG_DEBUG << diagnostic;
980
1.32k
            if (read_options.stats != nullptr) {
981
1.32k
                read_options.stats->inverted_index_stats.add_binding_diagnostic(diagnostic);
982
1.32k
            }
983
1.32k
        }
984
72.8k
        Status iter_status = reader->new_index_iterator(_index_file_reader, index_meta, rowset_id,
985
72.8k
                                                        _segment_id, _num_rows, iter);
986
72.8k
        if (!iter_status.ok()) {
987
0
            if (need_binding_diagnostic) {
988
0
                const auto diagnostic = fmt::format(
989
0
                        "[VariantSearchBinding] phase=index_iterator_create result=reject "
990
0
                        "tablet_id={} rowset_id={} segment_id={} column={} logical_path={} "
991
0
                        "index_id={} suffix={} reason={}",
992
0
                        read_options.tablet_id, _rowset_id.to_string(), _segment_id,
993
0
                        tablet_column.name(),
994
0
                        tablet_column.has_path_info() ? tablet_column.path_info_ptr()->get_path()
995
0
                                                      : tablet_column.name(),
996
0
                        index_meta->index_id(), index_meta->get_index_suffix(),
997
0
                        iter_status.to_string());
998
0
                VLOG_DEBUG << diagnostic;
999
0
                if (read_options.stats != nullptr) {
1000
0
                    read_options.stats->inverted_index_stats.add_binding_diagnostic(diagnostic);
1001
0
                }
1002
0
            }
1003
0
            return iter_status;
1004
0
        }
1005
72.8k
        return Status::OK();
1006
72.8k
    }
1007
18.4E
    return Status::OK();
1008
72.5k
}
1009
1010
Status Segment::lookup_row_key(const Slice& key, const TabletSchema* latest_schema,
1011
                               bool with_seq_col, bool with_rowid, RowLocation* row_location,
1012
3.85M
                               OlapReaderStatistics* stats, std::string* encoded_seq_value) {
1013
3.85M
    RETURN_IF_ERROR(load_pk_index_and_bf(stats));
1014
3.85M
    bool has_seq_col = latest_schema->has_sequence_col();
1015
3.85M
    bool has_rowid = !latest_schema->cluster_key_uids().empty();
1016
3.85M
    size_t seq_col_length = 0;
1017
3.85M
    if (has_seq_col) {
1018
25.2k
        seq_col_length = latest_schema->column(latest_schema->sequence_col_idx()).length() + 1;
1019
25.2k
    }
1020
3.85M
    size_t rowid_length = has_rowid ? PrimaryKeyIndexReader::ROW_ID_LENGTH : 0;
1021
1022
3.85M
    Slice key_without_seq =
1023
3.85M
            Slice(key.get_data(), key.get_size() - (with_seq_col ? seq_col_length : 0) -
1024
3.85M
                                          (with_rowid ? rowid_length : 0));
1025
1026
3.85M
    DCHECK(_pk_index_reader != nullptr);
1027
3.85M
    if (!_pk_index_reader->check_present(key_without_seq)) {
1028
188k
        return Status::Error<ErrorCode::KEY_NOT_FOUND, false>("");
1029
188k
    }
1030
3.67M
    bool exact_match = false;
1031
3.67M
    std::unique_ptr<segment_v2::IndexedColumnIterator> index_iterator;
1032
3.67M
    RETURN_IF_ERROR(_pk_index_reader->new_iterator(&index_iterator, stats));
1033
3.67M
    auto st = index_iterator->seek_at_or_after(&key_without_seq, &exact_match);
1034
3.67M
    if (!st.ok() && !st.is<ErrorCode::ENTRY_NOT_FOUND>()) {
1035
0
        return st;
1036
0
    }
1037
3.68M
    if (st.is<ErrorCode::ENTRY_NOT_FOUND>() || (!has_seq_col && !has_rowid && !exact_match)) {
1038
104
        return Status::Error<ErrorCode::KEY_NOT_FOUND, false>("");
1039
104
    }
1040
3.67M
    row_location->row_id = cast_set<uint32_t>(index_iterator->get_current_ordinal());
1041
3.67M
    row_location->segment_id = _segment_id;
1042
3.67M
    row_location->rowset_id = _rowset_id;
1043
1044
3.67M
    size_t num_to_read = 1;
1045
3.67M
    auto index_type = DataTypeFactory::instance().create_data_type(_pk_index_reader->type(), 1, 0);
1046
3.67M
    auto index_column = index_type->create_column();
1047
3.67M
    size_t num_read = num_to_read;
1048
3.67M
    RETURN_IF_ERROR(index_iterator->next_batch(&num_read, index_column));
1049
3.67M
    DCHECK(num_to_read == num_read);
1050
1051
3.67M
    Slice sought_key = Slice(index_column->get_data_at(0).data, index_column->get_data_at(0).size);
1052
1053
    // user may use "ALTER TABLE tbl ENABLE FEATURE "SEQUENCE_LOAD" WITH ..." to add a hidden sequence column
1054
    // for a merge-on-write table which doesn't have sequence column, so `has_seq_col ==  true` doesn't mean
1055
    // data in segment has sequence column value
1056
3.67M
    bool segment_has_seq_col = _tablet_schema->has_sequence_col();
1057
3.67M
    Slice sought_key_without_seq = Slice(
1058
3.67M
            sought_key.get_data(),
1059
3.67M
            sought_key.get_size() - (segment_has_seq_col ? seq_col_length : 0) - rowid_length);
1060
1061
3.67M
    if (has_seq_col) {
1062
        // compare key
1063
25.2k
        if (key_without_seq.compare(sought_key_without_seq) != 0) {
1064
0
            return Status::Error<ErrorCode::KEY_NOT_FOUND, false>("");
1065
0
        }
1066
1067
25.2k
        if (with_seq_col && segment_has_seq_col) {
1068
            // compare sequence id
1069
23.3k
            Slice sequence_id =
1070
23.3k
                    Slice(key.get_data() + key_without_seq.get_size() + 1, seq_col_length - 1);
1071
23.3k
            Slice previous_sequence_id =
1072
23.3k
                    Slice(sought_key.get_data() + sought_key_without_seq.get_size() + 1,
1073
23.3k
                          seq_col_length - 1);
1074
23.3k
            if (sequence_id.compare(previous_sequence_id) < 0) {
1075
338
                return Status::Error<ErrorCode::KEY_ALREADY_EXISTS>(
1076
338
                        "key with higher sequence id exists");
1077
338
            }
1078
23.3k
        }
1079
3.64M
    } else if (has_rowid) {
1080
59.0k
        Slice sought_key_without_rowid =
1081
59.0k
                Slice(sought_key.get_data(), sought_key.get_size() - rowid_length);
1082
        // compare key
1083
59.0k
        if (key_without_seq.compare(sought_key_without_rowid) != 0) {
1084
0
            return Status::Error<ErrorCode::KEY_NOT_FOUND, false>("");
1085
0
        }
1086
59.0k
    }
1087
    // found the key, use rowid in pk index if necessary.
1088
3.67M
    if (has_rowid) {
1089
71.0k
        Slice rowid_slice = Slice(sought_key.get_data() + sought_key_without_seq.get_size() +
1090
71.0k
                                          (segment_has_seq_col ? seq_col_length : 0) + 1,
1091
71.0k
                                  rowid_length - 1);
1092
71.0k
        const auto* rowid_coder = get_key_coder(FieldType::OLAP_FIELD_TYPE_UNSIGNED_INT);
1093
71.0k
        RETURN_IF_ERROR(rowid_coder->decode_ascending(&rowid_slice, rowid_length,
1094
71.0k
                                                      (uint8_t*)&row_location->row_id));
1095
71.0k
    }
1096
1097
3.67M
    if (encoded_seq_value) {
1098
78
        if (!segment_has_seq_col) {
1099
0
            *encoded_seq_value = std::string {};
1100
78
        } else {
1101
            // include marker
1102
78
            *encoded_seq_value =
1103
78
                    Slice(sought_key.get_data() + sought_key_without_seq.get_size(), seq_col_length)
1104
78
                            .to_string();
1105
78
        }
1106
78
    }
1107
3.67M
    return Status::OK();
1108
3.67M
}
1109
1110
0
Status Segment::read_key_by_rowid(uint32_t row_id, std::string* key) {
1111
0
    OlapReaderStatistics* null_stat = nullptr;
1112
0
    RETURN_IF_ERROR(load_pk_index_and_bf(null_stat));
1113
0
    std::unique_ptr<segment_v2::IndexedColumnIterator> iter;
1114
0
    RETURN_IF_ERROR(_pk_index_reader->new_iterator(&iter, null_stat));
1115
1116
0
    auto index_type = DataTypeFactory::instance().create_data_type(_pk_index_reader->type(), 1, 0);
1117
0
    auto index_column = index_type->create_column();
1118
0
    RETURN_IF_ERROR(iter->seek_to_ordinal(row_id));
1119
0
    size_t num_read = 1;
1120
0
    RETURN_IF_ERROR(iter->next_batch(&num_read, index_column));
1121
0
    CHECK(num_read == 1);
1122
    // trim row id
1123
0
    if (_tablet_schema->cluster_key_uids().empty()) {
1124
0
        *key = index_column->get_data_at(0).to_string();
1125
0
    } else {
1126
0
        Slice sought_key =
1127
0
                Slice(index_column->get_data_at(0).data, index_column->get_data_at(0).size);
1128
0
        Slice sought_key_without_rowid =
1129
0
                Slice(sought_key.get_data(),
1130
0
                      sought_key.get_size() - PrimaryKeyIndexReader::ROW_ID_LENGTH);
1131
0
        *key = sought_key_without_rowid.to_string();
1132
0
    }
1133
0
    return Status::OK();
1134
0
}
1135
1136
Status Segment::seek_and_read_by_rowid(const TabletSchema& schema, SlotDescriptor* slot,
1137
                                       const std::vector<uint32_t>& row_ids,
1138
                                       MutableColumnPtr& result,
1139
                                       StorageReadOptions& storage_read_options,
1140
13.0k
                                       std::unique_ptr<ColumnIterator>& iterator_hint) {
1141
13.0k
    if (row_ids.empty()) {
1142
0
        return Status::OK();
1143
0
    }
1144
13.0k
    DORIS_CHECK(std::is_sorted(row_ids.begin(), row_ids.end()));
1145
13.0k
    DORIS_CHECK(std::adjacent_find(row_ids.begin(), row_ids.end()) == row_ids.end());
1146
    // ColumnIterator::seek_and_read expects monotonically increasing row_ids without
1147
    // duplicates for correct ordinal scanning. Enforce this contract at the entry point.
1148
13.0k
    segment_v2::ColumnIteratorOptions opt {
1149
13.0k
            .use_page_cache = !config::disable_storage_page_cache,
1150
13.0k
            .file_reader = file_reader().get(),
1151
13.0k
            .stats = storage_read_options.stats,
1152
13.0k
            .io_ctx = io::IOContext {.reader_type = ReaderType::READER_QUERY,
1153
13.0k
                                     .file_cache_stats =
1154
13.0k
                                             &storage_read_options.stats->file_cache_stats},
1155
13.0k
    };
1156
1157
13.0k
    if (!slot->column_paths().empty()) {
1158
        // here need create column readers to make sure column reader is created before seek_and_read_by_rowid
1159
        // if segment cache miss, column reader will be created to make sure the variant column result not coredump
1160
254
        RETURN_IF_ERROR(_create_column_meta_once(storage_read_options.stats));
1161
1162
254
        const auto& dt_variant =
1163
254
                assert_cast<const DataTypeVariant&>(*remove_nullable(slot->type()));
1164
254
        TabletColumn column = TabletColumn::create_materialized_variant_column(
1165
254
                schema.column_by_uid(slot->col_unique_id()).name_lower_case(), slot->column_paths(),
1166
254
                slot->col_unique_id(), dt_variant.variant_max_subcolumns_count(),
1167
254
                dt_variant.enable_doc_mode());
1168
254
        auto storage_type = get_data_type_of(column, storage_read_options);
1169
254
        MutableColumnPtr file_storage_column = storage_type->create_column();
1170
254
        DCHECK(storage_type != nullptr);
1171
1172
254
        if (iterator_hint == nullptr) {
1173
254
            RETURN_IF_ERROR(new_column_iterator(column, &iterator_hint, &storage_read_options));
1174
254
            RETURN_IF_ERROR(iterator_hint->init(opt));
1175
254
        }
1176
254
        RETURN_IF_ERROR(
1177
254
                iterator_hint->read_by_rowids(row_ids.data(), row_ids.size(), file_storage_column));
1178
254
        ColumnPtr source_ptr;
1179
        // storage may have different type with schema, so we need to cast the column
1180
254
        RETURN_IF_ERROR(variant_util::cast_column(
1181
254
                ColumnWithTypeAndName(file_storage_column->get_ptr(), storage_type, column.name()),
1182
254
                slot->type(), &source_ptr));
1183
254
        RETURN_IF_CATCH_EXCEPTION(result->insert_range_from(*source_ptr, 0, row_ids.size()));
1184
12.7k
    } else {
1185
12.7k
        int index = (slot->col_unique_id() >= 0) ? schema.field_index(slot->col_unique_id())
1186
12.7k
                                                 : schema.field_index(slot->col_name());
1187
12.7k
        if (index < 0) {
1188
0
            std::stringstream ss;
1189
0
            ss << "field name is invalid. field=" << slot->col_name()
1190
0
               << ", field_name_to_index=" << schema.get_all_field_names();
1191
0
            return Status::InternalError(ss.str());
1192
0
        }
1193
12.7k
        if (iterator_hint == nullptr) {
1194
12.7k
            RETURN_IF_ERROR(new_column_iterator(schema.column(index), &iterator_hint,
1195
12.7k
                                                &storage_read_options));
1196
12.7k
            RETURN_IF_ERROR(iterator_hint->init(opt));
1197
12.7k
        }
1198
12.7k
        RETURN_IF_ERROR(iterator_hint->read_by_rowids(row_ids.data(), row_ids.size(), result));
1199
12.7k
    }
1200
13.0k
    return Status::OK();
1201
13.0k
}
1202
1203
Status Segment::_get_segment_footer(std::shared_ptr<SegmentFooterPB>& footer_pb,
1204
11.6M
                                    OlapReaderStatistics* stats) {
1205
11.6M
    std::shared_ptr<SegmentFooterPB> footer_pb_shared = _footer_pb.lock();
1206
11.6M
    if (footer_pb_shared != nullptr) {
1207
10.7M
        footer_pb = footer_pb_shared;
1208
10.7M
        return Status::OK();
1209
10.7M
    }
1210
1211
18.4E
    VLOG_DEBUG << fmt::format("Segment footer of {}:{}:{} is missing, try to load it",
1212
18.4E
                              _file_reader->path().native(), _file_reader->size(),
1213
18.4E
                              _file_reader->size() - 12);
1214
1215
860k
    StoragePageCache* segment_footer_cache = ExecEnv::GetInstance()->get_storage_page_cache();
1216
860k
    DCHECK(segment_footer_cache != nullptr);
1217
1218
860k
    auto cache_key = get_segment_footer_cache_key();
1219
1220
860k
    PageCacheHandle cache_handle;
1221
1222
    // Put segment footer into index page cache.
1223
    // Rationale:
1224
    // - Footer is metadata (small, parsed with indexes), not data page payload.
1225
    // - Using PageTypePB::INDEX_PAGE keeps it under the same eviction policy/shards
1226
    //   as other index/metadata pages and avoids competing with DATA_PAGE budget.
1227
860k
    if (!segment_footer_cache->lookup(cache_key, &cache_handle,
1228
860k
                                      segment_v2::PageTypePB::INDEX_PAGE)) {
1229
95.0k
        RETURN_IF_ERROR(_parse_footer(footer_pb_shared, stats));
1230
95.0k
        segment_footer_cache->insert(cache_key, footer_pb_shared, footer_pb_shared->ByteSizeLong(),
1231
95.0k
                                     &cache_handle, segment_v2::PageTypePB::INDEX_PAGE);
1232
765k
    } else {
1233
18.4E
        VLOG_DEBUG << fmt::format("Segment footer of {}:{}:{} is found in cache",
1234
18.4E
                                  _file_reader->path().native(), _file_reader->size(),
1235
18.4E
                                  _file_reader->size() - 12);
1236
765k
    }
1237
860k
    footer_pb_shared = cache_handle.get<std::shared_ptr<SegmentFooterPB>>();
1238
860k
    _footer_pb = footer_pb_shared;
1239
860k
    footer_pb = footer_pb_shared;
1240
860k
    return Status::OK();
1241
860k
}
1242
1243
894k
StoragePageCache::CacheKey Segment::get_segment_footer_cache_key() const {
1244
894k
    DCHECK(_file_reader != nullptr);
1245
    // The footer is always at the end of the segment file.
1246
    // The size of footer is 12.
1247
    // So we use the size of file minus 12 as the cache key, which is unique for each segment file.
1248
894k
    return get_segment_footer_cache_key(_file_reader);
1249
894k
}
1250
1251
StoragePageCache::CacheKey Segment::get_segment_footer_cache_key(
1252
893k
        const io::FileReaderSPtr& file_reader) {
1253
893k
    return {file_reader->path().native(), file_reader->size(),
1254
893k
            static_cast<int64_t>(file_reader->size() - 12)};
1255
893k
}
1256
1257
} // namespace doris::segment_v2