be/src/format_v2/table/hudi_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "format_v2/table/hudi_reader.h" |
19 | | |
20 | | #include <utility> |
21 | | |
22 | | #include "exprs/vexpr_context.h" |
23 | | #include "format_v2/column_mapper.h" |
24 | | #include "format_v2/jni/hudi_jni_reader.h" |
25 | | #include "format_v2/table/schema_history_util.h" |
26 | | #include "gen_cpp/PlanNodes_types.h" |
27 | | |
28 | | namespace doris::format::hudi { |
29 | | |
30 | 4 | Status HudiReader::prepare_split(const format::SplitReadOptions& options) { |
31 | 4 | _split_schema_id = -1; |
32 | 4 | if (options.current_range.__isset.table_format_params && |
33 | 4 | options.current_range.table_format_params.__isset.hudi_params && |
34 | 4 | options.current_range.table_format_params.hudi_params.__isset.schema_id) { |
35 | 3 | _split_schema_id = options.current_range.table_format_params.hudi_params.schema_id; |
36 | 3 | } |
37 | 4 | return format::TableReader::prepare_split(options); |
38 | 4 | } |
39 | | |
40 | 6 | format::TableColumnMappingMode HudiReader::mapping_mode() const { |
41 | 6 | return format::can_map_by_history_schema(_scan_params, _split_schema_id) |
42 | 6 | ? format::TableColumnMappingMode::BY_FIELD_ID |
43 | 6 | : format::TableColumnMappingMode::BY_NAME; |
44 | 6 | } |
45 | | |
46 | 2 | Status HudiReader::annotate_file_schema(std::vector<format::ColumnDefinition>* file_schema) { |
47 | 2 | DORIS_CHECK(file_schema != nullptr); |
48 | 2 | if (mapping_mode() != format::TableColumnMappingMode::BY_FIELD_ID) { |
49 | 1 | return Status::OK(); |
50 | 1 | } |
51 | 1 | return format::annotate_file_schema_from_history(_scan_params, _split_schema_id, file_schema); |
52 | 2 | } |
53 | | |
54 | 0 | Status HudiHybridReader::init(format::TableReadOptions&& options) { |
55 | 0 | return format::TableReader::init(std::move(options)); |
56 | 0 | } |
57 | | |
58 | 0 | Status HudiHybridReader::prepare_split(const format::SplitReadOptions& options) { |
59 | 0 | RETURN_IF_ERROR(_ensure_current_split_reader(options)); |
60 | 0 | DORIS_CHECK(_current_split_reader != nullptr); |
61 | 0 | return _current_split_reader->prepare_split(options); |
62 | 0 | } |
63 | | |
64 | 0 | Status HudiHybridReader::get_block(Block* block, bool* eos) { |
65 | 0 | DORIS_CHECK(_current_split_reader != nullptr); |
66 | 0 | return _current_split_reader->get_block(block, eos); |
67 | 0 | } |
68 | | |
69 | 0 | Status HudiHybridReader::close() { |
70 | 0 | Status close_status = Status::OK(); |
71 | 0 | if (_native_reader != nullptr) { |
72 | 0 | close_status = _native_reader->close(); |
73 | 0 | } |
74 | 0 | if (_jni_reader != nullptr) { |
75 | 0 | auto status = _jni_reader->close(); |
76 | 0 | if (!status.ok() && close_status.ok()) { |
77 | 0 | close_status = std::move(status); |
78 | 0 | } |
79 | 0 | } |
80 | 0 | _current_split_reader = nullptr; |
81 | 0 | return close_status; |
82 | 0 | } |
83 | | |
84 | 0 | Status HudiHybridReader::_ensure_current_split_reader(const format::SplitReadOptions& options) { |
85 | 0 | DORIS_CHECK(_scan_params != nullptr); |
86 | 0 | if (_is_jni_split(*_scan_params, options.current_range)) { |
87 | 0 | if (_jni_reader == nullptr) { |
88 | 0 | _jni_reader = std::make_unique<format::hudi::HudiJniReader>(); |
89 | 0 | RETURN_IF_ERROR(_init_child_reader(_jni_reader.get(), format::FileFormat::JNI)); |
90 | 0 | } |
91 | 0 | _current_split_reader = _jni_reader.get(); |
92 | 0 | } else { |
93 | 0 | format::FileFormat file_format; |
94 | 0 | RETURN_IF_ERROR(_to_file_format(*_scan_params, options.current_range, &file_format)); |
95 | 0 | if (_native_reader == nullptr) { |
96 | 0 | _native_reader = format::hudi::HudiReader::create_unique(); |
97 | 0 | RETURN_IF_ERROR(_init_child_reader(_native_reader.get(), file_format)); |
98 | 0 | } |
99 | 0 | _current_split_reader = _native_reader.get(); |
100 | 0 | } |
101 | 0 | return Status::OK(); |
102 | 0 | } |
103 | | |
104 | | Status HudiHybridReader::_init_child_reader(format::TableReader* reader, |
105 | 0 | format::FileFormat file_format) { |
106 | 0 | DORIS_CHECK(reader != nullptr); |
107 | 0 | VExprContextSPtrs conjuncts; |
108 | 0 | RETURN_IF_ERROR(_clone_conjuncts(&conjuncts)); |
109 | 0 | return reader->init({ |
110 | 0 | .projected_columns = _projected_columns, |
111 | 0 | .column_predicates = _table_column_predicates, |
112 | 0 | .conjuncts = std::move(conjuncts), |
113 | 0 | .format = file_format, |
114 | 0 | .scan_params = _scan_params, |
115 | 0 | .io_ctx = _io_ctx, |
116 | 0 | .runtime_state = _runtime_state, |
117 | 0 | .scanner_profile = _scanner_profile, |
118 | 0 | .push_down_agg_type = _push_down_agg_type, |
119 | 0 | .condition_cache_digest = _condition_cache_digest, |
120 | 0 | }); |
121 | 0 | } |
122 | | |
123 | 0 | Status HudiHybridReader::_clone_conjuncts(VExprContextSPtrs* conjuncts) const { |
124 | 0 | DORIS_CHECK(conjuncts != nullptr); |
125 | 0 | conjuncts->clear(); |
126 | 0 | conjuncts->reserve(_conjuncts.size()); |
127 | 0 | for (const auto& conjunct : _conjuncts) { |
128 | 0 | VExprSPtr root; |
129 | 0 | RETURN_IF_ERROR(format::clone_table_expr_tree(conjunct->root(), &root)); |
130 | 0 | conjuncts->push_back(VExprContext::create_shared(std::move(root))); |
131 | 0 | } |
132 | 0 | return Status::OK(); |
133 | 0 | } |
134 | | |
135 | | TFileFormatType::type HudiHybridReader::_range_format_type(const TFileScanRangeParams& params, |
136 | 0 | const TFileRangeDesc& range) { |
137 | 0 | return range.__isset.format_type ? range.format_type : params.format_type; |
138 | 0 | } |
139 | | |
140 | | bool HudiHybridReader::_is_jni_split(const TFileScanRangeParams& params, |
141 | 0 | const TFileRangeDesc& range) { |
142 | 0 | return _range_format_type(params, range) == TFileFormatType::FORMAT_JNI; |
143 | 0 | } |
144 | | |
145 | | Status HudiHybridReader::_to_file_format(const TFileScanRangeParams& params, |
146 | | const TFileRangeDesc& range, |
147 | 0 | format::FileFormat* file_format) { |
148 | 0 | DORIS_CHECK(file_format != nullptr); |
149 | 0 | const auto format_type = _range_format_type(params, range); |
150 | 0 | switch (format_type) { |
151 | 0 | case TFileFormatType::FORMAT_PARQUET: |
152 | 0 | *file_format = format::FileFormat::PARQUET; |
153 | 0 | return Status::OK(); |
154 | 0 | case TFileFormatType::FORMAT_ORC: |
155 | 0 | *file_format = format::FileFormat::ORC; |
156 | 0 | return Status::OK(); |
157 | 0 | default: |
158 | 0 | return Status::NotSupported("Unsupported native Hudi file format {}", |
159 | 0 | to_string(format_type)); |
160 | 0 | } |
161 | 0 | } |
162 | | |
163 | | } // namespace doris::format::hudi |