Coverage Report

Created: 2026-03-14 18:33

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/spill/spill_stream.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/spill/spill_stream.h"
19
20
#include <glog/logging.h>
21
22
#include <memory>
23
#include <mutex>
24
#include <utility>
25
26
#include "core/block/block.h"
27
#include "exec/spill/spill_reader.h"
28
#include "exec/spill/spill_stream_manager.h"
29
#include "exec/spill/spill_writer.h"
30
#include "io/fs/local_file_system.h"
31
#include "runtime/exec_env.h"
32
#include "runtime/query_context.h"
33
#include "runtime/runtime_profile.h"
34
#include "runtime/runtime_state.h"
35
#include "runtime/thread_context.h"
36
#include "util/debug_points.h"
37
38
namespace doris {
39
#include "common/compile_check_begin.h"
40
/// Constructs a spill stream for one spill directory.
///
/// Stores the given arguments, copies the query id out of `state`, and looks up
/// the spill-write file counters on the "CustomCounters" child of
/// `operator_profile`. The counter pointers are stored exactly as returned by
/// get_counter(); other methods in this file null-check the "current" counters
/// before updating them.
///
/// @param state             runtime state; dereferenced here to read the query id
/// @param stream_id         identifier forwarded to the writer/reader in prepare()
/// @param data_dir          spill data directory used for usage accounting and gc paths
/// @param spill_dir         directory of this stream's spill data (moved into the member)
/// @param batch_rows        stored and later passed to the writer
/// @param batch_bytes       stored in batch_bytes_
/// @param operator_profile  profile that must own a "CustomCounters" child (DCHECKed)
SpillStream::SpillStream(RuntimeState* state, int64_t stream_id, SpillDataDir* data_dir,
                         std::string spill_dir, size_t batch_rows, size_t batch_bytes,
                         RuntimeProfile* operator_profile)
        : state_(state),
          stream_id_(stream_id),
          data_dir_(data_dir),
          spill_dir_(std::move(spill_dir)),
          batch_rows_(batch_rows),
          batch_bytes_(batch_bytes),
          query_id_(state->query_id()),
          profile_(operator_profile) {
    auto* counters_profile = operator_profile->get_child("CustomCounters");
    DCHECK(counters_profile != nullptr);

    // Resolve the spill-write counters once so later updates are plain pointer accesses.
    _total_file_count = counters_profile->get_counter("SpillWriteFileTotalCount");
    _current_file_count = counters_profile->get_counter("SpillWriteFileCurrentCount");
    _current_file_size = counters_profile->get_counter("SpillWriteFileCurrentBytes");
}
57
58
1.82k
/// Re-points the "current file count" and "current file bytes" counters at the
/// counters of the same names found directly on `source_op_profile`.
///
/// Either pointer may end up null if the profile has no such counter; gc() and
/// prepare() check these two pointers before using them.
void SpillStream::update_shared_profiles(RuntimeProfile* source_op_profile) {
    auto* shared_file_count = source_op_profile->get_counter("SpillWriteFileCurrentCount");
    auto* shared_file_bytes = source_op_profile->get_counter("SpillWriteFileCurrentBytes");

    _current_file_count = shared_file_count;
    _current_file_size = shared_file_bytes;
}
62
63
1.92k
/// Destructor: runs gc() so the spill directory and the usage accounting are
/// released even if the owner never called gc() explicitly.
SpillStream::~SpillStream() {
    this->gc();
}
66
67
3.76k
void SpillStream::gc() {
68
3.76k
    if (_current_file_size) {
69
3.68k
        COUNTER_UPDATE(_current_file_size, -total_written_bytes_);
70
3.68k
    }
71
3.76k
    bool exists = false;
72
3.76k
    auto status = io::global_local_filesystem()->exists(spill_dir_, &exists);
73
3.76k
    if (status.ok() && exists) {
74
1.92k
        if (_current_file_count) {
75
1.84k
            COUNTER_UPDATE(_current_file_count, -1);
76
1.84k
        }
77
1.92k
        auto query_gc_dir = data_dir_->get_spill_data_gc_path(print_id(query_id_));
78
1.92k
        status = io::global_local_filesystem()->create_directory(query_gc_dir);
79
1.92k
        DBUG_EXECUTE_IF("fault_inject::spill_stream::gc", {
80
1.92k
            status = Status::Error<INTERNAL_ERROR>("fault_inject spill_stream gc failed");
81
1.92k
        });
82
1.92k
        if (status.ok()) {
83
1.92k
            auto gc_dir = fmt::format("{}/{}", query_gc_dir,
84
1.92k
                                      std::filesystem::path(spill_dir_).filename().string());
85
1.92k
            status = io::global_local_filesystem()->rename(spill_dir_, gc_dir);
86
1.92k
        }
87
1.92k
        if (!status.ok()) {
88
0
            LOG_EVERY_T(WARNING, 1) << fmt::format("failed to gc spill data, dir {}, error: {}",
89
0
                                                   query_gc_dir, status.to_string());
90
0
        }
91
1.92k
    }
92
    // If QueryContext is destructed earlier than PipelineFragmentContext,
93
    // spill_dir_ will be already moved to spill_gc directory.
94
95
    // decrease spill data usage anyway, since in ~QueryContext() spill data of the query will be
96
    // clean up as a last resort
97
3.76k
    data_dir_->update_spill_data_usage(-total_written_bytes_);
98
3.76k
    total_written_bytes_ = 0;
99
3.76k
}
100
101
1.92k
/// Creates the writer and reader for this stream and opens the writer.
///
/// The reader is constructed on the file path reported by the freshly created
/// writer. After the fault-injection point, the file counters are bumped and the
/// writer is opened.
///
/// Every counter is null-checked before use: they all come from
/// RuntimeProfile::get_counter() lookups in the constructor, and the sibling
/// "current" counters are already treated as optional throughout this file.
///
/// @return the status of SpillWriter::open(), or the injected error when the
///         "fault_inject::spill_stream::prepare_spill" debug point is active.
Status SpillStream::prepare() {
    writer_ = std::make_unique<SpillWriter>(state_->get_query_ctx()->resource_ctx(), profile_,
                                            stream_id_, batch_rows_, data_dir_, spill_dir_);
    _set_write_counters(profile_);

    reader_ = std::make_unique<SpillReader>(state_->get_query_ctx()->resource_ctx(), stream_id_,
                                            writer_->get_file_path());

    DBUG_EXECUTE_IF("fault_inject::spill_stream::prepare_spill", {
        return Status::Error<INTERNAL_ERROR>("fault_inject spill_stream prepare_spill failed");
    });

    // Guarded like the other counters: a missing counter must not crash prepare().
    if (_total_file_count) {
        COUNTER_UPDATE(_total_file_count, 1);
    }
    if (_current_file_count) {
        COUNTER_UPDATE(_current_file_count, 1);
    }
    return writer_->open();
}
118
119
20
/// Creates an additional, independent SpillReader over the writer's file path,
/// using the same resource context and stream id as the reader built in prepare().
///
/// NOTE(review): this dereferences writer_, which spill_eof() resets. It looks
/// like this must be called while the writer is still alive — confirm with callers.
SpillReaderUPtr SpillStream::create_separate_reader() const {
    const auto& spill_file_path = writer_->get_file_path();
    auto separate_reader = std::make_unique<SpillReader>(state_->get_query_ctx()->resource_ctx(),
                                                         stream_id_, spill_file_path);
    return separate_reader;
}
123
124
0
/// Returns the id of the query this stream belongs to (captured in the constructor).
const TUniqueId& SpillStream::query_id() const {
    return this->query_id_;
}
127
128
0
const std::string& SpillStream::get_spill_root_dir() const {
129
0
    return data_dir_->path();
130
0
}
131
132
8.58k
/// Writes one block to the spill file and, when `eof` is set, finalizes the stream.
///
/// Changes relative to the previous implementation:
///  - A missing writer (prepare() not called, or the stream was already
///    finalized by spill_eof()) is reported as an error instead of being
///    dereferenced.
///  - total_written_bytes_ is refreshed from the writer even when write() fails,
///    matching spill_eof(), which refreshes it regardless of the close status.
///    gc() uses this value for its usage accounting.
///
/// @param state  runtime state forwarded to SpillWriter::write()
/// @param block  block to spill
/// @param eof    when true, spill_eof() is invoked after a successful write
/// @return OK on success; otherwise the first error from the write or from spill_eof().
Status SpillStream::spill_block(RuntimeState* state, const Block& block, bool eof) {
    size_t written_bytes = 0;
    DBUG_EXECUTE_IF("fault_inject::spill_stream::spill_block", {
        return Status::Error<INTERNAL_ERROR>("fault_inject spill_stream spill_block failed");
    });

    if (writer_ == nullptr) {
        return Status::Error<INTERNAL_ERROR>(
                "spill stream writer is not open or has already been closed");
    }

    Status write_status = writer_->write(state, block, written_bytes);
    // Keep the accounting in sync with the writer before propagating any error.
    total_written_bytes_ = writer_->get_written_bytes();
    if (!write_status.ok()) {
        return write_status;
    }

    if (eof) {
        // spill_eof() closes the writer and refreshes total_written_bytes_ once more.
        RETURN_IF_ERROR(spill_eof());
    }
    return Status::OK();
}
145
146
1.90k
/// Finalizes writing: closes the writer, records the final written byte count,
/// releases the writer, and marks the stream ready for reading if close succeeded.
///
/// The writer is reset at the end of this function, so a second call used to
/// dereference a null pointer. Now, when there is no writer:
///  - if the stream is already marked ready for reading, the call is a no-op
///    and returns OK;
///  - otherwise (never prepared, or an earlier close failed) an error is returned.
///
/// @return the status of SpillWriter::close(), the injected error when the
///         "fault_inject::spill_stream::spill_eof" debug point is active, or the
///         results described above when no writer exists.
Status SpillStream::spill_eof() {
    DBUG_EXECUTE_IF("fault_inject::spill_stream::spill_eof", {
        return Status::Error<INTERNAL_ERROR>("fault_inject spill_stream spill_eof failed");
    });

    if (writer_ == nullptr) {
        if (_ready_for_reading) {
            return Status::OK();
        }
        return Status::Error<INTERNAL_ERROR>(
                "spill stream writer is not open or has already been closed");
    }

    auto status = writer_->close();
    // Record the byte count even if close() failed, so gc() subtracts what the writer reports.
    total_written_bytes_ = writer_->get_written_bytes();
    writer_.reset();

    if (status.ok()) {
        _ready_for_reading = true;
    }
    return status;
}
159
160
10.2k
/// Synchronously reads the next block of this stream into `block`.
///
/// _is_reading is set for the duration of the call and cleared by a Defer on
/// every exit path; the DCHECK catches a call made while it is already set.
/// reader_->open() is invoked on every call before reading.
///
/// @param block  destination block, forwarded to SpillReader::read()
/// @param eos    end-of-stream flag, forwarded to SpillReader::read()
/// @return the injected error when the "fault_inject::spill_stream::read_next_block"
///         debug point is active, the error from open(), or the result of read().
Status SpillStream::read_next_block_sync(Block* block, bool* eos) {
    DCHECK(reader_ != nullptr);
    DCHECK(!_is_reading);

    _is_reading = true;
    Defer clear_reading_flag([this] { _is_reading = false; });

    DBUG_EXECUTE_IF("fault_inject::spill_stream::read_next_block", {
        return Status::Error<INTERNAL_ERROR>("fault_inject spill_stream read_next_block failed");
    });

    Status open_status = reader_->open();
    if (!open_status.ok()) {
        return open_status;
    }
    return reader_->read(block, eos);
}
172
173
} // namespace doris