Coverage Report

Created: 2026-03-24 16:04

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/spill/spill_file_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/spill/spill_file_reader.h"
19
20
#include <glog/logging.h>
21
22
#include <algorithm>
23
24
#include "common/cast_set.h"
25
#include "common/exception.h"
26
#include "core/block/block.h"
27
#include "exec/spill/spill_file_manager.h"
28
#include "io/file_factory.h"
29
#include "io/fs/file_reader.h"
30
#include "io/fs/local_file_system.h"
31
#include "runtime/exec_env.h"
32
#include "runtime/query_context.h"
33
#include "runtime/runtime_state.h"
34
#include "util/debug_points.h"
35
#include "util/slice.h"
36
namespace doris {
37
#include "common/compile_check_begin.h"
38
namespace io {
39
class FileSystem;
40
} // namespace io
41
42
/// Creates a reader over the part files stored under `spill_dir`.
///
/// @param state      runtime state; its query context provides the resource context that
///                   read() later reports spill read bytes to.
/// @param profile    profile expected to have a "CustomCounters" child (DCHECK-ed); the
///                   spill read counters are looked up by name on that child.
/// @param spill_dir  directory holding the part files; _open_part() opens
///                   `<spill_dir>/<part index>`.
/// @param part_count number of parts to read.
SpillFileReader::SpillFileReader(RuntimeState* state, RuntimeProfile* profile,
                                 std::string spill_dir, size_t part_count)
        : _spill_dir(std::move(spill_dir)),
          _part_count(part_count),
          _resource_ctx(state->get_query_ctx()->resource_ctx()) {
    RuntimeProfile* custom_profile = profile->get_child("CustomCounters");
    DCHECK(custom_profile != nullptr);

    // Every counter lives on the same child profile, so resolve them through one helper.
    const auto lookup = [custom_profile](const char* counter_name) {
        return custom_profile->get_counter(counter_name);
    };

    // Timers.
    _read_file_timer = lookup("SpillReadFileTime");
    _deserialize_timer = lookup("SpillReadDerializeBlockTime");

    // Volume counters.
    _read_block_count = lookup("SpillReadBlockCount");
    _read_block_data_size = lookup("SpillReadBlockBytes");
    _read_file_size = lookup("SpillReadFileBytes");
    _read_rows_count = lookup("SpillReadRows");
    _read_file_count = lookup("SpillReadFileCount");
}
58
59
200
/// Opens the first part so that read() can start returning blocks.
///
/// Does nothing when the reader is already open or when there are no parts at all.
///
/// @return OK on success or when there was nothing to do; otherwise the error reported
///         while opening part 0 (the reader then stays in the "not open" state).
Status SpillFileReader::open() {
    const bool nothing_to_do = _is_open || _part_count == 0;
    if (!nothing_to_do) {
        RETURN_IF_ERROR(_open_part(0));
        _is_open = true;
    }
    return Status::OK();
}
67
68
190
/// Opens the part file `<_spill_dir>/<part_index>` and loads its footer.
///
/// File layout, from the end backwards (every footer field is one native size_t):
///   [block count]
///   [max sub block size]
///   [block start offsets array (block count entries)]
/// Everything in front of the offsets array is serialized block data, so the start of the
/// offsets array doubles as the end offset of the last block.
///
/// The footer is validated with runtime checks rather than DCHECKs: in release builds a
/// truncated or corrupt file would otherwise underflow the size_t offset arithmetic and
/// hand out-of-range offsets to read(). Footer values are staged in locals and only
/// committed to the per-part members once everything has been read and validated, so after
/// a failure the current part simply looks empty (_part_block_count == 0) instead of
/// carrying a block count without a matching offsets table.
///
/// Note that _current_part_index and _part_opened are set before the file is opened, i.e.
/// they are updated even when this function fails.
///
/// @param part_index index of the part to open.
/// @return OK on success; the I/O error from opening/reading the file, or InternalError
///         when the footer is inconsistent with the file size or the read buffer.
Status SpillFileReader::_open_part(size_t part_index) {
    _close_current_part();

    _current_part_index = part_index;
    _part_opened = true;
    const std::string part_path = _spill_dir + "/" + std::to_string(part_index);

    SCOPED_TIMER(_read_file_timer);
    COUNTER_UPDATE(_read_file_count, 1);
    RETURN_IF_ERROR(io::global_local_filesystem()->open_file(part_path, &_file_reader));

    constexpr size_t kFieldSize = sizeof(size_t);
    // Fixed-size trailer: max sub block size + block count.
    constexpr size_t kTrailerSize = 2 * kFieldSize;

    const size_t file_size = _file_reader->size();
    if (file_size < kTrailerSize) {
        return Status::InternalError("Spill file {} is too small to hold a footer: {} bytes",
                                     part_path, file_size);
    }

    // read block count
    size_t block_count = 0;
    size_t bytes_read = 0;
    Slice result(reinterpret_cast<char*>(&block_count), kFieldSize);
    RETURN_IF_ERROR(_file_reader->read_at(file_size - kFieldSize, result, &bytes_read));
    if (bytes_read != kFieldSize) {
        return Status::InternalError("Short read of block count in spill file {}: {} bytes",
                                     part_path, bytes_read);
    }

    // read max sub block size
    size_t max_sub_block_size = 0;
    bytes_read = 0;
    result.data = reinterpret_cast<char*>(&max_sub_block_size);
    RETURN_IF_ERROR(_file_reader->read_at(file_size - kTrailerSize, result, &bytes_read));
    if (bytes_read != kFieldSize) {
        return Status::InternalError(
                "Short read of max sub block size in spill file {}: {} bytes", part_path,
                bytes_read);
    }

    // The offsets array has to fit in front of the trailer. Checking with a division keeps
    // `block_count * kFieldSize` below from overflowing on a garbage block count.
    const size_t bytes_before_trailer = file_size - kTrailerSize;
    if (block_count > bytes_before_trailer / kFieldSize) {
        return Status::InternalError(
                "Corrupted spill file {}: block count {} does not fit in file of {} bytes",
                part_path, block_count, file_size);
    }
    const size_t offsets_bytes = block_count * kFieldSize;
    // Equals file_size - (block_count + 2) * sizeof(size_t).
    const size_t offsets_begin = bytes_before_trailer - offsets_bytes;

    // The buffer is used for two purposes:
    // 1. Reading the block start offsets array (needs offsets_bytes bytes)
    // 2. Reading a single block's serialized data (needs up to max_sub_block_size bytes)
    // We must ensure the buffer is large enough for either case, so take the maximum.
    // resize() (not reserve()) is used so that size() really is the number of bytes that may
    // be written through data(), which also makes the comparison below meaningful.
    const size_t buff_size = std::max(offsets_bytes, max_sub_block_size);
    if (buff_size > _read_buff.size()) {
        _read_buff.resize(buff_size);
    }

    // Read the block start offsets array.
    bytes_read = 0;
    result.data = _read_buff.data();
    result.size = offsets_bytes;
    RETURN_IF_ERROR(_file_reader->read_at(offsets_begin, result, &bytes_read));
    if (bytes_read != offsets_bytes) {
        return Status::InternalError(
                "Short read of block offsets in spill file {}: expected {} bytes, got {}",
                part_path, offsets_bytes, bytes_read);
    }

    _block_start_offsets.resize(block_count + 1);
    const auto* raw_offsets = reinterpret_cast<const size_t*>(result.data);
    for (size_t i = 0; i < block_count; ++i) {
        _block_start_offsets[i] = raw_offsets[i];
    }
    // Sentinel entry: the last block ends where the offsets array begins.
    _block_start_offsets[block_count] = offsets_begin;

    // read() computes each block's size as the difference of two neighbouring offsets and
    // reads that many bytes into _read_buff, so every extent must be non-negative and must
    // fit into the buffer.
    for (size_t i = 0; i < block_count; ++i) {
        const size_t block_begin = _block_start_offsets[i];
        const size_t block_end = _block_start_offsets[i + 1];
        if (block_begin > block_end || block_end - block_begin > _read_buff.size()) {
            _block_start_offsets.clear();
            return Status::InternalError(
                    "Corrupted spill file {}: block {} has invalid extent [{}, {})", part_path,
                    i, block_begin, block_end);
        }
    }

    // Commit the validated footer.
    _part_block_count = block_count;
    _part_max_sub_block_size = max_sub_block_size;
    _part_read_block_index = 0;
    return Status::OK();
}
127
128
523
void SpillFileReader::_close_current_part() {
129
523
    if (_file_reader) {
130
185
        (void)_file_reader->close();
131
185
        _file_reader.reset();
132
185
    }
133
523
    _part_block_count = 0;
134
523
    _part_read_block_index = 0;
135
523
    _part_max_sub_block_size = 0;
136
523
    _block_start_offsets.clear();
137
523
    _part_opened = false;
138
523
}
139
140
591
/// Reads the next spilled block into `block`.
///
/// `block`'s column data is always cleared first. Parts are consumed in index order; when
/// the current part is exhausted (or no part has been opened yet) the next one is opened
/// on demand.
///
/// @param block destination block; left empty when a zero-length block is encountered or
///              when `*eos` is set.
/// @param eos   set to true when there are no parts or all parts are exhausted, false when
///              a block (possibly a zero-length one) was consumed. Not written on error.
/// @return OK on success; an error when opening a part or reading from it fails, when
///         fewer bytes than expected were read, or when the block cannot be
///         parsed/deserialized. On error the read cursor is not advanced.
Status SpillFileReader::read(Block* block, bool* eos) {
    DBUG_EXECUTE_IF("fault_inject::spill_file::read_next_block", {
        return Status::InternalError("fault_inject spill_file read_next_block failed");
    });
    block->clear_column_data();

    if (_part_count == 0) {
        *eos = true;
        return Status::OK();
    }

    // Advance to next part if current part is exhausted
    while (_part_read_block_index >= _part_block_count) {
        const size_t next_part = _part_opened ? _current_part_index + 1 : 0;
        if (next_part >= _part_count) {
            *eos = true;
            return Status::OK();
        }
        RETURN_IF_ERROR(_open_part(next_part));
    }

    // A block's size is the distance to the next start offset; the offsets table holds one
    // extra trailing entry so this also works for the last block of the part.
    const size_t block_begin = _block_start_offsets[_part_read_block_index];
    const size_t bytes_to_read = _block_start_offsets[_part_read_block_index + 1] - block_begin;

    if (bytes_to_read == 0) {
        // Zero-length block: consume it and hand back the (already cleared) block.
        ++_part_read_block_index;
        *eos = false;
        return Status::OK();
    }

    Slice result(_read_buff.data(), bytes_to_read);
    size_t bytes_read = 0;
    {
        SCOPED_TIMER(_read_file_timer);
        RETURN_IF_ERROR(_file_reader->read_at(block_begin, result, &bytes_read));
    }
    // This must be a runtime check, not a DCHECK: in release builds a short read would
    // otherwise either silently drop the block (0 bytes read) or parse a buffer that still
    // contains stale bytes from a previous read.
    if (bytes_read != bytes_to_read) {
        return Status::InternalError("Short read of spilled block: expected {} bytes, got {}",
                                     bytes_to_read, bytes_read);
    }

    COUNTER_UPDATE(_read_file_size, bytes_read);
    ExecEnv::GetInstance()->spill_file_mgr()->update_spill_read_bytes(bytes_read);
    if (_resource_ctx) {
        _resource_ctx->io_context()->update_spill_read_bytes_from_local_storage(bytes_read);
    }
    COUNTER_UPDATE(_read_block_count, 1);
    {
        SCOPED_TIMER(_deserialize_timer);
        if (!_pb_block.ParseFromArray(result.data, cast_set<int>(result.size))) {
            return Status::InternalError("Failed to read spilled block");
        }
        size_t uncompressed_size = 0;
        int64_t uncompressed_time = 0;
        RETURN_IF_ERROR(block->deserialize(_pb_block, &uncompressed_size, &uncompressed_time));
    }
    COUNTER_UPDATE(_read_block_data_size, block->bytes());
    COUNTER_UPDATE(_read_rows_count, block->rows());

    ++_part_read_block_index;
    *eos = false;
    return Status::OK();
}
205
206
2
/// Positions the reader so that the next read() returns the block with the given index,
/// counted across all parts. Delegates to _seek_to_block().
///
/// @param block_index zero-based block index over the concatenation of all parts.
/// @return the status reported by _seek_to_block().
Status SpillFileReader::seek(size_t block_index) {
    Status seek_status = _seek_to_block(block_index);
    return seek_status;
}
209
210
2
/// Moves the read cursor to the block with global index `block_index`.
///
/// Parts are opened one after another from part 0, subtracting each part's block count
/// until the part containing the requested block is found. If `block_index` is past the
/// last block, the reader is positioned at end-of-stream: the last part stays open with
/// its cursor one past its final block, so the next read() reports eos.
///
/// @param block_index zero-based block index over the concatenation of all parts.
/// @return OK on success (also when there are no parts); otherwise the error from opening
///         one of the parts.
Status SpillFileReader::_seek_to_block(size_t block_index) {
    if (_part_count == 0) {
        return Status::OK();
    }

    size_t remaining = block_index;
    for (size_t part_index = 0; part_index < _part_count; ++part_index) {
        RETURN_IF_ERROR(_open_part(part_index));
        if (remaining < _part_block_count) {
            _part_read_block_index = remaining;
            return Status::OK();
        }
        remaining -= _part_block_count;
    }

    // block_index is out of range: position reader at EOS.
    // _part_count > 0 here, so the loop's final iteration already left the last part open
    // with its footer loaded. Reopening it would only repeat the file I/O and count the
    // same file twice in the read-file counter.
    _part_read_block_index = _part_block_count;
    return Status::OK();
}
230
231
333
/// Closes the reader: marks it as not open and releases the current part, resetting all
/// per-part state. A later open() will start again from part 0.
///
/// @return always OK.
Status SpillFileReader::close() {
    _is_open = false;
    _close_current_part();
    return Status::OK();
}
236
237
} // namespace doris