Coverage Report

Created: 2026-03-13 09:37

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/multi_table_pipe.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include "io/fs/kafka_consumer_pipe.h"
21
#include "io/fs/multi_table_pipe.h"
22
#include "load/stream_load/stream_load_context.h"
23
24
namespace doris {
25
namespace io {
26
27
// Forward declaration; the full class definition follows further down in this header.
class MultiTablePipe;
28
// Pointer to a KafkaConsumerPipe member function taking (const char* data, size_t size)
// and returning Status. MultiTablePipe::dispatch() accepts one of these as its `cb` argument.
using AppendFunc = Status (KafkaConsumerPipe::*)(const char* data, size_t size);
29
// Shared-ownership handle to a KafkaConsumerPipe.
using KafkaConsumerPipePtr = std::shared_ptr<io::KafkaConsumerPipe>;
30
31
// A KafkaConsumerPipe that tracks one StreamLoadContext per destination table
// (see _planned_tables / _unplanned_tables) and exposes the per-table pipes to consumers.
// NOTE(review): only the inline members are defined in this header; the behavior of the
// out-of-line members is described here only as far as their declarations show.
class MultiTablePipe : public KafkaConsumerPipe {
32
public:
33
    // Forwards the buffer sizing arguments to KafkaConsumerPipe and keeps only the raw
    // pointer obtained from `ctx` (ctx.get()); the shared_ptr itself is not retained, so
    // this object does not extend the lifetime of the StreamLoadContext.
    MultiTablePipe(std::shared_ptr<StreamLoadContext> ctx, size_t max_buffered_bytes = 1024 * 1024,
34
                   size_t min_chunk_size = 64 * 1024)
35
7
            : KafkaConsumerPipe(max_buffered_bytes, min_chunk_size), _ctx(ctx.get()) {}
36
37
7
    ~MultiTablePipe() override = default;
38
39
    // Override of the base-class append entry point; defined out of line.
    Status append_with_line_delimiter(const char* data, size_t size) override;
40
41
    // Override of the base-class append entry point; defined out of line.
    Status append_json(const char* data, size_t size) override;
42
43
    // For pipe consumers, i.e. scanners, to get the underlying KafkaConsumerPipe of a table.
44
    KafkaConsumerPipePtr get_pipe_by_table(const std::string& table);
45
46
    // Request and execute plans for unplanned pipes.
47
    Status request_and_exec_plans();
48
49
6
    // Marks consumption as finished, then decrements the in-flight counter. fetch_sub()
    // returns the value held *before* the decrement, so a result of 1 means this call
    // brought the counter to zero; only that caller runs _handle_consumer_finished().
    void handle_consume_finished() {
50
6
        _set_consume_finished();
51
6
        auto inflight_cnt = _inflight_cnt.fetch_sub(1);
52
6
        if (inflight_cnt == 1) {
53
1
            _handle_consumer_finished();
54
1
        }
55
6
    }
56
57
5
    // Acquire-load of the flag; pairs with the release-store in _set_consume_finished().
    bool is_consume_finished() { return _consume_finished.load(std::memory_order_acquire); }
58
59
    Status finish() override;
60
61
    void cancel(const std::string& reason) override;
62
63
private:
64
    // Parse the destination table name from data.
65
    std::string parse_dst_table(const char* data, size_t size);
66
67
    // [thread-unsafe] Dispatch data to the corresponding KafkaConsumerPipe.
    // `cb` is the KafkaConsumerPipe member function used to append the data.
68
    Status dispatch(const std::string& table, const char* data, size_t size, AppendFunc cb);
69
70
    Status exec_plans(ExecEnv* exec_env, const std::vector<TPipelineFragmentParams>& params);
71
72
6
    // Release-store of the flag; pairs with the acquire-load in is_consume_finished().
    void _set_consume_finished() { _consume_finished.store(true, std::memory_order_release); }
73
74
    void _handle_consumer_finished();
75
76
private:
77
    // Per-table load contexts, keyed by table name.
    // NOTE(review): presumably a table moves from the unplanned map to the planned map once
    // its plan has been requested -- confirm against the implementation file.
    std::unordered_map<std::string /*table*/, std::shared_ptr<StreamLoadContext>> _planned_tables;
78
    std::unordered_map<std::string /*table*/, std::shared_ptr<StreamLoadContext>> _unplanned_tables;
79
    std::atomic<uint64_t> _unplanned_row_cnt {0}; // triggers a plan request when it exceeds the threshold
80
    // In-flight count; when it reaches zero, consumption and all plans are finished.
    // Starts at 1; handle_consume_finished() performs one decrement.
81
    std::atomic<uint64_t> _inflight_cnt {1};
82
    std::atomic<bool> _consume_finished {false};
83
    // Note: a raw pointer is used here to avoid a reference cycle with StreamLoadContext.
84
    // The life cycle of MultiTablePipe is under the control of StreamLoadContext, which means StreamLoadContext is created
85
    // before MultiTablePipe and released after it. It is safe to use a raw pointer here.
86
    StreamLoadContext* _ctx = nullptr;
87
    Status _status; // saves the first error status of all executing plan fragments
88
89
    std::mutex _tablet_commit_infos_lock;
90
    std::vector<TTabletCommitInfo> _tablet_commit_infos; // collected from each plan fragment
91
    std::atomic<int64_t> _number_total_rows {0};
92
    std::atomic<int64_t> _number_loaded_rows {0};
93
    std::atomic<int64_t> _number_filtered_rows {0};
94
    std::atomic<int64_t> _number_unselected_rows {0};
95
96
    std::mutex _pipe_map_lock;
97
    std::mutex _callback_lock;
98
    // Stream load pipes keyed by instance id.
    std::unordered_map<TUniqueId /*instance id*/, std::shared_ptr<io::StreamLoadPipe>> _pipe_map;
99
100
    // Thresholds copied from config when the object is constructed.
    uint32_t _row_threshold = config::multi_table_batch_plan_threshold;
101
    uint32_t _wait_tables_threshold = config::multi_table_max_wait_tables;
102
};
103
} // namespace io
104
} // end namespace doris