Coverage Report

Created: 2026-03-19 12:20

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/spill/spill_file_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/spill/spill_file_writer.h"
19
20
#include "agent/be_exec_version_manager.h"
21
#include "common/config.h"
22
#include "common/status.h"
23
#include "exec/spill/spill_file.h"
24
#include "exec/spill/spill_file_manager.h"
25
#include "io/fs/local_file_system.h"
26
#include "io/fs/local_file_writer.h"
27
#include "runtime/exec_env.h"
28
#include "runtime/query_context.h"
29
#include "runtime/runtime_state.h"
30
#include "runtime/thread_context.h"
31
32
namespace doris {
33
#include "common/compile_check_begin.h"
34
35
// Constructs a writer that spills blocks for `spill_file` into `spill_dir`.
//
// - `spill_file`: the SpillFile being written; only a weak reference is kept,
//   and this writer is recorded as its active writer.
// - `state`:      used to obtain the query's resource context.
// - `profile`:    must expose the "CommonCounters" and "CustomCounters"
//                 children; the counters this writer updates are looked up there.
// - `data_dir`:   spill data directory whose usage accounting is updated on write.
// - `spill_dir`:  directory that will hold the numbered part files.
//
// The maximum size of a part file is taken from config::spill_file_part_size_bytes.
SpillFileWriter::SpillFileWriter(const std::shared_ptr<SpillFile>& spill_file, RuntimeState* state,
                                 RuntimeProfile* profile, SpillDataDir* data_dir,
                                 const std::string& spill_dir)
        : _spill_file_wptr(spill_file),
          _data_dir(data_dir),
          _spill_dir(spill_dir),
          _max_part_size(config::spill_file_part_size_bytes),
          _resource_ctx(state->get_query_ctx()->resource_ctx()) {
    // Counters shared with other operators.
    RuntimeProfile* const shared_counters = profile->get_child("CommonCounters");
    DCHECK(shared_counters != nullptr);
    _memory_used_counter = shared_counters->get_counter("MemoryUsage");

    // From now on the SpillFile knows which writer is feeding it.
    spill_file->_active_writer = this;

    // Spill-specific counters.
    // NOTE(review): unlike "CommonCounters", this child is dereferenced
    // without a DCHECK.
    RuntimeProfile* const spill_counters = profile->get_child("CustomCounters");

    // Timers.
    _write_file_timer = spill_counters->get_counter("SpillWriteFileTime");
    _serialize_timer = spill_counters->get_counter("SpillWriteSerializeBlockTime");

    // Block / row statistics.
    _write_block_counter = spill_counters->get_counter("SpillWriteBlockCount");
    _write_block_bytes_counter = spill_counters->get_counter("SpillWriteBlockBytes");
    _write_rows_counter = spill_counters->get_counter("SpillWriteRows");

    // File statistics.
    _write_file_total_size = spill_counters->get_counter("SpillWriteFileBytes");
    _write_file_current_size = spill_counters->get_counter("SpillWriteFileCurrentBytes");
    _total_file_count = spill_counters->get_counter("SpillWriteFileTotalCount");
}
62
63
303
// Closes the writer if the owner did not do so explicitly. A destructor has
// no way to report a Status, so a failed close is only logged.
SpillFileWriter::~SpillFileWriter() {
    if (!_closed) {
        if (Status close_status = close(); !close_status.ok()) {
            LOG(WARNING) << "SpillFileWriter::~SpillFileWriter() failed: "
                         << close_status.to_string() << ", spill_dir=" << _spill_dir;
        }
    }
}
73
74
267
// Opens the part file "<spill_dir>/<current part index>" for writing and
// bumps the file-count counter. The spill directory itself is created only
// when the very first part (index 0) is opened.
//
// Returns the error from directory or file creation, if any.
Status SpillFileWriter::_open_next_part() {
    const auto& local_fs = io::global_local_filesystem();

    _current_part_path = _spill_dir + "/" + std::to_string(_current_part_index);

    const bool is_first_part = (_current_part_index == 0);
    if (is_first_part) {
        // Lazy creation: no directory exists until something is actually spilled.
        RETURN_IF_ERROR(local_fs->create_directory(_spill_dir));
    }

    RETURN_IF_ERROR(local_fs->create_file(_current_part_path, &_file_writer));
    COUNTER_UPDATE(_total_file_count, 1);
    return Status::OK();
}
84
85
312
// Finalizes the part file that is currently open; a no-op when none is open.
//
// Steps, in order:
//   1. Complete the footer held in _part_meta (the per-block start offsets
//      appended while writing) with the largest serialized block size and the
//      number of blocks in this part, then append it to the file.
//   2. Add the footer size to every byte counter / usage tracker.
//   3. Close the file and release the file writer.
//   4. Advance the part index and reset the per-part state.
//
// `spill_file` may be null, in which case the SpillFile-side bookkeeping is
// skipped. Any failing step returns its error immediately, so the steps after
// it are not performed.
Status SpillFileWriter::_close_current_part(const std::shared_ptr<SpillFile>& spill_file) {
    if (_file_writer == nullptr) {
        return Status::OK();
    }

    // Footer trailer: max_sub_block_size followed by block_count.
    _part_meta.append(reinterpret_cast<const char*>(&_part_max_sub_block_size),
                      sizeof(_part_max_sub_block_size));
    _part_meta.append(reinterpret_cast<const char*>(&_part_written_blocks),
                      sizeof(_part_written_blocks));

    {
        SCOPED_TIMER(_write_file_timer);
        RETURN_IF_ERROR(_file_writer->append(_part_meta));
    }

    // The footer occupies disk space too, so it is accounted like block data.
    const int64_t footer_bytes = _part_meta.size();
    _part_written_bytes += footer_bytes;
    _total_written_bytes += footer_bytes;
    COUNTER_UPDATE(_write_file_total_size, footer_bytes);
    if (_resource_ctx) {
        _resource_ctx->io_context()->update_spill_write_bytes_to_local_storage(footer_bytes);
    }
    if (_write_file_current_size) {
        COUNTER_UPDATE(_write_file_current_size, footer_bytes);
    }
    _data_dir->update_spill_data_usage(footer_bytes);
    ExecEnv::GetInstance()->spill_file_mgr()->update_spill_write_bytes(footer_bytes);
    // Keep the SpillFile's own byte count current so that gc() can always
    // subtract the right amount, even if close() is never reached.
    if (spill_file) {
        spill_file->update_written_bytes(footer_bytes);
    }

    RETURN_IF_ERROR(_file_writer->close());
    _file_writer.reset();

    // Move on to the next part and start it with a clean slate.
    ++_current_part_index;
    ++_total_parts;
    if (spill_file) {
        spill_file->increment_part_count();
    }
    _part_meta.clear();
    _part_max_sub_block_size = 0;
    _part_written_bytes = 0;
    _part_written_blocks = 0;

    return Status::OK();
}
133
134
520
// Closes the current part once the bytes written to it have reached
// _max_part_size. Does nothing when no part is open or the part still has room.
Status SpillFileWriter::_rotate_if_needed(const std::shared_ptr<SpillFile>& spill_file) {
    const bool part_is_full = _file_writer && _part_written_bytes >= _max_part_size;
    if (!part_is_full) {
        return Status::OK();
    }
    return _close_current_part(spill_file);
}
140
141
523
// Serializes `block` and appends it to the current part file, opening a part
// first when none is open and closing the part afterwards if it became full.
//
// Returns INTERNAL_ERROR when the owning SpillFile no longer exists, otherwise
// the first error reported while opening, writing or rotating the part.
// Must not be called after close() (checked with DCHECK only).
Status SpillFileWriter::write_block(RuntimeState* state, const Block& block) {
    DCHECK(!_closed);

    // Pin the SpillFile for the duration of the write. If it is already gone
    // (gc'd), writing more data would leave the disk accounting out of sync.
    const std::shared_ptr<SpillFile> owner = _spill_file_wptr.lock();
    if (owner == nullptr) {
        return Status::Error<INTERNAL_ERROR>(
                "SpillFile has been destroyed, cannot write more data, spill_dir={}", _spill_dir);
    }

    // A part is opened on demand: on the first write and after each rotation.
    if (_file_writer == nullptr) {
        RETURN_IF_ERROR(_open_next_part());
    }

    DBUG_EXECUTE_IF("fault_inject::spill_file::spill_block", {
        return Status::Error<INTERNAL_ERROR>("fault_inject spill_file spill_block failed");
    });

    COUNTER_UPDATE(_write_rows_counter, block.rows());
    COUNTER_UPDATE(_write_block_bytes_counter, block.bytes());

    RETURN_IF_ERROR(_write_internal(block, owner));

    // Start a new part next time if this one has reached its size limit.
    return _rotate_if_needed(owner);
}
171
172
329
// Finishes writing: finalizes the open part (if any) and tells the SpillFile
// that writing is done. Calling close() again is a no-op that returns OK.
//
// The writer is marked closed before any work is attempted, so a failed
// close() is not retried by a later call or by the destructor.
Status SpillFileWriter::close() {
    if (_closed) {
        return Status::OK();
    }
    _closed = true;

    DBUG_EXECUTE_IF("fault_inject::spill_file::spill_eof", {
        return Status::Error<INTERNAL_ERROR>("fault_inject spill_file spill_eof failed");
    });

    const auto spill_file = _spill_file_wptr.lock();
    RETURN_IF_ERROR(_close_current_part(spill_file));

    if (!spill_file) {
        // The SpillFile is already gone; there is nobody left to notify.
        return Status::OK();
    }

    if (spill_file->_active_writer != this) {
        return Status::Error<INTERNAL_ERROR>(
                "SpillFileWriter close() called but not registered as active writer, possible "
                "double close or logic error");
    }

    spill_file->finish_writing();
    return Status::OK();
}
196
197
// Serializes `block` (ZSTD-compressed PBlock) and appends the bytes to the
// currently open part file, then updates all size/usage accounting.
//
// - `block`:      data to spill; a block with zero rows is skipped and OK is
//                 returned without touching the file or any counter.
// - `spill_file`: owning SpillFile; must be non-null (the caller locks it).
//
// Returns:
//   - the error from Block::serialize, if serialization fails;
//   - SERIALIZE_PROTOBUF_ERROR if the PBlock cannot be turned into a string;
//   - DISK_REACH_CAPACITY_LIMIT if the data dir cannot take the new bytes;
//   - the error from the file append, if the write fails.
// Accounting is updated only after the append has succeeded.
Status SpillFileWriter::_write_internal(const Block& block,
                                        const std::shared_ptr<SpillFile>& spill_file) {
    if (block.rows() == 0) {
        return Status::OK();
    }

    std::string buff;
    int64_t buff_size {0};
    // `buff` stays alive until this function returns, so its contribution to
    // the memory counter is released here rather than at the end of the
    // serialization scope. buff_size is still 0 if serialization fails, which
    // makes the release a no-op on those paths.
    Defer release_buff {[&]() { COUNTER_UPDATE(_memory_used_counter, -buff_size); }};

    {
        PBlock pblock;
        SCOPED_TIMER(_serialize_timer);
        size_t uncompressed_bytes = 0;
        size_t compressed_bytes = 0;
        int64_t compressed_time = 0;
        RETURN_IF_ERROR(block.serialize(
                BeExecVersionManager::get_newest_version(), &pblock, &uncompressed_bytes,
                &compressed_bytes, &compressed_time,
                segment_v2::CompressionTypePB::ZSTD)); // ZSTD for better compression ratio

        // The PBlock is only needed until it has been flattened into `buff`;
        // its memory is released from the counter when this scope ends.
        const int64_t pblock_mem = pblock.ByteSizeLong();
        COUNTER_UPDATE(_memory_used_counter, pblock_mem);
        Defer release_pblock {[&]() { COUNTER_UPDATE(_memory_used_counter, -pblock_mem); }};

        if (!pblock.SerializeToString(&buff)) {
            return Status::Error<ErrorCode::SERIALIZE_PROTOBUF_ERROR>(
                    "serialize spill data error. [path={}]", _current_part_path);
        }
        buff_size = static_cast<int64_t>(buff.size());
        COUNTER_UPDATE(_memory_used_counter, buff_size);
    }

    if (_data_dir->reach_capacity_limit(buff_size)) {
        return Status::Error<ErrorCode::DISK_REACH_CAPACITY_LIMIT>(
                "spill data total size exceed limit, path: {}, size limit: {}, spill data "
                "size: {}",
                _data_dir->path(), PrettyPrinter::print_bytes(_data_dir->get_spill_data_limit()),
                PrettyPrinter::print_bytes(_data_dir->get_spill_data_bytes()));
    }

    {
        SCOPED_TIMER(_write_file_timer);
        RETURN_IF_ERROR(_file_writer->append(buff));
    }

    // The bytes are on disk: update every tracker.
    _data_dir->update_spill_data_usage(buff_size);
    ExecEnv::GetInstance()->spill_file_mgr()->update_spill_write_bytes(buff_size);

    _part_max_sub_block_size = std::max(_part_max_sub_block_size, static_cast<size_t>(buff_size));

    // Record where this block starts inside the part; this must happen before
    // _part_written_bytes is advanced below.
    // NOTE(review): writes sizeof(size_t) bytes — assumes _part_written_bytes
    // has that width; confirm against the member declaration and the reader.
    _part_meta.append(reinterpret_cast<const char*>(&_part_written_bytes), sizeof(size_t));

    COUNTER_UPDATE(_write_file_total_size, buff_size);
    if (_resource_ctx) {
        _resource_ctx->io_context()->update_spill_write_bytes_to_local_storage(buff_size);
    }
    if (_write_file_current_size) {
        COUNTER_UPDATE(_write_file_current_size, buff_size);
    }
    COUNTER_UPDATE(_write_block_counter, 1);

    _part_written_bytes += buff_size;
    _total_written_bytes += buff_size;
    ++_part_written_blocks;

    // Keep the SpillFile's own byte count current so that gc() can always
    // subtract the right amount from _data_dir.
    spill_file->update_written_bytes(buff_size);

    return Status::OK();
}
272
273
} // namespace doris