be/src/format_v2/table/paimon_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "format_v2/table/paimon_reader.h" |
19 | | |
20 | | #include <cstring> |
21 | | #include <string> |
22 | | #include <utility> |
23 | | |
24 | | #include "exprs/vexpr_context.h" |
25 | | #include "format/table/deletion_vector_reader.h" |
26 | | #include "format_v2/column_mapper.h" |
27 | | #include "format_v2/jni/paimon_jni_reader.h" |
28 | | #include "format_v2/table/schema_history_util.h" |
29 | | #include "gen_cpp/PlanNodes_types.h" |
30 | | |
31 | | namespace doris::format::paimon { |
32 | | |
33 | 6 | Status PaimonReader::prepare_split(const format::SplitReadOptions& options) { |
34 | 6 | _split_schema_id = -1; |
35 | 6 | const auto& paimon_params = options.current_range.table_format_params.paimon_params; |
36 | 6 | if (paimon_params.__isset.schema_id) { |
37 | 3 | _split_schema_id = paimon_params.schema_id; |
38 | 3 | } |
39 | 6 | return format::TableReader::prepare_split(options); |
40 | 6 | } |
41 | | |
42 | 8 | format::TableColumnMappingMode PaimonReader::mapping_mode() const { |
43 | 8 | return format::can_map_by_history_schema(_scan_params, _split_schema_id) |
44 | 8 | ? format::TableColumnMappingMode::BY_FIELD_ID |
45 | 8 | : format::TableColumnMappingMode::BY_NAME; |
46 | 8 | } |
47 | | |
48 | 3 | Status PaimonReader::annotate_file_schema(std::vector<format::ColumnDefinition>* file_schema) { |
49 | 3 | DORIS_CHECK(file_schema != nullptr); |
50 | 3 | if (mapping_mode() != format::TableColumnMappingMode::BY_FIELD_ID) { |
51 | 2 | return Status::OK(); |
52 | 2 | } |
53 | 1 | return format::annotate_file_schema_from_history(_scan_params, _split_schema_id, file_schema); |
54 | 3 | } |
55 | | |
56 | | Status PaimonReader::_parse_deletion_vector_file(const TTableFormatFileDesc& t_desc, |
57 | 6 | DeleteFileDesc* desc, bool* has_delete_file) { |
58 | 6 | DORIS_CHECK(desc != nullptr); |
59 | 6 | DORIS_CHECK(has_delete_file != nullptr); |
60 | 6 | *has_delete_file = false; |
61 | 6 | const auto& table_desc = t_desc.paimon_params; |
62 | 6 | if (!table_desc.__isset.deletion_file) { |
63 | 5 | return Status::OK(); |
64 | 5 | } |
65 | 1 | const auto& deletion_file = table_desc.deletion_file; |
66 | | |
67 | 1 | const std::string key_prefix = "paimon_dv:"; |
68 | 1 | desc->key.resize(key_prefix.size() + deletion_file.path.size() + sizeof(deletion_file.offset)); |
69 | 1 | char* key_data = desc->key.data(); |
70 | 1 | memcpy(key_data, key_prefix.data(), key_prefix.size()); |
71 | 1 | key_data += key_prefix.size(); |
72 | 1 | memcpy(key_data, deletion_file.path.data(), deletion_file.path.size()); |
73 | 1 | key_data += deletion_file.path.size(); |
74 | 1 | memcpy(key_data, &deletion_file.offset, sizeof(deletion_file.offset)); |
75 | 1 | desc->path = deletion_file.path; |
76 | 1 | desc->start_offset = deletion_file.offset; |
77 | 1 | desc->size = deletion_file.length + 4; |
78 | 1 | desc->file_size = -1; |
79 | 1 | desc->format = DeleteFileDesc::Format::PAIMON; |
80 | 1 | *has_delete_file = true; |
81 | 1 | return Status::OK(); |
82 | 6 | } |
83 | | |
84 | 1 | Status PaimonHybridReader::init(format::TableReadOptions&& options) { |
85 | 1 | return format::TableReader::init(std::move(options)); |
86 | 1 | } |
87 | | |
88 | 2 | Status PaimonHybridReader::prepare_split(const format::SplitReadOptions& options) { |
89 | 2 | RETURN_IF_ERROR(_ensure_current_split_reader(options)); |
90 | 2 | DORIS_CHECK(_current_split_reader != nullptr); |
91 | 2 | return _current_split_reader->prepare_split(options); |
92 | 2 | } |
93 | | |
94 | 0 | Status PaimonHybridReader::get_block(Block* block, bool* eos) { |
95 | 0 | DORIS_CHECK(_current_split_reader != nullptr); |
96 | 0 | return _current_split_reader->get_block(block, eos); |
97 | 0 | } |
98 | | |
99 | 1 | Status PaimonHybridReader::close() { |
100 | 1 | Status close_status = Status::OK(); |
101 | 1 | if (_native_reader != nullptr) { |
102 | 1 | close_status = _native_reader->close(); |
103 | 1 | } |
104 | 1 | if (_jni_reader != nullptr) { |
105 | 1 | auto status = _jni_reader->close(); |
106 | 1 | if (!status.ok() && close_status.ok()) { |
107 | 0 | close_status = std::move(status); |
108 | 0 | } |
109 | 1 | } |
110 | 1 | _current_split_reader = nullptr; |
111 | 1 | return close_status; |
112 | 1 | } |
113 | | |
114 | 2 | Status PaimonHybridReader::_ensure_current_split_reader(const format::SplitReadOptions& options) { |
115 | 2 | if (_is_jni_split(options.current_range)) { |
116 | 1 | if (_jni_reader == nullptr) { |
117 | 1 | _jni_reader = std::make_unique<format::paimon::PaimonJniReader>(); |
118 | 1 | RETURN_IF_ERROR(_init_child_reader(_jni_reader.get(), format::FileFormat::JNI)); |
119 | 1 | } |
120 | 1 | _current_split_reader = _jni_reader.get(); |
121 | 1 | } else { |
122 | 1 | format::FileFormat file_format; |
123 | 1 | RETURN_IF_ERROR(_to_file_format(options.current_range, &file_format)); |
124 | 1 | if (_native_reader == nullptr) { |
125 | 1 | _native_reader = format::paimon::PaimonReader::create_unique(); |
126 | 1 | RETURN_IF_ERROR(_init_child_reader(_native_reader.get(), file_format)); |
127 | 1 | } |
128 | 1 | _current_split_reader = _native_reader.get(); |
129 | 1 | } |
130 | 2 | return Status::OK(); |
131 | 2 | } |
132 | | |
133 | | Status PaimonHybridReader::_init_child_reader(format::TableReader* reader, |
134 | 2 | format::FileFormat file_format) { |
135 | 2 | DORIS_CHECK(reader != nullptr); |
136 | 2 | VExprContextSPtrs conjuncts; |
137 | 2 | RETURN_IF_ERROR(_clone_conjuncts(&conjuncts)); |
138 | 2 | return reader->init({ |
139 | 2 | .projected_columns = _projected_columns, |
140 | 2 | .column_predicates = _table_column_predicates, |
141 | 2 | .conjuncts = std::move(conjuncts), |
142 | 2 | .format = file_format, |
143 | 2 | .scan_params = _scan_params, |
144 | 2 | .io_ctx = _io_ctx, |
145 | 2 | .runtime_state = _runtime_state, |
146 | 2 | .scanner_profile = _scanner_profile, |
147 | 2 | .push_down_agg_type = _push_down_agg_type, |
148 | 2 | .condition_cache_digest = _condition_cache_digest, |
149 | 2 | }); |
150 | 2 | } |
151 | | |
152 | 2 | Status PaimonHybridReader::_clone_conjuncts(VExprContextSPtrs* conjuncts) const { |
153 | 2 | DORIS_CHECK(conjuncts != nullptr); |
154 | 2 | conjuncts->clear(); |
155 | 2 | conjuncts->reserve(_conjuncts.size()); |
156 | 2 | for (const auto& conjunct : _conjuncts) { |
157 | 0 | VExprSPtr root; |
158 | 0 | RETURN_IF_ERROR(format::clone_table_expr_tree(conjunct->root(), &root)); |
159 | 0 | conjuncts->push_back(VExprContext::create_shared(std::move(root))); |
160 | 0 | } |
161 | 2 | return Status::OK(); |
162 | 2 | } |
163 | | |
164 | 5 | bool PaimonHybridReader::_is_jni_split(const TFileRangeDesc& range) { |
165 | 5 | return range.__isset.table_format_params && range.table_format_params.__isset.paimon_params && |
166 | 5 | range.table_format_params.paimon_params.__isset.reader_type && |
167 | 5 | range.table_format_params.paimon_params.reader_type == TPaimonReaderType::PAIMON_JNI; |
168 | 5 | } |
169 | | |
170 | | Status PaimonHybridReader::_to_file_format(const TFileRangeDesc& range, |
171 | 4 | format::FileFormat* file_format) { |
172 | 4 | DORIS_CHECK(file_format != nullptr); |
173 | 4 | const auto format_type = |
174 | 4 | range.__isset.format_type ? range.format_type : TFileFormatType::FORMAT_PARQUET; |
175 | 4 | switch (format_type) { |
176 | 2 | case TFileFormatType::FORMAT_PARQUET: |
177 | 2 | *file_format = format::FileFormat::PARQUET; |
178 | 2 | return Status::OK(); |
179 | 1 | case TFileFormatType::FORMAT_ORC: |
180 | 1 | *file_format = format::FileFormat::ORC; |
181 | 1 | return Status::OK(); |
182 | 1 | default: |
183 | 1 | return Status::NotSupported("Unsupported native Paimon file format {}", |
184 | 1 | to_string(format_type)); |
185 | 4 | } |
186 | 4 | } |
187 | | |
188 | | } // namespace doris::format::paimon |