be/src/format/jni/jni_reader.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <cstddef> |
21 | | #include <map> |
22 | | #include <memory> |
23 | | #include <string> |
24 | | #include <unordered_map> |
25 | | #include <unordered_set> |
26 | | #include <vector> |
27 | | |
28 | | #include "common/status.h" |
29 | | #include "format/generic_reader.h" |
30 | | #include "format/jni/jni_data_bridge.h" |
31 | | #include "runtime/runtime_profile.h" |
32 | | #include "util/jni-util.h" |
33 | | #include "util/profile_collector.h" |
34 | | #include "util/string_util.h" |
35 | | |
36 | | namespace doris { /* Forward declarations of types named by the reader declarations below. */ |
37 | | class RuntimeProfile; |
38 | | class RuntimeState; |
39 | | class SlotDescriptor; |
40 | | class Block; |
41 | | } // namespace doris |
42 | | |
43 | | namespace doris { |
44 | | |
45 | | /** |
46 | | * JniReader is the base class for all JNI-based readers. It directly manages |
47 | | * the JNI lifecycle (open/read/close) for Java scanners that extend |
48 | | * org.apache.doris.common.jni.JniScanner. |
49 | | * |
50 | | * Subclasses only need to: |
51 | | * 1. Build scanner_params/column_names in their constructor |
52 | | * 2. Pass them to JniReader's constructor |
53 | | * 3. Call open() in their init_reader() |
54 | | * |
55 | | * This class replaces the old JniConnector intermediary. |
56 | | */ |
57 | | class JniReader : public GenericReader { |
58 | | public: |
59 | | /** |
60 | | * Constructor for scan mode. |
61 | | * @param file_slot_descs Slot descriptors for the output columns (kept by reference) |
62 | | * @param state Runtime state |
63 | | * @param profile Runtime profile for metrics |
64 | | * @param connector_class Java scanner class path (e.g. "org/apache/doris/paimon/PaimonJniScanner") |
65 | | * @param scanner_params Configuration map passed to Java scanner constructor |
66 | | * @param column_names Fields to read (also the required_fields in scanner_params) |
67 | | * @param self_split_weight Weight for this split (for profile condition counter) |
68 | | */ |
69 | | JniReader(const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state, |
70 | | RuntimeProfile* profile, std::string connector_class, |
71 | | std::map<std::string, std::string> scanner_params, |
72 | | std::vector<std::string> column_names, int64_t self_split_weight = -1); |
73 | | |
74 | | /** |
75 | | * Constructor for table-schema-only mode (no data reading). |
76 | | * @param connector_class Java scanner class path |
77 | | * @param scanner_params Configuration map passed to Java scanner constructor |
78 | | */ |
79 | | JniReader(std::string connector_class, std::map<std::string, std::string> scanner_params); |
80 | | |
81 | 5.98k | ~JniReader() override = default; |
82 | | |
83 | | /** |
84 | | * Open the java scanner: set up profile counters, create Java object, |
85 | | * get method IDs, and call JniScanner#open. |
86 | | */ |
87 | | Status open(RuntimeState* state, RuntimeProfile* profile); |
88 | | // Reports the name and type of every output slot; missing_cols is not modified. |
89 | | Status get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type, |
90 | 5.61k | std::unordered_set<std::string>* missing_cols) override { |
91 | 27.9k | for (const auto& desc : _file_slot_descs) { |
92 | 27.9k | name_to_type->emplace(desc->col_name(), desc->type()); |
93 | 27.9k | } |
94 | 5.61k | return Status::OK(); |
95 | 5.61k | } |
96 | | |
97 | | /** |
98 | | * Read next batch from Java scanner and fill the block. |
99 | | */ |
100 | | Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; |
101 | | |
102 | | /** |
103 | | * Get table schema from Java scanner (used by Avro schema discovery). |
104 | | */ |
105 | | Status get_table_schema(std::string& table_schema_str); |
106 | | |
107 | | /** |
108 | | * Close the scanner and release JNI resources. |
109 | | */ |
110 | | Status close() override; |
111 | | |
112 | | /** |
113 | | * Set column name to block index map from FileScanner to avoid repeated map creation. |
114 | | * Only the pointer is stored; the map itself is not copied. |
115 | | void set_col_name_to_block_idx( |
116 | 5.66k | const std::unordered_map<std::string, uint32_t>* col_name_to_block_idx) { |
117 | 5.66k | _col_name_to_block_idx = col_name_to_block_idx; |
118 | 5.66k | } |
119 | | |
120 | | protected: |
121 | | void _collect_profile_before_close() override; |
122 | | |
123 | | /** |
124 | | * Update scanner params and column names after construction. |
125 | | * Used by Avro which builds params in init_reader/init_schema_reader |
126 | | * rather than in the constructor. |
127 | | */ |
128 | | void _update_scanner_params(std::map<std::string, std::string> params, |
129 | 0 | std::vector<std::string> column_names) { |
130 | 0 | _scanner_params = std::move(params); |
131 | 0 | _column_names = std::move(column_names); |
132 | 0 | } |
133 | | // Held by reference, not copied: the bound vector must outlive this reader. |
134 | | const std::vector<SlotDescriptor*>& _file_slot_descs; |
135 | | RuntimeState* _state = nullptr; |
136 | | RuntimeProfile* _profile = nullptr; |
137 | | |
138 | | private: |
139 | | static const std::vector<SlotDescriptor*> _s_empty_slot_descs; |
140 | | |
141 | | Status _init_jni_scanner(JNIEnv* env, int batch_size); |
142 | | Status _fill_block(Block* block, size_t num_rows); |
143 | | Status _get_statistics(JNIEnv* env, std::map<std::string, std::string>* result); |
144 | | |
145 | | std::string _connector_name; |
146 | | std::string _connector_class; |
147 | | std::map<std::string, std::string> _scanner_params; |
148 | | std::vector<std::string> _column_names; |
149 | | int64_t _self_split_weight = -1; // 64-bit so the constructor's int64_t weight is stored without narrowing |
150 | | bool _is_table_schema = false; |
151 | | |
152 | | RuntimeProfile::Counter* _open_scanner_time = nullptr; |
153 | | RuntimeProfile::Counter* _java_scan_time = nullptr; |
154 | | RuntimeProfile::Counter* _java_append_data_time = nullptr; |
155 | | RuntimeProfile::Counter* _java_create_vector_table_time = nullptr; |
156 | | RuntimeProfile::Counter* _fill_block_time = nullptr; |
157 | | RuntimeProfile::ConditionCounter* _max_time_split_weight_counter = nullptr; |
158 | | |
159 | | int64_t _jni_scanner_open_watcher = 0; |
160 | | int64_t _java_scan_watcher = 0; |
161 | | int64_t _fill_block_watcher = 0; |
162 | | |
163 | | size_t _has_read = 0; |
164 | | |
165 | | bool _closed = false; |
166 | | bool _scanner_opened = false; |
167 | | |
168 | | Jni::GlobalClass _jni_scanner_cls; |
169 | | Jni::GlobalObject _jni_scanner_obj; |
170 | | Jni::MethodId _jni_scanner_open; |
171 | | Jni::MethodId _jni_scanner_get_append_data_time; |
172 | | Jni::MethodId _jni_scanner_get_create_vector_table_time; |
173 | | Jni::MethodId _jni_scanner_get_next_batch; |
174 | | Jni::MethodId _jni_scanner_get_table_schema; |
175 | | Jni::MethodId _jni_scanner_close; |
176 | | Jni::MethodId _jni_scanner_release_column; |
177 | | Jni::MethodId _jni_scanner_release_table; |
178 | | Jni::MethodId _jni_scanner_get_statistics; |
179 | | |
180 | | JniDataBridge::TableMetaAddress _table_meta; |
181 | | |
182 | | // Column name to block index map, passed from FileScanner to avoid repeated map creation |
183 | | const std::unordered_map<std::string, uint32_t>* _col_name_to_block_idx = nullptr; |
184 | | // Forwards the given metadata address to _table_meta. |
185 | 6.39k | void _set_meta(long meta_addr) { _table_meta.set_meta(meta_addr); } |
186 | | }; |
187 | | |
188 | | /** |
189 | | * Demo usage of JniReader, showing how to read data from a Java scanner. |
190 | | * The Java side is also a mock reader that provides values for each type. |
191 | | * This class will only be retained during the functional testing phase to verify that |
192 | | * the communication and data exchange with the JVM are correct. |
193 | | */ |
194 | | class MockJniReader : public JniReader { |
195 | | public: |
196 | | MockJniReader(const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state, |
197 | | RuntimeProfile* profile); |
198 | | |
199 | | ~MockJniReader() override = default; |
200 | | |
201 | | Status init_reader(); |
202 | | }; |
203 | | |
204 | | } // namespace doris |