Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <gen_cpp/DataSinks_types.h> |
21 | | |
22 | | #include <memory> |
23 | | #include <string> |
24 | | #include <unordered_map> |
25 | | #include <vector> |
26 | | |
27 | | #include "common/status.h" |
28 | | #include "core/block/block.h" |
29 | | #include "exec/sink/writer/async_result_writer.h" |
30 | | #include "exprs/vexpr_fwd.h" |
31 | | #include "runtime/runtime_profile.h" |
32 | | |
33 | | #ifdef WITH_PAIMON_CPP |
34 | | namespace paimon { |
35 | | class FileStoreWrite; |
36 | | class MemoryPool; |
37 | | } // namespace paimon |
38 | | #endif |
39 | | |
40 | | namespace doris { |
41 | | |
42 | | class ObjectPool; |
43 | | class RuntimeProfile; |
44 | | class RuntimeState; |
45 | | |
46 | | namespace vectorized { |
47 | | |
48 | | class VPaimonPartitionWriter; |
49 | | |
50 | | // Table-level writer responsible for routing rows to partition writers |
51 | | // according to TPaimonWriteShuffleMode and FE-provided bucket metadata. |
52 | | class VPaimonTableWriter : public AsyncResultWriter { |
53 | | public: |
54 | | VPaimonTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs); |
55 | | VPaimonTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs, |
56 | | std::shared_ptr<Dependency> dep, std::shared_ptr<Dependency> fin_dep); |
57 | | |
58 | | ~VPaimonTableWriter() override; |
59 | | |
60 | | Status init_properties(ObjectPool* pool); |
61 | | |
62 | | Status open(RuntimeState* state, RuntimeProfile* profile) override; |
63 | | |
64 | | Status write(RuntimeState* state, ::doris::Block& block) override; |
65 | | |
66 | | Status close(Status status) override; |
67 | | |
68 | | protected: |
69 | | const TDataSink _t_sink; |
70 | | |
71 | | private: |
72 | | struct WriteKey { |
73 | | std::vector<std::string> partition_values; |
74 | | int32_t bucket_id = -1; |
75 | | |
76 | 0 | bool operator==(const WriteKey& rhs) const { |
77 | 0 | return bucket_id == rhs.bucket_id && partition_values == rhs.partition_values; |
78 | 0 | } |
79 | | }; |
80 | | |
81 | | struct WriteKeyHash { |
82 | 0 | size_t operator()(const WriteKey& key) const { |
83 | 0 | std::hash<int32_t> int_hasher; |
84 | 0 | std::hash<std::string> string_hasher; |
85 | 0 | size_t hash = int_hasher(key.bucket_id); |
86 | 0 | for (const auto& v : key.partition_values) { |
87 | 0 | hash = hash * 31 + string_hasher(v); |
88 | 0 | } |
89 | 0 | return hash; |
90 | 0 | } |
91 | | }; |
92 | | |
93 | | Status _extract_write_key(const ::doris::Block& block, int row, WriteKey* key) const; |
94 | | |
95 | | Status _get_or_create_writer(const WriteKey& key, |
96 | | std::shared_ptr<VPaimonPartitionWriter>* writer); |
97 | | |
98 | | Status _filter_block(::doris::Block& block, const IColumn::Filter* filter, |
99 | | ::doris::Block* output_block); |
100 | | |
101 | | RuntimeState* _state = nullptr; |
102 | | RuntimeProfile* _profile = nullptr; |
103 | | |
104 | | RuntimeProfile::Counter* _written_rows_counter = nullptr; |
105 | | RuntimeProfile::Counter* _written_bytes_counter = nullptr; |
106 | | RuntimeProfile::Counter* _send_data_timer = nullptr; |
107 | | RuntimeProfile::Counter* _project_timer = nullptr; |
108 | | RuntimeProfile::Counter* _bucket_calc_timer = nullptr; |
109 | | RuntimeProfile::Counter* _partition_writers_dispatch_timer = nullptr; |
110 | | RuntimeProfile::Counter* _partition_writers_write_timer = nullptr; |
111 | | RuntimeProfile::Counter* _partition_writers_count = nullptr; |
112 | | RuntimeProfile::Counter* _partition_writer_created = nullptr; |
113 | | RuntimeProfile::Counter* _open_timer = nullptr; |
114 | | RuntimeProfile::Counter* _close_timer = nullptr; |
115 | | RuntimeProfile::Counter* _prepare_commit_timer = nullptr; |
116 | | RuntimeProfile::Counter* _serialize_commit_messages_timer = nullptr; |
117 | | RuntimeProfile::Counter* _commit_payload_bytes_counter = nullptr; |
118 | | |
119 | | std::unordered_map<WriteKey, std::shared_ptr<VPaimonPartitionWriter>, WriteKeyHash> _writers; |
120 | | |
121 | | mutable bool _partition_indices_inited = false; |
122 | | mutable std::vector<int> _partition_column_indices; |
123 | | |
124 | | #ifdef WITH_PAIMON_CPP |
125 | | std::shared_ptr<::paimon::MemoryPool> _pool; |
126 | | std::unique_ptr<::paimon::FileStoreWrite> _file_store_write; |
127 | | size_t _row_count = 0; |
128 | | #endif |
129 | | }; |
130 | | |
131 | | } // namespace vectorized |
132 | | } // namespace doris |