Coverage Report

Created: 2026-07-02 13:50

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format_v2/table/iceberg_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format_v2/table/iceberg_reader.h"
19
20
#include <algorithm>
21
#include <cstring>
22
#include <memory>
23
#include <sstream>
24
#include <utility>
25
26
#include "common/cast_set.h"
27
#include "common/consts.h"
28
#include "core/assert_cast.h"
29
#include "core/block/block.h"
30
#include "core/column/column_const.h"
31
#include "core/column/column_nullable.h"
32
#include "core/column/column_string.h"
33
#include "core/column/column_struct.h"
34
#include "core/column/column_vector.h"
35
#include "core/data_type/data_type_number.h"
36
#include "core/data_type/define_primitive_type.h"
37
#include "core/field.h"
38
#include "exprs/vslot_ref.h"
39
#include "format/table/deletion_vector_reader.h"
40
#include "format_v2/expr/cast.h"
41
#include "format_v2/expr/equality_delete_predicate.h"
42
#include "format_v2/parquet/parquet_reader.h"
43
#include "format_v2/parquet/reader/column_reader.h"
44
#include "format_v2/table_reader.h"
45
#include "io/file_factory.h"
46
47
namespace doris::format::iceberg {
48
49
// Column name of the Iceberg row-lineage `_row_id` column.
static constexpr char ROW_LINEAGE_ROW_ID[] = "_row_id";
// Field id used to recognize `_row_id` when columns are bound by field id
// (presumably the id reserved for `_row_id` by the Iceberg spec — confirm if changed).
static constexpr int32_t ROW_LINEAGE_ROW_ID_FIELD_ID = 2'147'483'540;
51
52
template <typename T>
static std::string join_values_for_debug(const std::vector<T>& values) {
    // Formats `values` as "[v0, v1, ...]" using each element's operator<<.
    // An empty vector yields "[]".
    std::ostringstream out;
    out << '[';
    const char* separator = "";
    for (const auto& value : values) {
        out << separator << value;
        separator = ", ";
    }
    out << ']';
    return out.str();
}
65
66
1
static bool is_projected_row_lineage_row_id(const format::ColumnDefinition& column) {
    // `_row_id` is recognized either by name or by its field id. Matching by name matters because
    // customize_file_scan_request() is also driven directly by scan-request tests before any
    // column mapper has bound field ids; inspecting the projected table schema by name keeps the
    // row-position dependency in place for `_row_id` on that path.
    if (column.name == ROW_LINEAGE_ROW_ID) {
        return true;
    }
    return column.has_identifier_field_id() &&
           column.get_identifier_field_id() == ROW_LINEAGE_ROW_ID_FIELD_ID;
}
75
76
15
static bool is_projected_iceberg_rowid(const format::ColumnDefinition& column) {
    // The Iceberg row-id column is matched purely by the name BeConsts::ICEBERG_ROWID_COL.
    const bool name_matches = column.name == BeConsts::ICEBERG_ROWID_COL;
    return name_matches;
}
79
80
0
static std::string iceberg_delete_file_debug_string(const TIcebergDeleteFileDesc& delete_file) {
81
0
    std::ostringstream out;
82
0
    out << "TIcebergDeleteFileDesc{path=" << (delete_file.__isset.path ? delete_file.path : "null")
83
0
        << ", content=" << (delete_file.__isset.content ? delete_file.content : -1)
84
0
        << ", file_format="
85
0
        << (delete_file.__isset.file_format ? static_cast<int>(delete_file.file_format) : -1)
86
0
        << ", position_lower_bound="
87
0
        << (delete_file.__isset.position_lower_bound ? delete_file.position_lower_bound : -1)
88
0
        << ", position_upper_bound="
89
0
        << (delete_file.__isset.position_upper_bound ? delete_file.position_upper_bound : -1)
90
0
        << ", field_ids="
91
0
        << (delete_file.__isset.field_ids ? join_values_for_debug(delete_file.field_ids) : "[]")
92
0
        << ", content_offset="
93
0
        << (delete_file.__isset.content_offset ? delete_file.content_offset : -1)
94
0
        << ", content_size_in_bytes="
95
0
        << (delete_file.__isset.content_size_in_bytes ? delete_file.content_size_in_bytes : -1)
96
0
        << "}";
97
0
    return out.str();
98
0
}
99
100
static std::string iceberg_delete_files_debug_string(
101
0
        const std::vector<TIcebergDeleteFileDesc>& delete_files) {
102
0
    std::ostringstream out;
103
0
    out << "[";
104
0
    for (size_t idx = 0; idx < delete_files.size(); ++idx) {
105
0
        if (idx > 0) {
106
0
            out << ", ";
107
0
        }
108
0
        out << iceberg_delete_file_debug_string(delete_files[idx]);
109
0
    }
110
0
    out << "]";
111
0
    return out.str();
112
0
}
113
114
0
static std::string iceberg_params_debug_string(const std::optional<TIcebergFileDesc>& params) {
115
0
    if (!params.has_value()) {
116
0
        return "null";
117
0
    }
118
0
    const auto& iceberg_params = *params;
119
0
    std::ostringstream out;
120
0
    out << "TIcebergFileDesc{format_version="
121
0
        << (iceberg_params.__isset.format_version ? iceberg_params.format_version : -1)
122
0
        << ", content=" << (iceberg_params.__isset.content ? iceberg_params.content : -1)
123
0
        << ", original_file_path="
124
0
        << (iceberg_params.__isset.original_file_path ? iceberg_params.original_file_path : "null")
125
0
        << ", row_count=" << (iceberg_params.__isset.row_count ? iceberg_params.row_count : -1)
126
0
        << ", partition_spec_id="
127
0
        << (iceberg_params.__isset.partition_spec_id ? iceberg_params.partition_spec_id : 0)
128
0
        << ", has_partition_data_json=" << iceberg_params.__isset.partition_data_json
129
0
        << ", first_row_id="
130
0
        << (iceberg_params.__isset.first_row_id ? iceberg_params.first_row_id : -1)
131
0
        << ", last_updated_sequence_number="
132
0
        << (iceberg_params.__isset.last_updated_sequence_number
133
0
                    ? iceberg_params.last_updated_sequence_number
134
0
                    : -1)
135
0
        << ", delete_file_count="
136
0
        << (iceberg_params.__isset.delete_files ? iceberg_params.delete_files.size() : 0)
137
0
        << ", delete_files="
138
0
        << (iceberg_params.__isset.delete_files
139
0
                    ? iceberg_delete_files_debug_string(iceberg_params.delete_files)
140
0
                    : "[]")
141
0
        << ", has_serialized_split=" << iceberg_params.__isset.serialized_split << "}";
142
0
    return out.str();
143
0
}
144
145
IcebergTableReader::PositionDeleteRowsCollector::PositionDeleteRowsCollector(
146
        std::string data_file_path, format::DeleteRows* rows)
147
6
        : _data_file_path(std::move(data_file_path)), _rows(rows) {}
148
149
Status IcebergTableReader::PositionDeleteRowsCollector::collect(const Block& block,
                                                                size_t read_rows) {
    // Appends the position value of every row in `block` whose file-path value equals this
    // collector's data file path. Both columns are unwrapped with remove_nullable() before the
    // cast to their concrete column types.
    if (read_rows == 0) {
        return Status::OK();
    }
    const auto& file_path_column = assert_cast<const ColumnString&>(
            *remove_nullable(block.get_by_position(ICEBERG_FILE_PATH_BLOCK_POSITION).column));
    const auto& pos_column = assert_cast<const ColumnInt64&>(
            *remove_nullable(block.get_by_position(ICEBERG_ROW_POS_BLOCK_POSITION).column));

    const char* const target_data = _data_file_path.data();
    const size_t target_size = _data_file_path.size();
    for (size_t row = 0; row < read_rows; ++row) {
        // Compare the raw bytes in place. Materializing a std::string per row would heap-allocate
        // for every path longer than the small-string buffer, and a delete file can hold many
        // rows. The size-0 guard avoids calling memcmp with a possibly-null pointer.
        const auto file_path = file_path_column.get_data_at(row);
        if (file_path.size == target_size &&
            (target_size == 0 || std::memcmp(file_path.data, target_data, target_size) == 0)) {
            _rows->push_back(pos_column.get_element(row));
        }
    }
    return Status::OK();
}
166
167
21
Status IcebergTableReader::prepare_split(const format::SplitReadOptions& options) {
    // Drop all state derived from the previous split before looking at the new one.
    _row_lineage_columns = {};
    _iceberg_params.reset();
    _delete_predicates_initialized = false;
    _position_delete_rows_storage.clear();
    _equality_delete_filters.clear();

    const auto& range = options.current_range;
    const bool has_iceberg_params = range.__isset.table_format_params &&
                                    range.table_format_params.__isset.iceberg_params;
    if (has_iceberg_params) {
        const auto& params = range.table_format_params.iceberg_params;
        _iceberg_params = params;
        // Row-lineage values are only overridden when set; otherwise the values restored above
        // stay in place.
        if (params.__isset.first_row_id) {
            _row_lineage_columns.first_row_id = params.first_row_id;
        }
        if (params.__isset.last_updated_sequence_number) {
            _row_lineage_columns.last_updated_sequence_number =
                    params.last_updated_sequence_number;
        }
    }

    RETURN_IF_ERROR(TableReader::prepare_split(options));
    // Delete predicates are skipped entirely while a table-level count is active.
    if (!_is_table_level_count_active()) {
        RETURN_IF_ERROR(_init_delete_predicates(range.table_format_params));
    }
    return Status::OK();
}
192
193
0
std::string IcebergTableReader::debug_string() const {
194
0
    size_t position_delete_file_count = 0;
195
0
    size_t equality_delete_file_count = 0;
196
0
    size_t deletion_vector_file_count = 0;
197
0
    if (_iceberg_params.has_value() && _iceberg_params->__isset.delete_files) {
198
0
        for (const auto& delete_file : _iceberg_params->delete_files) {
199
0
            if (!delete_file.__isset.content) {
200
0
                continue;
201
0
            }
202
0
            if (delete_file.content == POSITION_DELETE) {
203
0
                ++position_delete_file_count;
204
0
            } else if (delete_file.content == EQUALITY_DELETE) {
205
0
                ++equality_delete_file_count;
206
0
            } else if (delete_file.content == DELETION_VECTOR) {
207
0
                ++deletion_vector_file_count;
208
0
            }
209
0
        }
210
0
    }
211
212
0
    std::ostringstream equality_filters;
213
0
    equality_filters << "[";
214
0
    for (size_t idx = 0; idx < _equality_delete_filters.size(); ++idx) {
215
0
        if (idx > 0) {
216
0
            equality_filters << ", ";
217
0
        }
218
0
        const auto& filter = _equality_delete_filters[idx];
219
0
        equality_filters << "EqualityDeleteFilter{field_ids="
220
0
                         << join_values_for_debug(filter.field_ids) << ", key_types=[";
221
0
        for (size_t type_idx = 0; type_idx < filter.key_types.size(); ++type_idx) {
222
0
            if (type_idx > 0) {
223
0
                equality_filters << ", ";
224
0
            }
225
0
            equality_filters << (filter.key_types[type_idx] == nullptr
226
0
                                         ? "null"
227
0
                                         : filter.key_types[type_idx]->get_name());
228
0
        }
229
0
        equality_filters << "], delete_block_rows=" << filter.delete_block.rows()
230
0
                         << ", delete_block_columns=" << filter.delete_block.columns() << "}";
231
0
    }
232
0
    equality_filters << "]";
233
234
0
    std::ostringstream out;
235
0
    out << "IcebergTableReader{base=" << format::TableReader::debug_string()
236
0
        << ", iceberg_params=" << iceberg_params_debug_string(_iceberg_params)
237
0
        << ", row_lineage_first_row_id=" << _row_lineage_columns.first_row_id
238
0
        << ", row_lineage_last_updated_sequence_number="
239
0
        << _row_lineage_columns.last_updated_sequence_number
240
0
        << ", need_row_lineage_row_id=" << _need_row_lineage_row_id()
241
0
        << ", need_iceberg_rowid=" << _need_iceberg_rowid()
242
0
        << ", row_position_block_position=" << _row_position_block_position
243
0
        << ", delete_predicates_initialized=" << _delete_predicates_initialized
244
0
        << ", position_delete_file_count=" << position_delete_file_count
245
0
        << ", equality_delete_file_count=" << equality_delete_file_count
246
0
        << ", deletion_vector_file_count=" << deletion_vector_file_count
247
0
        << ", position_delete_rows_storage_count=" << _position_delete_rows_storage.size()
248
0
        << ", equality_delete_filter_count=" << _equality_delete_filters.size()
249
0
        << ", equality_delete_filters=" << equality_filters.str() << "}";
250
0
    return out.str();
251
0
}
252
253
19
Status IcebergTableReader::materialize_virtual_columns(Block* table_block) {
    // Walk the column mappings and fill each Iceberg virtual column in `table_block`;
    // mappings whose type is INVALID are skipped.
    const auto& mappings = _data_reader.column_mapper->mappings();
    for (size_t idx = 0; idx < mappings.size(); ++idx) {
        switch (mappings[idx].virtual_column_type) {
        case format::TableVirtualColumnType::ROW_ID:
            RETURN_IF_ERROR(_materialize_row_lineage_row_id(table_block, idx));
            break;
        case format::TableVirtualColumnType::LAST_UPDATED_SEQUENCE_NUMBER:
            RETURN_IF_ERROR(_materialize_row_lineage_last_updated_sequence_number(table_block, idx));
            break;
        case format::TableVirtualColumnType::ICEBERG_ROWID:
            RETURN_IF_ERROR(_materialize_iceberg_rowid(table_block, idx));
            break;
        case format::TableVirtualColumnType::INVALID:
            break;
        }
    }
    return Status::OK();
}
274
275
20
Status IcebergTableReader::customize_file_scan_request(format::FileScanRequest* file_request) {
    RETURN_IF_ERROR(TableReader::customize_file_scan_request(file_request));
    // `_row_id` (only when a first_row_id is known) and the Iceberg row id both need the
    // row-position column, so request it when either of them is needed.
    const bool row_id_needs_position =
            _row_lineage_columns.first_row_id >= 0 && _need_row_lineage_row_id();
    if (row_id_needs_position || _need_iceberg_rowid()) {
        RETURN_IF_ERROR(_append_row_position_output_column(file_request));
    }
    RETURN_IF_ERROR(_append_equality_delete_predicates(file_request));
    return Status::OK();
}
284
285
19
bool IcebergTableReader::_supports_aggregate_pushdown(TPushAggOp::type agg_type) const {
    // Pushdown requires the base reader's approval and no pending equality delete filters.
    return TableReader::_supports_aggregate_pushdown(agg_type) && _equality_delete_filters.empty();
}
291
292
Status IcebergTableReader::_parse_deletion_vector_file(const TTableFormatFileDesc& t_desc,
                                                       DeleteFileDesc* desc,
                                                       bool* has_delete_file) {
    // Looks for a single deletion-vector delete file and, if found, fills `desc` with its location
    // and sets *has_delete_file. Returns DataQualityError when more than one DV is attached.
    DORIS_CHECK(desc != nullptr);
    DORIS_CHECK(has_delete_file != nullptr);
    *has_delete_file = false;

    if (!t_desc.__isset.iceberg_params) {
        return Status::OK();
    }
    const auto& params = t_desc.iceberg_params;
    const bool supports_delete_files = params.__isset.format_version &&
                                       params.format_version >= MIN_SUPPORT_DELETE_FILES_VERSION;
    if (!supports_delete_files || !params.__isset.delete_files || params.delete_files.empty()) {
        return Status::OK();
    }

    const TIcebergDeleteFileDesc* deletion_vector = nullptr;
    for (const auto& delete_file : params.delete_files) {
        const bool is_dv = delete_file.__isset.content && delete_file.content == DELETION_VECTOR;
        if (!is_dv) {
            continue;
        }
        if (deletion_vector != nullptr) {
            return Status::DataQualityError("This iceberg data file has multiple DVs.");
        }
        deletion_vector = &delete_file;
    }
    if (deletion_vector == nullptr) {
        return Status::OK();
    }

    const auto& dv = *deletion_vector;
    if (!dv.__isset.content_offset || !dv.__isset.content_size_in_bytes) {
        return Status::InternalError("Deletion vector is missing content offset or length");
    }

    // Describe where the DV blob lives inside its file.
    desc->key = _iceberg_delete_vector_cache_key(dv);
    desc->path = dv.path;
    desc->start_offset = dv.content_offset;
    desc->size = dv.content_size_in_bytes;
    desc->file_size = -1;
    desc->format = DeleteFileDesc::Format::ICEBERG;
    *has_delete_file = true;
    return Status::OK();
}
335
336
20
Status IcebergTableReader::_init_delete_predicates(const TTableFormatFileDesc& t_desc) {
    // Runs at most once per split: already-initialized state, or missing Iceberg params, just
    // marks the predicates as initialized.
    if (_delete_predicates_initialized || !t_desc.__isset.iceberg_params) {
        _delete_predicates_initialized = true;
        return Status::OK();
    }

    const auto& params = t_desc.iceberg_params;
    const bool has_delete_files = params.__isset.format_version &&
                                  params.format_version >= MIN_SUPPORT_DELETE_FILES_VERSION &&
                                  params.__isset.delete_files && !params.delete_files.empty();
    if (!has_delete_files) {
        _delete_predicates_initialized = true;
        return Status::OK();
    }

    // Partition delete files by content kind; files without `content` and other kinds are ignored.
    std::vector<TIcebergDeleteFileDesc> position_delete_files;
    std::vector<TIcebergDeleteFileDesc> equality_delete_files;
    for (const auto& delete_file : params.delete_files) {
        if (!delete_file.__isset.content) {
            continue;
        }
        if (delete_file.content == POSITION_DELETE) {
            position_delete_files.push_back(delete_file);
        } else if (delete_file.content == EQUALITY_DELETE) {
            equality_delete_files.push_back(delete_file);
        }
    }

    // A non-null `_delete_rows` at this point holds rows from a parsed deletion vector. Copy them
    // into the member storage so position-delete rows can be merged into the same vector.
    if (_delete_rows != nullptr) {
        _position_delete_rows_storage = *_delete_rows;
        _delete_rows = &_position_delete_rows_storage;
    }
    // Position deletes become row positions merged into `_delete_rows`; equality deletes are read
    // and turned into predicates evaluated later.
    if (!position_delete_files.empty()) {
        RETURN_IF_ERROR(_init_position_delete_rows(position_delete_files));
    }
    if (!equality_delete_files.empty()) {
        RETURN_IF_ERROR(_init_equality_delete_predicates(equality_delete_files));
    }

    _delete_predicates_initialized = true;
    return Status::OK();
}
381
382
std::string IcebergTableReader::_iceberg_delete_vector_cache_key(
383
4
        const TIcebergDeleteFileDesc& delete_file) {
384
4
    const std::string key_prefix = "iceberg_dv:";
385
4
    std::string key;
386
4
    key.resize(key_prefix.size() + delete_file.path.size() + sizeof(delete_file.content_offset) +
387
4
               sizeof(delete_file.content_size_in_bytes));
388
4
    char* data = key.data();
389
4
    memcpy(data, key_prefix.data(), key_prefix.size());
390
4
    data += key_prefix.size();
391
4
    memcpy(data, delete_file.path.data(), delete_file.path.size());
392
4
    data += delete_file.path.size();
393
4
    memcpy(data, &delete_file.content_offset, sizeof(delete_file.content_offset));
394
4
    data += sizeof(delete_file.content_offset);
395
4
    memcpy(data, &delete_file.content_size_in_bytes, sizeof(delete_file.content_size_in_bytes));
396
4
    return key;
397
4
}
398
399
std::shared_ptr<io::FileSystemProperties> IcebergTableReader::_delete_file_system_properties(
400
8
        const TFileScanRangeParams& scan_params) {
401
8
    auto system_properties = std::make_shared<io::FileSystemProperties>();
402
8
    system_properties->system_type =
403
8
            scan_params.__isset.file_type ? scan_params.file_type : TFileType::FILE_LOCAL;
404
8
    system_properties->properties = scan_params.properties;
405
8
    system_properties->hdfs_params = scan_params.hdfs_params;
406
8
    if (scan_params.__isset.broker_addresses) {
407
0
        system_properties->broker_addresses.assign(scan_params.broker_addresses.begin(),
408
0
                                                   scan_params.broker_addresses.end());
409
0
    }
410
8
    return system_properties;
411
8
}
412
413
std::unique_ptr<io::FileDescription> IcebergTableReader::_delete_file_description(
414
8
        const TFileRangeDesc& range) {
415
8
    auto file_description = std::make_unique<io::FileDescription>();
416
8
    file_description->path = range.path;
417
8
    file_description->file_size = range.__isset.file_size ? range.file_size : -1;
418
8
    file_description->range_start_offset = range.__isset.start_offset ? range.start_offset : 0;
419
8
    file_description->range_size = range.__isset.size ? range.size : -1;
420
8
    if (range.__isset.fs_name) {
421
0
        file_description->fs_name = range.fs_name;
422
0
    }
423
8
    return file_description;
424
8
}
425
426
7
std::string IcebergTableReader::_data_file_path() const {
427
7
    if (_iceberg_params.has_value() && _iceberg_params->__isset.original_file_path) {
428
6
        return _iceberg_params->original_file_path;
429
6
    }
430
1
    DORIS_CHECK(_current_task != nullptr);
431
1
    DORIS_CHECK(_current_task->data_file != nullptr);
432
1
    return _current_task->data_file->path;
433
7
}
434
435
9
Status IcebergTableReader::_append_row_position_output_column(format::FileScanRequest* request) {
    // Request the row-position column as a non-predicate column and remember its block position.
    const format::LocalColumnId position_id(format::ROW_POSITION_COLUMN_ID);
    _append_file_scan_column(request, position_id, &request->non_predicate_columns);
    const auto& position_slot = request->local_positions.at(position_id);
    _row_position_block_position = position_slot.value();
    return Status::OK();
}
441
442
20
Status IcebergTableReader::_append_equality_delete_predicates(format::FileScanRequest* request) {
    // For every equality delete filter, build one EqualityDeletePredicate whose children read the
    // filter's key columns from the data file. Each key column is added to `request` as a
    // predicate column. When the data-file column type differs from the delete-file key type, the
    // slot is wrapped in a Cast to the key type. The predicate is appended to
    // request->delete_conjuncts.
    DORIS_CHECK(request != nullptr);
    for (const auto& filter : _equality_delete_filters) {
        // field_ids and key_types are parallel arrays. DCHECK is compiled out in release builds,
        // so check explicitly instead of risking an out-of-bounds read below.
        if (filter.field_ids.size() != filter.key_types.size()) {
            return Status::InternalError(
                    "Equality delete filter has {} field ids but {} key types",
                    filter.field_ids.size(), filter.key_types.size());
        }
        auto delete_predicate =
                std::make_shared<EqualityDeletePredicate>(filter.delete_block, filter.field_ids);
        for (size_t idx = 0; idx < filter.field_ids.size(); ++idx) {
            const int field_id = filter.field_ids[idx];
            const auto& key_type = filter.key_types[idx];
            // A null key type would be dereferenced by equals()/Cast below.
            if (key_type == nullptr) {
                return Status::InternalError(
                        "Equality delete filter has no key type for field id {}", field_id);
            }
            auto field_it = std::ranges::find_if(
                    _data_reader.file_schema, [field_id](const format::ColumnDefinition& field) {
                        return field.has_identifier_field_id() &&
                               field.get_identifier_field_id() == field_id;
                    });
            if (field_it == _data_reader.file_schema.end()) {
                return Status::InternalError(
                        "Can not find equality delete column field id {} in data file schema",
                        field_id);
            }
            const auto field_column_id = format::LocalColumnId(field_it->file_local_id());
            _append_file_scan_column(request, field_column_id, &request->predicate_columns);
            const auto block_position = request->local_positions.at(field_column_id).value();
            auto slot = VSlotRef::create_shared(cast_set<int>(block_position),
                                                cast_set<int>(block_position), -1, field_it->type,
                                                field_it->name);
            if (field_it->type->equals(*key_type)) {
                delete_predicate->add_child(std::move(slot));
            } else {
                auto cast_expr = Cast::create_shared(key_type);
                cast_expr->add_child(std::move(slot));
                delete_predicate->add_child(std::move(cast_expr));
            }
        }
        request->delete_conjuncts.push_back(
                VExprContext::create_shared(std::move(delete_predicate)));
    }
    return Status::OK();
}
479
480
Status IcebergTableReader::_read_parquet_position_delete_file(
        const TIcebergDeleteFileDesc& delete_file, const TFileScanRangeParams& scan_params,
        IcebergDeleteFileIOContext* delete_io_ctx, PositionDeleteRowsCollector* collector) {
    // Reads one Parquet position delete file and feeds every batch of its file-path and position
    // columns to `collector`. ORC and other formats are rejected.
    if (!delete_file.__isset.file_format) {
        return Status::InternalError("Iceberg position delete file is missing file format");
    }
    const auto format_type = delete_file.file_format;
    if (format_type == TFileFormatType::FORMAT_ORC) {
        return Status::NotSupported("Iceberg ORC position delete file is not supported");
    }
    if (format_type != TFileFormatType::FORMAT_PARQUET) {
        return Status::NotSupported("Unsupported Iceberg delete file format {}", format_type);
    }

    // Reuse the data file's fs_name for the delete file when one is known.
    auto delete_range = build_iceberg_delete_file_range(delete_file.path);
    const bool has_data_fs_name = _current_task != nullptr && _current_task->data_file != nullptr &&
                                  !_current_task->data_file->fs_name.empty();
    if (has_data_fs_name) {
        delete_range.__set_fs_name(_current_task->data_file->fs_name);
    }
    auto system_properties = _delete_file_system_properties(scan_params);
    auto file_description = _delete_file_description(delete_range);
    // Non-owning shared_ptr: the IOContext belongs to `delete_io_ctx`.
    std::shared_ptr<io::IOContext> io_ctx(&delete_io_ctx->io_ctx, [](io::IOContext*) {});
    format::parquet::ParquetReader reader(system_properties, file_description, io_ctx,
                                          _scanner_profile);
    RETURN_IF_ERROR(reader.init(_runtime_state));

    // Locate the ICEBERG_FILE_PATH and ICEBERG_ROW_POS columns; a later duplicate name overrides
    // an earlier one.
    std::vector<format::ColumnDefinition> schema;
    RETURN_IF_ERROR(reader.get_schema(&schema));
    format::ColumnDefinition* path_column = nullptr;
    format::ColumnDefinition* position_column = nullptr;
    for (auto& field : schema) {
        if (field.name == ICEBERG_FILE_PATH) {
            path_column = &field;
        } else if (field.name == ICEBERG_ROW_POS) {
            position_column = &field;
        }
    }
    if (path_column == nullptr || position_column == nullptr) {
        return Status::InternalError("Position delete parquet file is missing required columns");
    }

    const format::LocalColumnId path_id(path_column->file_local_id());
    const format::LocalColumnId position_id(position_column->file_local_id());
    auto request = std::make_shared<format::FileScanRequest>();
    request->non_predicate_columns = {format::LocalColumnIndex::top_level(path_id),
                                      format::LocalColumnIndex::top_level(position_id)};
    request->local_positions = {
            {path_id, format::LocalIndex(ICEBERG_FILE_PATH_BLOCK_POSITION)},
            {position_id, format::LocalIndex(ICEBERG_ROW_POS_BLOCK_POSITION)},
    };
    RETURN_IF_ERROR(reader.open(request));

    // Each iteration reads into a fresh two-column block until the reader reports EOF.
    bool eof = false;
    while (!eof) {
        Block block;
        block.insert({path_column->type->create_column(), path_column->type, ICEBERG_FILE_PATH});
        block.insert({position_column->type->create_column(), position_column->type,
                      ICEBERG_ROW_POS});
        size_t read_rows = 0;
        RETURN_IF_ERROR(reader.get_block(&block, &read_rows, &eof));
        RETURN_IF_ERROR(collector->collect(block, read_rows));
    }
    return reader.close();
}
551
552
Status IcebergTableReader::_init_position_delete_rows(
        const std::vector<TIcebergDeleteFileDesc>& delete_files) {
    // Reads every position delete file and keeps the positions that refer to this split's data
    // file. They are merged (sorted, de-duplicated) into `_position_delete_rows_storage`, which
    // then backs `_delete_rows`.
    //
    // The scan params are only read, so bind them by reference instead of copying the whole
    // struct (including its properties map). `empty_scan_params` stands in when none are set.
    TFileScanRangeParams empty_scan_params;
    const TFileScanRangeParams& delete_scan_params =
            _scan_params == nullptr ? empty_scan_params : *_scan_params;
    format::DeleteRows position_delete_rows;
    IcebergDeleteFileIOContext delete_io_ctx(_runtime_state);
    PositionDeleteRowsCollector collector(_data_file_path(), &position_delete_rows);
    for (const auto& delete_file : delete_files) {
        RETURN_IF_ERROR(_read_parquet_position_delete_file(delete_file, delete_scan_params,
                                                           &delete_io_ctx, &collector));
    }
    if (position_delete_rows.empty()) {
        return Status::OK();
    }
    // Position delete files and deletion vectors both become row-position deletes for the
    // common TableReader DeletePredicate path. Keep the merged rows in a member vector because
    // DeletePredicate stores a reference to the vector used by _delete_rows.
    _position_delete_rows_storage.insert(_position_delete_rows_storage.end(),
                                         position_delete_rows.begin(), position_delete_rows.end());
    std::sort(_position_delete_rows_storage.begin(), _position_delete_rows_storage.end());
    _position_delete_rows_storage.erase(
            std::unique(_position_delete_rows_storage.begin(), _position_delete_rows_storage.end()),
            _position_delete_rows_storage.end());
    _delete_rows = &_position_delete_rows_storage;
    return Status::OK();
}
578
579
Status IcebergTableReader::_init_equality_delete_predicates(
        const std::vector<TIcebergDeleteFileDesc>& delete_files) {
    // Reads each equality delete file via _read_parquet_equality_delete_file(), stopping at the
    // first error.
    //
    // The scan params are only read, so bind them by reference instead of copying the whole
    // struct. `empty_scan_params` stands in when none are set.
    TFileScanRangeParams empty_scan_params;
    const TFileScanRangeParams& delete_scan_params =
            _scan_params == nullptr ? empty_scan_params : *_scan_params;
    IcebergDeleteFileIOContext delete_io_ctx(_runtime_state);
    for (const auto& delete_file : delete_files) {
        RETURN_IF_ERROR(_read_parquet_equality_delete_file(delete_file, delete_scan_params,
                                                           &delete_io_ctx));
    }
    return Status::OK();
}
590
591
// Reads one Iceberg equality delete file and appends an EqualityDeleteFilter built
// from its contents to _equality_delete_filters.
//
// The equality key columns are located in the delete file's own schema by Iceberg
// field id (delete_file.field_ids). Only columns without children are accepted.
// All rows of those key columns are read into a single Block, which becomes the
// filter's delete_block.
//
// @param delete_file   Descriptor of the delete file. It must have file_format set
//                      to FORMAT_PARQUET and a non-empty field_ids list.
// @param scan_params   Scan params used to derive the delete file's system properties.
// @param delete_io_ctx IO context for the read. It must be non-null and outlive this
//                      call, because it is wrapped in a non-owning shared_ptr below.
// @return InternalError when the format, the field ids, or a referenced field is
//         missing. NotSupported for a non-Parquet file or a complex key column.
//         Otherwise any reader error, or OK.
Status IcebergTableReader::_read_parquet_equality_delete_file(
        const TIcebergDeleteFileDesc& delete_file, const TFileScanRangeParams& scan_params,
        IcebergDeleteFileIOContext* delete_io_ctx) {
    if (!delete_file.__isset.file_format) {
        return Status::InternalError("Iceberg equality delete file is missing file format");
    }
    if (delete_file.file_format != TFileFormatType::FORMAT_PARQUET) {
        return Status::NotSupported("Unsupported Iceberg equality delete file format {}",
                                    delete_file.file_format);
    }
    if (!delete_file.__isset.field_ids || delete_file.field_ids.empty()) {
        return Status::InternalError("Iceberg equality delete file is missing field ids");
    }

    auto delete_range = build_iceberg_delete_file_range(delete_file.path);
    // Reuse the current data file's fs_name for the delete file range when it is known.
    if (_current_task != nullptr && _current_task->data_file != nullptr &&
        !_current_task->data_file->fs_name.empty()) {
        delete_range.__set_fs_name(_current_task->data_file->fs_name);
    }
    auto system_properties = _delete_file_system_properties(scan_params);
    auto file_description = _delete_file_description(delete_range);
    // Non-owning shared_ptr: the IOContext belongs to *delete_io_ctx, hence the no-op deleter.
    std::shared_ptr<io::IOContext> io_ctx(&delete_io_ctx->io_ctx, [](io::IOContext*) {});
    format::parquet::ParquetReader reader(system_properties, file_description, io_ctx,
                                          _scanner_profile);
    RETURN_IF_ERROR(reader.init(_runtime_state));

    std::vector<format::ColumnDefinition> schema;
    RETURN_IF_ERROR(reader.get_schema(&schema));
    // One entry per equality key, in delete_file.field_ids order.
    const size_t num_keys = delete_file.field_ids.size();
    std::vector<format::ColumnDefinition> delete_fields;
    std::vector<int> delete_field_ids;
    std::vector<DataTypePtr> delete_key_types;
    delete_fields.reserve(num_keys);
    delete_field_ids.reserve(num_keys);
    delete_key_types.reserve(num_keys);
    for (const auto field_id : delete_file.field_ids) {
        auto field_it = std::find_if(schema.begin(), schema.end(),
                                     [field_id](const format::ColumnDefinition& field) {
                                         return field.has_identifier_field_id() &&
                                                field_id == field.get_identifier_field_id();
                                     });
        if (field_it == schema.end()) {
            return Status::InternalError("Can not find field id {} in equality delete file {}",
                                         field_id, delete_file.path);
        }
        if (!field_it->children.empty()) {
            return Status::NotSupported(
                    "Iceberg equality delete does not support complex column {}", field_it->name);
        }
        delete_fields.push_back(*field_it);
        delete_field_ids.push_back(field_id);
        delete_key_types.push_back(field_it->type);
    }

    // Request only the key columns. local_positions maps each one to its index in delete_fields.
    auto request = std::make_shared<format::FileScanRequest>();
    for (size_t idx = 0; idx < delete_fields.size(); ++idx) {
        const auto local_column_id = format::LocalColumnId(delete_fields[idx].file_local_id());
        request->non_predicate_columns.push_back(
                format::LocalColumnIndex::top_level(local_column_id));
        request->local_positions.emplace(local_column_id, format::LocalIndex(idx));
    }
    RETURN_IF_ERROR(reader.open(request));

    // Builds an empty block with one column per key field. The field list is taken by
    // const reference because this runs once per batch; by value, every call copied it.
    auto build_equality_delete_block =
            [](const std::vector<format::ColumnDefinition>& fields) -> Block {
        Block block;
        for (const auto& field : fields) {
            block.insert({field.type->create_column(), field.type, field.name});
        }
        return block;
    };
    Block delete_block = build_equality_delete_block(delete_fields);
    MutableBlock mutable_delete_block(std::move(delete_block));
    bool eof = false;
    while (!eof) {
        Block block = build_equality_delete_block(delete_fields);
        size_t read_rows = 0;
        RETURN_IF_ERROR(reader.get_block(&block, &read_rows, &eof));
        if (read_rows > 0) {
            RETURN_IF_ERROR(mutable_delete_block.merge(block));
        }
    }
    RETURN_IF_ERROR(reader.close());
    // Re-assigning the moved-from delete_block is valid: assignment has no preconditions.
    delete_block = mutable_delete_block.to_block();
    _equality_delete_filters.push_back(
            EqualityDeleteFilter {.field_ids = std::move(delete_field_ids),
                                  .key_types = std::move(delete_key_types),
                                  .delete_block = std::move(delete_block)});
    return Status::OK();
}
677
678
9
// Materializes the Iceberg row-lineage row id into column `column_idx` of `table_block`.
// Each NULL entry becomes first_row_id plus the row's value from the row-position
// column of _data_reader.block_template. Non-NULL entries are kept as they are.
// Does nothing when first_row_id is negative.
Status IcebergTableReader::_materialize_row_lineage_row_id(Block* table_block, size_t column_idx) {
    const auto base_row_id = _row_lineage_columns.first_row_id;
    if (base_row_id < 0) {
        return Status::OK();
    }
    DORIS_CHECK(_row_position_block_position < _data_reader.block_template.columns());
    const auto& position_source =
            _data_reader.block_template.get_by_position(_row_position_block_position).column;
    const auto& row_position_column = assert_cast<const ColumnInt64&>(*position_source);
    DORIS_CHECK(row_position_column.size() == table_block->rows());

    auto row_id_column = IColumn::mutate(
            table_block->get_by_position(column_idx).column->convert_to_full_column_if_const());
    auto* nullable_row_ids = assert_cast<ColumnNullable*>(row_id_column.get());
    auto& null_map = nullable_row_ids->get_null_map_data();
    auto& data = assert_cast<ColumnInt64&>(*nullable_row_ids->get_nested_column_ptr()).get_data();
    DORIS_CHECK(null_map.size() == row_position_column.size());
    DORIS_CHECK(data.size() == row_position_column.size());

    const size_t num_rows = row_position_column.size();
    for (size_t i = 0; i < num_rows; ++i) {
        if (!null_map[i]) {
            continue; // value already present: keep it
        }
        null_map[i] = 0;
        data[i] = base_row_id + row_position_column.get_element(i);
    }
    table_block->replace_by_position(column_idx, std::move(row_id_column));
    return Status::OK();
}
702
703
1
// Builds the Iceberg row-id struct column and stores it at `column_idx` of `table_block`,
// replacing whatever was there.
// Struct children 0-3 receive, for every row:
//   (data file path, row position, partition spec id, partition data JSON).
// Spec id and JSON come from _iceberg_params when those fields are set, else 0 / "".
// If the column type is nullable, every row is marked non-NULL.
Status IcebergTableReader::_materialize_iceberg_rowid(Block* table_block, size_t column_idx) {
    DORIS_CHECK(_row_position_block_position < _data_reader.block_template.columns());
    const auto& row_position_column = assert_cast<const ColumnInt64&>(
            *_data_reader.block_template.get_by_position(_row_position_block_position).column);
    DORIS_CHECK(row_position_column.size() == table_block->rows());

    const auto& rowid_type = table_block->get_by_position(column_idx).type;
    auto rowid_column = rowid_type->create_column();
    auto* nullable_column = check_and_get_column<ColumnNullable>(rowid_column.get());
    // The struct is either the nested column of a nullable wrapper or the column itself.
    auto* struct_column = nullable_column == nullptr
                                  ? check_and_get_column<ColumnStruct>(rowid_column.get())
                                  : check_and_get_column<ColumnStruct>(
                                            nullable_column->get_nested_column_ptr().get());
    DORIS_CHECK(struct_column != nullptr);
    DORIS_CHECK(struct_column->tuple_size() >= 4);

    const auto num_rows = row_position_column.size();
    const auto file_path = _data_file_path();
    int32_t partition_spec_id = 0;
    std::string partition_data_json;
    if (_iceberg_params.has_value()) {
        if (_iceberg_params->__isset.partition_spec_id) {
            partition_spec_id = _iceberg_params->partition_spec_id;
        }
        if (_iceberg_params->__isset.partition_data_json) {
            partition_data_json = _iceberg_params->partition_data_json;
        }
    }

    auto& path_child = struct_column->get_column(0);
    auto& position_child = struct_column->get_column(1);
    auto& spec_id_child = struct_column->get_column(2);
    auto& partition_child = struct_column->get_column(3);
    path_child.reserve(num_rows);
    position_child.reserve(num_rows);
    spec_id_child.reserve(num_rows);
    partition_child.reserve(num_rows);
    const auto* spec_id_bytes = reinterpret_cast<const char*>(&partition_spec_id);
    for (size_t i = 0; i < num_rows; ++i) {
        const int64_t position = row_position_column.get_element(i);
        path_child.insert_data(file_path.data(), file_path.size());
        position_child.insert_data(reinterpret_cast<const char*>(&position), sizeof(position));
        spec_id_child.insert_data(spec_id_bytes, sizeof(partition_spec_id));
        partition_child.insert_data(partition_data_json.data(), partition_data_json.size());
    }
    if (nullable_column != nullptr) {
        nullable_column->get_null_map_data().resize_fill(num_rows, 0);
    }
    table_block->replace_by_position(column_idx, std::move(rowid_column));
    return Status::OK();
}
752
753
// Materializes the Iceberg row-lineage last-updated sequence number into column
// `column_idx` of `table_block`. Every NULL entry gets
// _row_lineage_columns.last_updated_sequence_number. Non-NULL entries are kept.
// Does nothing when that sequence number is negative.
Status IcebergTableReader::_materialize_row_lineage_last_updated_sequence_number(
        Block* table_block, size_t column_idx) {
    const auto sequence_number = _row_lineage_columns.last_updated_sequence_number;
    if (sequence_number < 0) {
        return Status::OK();
    }
    auto seq_column = IColumn::mutate(
            table_block->get_by_position(column_idx).column->convert_to_full_column_if_const());
    auto* nullable_seq = assert_cast<ColumnNullable*>(seq_column.get());
    auto& null_map = nullable_seq->get_null_map_data();
    auto& data = assert_cast<ColumnInt64&>(*nullable_seq->get_nested_column_ptr()).get_data();
    DORIS_CHECK(null_map.size() == table_block->rows());
    DORIS_CHECK(data.size() == table_block->rows());

    const size_t num_rows = table_block->rows();
    for (size_t i = 0; i < num_rows; ++i) {
        if (!null_map[i]) {
            continue; // value already present: keep it
        }
        null_map[i] = 0;
        data[i] = sequence_number;
    }
    table_block->replace_by_position(column_idx, std::move(seq_column));
    return Status::OK();
}
774
775
8
bool IcebergTableReader::_need_row_lineage_row_id() const {
776
8
    if (_data_reader.column_mapper != nullptr) {
777
7
        for (const auto& mapping : _data_reader.column_mapper->mappings()) {
778
7
            if (mapping.virtual_column_type == format::TableVirtualColumnType::ROW_ID) {
779
7
                return true;
780
7
            }
781
7
        }
782
7
    }
783
1
    return std::ranges::any_of(_projected_columns, is_projected_row_lineage_row_id);
784
8
}
785
786
12
bool IcebergTableReader::_need_iceberg_rowid() const {
787
12
    if (_data_reader.column_mapper != nullptr) {
788
16
        for (const auto& mapping : _data_reader.column_mapper->mappings()) {
789
16
            if (mapping.virtual_column_type == format::TableVirtualColumnType::ICEBERG_ROWID) {
790
1
                return true;
791
1
            }
792
16
        }
793
12
    }
794
11
    return std::ranges::any_of(_projected_columns, is_projected_iceberg_rowid);
795
12
}
796
797
} // namespace doris::format::iceberg