be/src/exec/spill/spill_file_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/spill/spill_file_reader.h" |
19 | | |
20 | | #include <glog/logging.h> |
21 | | |
22 | | #include <algorithm> |
23 | | |
24 | | #include "common/cast_set.h" |
25 | | #include "common/exception.h" |
26 | | #include "core/block/block.h" |
27 | | #include "exec/spill/spill_file_manager.h" |
28 | | #include "io/file_factory.h" |
29 | | #include "io/fs/file_reader.h" |
30 | | #include "io/fs/local_file_system.h" |
31 | | #include "runtime/exec_env.h" |
32 | | #include "runtime/query_context.h" |
33 | | #include "runtime/runtime_state.h" |
34 | | #include "util/debug_points.h" |
35 | | #include "util/slice.h" |
36 | | namespace doris { |
37 | | #include "common/compile_check_begin.h" |
38 | | namespace io { |
39 | | class FileSystem; |
40 | | } // namespace io |
41 | | |
42 | | SpillFileReader::SpillFileReader(RuntimeState* state, RuntimeProfile* profile, |
43 | | std::string spill_dir, size_t part_count) |
44 | 191 | : _spill_dir(std::move(spill_dir)), |
45 | 191 | _part_count(part_count), |
46 | 191 | _resource_ctx(state->get_query_ctx()->resource_ctx()) { |
47 | | // Internalize counter setup |
48 | 191 | RuntimeProfile* custom_profile = profile->get_child("CustomCounters"); |
49 | 191 | DCHECK(custom_profile != nullptr); |
50 | 191 | _read_file_timer = custom_profile->get_counter("SpillReadFileTime"); |
51 | 191 | _deserialize_timer = custom_profile->get_counter("SpillReadDerializeBlockTime"); |
52 | 191 | _read_block_count = custom_profile->get_counter("SpillReadBlockCount"); |
53 | 191 | _read_block_data_size = custom_profile->get_counter("SpillReadBlockBytes"); |
54 | 191 | _read_file_size = custom_profile->get_counter("SpillReadFileBytes"); |
55 | 191 | _read_rows_count = custom_profile->get_counter("SpillReadRows"); |
56 | 191 | _read_file_count = custom_profile->get_counter("SpillReadFileCount"); |
57 | 191 | } |
58 | | |
59 | 192 | Status SpillFileReader::open() { |
60 | 192 | if (_is_open || _part_count == 0) { |
61 | 23 | return Status::OK(); |
62 | 23 | } |
63 | 169 | RETURN_IF_ERROR(_open_part(0)); |
64 | 164 | _is_open = true; |
65 | 164 | return Status::OK(); |
66 | 169 | } |
67 | | |
68 | 182 | Status SpillFileReader::_open_part(size_t part_index) { |
69 | 182 | _close_current_part(); |
70 | | |
71 | 182 | _current_part_index = part_index; |
72 | 182 | _part_opened = true; |
73 | 182 | std::string part_path = _spill_dir + "/" + std::to_string(part_index); |
74 | | |
75 | 182 | SCOPED_TIMER(_read_file_timer); |
76 | 182 | COUNTER_UPDATE(_read_file_count, 1); |
77 | 182 | RETURN_IF_ERROR(io::global_local_filesystem()->open_file(part_path, &_file_reader)); |
78 | | |
79 | 177 | size_t file_size = _file_reader->size(); |
80 | 177 | DCHECK(file_size >= 16); // max_sub_block_size + block count |
81 | | |
82 | 177 | Slice result((char*)&_part_block_count, sizeof(size_t)); |
83 | | |
84 | | // read block count |
85 | 177 | size_t bytes_read = 0; |
86 | 177 | RETURN_IF_ERROR(_file_reader->read_at(file_size - sizeof(size_t), result, &bytes_read)); |
87 | 177 | DCHECK(bytes_read == 8); |
88 | | |
89 | | // read max sub block size |
90 | 177 | bytes_read = 0; |
91 | 177 | result.data = (char*)&_part_max_sub_block_size; |
92 | 177 | RETURN_IF_ERROR(_file_reader->read_at(file_size - sizeof(size_t) * 2, result, &bytes_read)); |
93 | 177 | DCHECK(bytes_read == 8); |
94 | | |
95 | | // The buffer is used for two purposes: |
96 | | // 1. Reading the block start offsets array (needs _part_block_count * sizeof(size_t) bytes) |
97 | | // 2. Reading a single block's serialized data (needs up to _part_max_sub_block_size bytes) |
98 | | // We must ensure the buffer is large enough for either case, so take the maximum. |
99 | 177 | size_t buff_size = std::max(_part_block_count * sizeof(size_t), _part_max_sub_block_size); |
100 | 177 | if (buff_size > _read_buff.size()) { |
101 | 177 | _read_buff.reserve(buff_size); |
102 | 177 | } |
103 | | |
104 | | // Read the block start offsets array from the end of the file. |
105 | | // The file layout (from end backwards) is: |
106 | | // [block count (size_t)] |
107 | | // [max sub block size (size_t)] |
108 | | // [block start offsets array (_part_block_count * size_t)] |
109 | | // So the offsets array starts at: |
110 | | // file_size - (_part_block_count + 2) * sizeof(size_t) |
111 | 177 | size_t read_offset = file_size - (_part_block_count + 2) * sizeof(size_t); |
112 | 177 | result.data = _read_buff.data(); |
113 | 177 | result.size = _part_block_count * sizeof(size_t); |
114 | | |
115 | 177 | RETURN_IF_ERROR(_file_reader->read_at(read_offset, result, &bytes_read)); |
116 | 177 | DCHECK(bytes_read == _part_block_count * sizeof(size_t)); |
117 | | |
118 | 177 | _block_start_offsets.resize(_part_block_count + 1); |
119 | 543 | for (size_t i = 0; i < _part_block_count; ++i) { |
120 | 366 | _block_start_offsets[i] = *(size_t*)(result.data + i * sizeof(size_t)); |
121 | 366 | } |
122 | 177 | _block_start_offsets[_part_block_count] = file_size - (_part_block_count + 2) * sizeof(size_t); |
123 | | |
124 | 177 | _part_read_block_index = 0; |
125 | 177 | return Status::OK(); |
126 | 177 | } |
127 | | |
128 | 507 | void SpillFileReader::_close_current_part() { |
129 | 507 | if (_file_reader) { |
130 | 177 | (void)_file_reader->close(); |
131 | 177 | _file_reader.reset(); |
132 | 177 | } |
133 | 507 | _part_block_count = 0; |
134 | 507 | _part_read_block_index = 0; |
135 | 507 | _part_max_sub_block_size = 0; |
136 | 507 | _block_start_offsets.clear(); |
137 | 507 | _part_opened = false; |
138 | 507 | } |
139 | | |
140 | 493 | Status SpillFileReader::read(Block* block, bool* eos) { |
141 | 493 | DBUG_EXECUTE_IF("fault_inject::spill_file::read_next_block", { |
142 | 493 | return Status::InternalError("fault_inject spill_file read_next_block failed"); |
143 | 493 | }); |
144 | 490 | block->clear_column_data(); |
145 | | |
146 | 490 | if (_part_count == 0) { |
147 | 23 | *eos = true; |
148 | 23 | return Status::OK(); |
149 | 23 | } |
150 | | |
151 | | // Advance to next part if current part is exhausted |
152 | 477 | while (_part_read_block_index >= _part_block_count) { |
153 | 138 | size_t next_part = _part_opened ? _current_part_index + 1 : 0; |
154 | 138 | if (next_part >= _part_count) { |
155 | 128 | *eos = true; |
156 | 128 | return Status::OK(); |
157 | 128 | } |
158 | 10 | RETURN_IF_ERROR(_open_part(next_part)); |
159 | 10 | } |
160 | | |
161 | 339 | size_t bytes_to_read = _block_start_offsets[_part_read_block_index + 1] - |
162 | 339 | _block_start_offsets[_part_read_block_index]; |
163 | | |
164 | 339 | if (bytes_to_read == 0) { |
165 | 0 | ++_part_read_block_index; |
166 | 0 | *eos = false; |
167 | 0 | return Status::OK(); |
168 | 0 | } |
169 | | |
170 | 339 | Slice result(_read_buff.data(), bytes_to_read); |
171 | 339 | size_t bytes_read = 0; |
172 | 339 | { |
173 | 339 | SCOPED_TIMER(_read_file_timer); |
174 | 339 | RETURN_IF_ERROR(_file_reader->read_at(_block_start_offsets[_part_read_block_index], result, |
175 | 339 | &bytes_read)); |
176 | 339 | } |
177 | 339 | DCHECK(bytes_read == bytes_to_read); |
178 | | |
179 | 339 | if (bytes_read > 0) { |
180 | 339 | COUNTER_UPDATE(_read_file_size, bytes_read); |
181 | 339 | ExecEnv::GetInstance()->spill_file_mgr()->update_spill_read_bytes(bytes_read); |
182 | 339 | if (_resource_ctx) { |
183 | 339 | _resource_ctx->io_context()->update_spill_read_bytes_from_local_storage(bytes_read); |
184 | 339 | } |
185 | 339 | COUNTER_UPDATE(_read_block_count, 1); |
186 | 339 | { |
187 | 339 | SCOPED_TIMER(_deserialize_timer); |
188 | 339 | if (!_pb_block.ParseFromArray(result.data, cast_set<int>(result.size))) { |
189 | 0 | return Status::InternalError("Failed to read spilled block"); |
190 | 0 | } |
191 | 339 | size_t uncompressed_size = 0; |
192 | 339 | int64_t uncompressed_time = 0; |
193 | 339 | RETURN_IF_ERROR(block->deserialize(_pb_block, &uncompressed_size, &uncompressed_time)); |
194 | 339 | } |
195 | 339 | COUNTER_UPDATE(_read_block_data_size, block->bytes()); |
196 | 339 | COUNTER_UPDATE(_read_rows_count, block->rows()); |
197 | 339 | } else { |
198 | 0 | block->clear_column_data(); |
199 | 0 | } |
200 | | |
201 | 339 | ++_part_read_block_index; |
202 | 339 | *eos = false; |
203 | 339 | return Status::OK(); |
204 | 339 | } |
205 | | |
206 | 2 | Status SpillFileReader::seek(size_t block_index) { |
207 | 2 | return _seek_to_block(block_index); |
208 | 2 | } |
209 | | |
210 | 2 | Status SpillFileReader::_seek_to_block(size_t block_index) { |
211 | 2 | if (_part_count == 0) { |
212 | 0 | return Status::OK(); |
213 | 0 | } |
214 | | |
215 | 2 | size_t remaining = block_index; |
216 | 3 | for (size_t part_index = 0; part_index < _part_count; ++part_index) { |
217 | 2 | RETURN_IF_ERROR(_open_part(part_index)); |
218 | 2 | if (remaining < _part_block_count) { |
219 | 1 | _part_read_block_index = remaining; |
220 | 1 | return Status::OK(); |
221 | 1 | } |
222 | 1 | remaining -= _part_block_count; |
223 | 1 | } |
224 | | |
225 | | // block_index is out of range: position reader at EOS. |
226 | 1 | RETURN_IF_ERROR(_open_part(_part_count - 1)); |
227 | 1 | _part_read_block_index = _part_block_count; |
228 | 1 | return Status::OK(); |
229 | 1 | } |
230 | | |
231 | 325 | Status SpillFileReader::close() { |
232 | 325 | _close_current_part(); |
233 | 325 | _is_open = false; |
234 | 325 | return Status::OK(); |
235 | 325 | } |
236 | | |
237 | | } // namespace doris |