be/src/exec/spill/spill_stream.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/spill/spill_stream.h" |
19 | | |
20 | | #include <glog/logging.h> |
21 | | |
22 | | #include <memory> |
23 | | #include <mutex> |
24 | | #include <utility> |
25 | | |
26 | | #include "core/block/block.h" |
27 | | #include "exec/spill/spill_reader.h" |
28 | | #include "exec/spill/spill_stream_manager.h" |
29 | | #include "exec/spill/spill_writer.h" |
30 | | #include "io/fs/local_file_system.h" |
31 | | #include "runtime/exec_env.h" |
32 | | #include "runtime/query_context.h" |
33 | | #include "runtime/runtime_profile.h" |
34 | | #include "runtime/runtime_state.h" |
35 | | #include "runtime/thread_context.h" |
36 | | #include "util/debug_points.h" |
37 | | |
38 | | namespace doris { |
39 | | #include "common/compile_check_begin.h" |
40 | | SpillStream::SpillStream(RuntimeState* state, int64_t stream_id, SpillDataDir* data_dir, |
41 | | std::string spill_dir, size_t batch_rows, size_t batch_bytes, |
42 | | RuntimeProfile* operator_profile) |
43 | 2.10k | : state_(state), |
44 | 2.10k | stream_id_(stream_id), |
45 | 2.10k | data_dir_(data_dir), |
46 | 2.10k | spill_dir_(std::move(spill_dir)), |
47 | 2.10k | batch_rows_(batch_rows), |
48 | 2.10k | batch_bytes_(batch_bytes), |
49 | 2.10k | query_id_(state->query_id()), |
50 | 2.10k | profile_(operator_profile) { |
51 | 2.10k | RuntimeProfile* custom_profile = operator_profile->get_child("CustomCounters"); |
52 | 2.10k | DCHECK(custom_profile != nullptr); |
53 | 2.10k | _total_file_count = custom_profile->get_counter("SpillWriteFileTotalCount"); |
54 | 2.10k | _current_file_count = custom_profile->get_counter("SpillWriteFileCurrentCount"); |
55 | 2.10k | _current_file_size = custom_profile->get_counter("SpillWriteFileCurrentBytes"); |
56 | 2.10k | } |
57 | | |
58 | 1.98k | void SpillStream::update_shared_profiles(RuntimeProfile* source_op_profile) { |
59 | 1.98k | _current_file_count = source_op_profile->get_counter("SpillWriteFileCurrentCount"); |
60 | 1.98k | _current_file_size = source_op_profile->get_counter("SpillWriteFileCurrentBytes"); |
61 | 1.98k | } |
62 | | |
63 | 2.08k | SpillStream::~SpillStream() { |
64 | 2.08k | gc(); |
65 | 2.08k | } |
66 | | |
67 | 4.08k | void SpillStream::gc() { |
68 | 4.08k | if (_current_file_size) { |
69 | 4.00k | COUNTER_UPDATE(_current_file_size, -total_written_bytes_); |
70 | 4.00k | } |
71 | 4.08k | bool exists = false; |
72 | 4.08k | auto status = io::global_local_filesystem()->exists(spill_dir_, &exists); |
73 | 4.08k | if (status.ok() && exists) { |
74 | 2.08k | if (_current_file_count) { |
75 | 2.00k | COUNTER_UPDATE(_current_file_count, -1); |
76 | 2.00k | } |
77 | 2.08k | auto query_gc_dir = data_dir_->get_spill_data_gc_path(print_id(query_id_)); |
78 | 2.08k | status = io::global_local_filesystem()->create_directory(query_gc_dir); |
79 | 2.08k | DBUG_EXECUTE_IF("fault_inject::spill_stream::gc", { |
80 | 2.08k | status = Status::Error<INTERNAL_ERROR>("fault_inject spill_stream gc failed"); |
81 | 2.08k | }); |
82 | 2.08k | if (status.ok()) { |
83 | 2.08k | auto gc_dir = fmt::format("{}/{}", query_gc_dir, |
84 | 2.08k | std::filesystem::path(spill_dir_).filename().string()); |
85 | 2.08k | status = io::global_local_filesystem()->rename(spill_dir_, gc_dir); |
86 | 2.08k | } |
87 | 2.08k | if (!status.ok()) { |
88 | 0 | LOG_EVERY_T(WARNING, 1) << fmt::format("failed to gc spill data, dir {}, error: {}", |
89 | 0 | query_gc_dir, status.to_string()); |
90 | 0 | } |
91 | 2.08k | } |
92 | | // If QueryContext is destructed earlier than PipelineFragmentContext, |
93 | | // spill_dir_ will be already moved to spill_gc directory. |
94 | | |
95 | | // decrease spill data usage anyway, since in ~QueryContext() spill data of the query will be |
96 | | // clean up as a last resort |
97 | 4.08k | data_dir_->update_spill_data_usage(-total_written_bytes_); |
98 | 4.08k | total_written_bytes_ = 0; |
99 | 4.08k | } |
100 | | |
101 | 2.10k | Status SpillStream::prepare() { |
102 | 2.10k | writer_ = std::make_unique<SpillWriter>(state_->get_query_ctx()->resource_ctx(), profile_, |
103 | 2.10k | stream_id_, batch_rows_, data_dir_, spill_dir_); |
104 | 2.10k | _set_write_counters(profile_); |
105 | | |
106 | 2.10k | reader_ = std::make_unique<SpillReader>(state_->get_query_ctx()->resource_ctx(), stream_id_, |
107 | 2.10k | writer_->get_file_path()); |
108 | | |
109 | 2.10k | DBUG_EXECUTE_IF("fault_inject::spill_stream::prepare_spill", { |
110 | 2.10k | return Status::Error<INTERNAL_ERROR>("fault_inject spill_stream prepare_spill failed"); |
111 | 2.10k | }); |
112 | 2.10k | COUNTER_UPDATE(_total_file_count, 1); |
113 | 2.10k | if (_current_file_count) { |
114 | 38 | COUNTER_UPDATE(_current_file_count, 1); |
115 | 38 | } |
116 | 2.10k | return writer_->open(); |
117 | 2.10k | } |
118 | | |
119 | 20 | SpillReaderUPtr SpillStream::create_separate_reader() const { |
120 | 20 | return std::make_unique<SpillReader>(state_->get_query_ctx()->resource_ctx(), stream_id_, |
121 | 20 | writer_->get_file_path()); |
122 | 20 | } |
123 | | |
124 | 0 | const TUniqueId& SpillStream::query_id() const { |
125 | 0 | return query_id_; |
126 | 0 | } |
127 | | |
128 | 0 | const std::string& SpillStream::get_spill_root_dir() const { |
129 | 0 | return data_dir_->path(); |
130 | 0 | } |
131 | | |
132 | 4.38k | Status SpillStream::spill_block(RuntimeState* state, const Block& block, bool eof) { |
133 | 4.38k | size_t written_bytes = 0; |
134 | 4.38k | DBUG_EXECUTE_IF("fault_inject::spill_stream::spill_block", { |
135 | 4.38k | return Status::Error<INTERNAL_ERROR>("fault_inject spill_stream spill_block failed"); |
136 | 4.38k | }); |
137 | 4.38k | RETURN_IF_ERROR(writer_->write(state, block, written_bytes)); |
138 | 4.38k | if (eof) { |
139 | 18 | RETURN_IF_ERROR(spill_eof()); |
140 | 4.36k | } else { |
141 | 4.36k | total_written_bytes_ = writer_->get_written_bytes(); |
142 | 4.36k | } |
143 | 4.38k | return Status::OK(); |
144 | 4.38k | } |
145 | | |
146 | 2.06k | Status SpillStream::spill_eof() { |
147 | 2.06k | DBUG_EXECUTE_IF("fault_inject::spill_stream::spill_eof", { |
148 | 2.06k | return Status::Error<INTERNAL_ERROR>("fault_inject spill_stream spill_eof failed"); |
149 | 2.06k | }); |
150 | 2.06k | auto status = writer_->close(); |
151 | 2.06k | total_written_bytes_ = writer_->get_written_bytes(); |
152 | 2.06k | writer_.reset(); |
153 | | |
154 | 2.06k | if (status.ok()) { |
155 | 2.06k | _ready_for_reading = true; |
156 | 2.06k | } |
157 | 2.06k | return status; |
158 | 2.06k | } |
159 | | |
160 | 6.19k | Status SpillStream::read_next_block_sync(Block* block, bool* eos) { |
161 | 6.19k | DCHECK(reader_ != nullptr); |
162 | 6.19k | DCHECK(!_is_reading); |
163 | 6.19k | _is_reading = true; |
164 | 6.19k | Defer defer([this] { _is_reading = false; }); |
165 | | |
166 | 6.19k | DBUG_EXECUTE_IF("fault_inject::spill_stream::read_next_block", { |
167 | 6.19k | return Status::Error<INTERNAL_ERROR>("fault_inject spill_stream read_next_block failed"); |
168 | 6.19k | }); |
169 | 6.18k | RETURN_IF_ERROR(reader_->open()); |
170 | 6.18k | return reader_->read(block, eos); |
171 | 6.18k | } |
172 | | |
173 | | } // namespace doris |