Coverage Report

Created: 2026-04-08 18:35

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/table/paimon_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/table/paimon_reader.h"
19
20
#include <vector>
21
22
#include "common/status.h"
23
#include "format/table/deletion_vector_reader.h"
24
#include "runtime/runtime_state.h"
25
26
namespace doris {
27
#include "common/compile_check_begin.h"
28
29
// ============================================================================
30
// PaimonOrcReader
31
// ============================================================================
32
2.07k
void PaimonOrcReader::_init_paimon_profile() {
33
2.07k
    static const char* paimon_profile = "PaimonProfile";
34
2.07k
    ADD_TIMER(get_profile(), paimon_profile);
35
2.07k
    _paimon_profile.num_delete_rows =
36
2.07k
            ADD_CHILD_COUNTER(get_profile(), "NumDeleteRows", TUnit::UNIT, paimon_profile);
37
2.07k
    _paimon_profile.delete_files_read_time =
38
2.07k
            ADD_CHILD_TIMER(get_profile(), "DeleteFileReadTime", paimon_profile);
39
2.07k
    _paimon_profile.parse_deletion_vector_time =
40
2.07k
            ADD_CHILD_TIMER(get_profile(), "ParseDeletionVectorTime", paimon_profile);
41
2.07k
}
42
43
2.08k
Status PaimonOrcReader::on_before_init_reader(ReaderInitContext* ctx) {
44
2.08k
    _column_descs = ctx->column_descs;
45
2.08k
    _fill_col_name_to_block_idx = ctx->col_name_to_block_idx;
46
2.08k
    const orc::Type* orc_type_ptr = nullptr;
47
2.08k
    RETURN_IF_ERROR(get_file_type(&orc_type_ptr));
48
49
2.08k
    RETURN_IF_ERROR(gen_table_info_node_by_field_id(
50
2.08k
            get_scan_params(), get_scan_range().table_format_params.paimon_params.schema_id,
51
2.08k
            get_tuple_descriptor(), orc_type_ptr));
52
2.08k
    ctx->table_info_node = table_info_node_ptr;
53
54
12.4k
    for (const auto& desc : *ctx->column_descs) {
55
12.4k
        if (desc.category == ColumnCategory::REGULAR ||
56
12.4k
            desc.category == ColumnCategory::GENERATED) {
57
12.4k
            ctx->column_names.push_back(desc.name);
58
12.4k
        }
59
12.4k
    }
60
2.08k
    return Status::OK();
61
2.08k
}
62
63
2.08k
Status PaimonOrcReader::on_after_init_reader(ReaderInitContext* /*ctx*/) {
64
2.08k
    return _init_deletion_vector();
65
2.08k
}
66
67
2.08k
Status PaimonOrcReader::_init_deletion_vector() {
68
2.08k
    const auto& table_desc = get_scan_range().table_format_params.paimon_params;
69
2.08k
    if (!table_desc.__isset.deletion_file) {
70
2.06k
        return Status::OK();
71
2.06k
    }
72
73
    // Cannot do count push down if there are delete files
74
20
    if (!get_scan_range().table_format_params.paimon_params.__isset.row_count) {
75
20
        set_push_down_agg_type(TPushAggOp::NONE);
76
20
        lock_push_down_agg_type();
77
20
    }
78
20
    const auto& deletion_file = table_desc.deletion_file;
79
80
20
    Status create_status = Status::OK();
81
82
20
    std::string key;
83
20
    key.resize(deletion_file.path.size() + sizeof(deletion_file.offset));
84
20
    memcpy(key.data(), deletion_file.path.data(), deletion_file.path.size());
85
20
    memcpy(key.data() + deletion_file.path.size(), &deletion_file.offset,
86
20
           sizeof(deletion_file.offset));
87
88
20
    SCOPED_TIMER(_paimon_profile.delete_files_read_time);
89
20
    using DeleteRows = std::vector<int64_t>;
90
20
    _delete_rows = _kv_cache->get<DeleteRows>(key, [&]() -> DeleteRows* {
91
20
        auto* delete_rows = new DeleteRows;
92
93
20
        TFileRangeDesc delete_range;
94
20
        delete_range.__set_fs_name(get_scan_range().fs_name);
95
20
        delete_range.path = deletion_file.path;
96
20
        delete_range.start_offset = deletion_file.offset;
97
20
        delete_range.size = deletion_file.length + 4;
98
20
        delete_range.file_size = -1;
99
100
20
        DeletionVectorReader dv_reader(get_state(), get_profile(), get_scan_params(), delete_range,
101
20
                                       get_io_ctx());
102
20
        create_status = dv_reader.open();
103
20
        if (!create_status.ok()) [[unlikely]] {
104
0
            return nullptr;
105
0
        }
106
107
20
        size_t bytes_read = deletion_file.length + 4;
108
20
        std::vector<char> buffer(bytes_read);
109
20
        create_status = dv_reader.read_at(deletion_file.offset, {buffer.data(), bytes_read});
110
20
        if (!create_status.ok()) [[unlikely]] {
111
0
            return nullptr;
112
0
        }
113
114
20
        const char* buf = buffer.data();
115
20
        uint32_t actual_length;
116
20
        std::memcpy(reinterpret_cast<char*>(&actual_length), buf, 4);
117
20
        std::reverse(reinterpret_cast<char*>(&actual_length),
118
20
                     reinterpret_cast<char*>(&actual_length) + 4);
119
20
        buf += 4;
120
20
        if (actual_length != bytes_read - 4) [[unlikely]] {
121
0
            create_status = Status::RuntimeError(
122
0
                    "DeletionVector deserialize error: length not match, "
123
0
                    "actual length: {}, expect length: {}",
124
0
                    actual_length, bytes_read - 4);
125
0
            return nullptr;
126
0
        }
127
20
        uint32_t magic_number;
128
20
        std::memcpy(reinterpret_cast<char*>(&magic_number), buf, 4);
129
20
        std::reverse(reinterpret_cast<char*>(&magic_number),
130
20
                     reinterpret_cast<char*>(&magic_number) + 4);
131
20
        buf += 4;
132
20
        const static uint32_t MAGIC_NUMBER = 1581511376;
133
20
        if (magic_number != MAGIC_NUMBER) [[unlikely]] {
134
0
            create_status = Status::RuntimeError(
135
0
                    "DeletionVector deserialize error: invalid magic number {}", magic_number);
136
0
            return nullptr;
137
0
        }
138
139
20
        roaring::Roaring roaring_bitmap;
140
20
        SCOPED_TIMER(_paimon_profile.parse_deletion_vector_time);
141
20
        try {
142
20
            roaring_bitmap = roaring::Roaring::readSafe(buf, bytes_read - 4);
143
20
        } catch (const std::runtime_error& e) {
144
0
            create_status = Status::RuntimeError(
145
0
                    "DeletionVector deserialize error: failed to deserialize roaring bitmap, {}",
146
0
                    e.what());
147
0
            return nullptr;
148
0
        }
149
20
        delete_rows->reserve(roaring_bitmap.cardinality());
150
40
        for (auto it = roaring_bitmap.begin(); it != roaring_bitmap.end(); it++) {
151
20
            delete_rows->push_back(*it);
152
20
        }
153
20
        COUNTER_UPDATE(_paimon_profile.num_delete_rows, delete_rows->size());
154
20
        return delete_rows;
155
20
    });
156
20
    RETURN_IF_ERROR(create_status);
157
20
    if (!_delete_rows->empty()) [[likely]] {
158
20
        set_position_delete_rowids(_delete_rows);
159
20
    }
160
20
    return Status::OK();
161
20
}
162
163
// ============================================================================
164
// PaimonParquetReader
165
// ============================================================================
166
2.68k
void PaimonParquetReader::_init_paimon_profile() {
167
2.68k
    static const char* paimon_profile = "PaimonProfile";
168
2.68k
    ADD_TIMER(get_profile(), paimon_profile);
169
2.68k
    _paimon_profile.num_delete_rows =
170
2.68k
            ADD_CHILD_COUNTER(get_profile(), "NumDeleteRows", TUnit::UNIT, paimon_profile);
171
2.68k
    _paimon_profile.delete_files_read_time =
172
2.68k
            ADD_CHILD_TIMER(get_profile(), "DeleteFileReadTime", paimon_profile);
173
2.68k
    _paimon_profile.parse_deletion_vector_time =
174
2.68k
            ADD_CHILD_TIMER(get_profile(), "ParseDeletionVectorTime", paimon_profile);
175
2.68k
}
176
177
2.66k
Status PaimonParquetReader::on_before_init_reader(ReaderInitContext* ctx) {
178
2.66k
    _column_descs = ctx->column_descs;
179
2.66k
    _fill_col_name_to_block_idx = ctx->col_name_to_block_idx;
180
2.66k
    const FieldDescriptor* field_desc = nullptr;
181
2.66k
    RETURN_IF_ERROR(get_file_metadata_schema(&field_desc));
182
2.66k
    DCHECK(field_desc != nullptr);
183
184
2.66k
    RETURN_IF_ERROR(gen_table_info_node_by_field_id(
185
2.66k
            get_scan_params(), get_scan_range().table_format_params.paimon_params.schema_id,
186
2.66k
            get_tuple_descriptor(), *field_desc));
187
2.66k
    ctx->table_info_node = table_info_node_ptr;
188
189
7.49k
    for (const auto& desc : *ctx->column_descs) {
190
7.49k
        if (desc.category == ColumnCategory::REGULAR ||
191
7.50k
            desc.category == ColumnCategory::GENERATED) {
192
7.50k
            ctx->column_names.push_back(desc.name);
193
7.50k
        }
194
7.49k
    }
195
2.66k
    return Status::OK();
196
2.66k
}
197
198
2.67k
Status PaimonParquetReader::on_after_init_reader(ReaderInitContext* /*ctx*/) {
199
2.67k
    return _init_deletion_vector();
200
2.67k
}
201
202
2.66k
Status PaimonParquetReader::_init_deletion_vector() {
203
2.66k
    const auto& table_desc = get_scan_range().table_format_params.paimon_params;
204
2.66k
    if (!table_desc.__isset.deletion_file) {
205
2.63k
        return Status::OK();
206
2.63k
    }
207
208
32
    if (!get_scan_range().table_format_params.paimon_params.__isset.row_count) {
209
28
        set_push_down_agg_type(TPushAggOp::NONE);
210
28
        lock_push_down_agg_type();
211
28
    }
212
32
    const auto& deletion_file = table_desc.deletion_file;
213
214
32
    Status create_status = Status::OK();
215
216
32
    std::string key;
217
32
    key.resize(deletion_file.path.size() + sizeof(deletion_file.offset));
218
32
    memcpy(key.data(), deletion_file.path.data(), deletion_file.path.size());
219
32
    memcpy(key.data() + deletion_file.path.size(), &deletion_file.offset,
220
32
           sizeof(deletion_file.offset));
221
222
32
    SCOPED_TIMER(_paimon_profile.delete_files_read_time);
223
32
    using DeleteRows = std::vector<int64_t>;
224
32
    _delete_rows = _kv_cache->get<DeleteRows>(key, [&]() -> DeleteRows* {
225
28
        auto* delete_rows = new DeleteRows;
226
227
28
        TFileRangeDesc delete_range;
228
28
        delete_range.__set_fs_name(get_scan_range().fs_name);
229
28
        delete_range.path = deletion_file.path;
230
28
        delete_range.start_offset = deletion_file.offset;
231
28
        delete_range.size = deletion_file.length + 4;
232
28
        delete_range.file_size = -1;
233
234
28
        DeletionVectorReader dv_reader(get_state(), get_profile(), get_scan_params(), delete_range,
235
28
                                       get_io_ctx());
236
28
        create_status = dv_reader.open();
237
28
        if (!create_status.ok()) [[unlikely]] {
238
0
            return nullptr;
239
0
        }
240
241
28
        size_t bytes_read = deletion_file.length + 4;
242
28
        std::vector<char> buffer(bytes_read);
243
28
        create_status = dv_reader.read_at(deletion_file.offset, {buffer.data(), bytes_read});
244
28
        if (!create_status.ok()) [[unlikely]] {
245
0
            return nullptr;
246
0
        }
247
248
28
        const char* buf = buffer.data();
249
28
        uint32_t actual_length;
250
28
        std::memcpy(reinterpret_cast<char*>(&actual_length), buf, 4);
251
28
        std::reverse(reinterpret_cast<char*>(&actual_length),
252
28
                     reinterpret_cast<char*>(&actual_length) + 4);
253
28
        buf += 4;
254
28
        if (actual_length != bytes_read - 4) [[unlikely]] {
255
0
            create_status = Status::RuntimeError(
256
0
                    "DeletionVector deserialize error: length not match, "
257
0
                    "actual length: {}, expect length: {}",
258
0
                    actual_length, bytes_read - 4);
259
0
            return nullptr;
260
0
        }
261
28
        uint32_t magic_number;
262
28
        std::memcpy(reinterpret_cast<char*>(&magic_number), buf, 4);
263
28
        std::reverse(reinterpret_cast<char*>(&magic_number),
264
28
                     reinterpret_cast<char*>(&magic_number) + 4);
265
28
        buf += 4;
266
28
        const static uint32_t MAGIC_NUMBER = 1581511376;
267
28
        if (magic_number != MAGIC_NUMBER) [[unlikely]] {
268
0
            create_status = Status::RuntimeError(
269
0
                    "DeletionVector deserialize error: invalid magic number {}", magic_number);
270
0
            return nullptr;
271
0
        }
272
273
28
        roaring::Roaring roaring_bitmap;
274
28
        SCOPED_TIMER(_paimon_profile.parse_deletion_vector_time);
275
28
        try {
276
28
            roaring_bitmap = roaring::Roaring::readSafe(buf, bytes_read - 4);
277
28
        } catch (const std::runtime_error& e) {
278
0
            create_status = Status::RuntimeError(
279
0
                    "DeletionVector deserialize error: failed to deserialize roaring bitmap, {}",
280
0
                    e.what());
281
0
            return nullptr;
282
0
        }
283
28
        delete_rows->reserve(roaring_bitmap.cardinality());
284
56
        for (auto it = roaring_bitmap.begin(); it != roaring_bitmap.end(); it++) {
285
28
            delete_rows->push_back(*it);
286
28
        }
287
28
        COUNTER_UPDATE(_paimon_profile.num_delete_rows, delete_rows->size());
288
28
        return delete_rows;
289
28
    });
290
32
    RETURN_IF_ERROR(create_status);
291
32
    if (!_delete_rows->empty()) [[likely]] {
292
28
        ParquetReader::set_delete_rows(_delete_rows);
293
28
    }
294
32
    return Status::OK();
295
32
}
296
297
#include "common/compile_check_end.h"
298
} // namespace doris