Coverage Report

Created: 2026-03-16 05:02

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/connector/jni_connector.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/connector/jni_connector.h"
19
20
#include <glog/logging.h>
21
22
#include <sstream>
23
#include <variant>
24
25
#include "core/block/block.h"
26
#include "core/column/column_array.h"
27
#include "core/column/column_map.h"
28
#include "core/column/column_nullable.h"
29
#include "core/column/column_string.h"
30
#include "core/column/column_struct.h"
31
#include "core/column/column_varbinary.h"
32
#include "core/data_type/data_type_array.h"
33
#include "core/data_type/data_type_map.h"
34
#include "core/data_type/data_type_nullable.h"
35
#include "core/data_type/data_type_struct.h"
36
#include "core/data_type/data_type_varbinary.h"
37
#include "core/data_type/define_primitive_type.h"
38
#include "core/data_type/primitive_type.h"
39
#include "core/types.h"
40
#include "core/value/decimalv2_value.h"
41
#include "jni.h"
42
#include "runtime/runtime_state.h"
43
#include "util/jni-util.h"
44
45
namespace doris {
46
#include "common/compile_check_begin.h"
47
class RuntimeProfile;
48
} // namespace doris
49
50
namespace doris {
51
52
// X-macro enumerating every fixed-width type that is copied as one flat buffer per column.
// Each entry is M(primitive type, Doris column class, C++ element type). It is expanded as
// switch cases in _fill_column and in _fill_column_meta.
#define FOR_FIXED_LENGTH_TYPES(M)                                  \
    M(PrimitiveType::TYPE_BOOLEAN, ColumnUInt8, UInt8)             \
    M(PrimitiveType::TYPE_TINYINT, ColumnInt8, Int8)               \
    M(PrimitiveType::TYPE_SMALLINT, ColumnInt16, Int16)            \
    M(PrimitiveType::TYPE_INT, ColumnInt32, Int32)                 \
    M(PrimitiveType::TYPE_BIGINT, ColumnInt64, Int64)              \
    M(PrimitiveType::TYPE_LARGEINT, ColumnInt128, Int128)          \
    M(PrimitiveType::TYPE_FLOAT, ColumnFloat32, Float32)           \
    M(PrimitiveType::TYPE_DOUBLE, ColumnFloat64, Float64)          \
    M(PrimitiveType::TYPE_DECIMAL32, ColumnDecimal32, Int32)       \
    M(PrimitiveType::TYPE_DECIMAL64, ColumnDecimal64, Int64)       \
    M(PrimitiveType::TYPE_DECIMALV2, ColumnDecimal128V2, Int128)   \
    M(PrimitiveType::TYPE_DECIMAL128I, ColumnDecimal128V3, Int128) \
    M(PrimitiveType::TYPE_DATE, ColumnDate, Int64)                 \
    M(PrimitiveType::TYPE_DATETIME, ColumnDateTime, Int64)         \
    M(PrimitiveType::TYPE_DATEV2, ColumnDateV2, UInt32)            \
    M(PrimitiveType::TYPE_DATETIMEV2, ColumnDateTimeV2, UInt64)    \
    M(PrimitiveType::TYPE_TIMESTAMPTZ, ColumnTimeStampTz, UInt64)  \
    M(PrimitiveType::TYPE_IPV4, ColumnIPv4, IPv4)                  \
    M(PrimitiveType::TYPE_IPV6, ColumnIPv6, IPv6)
72
73
0
Status JniConnector::open(RuntimeState* state, RuntimeProfile* profile) {
    // Keep the runtime context; later calls read the state and profile from these members.
    _state = state;
    _profile = profile;

    // Register the connector's profile node and the timers nested under it.
    const char* parent_timer = _connector_name.c_str();
    ADD_TIMER(_profile, parent_timer);
    _open_scanner_time = ADD_CHILD_TIMER(_profile, "OpenScannerTime", parent_timer);
    _java_scan_time = ADD_CHILD_TIMER(_profile, "JavaScanTime", parent_timer);
    _java_append_data_time = ADD_CHILD_TIMER(_profile, "JavaAppendDataTime", parent_timer);
    _java_create_vector_table_time =
            ADD_CHILD_TIMER(_profile, "JavaCreateVectorTableTime", parent_timer);
    _fill_block_time = ADD_CHILD_TIMER(_profile, "FillBlockTime", parent_timer);
    // Keeps the largest value seen: the update wins when the candidate exceeds the current one.
    _max_time_split_weight_counter = _profile->add_conditition_counter(
            "MaxTimeSplitWeight", TUnit::UNIT,
            [](int64_t current, int64_t candidate) { return candidate > current; }, parent_timer);
    _java_scan_watcher = 0;

    // A table-schema-only scanner is constructed with batch size 0.
    const int batch_size = _is_table_schema ? 0 : _state->batch_size();

    // Do not cache the JNIEnv in a field: the number of local frames an env can hold is
    // limited, so every method fetches the env for the current thread itself.
    JNIEnv* env = nullptr;
    RETURN_IF_ERROR(Jni::Env::Get(&env));

    // Everything from here to the end of the function is charged to OpenScannerTime.
    SCOPED_RAW_TIMER(&_jni_scanner_open_watcher);
    _scanner_params.emplace("time_zone", _state->timezone());
    RETURN_IF_ERROR(_init_jni_scanner(env, batch_size));
    // org.apache.doris.common.jni.JniScanner#open
    RETURN_IF_ERROR(_jni_scanner_obj.call_void_method(env, _jni_scanner_open).call());
    RETURN_ERROR_IF_EXC(env);

    _scanner_opened = true;
    return Status::OK();
}
106
107
0
// No preparation is needed before open(); all JNI setup happens there.
Status JniConnector::init() { return Status::OK(); }
110
111
0
Status JniConnector::get_next_block(Block* block, size_t* read_rows, bool* eof) {
    // Shared exit for "nothing more to read".
    auto report_eof = [read_rows, eof]() {
        *read_rows = 0;
        *eof = true;
        return Status::OK();
    };

    JNIEnv* env = nullptr;
    RETURN_IF_ERROR(Jni::Env::Get(&env));

    // org.apache.doris.common.jni.JniScanner#getNextBatchMeta returns the address of the
    // batch meta table; 0 means the scanner has no data left.
    long meta_address = 0;
    {
        SCOPED_RAW_TIMER(&_java_scan_watcher);
        RETURN_IF_ERROR(_jni_scanner_obj.call_long_method(env, _jni_scanner_get_next_batch)
                                .call(&meta_address));
    }
    if (meta_address == 0) {
        return report_eof();
    }

    _set_meta(meta_address);
    // The first meta slot is the number of rows in this batch.
    const long num_rows = _table_meta.next_meta_as_long();
    if (num_rows == 0) {
        return report_eof();
    }

    RETURN_IF_ERROR(_fill_block(block, num_rows));
    *read_rows = num_rows;
    *eof = false;
    // org.apache.doris.common.jni.JniScanner#releaseTable, once the batch has been copied.
    RETURN_IF_ERROR(_jni_scanner_obj.call_void_method(env, _jni_scanner_release_table).call());
    _has_read += num_rows;
    return Status::OK();
}
142
143
0
Status JniConnector::get_table_schema(std::string& table_schema_str) {
    JNIEnv* env = nullptr;
    RETURN_IF_ERROR(Jni::Env::Get(&env));

    // org.apache.doris.common.jni.JniScanner#getTableSchema returns a java.lang.String.
    Jni::LocalString schema_jstr;
    RETURN_IF_ERROR(_jni_scanner_obj.call_object_method(env, _jni_scanner_get_table_schema)
                            .call(&schema_jstr));

    // Obtain the characters of the Java string and copy them into the caller's std::string.
    Jni::LocalStringBufferGuard schema_chars;
    RETURN_IF_ERROR(schema_jstr.get_string_chars(env, &schema_chars));
    table_schema_str.assign(schema_chars.get());
    return Status::OK();
}
155
156
0
Status JniConnector::get_statistics(JNIEnv* env, std::map<std::string, std::string>* result) {
    // Start from an empty map; entries come only from the Java side.
    result->clear();

    // org.apache.doris.common.jni.JniScanner#getStatistics returns a java.util.Map.
    Jni::LocalObject java_metrics;
    RETURN_IF_ERROR(_jni_scanner_obj.call_object_method(env, _jni_scanner_get_statistics)
                            .call(&java_metrics));
    return Jni::Util::convert_to_cpp_map(env, java_metrics, result);
}
165
166
0
// Flushes scanner timings into the profile, then releases and closes the Java scanner.
// Safe to call repeatedly: once a call has completed successfully, later calls are no-ops.
Status JniConnector::close() {
    if (_closed) {
        return Status::OK();
    }
    JNIEnv* env = nullptr;
    RETURN_IF_ERROR(Jni::Env::Get(&env));
    if (_scanner_opened) {
        // Timers measured on the C++ side.
        COUNTER_UPDATE(_open_scanner_time, _jni_scanner_open_watcher);
        COUNTER_UPDATE(_fill_block_time, _fill_block_watcher);

        RETURN_ERROR_IF_EXC(env);
        // Timers measured on the Java side:
        // JniScanner#getAppendDataTime and JniScanner#getCreateVectorTableTime.
        jlong java_append_time = 0;
        RETURN_IF_ERROR(_jni_scanner_obj.call_long_method(env, _jni_scanner_get_append_data_time)
                                .call(&java_append_time));
        COUNTER_UPDATE(_java_append_data_time, java_append_time);

        jlong java_create_time = 0;
        RETURN_IF_ERROR(
                _jni_scanner_obj.call_long_method(env, _jni_scanner_get_create_vector_table_time)
                        .call(&java_create_time));
        COUNTER_UPDATE(_java_create_vector_table_time, java_create_time);

        // _java_scan_watcher wraps each getNextBatchMeta call. The Java append/create times are
        // presumably included in it, so they are subtracted here to avoid double reporting.
        COUNTER_UPDATE(_java_scan_time, _java_scan_watcher - java_append_time - java_create_time);

        _max_time_split_weight_counter->conditional_update(
                _jni_scanner_open_watcher + _fill_block_watcher + _java_scan_watcher,
                _self_split_weight);

        // _fill_block may have failed and returned early, so the table is released here too.
        // org.apache.doris.common.jni.JniScanner#releaseTable is idempotent.
        RETURN_IF_ERROR(_jni_scanner_obj.call_void_method(env, _jni_scanner_release_table).call());
        RETURN_IF_ERROR(_jni_scanner_obj.call_void_method(env, _jni_scanner_close).call());
    }
    // Without this, a second close() would add every counter above again and call
    // JniScanner#close on a scanner that is already closed.
    _closed = true;
    return Status::OK();
}
205
206
0
Status JniConnector::_init_jni_scanner(JNIEnv* env, int batch_size) {
    RETURN_IF_ERROR(
            Jni::Util::get_jni_scanner_class(env, _connector_class.c_str(), &_jni_scanner_cls));

    // Instantiate the scanner through its (int batchSize, java.util.Map params) constructor.
    Jni::MethodId ctor;
    RETURN_IF_ERROR(_jni_scanner_cls.get_method(env, "<init>", "(ILjava/util/Map;)V", &ctor));
    Jni::LocalObject java_params;
    RETURN_IF_ERROR(Jni::Util::convert_to_java_map(env, _scanner_params, &java_params));
    RETURN_IF_ERROR(_jni_scanner_cls.new_object(env, ctor)
                            .with_arg(batch_size)
                            .with_arg(java_params)
                            .call(&_jni_scanner_obj));

    // Resolve the JniScanner methods used later, in a fixed order; stop at the first failure.
    struct ScannerMethod {
        const char* name;
        const char* signature;
        Jni::MethodId* id;
    };
    const ScannerMethod methods[] = {
            {"open", "()V", &_jni_scanner_open},
            {"getNextBatchMeta", "()J", &_jni_scanner_get_next_batch},
            {"getAppendDataTime", "()J", &_jni_scanner_get_append_data_time},
            {"getCreateVectorTableTime", "()J", &_jni_scanner_get_create_vector_table_time},
            {"getTableSchema", "()Ljava/lang/String;", &_jni_scanner_get_table_schema},
            {"close", "()V", &_jni_scanner_close},
            {"releaseColumn", "(I)V", &_jni_scanner_release_column},
            {"releaseTable", "()V", &_jni_scanner_release_table},
            {"getStatistics", "()Ljava/util/Map;", &_jni_scanner_get_statistics},
    };
    for (const auto& method : methods) {
        RETURN_IF_ERROR(
                _jni_scanner_cls.get_method(env, method.name, method.signature, method.id));
    }
    return Status::OK();
}
240
241
0
Status JniConnector::fill_block(Block* block, const ColumnNumbers& arguments, long table_address) {
    if (table_address == 0) {
        return Status::InternalError("table_address is 0");
    }
    TableMetaAddress table_meta(table_address);
    // The first meta slot is the row count shared by every column that follows.
    const long num_rows = table_meta.next_meta_as_long();

    for (size_t pos : arguments) {
        if (block->get_by_position(pos).column.get() == nullptr) {
            // No column yet: create an empty one matching the slot's declared type.
            const DataTypePtr declared = block->get_data_type(pos);
            if (declared->is_nullable()) {
                auto nested = remove_nullable(declared)->create_column();
                block->replace_by_position(
                        pos, ColumnNullable::create(std::move(nested), ColumnUInt8::create()));
            } else {
                block->replace_by_position(pos, declared->create_column());
            }
        } else if (is_column_const(*(block->get_by_position(pos).column))) {
            // Materialize a constant column so rows can be appended to it.
            auto full = block->get_by_position(pos).column->convert_to_full_column_if_const();
            if (block->get_by_position(pos).type->is_nullable()) {
                block->replace_by_position(pos, make_nullable(full));
            } else {
                block->replace_by_position(pos, std::move(full));
            }
        }

        auto& target = block->get_by_position(pos);
        RETURN_IF_ERROR(_fill_column(table_meta, target.column, target.type, num_rows));
    }
    return Status::OK();
}
275
276
0
Status JniConnector::_fill_block(Block* block, size_t num_rows) {
    SCOPED_RAW_TIMER(&_fill_block_watcher);
    JNIEnv* env = nullptr;
    RETURN_IF_ERROR(Jni::Env::Get(&env));

    // The Java table lists columns in _column_names order. Each one is copied into its block
    // slot and then released on the Java side by its index.
    int java_column_index = 0;
    for (const auto& column_name : _column_names) {
        auto& target = block->get_by_position(_col_name_to_block_idx->at(column_name));
        RETURN_IF_ERROR(_fill_column(_table_meta, target.column, target.type, num_rows));
        // If _fill_column fails, the column is not released here; releasing the table frees it.
        RETURN_IF_ERROR(_jni_scanner_obj.call_void_method(env, _jni_scanner_release_column)
                                .with_arg(java_column_index)
                                .call());
        RETURN_ERROR_IF_EXC(env);
        ++java_column_index;
    }
    return Status::OK();
}
294
295
// Appends num_rows values of one column from the Java-side meta table at `address` to
// doris_column. Consumes the column's meta slots from `address` in order. Nested types
// (array/map/struct) recurse for their children.
Status JniConnector::_fill_column(TableMetaAddress& address, ColumnPtr& doris_column,
                                  const DataTypePtr& data_type, size_t num_rows) {
    auto logical_type = data_type->get_primitive_type();
    // The first slot of every column is its null-map address.
    void* null_map_ptr = address.next_meta_as_ptr();
    if (null_map_ptr == nullptr) {
        // org.apache.doris.common.jni.vec.ColumnType.Type#UNSUPPORTED will set column address as 0
        return Status::InternalError("Unsupported type {} in java side", data_type->get_name());
    }
    MutableColumnPtr data_column;
    if (doris_column->is_nullable()) {
        // Checked downcast (assert_cast), as used elsewhere in this file, instead of an
        // unchecked reinterpret_cast.
        auto* nullable_column = assert_cast<ColumnNullable*>(doris_column->assume_mutable().get());
        data_column = nullable_column->get_nested_column_ptr();
        NullMap& null_map = nullable_column->get_null_map_data();
        size_t origin_size = null_map.size();
        null_map.resize(origin_size + num_rows);
        memcpy(null_map.data() + origin_size, static_cast<bool*>(null_map_ptr), num_rows);
    } else {
        data_column = doris_column->assume_mutable();
    }
    // Fixed-length types listed in FOR_FIXED_LENGTH_TYPES (including TYPE_DATE and
    // TYPE_DATETIME) are copied from a single data-buffer slot. The remaining types have
    // dedicated fillers below.
    switch (logical_type) {
        //FIXME: in Doris we check data then insert. jdbc external table may have some data invalid for doris.
        // should add check otherwise it may break some of our assumption now.
#define DISPATCH(TYPE_INDEX, COLUMN_TYPE, CPP_TYPE)              \
    case TYPE_INDEX:                                             \
        return _fill_fixed_length_column<COLUMN_TYPE, CPP_TYPE>( \
                data_column, reinterpret_cast<CPP_TYPE*>(address.next_meta_as_ptr()), num_rows);
        FOR_FIXED_LENGTH_TYPES(DISPATCH)
#undef DISPATCH
    case PrimitiveType::TYPE_STRING:
        [[fallthrough]];
    case PrimitiveType::TYPE_CHAR:
        [[fallthrough]];
    case PrimitiveType::TYPE_VARCHAR:
        return _fill_string_column(address, data_column, num_rows);
    case PrimitiveType::TYPE_ARRAY:
        return _fill_array_column(address, data_column, data_type, num_rows);
    case PrimitiveType::TYPE_MAP:
        return _fill_map_column(address, data_column, data_type, num_rows);
    case PrimitiveType::TYPE_STRUCT:
        return _fill_struct_column(address, data_column, data_type, num_rows);
    case PrimitiveType::TYPE_VARBINARY:
        return _fill_varbinary_column(address, data_column, num_rows);
    default:
        return Status::InvalidArgument("Unsupported type {} in jni scanner", data_type->get_name());
    }
    return Status::OK();
}
344
345
Status JniConnector::_fill_varbinary_column(TableMetaAddress& address,
                                            MutableColumnPtr& doris_column, size_t num_rows) {
    // A single meta slot points at packed per-row records written by the Java side,
    // 16 bytes each: [len: int64][addr: uint64].
    constexpr size_t record_size = 16;
    constexpr size_t addr_offset = 8;
    const char* records = reinterpret_cast<char*>(address.next_meta_as_ptr());
    auto& target = assert_cast<ColumnVarbinary&>(*doris_column);

    for (size_t row = 0; row < num_rows; ++row) {
        const char* record = records + row * record_size;
        int64_t len = 0;
        memcpy(&len, record, sizeof(len));
        if (len > 0) {
            uint64_t addr = 0;
            memcpy(&addr, record + addr_offset, sizeof(addr));
            target.insert_data(reinterpret_cast<const char*>(addr), static_cast<size_t>(len));
        } else {
            // Non-positive length: insert the column's default value.
            target.insert_default();
        }
    }
    return Status::OK();
}
366
367
Status JniConnector::_fill_string_column(TableMetaAddress& address, MutableColumnPtr& doris_column,
                                         size_t num_rows) {
    auto& target = static_cast<ColumnString&>(*doris_column);
    ColumnString::Chars& chars_dst = target.get_chars();
    ColumnString::Offsets& offsets_dst = target.get_offsets();
    // Both meta slots (int end-offsets, then the char buffer) are consumed before the
    // empty-batch check. Each next_meta_as_ptr() call advances the meta cursor, and skipping
    // them would misalign every following column.
    const int* src_offsets = reinterpret_cast<int*>(address.next_meta_as_ptr());
    const char* src_chars = reinterpret_cast<char*>(address.next_meta_as_ptr());
    if (num_rows == 0) {
        // src_offsets[num_rows - 1] below would be out of bounds.
        return Status::OK();
    }

    // Per-row end offsets: the last one is the total byte count of this batch.
    const auto total_bytes = src_offsets[num_rows - 1];
    const size_t chars_before = chars_dst.size();
    chars_dst.resize(chars_before + total_bytes);
    memcpy(chars_dst.data() + chars_before, src_chars, total_bytes);

    const size_t offsets_before = offsets_dst.size();
    // NOTE(review): reads offsets_dst[-1] when the column is empty. This relies on the offsets
    // container tolerating that index, as the original code does.
    const size_t base = offsets_dst[offsets_before - 1];
    offsets_dst.resize(offsets_before + num_rows);
    for (size_t row = 0; row < num_rows; ++row) {
        offsets_dst[offsets_before + row] = static_cast<unsigned int>(src_offsets[row] + base);
    }
    return Status::OK();
}
395
396
Status JniConnector::_fill_array_column(TableMetaAddress& address, MutableColumnPtr& doris_column,
                                        const DataTypePtr& data_type, size_t num_rows) {
    auto& array_column = static_cast<ColumnArray&>(*doris_column);
    ColumnPtr& element_column = array_column.get_data_ptr();
    const DataTypePtr& element_type =
            assert_cast<const DataTypeArray*>(remove_nullable(data_type).get())->get_nested_type();
    ColumnArray::Offsets64& dst_offsets = array_column.get_offsets();

    // One meta slot: an int64 end-offset per row, relative to this batch.
    const int64_t* src_offsets = reinterpret_cast<int64_t*>(address.next_meta_as_ptr());
    const size_t rows_before = dst_offsets.size();
    dst_offsets.resize(rows_before + num_rows);
    const size_t base = dst_offsets[rows_before - 1];
    for (size_t row = 0; row < num_rows; ++row) {
        dst_offsets[rows_before + row] = src_offsets[row] + base;
    }

    // The element count of this batch is the last end-offset minus the base. It is computed
    // from dst_offsets rather than src_offsets[num_rows - 1] because num_rows may be 0, in
    // which case the expression evaluates to 0.
    const size_t element_count = dst_offsets[rows_before + num_rows - 1] - base;
    return _fill_column(address, element_column, element_type, element_count);
}
417
418
Status JniConnector::_fill_map_column(TableMetaAddress& address, MutableColumnPtr& doris_column,
                                      const DataTypePtr& data_type, size_t num_rows) {
    auto& map_column = static_cast<ColumnMap&>(*doris_column);
    const auto* map_type = reinterpret_cast<const DataTypeMap*>(remove_nullable(data_type).get());
    const DataTypePtr& key_type = map_type->get_key_type();
    const DataTypePtr& value_type = map_type->get_value_type();
    ColumnPtr& keys = map_column.get_keys_ptr();
    ColumnPtr& values = map_column.get_values_ptr();
    ColumnArray::Offsets64& dst_offsets = map_column.get_offsets();

    // One meta slot: an int64 end-offset per row, relative to this batch.
    const int64_t* src_offsets = reinterpret_cast<int64_t*>(address.next_meta_as_ptr());
    const size_t rows_before = dst_offsets.size();
    dst_offsets.resize(rows_before + num_rows);
    const size_t base = dst_offsets[rows_before - 1];
    for (size_t row = 0; row < num_rows; ++row) {
        dst_offsets[rows_before + row] = src_offsets[row] + base;
    }

    // Keys come before values in the meta, and each holds the same number of entries.
    const size_t entry_count = dst_offsets[rows_before + num_rows - 1] - base;
    RETURN_IF_ERROR(_fill_column(address, keys, key_type, entry_count));
    return _fill_column(address, values, value_type, entry_count);
}
444
445
Status JniConnector::_fill_struct_column(TableMetaAddress& address, MutableColumnPtr& doris_column,
                                         const DataTypePtr& data_type, size_t num_rows) {
    auto& struct_column = static_cast<ColumnStruct&>(*doris_column);
    const auto* struct_type =
            reinterpret_cast<const DataTypeStruct*>(remove_nullable(data_type).get());
    // Fields are read from the meta one after another, each with num_rows values.
    for (size_t field = 0; field < struct_column.tuple_size(); ++field) {
        RETURN_IF_ERROR(_fill_column(address, struct_column.get_column_ptr(field),
                                     struct_type->get_element(field), num_rows));
    }
    return Status::OK();
}
457
458
0
std::string JniConnector::get_jni_type(const DataTypePtr& data_type) {
459
0
    DataTypePtr type = remove_nullable(data_type);
460
0
    std::ostringstream buffer;
461
0
    switch (type->get_primitive_type()) {
462
0
    case TYPE_BOOLEAN:
463
0
        return "boolean";
464
0
    case TYPE_TINYINT:
465
0
        return "tinyint";
466
0
    case TYPE_SMALLINT:
467
0
        return "smallint";
468
0
    case TYPE_INT:
469
0
        return "int";
470
0
    case TYPE_BIGINT:
471
0
        return "bigint";
472
0
    case TYPE_LARGEINT:
473
0
        return "largeint";
474
0
    case TYPE_FLOAT:
475
0
        return "float";
476
0
    case TYPE_DOUBLE:
477
0
        return "double";
478
0
    case TYPE_IPV4:
479
0
        return "ipv4";
480
0
    case TYPE_IPV6:
481
0
        return "ipv6";
482
0
    case TYPE_VARCHAR:
483
0
        [[fallthrough]];
484
0
    case TYPE_CHAR:
485
0
        [[fallthrough]];
486
0
    case TYPE_STRING:
487
0
        return "string";
488
0
    case TYPE_DATE:
489
0
        return "datev1";
490
0
    case TYPE_DATEV2:
491
0
        return "datev2";
492
0
    case TYPE_DATETIME:
493
0
        return "datetimev1";
494
0
    case TYPE_DATETIMEV2:
495
0
        [[fallthrough]];
496
0
    case TYPE_TIMEV2: {
497
0
        buffer << "datetimev2(" << type->get_scale() << ")";
498
0
        return buffer.str();
499
0
    }
500
0
    case TYPE_TIMESTAMPTZ: {
501
0
        buffer << "timestamptz(" << type->get_scale() << ")";
502
0
        return buffer.str();
503
0
    }
504
0
    case TYPE_BINARY:
505
0
        return "binary";
506
0
    case TYPE_DECIMALV2: {
507
0
        buffer << "decimalv2(" << DecimalV2Value::PRECISION << "," << DecimalV2Value::SCALE << ")";
508
0
        return buffer.str();
509
0
    }
510
0
    case TYPE_DECIMAL32: {
511
0
        buffer << "decimal32(" << type->get_precision() << "," << type->get_scale() << ")";
512
0
        return buffer.str();
513
0
    }
514
0
    case TYPE_DECIMAL64: {
515
0
        buffer << "decimal64(" << type->get_precision() << "," << type->get_scale() << ")";
516
0
        return buffer.str();
517
0
    }
518
0
    case TYPE_DECIMAL128I: {
519
0
        buffer << "decimal128(" << type->get_precision() << "," << type->get_scale() << ")";
520
0
        return buffer.str();
521
0
    }
522
0
    case TYPE_STRUCT: {
523
0
        const DataTypeStruct* struct_type = reinterpret_cast<const DataTypeStruct*>(type.get());
524
0
        buffer << "struct<";
525
0
        for (int i = 0; i < struct_type->get_elements().size(); ++i) {
526
0
            if (i != 0) {
527
0
                buffer << ",";
528
0
            }
529
0
            buffer << struct_type->get_element_names()[i] << ":"
530
0
                   << get_jni_type(struct_type->get_element(i));
531
0
        }
532
0
        buffer << ">";
533
0
        return buffer.str();
534
0
    }
535
0
    case TYPE_ARRAY: {
536
0
        const DataTypeArray* array_type = reinterpret_cast<const DataTypeArray*>(type.get());
537
0
        buffer << "array<" << get_jni_type(array_type->get_nested_type()) << ">";
538
0
        return buffer.str();
539
0
    }
540
0
    case TYPE_MAP: {
541
0
        const DataTypeMap* map_type = reinterpret_cast<const DataTypeMap*>(type.get());
542
0
        buffer << "map<" << get_jni_type(map_type->get_key_type()) << ","
543
0
               << get_jni_type(map_type->get_value_type()) << ">";
544
0
        return buffer.str();
545
0
    }
546
0
    case TYPE_VARBINARY:
547
0
        return "varbinary";
548
0
    default:
549
0
        return "unsupported";
550
0
    }
551
0
}
552
553
0
std::string JniConnector::get_jni_type_with_different_string(const DataTypePtr& data_type) {
554
0
    DataTypePtr type = remove_nullable(data_type);
555
0
    std::ostringstream buffer;
556
0
    switch (data_type->get_primitive_type()) {
557
0
    case TYPE_BOOLEAN:
558
0
        return "boolean";
559
0
    case TYPE_TINYINT:
560
0
        return "tinyint";
561
0
    case TYPE_SMALLINT:
562
0
        return "smallint";
563
0
    case TYPE_INT:
564
0
        return "int";
565
0
    case TYPE_BIGINT:
566
0
        return "bigint";
567
0
    case TYPE_LARGEINT:
568
0
        return "largeint";
569
0
    case TYPE_FLOAT:
570
0
        return "float";
571
0
    case TYPE_DOUBLE:
572
0
        return "double";
573
0
    case TYPE_IPV4:
574
0
        return "ipv4";
575
0
    case TYPE_IPV6:
576
0
        return "ipv6";
577
0
    case TYPE_VARCHAR: {
578
0
        buffer << "varchar("
579
0
               << assert_cast<const DataTypeString*>(remove_nullable(data_type).get())->len()
580
0
               << ")";
581
0
        return buffer.str();
582
0
    }
583
0
    case TYPE_DATE:
584
0
        return "datev1";
585
0
    case TYPE_DATEV2:
586
0
        return "datev2";
587
0
    case TYPE_DATETIME:
588
0
        return "datetimev1";
589
0
    case TYPE_DATETIMEV2:
590
0
        [[fallthrough]];
591
0
    case TYPE_TIMEV2: {
592
0
        buffer << "datetimev2(" << data_type->get_scale() << ")";
593
0
        return buffer.str();
594
0
    }
595
0
    case TYPE_TIMESTAMPTZ: {
596
0
        buffer << "timestamptz(" << data_type->get_scale() << ")";
597
0
        return buffer.str();
598
0
    }
599
0
    case TYPE_BINARY:
600
0
        return "binary";
601
0
    case TYPE_CHAR: {
602
0
        buffer << "char("
603
0
               << assert_cast<const DataTypeString*>(remove_nullable(data_type).get())->len()
604
0
               << ")";
605
0
        return buffer.str();
606
0
    }
607
0
    case TYPE_STRING:
608
0
        return "string";
609
0
    case TYPE_VARBINARY:
610
0
        buffer << "varbinary("
611
0
               << assert_cast<const DataTypeVarbinary*>(remove_nullable(data_type).get())->len()
612
0
               << ")";
613
0
        return buffer.str();
614
0
    case TYPE_DECIMALV2: {
615
0
        buffer << "decimalv2(" << DecimalV2Value::PRECISION << "," << DecimalV2Value::SCALE << ")";
616
0
        return buffer.str();
617
0
    }
618
0
    case TYPE_DECIMAL32: {
619
0
        buffer << "decimal32(" << data_type->get_precision() << "," << data_type->get_scale()
620
0
               << ")";
621
0
        return buffer.str();
622
0
    }
623
0
    case TYPE_DECIMAL64: {
624
0
        buffer << "decimal64(" << data_type->get_precision() << "," << data_type->get_scale()
625
0
               << ")";
626
0
        return buffer.str();
627
0
    }
628
0
    case TYPE_DECIMAL128I: {
629
0
        buffer << "decimal128(" << data_type->get_precision() << "," << data_type->get_scale()
630
0
               << ")";
631
0
        return buffer.str();
632
0
    }
633
0
    case TYPE_STRUCT: {
634
0
        const auto* type_struct =
635
0
                assert_cast<const DataTypeStruct*>(remove_nullable(data_type).get());
636
0
        buffer << "struct<";
637
0
        for (int i = 0; i < type_struct->get_elements().size(); ++i) {
638
0
            if (i != 0) {
639
0
                buffer << ",";
640
0
            }
641
0
            buffer << type_struct->get_element_name(i) << ":"
642
0
                   << get_jni_type_with_different_string(type_struct->get_element(i));
643
0
        }
644
0
        buffer << ">";
645
0
        return buffer.str();
646
0
    }
647
0
    case TYPE_ARRAY: {
648
0
        const auto* type_arr = assert_cast<const DataTypeArray*>(remove_nullable(data_type).get());
649
0
        buffer << "array<" << get_jni_type_with_different_string(type_arr->get_nested_type())
650
0
               << ">";
651
0
        return buffer.str();
652
0
    }
653
0
    case TYPE_MAP: {
654
0
        const auto* type_map = assert_cast<const DataTypeMap*>(remove_nullable(data_type).get());
655
0
        buffer << "map<" << get_jni_type_with_different_string(type_map->get_key_type()) << ","
656
0
               << get_jni_type_with_different_string(type_map->get_value_type()) << ">";
657
0
        return buffer.str();
658
0
    }
659
0
    default:
660
0
        return "unsupported";
661
0
    }
662
0
}
663
664
// Appends the address metadata describing `doris_column` (of logical type `data_type`) to
// `meta_data`, recursing into nested columns for ARRAY / STRUCT / MAP.
//
// Layout emitted for every column (and every nested child):
//   [const flag (1/0)] [null map address, or 0 if not nullable] [type-specific addresses...]
// Type-specific part:
//   fixed-length types : data address
//   STRING/CHAR/VARCHAR: offsets address, chars address
//   ARRAY              : offsets address, then the element column's metadata
//   STRUCT             : each field column's metadata, in field order
//   MAP                : offsets address, then key column metadata, then value column metadata
//   VARBINARY          : data address
// Returns InternalError for any other primitive type.
Status JniConnector::_fill_column_meta(const ColumnPtr& doris_column, const DataTypePtr& data_type,
                                       std::vector<long>& meta_data) {
    auto logical_type = data_type->get_primitive_type();

    // Const flag. A const column is exported through its underlying data column.
    const IColumn* column = nullptr;
    if (is_column_const(*doris_column)) {
        meta_data.emplace_back(1L);
        const auto& const_column = assert_cast<const ColumnConst&>(*doris_column);
        column = &(const_column.get_data_column());
    } else {
        meta_data.emplace_back(0L);
        column = &(*doris_column);
    }

    // Null map address (0 when the column carries no null map).
    const IColumn* data_column = nullptr;
    if (column->is_nullable()) {
        const auto& nullable_column = assert_cast<const ColumnNullable&>(*column);
        data_column = &(nullable_column.get_nested_column());
        meta_data.emplace_back(reinterpret_cast<long>(nullable_column.get_null_map_data().data()));
    } else {
        meta_data.emplace_back(0L);
        data_column = column;
    }

    switch (logical_type) {
#define DISPATCH(TYPE_INDEX, COLUMN_TYPE, CPP_TYPE)                                          \
    case TYPE_INDEX: {                                                                       \
        meta_data.emplace_back(_get_fixed_length_column_address<COLUMN_TYPE>(*data_column)); \
        break;                                                                               \
    }
        FOR_FIXED_LENGTH_TYPES(DISPATCH)
#undef DISPATCH
    case PrimitiveType::TYPE_STRING:
    case PrimitiveType::TYPE_CHAR:
    case PrimitiveType::TYPE_VARCHAR: {
        const auto& string_column = assert_cast<const ColumnString&>(*data_column);
        // offsets first, then the chars buffer
        meta_data.emplace_back(reinterpret_cast<long>(string_column.get_offsets().data()));
        meta_data.emplace_back(reinterpret_cast<long>(string_column.get_chars().data()));
        break;
    }
    case PrimitiveType::TYPE_ARRAY: {
        const auto& array_column = assert_cast<const ColumnArray&>(*data_column);
        // Keep the stripped type alive in a named local so the raw pointer below stays valid.
        const DataTypePtr array_type_holder = remove_nullable(data_type);
        const auto* array_type = assert_cast<const DataTypeArray*>(array_type_holder.get());
        meta_data.emplace_back(reinterpret_cast<long>(array_column.get_offsets().data()));
        RETURN_IF_ERROR(_fill_column_meta(array_column.get_data_ptr(),
                                          array_type->get_nested_type(), meta_data));
        break;
    }
    case PrimitiveType::TYPE_STRUCT: {
        const auto& struct_column = assert_cast<const ColumnStruct&>(*data_column);
        const DataTypePtr struct_type_holder = remove_nullable(data_type);
        const auto* struct_type = assert_cast<const DataTypeStruct*>(struct_type_holder.get());
        // size_t index: tuple_size() is unsigned.
        for (size_t i = 0; i < struct_column.tuple_size(); ++i) {
            RETURN_IF_ERROR(_fill_column_meta(struct_column.get_column_ptr(i),
                                              struct_type->get_element(i), meta_data));
        }
        break;
    }
    case PrimitiveType::TYPE_MAP: {
        const auto& map_column = assert_cast<const ColumnMap&>(*data_column);
        // Strip nullability once and reuse it for both key and value types.
        const DataTypePtr map_type_holder = remove_nullable(data_type);
        const auto* map_type = assert_cast<const DataTypeMap*>(map_type_holder.get());
        meta_data.emplace_back(reinterpret_cast<long>(map_column.get_offsets().data()));
        RETURN_IF_ERROR(
                _fill_column_meta(map_column.get_keys_ptr(), map_type->get_key_type(), meta_data));
        RETURN_IF_ERROR(_fill_column_meta(map_column.get_values_ptr(), map_type->get_value_type(),
                                          meta_data));
        break;
    }
    case PrimitiveType::TYPE_VARBINARY: {
        const auto& varbinary_column = assert_cast<const ColumnVarbinary&>(*data_column);
        meta_data.emplace_back(reinterpret_cast<long>(varbinary_column.get_data().data()));
        break;
    }
    default:
        return Status::InternalError("Unsupported type: {}", data_type->get_name());
    }
    return Status::OK();
}
755
756
0
// Convenience overload: exports every column of `block` (positions 0..columns()-1) and all of
// its rows into the address table `meta`.
Status JniConnector::to_java_table(Block* block, std::unique_ptr<long[]>& meta) {
    const size_t column_count = block->columns();
    ColumnNumbers all_columns;
    all_columns.reserve(column_count);
    for (size_t pos = 0; pos < column_count; ++pos) {
        all_columns.emplace_back(pos);
    }
    return to_java_table(block, block->rows(), all_columns, meta);
}
763
764
// Builds the flat address table handed to the Java side for the selected columns of `block`.
// Slot 0 is `num_rows`; each column listed in `arguments` then contributes the metadata written
// by `_fill_column_meta`. On success `meta` owns a freshly allocated copy of the table.
Status JniConnector::to_java_table(Block* block, size_t num_rows, const ColumnNumbers& arguments,
                                   std::unique_ptr<long[]>& meta) {
    std::vector<long> meta_data;
    // insert number of rows
    meta_data.emplace_back(static_cast<long>(num_rows));
    for (size_t i : arguments) {
        const auto& column_with_type_and_name = block->get_by_position(i);
        RETURN_IF_ERROR(_fill_column_meta(column_with_type_and_name.column,
                                          column_with_type_and_name.type, meta_data));
    }

    // Size the copy by the element type rather than a hard-coded 8 bytes, so it stays correct
    // on platforms where `long` is not 64-bit.
    meta = std::make_unique<long[]>(meta_data.size());
    memcpy(meta.get(), meta_data.data(), meta_data.size() * sizeof(long));
    return Status::OK();
}
779
780
std::pair<std::string, std::string> JniConnector::parse_table_schema(Block* block,
781
                                                                     const ColumnNumbers& arguments,
782
0
                                                                     bool ignore_column_name) {
783
    // prepare table schema
784
0
    std::ostringstream required_fields;
785
0
    std::ostringstream columns_types;
786
0
    for (int i = 0; i < arguments.size(); ++i) {
787
        // column name maybe empty or has special characters
788
        // std::string field = block->get_by_position(i).name;
789
0
        std::string type = JniConnector::get_jni_type(block->get_by_position(arguments[i]).type);
790
0
        if (i == 0) {
791
0
            if (ignore_column_name) {
792
0
                required_fields << "_col_" << arguments[i];
793
0
            } else {
794
0
                required_fields << block->get_by_position(arguments[i]).name;
795
0
            }
796
0
            columns_types << type;
797
0
        } else {
798
0
            if (ignore_column_name) {
799
0
                required_fields << ","
800
0
                                << "_col_" << arguments[i];
801
0
            } else {
802
0
                required_fields << "," << block->get_by_position(arguments[i]).name;
803
0
            }
804
0
            columns_types << "#" << type;
805
0
        }
806
0
    }
807
0
    return std::make_pair(required_fields.str(), columns_types.str());
808
0
}
809
810
0
// Convenience overload: describes every column of `block`, using positional "_col_<i>" names
// instead of the block's own column names.
std::pair<std::string, std::string> JniConnector::parse_table_schema(Block* block) {
    const size_t column_count = block->columns();
    ColumnNumbers all_columns;
    all_columns.reserve(column_count);
    for (size_t pos = 0; pos < column_count; ++pos) {
        all_columns.emplace_back(pos);
    }
    constexpr bool use_positional_names = true;
    return parse_table_schema(block, all_columns, use_positional_names);
}
817
818
0
void JniConnector::_collect_profile_before_close() {
819
0
    if (_scanner_opened && _profile != nullptr) {
820
0
        JNIEnv* env = nullptr;
821
0
        Status st = Jni::Env::Get(&env);
822
0
        if (!st) {
823
0
            LOG(WARNING) << "failed to get jni env when collect profile: " << st;
824
0
            return;
825
0
        }
826
        // update scanner metrics
827
0
        std::map<std::string, std::string> statistics_result;
828
0
        st = get_statistics(env, &statistics_result);
829
0
        if (!st) {
830
0
            LOG(WARNING) << "failed to get_statistics when collect profile: " << st;
831
0
            return;
832
0
        }
833
834
0
        for (const auto& metric : statistics_result) {
835
0
            std::vector<std::string> type_and_name = split(metric.first, ":");
836
0
            if (type_and_name.size() != 2) {
837
0
                LOG(WARNING) << "Name of JNI Scanner metric should be pattern like "
838
0
                             << "'metricType:metricName'";
839
0
                continue;
840
0
            }
841
0
            long metric_value = std::stol(metric.second);
842
0
            RuntimeProfile::Counter* scanner_counter;
843
0
            if (type_and_name[0] == "timer") {
844
0
                scanner_counter =
845
0
                        ADD_CHILD_TIMER(_profile, type_and_name[1], _connector_name.c_str());
846
0
            } else if (type_and_name[0] == "counter") {
847
0
                scanner_counter = ADD_CHILD_COUNTER(_profile, type_and_name[1], TUnit::UNIT,
848
0
                                                    _connector_name.c_str());
849
0
            } else if (type_and_name[0] == "bytes") {
850
0
                scanner_counter = ADD_CHILD_COUNTER(_profile, type_and_name[1], TUnit::BYTES,
851
0
                                                    _connector_name.c_str());
852
0
            } else {
853
0
                LOG(WARNING) << "Type of JNI Scanner metric should be timer, counter or bytes";
854
0
                continue;
855
0
            }
856
0
            COUNTER_UPDATE(scanner_counter, metric_value);
857
0
        }
858
0
    }
859
0
}
860
#include "common/compile_check_end.h"
861
} // namespace doris