Coverage Report

Created: 2026-03-14 06:50

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/spill/spill_stream.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/spill/spill_stream.h"
19
20
#include <glog/logging.h>
21
22
#include <memory>
23
#include <mutex>
24
#include <utility>
25
26
#include "core/block/block.h"
27
#include "exec/spill/spill_reader.h"
28
#include "exec/spill/spill_stream_manager.h"
29
#include "exec/spill/spill_writer.h"
30
#include "io/fs/local_file_system.h"
31
#include "runtime/exec_env.h"
32
#include "runtime/query_context.h"
33
#include "runtime/runtime_profile.h"
34
#include "runtime/runtime_state.h"
35
#include "runtime/thread_context.h"
36
#include "util/debug_points.h"
37
38
namespace doris {
39
#include "common/compile_check_begin.h"
40
// Creates a spill stream identified by `stream_id` whose files live in `spill_dir` on `data_dir`.
//
// `state` and `operator_profile` are dereferenced here, so both must be non-null.
// The three spill-write file counters are looked up under the "CustomCounters" child of
// `operator_profile`; the lookups may presumably yield nullptr, since later uses of the
// "current" counters are null-checked.
SpillStream::SpillStream(RuntimeState* state, int64_t stream_id, SpillDataDir* data_dir,
                         std::string spill_dir, size_t batch_rows, size_t batch_bytes,
                         RuntimeProfile* operator_profile)
        : state_(state),
          stream_id_(stream_id),
          data_dir_(data_dir),
          spill_dir_(std::move(spill_dir)),
          batch_rows_(batch_rows),
          batch_bytes_(batch_bytes),
          query_id_(state->query_id()),
          profile_(operator_profile) {
    // All file-level counters are registered on the operator's custom-counter child profile.
    auto* const counters_profile = operator_profile->get_child("CustomCounters");
    DCHECK(counters_profile != nullptr);

    _total_file_count = counters_profile->get_counter("SpillWriteFileTotalCount");
    _current_file_count = counters_profile->get_counter("SpillWriteFileCurrentCount");
    _current_file_size = counters_profile->get_counter("SpillWriteFileCurrentBytes");
}
57
58
12
// Re-points the "current file" counters (count and bytes) at the counters of the same names
// found on `source_op_profile`. The total-file counter is left untouched.
//
// NOTE(review): unlike the constructor, the lookup is done directly on the given profile and
// not on a "CustomCounters" child — confirm callers pass the profile that owns these counters.
void SpillStream::update_shared_profiles(RuntimeProfile* source_op_profile) {
    auto* const shared_profile = source_op_profile;
    _current_file_count = shared_profile->get_counter("SpillWriteFileCurrentCount");
    _current_file_size = shared_profile->get_counter("SpillWriteFileCurrentBytes");
}
62
63
115
// Destroying the stream garbage-collects its spill directory and releases the accounted
// spill usage via gc().
SpillStream::~SpillStream() {
    gc();
}
66
67
152
void SpillStream::gc() {
68
152
    if (_current_file_size) {
69
65
        COUNTER_UPDATE(_current_file_size, -total_written_bytes_);
70
65
    }
71
152
    bool exists = false;
72
152
    auto status = io::global_local_filesystem()->exists(spill_dir_, &exists);
73
152
    if (status.ok() && exists) {
74
115
        if (_current_file_count) {
75
38
            COUNTER_UPDATE(_current_file_count, -1);
76
38
        }
77
115
        auto query_gc_dir = data_dir_->get_spill_data_gc_path(print_id(query_id_));
78
115
        status = io::global_local_filesystem()->create_directory(query_gc_dir);
79
115
        DBUG_EXECUTE_IF("fault_inject::spill_stream::gc", {
80
115
            status = Status::Error<INTERNAL_ERROR>("fault_inject spill_stream gc failed");
81
115
        });
82
115
        if (status.ok()) {
83
115
            auto gc_dir = fmt::format("{}/{}", query_gc_dir,
84
115
                                      std::filesystem::path(spill_dir_).filename().string());
85
115
            status = io::global_local_filesystem()->rename(spill_dir_, gc_dir);
86
115
        }
87
115
        if (!status.ok()) {
88
0
            LOG_EVERY_T(WARNING, 1) << fmt::format("failed to gc spill data, dir {}, error: {}",
89
0
                                                   query_gc_dir, status.to_string());
90
0
        }
91
115
    }
92
    // If QueryContext is destructed earlier than PipelineFragmentContext,
93
    // spill_dir_ will be already moved to spill_gc directory.
94
95
    // decrease spill data usage anyway, since in ~QueryContext() spill data of the query will be
96
    // clean up as a last resort
97
152
    data_dir_->update_spill_data_usage(-total_written_bytes_);
98
152
    total_written_bytes_ = 0;
99
152
}
100
101
115
// Creates the writer and the reader for this stream, bumps the file counters and opens the
// writer.
//
// Returns the status of writer_->open(), or the injected error when the
// "fault_inject::spill_stream::prepare_spill" debug point is active (in that case the
// counters are not touched and the writer is not opened).
Status SpillStream::prepare() {
    writer_ = std::make_unique<SpillWriter>(state_->get_query_ctx()->resource_ctx(), profile_,
                                            stream_id_, batch_rows_, data_dir_, spill_dir_);
    _set_write_counters(profile_);

    // The reader targets the file path chosen by the writer.
    reader_ = std::make_unique<SpillReader>(state_->get_query_ctx()->resource_ctx(), stream_id_,
                                            writer_->get_file_path());

    DBUG_EXECUTE_IF("fault_inject::spill_stream::prepare_spill", {
        return Status::Error<INTERNAL_ERROR>("fault_inject spill_stream prepare_spill failed");
    });
    // All three counters come from the same kind of profile lookup; guard the total counter
    // the same way the "current" counters are guarded instead of dereferencing it blindly.
    if (_total_file_count) {
        COUNTER_UPDATE(_total_file_count, 1);
    }
    if (_current_file_count) {
        COUNTER_UPDATE(_current_file_count, 1);
    }
    return writer_->open();
}
118
119
20
// Returns a new, independent SpillReader for the same file this stream's writer targets.
//
// NOTE(review): the path is taken from writer_, which spill_eof() resets; calling this after
// spill_eof() would dereference a null writer_ — confirm callers only use it beforehand.
SpillReaderUPtr SpillStream::create_separate_reader() const {
    const auto& spill_file_path = writer_->get_file_path();
    return std::make_unique<SpillReader>(state_->get_query_ctx()->resource_ctx(), stream_id_,
                                         spill_file_path);
}
123
124
0
// Returns the query id captured from the RuntimeState at construction time.
const TUniqueId& SpillStream::query_id() const {
    return this->query_id_;
}
127
128
0
const std::string& SpillStream::get_spill_root_dir() const {
129
0
    return data_dir_->path();
130
0
}
131
132
159
// Writes `block` through the stream's writer.
//
// When `eof` is true the stream is finished via spill_eof() right after the write; otherwise
// total_written_bytes_ is refreshed from the writer. Any error from the write or from
// spill_eof() is returned as-is; on a write error total_written_bytes_ is left unchanged.
Status SpillStream::spill_block(RuntimeState* state, const Block& block, bool eof) {
    // Out-parameter required by SpillWriter::write(); its value is not used here.
    size_t bytes_of_this_write = 0;
    DBUG_EXECUTE_IF("fault_inject::spill_stream::spill_block", {
        return Status::Error<INTERNAL_ERROR>("fault_inject spill_stream spill_block failed");
    });
    RETURN_IF_ERROR(writer_->write(state, block, bytes_of_this_write));

    if (!eof) {
        total_written_bytes_ = writer_->get_written_bytes();
        return Status::OK();
    }

    // Last block: spill_eof() closes the writer and records the final byte count.
    RETURN_IF_ERROR(spill_eof());
    return Status::OK();
}
145
146
93
// Finishes writing: closes the writer, records its final written-byte count and destroys it.
// The stream is marked ready for reading only if the close succeeded.
//
// Returns the status of writer_->close() (or the injected error when the
// "fault_inject::spill_stream::spill_eof" debug point is active).
Status SpillStream::spill_eof() {
    DBUG_EXECUTE_IF("fault_inject::spill_stream::spill_eof", {
        return Status::Error<INTERNAL_ERROR>("fault_inject spill_stream spill_eof failed");
    });

    Status close_status = writer_->close();
    // Capture the byte count before the writer goes away; this happens even if close failed.
    total_written_bytes_ = writer_->get_written_bytes();
    writer_.reset();

    if (!close_status.ok()) {
        return close_status;
    }
    _ready_for_reading = true;
    return close_status;
}
159
160
53
// Reads the next block from the stream's reader into `block`, setting `*eos` as reported by
// the reader.
//
// _is_reading is raised for the duration of the call and cleared on every exit path by the
// Defer guard; the DCHECK asserts that no read is already in progress.
// reader_->open() is invoked on each call before reading (presumably a no-op once opened —
// TODO confirm against SpillReader).
Status SpillStream::read_next_block_sync(Block* block, bool* eos) {
    DCHECK(reader_ != nullptr);
    DCHECK(!_is_reading);
    _is_reading = true;
    Defer clear_reading_flag([this] { _is_reading = false; });

    DBUG_EXECUTE_IF("fault_inject::spill_stream::read_next_block", {
        return Status::Error<INTERNAL_ERROR>("fault_inject spill_stream read_next_block failed");
    });

    RETURN_IF_ERROR(reader_->open());
    return reader_->read(block, eos);
}
172
173
} // namespace doris