be/src/exec/spill/spill_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/spill/spill_reader.h" |
19 | | |
20 | | #include <glog/logging.h> |
21 | | |
22 | | #include <algorithm> |
23 | | |
24 | | #include "common/cast_set.h" |
25 | | #include "common/exception.h" |
26 | | #include "core/block/block.h" |
27 | | #include "exec/spill/spill_stream_manager.h" |
28 | | #include "io/file_factory.h" |
29 | | #include "io/fs/file_reader.h" |
30 | | #include "io/fs/local_file_system.h" |
31 | | #include "runtime/exec_env.h" |
32 | | #include "util/slice.h" |
33 | | namespace doris { |
34 | | #include "common/compile_check_begin.h" |
35 | | namespace io { |
36 | | class FileSystem; |
37 | | } // namespace io |
38 | | |
39 | 23.2k | Status SpillReader::open() { |
40 | 23.2k | if (file_reader_) { |
41 | 22.6k | return Status::OK(); |
42 | 22.6k | } |
43 | | |
44 | 627 | SCOPED_TIMER(_read_file_timer); |
45 | | |
46 | 627 | COUNTER_UPDATE(_read_file_count, 1); |
47 | | |
48 | 627 | RETURN_IF_ERROR(io::global_local_filesystem()->open_file(file_path_, &file_reader_)); |
49 | | |
50 | 627 | size_t file_size = file_reader_->size(); |
51 | 627 | DCHECK(file_size >= 16); // max_sub_block_size, block count |
52 | | |
53 | 627 | Slice result((char*)&block_count_, sizeof(size_t)); |
54 | | |
55 | 627 | size_t total_read_bytes = 0; |
56 | | // read block count |
57 | 627 | size_t bytes_read = 0; |
58 | 627 | RETURN_IF_ERROR(file_reader_->read_at(file_size - sizeof(size_t), result, &bytes_read)); |
59 | 627 | DCHECK(bytes_read == 8); // max_sub_block_size, block count |
60 | 627 | total_read_bytes += bytes_read; |
61 | 627 | if (_resource_ctx) { |
62 | 627 | _resource_ctx->io_context()->update_spill_write_bytes_to_local_storage(bytes_read); |
63 | 627 | } |
64 | | |
65 | | // read max sub block size |
66 | 627 | bytes_read = 0; |
67 | 627 | result.data = (char*)&max_sub_block_size_; |
68 | 627 | RETURN_IF_ERROR(file_reader_->read_at(file_size - sizeof(size_t) * 2, result, &bytes_read)); |
69 | 627 | DCHECK(bytes_read == 8); // max_sub_block_size, block count |
70 | 627 | total_read_bytes += bytes_read; |
71 | 627 | if (_resource_ctx) { |
72 | 627 | _resource_ctx->io_context()->update_spill_write_bytes_to_local_storage(bytes_read); |
73 | 627 | } |
74 | | |
75 | 627 | size_t buff_size = std::max(block_count_ * sizeof(size_t), max_sub_block_size_); |
76 | 627 | read_buff_.reserve(buff_size); |
77 | | |
78 | | // read block start offsets |
79 | 627 | size_t read_offset = file_size - (block_count_ + 2) * sizeof(size_t); |
80 | 627 | result.data = read_buff_.data(); |
81 | 627 | result.size = block_count_ * sizeof(size_t); |
82 | | |
83 | 627 | RETURN_IF_ERROR(file_reader_->read_at(read_offset, result, &bytes_read)); |
84 | 627 | DCHECK(bytes_read == block_count_ * sizeof(size_t)); |
85 | 627 | total_read_bytes += bytes_read; |
86 | 627 | COUNTER_UPDATE(_read_file_size, total_read_bytes); |
87 | 627 | ExecEnv::GetInstance()->spill_stream_mgr()->update_spill_read_bytes(total_read_bytes); |
88 | 627 | if (_resource_ctx) { |
89 | 627 | _resource_ctx->io_context()->update_spill_read_bytes_from_local_storage(bytes_read); |
90 | 627 | } |
91 | | |
92 | 627 | block_start_offsets_.resize(block_count_ + 1); |
93 | 23.3k | for (size_t i = 0; i < block_count_; ++i) { |
94 | 22.7k | block_start_offsets_[i] = *(size_t*)(result.data + i * sizeof(size_t)); |
95 | 22.7k | } |
96 | 627 | block_start_offsets_[block_count_] = file_size - (block_count_ + 2) * sizeof(size_t); |
97 | | |
98 | 627 | return Status::OK(); |
99 | 627 | } |
100 | | |
101 | 0 | void SpillReader::seek(size_t block_index) { |
102 | 0 | DCHECK_LT(block_index, block_count_); |
103 | 0 | read_block_index_ = block_index; |
104 | 0 | } |
105 | | |
106 | 23.3k | Status SpillReader::read(Block* block, bool* eos) { |
107 | 23.3k | DCHECK(file_reader_); |
108 | 23.3k | block->clear_column_data(); |
109 | | |
110 | 23.3k | if (read_block_index_ >= block_count_) { |
111 | 627 | *eos = true; |
112 | 627 | return Status::OK(); |
113 | 627 | } |
114 | | |
115 | 22.7k | size_t bytes_to_read = |
116 | 22.7k | block_start_offsets_[read_block_index_ + 1] - block_start_offsets_[read_block_index_]; |
117 | | |
118 | 22.7k | if (bytes_to_read == 0) { |
119 | 0 | ++read_block_index_; |
120 | 0 | return Status::OK(); |
121 | 0 | } |
122 | | |
123 | 22.7k | Slice result(read_buff_.data(), bytes_to_read); |
124 | 22.7k | size_t bytes_read = 0; |
125 | 22.7k | { |
126 | 22.7k | SCOPED_TIMER(_read_file_timer); |
127 | 22.7k | RETURN_IF_ERROR(file_reader_->read_at(block_start_offsets_[read_block_index_], result, |
128 | 22.7k | &bytes_read)); |
129 | 22.7k | } |
130 | 22.7k | DCHECK(bytes_read == bytes_to_read); |
131 | | |
132 | 22.7k | if (bytes_read > 0) { |
133 | 22.7k | COUNTER_UPDATE(_read_file_size, bytes_read); |
134 | 22.7k | ExecEnv::GetInstance()->spill_stream_mgr()->update_spill_read_bytes(bytes_read); |
135 | 22.7k | if (_resource_ctx) { |
136 | 22.7k | _resource_ctx->io_context()->update_spill_read_bytes_from_local_storage(bytes_read); |
137 | 22.7k | } |
138 | 22.7k | COUNTER_UPDATE(_read_block_count, 1); |
139 | 22.7k | { |
140 | 22.7k | SCOPED_TIMER(_deserialize_timer); |
141 | 22.7k | if (!pb_block_.ParseFromArray(result.data, cast_set<int>(result.size))) { |
142 | 0 | return Status::InternalError("Failed to read spilled block"); |
143 | 0 | } |
144 | 22.7k | size_t uncompressed_size = 0; |
145 | 22.7k | int64_t uncompressed_time = 0; |
146 | 22.7k | RETURN_IF_ERROR(block->deserialize(pb_block_, &uncompressed_size, &uncompressed_time)); |
147 | 22.7k | } |
148 | 22.7k | COUNTER_UPDATE(_read_block_data_size, block->bytes()); |
149 | 22.7k | COUNTER_UPDATE(_read_rows_count, block->rows()); |
150 | 22.7k | } else { |
151 | 0 | block->clear_column_data(); |
152 | 0 | } |
153 | | |
154 | 22.7k | ++read_block_index_; |
155 | | |
156 | 22.7k | return Status::OK(); |
157 | 22.7k | } |
158 | | |
159 | 720 | Status SpillReader::close() { |
160 | 720 | if (!file_reader_) { |
161 | 93 | return Status::OK(); |
162 | 93 | } |
163 | 627 | (void)file_reader_->close(); |
164 | 627 | file_reader_.reset(); |
165 | 627 | return Status::OK(); |
166 | 720 | } |
167 | | |
168 | | } // namespace doris |