Coverage Report

Created: 2026-03-19 16:50

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/spill/spill_file.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/spill/spill_file.h"
19
20
#include <glog/logging.h>
21
22
#include <filesystem>
23
#include <memory>
24
#include <utility>
25
26
#include "exec/spill/spill_file_manager.h"
27
#include "exec/spill/spill_file_reader.h"
28
#include "exec/spill/spill_file_writer.h"
29
#include "io/fs/local_file_system.h"
30
#include "runtime/exec_env.h"
31
#include "runtime/query_context.h"
32
#include "runtime/runtime_profile.h"
33
#include "runtime/runtime_state.h"
34
#include "util/debug_points.h"
35
36
namespace doris {
37
#include "common/compile_check_begin.h"
38
// Builds a spill file handle rooted under `data_dir`.
//
// `data_dir`      - spill data directory this file belongs to; it is dereferenced
//                   here, so it must not be null.
// `relative_path` - path appended (after a "/") to the data dir's spill data path
//                   to form this file's spill directory. Taken by value and moved from.
SpillFile::SpillFile(SpillDataDir* data_dir, std::string relative_path)
        : _data_dir(data_dir),
          // <spill data path of data_dir> + "/" + <relative_path>
          _spill_dir(data_dir->get_spill_data_path() + "/" + std::move(relative_path)) {}
41
42
323
// Destroying the handle triggers gc(), so the spill directory removal and the
// usage accounting release happen no later than object destruction.
SpillFile::~SpillFile() {
    this->gc();
}
45
46
435
void SpillFile::gc() {
47
435
    bool exists = false;
48
435
    auto status = io::global_local_filesystem()->exists(_spill_dir, &exists);
49
435
    if (status.ok() && exists) {
50
        // Delete spill directory directly instead of moving it to a GC directory.
51
        // This simplifies cleanup and avoids retaining spill data under a GC path.
52
249
        status = io::global_local_filesystem()->delete_directory(_spill_dir);
53
249
        DBUG_EXECUTE_IF("fault_inject::spill_file::gc", {
54
249
            status = Status::Error<INTERNAL_ERROR>("fault_inject spill_file gc failed");
55
249
        });
56
249
        if (!status.ok()) {
57
0
            LOG_EVERY_T(WARNING, 1) << fmt::format("failed to delete spill data, dir {}, error: {}",
58
0
                                                   _spill_dir, status.to_string());
59
0
        }
60
249
    }
61
    // decrease spill data usage anyway, since in ~QueryContext() spill data of the query will be
62
    // clean up as a last resort
63
435
    _data_dir->update_spill_data_usage(-_total_written_bytes);
64
435
    _total_written_bytes = 0;
65
435
}
66
67
// Creates a SpillFileWriter bound to this spill file and stores it in `writer`.
//
// `state` / `profile` - forwarded unchanged to the writer's constructor.
// `writer`            - output parameter; overwritten with the new writer.
//
// Always returns Status::OK(). Uses shared_from_this(), so this object must
// already be owned by a std::shared_ptr when the method is called.
Status SpillFile::create_writer(RuntimeState* state, RuntimeProfile* profile,
                                SpillFileWriterSPtr& writer) {
    auto self = shared_from_this();
    // _active_writer is set in SpillFileWriter constructor via the shared_ptr
    writer = std::make_shared<SpillFileWriter>(std::move(self), state, profile, _data_dir,
                                               _spill_dir);
    return Status::OK();
}
74
75
185
// Creates a SpillFileReader over this file's spill directory, passing along the
// current part count.
//
// `state` / `profile` - forwarded unchanged to the reader's constructor.
//
// Must not be called while a writer is still active; this is enforced with a
// DCHECK (debug builds only).
SpillFileReaderSPtr SpillFile::create_reader(RuntimeState* state, RuntimeProfile* profile) const {
    // It's a programming error to create a reader while a writer is still active.
    DCHECK(_active_writer == nullptr) << "create_reader() called while writer still active";

    auto reader = std::make_shared<SpillFileReader>(state, profile, _spill_dir, _part_count);
    return reader;
}
80
81
279
void SpillFile::finish_writing() {
82
279
    _ready_for_reading = true;
83
    // writer finished; clear active writer pointer
84
279
    _active_writer = nullptr;
85
279
}
86
87
686
void SpillFile::update_written_bytes(int64_t delta_bytes) {
88
686
    _total_written_bytes += delta_bytes;
89
686
}
90
91
244
void SpillFile::increment_part_count() {
92
244
    ++_part_count;
93
244
}
94
95
} // namespace doris