be/src/format/table/hudi_jni_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "format/table/hudi_jni_reader.h" |
19 | | |
20 | | #include <map> |
21 | | |
22 | | #include "core/types.h" |
23 | | #include "runtime/descriptors.h" |
24 | | #include "runtime/runtime_state.h" |
25 | | |
26 | | namespace doris { |
27 | | class RuntimeProfile; |
28 | | class RuntimeState; |
29 | | class Block; |
30 | | } // namespace doris |
31 | | |
32 | | namespace doris { |
33 | | #include "common/compile_check_begin.h" |
34 | | const std::string HudiJniReader::HOODIE_CONF_PREFIX = "hoodie."; |
35 | | const std::string HudiJniReader::HADOOP_CONF_PREFIX = "hadoop_conf."; |
36 | | |
37 | | HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params, |
38 | | const THudiFileDesc& hudi_params, |
39 | | const std::vector<SlotDescriptor*>& file_slot_descs, |
40 | | RuntimeState* state, RuntimeProfile* profile) |
41 | 0 | : JniReader(file_slot_descs, state, profile), |
42 | 0 | _scan_params(scan_params), |
43 | 0 | _hudi_params(hudi_params) { |
44 | 0 | std::vector<std::string> required_fields; |
45 | 0 | for (const auto& desc : _file_slot_descs) { |
46 | 0 | required_fields.emplace_back(desc->col_name()); |
47 | 0 | } |
48 | |
|
49 | 0 | std::map<String, String> params = { |
50 | 0 | {"query_id", print_id(_state->query_id())}, |
51 | 0 | {"base_path", _hudi_params.base_path}, |
52 | 0 | {"data_file_path", _hudi_params.data_file_path}, |
53 | 0 | {"data_file_length", std::to_string(_hudi_params.data_file_length)}, |
54 | 0 | {"delta_file_paths", join(_hudi_params.delta_logs, ",")}, |
55 | 0 | {"hudi_column_names", join(_hudi_params.column_names, ",")}, |
56 | 0 | {"hudi_column_types", join(_hudi_params.column_types, "#")}, |
57 | 0 | {"required_fields", join(required_fields, ",")}, |
58 | 0 | {"instant_time", _hudi_params.instant_time}, |
59 | 0 | {"serde", _hudi_params.serde}, |
60 | 0 | {"input_format", _hudi_params.input_format}, |
61 | 0 | {"time_zone", state->timezone_obj().name()}}; |
62 | | |
63 | | // Use compatible hadoop client to read data |
64 | 0 | for (const auto& kv : _scan_params.properties) { |
65 | 0 | if (kv.first.starts_with(HOODIE_CONF_PREFIX)) { |
66 | 0 | params[kv.first] = kv.second; |
67 | 0 | } else { |
68 | 0 | params[HADOOP_CONF_PREFIX + kv.first] = kv.second; |
69 | 0 | } |
70 | 0 | } |
71 | |
|
72 | 0 | _jni_connector = std::make_unique<JniConnector>("org/apache/doris/hudi/HadoopHudiJniScanner", |
73 | 0 | params, required_fields); |
74 | 0 | } |
75 | | |
76 | 0 | Status HudiJniReader::init_reader() { |
77 | 0 | RETURN_IF_ERROR(_jni_connector->init()); |
78 | 0 | return _jni_connector->open(_state, _profile); |
79 | 0 | } |
80 | | #include "common/compile_check_end.h" |
81 | | } // namespace doris |