Coverage Report

Created: 2026-03-12 14:02

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/spill/spill_stream.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/spill/spill_stream.h"
19
20
#include <glog/logging.h>
21
22
#include <memory>
23
#include <mutex>
24
#include <utility>
25
26
#include "core/block/block.h"
27
#include "exec/spill/spill_reader.h"
28
#include "exec/spill/spill_stream_manager.h"
29
#include "exec/spill/spill_writer.h"
30
#include "io/fs/local_file_system.h"
31
#include "runtime/exec_env.h"
32
#include "runtime/query_context.h"
33
#include "runtime/runtime_profile.h"
34
#include "runtime/runtime_state.h"
35
#include "runtime/thread_context.h"
36
#include "util/debug_points.h"
37
38
namespace doris {
39
#include "common/compile_check_begin.h"
40
SpillStream::SpillStream(RuntimeState* state, int64_t stream_id, SpillDataDir* data_dir,
41
                         std::string spill_dir, size_t batch_rows, size_t batch_bytes,
42
                         RuntimeProfile* operator_profile)
43
2.10k
        : state_(state),
44
2.10k
          stream_id_(stream_id),
45
2.10k
          data_dir_(data_dir),
46
2.10k
          spill_dir_(std::move(spill_dir)),
47
2.10k
          batch_rows_(batch_rows),
48
2.10k
          batch_bytes_(batch_bytes),
49
2.10k
          query_id_(state->query_id()),
50
2.10k
          profile_(operator_profile) {
51
2.10k
    RuntimeProfile* custom_profile = operator_profile->get_child("CustomCounters");
52
2.10k
    DCHECK(custom_profile != nullptr);
53
2.10k
    _total_file_count = custom_profile->get_counter("SpillWriteFileTotalCount");
54
2.10k
    _current_file_count = custom_profile->get_counter("SpillWriteFileCurrentCount");
55
2.10k
    _current_file_size = custom_profile->get_counter("SpillWriteFileCurrentBytes");
56
2.10k
}
57
58
1.98k
void SpillStream::update_shared_profiles(RuntimeProfile* source_op_profile) {
59
1.98k
    _current_file_count = source_op_profile->get_counter("SpillWriteFileCurrentCount");
60
1.98k
    _current_file_size = source_op_profile->get_counter("SpillWriteFileCurrentBytes");
61
1.98k
}
62
63
2.08k
SpillStream::~SpillStream() {
64
2.08k
    gc();
65
2.08k
}
66
67
4.08k
void SpillStream::gc() {
68
4.08k
    if (_current_file_size) {
69
4.00k
        COUNTER_UPDATE(_current_file_size, -total_written_bytes_);
70
4.00k
    }
71
4.08k
    bool exists = false;
72
4.08k
    auto status = io::global_local_filesystem()->exists(spill_dir_, &exists);
73
4.08k
    if (status.ok() && exists) {
74
2.08k
        if (_current_file_count) {
75
2.00k
            COUNTER_UPDATE(_current_file_count, -1);
76
2.00k
        }
77
2.08k
        auto query_gc_dir = data_dir_->get_spill_data_gc_path(print_id(query_id_));
78
2.08k
        status = io::global_local_filesystem()->create_directory(query_gc_dir);
79
2.08k
        DBUG_EXECUTE_IF("fault_inject::spill_stream::gc", {
80
2.08k
            status = Status::Error<INTERNAL_ERROR>("fault_inject spill_stream gc failed");
81
2.08k
        });
82
2.08k
        if (status.ok()) {
83
2.08k
            auto gc_dir = fmt::format("{}/{}", query_gc_dir,
84
2.08k
                                      std::filesystem::path(spill_dir_).filename().string());
85
2.08k
            status = io::global_local_filesystem()->rename(spill_dir_, gc_dir);
86
2.08k
        }
87
2.08k
        if (!status.ok()) {
88
0
            LOG_EVERY_T(WARNING, 1) << fmt::format("failed to gc spill data, dir {}, error: {}",
89
0
                                                   query_gc_dir, status.to_string());
90
0
        }
91
2.08k
    }
92
    // If QueryContext is destructed earlier than PipelineFragmentContext,
93
    // spill_dir_ will be already moved to spill_gc directory.
94
95
    // decrease spill data usage anyway, since in ~QueryContext() spill data of the query will be
96
    // clean up as a last resort
97
4.08k
    data_dir_->update_spill_data_usage(-total_written_bytes_);
98
4.08k
    total_written_bytes_ = 0;
99
4.08k
}
100
101
2.10k
Status SpillStream::prepare() {
102
2.10k
    writer_ = std::make_unique<SpillWriter>(state_->get_query_ctx()->resource_ctx(), profile_,
103
2.10k
                                            stream_id_, batch_rows_, data_dir_, spill_dir_);
104
2.10k
    _set_write_counters(profile_);
105
106
2.10k
    reader_ = std::make_unique<SpillReader>(state_->get_query_ctx()->resource_ctx(), stream_id_,
107
2.10k
                                            writer_->get_file_path());
108
109
2.10k
    DBUG_EXECUTE_IF("fault_inject::spill_stream::prepare_spill", {
110
2.10k
        return Status::Error<INTERNAL_ERROR>("fault_inject spill_stream prepare_spill failed");
111
2.10k
    });
112
2.10k
    COUNTER_UPDATE(_total_file_count, 1);
113
2.10k
    if (_current_file_count) {
114
38
        COUNTER_UPDATE(_current_file_count, 1);
115
38
    }
116
2.10k
    return writer_->open();
117
2.10k
}
118
119
20
SpillReaderUPtr SpillStream::create_separate_reader() const {
120
20
    return std::make_unique<SpillReader>(state_->get_query_ctx()->resource_ctx(), stream_id_,
121
20
                                         writer_->get_file_path());
122
20
}
123
124
0
const TUniqueId& SpillStream::query_id() const {
125
0
    return query_id_;
126
0
}
127
128
0
const std::string& SpillStream::get_spill_root_dir() const {
129
0
    return data_dir_->path();
130
0
}
131
132
4.38k
Status SpillStream::spill_block(RuntimeState* state, const Block& block, bool eof) {
133
4.38k
    size_t written_bytes = 0;
134
4.38k
    DBUG_EXECUTE_IF("fault_inject::spill_stream::spill_block", {
135
4.38k
        return Status::Error<INTERNAL_ERROR>("fault_inject spill_stream spill_block failed");
136
4.38k
    });
137
4.38k
    RETURN_IF_ERROR(writer_->write(state, block, written_bytes));
138
4.38k
    if (eof) {
139
18
        RETURN_IF_ERROR(spill_eof());
140
4.36k
    } else {
141
4.36k
        total_written_bytes_ = writer_->get_written_bytes();
142
4.36k
    }
143
4.38k
    return Status::OK();
144
4.38k
}
145
146
2.06k
Status SpillStream::spill_eof() {
147
2.06k
    DBUG_EXECUTE_IF("fault_inject::spill_stream::spill_eof", {
148
2.06k
        return Status::Error<INTERNAL_ERROR>("fault_inject spill_stream spill_eof failed");
149
2.06k
    });
150
2.06k
    auto status = writer_->close();
151
2.06k
    total_written_bytes_ = writer_->get_written_bytes();
152
2.06k
    writer_.reset();
153
154
2.06k
    if (status.ok()) {
155
2.06k
        _ready_for_reading = true;
156
2.06k
    }
157
2.06k
    return status;
158
2.06k
}
159
160
6.19k
Status SpillStream::read_next_block_sync(Block* block, bool* eos) {
161
6.19k
    DCHECK(reader_ != nullptr);
162
6.19k
    DCHECK(!_is_reading);
163
6.19k
    _is_reading = true;
164
6.19k
    Defer defer([this] { _is_reading = false; });
165
166
6.19k
    DBUG_EXECUTE_IF("fault_inject::spill_stream::read_next_block", {
167
6.19k
        return Status::Error<INTERNAL_ERROR>("fault_inject spill_stream read_next_block failed");
168
6.19k
    });
169
6.18k
    RETURN_IF_ERROR(reader_->open());
170
6.18k
    return reader_->read(block, eos);
171
6.18k
}
172
173
} // namespace doris