be/src/format/table/iceberg_reader.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <cstddef> |
21 | | #include <cstdint> |
22 | | #include <string> |
23 | | #include <unordered_map> |
24 | | #include <unordered_set> |
25 | | #include <utility> |
26 | | #include <vector> |
27 | | |
28 | | #include "common/status.h" |
29 | | #include "core/column/column_dictionary.h" |
30 | | #include "core/data_type/define_primitive_type.h" |
31 | | #include "core/data_type/primitive_type.h" |
32 | | #include "exprs/vslot_ref.h" |
33 | | #include "format/orc/vorc_reader.h" |
34 | | #include "format/parquet/vparquet_reader.h" |
35 | | #include "format/table/equality_delete.h" |
36 | | #include "format/table/table_format_reader.h" |
37 | | #include "storage/olap_scan_common.h" |
38 | | |
39 | | namespace tparquet { |
40 | | class KeyValue; |
41 | | } // namespace tparquet |
42 | | |
43 | | namespace doris { |
44 | | #include "common/compile_check_begin.h" |
45 | | class RowDescriptor; |
46 | | class RuntimeState; |
47 | | class SlotDescriptor; |
48 | | class TFileRangeDesc; |
49 | | class TFileScanRangeParams; |
50 | | class TIcebergDeleteFileDesc; |
51 | | class TupleDescriptor; |
52 | | |
53 | | namespace io { |
54 | | struct IOContext; |
55 | | } // namespace io |
56 | | template <typename T> |
57 | | class ColumnStr; |
58 | | using ColumnString = ColumnStr<UInt32>; |
59 | | class Block; |
60 | | class GenericReader; |
61 | | class ShardedKVCache; |
62 | | class VExprContext; |
63 | | |
64 | | class IcebergTableReader : public TableFormatReader, public TableSchemaChangeHelper { |
65 | | public: |
66 | | struct PositionDeleteRange { |
67 | | std::vector<std::string> data_file_path; |
68 | | std::vector<std::pair<int, int>> range; |
69 | | }; |
70 | | |
71 | | IcebergTableReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile, |
72 | | RuntimeState* state, const TFileScanRangeParams& params, |
73 | | const TFileRangeDesc& range, ShardedKVCache* kv_cache, io::IOContext* io_ctx, |
74 | | FileMetaCache* meta_cache); |
75 | 16 | ~IcebergTableReader() override = default; |
76 | | |
77 | | Status init_row_filters() final; |
78 | | |
79 | | Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) final; |
80 | | |
81 | | enum { DATA, POSITION_DELETE, EQUALITY_DELETE, DELETION_VECTOR }; |
82 | | enum Fileformat { NONE, PARQUET, ORC, AVRO }; |
83 | | |
84 | | virtual void set_delete_rows() = 0; |
85 | | |
86 | 2 | bool has_delete_operations() const override { |
87 | 2 | return _equality_delete_impls.size() > 0 || TableFormatReader::has_delete_operations(); |
88 | 2 | } |
89 | | |
90 | | Status read_deletion_vector(const std::string& data_file_path, |
91 | | const TIcebergDeleteFileDesc& delete_file_desc); |
92 | | |
93 | | protected: |
94 | | struct IcebergProfile { |
95 | | RuntimeProfile::Counter* num_delete_files; |
96 | | RuntimeProfile::Counter* num_delete_rows; |
97 | | RuntimeProfile::Counter* delete_files_read_time; |
98 | | RuntimeProfile::Counter* delete_rows_sort_time; |
99 | | RuntimeProfile::Counter* parse_delete_file_time; |
100 | | }; |
101 | | using DeleteRows = std::vector<int64_t>; |
102 | | using DeleteFile = phmap::parallel_flat_hash_map< |
103 | | std::string, std::unique_ptr<DeleteRows>, std::hash<std::string>, std::equal_to<>, |
104 | | std::allocator<std::pair<const std::string, std::unique_ptr<DeleteRows>>>, 8, |
105 | | std::mutex>; |
106 | | /** |
107 | | * https://iceberg.apache.org/spec/#position-delete-files |
108 | | * The rows in the delete file must be sorted by file_path then position to optimize filtering rows while scanning. |
109 | | * Sorting by file_path allows filter pushdown by file in columnar storage formats. |
110 | | * Sorting by position allows filtering rows while scanning, to avoid keeping deletes in memory. |
111 | | */ |
112 | | static void _sort_delete_rows(const std::vector<std::vector<int64_t>*>& delete_rows_array, |
113 | | int64_t num_delete_rows, std::vector<int64_t>& result); |
114 | | |
115 | | PositionDeleteRange _get_range(const ColumnDictI32& file_path_column); |
116 | | |
117 | | PositionDeleteRange _get_range(const ColumnString& file_path_column); |
118 | | |
119 | 0 | static std::string _delet_file_cache_key(const std::string& path) { return "delete_" + path; } |
120 | | |
121 | | Status _position_delete_base(const std::string data_file_path, |
122 | | const std::vector<TIcebergDeleteFileDesc>& delete_files); |
123 | | virtual Status _process_equality_delete( |
124 | | const std::vector<TIcebergDeleteFileDesc>& delete_files) = 0; |
125 | | void _generate_equality_delete_block(Block* block, |
126 | | const std::vector<std::string>& equality_delete_col_names, |
127 | | const std::vector<DataTypePtr>& equality_delete_col_types); |
128 | | // Equality delete should read the primary columns. Add the missing columns |
129 | | Status _expand_block_if_need(Block* block); |
130 | | // Remove the added delete columns |
131 | | Status _shrink_block_if_need(Block* block); |
132 | | |
133 | | // owned by scan node |
134 | | ShardedKVCache* _kv_cache; |
135 | | IcebergProfile _iceberg_profile; |
136 | | // _iceberg_delete_rows from kv_cache |
137 | | const std::vector<int64_t>* _iceberg_delete_rows = nullptr; |
138 | | |
139 | | // Pointer to external column name to block index mapping (from FileScanner) |
140 | | // Used to dynamically add expand columns for equality delete |
141 | | std::unordered_map<std::string, uint32_t>* _col_name_to_block_idx = nullptr; |
142 | | |
143 | | Fileformat _file_format = Fileformat::NONE; |
144 | | |
145 | | const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2; |
146 | | const std::string ICEBERG_FILE_PATH = "file_path"; |
147 | | const std::string ICEBERG_ROW_POS = "pos"; |
148 | | const std::vector<std::string> delete_file_col_names {ICEBERG_FILE_PATH, ICEBERG_ROW_POS}; |
149 | | const std::unordered_map<std::string, uint32_t> DELETE_COL_NAME_TO_BLOCK_IDX = { |
150 | | {ICEBERG_FILE_PATH, 0}, {ICEBERG_ROW_POS, 1}}; |
151 | | const int ICEBERG_FILE_PATH_INDEX = 0; |
152 | | const int ICEBERG_FILE_POS_INDEX = 1; |
153 | | const int READ_DELETE_FILE_BATCH_SIZE = 102400; |
154 | | |
155 | | //Read position_delete_file TFileRangeDesc, generate DeleteFile |
156 | | virtual Status _read_position_delete_file(const TFileRangeDesc*, DeleteFile*) = 0; |
157 | | |
158 | | void _gen_position_delete_file_range(Block& block, DeleteFile* const position_delete, |
159 | | size_t read_rows, bool file_path_column_dictionary_coded); |
160 | | |
161 | | // read table colummn + extra equality delete columns |
162 | | std::vector<std::string> _all_required_col_names; |
163 | | |
164 | | // extra equality delete name and type |
165 | | std::vector<std::string> _expand_col_names; |
166 | | std::vector<ColumnWithTypeAndName> _expand_columns; |
167 | | |
168 | | // all ids that need read for eq delete (from all qe delte file.) |
169 | | std::set<int> _equality_delete_col_ids; |
170 | | // eq delete column ids -> location of _equality_delete_blocks / _equality_delete_impls |
171 | | std::map<std::vector<int>, int> _equality_delete_block_map; |
172 | | // EqualityDeleteBase stores raw pointers to these blocks, so do not modify this vector after |
173 | | // creating entries in _equality_delete_impls. |
174 | | std::vector<Block> _equality_delete_blocks; |
175 | | std::vector<std::unique_ptr<EqualityDeleteBase>> _equality_delete_impls; |
176 | | |
177 | | // id -> block column name. |
178 | | std::unordered_map<int, std::string> _id_to_block_column_name; |
179 | | }; |
180 | | |
181 | | class IcebergParquetReader final : public IcebergTableReader { |
182 | | public: |
183 | | ENABLE_FACTORY_CREATOR(IcebergParquetReader); |
184 | | |
185 | | IcebergParquetReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile, |
186 | | RuntimeState* state, const TFileScanRangeParams& params, |
187 | | const TFileRangeDesc& range, ShardedKVCache* kv_cache, |
188 | | io::IOContext* io_ctx, FileMetaCache* meta_cache) |
189 | 7 | : IcebergTableReader(std::move(file_format_reader), profile, state, params, range, |
190 | 7 | kv_cache, io_ctx, meta_cache) {} |
191 | | Status init_reader( |
192 | | const std::vector<std::string>& file_col_names, |
193 | | std::unordered_map<std::string, uint32_t>* col_name_to_block_idx, |
194 | | const VExprContextSPtrs& conjuncts, |
195 | | phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>>& |
196 | | slot_id_to_predicates, |
197 | | const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, |
198 | | const std::unordered_map<std::string, int>* colname_to_slot_id, |
199 | | const VExprContextSPtrs* not_single_slot_filter_conjuncts, |
200 | | const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts); |
201 | | |
202 | 0 | void set_delete_rows() final { |
203 | 0 | auto* parquet_reader = (ParquetReader*)(_file_format_reader.get()); |
204 | 0 | parquet_reader->set_delete_rows(_iceberg_delete_rows); |
205 | 0 | } |
206 | | |
207 | | private: |
208 | | static ColumnIdResult _create_column_ids(const FieldDescriptor* field_desc, |
209 | | const TupleDescriptor* tuple_descriptor); |
210 | | |
211 | | Status _read_position_delete_file(const TFileRangeDesc* delete_range, |
212 | | DeleteFile* position_delete) final; |
213 | | Status _process_equality_delete(const std::vector<TIcebergDeleteFileDesc>& delete_files) final; |
214 | | |
215 | | const FieldDescriptor* _data_file_field_desc = nullptr; |
216 | | }; |
217 | | class IcebergOrcReader final : public IcebergTableReader { |
218 | | public: |
219 | | ENABLE_FACTORY_CREATOR(IcebergOrcReader); |
220 | | |
221 | | Status _read_position_delete_file(const TFileRangeDesc* delete_range, |
222 | | DeleteFile* position_delete) final; |
223 | | |
224 | | IcebergOrcReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile, |
225 | | RuntimeState* state, const TFileScanRangeParams& params, |
226 | | const TFileRangeDesc& range, ShardedKVCache* kv_cache, io::IOContext* io_ctx, |
227 | | FileMetaCache* meta_cache) |
228 | 7 | : IcebergTableReader(std::move(file_format_reader), profile, state, params, range, |
229 | 7 | kv_cache, io_ctx, meta_cache) {} |
230 | | |
231 | 0 | void set_delete_rows() final { |
232 | 0 | auto* orc_reader = (OrcReader*)_file_format_reader.get(); |
233 | 0 | orc_reader->set_position_delete_rowids(_iceberg_delete_rows); |
234 | 0 | } |
235 | | |
236 | | Status init_reader( |
237 | | const std::vector<std::string>& file_col_names, |
238 | | std::unordered_map<std::string, uint32_t>* col_name_to_block_idx, |
239 | | const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, |
240 | | const RowDescriptor* row_descriptor, |
241 | | const std::unordered_map<std::string, int>* colname_to_slot_id, |
242 | | const VExprContextSPtrs* not_single_slot_filter_conjuncts, |
243 | | const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts); |
244 | | |
245 | | private: |
246 | | Status _process_equality_delete(const std::vector<TIcebergDeleteFileDesc>& delete_files) final; |
247 | | |
248 | | static ColumnIdResult _create_column_ids(const orc::Type* orc_type, |
249 | | const TupleDescriptor* tuple_descriptor); |
250 | | |
251 | | private: |
252 | | static const std::string ICEBERG_ORC_ATTRIBUTE; |
253 | | const orc::Type* _data_file_type_desc = nullptr; |
254 | | }; |
255 | | |
256 | | #include "common/compile_check_end.h" |
257 | | } // namespace doris |