Coverage Report

Created: 2026-03-27 18:17

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/result_block_buffer.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <arrow/type.h>
21
#include <cctz/time_zone.h>
22
#include <gen_cpp/PaloInternalService_types.h>
23
#include <gen_cpp/Types_types.h>
24
25
#include <atomic>
26
#include <condition_variable>
27
#include <cstdint>
28
#include <deque>
29
#include <list>
30
#include <memory>
31
#include <mutex>
32
#include <unordered_map>
33
34
#include "common/status.h"
35
#include "runtime/runtime_state.h"
36
37
namespace google::protobuf {
38
class Closure;
39
} // namespace google::protobuf
40
41
namespace brpc {
42
class Controller;
43
}
44
45
namespace doris {
46
47
class Dependency;
48
49
class GetArrowResultBatchCtx;
50
class Block;
51
52
class PFetchDataResult;
53
54
// Abstract, non-templated interface of a result buffer.  It declares the
// close / cancel / buffer_id / mem_tracker / set_dependency operations that
// ResultBlockBuffer<ResultCtxType> implements.
class ResultBlockBufferBase {
55
public:
56
13
    ResultBlockBufferBase() = default;
57
13
    virtual ~ResultBlockBufferBase() = default;
58
59
    // Close one fragment instance's contribution to this buffer.  When the last
60
    // registered instance calls close(), |is_fully_closed| is set to true,
61
    // indicating that no more producers will write to this buffer and callers may
62
    // safely schedule deferred cleanup.  The buffer is keyed in ResultBufferMgr
63
    // under buffer_id(); use that id (not the per-instance fragment_instance_id)
64
    // when scheduling cancel_at_time() for the deferred cleanup.
65
    virtual Status close(const TUniqueId& id, Status exec_status, int64_t num_rows,
66
                         bool& is_fully_closed) = 0;
67
    virtual void cancel(const Status& reason) = 0; // NOTE(review): effect of |reason| is defined by the override; not visible here.
68
69
    // The id under which this buffer was registered in ResultBufferMgr.
70
    // In parallel result-sink mode this equals query_id; in non-parallel mode
71
    // it equals fragment_instance_id.
72
    [[nodiscard]] virtual const TUniqueId& buffer_id() const = 0;
73
74
    [[nodiscard]] virtual std::shared_ptr<MemTrackerLimiter> mem_tracker() = 0; // Memory tracker held by the buffer.
75
    // Associates |result_sink_dependency| with instance |id|.
    // NOTE(review): presumably stored per instance id by the implementation; confirm there.
    virtual void set_dependency(const TUniqueId& id,
76
                                std::shared_ptr<Dependency> result_sink_dependency) = 0;
77
};
78
79
// Used to serialize result blocks for normal queries, Arrow Flight queries and point queries.
// ResultCtxType is the consumer context type; its nested ResultType is the block type accepted by add_batch().
80
template <typename ResultCtxType>
81
class ResultBlockBuffer : public ResultBlockBufferBase {
82
public:
83
    using InBlockType = typename ResultCtxType::ResultType;
84
    ResultBlockBuffer(TUniqueId id, RuntimeState* state, int buffer_size);
85
13
    ~ResultBlockBuffer() override = default;
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEED2Ev
Line
Count
Source
85
8
    ~ResultBlockBuffer() override = default;
_ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEED2Ev
Line
Count
Source
85
5
    ~ResultBlockBuffer() override = default;
86
87
    Status add_batch(RuntimeState* state, std::shared_ptr<InBlockType>& result);
88
    Status get_batch(std::shared_ptr<ResultCtxType> ctx);
89
    Status close(const TUniqueId& id, Status exec_status, int64_t num_rows,
90
                 bool& is_fully_closed) override;
91
    void cancel(const Status& reason) override;
92
93
0
    [[nodiscard]] const TUniqueId& buffer_id() const override { return _fragment_id; }
Unexecuted instantiation: _ZNK5doris17ResultBlockBufferINS_17GetResultBatchCtxEE9buffer_idEv
Unexecuted instantiation: _ZNK5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE9buffer_idEv
94
0
    [[nodiscard]] std::shared_ptr<MemTrackerLimiter> mem_tracker() override { return _mem_tracker; }
Unexecuted instantiation: _ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE11mem_trackerEv
Unexecuted instantiation: _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE11mem_trackerEv
95
    void set_dependency(const TUniqueId& id,
96
                        std::shared_ptr<Dependency> result_sink_dependency) override;
97
98
protected:
99
    friend class GetArrowResultBatchCtx;
100
    // Delegates to the public constructor with a default-constructed id and a buffer size of 0.
    ResultBlockBuffer(RuntimeState* state)
101
1
            : ResultBlockBuffer<ResultCtxType>(TUniqueId(), state, 0) {}
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEEC2EPNS_12RuntimeStateE
Line
Count
Source
101
1
            : ResultBlockBuffer<ResultCtxType>(TUniqueId(), state, 0) {}
Unexecuted instantiation: _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEEC2EPNS_12RuntimeStateE
102
    void _update_dependency();
103
104
    using ResultQueue = std::list<std::shared_ptr<InBlockType>>;
105
106
    // Id returned by buffer_id(): query_id in parallel result-sink mode, fragment_instance_id otherwise.
107
    TUniqueId _fragment_id;
108
    bool _is_close;
109
    Status _status;
110
    // Producer side: queue of result batches waiting to be sent to FE via _waiting_rpc.
111
    ResultQueue _result_batch_queue;
112
    // protects all subsequent data in this block
113
    std::mutex _lock;
114
115
    // The last batch size in bytes.
116
    // Used to decide whether to merge multiple batches, so that merging does not produce an excessively large batch.
117
    size_t _last_batch_bytes = 0;
118
119
    // Fetching an Arrow Flight result is synchronous: the caller has to wait until data is ready and then return it.
120
    // TODO: waiting for data blocks the pipeline, so use a request pool to hold requests that are waiting for data.
121
    std::condition_variable _arrow_data_arrival;
122
    // Consumer side: RPCs from FE waiting for results; an RPC can be answered once the result queue is filled. NOTE(review): the old comment named _fe_result_batch_queue, not a member here; presumably _result_batch_queue.
123
    std::deque<std::shared_ptr<ResultCtxType>> _waiting_rpc;
124
125
    std::atomic<int64_t> _returned_rows = 0;
126
    // instance id to dependency
127
    std::unordered_map<TUniqueId, std::shared_ptr<Dependency>> _result_sink_dependencies;
128
    std::unordered_map<TUniqueId, size_t> _instance_rows;
129
    std::list<std::unordered_map<TUniqueId, size_t>> _instance_rows_in_queue;
130
    std::shared_ptr<MemTrackerLimiter> _mem_tracker;
131
    int _packet_num = 0;
132
    const int _batch_size;
133
    const std::string _timezone;
134
    const int _be_exec_version;
135
    const segment_v2::CompressionTypePB _fragment_transmission_compression_type;
136
    const int _buffer_limit;
137
};
138
139
} // namespace doris