Coverage Report

Created: 2026-05-09 01:43

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/jni/jni_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "jni_reader.h"

#include <glog/logging.h>

#include <charconv>
#include <map>
#include <ostream>
#include <sstream>
#include <system_error>

#include "core/block/block.h"
#include "core/types.h"
#include "format/jni/jni_data_bridge.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
#include "util/jni-util.h"
32
33
namespace doris {
// Forward declarations for types this file refers to through pointers.
class Block;
class RuntimeProfile;
class RuntimeState;
} // namespace doris
39
40
namespace doris {
41
42
// Shared empty slot list. The table-schema constructor initializes _file_slot_descs from it
// (presumably _file_slot_descs is a reference member, so this must outlive every reader).
const std::vector<SlotDescriptor*> JniReader::_s_empty_slot_descs {};
43
44
// =========================================================================
45
// JniReader constructors
46
// =========================================================================
47
48
// Builds a row-scanning reader backed by the Java scanner class `connector_class`
// (a '/'-separated JVM class path). The split weight is stored as int32_t.
JniReader::JniReader(const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state,
                     RuntimeProfile* profile, std::string connector_class,
                     std::map<std::string, std::string> scanner_params,
                     std::vector<std::string> column_names, int64_t self_split_weight)
        : _file_slot_descs(file_slot_descs),
          _state(state),
          _profile(profile),
          _connector_class(std::move(connector_class)),
          _scanner_params(std::move(scanner_params)),
          _column_names(std::move(column_names)),
          _self_split_weight(static_cast<int32_t>(self_split_weight)) {
    // The short connector name (last path segment, e.g. "MockJniScanner") labels the
    // profile counters registered in open().
    const auto class_path_segments = split(_connector_class, "/");
    _connector_name = class_path_segments.back();
}
61
62
// Builds a reader used only for table-schema retrieval: no slots, no state/profile until
// open() is called.
JniReader::JniReader(std::string connector_class, std::map<std::string, std::string> scanner_params)
        : _file_slot_descs(_s_empty_slot_descs),
          _connector_class(std::move(connector_class)),
          _scanner_params(std::move(scanner_params)) {
    const auto class_path_segments = split(_connector_class, "/");
    _connector_name = class_path_segments.back();
    // Makes open() pass a zero batch size to the Java scanner.
    _is_table_schema = true;
}
69
70
// =========================================================================
71
// JniReader::open  (merged from JniConnector::open)
72
// =========================================================================
73
74
5.49k
// Registers profile counters (when a profile is given), constructs the Java scanner and
// calls JniScanner#open. On success the reader is marked as opened so close() will shut the
// Java side down.
Status JniReader::open(RuntimeState* state, RuntimeProfile* profile) {
    _state = state;
    _profile = profile;

    if (_profile != nullptr) {
        // All counters hang under a parent timer named after the connector.
        const char* parent_name = _connector_name.c_str();
        ADD_TIMER(_profile, parent_name);
        _open_scanner_time = ADD_CHILD_TIMER(_profile, "OpenScannerTime", parent_name);
        _java_scan_time = ADD_CHILD_TIMER(_profile, "JavaScanTime", parent_name);
        _java_append_data_time = ADD_CHILD_TIMER(_profile, "JavaAppendDataTime", parent_name);
        _java_create_vector_table_time =
                ADD_CHILD_TIMER(_profile, "JavaCreateVectorTableTime", parent_name);
        _fill_block_time = ADD_CHILD_TIMER(_profile, "FillBlockTime", parent_name);
        // Keeps the largest value seen: a candidate replaces the current one only if larger.
        _max_time_split_weight_counter = _profile->add_conditition_counter(
                "MaxTimeSplitWeight", TUnit::UNIT,
                [](int64_t current, int64_t candidate) { return candidate > current; },
                parent_name);
    }
    _java_scan_watcher = 0;

    JNIEnv* env = nullptr;
    // Table-schema readers, and readers without a RuntimeState, use a batch size of 0.
    const int scanner_batch_size =
            (_is_table_schema || _state == nullptr) ? 0 : _state->batch_size();
    _batch_size = scanner_batch_size;

    RETURN_IF_ERROR(Jni::Env::Get(&env));
    SCOPED_RAW_TIMER(&_jni_scanner_open_watcher);
    if (_state != nullptr) {
        // emplace() leaves an explicitly supplied "time_zone" param untouched.
        _scanner_params.emplace("time_zone", _state->timezone());
    }
    RETURN_IF_ERROR(_init_jni_scanner(env, scanner_batch_size));
    // org.apache.doris.common.jni.JniScanner#open
    RETURN_IF_ERROR(_jni_scanner_obj.call_void_method(env, _jni_scanner_open).call());
    RETURN_ERROR_IF_EXC(env);

    _scanner_opened = true;
    return Status::OK();
}
111
112
// =========================================================================
113
// JniReader::_do_get_next_block  (merged from JniConnector::get_next_block)
114
// =========================================================================
115
116
11.4k
// Pulls the next batch from the Java scanner and copies it into `block`.
// Sets *eof (with *read_rows = 0) when the scanner reports no more data.
Status JniReader::_do_get_next_block(Block* block, size_t* read_rows, bool* eof) {
    JNIEnv* env = nullptr;
    RETURN_IF_ERROR(Jni::Env::Get(&env));

    // Time spent inside JniScanner#getNextBatchMeta is charged to _java_scan_watcher.
    long batch_meta_address = 0;
    {
        SCOPED_RAW_TIMER(&_java_scan_watcher);
        RETURN_IF_ERROR(_jni_scanner_obj.call_long_method(env, _jni_scanner_get_next_batch)
                                .call(&batch_meta_address));
    }

    // A zero address means end of data; otherwise the first metadata value is the row
    // count, and an empty batch is also treated as end of data.
    long batch_rows = 0;
    if (batch_meta_address != 0) {
        _set_meta(batch_meta_address);
        batch_rows = _table_meta.next_meta_as_long();
    }
    if (batch_rows == 0) {
        *read_rows = 0;
        *eof = true;
        return Status::OK();
    }

    RETURN_IF_ERROR(_fill_block(block, batch_rows));
    *read_rows = batch_rows;
    *eof = false;
    // Columns were released individually while filling; now release the Java table.
    RETURN_IF_ERROR(_jni_scanner_obj.call_void_method(env, _jni_scanner_release_table).call());
    _has_read += batch_rows;
    return Status::OK();
}
144
145
// =========================================================================
146
// JniReader::get_table_schema  (merged from JniConnector::get_table_schema)
147
// =========================================================================
148
149
0
// Calls JniScanner#getTableSchema and copies the returned Java string into
// `table_schema_str`.
Status JniReader::get_table_schema(std::string& table_schema_str) {
    JNIEnv* env = nullptr;
    RETURN_IF_ERROR(Jni::Env::Get(&env));

    Jni::LocalString schema_jstr;
    RETURN_IF_ERROR(_jni_scanner_obj.call_object_method(env, _jni_scanner_get_table_schema)
                            .call(&schema_jstr));

    Jni::LocalStringBufferGuard schema_chars;
    RETURN_IF_ERROR(schema_jstr.get_string_chars(env, &schema_chars));
    table_schema_str.assign(schema_chars.get());
    return Status::OK();
}
161
162
// =========================================================================
163
// JniReader::close  (merged from JniConnector::close)
164
// =========================================================================
165
166
5.49k
// Flushes profile counters and shuts down the Java scanner. Runs at most once.
//
// Collecting the Java-side timing metrics is best-effort with respect to cleanup: even if a
// metric call fails, JniScanner#releaseTable and JniScanner#close are still invoked, so the
// Java scanner is never left open. The first error encountered is returned.
Status JniReader::close() {
    if (_closed) {
        return Status::OK();
    }
    _closed = true;

    JNIEnv* env = nullptr;
    RETURN_IF_ERROR(Jni::Env::Get(&env));
    if (!_scanner_opened) {
        return Status::OK();
    }

    if (_profile) {
        COUNTER_UPDATE(_open_scanner_time, _jni_scanner_open_watcher);
        COUNTER_UPDATE(_fill_block_time, _fill_block_watcher);
    }

    // Previously any failure here returned early and skipped releaseTable/close below.
    Status metrics_status = [&]() -> Status {
        RETURN_ERROR_IF_EXC(env);
        jlong append_time = 0;
        RETURN_IF_ERROR(_jni_scanner_obj.call_long_method(env, _jni_scanner_get_append_data_time)
                                .call(&append_time));
        if (_profile) {
            COUNTER_UPDATE(_java_append_data_time, append_time);
        }

        jlong create_time = 0;
        RETURN_IF_ERROR(_jni_scanner_obj
                                .call_long_method(env, _jni_scanner_get_create_vector_table_time)
                                .call(&create_time));
        if (_profile) {
            COUNTER_UPDATE(_java_create_vector_table_time, create_time);
            // Pure scan time = total Java time minus append and vector-table creation time.
            COUNTER_UPDATE(_java_scan_time, _java_scan_watcher - append_time - create_time);
            _max_time_split_weight_counter->conditional_update(
                    _jni_scanner_open_watcher + _fill_block_watcher + _java_scan_watcher,
                    _self_split_weight);
        }
        return Status::OK();
    }();

    // NOTE(review): assumes a failed call above leaves no pending Java exception
    // (RETURN_ERROR_IF_EXC / call() report and clear it) — confirm in jni-util.
    //
    // _fill_block may have failed and returned, so the table must be released here as well.
    // org.apache.doris.common.jni.JniScanner#releaseTable is idempotent.
    Status release_status =
            _jni_scanner_obj.call_void_method(env, _jni_scanner_release_table).call();
    // Close the Java scanner even if releasing the table failed.
    Status close_status = _jni_scanner_obj.call_void_method(env, _jni_scanner_close).call();

    RETURN_IF_ERROR(metrics_status);
    RETURN_IF_ERROR(release_status);
    return close_status;
}
210
211
// =========================================================================
212
// JniReader::set_batch_size
213
// =========================================================================
214
215
10.4k
void JniReader::set_batch_size(size_t batch_size) {
216
10.4k
    DCHECK_GT(batch_size, 0);
217
10.4k
    if (_batch_size == batch_size) {
218
1.38k
        return;
219
1.38k
    }
220
9.09k
    _batch_size = batch_size;
221
9.09k
    if (_scanner_opened) {
222
9.09k
        JNIEnv* env = nullptr;
223
9.09k
        Status st = Jni::Env::Get(&env);
224
9.09k
        if (!st) {
225
0
            LOG(WARNING) << "failed to get jni env when set_batch_size: " << st;
226
0
            return;
227
0
        }
228
9.09k
        st = _jni_scanner_obj.call_void_method(env, _jni_scanner_set_batch_size)
229
9.09k
                     .with_arg(static_cast<int>(_batch_size))
230
9.09k
                     .call();
231
9.09k
        if (!st) {
232
0
            LOG(WARNING) << "failed to call setBatchSize: " << st;
233
0
        }
234
9.09k
    }
235
9.09k
}
236
237
// =========================================================================
238
// JniReader::_init_jni_scanner  (merged from JniConnector::_init_jni_scanner)
239
// =========================================================================
240
241
5.49k
// Loads the Java scanner class, instantiates it with (batch_size, scanner params) and
// resolves the JniScanner instance methods used by this reader. Stops at the first failure.
Status JniReader::_init_jni_scanner(JNIEnv* env, int batch_size) {
    RETURN_IF_ERROR(
            Jni::Util::get_jni_scanner_class(env, _connector_class.c_str(), &_jni_scanner_cls));

    // Constructor signature: (int batchSize, java.util.Map params).
    Jni::MethodId ctor_id;
    RETURN_IF_ERROR(
            _jni_scanner_cls.get_method(env, "<init>", "(ILjava/util/Map;)V", &ctor_id));

    Jni::LocalObject java_params;
    RETURN_IF_ERROR(Jni::Util::convert_to_java_map(env, _scanner_params, &java_params));
    RETURN_IF_ERROR(_jni_scanner_cls.new_object(env, ctor_id)
                            .with_arg(batch_size)
                            .with_arg(java_params)
                            .call(&_jni_scanner_obj));

    // Method lookups, resolved in this exact order.
    struct MethodSpec {
        const char* name;
        const char* signature;
        Jni::MethodId* target;
    };
    const MethodSpec method_specs[] = {
            {"open", "()V", &_jni_scanner_open},
            {"getNextBatchMeta", "()J", &_jni_scanner_get_next_batch},
            {"getAppendDataTime", "()J", &_jni_scanner_get_append_data_time},
            {"getCreateVectorTableTime", "()J", &_jni_scanner_get_create_vector_table_time},
            {"getTableSchema", "()Ljava/lang/String;", &_jni_scanner_get_table_schema},
            {"close", "()V", &_jni_scanner_close},
            {"releaseColumn", "(I)V", &_jni_scanner_release_column},
            {"releaseTable", "()V", &_jni_scanner_release_table},
            {"getStatistics", "()Ljava/util/Map;", &_jni_scanner_get_statistics},
            {"setBatchSize", "(I)V", &_jni_scanner_set_batch_size},
    };
    for (const MethodSpec& spec : method_specs) {
        RETURN_IF_ERROR(_jni_scanner_cls.get_method(env, spec.name, spec.signature, spec.target));
    }
    return Status::OK();
}
277
278
// =========================================================================
279
// JniReader::_fill_block  (merged from JniConnector::_fill_block)
280
// =========================================================================
281
282
6.30k
// Copies `num_rows` rows of every column in _column_names from the current Java table
// (_table_meta) into the matching columns of `block`, releasing each Java column after it
// has been copied.
//
// Returns an InternalError if a requested column is not present in the block. The previous
// map::at() lookup threw std::out_of_range out of this Status-returning function instead.
Status JniReader::_fill_block(Block* block, size_t num_rows) {
    SCOPED_RAW_TIMER(&_fill_block_watcher);
    JNIEnv* env = nullptr;
    RETURN_IF_ERROR(Jni::Env::Get(&env));
    // Fallback: if _col_name_to_block_idx was not set by the caller (e.g. JdbcScanner),
    // build the name-to-position map from the block itself.
    std::unordered_map<std::string, uint32_t> local_name_to_idx;
    const std::unordered_map<std::string, uint32_t>* col_map = _col_name_to_block_idx;
    if (col_map == nullptr) {
        local_name_to_idx = block->get_name_to_pos_map();
        col_map = &local_name_to_idx;
    }
    for (size_t i = 0; i < _column_names.size(); ++i) {
        const std::string& column_name = _column_names[i];
        const auto pos_it = col_map->find(column_name);
        if (pos_it == col_map->end()) {
            return Status::InternalError("Column {} read by JNI connector {} is not found in block",
                                         column_name, _connector_name);
        }
        auto& column_with_type_and_name = block->get_by_position(pos_it->second);
        auto& column_ptr = column_with_type_and_name.column;
        auto& column_type = column_with_type_and_name.type;
        RETURN_IF_ERROR(JniDataBridge::fill_column(_table_meta, column_ptr, column_type, num_rows));
        // Column is not released when fill_column failed. It will be released when releasing table.
        // JniScanner#releaseColumn takes a Java int, so pass the index as int.
        RETURN_IF_ERROR(_jni_scanner_obj.call_void_method(env, _jni_scanner_release_column)
                                .with_arg(static_cast<int>(i))
                                .call());
        RETURN_ERROR_IF_EXC(env);
    }
    return Status::OK();
}
307
308
// =========================================================================
309
// JniReader::_get_statistics  (merged from JniConnector::get_statistics)
310
// =========================================================================
311
312
5.18k
// Fetches JniScanner#getStatistics and converts the returned java.util.Map into `result`.
// `result` is cleared first, so it never carries stale entries.
Status JniReader::_get_statistics(JNIEnv* env, std::map<std::string, std::string>* result) {
    result->clear();

    Jni::LocalObject java_stats;
    RETURN_IF_ERROR(_jni_scanner_obj.call_object_method(env, _jni_scanner_get_statistics)
                            .call(&java_stats));
    return Jni::Util::convert_to_cpp_map(env, java_stats, result);
}
321
322
// =========================================================================
323
// JniReader::_collect_profile_before_close
324
// (merged from JniConnector::_collect_profile_before_close)
325
// =========================================================================
326
327
5.18k
void JniReader::_collect_profile_before_close() {
328
5.18k
    if (_scanner_opened && _profile != nullptr) {
329
5.18k
        JNIEnv* env = nullptr;
330
5.18k
        Status st = Jni::Env::Get(&env);
331
5.18k
        if (!st) {
332
0
            LOG(WARNING) << "failed to get jni env when collect profile: " << st;
333
0
            return;
334
0
        }
335
        // update scanner metrics
336
5.18k
        std::map<std::string, std::string> statistics_result;
337
5.18k
        st = _get_statistics(env, &statistics_result);
338
5.18k
        if (!st) {
339
0
            LOG(WARNING) << "failed to get_statistics when collect profile: " << st;
340
0
            return;
341
0
        }
342
343
7.02k
        for (const auto& metric : statistics_result) {
344
7.02k
            std::vector<std::string> type_and_name = split(metric.first, ":");
345
7.02k
            if (type_and_name.size() != 2) {
346
0
                LOG(WARNING) << "Name of JNI Scanner metric should be pattern like "
347
0
                             << "'metricType:metricName'";
348
0
                continue;
349
0
            }
350
7.02k
            long metric_value = std::stol(metric.second);
351
7.02k
            RuntimeProfile::Counter* scanner_counter;
352
7.02k
            if (type_and_name[0] == "timer") {
353
5.78k
                scanner_counter =
354
5.78k
                        ADD_CHILD_TIMER(_profile, type_and_name[1], _connector_name.c_str());
355
5.78k
            } else if (type_and_name[0] == "counter") {
356
1.24k
                scanner_counter = ADD_CHILD_COUNTER(_profile, type_and_name[1], TUnit::UNIT,
357
1.24k
                                                    _connector_name.c_str());
358
1.24k
            } else if (type_and_name[0] == "bytes") {
359
0
                scanner_counter = ADD_CHILD_COUNTER(_profile, type_and_name[1], TUnit::BYTES,
360
0
                                                    _connector_name.c_str());
361
0
            } else {
362
0
                LOG(WARNING) << "Type of JNI Scanner metric should be timer, counter or bytes";
363
0
                continue;
364
0
            }
365
7.02k
            COUNTER_UPDATE(scanner_counter, metric_value);
366
7.02k
        }
367
5.18k
    }
368
5.18k
}
369
370
// =========================================================================
371
// MockJniReader
372
// =========================================================================
373
374
MockJniReader::MockJniReader(const std::vector<SlotDescriptor*>& file_slot_descs,
375
                             RuntimeState* state, RuntimeProfile* profile)
376
0
        : JniReader(
377
0
                  file_slot_descs, state, profile, "org/apache/doris/common/jni/MockJniScanner",
378
0
                  [&]() {
379
0
                      std::ostringstream required_fields;
380
0
                      std::ostringstream columns_types;
381
0
                      int index = 0;
382
0
                      for (const auto& desc : file_slot_descs) {
383
0
                          std::string field = desc->col_name();
384
0
                          std::string type =
385
0
                                  JniDataBridge::get_jni_type_with_different_string(desc->type());
386
0
                          if (index == 0) {
387
0
                              required_fields << field;
388
0
                              columns_types << type;
389
0
                          } else {
390
0
                              required_fields << "," << field;
391
0
                              columns_types << "#" << type;
392
0
                          }
393
0
                          index++;
394
0
                      }
395
0
                      return std::map<String, String> {{"mock_rows", "10240"},
396
0
                                                       {"required_fields", required_fields.str()},
397
0
                                                       {"columns_types", columns_types.str()}};
398
0
                  }(),
399
0
                  [&]() {
400
0
                      std::vector<std::string> names;
401
0
                      for (const auto& desc : file_slot_descs) {
402
0
                          names.emplace_back(desc->col_name());
403
0
                      }
404
0
                      return names;
405
0
                  }()) {}
406
407
0
// Opens the underlying JNI scanner with the state and profile captured at construction.
Status MockJniReader::init_reader() {
    RETURN_IF_ERROR(open(_state, _profile));
    return Status::OK();
}
410
411
} // namespace doris