Coverage Report

Created: 2026-03-19 10:22

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/spill/spill_file_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/spill/spill_file_reader.h"
19
20
#include <glog/logging.h>
21
22
#include <algorithm>
23
24
#include "common/cast_set.h"
25
#include "common/exception.h"
26
#include "core/block/block.h"
27
#include "exec/spill/spill_file_manager.h"
28
#include "io/file_factory.h"
29
#include "io/fs/file_reader.h"
30
#include "io/fs/local_file_system.h"
31
#include "runtime/exec_env.h"
32
#include "runtime/query_context.h"
33
#include "runtime/runtime_state.h"
34
#include "util/debug_points.h"
35
#include "util/slice.h"
36
namespace doris {
37
#include "common/compile_check_begin.h"
38
namespace io {
39
class FileSystem;
40
} // namespace io
41
42
SpillFileReader::SpillFileReader(RuntimeState* state, RuntimeProfile* profile,
43
                                 std::string spill_dir, size_t part_count)
44
185
        : _spill_dir(std::move(spill_dir)),
45
185
          _part_count(part_count),
46
185
          _resource_ctx(state->get_query_ctx()->resource_ctx()) {
47
    // Internalize counter setup
48
185
    RuntimeProfile* custom_profile = profile->get_child("CustomCounters");
49
185
    DCHECK(custom_profile != nullptr);
50
185
    _read_file_timer = custom_profile->get_counter("SpillReadFileTime");
51
185
    _deserialize_timer = custom_profile->get_counter("SpillReadDerializeBlockTime");
52
185
    _read_block_count = custom_profile->get_counter("SpillReadBlockCount");
53
185
    _read_block_data_size = custom_profile->get_counter("SpillReadBlockBytes");
54
185
    _read_file_size = custom_profile->get_counter("SpillReadFileBytes");
55
185
    _read_rows_count = custom_profile->get_counter("SpillReadRows");
56
185
    _read_file_count = custom_profile->get_counter("SpillReadFileCount");
57
185
}
58
59
186
Status SpillFileReader::open() {
60
186
    if (_is_open || _part_count == 0) {
61
23
        return Status::OK();
62
23
    }
63
163
    RETURN_IF_ERROR(_open_part(0));
64
158
    _is_open = true;
65
158
    return Status::OK();
66
163
}
67
68
176
Status SpillFileReader::_open_part(size_t part_index) {
69
176
    _close_current_part();
70
71
176
    _current_part_index = part_index;
72
176
    _part_opened = true;
73
176
    std::string part_path = _spill_dir + "/" + std::to_string(part_index);
74
75
176
    SCOPED_TIMER(_read_file_timer);
76
176
    COUNTER_UPDATE(_read_file_count, 1);
77
176
    RETURN_IF_ERROR(io::global_local_filesystem()->open_file(part_path, &_file_reader));
78
79
171
    size_t file_size = _file_reader->size();
80
171
    DCHECK(file_size >= 16); // max_sub_block_size + block count
81
82
171
    Slice result((char*)&_part_block_count, sizeof(size_t));
83
84
    // read block count
85
171
    size_t bytes_read = 0;
86
171
    RETURN_IF_ERROR(_file_reader->read_at(file_size - sizeof(size_t), result, &bytes_read));
87
171
    DCHECK(bytes_read == 8);
88
89
    // read max sub block size
90
171
    bytes_read = 0;
91
171
    result.data = (char*)&_part_max_sub_block_size;
92
171
    RETURN_IF_ERROR(_file_reader->read_at(file_size - sizeof(size_t) * 2, result, &bytes_read));
93
171
    DCHECK(bytes_read == 8);
94
95
    // The buffer is used for two purposes:
96
    // 1. Reading the block start offsets array (needs _part_block_count * sizeof(size_t) bytes)
97
    // 2. Reading a single block's serialized data (needs up to _part_max_sub_block_size bytes)
98
    // We must ensure the buffer is large enough for either case, so take the maximum.
99
171
    size_t buff_size = std::max(_part_block_count * sizeof(size_t), _part_max_sub_block_size);
100
171
    if (buff_size > _read_buff.size()) {
101
171
        _read_buff.reserve(buff_size);
102
171
    }
103
104
    // Read the block start offsets array from the end of the file.
105
    // The file layout (from end backwards) is:
106
    //   [block count (size_t)]
107
    //   [max sub block size (size_t)]
108
    //   [block start offsets array (_part_block_count * size_t)]
109
    // So the offsets array starts at:
110
    //   file_size - (_part_block_count + 2) * sizeof(size_t)
111
171
    size_t read_offset = file_size - (_part_block_count + 2) * sizeof(size_t);
112
171
    result.data = _read_buff.data();
113
171
    result.size = _part_block_count * sizeof(size_t);
114
115
171
    RETURN_IF_ERROR(_file_reader->read_at(read_offset, result, &bytes_read));
116
171
    DCHECK(bytes_read == _part_block_count * sizeof(size_t));
117
118
171
    _block_start_offsets.resize(_part_block_count + 1);
119
527
    for (size_t i = 0; i < _part_block_count; ++i) {
120
356
        _block_start_offsets[i] = *(size_t*)(result.data + i * sizeof(size_t));
121
356
    }
122
171
    _block_start_offsets[_part_block_count] = file_size - (_part_block_count + 2) * sizeof(size_t);
123
124
171
    _part_read_block_index = 0;
125
171
    return Status::OK();
126
171
}
127
128
489
void SpillFileReader::_close_current_part() {
129
489
    if (_file_reader) {
130
171
        (void)_file_reader->close();
131
171
        _file_reader.reset();
132
171
    }
133
489
    _part_block_count = 0;
134
489
    _part_read_block_index = 0;
135
489
    _part_max_sub_block_size = 0;
136
489
    _block_start_offsets.clear();
137
489
    _part_opened = false;
138
489
}
139
140
477
Status SpillFileReader::read(Block* block, bool* eos) {
141
477
    DBUG_EXECUTE_IF("fault_inject::spill_file::read_next_block", {
142
477
        return Status::InternalError("fault_inject spill_file read_next_block failed");
143
477
    });
144
474
    block->clear_column_data();
145
146
474
    if (_part_count == 0) {
147
23
        *eos = true;
148
23
        return Status::OK();
149
23
    }
150
151
    // Advance to next part if current part is exhausted
152
461
    while (_part_read_block_index >= _part_block_count) {
153
132
        size_t next_part = _part_opened ? _current_part_index + 1 : 0;
154
132
        if (next_part >= _part_count) {
155
122
            *eos = true;
156
122
            return Status::OK();
157
122
        }
158
10
        RETURN_IF_ERROR(_open_part(next_part));
159
10
    }
160
161
329
    size_t bytes_to_read = _block_start_offsets[_part_read_block_index + 1] -
162
329
                           _block_start_offsets[_part_read_block_index];
163
164
329
    if (bytes_to_read == 0) {
165
0
        ++_part_read_block_index;
166
0
        *eos = false;
167
0
        return Status::OK();
168
0
    }
169
170
329
    Slice result(_read_buff.data(), bytes_to_read);
171
329
    size_t bytes_read = 0;
172
329
    {
173
329
        SCOPED_TIMER(_read_file_timer);
174
329
        RETURN_IF_ERROR(_file_reader->read_at(_block_start_offsets[_part_read_block_index], result,
175
329
                                              &bytes_read));
176
329
    }
177
329
    DCHECK(bytes_read == bytes_to_read);
178
179
329
    if (bytes_read > 0) {
180
329
        COUNTER_UPDATE(_read_file_size, bytes_read);
181
329
        ExecEnv::GetInstance()->spill_file_mgr()->update_spill_read_bytes(bytes_read);
182
329
        if (_resource_ctx) {
183
329
            _resource_ctx->io_context()->update_spill_read_bytes_from_local_storage(bytes_read);
184
329
        }
185
329
        COUNTER_UPDATE(_read_block_count, 1);
186
329
        {
187
329
            SCOPED_TIMER(_deserialize_timer);
188
329
            if (!_pb_block.ParseFromArray(result.data, cast_set<int>(result.size))) {
189
0
                return Status::InternalError("Failed to read spilled block");
190
0
            }
191
329
            size_t uncompressed_size = 0;
192
329
            int64_t uncompressed_time = 0;
193
329
            RETURN_IF_ERROR(block->deserialize(_pb_block, &uncompressed_size, &uncompressed_time));
194
329
        }
195
329
        COUNTER_UPDATE(_read_block_data_size, block->bytes());
196
329
        COUNTER_UPDATE(_read_rows_count, block->rows());
197
329
    } else {
198
0
        block->clear_column_data();
199
0
    }
200
201
329
    ++_part_read_block_index;
202
329
    *eos = false;
203
329
    return Status::OK();
204
329
}
205
206
2
Status SpillFileReader::seek(size_t block_index) {
207
2
    return _seek_to_block(block_index);
208
2
}
209
210
2
Status SpillFileReader::_seek_to_block(size_t block_index) {
211
2
    if (_part_count == 0) {
212
0
        return Status::OK();
213
0
    }
214
215
2
    size_t remaining = block_index;
216
3
    for (size_t part_index = 0; part_index < _part_count; ++part_index) {
217
2
        RETURN_IF_ERROR(_open_part(part_index));
218
2
        if (remaining < _part_block_count) {
219
1
            _part_read_block_index = remaining;
220
1
            return Status::OK();
221
1
        }
222
1
        remaining -= _part_block_count;
223
1
    }
224
225
    // block_index is out of range: position reader at EOS.
226
1
    RETURN_IF_ERROR(_open_part(_part_count - 1));
227
1
    _part_read_block_index = _part_block_count;
228
1
    return Status::OK();
229
1
}
230
231
313
Status SpillFileReader::close() {
232
313
    _close_current_part();
233
313
    _is_open = false;
234
313
    return Status::OK();
235
313
}
236
237
} // namespace doris