Coverage Report

Created: 2026-03-12 17:06

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/spill/spill_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/spill/spill_writer.h"
19
20
#include "agent/be_exec_version_manager.h"
21
#include "common/status.h"
22
#include "exec/spill/spill_stream_manager.h"
23
#include "io/fs/local_file_system.h"
24
#include "io/fs/local_file_writer.h"
25
#include "runtime/exec_env.h"
26
#include "runtime/runtime_state.h"
27
#include "runtime/thread_context.h"
28
29
namespace doris {
30
#include "common/compile_check_begin.h"
31
2.45k
Status SpillWriter::open() {
32
2.45k
    if (file_writer_) {
33
0
        return Status::OK();
34
0
    }
35
2.45k
    return io::global_local_filesystem()->create_file(file_path_, &file_writer_);
36
2.45k
}
37
38
2.42k
Status SpillWriter::close() {
39
2.42k
    if (closed_ || !file_writer_) {
40
0
        return Status::OK();
41
0
    }
42
2.42k
    closed_ = true;
43
44
2.42k
    meta_.append((const char*)&max_sub_block_size_, sizeof(max_sub_block_size_));
45
2.42k
    meta_.append((const char*)&written_blocks_, sizeof(written_blocks_));
46
47
    // meta: block1 offset, block2 offset, ..., blockn offset, max_sub_block_size, n
48
2.42k
    {
49
2.42k
        SCOPED_TIMER(_write_file_timer);
50
2.42k
        RETURN_IF_ERROR(file_writer_->append(meta_));
51
2.42k
    }
52
53
2.42k
    total_written_bytes_ += meta_.size();
54
2.42k
    COUNTER_UPDATE(_write_file_total_size, meta_.size());
55
2.42k
    if (_resource_ctx) {
56
2.42k
        _resource_ctx->io_context()->update_spill_write_bytes_to_local_storage(meta_.size());
57
2.42k
    }
58
2.42k
    if (_write_file_current_size) {
59
34
        COUNTER_UPDATE(_write_file_current_size, meta_.size());
60
34
    }
61
2.42k
    data_dir_->update_spill_data_usage(meta_.size());
62
2.42k
    ExecEnv::GetInstance()->spill_stream_mgr()->update_spill_write_bytes(meta_.size());
63
64
2.42k
    RETURN_IF_ERROR(file_writer_->close());
65
66
2.42k
    file_writer_.reset();
67
2.42k
    return Status::OK();
68
2.42k
}
69
70
11.9k
Status SpillWriter::write(RuntimeState* state, const Block& block, size_t& written_bytes) {
71
11.9k
    written_bytes = 0;
72
11.9k
    DCHECK(file_writer_);
73
11.9k
    auto rows = block.rows();
74
11.9k
    COUNTER_UPDATE(_write_rows_counter, rows);
75
11.9k
    COUNTER_UPDATE(_write_block_bytes_counter, block.bytes());
76
    // file format: block1, block2, ..., blockn, meta
77
11.9k
    if (rows <= batch_size_) {
78
11.9k
        return _write_internal(block, written_bytes);
79
11.9k
    } else {
80
0
        auto tmp_block = block.clone_empty();
81
0
        const auto& src_data = block.get_columns_with_type_and_name();
82
83
0
        for (size_t row_idx = 0; row_idx < rows && !state->is_cancelled();) {
84
0
            tmp_block.clear_column_data();
85
86
0
            const auto& dst_data = tmp_block.get_columns_with_type_and_name();
87
88
0
            size_t block_rows = std::min(rows - row_idx, batch_size_);
89
0
            RETURN_IF_CATCH_EXCEPTION({
90
0
                for (size_t col_idx = 0; col_idx < block.columns(); ++col_idx) {
91
0
                    dst_data[col_idx].column->assume_mutable()->insert_range_from(
92
0
                            *src_data[col_idx].column, row_idx, block_rows);
93
0
                }
94
0
            });
95
96
0
            int64_t tmp_blcok_mem = tmp_block.allocated_bytes();
97
0
            COUNTER_UPDATE(_memory_used_counter, tmp_blcok_mem);
98
0
            Defer defer {[&]() { COUNTER_UPDATE(_memory_used_counter, -tmp_blcok_mem); }};
99
0
            RETURN_IF_ERROR(_write_internal(tmp_block, written_bytes));
100
101
0
            row_idx += block_rows;
102
0
        }
103
0
        return Status::OK();
104
0
    }
105
11.9k
}
106
107
11.9k
Status SpillWriter::_write_internal(const Block& block, size_t& written_bytes) {
108
11.9k
    size_t uncompressed_bytes = 0, compressed_bytes = 0;
109
110
11.9k
    Status status;
111
11.9k
    std::string buff;
112
11.9k
    int64_t buff_size {0};
113
114
11.9k
    if (block.rows() > 0) {
115
11.9k
        {
116
11.9k
            PBlock pblock;
117
11.9k
            SCOPED_TIMER(_serialize_timer);
118
11.9k
            int64_t compressed_time = 0;
119
11.9k
            status = block.serialize(
120
11.9k
                    BeExecVersionManager::get_newest_version(), &pblock, &uncompressed_bytes,
121
11.9k
                    &compressed_bytes, &compressed_time,
122
11.9k
                    segment_v2::CompressionTypePB::ZSTD); // ZSTD for better compression ratio
123
11.9k
            RETURN_IF_ERROR(status);
124
11.9k
            int64_t pblock_mem = pblock.ByteSizeLong();
125
11.9k
            COUNTER_UPDATE(_memory_used_counter, pblock_mem);
126
11.9k
            Defer defer {[&]() { COUNTER_UPDATE(_memory_used_counter, -pblock_mem); }};
127
11.9k
            if (!pblock.SerializeToString(&buff)) {
128
0
                return Status::Error<ErrorCode::SERIALIZE_PROTOBUF_ERROR>(
129
0
                        "serialize spill data error. [path={}]", file_path_);
130
0
            }
131
11.9k
            buff_size = buff.size();
132
11.9k
            COUNTER_UPDATE(_memory_used_counter, buff_size);
133
11.9k
            Defer defer2 {[&]() { COUNTER_UPDATE(_memory_used_counter, -buff_size); }};
134
11.9k
        }
135
11.9k
        if (data_dir_->reach_capacity_limit(buff_size)) {
136
0
            return Status::Error<ErrorCode::DISK_REACH_CAPACITY_LIMIT>(
137
0
                    "spill data total size exceed limit, path: {}, size limit: {}, spill data "
138
0
                    "size: {}",
139
0
                    data_dir_->path(),
140
0
                    PrettyPrinter::print_bytes(data_dir_->get_spill_data_limit()),
141
0
                    PrettyPrinter::print_bytes(data_dir_->get_spill_data_bytes()));
142
0
        }
143
144
11.9k
        {
145
11.9k
            Defer defer {[&]() {
146
11.9k
                if (status.ok()) {
147
11.9k
                    data_dir_->update_spill_data_usage(buff_size);
148
11.9k
                    ExecEnv::GetInstance()->spill_stream_mgr()->update_spill_write_bytes(buff_size);
149
150
11.9k
                    written_bytes += buff_size;
151
11.9k
                    max_sub_block_size_ = std::max(max_sub_block_size_, (size_t)buff_size);
152
153
11.9k
                    meta_.append((const char*)&total_written_bytes_, sizeof(size_t));
154
11.9k
                    COUNTER_UPDATE(_write_file_total_size, buff_size);
155
11.9k
                    if (_resource_ctx) {
156
11.9k
                        _resource_ctx->io_context()->update_spill_write_bytes_to_local_storage(
157
11.9k
                                buff_size);
158
11.9k
                    }
159
11.9k
                    if (_write_file_current_size) {
160
78
                        COUNTER_UPDATE(_write_file_current_size, buff_size);
161
78
                    }
162
11.9k
                    COUNTER_UPDATE(_write_block_counter, 1);
163
11.9k
                    total_written_bytes_ += buff_size;
164
11.9k
                    ++written_blocks_;
165
11.9k
                }
166
11.9k
            }};
167
11.9k
            {
168
11.9k
                SCOPED_TIMER(_write_file_timer);
169
11.9k
                status = file_writer_->append(buff);
170
11.9k
                RETURN_IF_ERROR(status);
171
11.9k
            }
172
11.9k
        }
173
11.9k
    }
174
175
11.9k
    return status;
176
11.9k
}
177
178
} // namespace doris