Coverage Report

Created: 2026-04-16 21:18

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/transformer/writer_assigner.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <algorithm>
21
#include <cstddef>
22
#include <cstdint>
23
#include <vector>
24
25
#include "exec/connector/skewed_partition_rebalancer.h"
26
27
namespace doris {
28
29
// Strategy interface that decides, row by row, which writer receives each row of a block.
//
// Concrete assigners in this header read one partition id per row from `partition_ids` and write
// the chosen writer id into the matching slot of `writer_ids`.
class WriterAssigner {
public:
    // Virtual so concrete assigners can be destroyed through a WriterAssigner pointer.
    virtual ~WriterAssigner() = default;

    // Fills `writer_ids` for the rows of one block.
    //
    // partition_ids: partition id of each row; implementations index it for rows [0, rows).
    // mask:          optional per-row selection. nullptr selects every row; a 0 entry marks a row
    //                that implementations skip.
    // rows:          number of rows in the block.
    // block_bytes:   byte size of the block (ignored by IdentityWriterAssigner, forwarded to the
    //                rebalancer by SkewedWriterAssigner).
    // writer_ids:    output, one writer id per selected row.
    virtual void assign(const std::vector<uint32_t>& partition_ids,
                        const std::vector<uint8_t>* mask,
                        size_t rows,
                        size_t block_bytes,
                        std::vector<uint32_t>& writer_ids) = 0;
};
37
38
class IdentityWriterAssigner final : public WriterAssigner {
39
public:
40
    void assign(const std::vector<uint32_t>& partition_ids, const std::vector<uint8_t>* mask,
41
0
                size_t rows, size_t /*block_bytes*/, std::vector<uint32_t>& writer_ids) override {
42
0
        if (rows == 0) {
43
0
            return;
44
0
        }
45
0
        if (writer_ids.size() != rows && &writer_ids != &partition_ids) {
46
0
            writer_ids.resize(rows);
47
0
        }
48
0
        if (mask == nullptr) {
49
0
            for (size_t i = 0; i < rows; ++i) {
50
0
                writer_ids[i] = partition_ids[i];
51
0
            }
52
0
            return;
53
0
        }
54
0
        for (size_t i = 0; i < rows; ++i) {
55
0
            if ((*mask)[i] == 0) {
56
0
                continue;
57
0
            }
58
0
            writer_ids[i] = partition_ids[i];
59
0
        }
60
0
    }
61
};
62
63
class SkewedWriterAssigner final : public WriterAssigner {
64
public:
65
    SkewedWriterAssigner(int partition_count, int task_count, int task_bucket_count,
66
                         long min_partition_data_processed_rebalance_threshold,
67
                         long min_data_processed_rebalance_threshold)
68
1
            : _rebalancer(partition_count, task_count, task_bucket_count,
69
1
                          min_partition_data_processed_rebalance_threshold,
70
1
                          min_data_processed_rebalance_threshold),
71
1
              _partition_row_counts(partition_count, 0),
72
1
              _partition_writer_ids(partition_count, -1),
73
1
              _partition_writer_indexes(partition_count, 0) {}
74
75
    void assign(const std::vector<uint32_t>& partition_ids, const std::vector<uint8_t>* mask,
76
1
                size_t rows, size_t block_bytes, std::vector<uint32_t>& writer_ids) override {
77
1
        if (rows == 0 || _partition_row_counts.empty()) {
78
0
            return;
79
0
        }
80
1
        if (writer_ids.size() != rows && &writer_ids != &partition_ids) {
81
0
            writer_ids.resize(rows);
82
0
        }
83
84
1
        std::fill(_partition_row_counts.begin(), _partition_row_counts.end(), 0);
85
1
        std::fill(_partition_writer_ids.begin(), _partition_writer_ids.end(), -1);
86
1
        _rebalancer.rebalance();
87
88
1
        const size_t partition_count = _partition_row_counts.size();
89
6
        for (size_t i = 0; i < rows; ++i) {
90
5
            if (mask != nullptr && (*mask)[i] == 0) {
91
2
                continue;
92
2
            }
93
3
            const uint32_t partition_id = partition_ids[i];
94
3
            if (partition_id >= partition_count) {
95
0
                continue;
96
0
            }
97
3
            _partition_row_counts[partition_id] += 1;
98
3
            int writer_id = _partition_writer_ids[partition_id];
99
3
            if (writer_id == -1) {
100
2
                writer_id = _get_next_writer_id(partition_id);
101
2
                _partition_writer_ids[partition_id] = writer_id;
102
2
            }
103
3
            writer_ids[i] = static_cast<uint32_t>(writer_id);
104
3
        }
105
106
513
        for (size_t i = 0; i < partition_count; ++i) {
107
512
            if (_partition_row_counts[i] > 0) {
108
2
                _rebalancer.add_partition_row_count(static_cast<int>(i), _partition_row_counts[i]);
109
2
            }
110
512
        }
111
1
        _rebalancer.add_data_processed(static_cast<long>(block_bytes));
112
1
    }
113
114
private:
115
2
    int _get_next_writer_id(uint32_t partition_id) {
116
2
        return _rebalancer.get_task_id(partition_id, _partition_writer_indexes[partition_id]++);
117
2
    }
118
119
    SkewedPartitionRebalancer _rebalancer;
120
    std::vector<int> _partition_row_counts;
121
    std::vector<int> _partition_writer_ids;
122
    std::vector<int> _partition_writer_indexes;
123
};
124
125
} // namespace doris