Coverage Report

Created: 2026-04-09 15:24

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/vec/sink/vpaimon_table_writer.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/DataSinks_types.h>
21
22
#include <memory>
23
#include <string>
24
#include <unordered_map>
25
#include <vector>
26
27
#include "common/status.h"
28
#include "core/block/block.h"
29
#include "exec/sink/writer/async_result_writer.h"
30
#include "exprs/vexpr_fwd.h"
31
#include "runtime/runtime_profile.h"
32
33
#ifdef WITH_PAIMON_CPP
34
namespace paimon {
35
class FileStoreWrite;
36
class MemoryPool;
37
} // namespace paimon
38
#endif
39
40
namespace doris {
41
42
class ObjectPool;
43
class RuntimeProfile;
44
class RuntimeState;
45
46
namespace vectorized {
47
48
class VPaimonPartitionWriter;
49
50
// Table-level writer responsible for routing rows to partition writers
51
// according to TPaimonWriteShuffleMode and FE-provided bucket metadata.
52
class VPaimonTableWriter : public AsyncResultWriter {
53
public:
54
    VPaimonTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs);
55
    VPaimonTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs,
56
                       std::shared_ptr<Dependency> dep, std::shared_ptr<Dependency> fin_dep);
57
58
    ~VPaimonTableWriter() override;
59
60
    Status init_properties(ObjectPool* pool);
61
62
    Status open(RuntimeState* state, RuntimeProfile* profile) override;
63
64
    Status write(RuntimeState* state, ::doris::Block& block) override;
65
66
    Status close(Status status) override;
67
68
protected:
69
    const TDataSink _t_sink;
70
71
private:
72
    struct WriteKey {
73
        std::vector<std::string> partition_values;
74
        int32_t bucket_id = -1;
75
76
0
        bool operator==(const WriteKey& rhs) const {
77
0
            return bucket_id == rhs.bucket_id && partition_values == rhs.partition_values;
78
0
        }
79
    };
80
81
    struct WriteKeyHash {
82
0
        size_t operator()(const WriteKey& key) const {
83
0
            std::hash<int32_t> int_hasher;
84
0
            std::hash<std::string> string_hasher;
85
0
            size_t hash = int_hasher(key.bucket_id);
86
0
            for (const auto& v : key.partition_values) {
87
0
                hash = hash * 31 + string_hasher(v);
88
0
            }
89
0
            return hash;
90
0
        }
91
    };
92
93
    Status _extract_write_key(const ::doris::Block& block, int row, WriteKey* key) const;
94
95
    Status _get_or_create_writer(const WriteKey& key,
96
                                 std::shared_ptr<VPaimonPartitionWriter>* writer);
97
98
    Status _filter_block(::doris::Block& block, const IColumn::Filter* filter,
99
                         ::doris::Block* output_block);
100
101
    RuntimeState* _state = nullptr;
102
    RuntimeProfile* _profile = nullptr;
103
104
    RuntimeProfile::Counter* _written_rows_counter = nullptr;
105
    RuntimeProfile::Counter* _written_bytes_counter = nullptr;
106
    RuntimeProfile::Counter* _send_data_timer = nullptr;
107
    RuntimeProfile::Counter* _project_timer = nullptr;
108
    RuntimeProfile::Counter* _bucket_calc_timer = nullptr;
109
    RuntimeProfile::Counter* _partition_writers_dispatch_timer = nullptr;
110
    RuntimeProfile::Counter* _partition_writers_write_timer = nullptr;
111
    RuntimeProfile::Counter* _partition_writers_count = nullptr;
112
    RuntimeProfile::Counter* _partition_writer_created = nullptr;
113
    RuntimeProfile::Counter* _open_timer = nullptr;
114
    RuntimeProfile::Counter* _close_timer = nullptr;
115
    RuntimeProfile::Counter* _prepare_commit_timer = nullptr;
116
    RuntimeProfile::Counter* _serialize_commit_messages_timer = nullptr;
117
    RuntimeProfile::Counter* _commit_payload_bytes_counter = nullptr;
118
119
    std::unordered_map<WriteKey, std::shared_ptr<VPaimonPartitionWriter>, WriteKeyHash> _writers;
120
121
    mutable bool _partition_indices_inited = false;
122
    mutable std::vector<int> _partition_column_indices;
123
124
#ifdef WITH_PAIMON_CPP
125
    std::shared_ptr<::paimon::MemoryPool> _pool;
126
    std::unique_ptr<::paimon::FileStoreWrite> _file_store_write;
127
    size_t _row_count = 0;
128
#endif
129
};
130
131
} // namespace vectorized
132
} // namespace doris