Coverage Report

Created: 2026-04-10 04:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/spill/spill_file_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/spill/spill_file_reader.h"
19
20
#include <glog/logging.h>
21
22
#include <algorithm>
23
24
#include "common/cast_set.h"
25
#include "common/exception.h"
26
#include "core/block/block.h"
27
#include "exec/spill/spill_file_manager.h"
28
#include "io/file_factory.h"
29
#include "io/fs/file_reader.h"
30
#include "io/fs/local_file_system.h"
31
#include "runtime/exec_env.h"
32
#include "runtime/query_context.h"
33
#include "runtime/runtime_state.h"
34
#include "util/debug_points.h"
35
#include "util/slice.h"
36
namespace doris {
37
namespace io {
38
class FileSystem;
39
} // namespace io
40
41
SpillFileReader::SpillFileReader(RuntimeState* state, RuntimeProfile* profile,
42
                                 std::string spill_dir, size_t part_count)
43
185
        : _spill_dir(std::move(spill_dir)),
44
185
          _part_count(part_count),
45
185
          _resource_ctx(state->get_query_ctx()->resource_ctx()) {
46
    // Internalize counter setup
47
185
    RuntimeProfile* custom_profile = profile->get_child("CustomCounters");
48
185
    DCHECK(custom_profile != nullptr);
49
185
    _read_file_timer = custom_profile->get_counter("SpillReadFileTime");
50
185
    _deserialize_timer = custom_profile->get_counter("SpillReadDerializeBlockTime");
51
185
    _read_block_count = custom_profile->get_counter("SpillReadBlockCount");
52
185
    _read_block_data_size = custom_profile->get_counter("SpillReadBlockBytes");
53
185
    _read_file_size = custom_profile->get_counter("SpillReadFileBytes");
54
185
    _read_rows_count = custom_profile->get_counter("SpillReadRows");
55
185
    _read_file_count = custom_profile->get_counter("SpillReadFileCount");
56
185
}
57
58
186
Status SpillFileReader::open() {
59
186
    if (_is_open || _part_count == 0) {
60
23
        return Status::OK();
61
23
    }
62
163
    RETURN_IF_ERROR(_open_part(0));
63
158
    _is_open = true;
64
158
    return Status::OK();
65
163
}
66
67
176
Status SpillFileReader::_open_part(size_t part_index) {
68
176
    _close_current_part();
69
70
176
    _current_part_index = part_index;
71
176
    _part_opened = true;
72
176
    std::string part_path = _spill_dir + "/" + std::to_string(part_index);
73
74
176
    SCOPED_TIMER(_read_file_timer);
75
176
    COUNTER_UPDATE(_read_file_count, 1);
76
176
    RETURN_IF_ERROR(io::global_local_filesystem()->open_file(part_path, &_file_reader));
77
78
171
    size_t file_size = _file_reader->size();
79
171
    DCHECK(file_size >= 16); // max_sub_block_size + block count
80
81
171
    Slice result((char*)&_part_block_count, sizeof(size_t));
82
83
    // read block count
84
171
    size_t bytes_read = 0;
85
171
    RETURN_IF_ERROR(_file_reader->read_at(file_size - sizeof(size_t), result, &bytes_read));
86
171
    DCHECK(bytes_read == 8);
87
88
    // read max sub block size
89
171
    bytes_read = 0;
90
171
    result.data = (char*)&_part_max_sub_block_size;
91
171
    RETURN_IF_ERROR(_file_reader->read_at(file_size - sizeof(size_t) * 2, result, &bytes_read));
92
171
    DCHECK(bytes_read == 8);
93
94
    // The buffer is used for two purposes:
95
    // 1. Reading the block start offsets array (needs _part_block_count * sizeof(size_t) bytes)
96
    // 2. Reading a single block's serialized data (needs up to _part_max_sub_block_size bytes)
97
    // We must ensure the buffer is large enough for either case, so take the maximum.
98
171
    size_t buff_size = std::max(_part_block_count * sizeof(size_t), _part_max_sub_block_size);
99
171
    if (buff_size > _read_buff.size()) {
100
171
        _read_buff.reserve(buff_size);
101
171
    }
102
103
    // Read the block start offsets array from the end of the file.
104
    // The file layout (from end backwards) is:
105
    //   [block count (size_t)]
106
    //   [max sub block size (size_t)]
107
    //   [block start offsets array (_part_block_count * size_t)]
108
    // So the offsets array starts at:
109
    //   file_size - (_part_block_count + 2) * sizeof(size_t)
110
171
    size_t read_offset = file_size - (_part_block_count + 2) * sizeof(size_t);
111
171
    result.data = _read_buff.data();
112
171
    result.size = _part_block_count * sizeof(size_t);
113
114
171
    RETURN_IF_ERROR(_file_reader->read_at(read_offset, result, &bytes_read));
115
171
    DCHECK(bytes_read == _part_block_count * sizeof(size_t));
116
117
171
    _block_start_offsets.resize(_part_block_count + 1);
118
529
    for (size_t i = 0; i < _part_block_count; ++i) {
119
358
        _block_start_offsets[i] = *(size_t*)(result.data + i * sizeof(size_t));
120
358
    }
121
171
    _block_start_offsets[_part_block_count] = file_size - (_part_block_count + 2) * sizeof(size_t);
122
123
171
    _part_read_block_index = 0;
124
171
    return Status::OK();
125
171
}
126
127
489
void SpillFileReader::_close_current_part() {
128
489
    if (_file_reader) {
129
171
        (void)_file_reader->close();
130
171
        _file_reader.reset();
131
171
    }
132
489
    _part_block_count = 0;
133
489
    _part_read_block_index = 0;
134
489
    _part_max_sub_block_size = 0;
135
489
    _block_start_offsets.clear();
136
489
    _part_opened = false;
137
489
}
138
139
479
Status SpillFileReader::read(Block* block, bool* eos) {
140
479
    DBUG_EXECUTE_IF("fault_inject::spill_file::read_next_block", {
141
479
        return Status::InternalError("fault_inject spill_file read_next_block failed");
142
479
    });
143
476
    block->clear_column_data();
144
145
476
    if (_part_count == 0) {
146
23
        *eos = true;
147
23
        return Status::OK();
148
23
    }
149
150
    // Advance to next part if current part is exhausted
151
463
    while (_part_read_block_index >= _part_block_count) {
152
132
        size_t next_part = _part_opened ? _current_part_index + 1 : 0;
153
132
        if (next_part >= _part_count) {
154
122
            *eos = true;
155
122
            return Status::OK();
156
122
        }
157
10
        RETURN_IF_ERROR(_open_part(next_part));
158
10
    }
159
160
331
    size_t bytes_to_read = _block_start_offsets[_part_read_block_index + 1] -
161
331
                           _block_start_offsets[_part_read_block_index];
162
163
331
    if (bytes_to_read == 0) {
164
0
        ++_part_read_block_index;
165
0
        *eos = false;
166
0
        return Status::OK();
167
0
    }
168
169
331
    Slice result(_read_buff.data(), bytes_to_read);
170
331
    size_t bytes_read = 0;
171
331
    {
172
331
        SCOPED_TIMER(_read_file_timer);
173
331
        RETURN_IF_ERROR(_file_reader->read_at(_block_start_offsets[_part_read_block_index], result,
174
331
                                              &bytes_read));
175
331
    }
176
331
    DCHECK(bytes_read == bytes_to_read);
177
178
331
    if (bytes_read > 0) {
179
331
        COUNTER_UPDATE(_read_file_size, bytes_read);
180
331
        ExecEnv::GetInstance()->spill_file_mgr()->update_spill_read_bytes(bytes_read);
181
331
        if (_resource_ctx) {
182
331
            _resource_ctx->io_context()->update_spill_read_bytes_from_local_storage(bytes_read);
183
331
        }
184
331
        COUNTER_UPDATE(_read_block_count, 1);
185
331
        {
186
331
            SCOPED_TIMER(_deserialize_timer);
187
331
            if (!_pb_block.ParseFromArray(result.data, cast_set<int>(result.size))) {
188
0
                return Status::InternalError("Failed to read spilled block");
189
0
            }
190
331
            size_t uncompressed_size = 0;
191
331
            int64_t uncompressed_time = 0;
192
331
            RETURN_IF_ERROR(block->deserialize(_pb_block, &uncompressed_size, &uncompressed_time));
193
331
        }
194
331
        COUNTER_UPDATE(_read_block_data_size, block->bytes());
195
331
        COUNTER_UPDATE(_read_rows_count, block->rows());
196
331
    } else {
197
0
        block->clear_column_data();
198
0
    }
199
200
331
    ++_part_read_block_index;
201
331
    *eos = false;
202
331
    return Status::OK();
203
331
}
204
205
2
Status SpillFileReader::seek(size_t block_index) {
206
2
    return _seek_to_block(block_index);
207
2
}
208
209
2
Status SpillFileReader::_seek_to_block(size_t block_index) {
210
2
    if (_part_count == 0) {
211
0
        return Status::OK();
212
0
    }
213
214
2
    size_t remaining = block_index;
215
3
    for (size_t part_index = 0; part_index < _part_count; ++part_index) {
216
2
        RETURN_IF_ERROR(_open_part(part_index));
217
2
        if (remaining < _part_block_count) {
218
1
            _part_read_block_index = remaining;
219
1
            return Status::OK();
220
1
        }
221
1
        remaining -= _part_block_count;
222
1
    }
223
224
    // block_index is out of range: position reader at EOS.
225
1
    RETURN_IF_ERROR(_open_part(_part_count - 1));
226
1
    _part_read_block_index = _part_block_count;
227
1
    return Status::OK();
228
1
}
229
230
313
Status SpillFileReader::close() {
231
313
    _close_current_part();
232
313
    _is_open = false;
233
313
    return Status::OK();
234
313
}
235
236
} // namespace doris