Coverage Report

Created: 2025-09-15 20:29

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/root/doris/be/src/pipeline/task_scheduler.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <atomic>
21
#include <chrono>
22
#include <condition_variable>
23
#include <cstddef>
24
#include <list>
25
#include <memory>
26
#include <mutex>
27
#include <utility>
28
#include <vector>
29
30
#include "common/status.h"
31
#include "pipeline_task.h"
32
#include "runtime/query_context.h"
33
#include "runtime/workload_group/workload_group.h"
34
#include "task_queue.h"
35
#include "util/thread.h"
36
#include "util/uid_util.h"
37
38
namespace doris {
39
class ExecEnv;
40
class ThreadPool;
41
} // namespace doris
42
43
namespace doris::pipeline {
44
45
class HybridTaskScheduler;
46
class TaskScheduler {
47
public:
48
    virtual ~TaskScheduler();
49
50
    virtual Status submit(PipelineTaskSPtr task);
51
52
    virtual Status start();
53
54
    virtual void stop();
55
56
0
    virtual std::vector<std::pair<std::string, std::vector<int>>> thread_debug_info() {
57
0
        return {{_name, _fix_thread_pool->debug_info()}};
58
0
    }
59
60
private:
61
    friend class HybridTaskScheduler;
62
63
    TaskScheduler(int core_num, std::string name, std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
64
0
            : _name(std::move(name)),
65
0
              _task_queue(core_num),
66
0
              _num_threads(core_num),
67
0
              _cgroup_cpu_ctl(cgroup_cpu_ctl) {}
68
20
    TaskScheduler() : _task_queue(0), _num_threads(0) {}
69
    std::string _name;
70
    std::unique_ptr<ThreadPool> _fix_thread_pool;
71
72
    MultiCoreTaskQueue _task_queue;
73
    bool _need_to_stop = false;
74
    bool _shutdown = false;
75
    const int _num_threads;
76
    std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
77
78
    void _do_work(int index);
79
};
80
81
class HybridTaskScheduler MOCK_REMOVE(final) : public TaskScheduler {
82
public:
83
    HybridTaskScheduler(int core_num, std::string name,
84
                        std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
85
0
            : _blocking_scheduler(core_num * 2, name + "_blocking_scheduler", cgroup_cpu_ctl),
86
0
              _simple_scheduler(core_num, name + "_simple_scheduler", cgroup_cpu_ctl) {}
87
88
    Status submit(PipelineTaskSPtr task) override;
89
90
    Status start() override;
91
92
    void stop() override;
93
94
0
    std::vector<std::pair<std::string, std::vector<int>>> thread_debug_info() override {
95
0
        return {_blocking_scheduler.thread_debug_info()[0],
96
0
                _simple_scheduler.thread_debug_info()[0]};
97
0
    }
98
99
private:
100
    TaskScheduler _blocking_scheduler;
101
    TaskScheduler _simple_scheduler;
102
};
103
} // namespace doris::pipeline