be/src/format/table/hudi_jni_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "hudi_jni_reader.h" |
19 | | |
20 | | #include <map> |
21 | | |
22 | | #include "core/types.h" |
23 | | #include "runtime/descriptors.h" |
24 | | #include "runtime/runtime_state.h" |
25 | | |
26 | | namespace doris { |
27 | | class RuntimeProfile; |
28 | | class RuntimeState; |
29 | | class Block; |
30 | | } // namespace doris |
31 | | |
32 | | namespace doris { |
33 | | #include "common/compile_check_begin.h" |
34 | | const std::string HudiJniReader::HOODIE_CONF_PREFIX = "hoodie."; |
35 | | const std::string HudiJniReader::HADOOP_CONF_PREFIX = "hadoop_conf."; |
36 | | |
37 | | HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params, |
38 | | const THudiFileDesc& hudi_params, |
39 | | const std::vector<SlotDescriptor*>& file_slot_descs, |
40 | | RuntimeState* state, RuntimeProfile* profile) |
41 | 0 | : JniReader( |
42 | 0 | file_slot_descs, state, profile, "org/apache/doris/hudi/HadoopHudiJniScanner", |
43 | 0 | [&]() { |
44 | 0 | std::vector<std::string> required_fields; |
45 | 0 | for (const auto& desc : file_slot_descs) { |
46 | 0 | required_fields.emplace_back(desc->col_name()); |
47 | 0 | } |
48 | 0 | std::map<String, String> params = { |
49 | 0 | {"query_id", print_id(state->query_id())}, |
50 | 0 | {"base_path", hudi_params.base_path}, |
51 | 0 | {"data_file_path", hudi_params.data_file_path}, |
52 | 0 | {"data_file_length", std::to_string(hudi_params.data_file_length)}, |
53 | 0 | {"delta_file_paths", join(hudi_params.delta_logs, ",")}, |
54 | 0 | {"hudi_column_names", join(hudi_params.column_names, ",")}, |
55 | 0 | {"hudi_column_types", join(hudi_params.column_types, "#")}, |
56 | 0 | {"required_fields", join(required_fields, ",")}, |
57 | 0 | {"instant_time", hudi_params.instant_time}, |
58 | 0 | {"serde", hudi_params.serde}, |
59 | 0 | {"input_format", hudi_params.input_format}, |
60 | 0 | {"time_zone", state->timezone_obj().name()}}; |
61 | 0 | for (const auto& kv : scan_params.properties) { |
62 | 0 | if (kv.first.starts_with(HOODIE_CONF_PREFIX)) { |
63 | 0 | params[kv.first] = kv.second; |
64 | 0 | } else { |
65 | 0 | params[HADOOP_CONF_PREFIX + kv.first] = kv.second; |
66 | 0 | } |
67 | 0 | } |
68 | 0 | return params; |
69 | 0 | }(), |
70 | 0 | [&]() { |
71 | 0 | std::vector<std::string> names; |
72 | 0 | for (const auto& desc : file_slot_descs) { |
73 | 0 | names.emplace_back(desc->col_name()); |
74 | 0 | } |
75 | 0 | return names; |
76 | 0 | }()) {} |
77 | | |
78 | 0 | Status HudiJniReader::init_reader() { |
79 | 0 | return open(_state, _profile); |
80 | 0 | } |
81 | | #include "common/compile_check_end.h" |
82 | | } // namespace doris |