Coverage Report

Created: 2026-03-16 21:03

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/result_block_buffer.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <arrow/type.h>
21
#include <cctz/time_zone.h>
22
#include <gen_cpp/PaloInternalService_types.h>
23
#include <gen_cpp/Types_types.h>
24
25
#include <atomic>
26
#include <condition_variable>
27
#include <cstdint>
28
#include <deque>
29
#include <list>
30
#include <memory>
31
#include <mutex>
32
#include <unordered_map>
33
34
#include "common/status.h"
35
#include "runtime/runtime_state.h"
36
37
namespace google::protobuf {
38
class Closure;
39
} // namespace google::protobuf
40
41
namespace brpc {
42
class Controller;
43
}
44
45
namespace doris {
46
47
class Dependency;
48
49
class GetArrowResultBatchCtx;
50
class Block;
51
52
class PFetchDataResult;
53
54
class ResultBlockBufferBase {
55
public:
56
13
    ResultBlockBufferBase() = default;
57
13
    virtual ~ResultBlockBufferBase() = default;
58
59
    virtual Status close(const TUniqueId& id, Status exec_status, int64_t num_rows) = 0;
60
    virtual void cancel(const Status& reason) = 0;
61
62
    [[nodiscard]] virtual std::shared_ptr<MemTrackerLimiter> mem_tracker() = 0;
63
    virtual void set_dependency(const TUniqueId& id,
64
                                std::shared_ptr<Dependency> result_sink_dependency) = 0;
65
};
66
67
// This is used to serialize a result block by normal queries / arrow flight queries / point queries.
68
template <typename ResultCtxType>
69
class ResultBlockBuffer : public ResultBlockBufferBase {
70
public:
71
    using InBlockType = typename ResultCtxType::ResultType;
72
    ResultBlockBuffer(TUniqueId id, RuntimeState* state, int buffer_size);
73
13
    ~ResultBlockBuffer() override = default;
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEED2Ev
Line
Count
Source
73
8
    ~ResultBlockBuffer() override = default;
_ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEED2Ev
Line
Count
Source
73
5
    ~ResultBlockBuffer() override = default;
74
75
    Status add_batch(RuntimeState* state, std::shared_ptr<InBlockType>& result);
76
    Status get_batch(std::shared_ptr<ResultCtxType> ctx);
77
    Status close(const TUniqueId& id, Status exec_status, int64_t num_rows) override;
78
    void cancel(const Status& reason) override;
79
80
0
    [[nodiscard]] std::shared_ptr<MemTrackerLimiter> mem_tracker() override { return _mem_tracker; }
Unexecuted instantiation: _ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE11mem_trackerEv
Unexecuted instantiation: _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE11mem_trackerEv
81
    void set_dependency(const TUniqueId& id,
82
                        std::shared_ptr<Dependency> result_sink_dependency) override;
83
84
protected:
85
    friend class GetArrowResultBatchCtx;
86
    ResultBlockBuffer(RuntimeState* state)
87
1
            : ResultBlockBuffer<ResultCtxType>(TUniqueId(), state, 0) {}
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEEC2EPNS_12RuntimeStateE
Line
Count
Source
87
1
            : ResultBlockBuffer<ResultCtxType>(TUniqueId(), state, 0) {}
Unexecuted instantiation: _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEEC2EPNS_12RuntimeStateE
88
    void _update_dependency();
89
90
    using ResultQueue = std::list<std::shared_ptr<InBlockType>>;
91
92
    // result's query id
93
    TUniqueId _fragment_id;
94
    bool _is_close;
95
    Status _status;
96
    // Producer. blocking queue for result batch waiting to sent to FE by _waiting_rpc.
97
    ResultQueue _result_batch_queue;
98
    // protects all subsequent data in this block
99
    std::mutex _lock;
100
101
    // The last batch size in bytes.
102
    // Determine whether to merge multiple batches based on the size of each batch to avoid getting an excessively large batch after merging.
103
    size_t _last_batch_bytes = 0;
104
105
    // get arrow flight result is a sync method, need wait for data ready and return result.
106
    // TODO, waiting for data will block pipeline, so use a request pool to save requests waiting for data.
107
    std::condition_variable _arrow_data_arrival;
108
    // Consumer. RPCs which FE waiting for result. when _fe_result_batch_queue filled, the rpc could be sent.
109
    std::deque<std::shared_ptr<ResultCtxType>> _waiting_rpc;
110
111
    std::atomic<int64_t> _returned_rows = 0;
112
    // instance id to dependency
113
    std::unordered_map<TUniqueId, std::shared_ptr<Dependency>> _result_sink_dependencies;
114
    std::unordered_map<TUniqueId, size_t> _instance_rows;
115
    std::list<std::unordered_map<TUniqueId, size_t>> _instance_rows_in_queue;
116
    std::shared_ptr<MemTrackerLimiter> _mem_tracker;
117
    int _packet_num = 0;
118
    const int _batch_size;
119
    const std::string _timezone;
120
    const int _be_exec_version;
121
    const segment_v2::CompressionTypePB _fragment_transmission_compression_type;
122
    const int _buffer_limit;
123
};
124
125
} // namespace doris