Coverage Report

Created: 2026-05-18 14:40

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/routine_load/consumer_group_helpers.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <map>
21
#include <string>
22
#include <vector>
23
24
namespace doris {
25
26
// Helper class for partitioning work items (partitions/shards) across consumers.
//
// Coverage note (from the report this listing was taken from): the
// WorkPartitioner<int, long> instantiation of partition_round_robin was
// executed once; the WorkPartitioner<std::string, std::string> instantiation
// was never executed.
template <typename KeyType, typename ValueType>
class WorkPartitioner {
public:
    // Distribute `work_items` over `consumer_count` buckets in round-robin order.
    //
    // Items are visited in the map's key order; the i-th visited item is copied
    // into bucket `i % consumer_count`, so bucket sizes differ by at most one.
    //
    // Returns a vector holding exactly `consumer_count` maps (some of them are
    // empty when there are fewer items than consumers). When `consumer_count`
    // is zero or negative there is no bucket to assign to, so an empty vector is
    // returned rather than evaluating a modulo by zero or requesting a vector
    // whose size was converted from a negative number.
    static std::vector<std::map<KeyType, ValueType>> partition_round_robin(
            const std::map<KeyType, ValueType>& work_items, int consumer_count) {
        if (consumer_count <= 0) {
            return {};
        }

        std::vector<std::map<KeyType, ValueType>> result(consumer_count);
        // `slot` wraps back to 0 instead of growing without bound, so it always
        // stays inside [0, consumer_count).
        int slot = 0;
        for (const auto& [key, value] : work_items) {
            result[slot].emplace(key, value);
            if (++slot == consumer_count) {
                slot = 0;
            }
        }
        return result;
    }
};
43
44
// Helper class for managing format-based append operations
45
class FormatAppender {
46
public:
47
    // Get the appropriate append function pointer based on format type
48
    template <typename PipeType>
49
    static auto get_append_function(TFileFormatType::type format)
50
0
            -> Status (PipeType::*)(const char*, size_t) {
51
0
        return (format == TFileFormatType::FORMAT_JSON) ? &PipeType::append_json
52
0
                                                        : &PipeType::append_with_line_delimiter;
53
0
    }
54
};
55
56
// Tracks how much of a time / row / byte budget is still available while
// consuming items.
//
// Coverage note (from the report this listing was taken from): every
// instrumented line of this class has an execution count of 0.
class ConsumptionProgress {
public:
    // Start with the full budget: `max_time_ms`, `max_rows` rows and
    // `max_bytes` bytes are all still available.
    ConsumptionProgress(int64_t max_time_ms, int64_t max_rows, int64_t max_bytes)
            : _initial_time {max_time_ms},
              _initial_rows {max_rows},
              _initial_bytes {max_bytes},
              _left_time {max_time_ms},
              _left_rows {max_rows},
              _left_bytes {max_bytes} {}

    // True as soon as at least one of the three remaining budgets is zero or
    // negative.
    bool is_limit_reached() const {
        const bool all_budgets_positive = _left_time > 0 && _left_rows > 0 && _left_bytes > 0;
        return !all_budgets_positive;
    }

    // Account for one consumed item of `bytes` bytes: one row and `bytes`
    // bytes are taken off the remaining budgets.
    void consume_item(int64_t bytes) {
        _left_bytes -= bytes;
        --_left_rows;
    }

    // Recompute the remaining time from the total elapsed time. `elapsed_us`
    // is divided by 1000 (integer division) before being subtracted from the
    // initial time budget.
    void update_time(int64_t elapsed_us) {
        const int64_t elapsed_ms = elapsed_us / 1000;
        _left_time = _initial_time - elapsed_ms;
    }

    // Remaining budgets.
    int64_t left_time() const { return _left_time; }
    int64_t left_rows() const { return _left_rows; }
    int64_t left_bytes() const { return _left_bytes; }

    // Amounts used so far, i.e. initial budget minus remaining budget.
    int64_t consumed_rows() const { return _initial_rows - _left_rows; }
    int64_t consumed_bytes() const { return _initial_bytes - _left_bytes; }
    int64_t consumed_time() const { return _initial_time - _left_time; }

private:
    // Budgets as passed to the constructor.
    int64_t _initial_time;
    int64_t _initial_rows;
    int64_t _initial_bytes;
    // Budgets still available.
    int64_t _left_time;
    int64_t _left_rows;
    int64_t _left_bytes;
};
95
96
} // namespace doris