Coverage Report

Created: 2026-04-10 04:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/spill/spill_file.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/spill/spill_file.h"
19
20
#include <glog/logging.h>
21
22
#include <filesystem>
23
#include <memory>
24
#include <utility>
25
26
#include "exec/spill/spill_file_manager.h"
27
#include "exec/spill/spill_file_reader.h"
28
#include "exec/spill/spill_file_writer.h"
29
#include "io/fs/local_file_system.h"
30
#include "runtime/exec_env.h"
31
#include "runtime/query_context.h"
32
#include "runtime/runtime_profile.h"
33
#include "runtime/runtime_state.h"
34
#include "util/debug_points.h"
35
36
namespace doris {
37
/// Builds a spill file handle rooted under the given spill data directory.
///
/// @param data_dir       Spill data directory that hosts this file. It is dereferenced here
///                       without a null check and the pointer is kept for later use.
/// @param relative_path  Path of the spill directory relative to the data directory's
///                       spill data path; taken by value and moved into the joined path.
SpillFile::SpillFile(SpillDataDir* data_dir, std::string relative_path)
        : _data_dir(data_dir),
          // Full directory = <spill data path> + '/' + <relative path>.
          _spill_dir(data_dir->get_spill_data_path() + '/' + std::move(relative_path)) {}
40
41
326
/// Destructor: runs gc() so the spill directory is removed and the usage accounted to the
/// data directory is released when the handle goes away.
SpillFile::~SpillFile() {
    this->gc();
}
44
45
438
void SpillFile::gc() {
46
438
    bool exists = false;
47
438
    auto status = io::global_local_filesystem()->exists(_spill_dir, &exists);
48
438
    if (status.ok() && exists) {
49
        // Delete spill directory directly instead of moving it to a GC directory.
50
        // This simplifies cleanup and avoids retaining spill data under a GC path.
51
249
        status = io::global_local_filesystem()->delete_directory(_spill_dir);
52
249
        DBUG_EXECUTE_IF("fault_inject::spill_file::gc", {
53
249
            status = Status::Error<INTERNAL_ERROR>("fault_inject spill_file gc failed");
54
249
        });
55
249
        if (!status.ok()) {
56
0
            LOG_EVERY_T(WARNING, 1) << fmt::format("failed to delete spill data, dir {}, error: {}",
57
0
                                                   _spill_dir, status.to_string());
58
0
        }
59
249
    }
60
    // decrease spill data usage anyway, since in ~QueryContext() spill data of the query will be
61
    // clean up as a last resort
62
438
    _data_dir->update_spill_data_usage(-_total_written_bytes);
63
438
    _total_written_bytes = 0;
64
438
}
65
66
/// Creates a writer bound to this spill file.
///
/// @param state    Runtime state forwarded to the writer.
/// @param profile  Runtime profile forwarded to the writer.
/// @param writer   Out-parameter; receives the newly created writer.
/// @return Always Status::OK() in the current implementation.
///
/// NOTE(review): relies on shared_from_this(), so this object presumably must already be
/// owned by a std::shared_ptr — confirm against the class declaration.
Status SpillFile::create_writer(RuntimeState* state, RuntimeProfile* profile,
                                SpillFileWriterSPtr& writer) {
    // Hand the writer an owning handle to this file.
    auto self = shared_from_this();
    // _active_writer is set in SpillFileWriter constructor via the shared_ptr
    writer = std::make_shared<SpillFileWriter>(std::move(self), state, profile, _data_dir,
                                               _spill_dir);
    return Status::OK();
}
73
74
185
/// Creates a reader over this spill file's directory.
///
/// @param state    Runtime state forwarded to the reader.
/// @param profile  Runtime profile forwarded to the reader.
/// @return A new reader constructed with the spill directory and the current part count.
SpillFileReaderSPtr SpillFile::create_reader(RuntimeState* state, RuntimeProfile* profile) const {
    // It's a programming error to create a reader while a writer is still active.
    DCHECK(_active_writer == nullptr) << "create_reader() called while writer still active";
    SpillFileReaderSPtr reader =
            std::make_shared<SpillFileReader>(state, profile, _spill_dir, _part_count);
    return reader;
}
79
80
279
void SpillFile::finish_writing() {
81
279
    _ready_for_reading = true;
82
    // writer finished; clear active writer pointer
83
279
    _active_writer = nullptr;
84
279
}
85
86
685
void SpillFile::update_written_bytes(int64_t delta_bytes) {
87
685
    _total_written_bytes += delta_bytes;
88
685
}
89
90
244
void SpillFile::increment_part_count() {
91
244
    ++_part_count;
92
244
}
93
94
} // namespace doris