Coverage Report

Created: 2026-03-16 12:00

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/spill/spill_file.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
#include <memory>
20
#include <string>
21
22
#include "common/status.h"
23
24
namespace doris {
25
#include "common/compile_check_begin.h"
26
class RuntimeProfile;
27
class RuntimeState;
28
// Forward declarations only: none of these types is defined in this header.
29
class Block;
30
class SpillDataDir;
31
class SpillFileWriter;
32
class SpillFileReader;
33
using SpillFileWriterSPtr = std::shared_ptr<SpillFileWriter>; // shared-ownership handle to a writer
34
using SpillFileReaderSPtr = std::shared_ptr<SpillFileReader>; // shared-ownership handle to a reader
35
36
/// SpillFile represents a logical spill file that may consist of multiple
37
/// physical "part" files on disk. Parts are managed automatically by
38
/// SpillFileWriter when a part exceeds the configured size threshold.
39
///
40
/// On-disk layout:
41
///   spill_dir/         (created lazily by SpillFileWriter on first write)
42
///   +-- 0              (part 0)
43
///   +-- 1              (part 1)
44
///   +-- ...
45
///
46
/// Writing workflow:
47
///   SpillFileWriterSPtr writer;
48
///   RETURN_IF_ERROR(spill_file->create_writer(state, profile, writer));
49
///   RETURN_IF_ERROR(writer->write_block(state, block)); // auto-rotates parts
50
///   RETURN_IF_ERROR(writer->close());                   // finalizes all parts
51
///
52
/// Reading workflow:
53
///   auto reader = spill_file->create_reader(state, profile);
54
///   RETURN_IF_ERROR(reader->open());
55
///   while (!eos) { RETURN_IF_ERROR(reader->read(&block, &eos)); }
56
class SpillFile : public std::enable_shared_from_this<SpillFile> {
57
public:
58
    // Minimum spill write batch memory (512 KiB), to avoid too many small file writes.
59
    static constexpr size_t MIN_SPILL_WRITE_BATCH_MEM = 512 * 1024;
60
    static constexpr size_t MAX_SPILL_WRITE_BATCH_MEM = 32 * 1024 * 1024; // 32 MiB
61
62
    /// @param data_dir       The spill storage directory (disk) selected by SpillFileManager.
63
    /// @param relative_path  Relative path under the spill root, formatted by the operator.
64
    ///                       e.g. "query_id/sort-node_id-task_id-unique_id"
65
    SpillFile(SpillDataDir* data_dir, std::string relative_path);
66
    // Default construction and copying are disabled.
67
    SpillFile() = delete;
68
    SpillFile(const SpillFile&) = delete;
69
    SpillFile& operator=(const SpillFile&) = delete;
70
71
    ~SpillFile();
72
    /// NOTE(review): defined out of line; the update_written_bytes() comment below ties gc() accounting to _total_written_bytes.
73
    void gc();
74
75
    /// Returns true after the writer has been closed (all data flushed).
76
29
    bool ready_for_reading() const { return _ready_for_reading; }
77
78
    /// Create a SpillFileWriter that automatically manages multi-part rotation.
79
    /// Only one writer should exist per SpillFile at a time.
80
    /// Part size threshold is read from config::spill_file_part_size_bytes. The new writer is returned through `writer`.
81
    Status create_writer(RuntimeState* state, RuntimeProfile* profile, SpillFileWriterSPtr& writer);
82
83
    /// Create a SpillFileReader that reads sequentially across all parts.
84
    /// The caller should call reader->open() before reading.
85
    SpillFileReaderSPtr create_reader(RuntimeState* state, RuntimeProfile* profile) const;
86
87
private:
88
    friend class SpillFileWriter;
89
    friend class SpillFileManager;
90
91
    /// Called by SpillFileWriter::close() to mark writing as complete.
92
    void finish_writing();
93
94
    /// Called by SpillFileWriter to incrementally track bytes written to disk.
95
    /// This ensures SpillFile always knows the correct _total_written_bytes for
96
    /// gc() accounting, even if the writer's close() is never properly called.
97
    void update_written_bytes(int64_t delta_bytes);
98
99
    /// Called by SpillFileWriter when a part file is completed.
100
    void increment_part_count();
101
    // NOTE(review): presumably the constructor's data_dir argument, held without ownership — confirm in the .cpp.
102
    SpillDataDir* _data_dir = nullptr;
103
    // Absolute path: data_dir->get_spill_data_path() + "/" + relative_path
104
    std::string _spill_dir;
105
    int64_t _total_written_bytes = 0; // tracked incrementally via update_written_bytes()
106
    size_t _part_count = 0; // NOTE(review): presumably advanced by increment_part_count() — confirm
107
    bool _ready_for_reading = false; // value returned by ready_for_reading()
108
    // Pointer to the currently-active writer. Mutable to allow checks from const
109
    // methods like create_reader(). Only one writer may be active at a time.
110
    mutable SpillFileWriter* _active_writer = nullptr;
111
};
112
using SpillFileSPtr = std::shared_ptr<SpillFile>; // shared-ownership handle to a SpillFile
113
} // namespace doris
114
#include "common/compile_check_end.h"