be/src/exec/spill/spill_writer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/spill/spill_writer.h" |
19 | | |
20 | | #include "agent/be_exec_version_manager.h" |
21 | | #include "common/status.h" |
22 | | #include "exec/spill/spill_stream_manager.h" |
23 | | #include "io/fs/local_file_system.h" |
24 | | #include "io/fs/local_file_writer.h" |
25 | | #include "runtime/exec_env.h" |
26 | | #include "runtime/runtime_state.h" |
27 | | #include "runtime/thread_context.h" |
28 | | |
29 | | namespace doris { |
30 | | #include "common/compile_check_begin.h" |
31 | 115 | Status SpillWriter::open() { |
32 | 115 | if (file_writer_) { |
33 | 0 | return Status::OK(); |
34 | 0 | } |
35 | 115 | return io::global_local_filesystem()->create_file(file_path_, &file_writer_); |
36 | 115 | } |
37 | | |
38 | 93 | Status SpillWriter::close() { |
39 | 93 | if (closed_ || !file_writer_) { |
40 | 0 | return Status::OK(); |
41 | 0 | } |
42 | 93 | closed_ = true; |
43 | | |
44 | 93 | meta_.append((const char*)&max_sub_block_size_, sizeof(max_sub_block_size_)); |
45 | 93 | meta_.append((const char*)&written_blocks_, sizeof(written_blocks_)); |
46 | | |
47 | | // meta: block1 offset, block2 offset, ..., blockn offset, max_sub_block_size, n |
48 | 93 | { |
49 | 93 | SCOPED_TIMER(_write_file_timer); |
50 | 93 | RETURN_IF_ERROR(file_writer_->append(meta_)); |
51 | 93 | } |
52 | | |
53 | 93 | total_written_bytes_ += meta_.size(); |
54 | 93 | COUNTER_UPDATE(_write_file_total_size, meta_.size()); |
55 | 93 | if (_resource_ctx) { |
56 | 93 | _resource_ctx->io_context()->update_spill_write_bytes_to_local_storage(meta_.size()); |
57 | 93 | } |
58 | 93 | if (_write_file_current_size) { |
59 | 34 | COUNTER_UPDATE(_write_file_current_size, meta_.size()); |
60 | 34 | } |
61 | 93 | data_dir_->update_spill_data_usage(meta_.size()); |
62 | 93 | ExecEnv::GetInstance()->spill_stream_mgr()->update_spill_write_bytes(meta_.size()); |
63 | | |
64 | 93 | RETURN_IF_ERROR(file_writer_->close()); |
65 | | |
66 | 93 | file_writer_.reset(); |
67 | 93 | return Status::OK(); |
68 | 93 | } |
69 | | |
70 | 160 | Status SpillWriter::write(RuntimeState* state, const Block& block, size_t& written_bytes) { |
71 | 160 | written_bytes = 0; |
72 | 160 | DCHECK(file_writer_); |
73 | 160 | auto rows = block.rows(); |
74 | 160 | COUNTER_UPDATE(_write_rows_counter, rows); |
75 | 160 | COUNTER_UPDATE(_write_block_bytes_counter, block.bytes()); |
76 | | // file format: block1, block2, ..., blockn, meta |
77 | 160 | if (rows <= batch_size_) { |
78 | 160 | return _write_internal(block, written_bytes); |
79 | 160 | } else { |
80 | 0 | auto tmp_block = block.clone_empty(); |
81 | 0 | const auto& src_data = block.get_columns_with_type_and_name(); |
82 | |
|
83 | 0 | for (size_t row_idx = 0; row_idx < rows && !state->is_cancelled();) { |
84 | 0 | tmp_block.clear_column_data(); |
85 | |
|
86 | 0 | const auto& dst_data = tmp_block.get_columns_with_type_and_name(); |
87 | |
|
88 | 0 | size_t block_rows = std::min(rows - row_idx, batch_size_); |
89 | 0 | RETURN_IF_CATCH_EXCEPTION({ |
90 | 0 | for (size_t col_idx = 0; col_idx < block.columns(); ++col_idx) { |
91 | 0 | dst_data[col_idx].column->assume_mutable()->insert_range_from( |
92 | 0 | *src_data[col_idx].column, row_idx, block_rows); |
93 | 0 | } |
94 | 0 | }); |
95 | | |
96 | 0 | int64_t tmp_blcok_mem = tmp_block.allocated_bytes(); |
97 | 0 | COUNTER_UPDATE(_memory_used_counter, tmp_blcok_mem); |
98 | 0 | Defer defer {[&]() { COUNTER_UPDATE(_memory_used_counter, -tmp_blcok_mem); }}; |
99 | 0 | RETURN_IF_ERROR(_write_internal(tmp_block, written_bytes)); |
100 | | |
101 | 0 | row_idx += block_rows; |
102 | 0 | } |
103 | 0 | return Status::OK(); |
104 | 0 | } |
105 | 160 | } |
106 | | |
107 | 160 | Status SpillWriter::_write_internal(const Block& block, size_t& written_bytes) { |
108 | 160 | size_t uncompressed_bytes = 0, compressed_bytes = 0; |
109 | | |
110 | 160 | Status status; |
111 | 160 | std::string buff; |
112 | 160 | int64_t buff_size {0}; |
113 | | |
114 | 160 | if (block.rows() > 0) { |
115 | 149 | { |
116 | 149 | PBlock pblock; |
117 | 149 | SCOPED_TIMER(_serialize_timer); |
118 | 149 | int64_t compressed_time = 0; |
119 | 149 | status = block.serialize( |
120 | 149 | BeExecVersionManager::get_newest_version(), &pblock, &uncompressed_bytes, |
121 | 149 | &compressed_bytes, &compressed_time, |
122 | 149 | segment_v2::CompressionTypePB::ZSTD); // ZSTD for better compression ratio |
123 | 149 | RETURN_IF_ERROR(status); |
124 | 149 | int64_t pblock_mem = pblock.ByteSizeLong(); |
125 | 149 | COUNTER_UPDATE(_memory_used_counter, pblock_mem); |
126 | 149 | Defer defer {[&]() { COUNTER_UPDATE(_memory_used_counter, -pblock_mem); }}; |
127 | 149 | if (!pblock.SerializeToString(&buff)) { |
128 | 0 | return Status::Error<ErrorCode::SERIALIZE_PROTOBUF_ERROR>( |
129 | 0 | "serialize spill data error. [path={}]", file_path_); |
130 | 0 | } |
131 | 149 | buff_size = buff.size(); |
132 | 149 | COUNTER_UPDATE(_memory_used_counter, buff_size); |
133 | 149 | Defer defer2 {[&]() { COUNTER_UPDATE(_memory_used_counter, -buff_size); }}; |
134 | 149 | } |
135 | 149 | if (data_dir_->reach_capacity_limit(buff_size)) { |
136 | 0 | return Status::Error<ErrorCode::DISK_REACH_CAPACITY_LIMIT>( |
137 | 0 | "spill data total size exceed limit, path: {}, size limit: {}, spill data " |
138 | 0 | "size: {}", |
139 | 0 | data_dir_->path(), |
140 | 0 | PrettyPrinter::print_bytes(data_dir_->get_spill_data_limit()), |
141 | 0 | PrettyPrinter::print_bytes(data_dir_->get_spill_data_bytes())); |
142 | 0 | } |
143 | | |
144 | 149 | { |
145 | 149 | Defer defer {[&]() { |
146 | 149 | if (status.ok()) { |
147 | 149 | data_dir_->update_spill_data_usage(buff_size); |
148 | 149 | ExecEnv::GetInstance()->spill_stream_mgr()->update_spill_write_bytes(buff_size); |
149 | | |
150 | 149 | written_bytes += buff_size; |
151 | 149 | max_sub_block_size_ = std::max(max_sub_block_size_, (size_t)buff_size); |
152 | | |
153 | 149 | meta_.append((const char*)&total_written_bytes_, sizeof(size_t)); |
154 | 149 | COUNTER_UPDATE(_write_file_total_size, buff_size); |
155 | 149 | if (_resource_ctx) { |
156 | 149 | _resource_ctx->io_context()->update_spill_write_bytes_to_local_storage( |
157 | 149 | buff_size); |
158 | 149 | } |
159 | 149 | if (_write_file_current_size) { |
160 | 79 | COUNTER_UPDATE(_write_file_current_size, buff_size); |
161 | 79 | } |
162 | 149 | COUNTER_UPDATE(_write_block_counter, 1); |
163 | 149 | total_written_bytes_ += buff_size; |
164 | 149 | ++written_blocks_; |
165 | 149 | } |
166 | 149 | }}; |
167 | 149 | { |
168 | 149 | SCOPED_TIMER(_write_file_timer); |
169 | 149 | status = file_writer_->append(buff); |
170 | 149 | RETURN_IF_ERROR(status); |
171 | 149 | } |
172 | 149 | } |
173 | 149 | } |
174 | | |
175 | 160 | return status; |
176 | 160 | } |
177 | | |
178 | | } // namespace doris |