be/src/exec/connector/jni_connector.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/connector/jni_connector.h" |
19 | | |
20 | | #include <glog/logging.h> |
21 | | |
22 | | #include <sstream> |
23 | | #include <variant> |
24 | | |
25 | | #include "core/block/block.h" |
26 | | #include "core/column/column_array.h" |
27 | | #include "core/column/column_map.h" |
28 | | #include "core/column/column_nullable.h" |
29 | | #include "core/column/column_string.h" |
30 | | #include "core/column/column_struct.h" |
31 | | #include "core/column/column_varbinary.h" |
32 | | #include "core/data_type/data_type_array.h" |
33 | | #include "core/data_type/data_type_map.h" |
34 | | #include "core/data_type/data_type_nullable.h" |
35 | | #include "core/data_type/data_type_struct.h" |
36 | | #include "core/data_type/data_type_varbinary.h" |
37 | | #include "core/data_type/define_primitive_type.h" |
38 | | #include "core/data_type/primitive_type.h" |
39 | | #include "core/types.h" |
40 | | #include "core/value/decimalv2_value.h" |
41 | | #include "jni.h" |
42 | | #include "runtime/runtime_state.h" |
43 | | #include "util/jni-util.h" |
44 | | |
45 | | namespace doris { |
46 | | #include "common/compile_check_begin.h" |
47 | | class RuntimeProfile; |
48 | | } // namespace doris |
49 | | |
50 | | namespace doris { |
51 | | |
// X-macro over every primitive type whose column is a flat array of fixed-width values.
// APPLY is invoked once per entry as APPLY(PrimitiveType, column class, C++ element type);
// in this file it is expanded into `switch` cases (see _fill_column / _fill_column_meta).
#define FOR_FIXED_LENGTH_TYPES(APPLY)                                  \
    APPLY(PrimitiveType::TYPE_TINYINT, ColumnInt8, Int8)               \
    APPLY(PrimitiveType::TYPE_BOOLEAN, ColumnUInt8, UInt8)             \
    APPLY(PrimitiveType::TYPE_SMALLINT, ColumnInt16, Int16)            \
    APPLY(PrimitiveType::TYPE_INT, ColumnInt32, Int32)                 \
    APPLY(PrimitiveType::TYPE_BIGINT, ColumnInt64, Int64)              \
    APPLY(PrimitiveType::TYPE_LARGEINT, ColumnInt128, Int128)          \
    APPLY(PrimitiveType::TYPE_FLOAT, ColumnFloat32, Float32)           \
    APPLY(PrimitiveType::TYPE_DOUBLE, ColumnFloat64, Float64)          \
    APPLY(PrimitiveType::TYPE_DECIMALV2, ColumnDecimal128V2, Int128)   \
    APPLY(PrimitiveType::TYPE_DECIMAL128I, ColumnDecimal128V3, Int128) \
    APPLY(PrimitiveType::TYPE_DECIMAL32, ColumnDecimal32, Int32)       \
    APPLY(PrimitiveType::TYPE_DECIMAL64, ColumnDecimal64, Int64)       \
    APPLY(PrimitiveType::TYPE_DATE, ColumnDate, Int64)                 \
    APPLY(PrimitiveType::TYPE_DATEV2, ColumnDateV2, UInt32)            \
    APPLY(PrimitiveType::TYPE_DATETIME, ColumnDateTime, Int64)         \
    APPLY(PrimitiveType::TYPE_DATETIMEV2, ColumnDateTimeV2, UInt64)    \
    APPLY(PrimitiveType::TYPE_TIMESTAMPTZ, ColumnTimeStampTz, UInt64)  \
    APPLY(PrimitiveType::TYPE_IPV4, ColumnIPv4, IPv4)                  \
    APPLY(PrimitiveType::TYPE_IPV6, ColumnIPv6, IPv6)
72 | | |
73 | 0 | Status JniConnector::open(RuntimeState* state, RuntimeProfile* profile) { |
74 | 0 | _state = state; |
75 | 0 | _profile = profile; |
76 | 0 | ADD_TIMER(_profile, _connector_name.c_str()); |
77 | 0 | _open_scanner_time = ADD_CHILD_TIMER(_profile, "OpenScannerTime", _connector_name.c_str()); |
78 | 0 | _java_scan_time = ADD_CHILD_TIMER(_profile, "JavaScanTime", _connector_name.c_str()); |
79 | 0 | _java_append_data_time = |
80 | 0 | ADD_CHILD_TIMER(_profile, "JavaAppendDataTime", _connector_name.c_str()); |
81 | 0 | _java_create_vector_table_time = |
82 | 0 | ADD_CHILD_TIMER(_profile, "JavaCreateVectorTableTime", _connector_name.c_str()); |
83 | 0 | _fill_block_time = ADD_CHILD_TIMER(_profile, "FillBlockTime", _connector_name.c_str()); |
84 | 0 | _max_time_split_weight_counter = _profile->add_conditition_counter( |
85 | 0 | "MaxTimeSplitWeight", TUnit::UNIT, [](int64_t _c, int64_t c) { return c > _c; }, |
86 | 0 | _connector_name.c_str()); |
87 | 0 | _java_scan_watcher = 0; |
88 | | // cannot put the env into fields, because frames in an env object is limited |
89 | | // to avoid limited frames in a thread, we should get local env in a method instead of in whole object. |
90 | 0 | JNIEnv* env = nullptr; |
91 | 0 | int batch_size = 0; |
92 | 0 | if (!_is_table_schema) { |
93 | 0 | batch_size = _state->batch_size(); |
94 | 0 | } |
95 | 0 | RETURN_IF_ERROR(Jni::Env::Get(&env)); |
96 | 0 | SCOPED_RAW_TIMER(&_jni_scanner_open_watcher); |
97 | 0 | _scanner_params.emplace("time_zone", _state->timezone()); |
98 | 0 | RETURN_IF_ERROR(_init_jni_scanner(env, batch_size)); |
99 | | // Call org.apache.doris.common.jni.JniScanner#open |
100 | 0 | RETURN_IF_ERROR(_jni_scanner_obj.call_void_method(env, _jni_scanner_open).call()); |
101 | | |
102 | 0 | RETURN_ERROR_IF_EXC(env); |
103 | 0 | _scanner_opened = true; |
104 | 0 | return Status::OK(); |
105 | 0 | } |
106 | | |
107 | 0 | Status JniConnector::init() { |
108 | 0 | return Status::OK(); |
109 | 0 | } |
110 | | |
111 | 0 | Status JniConnector::get_next_block(Block* block, size_t* read_rows, bool* eof) { |
112 | | // Call org.apache.doris.common.jni.JniScanner#getNextBatchMeta |
113 | | // return the address of meta information |
114 | 0 | JNIEnv* env = nullptr; |
115 | 0 | RETURN_IF_ERROR(Jni::Env::Get(&env)); |
116 | 0 | long meta_address = 0; |
117 | 0 | { |
118 | 0 | SCOPED_RAW_TIMER(&_java_scan_watcher); |
119 | 0 | RETURN_IF_ERROR(_jni_scanner_obj.call_long_method(env, _jni_scanner_get_next_batch) |
120 | 0 | .call(&meta_address)); |
121 | 0 | } |
122 | 0 | if (meta_address == 0) { |
123 | | // Address == 0 when there's no data in scanner |
124 | 0 | *read_rows = 0; |
125 | 0 | *eof = true; |
126 | 0 | return Status::OK(); |
127 | 0 | } |
128 | 0 | _set_meta(meta_address); |
129 | 0 | long num_rows = _table_meta.next_meta_as_long(); |
130 | 0 | if (num_rows == 0) { |
131 | 0 | *read_rows = 0; |
132 | 0 | *eof = true; |
133 | 0 | return Status::OK(); |
134 | 0 | } |
135 | 0 | RETURN_IF_ERROR(_fill_block(block, num_rows)); |
136 | 0 | *read_rows = num_rows; |
137 | 0 | *eof = false; |
138 | 0 | RETURN_IF_ERROR(_jni_scanner_obj.call_void_method(env, _jni_scanner_release_table).call()); |
139 | 0 | _has_read += num_rows; |
140 | 0 | return Status::OK(); |
141 | 0 | } |
142 | | |
143 | 0 | Status JniConnector::get_table_schema(std::string& table_schema_str) { |
144 | 0 | JNIEnv* env = nullptr; |
145 | 0 | RETURN_IF_ERROR(Jni::Env::Get(&env)); |
146 | | |
147 | 0 | Jni::LocalString jstr; |
148 | 0 | RETURN_IF_ERROR( |
149 | 0 | _jni_scanner_obj.call_object_method(env, _jni_scanner_get_table_schema).call(&jstr)); |
150 | 0 | Jni::LocalStringBufferGuard cstr; |
151 | 0 | RETURN_IF_ERROR(jstr.get_string_chars(env, &cstr)); |
152 | 0 | table_schema_str = std::string {cstr.get()}; // copy to std::string |
153 | 0 | return Status::OK(); |
154 | 0 | } |
155 | | |
156 | 0 | Status JniConnector::get_statistics(JNIEnv* env, std::map<std::string, std::string>* result) { |
157 | 0 | result->clear(); |
158 | 0 | Jni::LocalObject metrics; |
159 | 0 | RETURN_IF_ERROR( |
160 | 0 | _jni_scanner_obj.call_object_method(env, _jni_scanner_get_statistics).call(&metrics)); |
161 | | |
162 | 0 | RETURN_IF_ERROR(Jni::Util::convert_to_cpp_map(env, metrics, result)); |
163 | 0 | return Status::OK(); |
164 | 0 | } |
165 | | |
166 | 0 | Status JniConnector::close() { |
167 | 0 | if (!_closed) { |
168 | 0 | JNIEnv* env = nullptr; |
169 | 0 | RETURN_IF_ERROR(Jni::Env::Get(&env)); |
170 | 0 | if (_scanner_opened) { |
171 | 0 | COUNTER_UPDATE(_open_scanner_time, _jni_scanner_open_watcher); |
172 | 0 | COUNTER_UPDATE(_fill_block_time, _fill_block_watcher); |
173 | |
|
174 | 0 | RETURN_ERROR_IF_EXC(env); |
175 | 0 | jlong _append = 0; |
176 | 0 | RETURN_IF_ERROR( |
177 | 0 | _jni_scanner_obj.call_long_method(env, _jni_scanner_get_append_data_time) |
178 | 0 | .call(&_append)); |
179 | | |
180 | 0 | COUNTER_UPDATE(_java_append_data_time, _append); |
181 | |
|
182 | 0 | jlong _create = 0; |
183 | 0 | RETURN_IF_ERROR( |
184 | 0 | _jni_scanner_obj |
185 | 0 | .call_long_method(env, _jni_scanner_get_create_vector_table_time) |
186 | 0 | .call(&_create)); |
187 | | |
188 | 0 | COUNTER_UPDATE(_java_create_vector_table_time, _create); |
189 | |
|
190 | 0 | COUNTER_UPDATE(_java_scan_time, _java_scan_watcher - _append - _create); |
191 | |
|
192 | 0 | _max_time_split_weight_counter->conditional_update( |
193 | 0 | _jni_scanner_open_watcher + _fill_block_watcher + _java_scan_watcher, |
194 | 0 | _self_split_weight); |
195 | | |
196 | | // _fill_block may be failed and returned, we should release table in close. |
197 | | // org.apache.doris.common.jni.JniScanner#releaseTable is idempotent |
198 | 0 | RETURN_IF_ERROR( |
199 | 0 | _jni_scanner_obj.call_void_method(env, _jni_scanner_release_table).call()); |
200 | 0 | RETURN_IF_ERROR(_jni_scanner_obj.call_void_method(env, _jni_scanner_close).call()); |
201 | 0 | } |
202 | 0 | } |
203 | 0 | return Status::OK(); |
204 | 0 | } |
205 | | |
206 | 0 | Status JniConnector::_init_jni_scanner(JNIEnv* env, int batch_size) { |
207 | 0 | RETURN_IF_ERROR( |
208 | 0 | Jni::Util::get_jni_scanner_class(env, _connector_class.c_str(), &_jni_scanner_cls)); |
209 | | |
210 | 0 | Jni::MethodId scanner_constructor; |
211 | 0 | RETURN_IF_ERROR(_jni_scanner_cls.get_method(env, "<init>", "(ILjava/util/Map;)V", |
212 | 0 | &scanner_constructor)); |
213 | | |
214 | | // prepare constructor parameters |
215 | 0 | Jni::LocalObject hashmap_object; |
216 | 0 | RETURN_IF_ERROR(Jni::Util::convert_to_java_map(env, _scanner_params, &hashmap_object)); |
217 | 0 | RETURN_IF_ERROR(_jni_scanner_cls.new_object(env, scanner_constructor) |
218 | 0 | .with_arg(batch_size) |
219 | 0 | .with_arg(hashmap_object) |
220 | 0 | .call(&_jni_scanner_obj)); |
221 | | |
222 | 0 | RETURN_IF_ERROR(_jni_scanner_cls.get_method(env, "open", "()V", &_jni_scanner_open)); |
223 | 0 | RETURN_IF_ERROR(_jni_scanner_cls.get_method(env, "getNextBatchMeta", "()J", |
224 | 0 | &_jni_scanner_get_next_batch)); |
225 | 0 | RETURN_IF_ERROR(_jni_scanner_cls.get_method(env, "getAppendDataTime", "()J", |
226 | 0 | &_jni_scanner_get_append_data_time)); |
227 | 0 | RETURN_IF_ERROR(_jni_scanner_cls.get_method(env, "getCreateVectorTableTime", "()J", |
228 | 0 | &_jni_scanner_get_create_vector_table_time)); |
229 | 0 | RETURN_IF_ERROR(_jni_scanner_cls.get_method(env, "getTableSchema", "()Ljava/lang/String;", |
230 | 0 | &_jni_scanner_get_table_schema)); |
231 | 0 | RETURN_IF_ERROR(_jni_scanner_cls.get_method(env, "close", "()V", &_jni_scanner_close)); |
232 | 0 | RETURN_IF_ERROR(_jni_scanner_cls.get_method(env, "releaseColumn", "(I)V", |
233 | 0 | &_jni_scanner_release_column)); |
234 | 0 | RETURN_IF_ERROR( |
235 | 0 | _jni_scanner_cls.get_method(env, "releaseTable", "()V", &_jni_scanner_release_table)); |
236 | 0 | RETURN_IF_ERROR(_jni_scanner_cls.get_method(env, "getStatistics", "()Ljava/util/Map;", |
237 | 0 | &_jni_scanner_get_statistics)); |
238 | 0 | return Status::OK(); |
239 | 0 | } |
240 | | |
241 | 0 | Status JniConnector::fill_block(Block* block, const ColumnNumbers& arguments, long table_address) { |
242 | 0 | if (table_address == 0) { |
243 | 0 | return Status::InternalError("table_address is 0"); |
244 | 0 | } |
245 | 0 | TableMetaAddress table_meta(table_address); |
246 | 0 | long num_rows = table_meta.next_meta_as_long(); |
247 | 0 | for (size_t i : arguments) { |
248 | 0 | if (block->get_by_position(i).column.get() == nullptr) { |
249 | 0 | auto return_type = block->get_data_type(i); |
250 | 0 | bool result_nullable = return_type->is_nullable(); |
251 | 0 | ColumnUInt8::MutablePtr null_col = nullptr; |
252 | 0 | if (result_nullable) { |
253 | 0 | return_type = remove_nullable(return_type); |
254 | 0 | null_col = ColumnUInt8::create(); |
255 | 0 | } |
256 | 0 | auto res_col = return_type->create_column(); |
257 | 0 | if (result_nullable) { |
258 | 0 | block->replace_by_position( |
259 | 0 | i, ColumnNullable::create(std::move(res_col), std::move(null_col))); |
260 | 0 | } else { |
261 | 0 | block->replace_by_position(i, std::move(res_col)); |
262 | 0 | } |
263 | 0 | } else if (is_column_const(*(block->get_by_position(i).column))) { |
264 | 0 | auto doris_column = block->get_by_position(i).column->convert_to_full_column_if_const(); |
265 | 0 | bool is_nullable = block->get_by_position(i).type->is_nullable(); |
266 | 0 | block->replace_by_position(i, is_nullable ? make_nullable(doris_column) : doris_column); |
267 | 0 | } |
268 | 0 | auto& column_with_type_and_name = block->get_by_position(i); |
269 | 0 | auto& column_ptr = column_with_type_and_name.column; |
270 | 0 | auto& column_type = column_with_type_and_name.type; |
271 | 0 | RETURN_IF_ERROR(_fill_column(table_meta, column_ptr, column_type, num_rows)); |
272 | 0 | } |
273 | 0 | return Status::OK(); |
274 | 0 | } |
275 | | |
276 | 0 | Status JniConnector::_fill_block(Block* block, size_t num_rows) { |
277 | 0 | SCOPED_RAW_TIMER(&_fill_block_watcher); |
278 | 0 | JNIEnv* env = nullptr; |
279 | 0 | RETURN_IF_ERROR(Jni::Env::Get(&env)); |
280 | 0 | for (int i = 0; i < _column_names.size(); ++i) { |
281 | 0 | auto& column_with_type_and_name = |
282 | 0 | block->get_by_position(_col_name_to_block_idx->at(_column_names[i])); |
283 | 0 | auto& column_ptr = column_with_type_and_name.column; |
284 | 0 | auto& column_type = column_with_type_and_name.type; |
285 | 0 | RETURN_IF_ERROR(_fill_column(_table_meta, column_ptr, column_type, num_rows)); |
286 | | // Column is not released when _fill_column failed. It will be released when releasing table. |
287 | 0 | RETURN_IF_ERROR(_jni_scanner_obj.call_void_method(env, _jni_scanner_release_column) |
288 | 0 | .with_arg(i) |
289 | 0 | .call()); |
290 | 0 | RETURN_ERROR_IF_EXC(env); |
291 | 0 | } |
292 | 0 | return Status::OK(); |
293 | 0 | } |
294 | | |
295 | | Status JniConnector::_fill_column(TableMetaAddress& address, ColumnPtr& doris_column, |
296 | 0 | const DataTypePtr& data_type, size_t num_rows) { |
297 | 0 | auto logical_type = data_type->get_primitive_type(); |
298 | 0 | void* null_map_ptr = address.next_meta_as_ptr(); |
299 | 0 | if (null_map_ptr == nullptr) { |
300 | | // org.apache.doris.common.jni.vec.ColumnType.Type#UNSUPPORTED will set column address as 0 |
301 | 0 | return Status::InternalError("Unsupported type {} in java side", data_type->get_name()); |
302 | 0 | } |
303 | 0 | MutableColumnPtr data_column; |
304 | 0 | if (doris_column->is_nullable()) { |
305 | 0 | auto* nullable_column = |
306 | 0 | reinterpret_cast<ColumnNullable*>(doris_column->assume_mutable().get()); |
307 | 0 | data_column = nullable_column->get_nested_column_ptr(); |
308 | 0 | NullMap& null_map = nullable_column->get_null_map_data(); |
309 | 0 | size_t origin_size = null_map.size(); |
310 | 0 | null_map.resize(origin_size + num_rows); |
311 | 0 | memcpy(null_map.data() + origin_size, static_cast<bool*>(null_map_ptr), num_rows); |
312 | 0 | } else { |
313 | 0 | data_column = doris_column->assume_mutable(); |
314 | 0 | } |
315 | | // Date and DateTime are deprecated and not supported. |
316 | 0 | switch (logical_type) { |
317 | | //FIXME: in Doris we check data then insert. jdbc external table may have some data invalid for doris. |
318 | | // should add check otherwise it may break some of our assumption now. |
319 | 0 | #define DISPATCH(TYPE_INDEX, COLUMN_TYPE, CPP_TYPE) \ |
320 | 0 | case TYPE_INDEX: \ |
321 | 0 | return _fill_fixed_length_column<COLUMN_TYPE, CPP_TYPE>( \ |
322 | 0 | data_column, reinterpret_cast<CPP_TYPE*>(address.next_meta_as_ptr()), num_rows); |
323 | 0 | FOR_FIXED_LENGTH_TYPES(DISPATCH) |
324 | 0 | #undef DISPATCH |
325 | 0 | case PrimitiveType::TYPE_STRING: |
326 | 0 | [[fallthrough]]; |
327 | 0 | case PrimitiveType::TYPE_CHAR: |
328 | 0 | [[fallthrough]]; |
329 | 0 | case PrimitiveType::TYPE_VARCHAR: |
330 | 0 | return _fill_string_column(address, data_column, num_rows); |
331 | 0 | case PrimitiveType::TYPE_ARRAY: |
332 | 0 | return _fill_array_column(address, data_column, data_type, num_rows); |
333 | 0 | case PrimitiveType::TYPE_MAP: |
334 | 0 | return _fill_map_column(address, data_column, data_type, num_rows); |
335 | 0 | case PrimitiveType::TYPE_STRUCT: |
336 | 0 | return _fill_struct_column(address, data_column, data_type, num_rows); |
337 | 0 | case PrimitiveType::TYPE_VARBINARY: |
338 | 0 | return _fill_varbinary_column(address, data_column, num_rows); |
339 | 0 | default: |
340 | 0 | return Status::InvalidArgument("Unsupported type {} in jni scanner", data_type->get_name()); |
341 | 0 | } |
342 | 0 | return Status::OK(); |
343 | 0 | } |
344 | | |
345 | | Status JniConnector::_fill_varbinary_column(TableMetaAddress& address, |
346 | 0 | MutableColumnPtr& doris_column, size_t num_rows) { |
347 | 0 | auto* meta_base = reinterpret_cast<char*>(address.next_meta_as_ptr()); |
348 | 0 | auto& varbinary_col = assert_cast<ColumnVarbinary&>(*doris_column); |
349 | | // Java side writes per-row metadata as 16 bytes: [len: long][addr: long] |
350 | 0 | for (size_t i = 0; i < num_rows; ++i) { |
351 | | // Read length (first 8 bytes) |
352 | 0 | int64_t len = 0; |
353 | 0 | memcpy(&len, meta_base + 16 * i, sizeof(len)); |
354 | 0 | if (len <= 0) { |
355 | 0 | varbinary_col.insert_default(); |
356 | 0 | } else { |
357 | | // Read address (next 8 bytes) |
358 | 0 | uint64_t addr_u = 0; |
359 | 0 | memcpy(&addr_u, meta_base + 16 * i + 8, sizeof(addr_u)); |
360 | 0 | const char* src = reinterpret_cast<const char*>(addr_u); |
361 | 0 | varbinary_col.insert_data(src, static_cast<size_t>(len)); |
362 | 0 | } |
363 | 0 | } |
364 | 0 | return Status::OK(); |
365 | 0 | } |
366 | | |
367 | | Status JniConnector::_fill_string_column(TableMetaAddress& address, MutableColumnPtr& doris_column, |
368 | 0 | size_t num_rows) { |
369 | 0 | auto& string_col = static_cast<ColumnString&>(*doris_column); |
370 | 0 | ColumnString::Chars& string_chars = string_col.get_chars(); |
371 | 0 | ColumnString::Offsets& string_offsets = string_col.get_offsets(); |
372 | 0 | int* offsets = reinterpret_cast<int*>(address.next_meta_as_ptr()); |
373 | 0 | char* chars = reinterpret_cast<char*>(address.next_meta_as_ptr()); |
374 | | |
375 | | // This judgment is necessary, otherwise the following statement `offsets[num_rows - 1]` out of bounds |
376 | | // What's more, This judgment must be placed after `address.next_meta_as_ptr()` |
377 | | // because `address.next_meta_as_ptr` will make `address._meta_index` plus 1 |
378 | 0 | if (num_rows == 0) { |
379 | 0 | return Status::OK(); |
380 | 0 | } |
381 | | |
382 | 0 | size_t origin_chars_size = string_chars.size(); |
383 | 0 | string_chars.resize(origin_chars_size + offsets[num_rows - 1]); |
384 | 0 | memcpy(string_chars.data() + origin_chars_size, chars, offsets[num_rows - 1]); |
385 | |
|
386 | 0 | size_t origin_offsets_size = string_offsets.size(); |
387 | 0 | size_t start_offset = string_offsets[origin_offsets_size - 1]; |
388 | 0 | string_offsets.resize(origin_offsets_size + num_rows); |
389 | 0 | for (size_t i = 0; i < num_rows; ++i) { |
390 | 0 | string_offsets[origin_offsets_size + i] = |
391 | 0 | static_cast<unsigned int>(offsets[i] + start_offset); |
392 | 0 | } |
393 | 0 | return Status::OK(); |
394 | 0 | } |
395 | | |
396 | | Status JniConnector::_fill_array_column(TableMetaAddress& address, MutableColumnPtr& doris_column, |
397 | 0 | const DataTypePtr& data_type, size_t num_rows) { |
398 | 0 | ColumnPtr& element_column = static_cast<ColumnArray&>(*doris_column).get_data_ptr(); |
399 | 0 | const DataTypePtr& element_type = |
400 | 0 | (assert_cast<const DataTypeArray*>(remove_nullable(data_type).get())) |
401 | 0 | ->get_nested_type(); |
402 | 0 | ColumnArray::Offsets64& offsets_data = static_cast<ColumnArray&>(*doris_column).get_offsets(); |
403 | |
|
404 | 0 | int64_t* offsets = reinterpret_cast<int64_t*>(address.next_meta_as_ptr()); |
405 | 0 | size_t origin_size = offsets_data.size(); |
406 | 0 | offsets_data.resize(origin_size + num_rows); |
407 | 0 | size_t start_offset = offsets_data[origin_size - 1]; |
408 | 0 | for (size_t i = 0; i < num_rows; ++i) { |
409 | 0 | offsets_data[origin_size + i] = offsets[i] + start_offset; |
410 | 0 | } |
411 | | |
412 | | // offsets[num_rows - 1] == offsets_data[origin_size + num_rows - 1] - start_offset |
413 | | // but num_row equals 0 when there are all empty arrays |
414 | 0 | return _fill_column(address, element_column, element_type, |
415 | 0 | offsets_data[origin_size + num_rows - 1] - start_offset); |
416 | 0 | } |
417 | | |
418 | | Status JniConnector::_fill_map_column(TableMetaAddress& address, MutableColumnPtr& doris_column, |
419 | 0 | const DataTypePtr& data_type, size_t num_rows) { |
420 | 0 | auto& map = static_cast<ColumnMap&>(*doris_column); |
421 | 0 | const DataTypePtr& key_type = |
422 | 0 | reinterpret_cast<const DataTypeMap*>(remove_nullable(data_type).get())->get_key_type(); |
423 | 0 | const DataTypePtr& value_type = |
424 | 0 | reinterpret_cast<const DataTypeMap*>(remove_nullable(data_type).get()) |
425 | 0 | ->get_value_type(); |
426 | 0 | ColumnPtr& key_column = map.get_keys_ptr(); |
427 | 0 | ColumnPtr& value_column = map.get_values_ptr(); |
428 | 0 | ColumnArray::Offsets64& map_offsets = map.get_offsets(); |
429 | |
|
430 | 0 | int64_t* offsets = reinterpret_cast<int64_t*>(address.next_meta_as_ptr()); |
431 | 0 | size_t origin_size = map_offsets.size(); |
432 | 0 | map_offsets.resize(origin_size + num_rows); |
433 | 0 | size_t start_offset = map_offsets[origin_size - 1]; |
434 | 0 | for (size_t i = 0; i < num_rows; ++i) { |
435 | 0 | map_offsets[origin_size + i] = offsets[i] + start_offset; |
436 | 0 | } |
437 | |
|
438 | 0 | RETURN_IF_ERROR(_fill_column(address, key_column, key_type, |
439 | 0 | map_offsets[origin_size + num_rows - 1] - start_offset)); |
440 | 0 | RETURN_IF_ERROR(_fill_column(address, value_column, value_type, |
441 | 0 | map_offsets[origin_size + num_rows - 1] - start_offset)); |
442 | 0 | return Status::OK(); |
443 | 0 | } |
444 | | |
445 | | Status JniConnector::_fill_struct_column(TableMetaAddress& address, MutableColumnPtr& doris_column, |
446 | 0 | const DataTypePtr& data_type, size_t num_rows) { |
447 | 0 | auto& doris_struct = static_cast<ColumnStruct&>(*doris_column); |
448 | 0 | const DataTypeStruct* doris_struct_type = |
449 | 0 | reinterpret_cast<const DataTypeStruct*>(remove_nullable(data_type).get()); |
450 | 0 | for (int i = 0; i < doris_struct.tuple_size(); ++i) { |
451 | 0 | ColumnPtr& struct_field = doris_struct.get_column_ptr(i); |
452 | 0 | const DataTypePtr& field_type = doris_struct_type->get_element(i); |
453 | 0 | RETURN_IF_ERROR(_fill_column(address, struct_field, field_type, num_rows)); |
454 | 0 | } |
455 | 0 | return Status::OK(); |
456 | 0 | } |
457 | | |
458 | 0 | std::string JniConnector::get_jni_type(const DataTypePtr& data_type) { |
459 | 0 | DataTypePtr type = remove_nullable(data_type); |
460 | 0 | std::ostringstream buffer; |
461 | 0 | switch (type->get_primitive_type()) { |
462 | 0 | case TYPE_BOOLEAN: |
463 | 0 | return "boolean"; |
464 | 0 | case TYPE_TINYINT: |
465 | 0 | return "tinyint"; |
466 | 0 | case TYPE_SMALLINT: |
467 | 0 | return "smallint"; |
468 | 0 | case TYPE_INT: |
469 | 0 | return "int"; |
470 | 0 | case TYPE_BIGINT: |
471 | 0 | return "bigint"; |
472 | 0 | case TYPE_LARGEINT: |
473 | 0 | return "largeint"; |
474 | 0 | case TYPE_FLOAT: |
475 | 0 | return "float"; |
476 | 0 | case TYPE_DOUBLE: |
477 | 0 | return "double"; |
478 | 0 | case TYPE_IPV4: |
479 | 0 | return "ipv4"; |
480 | 0 | case TYPE_IPV6: |
481 | 0 | return "ipv6"; |
482 | 0 | case TYPE_VARCHAR: |
483 | 0 | [[fallthrough]]; |
484 | 0 | case TYPE_CHAR: |
485 | 0 | [[fallthrough]]; |
486 | 0 | case TYPE_STRING: |
487 | 0 | return "string"; |
488 | 0 | case TYPE_DATE: |
489 | 0 | return "datev1"; |
490 | 0 | case TYPE_DATEV2: |
491 | 0 | return "datev2"; |
492 | 0 | case TYPE_DATETIME: |
493 | 0 | return "datetimev1"; |
494 | 0 | case TYPE_DATETIMEV2: |
495 | 0 | [[fallthrough]]; |
496 | 0 | case TYPE_TIMEV2: { |
497 | 0 | buffer << "datetimev2(" << type->get_scale() << ")"; |
498 | 0 | return buffer.str(); |
499 | 0 | } |
500 | 0 | case TYPE_TIMESTAMPTZ: { |
501 | 0 | buffer << "timestamptz(" << type->get_scale() << ")"; |
502 | 0 | return buffer.str(); |
503 | 0 | } |
504 | 0 | case TYPE_BINARY: |
505 | 0 | return "binary"; |
506 | 0 | case TYPE_DECIMALV2: { |
507 | 0 | buffer << "decimalv2(" << DecimalV2Value::PRECISION << "," << DecimalV2Value::SCALE << ")"; |
508 | 0 | return buffer.str(); |
509 | 0 | } |
510 | 0 | case TYPE_DECIMAL32: { |
511 | 0 | buffer << "decimal32(" << type->get_precision() << "," << type->get_scale() << ")"; |
512 | 0 | return buffer.str(); |
513 | 0 | } |
514 | 0 | case TYPE_DECIMAL64: { |
515 | 0 | buffer << "decimal64(" << type->get_precision() << "," << type->get_scale() << ")"; |
516 | 0 | return buffer.str(); |
517 | 0 | } |
518 | 0 | case TYPE_DECIMAL128I: { |
519 | 0 | buffer << "decimal128(" << type->get_precision() << "," << type->get_scale() << ")"; |
520 | 0 | return buffer.str(); |
521 | 0 | } |
522 | 0 | case TYPE_STRUCT: { |
523 | 0 | const DataTypeStruct* struct_type = reinterpret_cast<const DataTypeStruct*>(type.get()); |
524 | 0 | buffer << "struct<"; |
525 | 0 | for (int i = 0; i < struct_type->get_elements().size(); ++i) { |
526 | 0 | if (i != 0) { |
527 | 0 | buffer << ","; |
528 | 0 | } |
529 | 0 | buffer << struct_type->get_element_names()[i] << ":" |
530 | 0 | << get_jni_type(struct_type->get_element(i)); |
531 | 0 | } |
532 | 0 | buffer << ">"; |
533 | 0 | return buffer.str(); |
534 | 0 | } |
535 | 0 | case TYPE_ARRAY: { |
536 | 0 | const DataTypeArray* array_type = reinterpret_cast<const DataTypeArray*>(type.get()); |
537 | 0 | buffer << "array<" << get_jni_type(array_type->get_nested_type()) << ">"; |
538 | 0 | return buffer.str(); |
539 | 0 | } |
540 | 0 | case TYPE_MAP: { |
541 | 0 | const DataTypeMap* map_type = reinterpret_cast<const DataTypeMap*>(type.get()); |
542 | 0 | buffer << "map<" << get_jni_type(map_type->get_key_type()) << "," |
543 | 0 | << get_jni_type(map_type->get_value_type()) << ">"; |
544 | 0 | return buffer.str(); |
545 | 0 | } |
546 | 0 | case TYPE_VARBINARY: |
547 | 0 | return "varbinary"; |
548 | 0 | default: |
549 | 0 | return "unsupported"; |
550 | 0 | } |
551 | 0 | } |
552 | | |
553 | 0 | std::string JniConnector::get_jni_type_with_different_string(const DataTypePtr& data_type) { |
554 | 0 | DataTypePtr type = remove_nullable(data_type); |
555 | 0 | std::ostringstream buffer; |
556 | 0 | switch (data_type->get_primitive_type()) { |
557 | 0 | case TYPE_BOOLEAN: |
558 | 0 | return "boolean"; |
559 | 0 | case TYPE_TINYINT: |
560 | 0 | return "tinyint"; |
561 | 0 | case TYPE_SMALLINT: |
562 | 0 | return "smallint"; |
563 | 0 | case TYPE_INT: |
564 | 0 | return "int"; |
565 | 0 | case TYPE_BIGINT: |
566 | 0 | return "bigint"; |
567 | 0 | case TYPE_LARGEINT: |
568 | 0 | return "largeint"; |
569 | 0 | case TYPE_FLOAT: |
570 | 0 | return "float"; |
571 | 0 | case TYPE_DOUBLE: |
572 | 0 | return "double"; |
573 | 0 | case TYPE_IPV4: |
574 | 0 | return "ipv4"; |
575 | 0 | case TYPE_IPV6: |
576 | 0 | return "ipv6"; |
577 | 0 | case TYPE_VARCHAR: { |
578 | 0 | buffer << "varchar(" |
579 | 0 | << assert_cast<const DataTypeString*>(remove_nullable(data_type).get())->len() |
580 | 0 | << ")"; |
581 | 0 | return buffer.str(); |
582 | 0 | } |
583 | 0 | case TYPE_DATE: |
584 | 0 | return "datev1"; |
585 | 0 | case TYPE_DATEV2: |
586 | 0 | return "datev2"; |
587 | 0 | case TYPE_DATETIME: |
588 | 0 | return "datetimev1"; |
589 | 0 | case TYPE_DATETIMEV2: |
590 | 0 | [[fallthrough]]; |
591 | 0 | case TYPE_TIMEV2: { |
592 | 0 | buffer << "datetimev2(" << data_type->get_scale() << ")"; |
593 | 0 | return buffer.str(); |
594 | 0 | } |
595 | 0 | case TYPE_TIMESTAMPTZ: { |
596 | 0 | buffer << "timestamptz(" << data_type->get_scale() << ")"; |
597 | 0 | return buffer.str(); |
598 | 0 | } |
599 | 0 | case TYPE_BINARY: |
600 | 0 | return "binary"; |
601 | 0 | case TYPE_CHAR: { |
602 | 0 | buffer << "char(" |
603 | 0 | << assert_cast<const DataTypeString*>(remove_nullable(data_type).get())->len() |
604 | 0 | << ")"; |
605 | 0 | return buffer.str(); |
606 | 0 | } |
607 | 0 | case TYPE_STRING: |
608 | 0 | return "string"; |
609 | 0 | case TYPE_VARBINARY: |
610 | 0 | buffer << "varbinary(" |
611 | 0 | << assert_cast<const DataTypeVarbinary*>(remove_nullable(data_type).get())->len() |
612 | 0 | << ")"; |
613 | 0 | return buffer.str(); |
614 | 0 | case TYPE_DECIMALV2: { |
615 | 0 | buffer << "decimalv2(" << DecimalV2Value::PRECISION << "," << DecimalV2Value::SCALE << ")"; |
616 | 0 | return buffer.str(); |
617 | 0 | } |
618 | 0 | case TYPE_DECIMAL32: { |
619 | 0 | buffer << "decimal32(" << data_type->get_precision() << "," << data_type->get_scale() |
620 | 0 | << ")"; |
621 | 0 | return buffer.str(); |
622 | 0 | } |
623 | 0 | case TYPE_DECIMAL64: { |
624 | 0 | buffer << "decimal64(" << data_type->get_precision() << "," << data_type->get_scale() |
625 | 0 | << ")"; |
626 | 0 | return buffer.str(); |
627 | 0 | } |
628 | 0 | case TYPE_DECIMAL128I: { |
629 | 0 | buffer << "decimal128(" << data_type->get_precision() << "," << data_type->get_scale() |
630 | 0 | << ")"; |
631 | 0 | return buffer.str(); |
632 | 0 | } |
633 | 0 | case TYPE_STRUCT: { |
634 | 0 | const auto* type_struct = |
635 | 0 | assert_cast<const DataTypeStruct*>(remove_nullable(data_type).get()); |
636 | 0 | buffer << "struct<"; |
637 | 0 | for (int i = 0; i < type_struct->get_elements().size(); ++i) { |
638 | 0 | if (i != 0) { |
639 | 0 | buffer << ","; |
640 | 0 | } |
641 | 0 | buffer << type_struct->get_element_name(i) << ":" |
642 | 0 | << get_jni_type_with_different_string(type_struct->get_element(i)); |
643 | 0 | } |
644 | 0 | buffer << ">"; |
645 | 0 | return buffer.str(); |
646 | 0 | } |
647 | 0 | case TYPE_ARRAY: { |
648 | 0 | const auto* type_arr = assert_cast<const DataTypeArray*>(remove_nullable(data_type).get()); |
649 | 0 | buffer << "array<" << get_jni_type_with_different_string(type_arr->get_nested_type()) |
650 | 0 | << ">"; |
651 | 0 | return buffer.str(); |
652 | 0 | } |
653 | 0 | case TYPE_MAP: { |
654 | 0 | const auto* type_map = assert_cast<const DataTypeMap*>(remove_nullable(data_type).get()); |
655 | 0 | buffer << "map<" << get_jni_type_with_different_string(type_map->get_key_type()) << "," |
656 | 0 | << get_jni_type_with_different_string(type_map->get_value_type()) << ">"; |
657 | 0 | return buffer.str(); |
658 | 0 | } |
659 | 0 | default: |
660 | 0 | return "unsupported"; |
661 | 0 | } |
662 | 0 | } |
663 | | |
664 | | Status JniConnector::_fill_column_meta(const ColumnPtr& doris_column, const DataTypePtr& data_type, |
665 | 0 | std::vector<long>& meta_data) { |
666 | 0 | auto logical_type = data_type->get_primitive_type(); |
667 | 0 | const IColumn* column = nullptr; |
668 | | // insert const flag |
669 | 0 | if (is_column_const(*doris_column)) { |
670 | 0 | meta_data.emplace_back((long)1); |
671 | 0 | const auto& const_column = assert_cast<const ColumnConst&>(*doris_column); |
672 | 0 | column = &(const_column.get_data_column()); |
673 | 0 | } else { |
674 | 0 | meta_data.emplace_back((long)0); |
675 | 0 | column = &(*doris_column); |
676 | 0 | } |
677 | | |
678 | | // insert null map address |
679 | 0 | const IColumn* data_column = nullptr; |
680 | 0 | if (column->is_nullable()) { |
681 | 0 | const auto& nullable_column = assert_cast<const ColumnNullable&>(*column); |
682 | 0 | data_column = &(nullable_column.get_nested_column()); |
683 | 0 | const auto& null_map = nullable_column.get_null_map_data(); |
684 | 0 | meta_data.emplace_back((long)null_map.data()); |
685 | 0 | } else { |
686 | 0 | meta_data.emplace_back(0); |
687 | 0 | data_column = column; |
688 | 0 | } |
689 | 0 | switch (logical_type) { |
690 | 0 | #define DISPATCH(TYPE_INDEX, COLUMN_TYPE, CPP_TYPE) \ |
691 | 0 | case TYPE_INDEX: { \ |
692 | 0 | meta_data.emplace_back(_get_fixed_length_column_address<COLUMN_TYPE>(*data_column)); \ |
693 | 0 | break; \ |
694 | 0 | } |
695 | 0 | FOR_FIXED_LENGTH_TYPES(DISPATCH) |
696 | 0 | #undef DISPATCH |
697 | 0 | case PrimitiveType::TYPE_STRING: |
698 | 0 | [[fallthrough]]; |
699 | 0 | case PrimitiveType::TYPE_CHAR: |
700 | 0 | [[fallthrough]]; |
701 | 0 | case PrimitiveType::TYPE_VARCHAR: { |
702 | 0 | const auto& string_column = assert_cast<const ColumnString&>(*data_column); |
703 | | // inert offsets |
704 | 0 | meta_data.emplace_back((long)string_column.get_offsets().data()); |
705 | 0 | meta_data.emplace_back((long)string_column.get_chars().data()); |
706 | 0 | break; |
707 | 0 | } |
708 | 0 | case PrimitiveType::TYPE_ARRAY: { |
709 | 0 | const auto& element_column = assert_cast<const ColumnArray&>(*data_column).get_data_ptr(); |
710 | 0 | meta_data.emplace_back( |
711 | 0 | (long)assert_cast<const ColumnArray&>(*data_column).get_offsets().data()); |
712 | 0 | const auto& element_type = assert_cast<const DataTypePtr&>( |
713 | 0 | (assert_cast<const DataTypeArray*>(remove_nullable(data_type).get())) |
714 | 0 | ->get_nested_type()); |
715 | 0 | RETURN_IF_ERROR(_fill_column_meta(element_column, element_type, meta_data)); |
716 | 0 | break; |
717 | 0 | } |
718 | 0 | case PrimitiveType::TYPE_STRUCT: { |
719 | 0 | const auto& doris_struct = assert_cast<const ColumnStruct&>(*data_column); |
720 | 0 | const auto* doris_struct_type = |
721 | 0 | assert_cast<const DataTypeStruct*>(remove_nullable(data_type).get()); |
722 | 0 | for (int i = 0; i < doris_struct.tuple_size(); ++i) { |
723 | 0 | const auto& struct_field = doris_struct.get_column_ptr(i); |
724 | 0 | const auto& field_type = |
725 | 0 | assert_cast<const DataTypePtr&>(doris_struct_type->get_element(i)); |
726 | 0 | RETURN_IF_ERROR(_fill_column_meta(struct_field, field_type, meta_data)); |
727 | 0 | } |
728 | 0 | break; |
729 | 0 | } |
730 | 0 | case PrimitiveType::TYPE_MAP: { |
731 | 0 | const auto& map = assert_cast<const ColumnMap&>(*data_column); |
732 | 0 | const auto& key_type = assert_cast<const DataTypePtr&>( |
733 | 0 | assert_cast<const DataTypeMap*>(remove_nullable(data_type).get())->get_key_type()); |
734 | 0 | const auto& value_type = assert_cast<const DataTypePtr&>( |
735 | 0 | assert_cast<const DataTypeMap*>(remove_nullable(data_type).get()) |
736 | 0 | ->get_value_type()); |
737 | 0 | const auto& key_column = map.get_keys_ptr(); |
738 | 0 | const auto& value_column = map.get_values_ptr(); |
739 | 0 | meta_data.emplace_back((long)map.get_offsets().data()); |
740 | 0 | RETURN_IF_ERROR(_fill_column_meta(key_column, key_type, meta_data)); |
741 | 0 | RETURN_IF_ERROR(_fill_column_meta(value_column, value_type, meta_data)); |
742 | 0 | break; |
743 | 0 | } |
744 | 0 | case PrimitiveType::TYPE_VARBINARY: { |
745 | 0 | const auto& varbinary_col = assert_cast<const ColumnVarbinary&>(*data_column); |
746 | 0 | meta_data.emplace_back( |
747 | 0 | (long)assert_cast<const ColumnVarbinary&>(varbinary_col).get_data().data()); |
748 | 0 | break; |
749 | 0 | } |
750 | 0 | default: |
751 | 0 | return Status::InternalError("Unsupported type: {}", data_type->get_name()); |
752 | 0 | } |
753 | 0 | return Status::OK(); |
754 | 0 | } |
755 | | |
756 | 0 | Status JniConnector::to_java_table(Block* block, std::unique_ptr<long[]>& meta) { |
757 | 0 | ColumnNumbers arguments; |
758 | 0 | for (size_t i = 0; i < block->columns(); ++i) { |
759 | 0 | arguments.emplace_back(i); |
760 | 0 | } |
761 | 0 | return to_java_table(block, block->rows(), arguments, meta); |
762 | 0 | } |
763 | | |
764 | | Status JniConnector::to_java_table(Block* block, size_t num_rows, const ColumnNumbers& arguments, |
765 | 0 | std::unique_ptr<long[]>& meta) { |
766 | 0 | std::vector<long> meta_data; |
767 | | // insert number of rows |
768 | 0 | meta_data.emplace_back(num_rows); |
769 | 0 | for (size_t i : arguments) { |
770 | 0 | auto& column_with_type_and_name = block->get_by_position(i); |
771 | 0 | RETURN_IF_ERROR(_fill_column_meta(column_with_type_and_name.column, |
772 | 0 | column_with_type_and_name.type, meta_data)); |
773 | 0 | } |
774 | | |
775 | 0 | meta.reset(new long[meta_data.size()]); |
776 | 0 | memcpy(meta.get(), &meta_data[0], meta_data.size() * 8); |
777 | 0 | return Status::OK(); |
778 | 0 | } |
779 | | |
780 | | std::pair<std::string, std::string> JniConnector::parse_table_schema(Block* block, |
781 | | const ColumnNumbers& arguments, |
782 | 0 | bool ignore_column_name) { |
783 | | // prepare table schema |
784 | 0 | std::ostringstream required_fields; |
785 | 0 | std::ostringstream columns_types; |
786 | 0 | for (int i = 0; i < arguments.size(); ++i) { |
787 | | // column name maybe empty or has special characters |
788 | | // std::string field = block->get_by_position(i).name; |
789 | 0 | std::string type = JniConnector::get_jni_type(block->get_by_position(arguments[i]).type); |
790 | 0 | if (i == 0) { |
791 | 0 | if (ignore_column_name) { |
792 | 0 | required_fields << "_col_" << arguments[i]; |
793 | 0 | } else { |
794 | 0 | required_fields << block->get_by_position(arguments[i]).name; |
795 | 0 | } |
796 | 0 | columns_types << type; |
797 | 0 | } else { |
798 | 0 | if (ignore_column_name) { |
799 | 0 | required_fields << "," |
800 | 0 | << "_col_" << arguments[i]; |
801 | 0 | } else { |
802 | 0 | required_fields << "," << block->get_by_position(arguments[i]).name; |
803 | 0 | } |
804 | 0 | columns_types << "#" << type; |
805 | 0 | } |
806 | 0 | } |
807 | 0 | return std::make_pair(required_fields.str(), columns_types.str()); |
808 | 0 | } |
809 | | |
810 | 0 | std::pair<std::string, std::string> JniConnector::parse_table_schema(Block* block) { |
811 | 0 | ColumnNumbers arguments; |
812 | 0 | for (size_t i = 0; i < block->columns(); ++i) { |
813 | 0 | arguments.emplace_back(i); |
814 | 0 | } |
815 | 0 | return parse_table_schema(block, arguments, true); |
816 | 0 | } |
817 | | |
818 | 0 | void JniConnector::_collect_profile_before_close() { |
819 | 0 | if (_scanner_opened && _profile != nullptr) { |
820 | 0 | JNIEnv* env = nullptr; |
821 | 0 | Status st = Jni::Env::Get(&env); |
822 | 0 | if (!st) { |
823 | 0 | LOG(WARNING) << "failed to get jni env when collect profile: " << st; |
824 | 0 | return; |
825 | 0 | } |
826 | | // update scanner metrics |
827 | 0 | std::map<std::string, std::string> statistics_result; |
828 | 0 | st = get_statistics(env, &statistics_result); |
829 | 0 | if (!st) { |
830 | 0 | LOG(WARNING) << "failed to get_statistics when collect profile: " << st; |
831 | 0 | return; |
832 | 0 | } |
833 | | |
834 | 0 | for (const auto& metric : statistics_result) { |
835 | 0 | std::vector<std::string> type_and_name = split(metric.first, ":"); |
836 | 0 | if (type_and_name.size() != 2) { |
837 | 0 | LOG(WARNING) << "Name of JNI Scanner metric should be pattern like " |
838 | 0 | << "'metricType:metricName'"; |
839 | 0 | continue; |
840 | 0 | } |
841 | 0 | long metric_value = std::stol(metric.second); |
842 | 0 | RuntimeProfile::Counter* scanner_counter; |
843 | 0 | if (type_and_name[0] == "timer") { |
844 | 0 | scanner_counter = |
845 | 0 | ADD_CHILD_TIMER(_profile, type_and_name[1], _connector_name.c_str()); |
846 | 0 | } else if (type_and_name[0] == "counter") { |
847 | 0 | scanner_counter = ADD_CHILD_COUNTER(_profile, type_and_name[1], TUnit::UNIT, |
848 | 0 | _connector_name.c_str()); |
849 | 0 | } else if (type_and_name[0] == "bytes") { |
850 | 0 | scanner_counter = ADD_CHILD_COUNTER(_profile, type_and_name[1], TUnit::BYTES, |
851 | 0 | _connector_name.c_str()); |
852 | 0 | } else { |
853 | 0 | LOG(WARNING) << "Type of JNI Scanner metric should be timer, counter or bytes"; |
854 | 0 | continue; |
855 | 0 | } |
856 | 0 | COUNTER_UPDATE(scanner_counter, metric_value); |
857 | 0 | } |
858 | 0 | } |
859 | 0 | } |
860 | | #include "common/compile_check_end.h" |
861 | | } // namespace doris |