Coverage Report

Created: 2026-03-12 17:15

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/spill/spill_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/spill/spill_reader.h"
19
20
#include <glog/logging.h>
21
22
#include <algorithm>
23
24
#include "common/cast_set.h"
25
#include "common/exception.h"
26
#include "core/block/block.h"
27
#include "exec/spill/spill_stream_manager.h"
28
#include "io/file_factory.h"
29
#include "io/fs/file_reader.h"
30
#include "io/fs/local_file_system.h"
31
#include "runtime/exec_env.h"
32
#include "util/slice.h"
33
namespace doris {
34
#include "common/compile_check_begin.h"
35
namespace io {
36
class FileSystem;
37
} // namespace io
38
39
23.2k
Status SpillReader::open() {
40
23.2k
    if (file_reader_) {
41
22.6k
        return Status::OK();
42
22.6k
    }
43
44
627
    SCOPED_TIMER(_read_file_timer);
45
46
627
    COUNTER_UPDATE(_read_file_count, 1);
47
48
627
    RETURN_IF_ERROR(io::global_local_filesystem()->open_file(file_path_, &file_reader_));
49
50
627
    size_t file_size = file_reader_->size();
51
627
    DCHECK(file_size >= 16); // max_sub_block_size, block count
52
53
627
    Slice result((char*)&block_count_, sizeof(size_t));
54
55
627
    size_t total_read_bytes = 0;
56
    // read block count
57
627
    size_t bytes_read = 0;
58
627
    RETURN_IF_ERROR(file_reader_->read_at(file_size - sizeof(size_t), result, &bytes_read));
59
627
    DCHECK(bytes_read == 8); // max_sub_block_size, block count
60
627
    total_read_bytes += bytes_read;
61
627
    if (_resource_ctx) {
62
627
        _resource_ctx->io_context()->update_spill_write_bytes_to_local_storage(bytes_read);
63
627
    }
64
65
    // read max sub block size
66
627
    bytes_read = 0;
67
627
    result.data = (char*)&max_sub_block_size_;
68
627
    RETURN_IF_ERROR(file_reader_->read_at(file_size - sizeof(size_t) * 2, result, &bytes_read));
69
627
    DCHECK(bytes_read == 8); // max_sub_block_size, block count
70
627
    total_read_bytes += bytes_read;
71
627
    if (_resource_ctx) {
72
627
        _resource_ctx->io_context()->update_spill_write_bytes_to_local_storage(bytes_read);
73
627
    }
74
75
627
    size_t buff_size = std::max(block_count_ * sizeof(size_t), max_sub_block_size_);
76
627
    read_buff_.reserve(buff_size);
77
78
    // read block start offsets
79
627
    size_t read_offset = file_size - (block_count_ + 2) * sizeof(size_t);
80
627
    result.data = read_buff_.data();
81
627
    result.size = block_count_ * sizeof(size_t);
82
83
627
    RETURN_IF_ERROR(file_reader_->read_at(read_offset, result, &bytes_read));
84
627
    DCHECK(bytes_read == block_count_ * sizeof(size_t));
85
627
    total_read_bytes += bytes_read;
86
627
    COUNTER_UPDATE(_read_file_size, total_read_bytes);
87
627
    ExecEnv::GetInstance()->spill_stream_mgr()->update_spill_read_bytes(total_read_bytes);
88
627
    if (_resource_ctx) {
89
627
        _resource_ctx->io_context()->update_spill_read_bytes_from_local_storage(bytes_read);
90
627
    }
91
92
627
    block_start_offsets_.resize(block_count_ + 1);
93
23.3k
    for (size_t i = 0; i < block_count_; ++i) {
94
22.7k
        block_start_offsets_[i] = *(size_t*)(result.data + i * sizeof(size_t));
95
22.7k
    }
96
627
    block_start_offsets_[block_count_] = file_size - (block_count_ + 2) * sizeof(size_t);
97
98
627
    return Status::OK();
99
627
}
100
101
0
void SpillReader::seek(size_t block_index) {
102
0
    DCHECK_LT(block_index, block_count_);
103
0
    read_block_index_ = block_index;
104
0
}
105
106
23.3k
Status SpillReader::read(Block* block, bool* eos) {
107
23.3k
    DCHECK(file_reader_);
108
23.3k
    block->clear_column_data();
109
110
23.3k
    if (read_block_index_ >= block_count_) {
111
627
        *eos = true;
112
627
        return Status::OK();
113
627
    }
114
115
22.7k
    size_t bytes_to_read =
116
22.7k
            block_start_offsets_[read_block_index_ + 1] - block_start_offsets_[read_block_index_];
117
118
22.7k
    if (bytes_to_read == 0) {
119
0
        ++read_block_index_;
120
0
        return Status::OK();
121
0
    }
122
123
22.7k
    Slice result(read_buff_.data(), bytes_to_read);
124
22.7k
    size_t bytes_read = 0;
125
22.7k
    {
126
22.7k
        SCOPED_TIMER(_read_file_timer);
127
22.7k
        RETURN_IF_ERROR(file_reader_->read_at(block_start_offsets_[read_block_index_], result,
128
22.7k
                                              &bytes_read));
129
22.7k
    }
130
22.7k
    DCHECK(bytes_read == bytes_to_read);
131
132
22.7k
    if (bytes_read > 0) {
133
22.7k
        COUNTER_UPDATE(_read_file_size, bytes_read);
134
22.7k
        ExecEnv::GetInstance()->spill_stream_mgr()->update_spill_read_bytes(bytes_read);
135
22.7k
        if (_resource_ctx) {
136
22.7k
            _resource_ctx->io_context()->update_spill_read_bytes_from_local_storage(bytes_read);
137
22.7k
        }
138
22.7k
        COUNTER_UPDATE(_read_block_count, 1);
139
22.7k
        {
140
22.7k
            SCOPED_TIMER(_deserialize_timer);
141
22.7k
            if (!pb_block_.ParseFromArray(result.data, cast_set<int>(result.size))) {
142
0
                return Status::InternalError("Failed to read spilled block");
143
0
            }
144
22.7k
            size_t uncompressed_size = 0;
145
22.7k
            int64_t uncompressed_time = 0;
146
22.7k
            RETURN_IF_ERROR(block->deserialize(pb_block_, &uncompressed_size, &uncompressed_time));
147
22.7k
        }
148
22.7k
        COUNTER_UPDATE(_read_block_data_size, block->bytes());
149
22.7k
        COUNTER_UPDATE(_read_rows_count, block->rows());
150
22.7k
    } else {
151
0
        block->clear_column_data();
152
0
    }
153
154
22.7k
    ++read_block_index_;
155
156
22.7k
    return Status::OK();
157
22.7k
}
158
159
720
Status SpillReader::close() {
160
720
    if (!file_reader_) {
161
93
        return Status::OK();
162
93
    }
163
627
    (void)file_reader_->close();
164
627
    file_reader_.reset();
165
627
    return Status::OK();
166
720
}
167
168
} // namespace doris