Coverage Report

Created: 2026-03-15 17:28

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/spill/spill_stream.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
#include <atomic>
20
#include <future>
21
#include <memory>
22
23
#include "exec/spill/spill_reader.h"
24
#include "exec/spill/spill_writer.h"
25
26
namespace doris {
#include "common/compile_check_begin.h"
// Forward declarations. Note that the members below also name
// RuntimeProfile::Counter, which needs the complete RuntimeProfile definition;
// NOTE(review): that definition presumably arrives transitively through
// spill_reader.h / spill_writer.h — confirm before relying on this forward decl.
class RuntimeProfile;
class ThreadPool;

class Block;
class SpillDataDir;

// One spill stream: a spill directory (spill_dir_) on a SpillDataDir that blocks
// are written to through `writer_` and read back through `reader_` (or through an
// independent reader obtained from create_separate_reader()).
//
// SpillStreamManager is a friend and is the only outside class that can reach the
// private prepare(); streams are shared via SpillStreamSPtr.
class SpillStream {
public:
    // Memory bounds for batching data before a spill write; the lower bound exists
    // to avoid too many small file writes.
    static constexpr size_t MIN_SPILL_WRITE_BATCH_MEM = 32 * 1024;
    static constexpr size_t MAX_SPILL_WRITE_BATCH_MEM = 32 * 1024 * 1024;
    static_assert(MIN_SPILL_WRITE_BATCH_MEM <= MAX_SPILL_WRITE_BATCH_MEM,
                  "spill write batch memory bounds are inverted");

    // @param state       runtime state; stored as a raw pointer in state_.
    // @param stream_id   identifier returned by id().
    // @param data_dir    data dir returned by get_data_dir(); stored as a raw pointer.
    // @param spill_dir   directory path returned by get_spill_dir() (format documented
    //                    on spill_dir_ below).
    // @param batch_rows  stored in batch_rows_.
    // @param batch_bytes stored in batch_bytes_.
    // @param profile     profile stored in profile_.
    SpillStream(RuntimeState* state, int64_t stream_id, SpillDataDir* data_dir,
                std::string spill_dir, size_t batch_rows, size_t batch_bytes,
                RuntimeProfile* profile);

    SpillStream() = delete;

    // Non-copyable. (This was already implied by the std::atomic_bool members;
    // it is spelled out here to make the intent explicit.)
    SpillStream(const SpillStream&) = delete;
    SpillStream& operator=(const SpillStream&) = delete;

    ~SpillStream();

    // NOTE(review): presumably releases the stream's on-disk spill data; the
    // implementation is not visible in this header — confirm in the .cpp.
    void gc();

    // Identifier passed to the constructor.
    int64_t id() const { return stream_id_; }

    // Data dir passed to the constructor (may be nullptr only if the caller passed it).
    SpillDataDir* get_data_dir() const { return data_dir_; }
    const std::string& get_spill_root_dir() const;

    // Directory this stream spills into.
    const std::string& get_spill_dir() const { return spill_dir_; }

    // Current value of total_written_bytes_ (plain, non-atomic read).
    int64_t get_written_bytes() const { return total_written_bytes_; }

    // Writes `block` to the stream; `eof` flags the final block.
    // NOTE(review): exact eof/flush semantics live in the .cpp — confirm.
    Status spill_block(RuntimeState* state, const Block& block, bool eof);

    Status spill_eof();

    // Reads the next block into *block; *eos reports end of stream.
    Status read_next_block_sync(Block* block, bool* eos);

    // Forwards the operator profile to the reader. `reader_` is dereferenced
    // unconditionally, so it must already be set when this is called.
    void set_read_counters(RuntimeProfile* operator_profile) {
        reader_->set_counters(operator_profile);
    }

    void update_shared_profiles(RuntimeProfile* source_op_profile);

    // Returns a new reader owned by the caller (separate from `reader_`).
    SpillReaderUPtr create_separate_reader() const;

    const TUniqueId& query_id() const;

    // Atomic read of the ready-for-reading flag.
    bool ready_for_reading() const { return _ready_for_reading; }

private:
    friend class SpillStreamManager;

    Status prepare();

    // Forwards `profile` to the writer; `writer_` must already be set.
    void _set_write_counters(RuntimeProfile* profile) { writer_->set_counters(profile); }

    RuntimeState* state_ = nullptr;
    int64_t stream_id_ = 0;
    SpillDataDir* data_dir_ = nullptr;
    // Directory path format specified in SpillStreamManager::register_spill_stream:
    // storage_root/spill/query_id/partitioned_hash_join-node_id-task_id-stream_id
    std::string spill_dir_;
    // In-class defaults so these scalars are never indeterminate, matching the
    // other scalar/pointer members of this class.
    size_t batch_rows_ = 0;
    size_t batch_bytes_ = 0;
    int64_t total_written_bytes_ = 0;

    std::atomic_bool _ready_for_reading = false;
    std::atomic_bool _is_reading = false;

    SpillWriterUPtr writer_;
    SpillReaderUPtr reader_;

    TUniqueId query_id_;

    RuntimeProfile* profile_ = nullptr;
    // Profile counters; NOTE(review): presumably registered on profile_ in the .cpp.
    RuntimeProfile::Counter* _current_file_count = nullptr;
    RuntimeProfile::Counter* _total_file_count = nullptr;
    RuntimeProfile::Counter* _current_file_size = nullptr;
};
using SpillStreamSPtr = std::shared_ptr<SpillStream>;
// Closes the diagnostic region opened by compile_check_begin.h above; kept inside
// the namespace so the begin/end pair is symmetric.
#include "common/compile_check_end.h"
} // namespace doris