be/src/format/jni/jni_reader.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <cstddef> |
21 | | #include <map> |
22 | | #include <memory> |
23 | | #include <string> |
24 | | #include <unordered_map> |
25 | | #include <unordered_set> |
26 | | #include <vector> |
27 | | |
28 | | #include "common/status.h" |
29 | | #include "format/generic_reader.h" |
30 | | #include "format/jni/jni_data_bridge.h" |
31 | | #include "runtime/runtime_profile.h" |
32 | | #include "util/jni-util.h" |
33 | | #include "util/profile_collector.h" |
34 | | #include "util/string_util.h" |
35 | | |
36 | | namespace doris { |
37 | | #include "common/compile_check_begin.h" |
38 | | class RuntimeProfile; |
39 | | class RuntimeState; |
40 | | class SlotDescriptor; |
41 | | class Block; |
42 | | } // namespace doris |
43 | | |
44 | | namespace doris { |
45 | | |
46 | | /** |
47 | | * JniReader is the base class for all JNI-based readers. It directly manages |
48 | | * the JNI lifecycle (open/read/close) for Java scanners that extend |
49 | | * org.apache.doris.common.jni.JniScanner. |
50 | | * |
51 | | * Subclasses only need to: |
52 | | * 1. Build scanner_params/column_names in their constructor |
53 | | * 2. Pass them to JniReader's constructor |
54 | | * 3. Call open() in their init_reader() |
55 | | * |
56 | | * This class replaces the old JniConnector intermediary. |
57 | | */ |
58 | | class JniReader : public GenericReader { |
59 | | public: |
60 | | /** |
61 | | * Constructor for scan mode. |
62 | | * @param file_slot_descs Slot descriptors for the output columns |
63 | | * @param state Runtime state |
64 | | * @param profile Runtime profile for metrics |
65 | | * @param connector_class Java scanner class path (e.g. "org/apache/doris/paimon/PaimonJniScanner") |
66 | | * @param scanner_params Configuration map passed to Java scanner constructor |
67 | | * @param column_names Fields to read (also the required_fields in scanner_params) |
68 | | * @param self_split_weight Weight for this split (for profile condition counter) |
69 | | */ |
70 | | JniReader(const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state, |
71 | | RuntimeProfile* profile, std::string connector_class, |
72 | | std::map<std::string, std::string> scanner_params, |
73 | | std::vector<std::string> column_names, int64_t self_split_weight = -1); |
74 | | |
75 | | /** |
76 | | * Constructor for table-schema-only mode (no data reading). |
77 | | * @param connector_class Java scanner class path |
78 | | * @param scanner_params Configuration map passed to Java scanner constructor |
79 | | */ |
80 | | JniReader(std::string connector_class, std::map<std::string, std::string> scanner_params); |
81 | | |
82 | 0 | ~JniReader() override = default; |
83 | | |
84 | | /** |
85 | | * Open the Java scanner: set up profile counters, create Java object, |
86 | | * get method IDs, and call JniScanner#open. |
87 | | */ |
88 | | Status open(RuntimeState* state, RuntimeProfile* profile); |
89 | | |
90 | | Status get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type, |
91 | 0 | std::unordered_set<std::string>* missing_cols) override { |
92 | 0 | for (const auto& desc : _file_slot_descs) { |
93 | 0 | name_to_type->emplace(desc->col_name(), desc->type()); |
94 | 0 | } |
95 | 0 | return Status::OK(); |
96 | 0 | } |
97 | | |
98 | | /** |
99 | | * Read next batch from Java scanner and fill the block. |
100 | | */ |
101 | | virtual Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; |
102 | | |
103 | | /** |
104 | | * Get table schema from Java scanner (used by Avro schema discovery). |
105 | | */ |
106 | | Status get_table_schema(std::string& table_schema_str); |
107 | | |
108 | | /** |
109 | | * Close the scanner and release JNI resources. |
110 | | */ |
111 | | Status close() override; |
112 | | |
113 | | /** |
114 | | * Set column name to block index map from FileScanner to avoid repeated map creation. |
115 | | */ |
116 | | void set_col_name_to_block_idx( |
117 | 0 | const std::unordered_map<std::string, uint32_t>* col_name_to_block_idx) { |
118 | 0 | _col_name_to_block_idx = col_name_to_block_idx; |
119 | 0 | } |
120 | | |
121 | | protected: |
122 | | void _collect_profile_before_close() override; |
123 | | |
124 | | /** |
125 | | * Update scanner params and column names after construction. |
126 | | * Used by Avro which builds params in init_reader/init_schema_reader |
127 | | * rather than in the constructor. |
128 | | */ |
129 | | void _update_scanner_params(std::map<std::string, std::string> params, |
130 | 0 | std::vector<std::string> column_names) { |
131 | 0 | _scanner_params = std::move(params); |
132 | 0 | _column_names = std::move(column_names); |
133 | 0 | } |
134 | | |
135 | | const std::vector<SlotDescriptor*>& _file_slot_descs; |
136 | | RuntimeState* _state = nullptr; |
137 | | RuntimeProfile* _profile = nullptr; |
138 | | |
139 | | private: |
140 | | static const std::vector<SlotDescriptor*> _s_empty_slot_descs; |
141 | | |
142 | | Status _init_jni_scanner(JNIEnv* env, int batch_size); |
143 | | Status _fill_block(Block* block, size_t num_rows); |
144 | | Status _get_statistics(JNIEnv* env, std::map<std::string, std::string>* result); |
145 | | |
146 | | std::string _connector_name; |
147 | | std::string _connector_class; |
148 | | std::map<std::string, std::string> _scanner_params; |
149 | | std::vector<std::string> _column_names; |
150 | | int32_t _self_split_weight = -1; |
151 | | bool _is_table_schema = false; |
152 | | |
153 | | RuntimeProfile::Counter* _open_scanner_time = nullptr; |
154 | | RuntimeProfile::Counter* _java_scan_time = nullptr; |
155 | | RuntimeProfile::Counter* _java_append_data_time = nullptr; |
156 | | RuntimeProfile::Counter* _java_create_vector_table_time = nullptr; |
157 | | RuntimeProfile::Counter* _fill_block_time = nullptr; |
158 | | RuntimeProfile::ConditionCounter* _max_time_split_weight_counter = nullptr; |
159 | | |
160 | | int64_t _jni_scanner_open_watcher = 0; |
161 | | int64_t _java_scan_watcher = 0; |
162 | | int64_t _fill_block_watcher = 0; |
163 | | |
164 | | size_t _has_read = 0; |
165 | | |
166 | | bool _closed = false; |
167 | | bool _scanner_opened = false; |
168 | | |
169 | | Jni::GlobalClass _jni_scanner_cls; |
170 | | Jni::GlobalObject _jni_scanner_obj; |
171 | | Jni::MethodId _jni_scanner_open; |
172 | | Jni::MethodId _jni_scanner_get_append_data_time; |
173 | | Jni::MethodId _jni_scanner_get_create_vector_table_time; |
174 | | Jni::MethodId _jni_scanner_get_next_batch; |
175 | | Jni::MethodId _jni_scanner_get_table_schema; |
176 | | Jni::MethodId _jni_scanner_close; |
177 | | Jni::MethodId _jni_scanner_release_column; |
178 | | Jni::MethodId _jni_scanner_release_table; |
179 | | Jni::MethodId _jni_scanner_get_statistics; |
180 | | |
181 | | JniDataBridge::TableMetaAddress _table_meta; |
182 | | |
183 | | // Column name to block index map, passed from FileScanner to avoid repeated map creation |
184 | | const std::unordered_map<std::string, uint32_t>* _col_name_to_block_idx = nullptr; |
185 | | |
186 | 0 | void _set_meta(long meta_addr) { _table_meta.set_meta(meta_addr); } |
187 | | }; |
188 | | |
189 | | /** |
190 | | * A demo of how to use JniReader, showing how to read data from the Java scanner. |
191 | | * The Java side is also a mock reader that provides values for each type. |
192 | | * This class will only be retained during the functional testing phase to verify that |
193 | | * the communication and data exchange with the JVM are correct. |
194 | | */ |
195 | | class MockJniReader : public JniReader { |
196 | | public: |
197 | | MockJniReader(const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state, |
198 | | RuntimeProfile* profile); |
199 | | |
200 | 0 | ~MockJniReader() override = default; |
201 | | |
202 | | Status init_reader(); |
203 | | }; |
204 | | |
205 | | #include "common/compile_check_end.h" |
206 | | } // namespace doris |