Coverage Report

Created: 2026-03-26 19:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/transformer/writer_assigner.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <algorithm>
21
#include <cstddef>
22
#include <cstdint>
23
#include <vector>
24
25
#include "exec/connector/skewed_partition_rebalancer.h"
26
27
namespace doris {
28
#include "common/compile_check_begin.h"
29
30
/// Strategy interface that decides, for each row of a block, which writer the
/// row is routed to.
class WriterAssigner {
public:
    virtual ~WriterAssigner() = default;

    /// Computes one writer id per row of the current block.
    ///
    /// @param partition_ids  per-row partition id; indexes [0, rows) are read.
    /// @param mask           optional per-row selection vector, or nullptr. The
    ///                       implementations in this file treat nullptr as
    ///                       "every row selected" and skip rows whose byte is 0.
    /// @param rows           number of rows in the block.
    /// @param block_bytes    size of the block in bytes; an implementation may
    ///                       ignore it.
    /// @param writer_ids     output vector indexed by row. The implementations in
    ///                       this file may resize it to `rows`, and they tolerate
    ///                       the caller passing the same vector as
    ///                       `partition_ids`.
    virtual void assign(const std::vector<uint32_t>& partition_ids,
                        const std::vector<uint8_t>* mask,
                        size_t rows,
                        size_t block_bytes,
                        std::vector<uint32_t>& writer_ids) = 0;
};
38
39
class IdentityWriterAssigner final : public WriterAssigner {
40
public:
41
    void assign(const std::vector<uint32_t>& partition_ids, const std::vector<uint8_t>* mask,
42
0
                size_t rows, size_t /*block_bytes*/, std::vector<uint32_t>& writer_ids) override {
43
0
        if (rows == 0) {
44
0
            return;
45
0
        }
46
0
        if (writer_ids.size() != rows && &writer_ids != &partition_ids) {
47
0
            writer_ids.resize(rows);
48
0
        }
49
0
        if (mask == nullptr) {
50
0
            for (size_t i = 0; i < rows; ++i) {
51
0
                writer_ids[i] = partition_ids[i];
52
0
            }
53
0
            return;
54
0
        }
55
0
        for (size_t i = 0; i < rows; ++i) {
56
0
            if ((*mask)[i] == 0) {
57
0
                continue;
58
0
            }
59
0
            writer_ids[i] = partition_ids[i];
60
0
        }
61
0
    }
62
};
63
64
class SkewedWriterAssigner final : public WriterAssigner {
65
public:
66
    SkewedWriterAssigner(int partition_count, int task_count, int task_bucket_count,
67
                         long min_partition_data_processed_rebalance_threshold,
68
                         long min_data_processed_rebalance_threshold)
69
1
            : _rebalancer(partition_count, task_count, task_bucket_count,
70
1
                          min_partition_data_processed_rebalance_threshold,
71
1
                          min_data_processed_rebalance_threshold),
72
1
              _partition_row_counts(partition_count, 0),
73
1
              _partition_writer_ids(partition_count, -1),
74
1
              _partition_writer_indexes(partition_count, 0) {}
75
76
    void assign(const std::vector<uint32_t>& partition_ids, const std::vector<uint8_t>* mask,
77
1
                size_t rows, size_t block_bytes, std::vector<uint32_t>& writer_ids) override {
78
1
        if (rows == 0 || _partition_row_counts.empty()) {
79
0
            return;
80
0
        }
81
1
        if (writer_ids.size() != rows && &writer_ids != &partition_ids) {
82
0
            writer_ids.resize(rows);
83
0
        }
84
85
1
        std::fill(_partition_row_counts.begin(), _partition_row_counts.end(), 0);
86
1
        std::fill(_partition_writer_ids.begin(), _partition_writer_ids.end(), -1);
87
1
        _rebalancer.rebalance();
88
89
1
        const size_t partition_count = _partition_row_counts.size();
90
6
        for (size_t i = 0; i < rows; ++i) {
91
5
            if (mask != nullptr && (*mask)[i] == 0) {
92
2
                continue;
93
2
            }
94
3
            const uint32_t partition_id = partition_ids[i];
95
3
            if (partition_id >= partition_count) {
96
0
                continue;
97
0
            }
98
3
            _partition_row_counts[partition_id] += 1;
99
3
            int writer_id = _partition_writer_ids[partition_id];
100
3
            if (writer_id == -1) {
101
2
                writer_id = _get_next_writer_id(partition_id);
102
2
                _partition_writer_ids[partition_id] = writer_id;
103
2
            }
104
3
            writer_ids[i] = static_cast<uint32_t>(writer_id);
105
3
        }
106
107
513
        for (size_t i = 0; i < partition_count; ++i) {
108
512
            if (_partition_row_counts[i] > 0) {
109
2
                _rebalancer.add_partition_row_count(static_cast<int>(i), _partition_row_counts[i]);
110
2
            }
111
512
        }
112
1
        _rebalancer.add_data_processed(static_cast<long>(block_bytes));
113
1
    }
114
115
private:
116
2
    int _get_next_writer_id(uint32_t partition_id) {
117
2
        return _rebalancer.get_task_id(partition_id, _partition_writer_indexes[partition_id]++);
118
2
    }
119
120
    SkewedPartitionRebalancer _rebalancer;
121
    std::vector<int> _partition_row_counts;
122
    std::vector<int> _partition_writer_ids;
123
    std::vector<int> _partition_writer_indexes;
124
};
125
126
#include "common/compile_check_end.h"
127
} // namespace doris