be/src/format/jni/jni_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "jni_reader.h" |
19 | | |
20 | | #include <glog/logging.h> |
21 | | |
#include <exception>
#include <map>
#include <ostream>
#include <sstream>
#include <tuple>
#include <unordered_map>
#include <utility>
28 | | |
29 | | #include "core/block/block.h" |
30 | | #include "core/types.h" |
31 | | #include "format/jni/jni_data_bridge.h" |
32 | | #include "format/table/partition_column_filler.h" |
33 | | #include "runtime/descriptors.h" |
34 | | #include "runtime/runtime_state.h" |
35 | | #include "util/jni-util.h" |
36 | | |
37 | | namespace doris { |
38 | | class RuntimeProfile; |
39 | | class RuntimeState; |
40 | | |
41 | | class Block; |
42 | | } // namespace doris |
43 | | |
44 | | namespace doris { |
45 | | |
46 | | const std::vector<SlotDescriptor*> JniReader::_s_empty_slot_descs; |
47 | | |
48 | | // ========================================================================= |
49 | | // JniReader constructors |
50 | | // ========================================================================= |
51 | | |
52 | | JniReader::JniReader(const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state, |
53 | | RuntimeProfile* profile, std::string connector_class, |
54 | | std::map<std::string, std::string> scanner_params, |
55 | | std::vector<std::string> column_names, int64_t self_split_weight) |
56 | 0 | : _file_slot_descs(file_slot_descs), |
57 | 0 | _state(state), |
58 | 0 | _profile(profile), |
59 | 0 | _connector_class(std::move(connector_class)), |
60 | 0 | _scanner_params(std::move(scanner_params)), |
61 | 0 | _column_names(std::move(column_names)), |
62 | 0 | _self_split_weight(static_cast<int32_t>(self_split_weight)) { |
63 | 0 | _connector_name = split(_connector_class, "/").back(); |
64 | 0 | } |
65 | | |
66 | | JniReader::JniReader(std::string connector_class, std::map<std::string, std::string> scanner_params) |
67 | 0 | : _file_slot_descs(_s_empty_slot_descs), |
68 | 0 | _connector_class(std::move(connector_class)), |
69 | 0 | _scanner_params(std::move(scanner_params)) { |
70 | 0 | _is_table_schema = true; |
71 | 0 | _connector_name = split(_connector_class, "/").back(); |
72 | 0 | } |
73 | | |
74 | 0 | Status JniReader::on_before_init_reader(ReaderInitContext* ctx) { |
75 | 0 | _column_descs = ctx->column_descs; |
76 | 0 | if (_col_name_to_block_idx == nullptr) { |
77 | 0 | _col_name_to_block_idx = ctx->col_name_to_block_idx; |
78 | 0 | } |
79 | 0 | _partition_values.clear(); |
80 | 0 | _partition_value_is_null.clear(); |
81 | 0 | if (ctx->range == nullptr || ctx->tuple_descriptor == nullptr || |
82 | 0 | !ctx->range->__isset.columns_from_path_keys) { |
83 | 0 | return Status::OK(); |
84 | 0 | } |
85 | | |
86 | 0 | DORIS_CHECK(ctx->range->__isset.columns_from_path); |
87 | 0 | DORIS_CHECK(ctx->range->columns_from_path.size() == ctx->range->columns_from_path_keys.size()); |
88 | 0 | const bool has_null_flags = ctx->range->__isset.columns_from_path_is_null; |
89 | 0 | if (has_null_flags) { |
90 | 0 | DORIS_CHECK(ctx->range->columns_from_path_is_null.size() == |
91 | 0 | ctx->range->columns_from_path_keys.size()); |
92 | 0 | } |
93 | |
|
94 | 0 | std::unordered_map<std::string, const SlotDescriptor*> name_to_slot; |
95 | 0 | for (auto* slot : ctx->tuple_descriptor->slots()) { |
96 | 0 | name_to_slot.emplace(slot->col_name(), slot); |
97 | 0 | } |
98 | 0 | for (size_t i = 0; i < ctx->range->columns_from_path_keys.size(); ++i) { |
99 | 0 | const auto& key = ctx->range->columns_from_path_keys[i]; |
100 | 0 | auto slot_it = name_to_slot.find(key); |
101 | 0 | if (slot_it == name_to_slot.end()) { |
102 | 0 | continue; |
103 | 0 | } |
104 | 0 | _partition_values.emplace( |
105 | 0 | key, std::make_tuple(ctx->range->columns_from_path[i], slot_it->second)); |
106 | 0 | _partition_value_is_null.emplace( |
107 | 0 | key, has_null_flags ? ctx->range->columns_from_path_is_null[i] : false); |
108 | 0 | } |
109 | 0 | return Status::OK(); |
110 | 0 | } |
111 | | |
112 | 0 | Status JniReader::on_after_read_block(Block* block, size_t* read_rows) { |
113 | 0 | if (_column_descs == nullptr || _partition_values.empty() || *read_rows == 0 || |
114 | 0 | _push_down_agg_type == TPushAggOp::type::COUNT) { |
115 | 0 | return Status::OK(); |
116 | 0 | } |
117 | 0 | return _fill_partition_columns(block, *read_rows); |
118 | 0 | } |
119 | | |
120 | | // ========================================================================= |
121 | | // JniReader::open (merged from JniConnector::open) |
122 | | // ========================================================================= |
123 | | |
124 | 0 | Status JniReader::open(RuntimeState* state, RuntimeProfile* profile) { |
125 | 0 | _state = state; |
126 | 0 | _profile = profile; |
127 | 0 | if (_profile) { |
128 | 0 | ADD_TIMER(_profile, _connector_name.c_str()); |
129 | 0 | _open_scanner_time = ADD_CHILD_TIMER(_profile, "OpenScannerTime", _connector_name.c_str()); |
130 | 0 | _java_scan_time = ADD_CHILD_TIMER(_profile, "JavaScanTime", _connector_name.c_str()); |
131 | 0 | _java_append_data_time = |
132 | 0 | ADD_CHILD_TIMER(_profile, "JavaAppendDataTime", _connector_name.c_str()); |
133 | 0 | _java_create_vector_table_time = |
134 | 0 | ADD_CHILD_TIMER(_profile, "JavaCreateVectorTableTime", _connector_name.c_str()); |
135 | 0 | _fill_block_time = ADD_CHILD_TIMER(_profile, "FillBlockTime", _connector_name.c_str()); |
136 | 0 | _max_time_split_weight_counter = _profile->add_conditition_counter( |
137 | 0 | "MaxTimeSplitWeight", TUnit::UNIT, [](int64_t _c, int64_t c) { return c > _c; }, |
138 | 0 | _connector_name.c_str()); |
139 | 0 | } |
140 | 0 | _java_scan_watcher = 0; |
141 | |
|
142 | 0 | JNIEnv* env = nullptr; |
143 | 0 | int batch_size = 0; |
144 | 0 | if (!_is_table_schema && _state) { |
145 | 0 | batch_size = _state->batch_size(); |
146 | 0 | } |
147 | 0 | _batch_size = batch_size; |
148 | 0 | RETURN_IF_ERROR(Jni::Env::Get(&env)); |
149 | 0 | SCOPED_RAW_TIMER(&_jni_scanner_open_watcher); |
150 | 0 | if (_state) { |
151 | 0 | _scanner_params.emplace("time_zone", _state->timezone()); |
152 | 0 | } |
153 | 0 | RETURN_IF_ERROR(_init_jni_scanner(env, batch_size)); |
154 | | // Call org.apache.doris.common.jni.JniScanner#open |
155 | 0 | RETURN_IF_ERROR(_jni_scanner_obj.call_void_method(env, _jni_scanner_open).call()); |
156 | | |
157 | 0 | RETURN_ERROR_IF_EXC(env); |
158 | 0 | _scanner_opened = true; |
159 | 0 | return Status::OK(); |
160 | 0 | } |
161 | | |
162 | | // ========================================================================= |
163 | | // JniReader::_do_get_next_block (merged from JniConnector::get_next_block) |
164 | | // ========================================================================= |
165 | | |
166 | 0 | Status JniReader::_do_get_next_block(Block* block, size_t* read_rows, bool* eof) { |
167 | 0 | JNIEnv* env = nullptr; |
168 | 0 | RETURN_IF_ERROR(Jni::Env::Get(&env)); |
169 | 0 | long meta_address = 0; |
170 | 0 | { |
171 | 0 | SCOPED_RAW_TIMER(&_java_scan_watcher); |
172 | 0 | RETURN_IF_ERROR(_jni_scanner_obj.call_long_method(env, _jni_scanner_get_next_batch) |
173 | 0 | .call(&meta_address)); |
174 | 0 | } |
175 | 0 | if (meta_address == 0) { |
176 | 0 | *read_rows = 0; |
177 | 0 | *eof = true; |
178 | 0 | return Status::OK(); |
179 | 0 | } |
180 | 0 | _set_meta(meta_address); |
181 | 0 | long num_rows = _table_meta.next_meta_as_long(); |
182 | 0 | if (num_rows == 0) { |
183 | 0 | *read_rows = 0; |
184 | 0 | *eof = true; |
185 | 0 | return Status::OK(); |
186 | 0 | } |
187 | 0 | RETURN_IF_ERROR(_fill_block(block, num_rows)); |
188 | 0 | *read_rows = num_rows; |
189 | 0 | *eof = false; |
190 | 0 | RETURN_IF_ERROR(_jni_scanner_obj.call_void_method(env, _jni_scanner_release_table).call()); |
191 | 0 | _has_read += num_rows; |
192 | 0 | return Status::OK(); |
193 | 0 | } |
194 | | |
195 | | // ========================================================================= |
196 | | // JniReader::get_table_schema (merged from JniConnector::get_table_schema) |
197 | | // ========================================================================= |
198 | | |
199 | 0 | Status JniReader::get_table_schema(std::string& table_schema_str) { |
200 | 0 | JNIEnv* env = nullptr; |
201 | 0 | RETURN_IF_ERROR(Jni::Env::Get(&env)); |
202 | | |
203 | 0 | Jni::LocalString jstr; |
204 | 0 | RETURN_IF_ERROR( |
205 | 0 | _jni_scanner_obj.call_object_method(env, _jni_scanner_get_table_schema).call(&jstr)); |
206 | 0 | Jni::LocalStringBufferGuard cstr; |
207 | 0 | RETURN_IF_ERROR(jstr.get_string_chars(env, &cstr)); |
208 | 0 | table_schema_str = std::string {cstr.get()}; |
209 | 0 | return Status::OK(); |
210 | 0 | } |
211 | | |
212 | | // ========================================================================= |
213 | | // JniReader::close (merged from JniConnector::close) |
214 | | // ========================================================================= |
215 | | |
216 | 0 | Status JniReader::close() { |
217 | 0 | if (!_closed) { |
218 | 0 | _closed = true; |
219 | 0 | JNIEnv* env = nullptr; |
220 | 0 | RETURN_IF_ERROR(Jni::Env::Get(&env)); |
221 | 0 | if (_scanner_opened) { |
222 | 0 | if (_profile) { |
223 | 0 | COUNTER_UPDATE(_open_scanner_time, _jni_scanner_open_watcher); |
224 | 0 | COUNTER_UPDATE(_fill_block_time, _fill_block_watcher); |
225 | 0 | } |
226 | |
|
227 | 0 | RETURN_ERROR_IF_EXC(env); |
228 | 0 | jlong _append = 0; |
229 | 0 | RETURN_IF_ERROR( |
230 | 0 | _jni_scanner_obj.call_long_method(env, _jni_scanner_get_append_data_time) |
231 | 0 | .call(&_append)); |
232 | | |
233 | 0 | if (_profile) { |
234 | 0 | COUNTER_UPDATE(_java_append_data_time, _append); |
235 | 0 | } |
236 | |
|
237 | 0 | jlong _create = 0; |
238 | 0 | RETURN_IF_ERROR( |
239 | 0 | _jni_scanner_obj |
240 | 0 | .call_long_method(env, _jni_scanner_get_create_vector_table_time) |
241 | 0 | .call(&_create)); |
242 | | |
243 | 0 | if (_profile) { |
244 | 0 | COUNTER_UPDATE(_java_create_vector_table_time, _create); |
245 | 0 | COUNTER_UPDATE(_java_scan_time, _java_scan_watcher - _append - _create); |
246 | 0 | _max_time_split_weight_counter->conditional_update( |
247 | 0 | _jni_scanner_open_watcher + _fill_block_watcher + _java_scan_watcher, |
248 | 0 | _self_split_weight); |
249 | 0 | } |
250 | | |
251 | | // _fill_block may be failed and returned, we should release table in close. |
252 | | // org.apache.doris.common.jni.JniScanner#releaseTable is idempotent |
253 | 0 | RETURN_IF_ERROR( |
254 | 0 | _jni_scanner_obj.call_void_method(env, _jni_scanner_release_table).call()); |
255 | 0 | RETURN_IF_ERROR(_jni_scanner_obj.call_void_method(env, _jni_scanner_close).call()); |
256 | 0 | } |
257 | 0 | } |
258 | 0 | return Status::OK(); |
259 | 0 | } |
260 | | |
261 | | // ========================================================================= |
262 | | // JniReader::set_batch_size |
263 | | // ========================================================================= |
264 | | |
265 | 0 | void JniReader::set_batch_size(size_t batch_size) { |
266 | 0 | DCHECK_GT(batch_size, 0); |
267 | 0 | if (_batch_size == batch_size) { |
268 | 0 | return; |
269 | 0 | } |
270 | 0 | _batch_size = batch_size; |
271 | 0 | if (_scanner_opened) { |
272 | 0 | JNIEnv* env = nullptr; |
273 | 0 | Status st = Jni::Env::Get(&env); |
274 | 0 | if (!st) { |
275 | 0 | LOG(WARNING) << "failed to get jni env when set_batch_size: " << st; |
276 | 0 | return; |
277 | 0 | } |
278 | 0 | st = _jni_scanner_obj.call_void_method(env, _jni_scanner_set_batch_size) |
279 | 0 | .with_arg(static_cast<int>(_batch_size)) |
280 | 0 | .call(); |
281 | 0 | if (!st) { |
282 | 0 | LOG(WARNING) << "failed to call setBatchSize: " << st; |
283 | 0 | } |
284 | 0 | } |
285 | 0 | } |
286 | | |
287 | | // ========================================================================= |
288 | | // JniReader::_init_jni_scanner (merged from JniConnector::_init_jni_scanner) |
289 | | // ========================================================================= |
290 | | |
291 | 0 | Status JniReader::_init_jni_scanner(JNIEnv* env, int batch_size) { |
292 | 0 | RETURN_IF_ERROR( |
293 | 0 | Jni::Util::get_jni_scanner_class(env, _connector_class.c_str(), &_jni_scanner_cls)); |
294 | | |
295 | 0 | Jni::MethodId scanner_constructor; |
296 | 0 | RETURN_IF_ERROR(_jni_scanner_cls.get_method(env, "<init>", "(ILjava/util/Map;)V", |
297 | 0 | &scanner_constructor)); |
298 | | |
299 | | // prepare constructor parameters |
300 | 0 | Jni::LocalObject hashmap_object; |
301 | 0 | RETURN_IF_ERROR(Jni::Util::convert_to_java_map(env, _scanner_params, &hashmap_object)); |
302 | 0 | RETURN_IF_ERROR(_jni_scanner_cls.new_object(env, scanner_constructor) |
303 | 0 | .with_arg(batch_size) |
304 | 0 | .with_arg(hashmap_object) |
305 | 0 | .call(&_jni_scanner_obj)); |
306 | | |
307 | 0 | RETURN_IF_ERROR(_jni_scanner_cls.get_method(env, "open", "()V", &_jni_scanner_open)); |
308 | 0 | RETURN_IF_ERROR(_jni_scanner_cls.get_method(env, "getNextBatchMeta", "()J", |
309 | 0 | &_jni_scanner_get_next_batch)); |
310 | 0 | RETURN_IF_ERROR(_jni_scanner_cls.get_method(env, "getAppendDataTime", "()J", |
311 | 0 | &_jni_scanner_get_append_data_time)); |
312 | 0 | RETURN_IF_ERROR(_jni_scanner_cls.get_method(env, "getCreateVectorTableTime", "()J", |
313 | 0 | &_jni_scanner_get_create_vector_table_time)); |
314 | 0 | RETURN_IF_ERROR(_jni_scanner_cls.get_method(env, "getTableSchema", "()Ljava/lang/String;", |
315 | 0 | &_jni_scanner_get_table_schema)); |
316 | 0 | RETURN_IF_ERROR(_jni_scanner_cls.get_method(env, "close", "()V", &_jni_scanner_close)); |
317 | 0 | RETURN_IF_ERROR(_jni_scanner_cls.get_method(env, "releaseColumn", "(I)V", |
318 | 0 | &_jni_scanner_release_column)); |
319 | 0 | RETURN_IF_ERROR( |
320 | 0 | _jni_scanner_cls.get_method(env, "releaseTable", "()V", &_jni_scanner_release_table)); |
321 | 0 | RETURN_IF_ERROR(_jni_scanner_cls.get_method(env, "getStatistics", "()Ljava/util/Map;", |
322 | 0 | &_jni_scanner_get_statistics)); |
323 | 0 | RETURN_IF_ERROR( |
324 | 0 | _jni_scanner_cls.get_method(env, "setBatchSize", "(I)V", &_jni_scanner_set_batch_size)); |
325 | 0 | return Status::OK(); |
326 | 0 | } |
327 | | |
328 | | // ========================================================================= |
329 | | // JniReader::_fill_block (merged from JniConnector::_fill_block) |
330 | | // ========================================================================= |
331 | | |
332 | 0 | Status JniReader::_fill_block(Block* block, size_t num_rows) { |
333 | 0 | SCOPED_RAW_TIMER(&_fill_block_watcher); |
334 | 0 | JNIEnv* env = nullptr; |
335 | 0 | RETURN_IF_ERROR(Jni::Env::Get(&env)); |
336 | | // Fallback: if _col_name_to_block_idx was not set by the caller (e.g. JdbcScanner), |
337 | | // build the name-to-position map from the block itself. |
338 | 0 | std::unordered_map<std::string, uint32_t> local_name_to_idx; |
339 | 0 | const std::unordered_map<std::string, uint32_t>* col_map = _col_name_to_block_idx; |
340 | 0 | if (col_map == nullptr) { |
341 | 0 | local_name_to_idx = block->get_name_to_pos_map(); |
342 | 0 | col_map = &local_name_to_idx; |
343 | 0 | } |
344 | 0 | for (int i = 0; i < _column_names.size(); ++i) { |
345 | 0 | auto& column_with_type_and_name = block->get_by_position(col_map->at(_column_names[i])); |
346 | 0 | auto& column_ptr = column_with_type_and_name.column; |
347 | 0 | auto& column_type = column_with_type_and_name.type; |
348 | 0 | RETURN_IF_ERROR(JniDataBridge::fill_column(_table_meta, column_ptr, column_type, num_rows)); |
349 | | // Column is not released when fill_column failed. It will be released when releasing table. |
350 | 0 | RETURN_IF_ERROR(_jni_scanner_obj.call_void_method(env, _jni_scanner_release_column) |
351 | 0 | .with_arg(i) |
352 | 0 | .call()); |
353 | 0 | RETURN_ERROR_IF_EXC(env); |
354 | 0 | } |
355 | 0 | return Status::OK(); |
356 | 0 | } |
357 | | |
358 | 0 | Status JniReader::_fill_partition_columns(Block* block, size_t num_rows) { |
359 | 0 | std::unordered_map<std::string, uint32_t> local_name_to_idx; |
360 | 0 | const std::unordered_map<std::string, uint32_t>* col_map = _col_name_to_block_idx; |
361 | 0 | if (col_map == nullptr) { |
362 | 0 | local_name_to_idx = block->get_name_to_pos_map(); |
363 | 0 | col_map = &local_name_to_idx; |
364 | 0 | } |
365 | |
|
366 | 0 | for (const auto& desc : *_column_descs) { |
367 | 0 | if (desc.category != ColumnCategory::PARTITION_KEY) { |
368 | 0 | continue; |
369 | 0 | } |
370 | 0 | auto value_it = _partition_values.find(desc.name); |
371 | 0 | if (value_it == _partition_values.end()) { |
372 | 0 | continue; |
373 | 0 | } |
374 | 0 | auto col_it = col_map->find(desc.name); |
375 | 0 | if (col_it == col_map->end()) { |
376 | 0 | return Status::InternalError("Missing partition column {} in block {}", desc.name, |
377 | 0 | block->dump_structure()); |
378 | 0 | } |
379 | | |
380 | 0 | auto& column_with_type_and_name = block->get_by_position(col_it->second); |
381 | 0 | auto mutable_column = std::move(*column_with_type_and_name.column).mutate(); |
382 | 0 | const auto& [value, slot_desc] = value_it->second; |
383 | 0 | auto null_it = _partition_value_is_null.find(desc.name); |
384 | 0 | DORIS_CHECK(null_it != _partition_value_is_null.end()); |
385 | 0 | RETURN_IF_ERROR(fill_partition_column_from_path_value(*mutable_column, *slot_desc, value, |
386 | 0 | num_rows, null_it->second)); |
387 | 0 | column_with_type_and_name.column = std::move(mutable_column); |
388 | 0 | } |
389 | 0 | return Status::OK(); |
390 | 0 | } |
391 | | |
392 | | // ========================================================================= |
393 | | // JniReader::_get_statistics (merged from JniConnector::get_statistics) |
394 | | // ========================================================================= |
395 | | |
396 | 0 | Status JniReader::_get_statistics(JNIEnv* env, std::map<std::string, std::string>* result) { |
397 | 0 | result->clear(); |
398 | 0 | Jni::LocalObject metrics; |
399 | 0 | RETURN_IF_ERROR( |
400 | 0 | _jni_scanner_obj.call_object_method(env, _jni_scanner_get_statistics).call(&metrics)); |
401 | | |
402 | 0 | RETURN_IF_ERROR(Jni::Util::convert_to_cpp_map(env, metrics, result)); |
403 | 0 | return Status::OK(); |
404 | 0 | } |
405 | | |
406 | | // ========================================================================= |
407 | | // JniReader::_collect_profile_before_close |
408 | | // (merged from JniConnector::_collect_profile_before_close) |
409 | | // ========================================================================= |
410 | | |
411 | 0 | void JniReader::_collect_profile_before_close() { |
412 | 0 | if (_scanner_opened && _profile != nullptr) { |
413 | 0 | JNIEnv* env = nullptr; |
414 | 0 | Status st = Jni::Env::Get(&env); |
415 | 0 | if (!st) { |
416 | 0 | LOG(WARNING) << "failed to get jni env when collect profile: " << st; |
417 | 0 | return; |
418 | 0 | } |
419 | | // update scanner metrics |
420 | 0 | std::map<std::string, std::string> statistics_result; |
421 | 0 | st = _get_statistics(env, &statistics_result); |
422 | 0 | if (!st) { |
423 | 0 | LOG(WARNING) << "failed to get_statistics when collect profile: " << st; |
424 | 0 | return; |
425 | 0 | } |
426 | | |
427 | 0 | for (const auto& metric : statistics_result) { |
428 | 0 | std::vector<std::string> type_and_name = split(metric.first, ":"); |
429 | 0 | if (type_and_name.size() != 2) { |
430 | 0 | LOG(WARNING) << "Name of JNI Scanner metric should be pattern like " |
431 | 0 | << "'metricType:metricName'"; |
432 | 0 | continue; |
433 | 0 | } |
434 | 0 | long metric_value = std::stol(metric.second); |
435 | 0 | RuntimeProfile::Counter* scanner_counter; |
436 | 0 | if (type_and_name[0] == "timer") { |
437 | 0 | scanner_counter = |
438 | 0 | ADD_CHILD_TIMER(_profile, type_and_name[1], _connector_name.c_str()); |
439 | 0 | } else if (type_and_name[0] == "counter") { |
440 | 0 | scanner_counter = ADD_CHILD_COUNTER(_profile, type_and_name[1], TUnit::UNIT, |
441 | 0 | _connector_name.c_str()); |
442 | 0 | } else if (type_and_name[0] == "bytes") { |
443 | 0 | scanner_counter = ADD_CHILD_COUNTER(_profile, type_and_name[1], TUnit::BYTES, |
444 | 0 | _connector_name.c_str()); |
445 | 0 | } else { |
446 | 0 | LOG(WARNING) << "Type of JNI Scanner metric should be timer, counter or bytes"; |
447 | 0 | continue; |
448 | 0 | } |
449 | 0 | COUNTER_UPDATE(scanner_counter, metric_value); |
450 | 0 | } |
451 | 0 | } |
452 | 0 | } |
453 | | |
454 | | // ========================================================================= |
455 | | // MockJniReader |
456 | | // ========================================================================= |
457 | | |
458 | | MockJniReader::MockJniReader(const std::vector<SlotDescriptor*>& file_slot_descs, |
459 | | RuntimeState* state, RuntimeProfile* profile) |
460 | 0 | : JniReader( |
461 | 0 | file_slot_descs, state, profile, "org/apache/doris/common/jni/MockJniScanner", |
462 | 0 | [&]() { |
463 | 0 | std::ostringstream required_fields; |
464 | 0 | std::ostringstream columns_types; |
465 | 0 | int index = 0; |
466 | 0 | for (const auto& desc : file_slot_descs) { |
467 | 0 | std::string field = desc->col_name(); |
468 | 0 | std::string type = |
469 | 0 | JniDataBridge::get_jni_type_with_different_string(desc->type()); |
470 | 0 | if (index == 0) { |
471 | 0 | required_fields << field; |
472 | 0 | columns_types << type; |
473 | 0 | } else { |
474 | 0 | required_fields << "," << field; |
475 | 0 | columns_types << "#" << type; |
476 | 0 | } |
477 | 0 | index++; |
478 | 0 | } |
479 | 0 | return std::map<String, String> {{"mock_rows", "10240"}, |
480 | 0 | {"required_fields", required_fields.str()}, |
481 | 0 | {"columns_types", columns_types.str()}}; |
482 | 0 | }(), |
483 | 0 | [&]() { |
484 | 0 | std::vector<std::string> names; |
485 | 0 | for (const auto& desc : file_slot_descs) { |
486 | 0 | names.emplace_back(desc->col_name()); |
487 | 0 | } |
488 | 0 | return names; |
489 | 0 | }()) {} |
490 | | |
491 | 0 | Status MockJniReader::init_reader() { |
492 | 0 | return open(_state, _profile); |
493 | 0 | } |
494 | | |
495 | | } // namespace doris |