Coverage Report

Created: 2026-03-13 03:47

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/channel/load_stream_mgr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/channel/load_stream_mgr.h"
19
20
#include <brpc/stream.h>
21
22
#include "load/channel/load_channel.h"
23
#include "load/channel/load_stream.h"
24
#include "runtime/exec_env.h"
25
#include "storage/rowset/rowset_factory.h"
26
#include "storage/rowset/rowset_meta.h"
27
#include "storage/storage_engine.h"
28
#include "storage/tablet/tablet_manager.h"
29
#include "util/lru_cache.h"
30
#include "util/uid_util.h"
31
32
namespace doris {
33
34
21
// Constructs the manager and asks ThreadPoolBuilder to create the
// "SegmentFileWriterThreadPool" into `_file_writer_thread_pool`.
//
// @param segment_file_writer_thread_num  used as both the minimum and the
//        maximum thread count passed to the builder.
LoadStreamMgr::LoadStreamMgr(uint32_t segment_file_writer_thread_num) {
    const Status build_status = ThreadPoolBuilder("SegmentFileWriterThreadPool")
                                        .set_min_threads(segment_file_writer_thread_num)
                                        .set_max_threads(segment_file_writer_thread_num)
                                        .build(&_file_writer_thread_pool);
    // The outcome of build() is deliberately not acted upon here.
    static_cast<void>(build_status);
}
40
41
17
// Tears down the manager: first drops every entry of `_load_streams_map`,
// then shuts down the segment file writer thread pool.
LoadStreamMgr::~LoadStreamMgr() {
    _load_streams_map.clear();
    // The constructor discards the status returned by ThreadPoolBuilder::build(),
    // so the pool pointer may still be null if the build did not succeed.
    // Guard the call instead of dereferencing unconditionally.
    if (_file_writer_thread_pool != nullptr) {
        _file_writer_thread_pool->shutdown();
    }
}
45
46
// Looks up the LoadStream registered under the request's load id, creating
// and registering a new one when none exists, and then records the request's
// source id on it. The whole lookup/create/add_source sequence runs while
// `_lock` is held.
//
// @param request      open request; its load_id(), enable_profile() and
//                     src_id() are read, and it is passed to LoadStream::init()
//                     for a newly created stream.
// @param load_stream  output; set to the existing or newly created stream.
//                     Left untouched when init() of a new stream fails.
// @return the error from LoadStream::init() if creating a new stream fails,
//         Status::OK() otherwise.
Status LoadStreamMgr::open_load_stream(const POpenLoadStreamRequest* request,
                                       LoadStream*& load_stream) {
    const UniqueId stream_key(request->load_id());

    std::lock_guard<decltype(_lock)> guard(_lock);
    auto existing = _load_streams_map.find(stream_key);
    if (existing == _load_streams_map.end()) {
        // No stream for this load yet: build one, and only publish it in the
        // map after init() has succeeded.
        auto created = std::make_unique<LoadStream>(request->load_id(), this,
                                                    request->enable_profile());
        RETURN_IF_ERROR(created->init(request));
        load_stream = created.get();
        _load_streams_map[stream_key] = std::move(created);
    } else {
        load_stream = existing->second.get();
    }
    load_stream->add_source(request->src_id());
    return Status::OK();
}
66
67
31
// Removes the entry keyed by `load_id` from `_load_streams_map`, if any,
// while holding `_lock`.
void LoadStreamMgr::clear_load(UniqueId load_id) {
    std::lock_guard guard(_lock);
    _load_streams_map.erase(load_id);
}
71
72
} // namespace doris