be/src/exec/spill/spill_writer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/spill/spill_writer.h" |
19 | | |
20 | | #include "agent/be_exec_version_manager.h" |
21 | | #include "common/status.h" |
22 | | #include "exec/spill/spill_stream_manager.h" |
23 | | #include "io/fs/local_file_system.h" |
24 | | #include "io/fs/local_file_writer.h" |
25 | | #include "runtime/exec_env.h" |
26 | | #include "runtime/runtime_state.h" |
27 | | #include "runtime/thread_context.h" |
28 | | |
29 | | namespace doris { |
30 | | #include "common/compile_check_begin.h" |
31 | 2.45k | Status SpillWriter::open() { |
32 | 2.45k | if (file_writer_) { |
33 | 0 | return Status::OK(); |
34 | 0 | } |
35 | 2.45k | return io::global_local_filesystem()->create_file(file_path_, &file_writer_); |
36 | 2.45k | } |
37 | | |
38 | 2.42k | Status SpillWriter::close() { |
39 | 2.42k | if (closed_ || !file_writer_) { |
40 | 0 | return Status::OK(); |
41 | 0 | } |
42 | 2.42k | closed_ = true; |
43 | | |
44 | 2.42k | meta_.append((const char*)&max_sub_block_size_, sizeof(max_sub_block_size_)); |
45 | 2.42k | meta_.append((const char*)&written_blocks_, sizeof(written_blocks_)); |
46 | | |
47 | | // meta: block1 offset, block2 offset, ..., blockn offset, max_sub_block_size, n |
48 | 2.42k | { |
49 | 2.42k | SCOPED_TIMER(_write_file_timer); |
50 | 2.42k | RETURN_IF_ERROR(file_writer_->append(meta_)); |
51 | 2.42k | } |
52 | | |
53 | 2.42k | total_written_bytes_ += meta_.size(); |
54 | 2.42k | COUNTER_UPDATE(_write_file_total_size, meta_.size()); |
55 | 2.42k | if (_resource_ctx) { |
56 | 2.42k | _resource_ctx->io_context()->update_spill_write_bytes_to_local_storage(meta_.size()); |
57 | 2.42k | } |
58 | 2.42k | if (_write_file_current_size) { |
59 | 34 | COUNTER_UPDATE(_write_file_current_size, meta_.size()); |
60 | 34 | } |
61 | 2.42k | data_dir_->update_spill_data_usage(meta_.size()); |
62 | 2.42k | ExecEnv::GetInstance()->spill_stream_mgr()->update_spill_write_bytes(meta_.size()); |
63 | | |
64 | 2.42k | RETURN_IF_ERROR(file_writer_->close()); |
65 | | |
66 | 2.42k | file_writer_.reset(); |
67 | 2.42k | return Status::OK(); |
68 | 2.42k | } |
69 | | |
70 | 11.9k | Status SpillWriter::write(RuntimeState* state, const Block& block, size_t& written_bytes) { |
71 | 11.9k | written_bytes = 0; |
72 | 11.9k | DCHECK(file_writer_); |
73 | 11.9k | auto rows = block.rows(); |
74 | 11.9k | COUNTER_UPDATE(_write_rows_counter, rows); |
75 | 11.9k | COUNTER_UPDATE(_write_block_bytes_counter, block.bytes()); |
76 | | // file format: block1, block2, ..., blockn, meta |
77 | 11.9k | if (rows <= batch_size_) { |
78 | 11.9k | return _write_internal(block, written_bytes); |
79 | 11.9k | } else { |
80 | 0 | auto tmp_block = block.clone_empty(); |
81 | 0 | const auto& src_data = block.get_columns_with_type_and_name(); |
82 | |
|
83 | 0 | for (size_t row_idx = 0; row_idx < rows && !state->is_cancelled();) { |
84 | 0 | tmp_block.clear_column_data(); |
85 | |
|
86 | 0 | const auto& dst_data = tmp_block.get_columns_with_type_and_name(); |
87 | |
|
88 | 0 | size_t block_rows = std::min(rows - row_idx, batch_size_); |
89 | 0 | RETURN_IF_CATCH_EXCEPTION({ |
90 | 0 | for (size_t col_idx = 0; col_idx < block.columns(); ++col_idx) { |
91 | 0 | dst_data[col_idx].column->assume_mutable()->insert_range_from( |
92 | 0 | *src_data[col_idx].column, row_idx, block_rows); |
93 | 0 | } |
94 | 0 | }); |
95 | | |
96 | 0 | int64_t tmp_blcok_mem = tmp_block.allocated_bytes(); |
97 | 0 | COUNTER_UPDATE(_memory_used_counter, tmp_blcok_mem); |
98 | 0 | Defer defer {[&]() { COUNTER_UPDATE(_memory_used_counter, -tmp_blcok_mem); }}; |
99 | 0 | RETURN_IF_ERROR(_write_internal(tmp_block, written_bytes)); |
100 | | |
101 | 0 | row_idx += block_rows; |
102 | 0 | } |
103 | 0 | return Status::OK(); |
104 | 0 | } |
105 | 11.9k | } |
106 | | |
107 | 11.9k | Status SpillWriter::_write_internal(const Block& block, size_t& written_bytes) { |
108 | 11.9k | size_t uncompressed_bytes = 0, compressed_bytes = 0; |
109 | | |
110 | 11.9k | Status status; |
111 | 11.9k | std::string buff; |
112 | 11.9k | int64_t buff_size {0}; |
113 | | |
114 | 11.9k | if (block.rows() > 0) { |
115 | 11.9k | { |
116 | 11.9k | PBlock pblock; |
117 | 11.9k | SCOPED_TIMER(_serialize_timer); |
118 | 11.9k | int64_t compressed_time = 0; |
119 | 11.9k | status = block.serialize( |
120 | 11.9k | BeExecVersionManager::get_newest_version(), &pblock, &uncompressed_bytes, |
121 | 11.9k | &compressed_bytes, &compressed_time, |
122 | 11.9k | segment_v2::CompressionTypePB::ZSTD); // ZSTD for better compression ratio |
123 | 11.9k | RETURN_IF_ERROR(status); |
124 | 11.9k | int64_t pblock_mem = pblock.ByteSizeLong(); |
125 | 11.9k | COUNTER_UPDATE(_memory_used_counter, pblock_mem); |
126 | 11.9k | Defer defer {[&]() { COUNTER_UPDATE(_memory_used_counter, -pblock_mem); }}; |
127 | 11.9k | if (!pblock.SerializeToString(&buff)) { |
128 | 0 | return Status::Error<ErrorCode::SERIALIZE_PROTOBUF_ERROR>( |
129 | 0 | "serialize spill data error. [path={}]", file_path_); |
130 | 0 | } |
131 | 11.9k | buff_size = buff.size(); |
132 | 11.9k | COUNTER_UPDATE(_memory_used_counter, buff_size); |
133 | 11.9k | Defer defer2 {[&]() { COUNTER_UPDATE(_memory_used_counter, -buff_size); }}; |
134 | 11.9k | } |
135 | 11.9k | if (data_dir_->reach_capacity_limit(buff_size)) { |
136 | 0 | return Status::Error<ErrorCode::DISK_REACH_CAPACITY_LIMIT>( |
137 | 0 | "spill data total size exceed limit, path: {}, size limit: {}, spill data " |
138 | 0 | "size: {}", |
139 | 0 | data_dir_->path(), |
140 | 0 | PrettyPrinter::print_bytes(data_dir_->get_spill_data_limit()), |
141 | 0 | PrettyPrinter::print_bytes(data_dir_->get_spill_data_bytes())); |
142 | 0 | } |
143 | | |
144 | 11.9k | { |
145 | 11.9k | Defer defer {[&]() { |
146 | 11.9k | if (status.ok()) { |
147 | 11.9k | data_dir_->update_spill_data_usage(buff_size); |
148 | 11.9k | ExecEnv::GetInstance()->spill_stream_mgr()->update_spill_write_bytes(buff_size); |
149 | | |
150 | 11.9k | written_bytes += buff_size; |
151 | 11.9k | max_sub_block_size_ = std::max(max_sub_block_size_, (size_t)buff_size); |
152 | | |
153 | 11.9k | meta_.append((const char*)&total_written_bytes_, sizeof(size_t)); |
154 | 11.9k | COUNTER_UPDATE(_write_file_total_size, buff_size); |
155 | 11.9k | if (_resource_ctx) { |
156 | 11.9k | _resource_ctx->io_context()->update_spill_write_bytes_to_local_storage( |
157 | 11.9k | buff_size); |
158 | 11.9k | } |
159 | 11.9k | if (_write_file_current_size) { |
160 | 78 | COUNTER_UPDATE(_write_file_current_size, buff_size); |
161 | 78 | } |
162 | 11.9k | COUNTER_UPDATE(_write_block_counter, 1); |
163 | 11.9k | total_written_bytes_ += buff_size; |
164 | 11.9k | ++written_blocks_; |
165 | 11.9k | } |
166 | 11.9k | }}; |
167 | 11.9k | { |
168 | 11.9k | SCOPED_TIMER(_write_file_timer); |
169 | 11.9k | status = file_writer_->append(buff); |
170 | 11.9k | RETURN_IF_ERROR(status); |
171 | 11.9k | } |
172 | 11.9k | } |
173 | 11.9k | } |
174 | | |
175 | 11.9k | return status; |
176 | 11.9k | } |
177 | | |
178 | | } // namespace doris |