Coverage Report

Created: 2026-04-16 15:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/table/transactional_hive_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/table/transactional_hive_reader.h"
19
20
#include <re2/re2.h>
21
22
#include "core/data_type/data_type_factory.hpp"
23
#include "format/orc/vorc_reader.h"
24
#include "format/table/table_format_reader.h"
25
#include "format/table/transactional_hive_common.h"
26
27
namespace doris {
28
29
namespace io {
30
struct IOContext;
31
} // namespace io
32
class VExprContext;
33
} // namespace doris
34
35
namespace doris {
36
37
TransactionalHiveReader::TransactionalHiveReader(std::unique_ptr<GenericReader> file_format_reader,
38
                                                 RuntimeProfile* profile, RuntimeState* state,
39
                                                 const TFileScanRangeParams& params,
40
                                                 const TFileRangeDesc& range, io::IOContext* io_ctx,
41
                                                 FileMetaCache* meta_cache)
42
1
        : TableFormatReader(std::move(file_format_reader), state, profile, params, range, io_ctx,
43
1
                            meta_cache) {
44
1
    static const char* transactional_hive_profile = "TransactionalHiveProfile";
45
1
    ADD_TIMER(_profile, transactional_hive_profile);
46
1
    _transactional_orc_profile.num_delete_files =
47
1
            ADD_CHILD_COUNTER(_profile, "NumDeleteFiles", TUnit::UNIT, transactional_hive_profile);
48
1
    _transactional_orc_profile.num_delete_rows =
49
1
            ADD_CHILD_COUNTER(_profile, "NumDeleteRows", TUnit::UNIT, transactional_hive_profile);
50
1
    _transactional_orc_profile.delete_files_read_time =
51
1
            ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", transactional_hive_profile);
52
1
}
53
54
Status TransactionalHiveReader::init_reader(
55
        const std::vector<std::string>& column_names,
56
        std::unordered_map<std::string, uint32_t>* col_name_to_block_idx,
57
        const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
58
        const RowDescriptor* row_descriptor,
59
        const VExprContextSPtrs* not_single_slot_filter_conjuncts,
60
0
        const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
61
0
    _col_name_to_block_idx = col_name_to_block_idx;
62
0
    auto* orc_reader = static_cast<OrcReader*>(_file_format_reader.get());
63
0
    _col_names.insert(_col_names.end(), column_names.begin(), column_names.end());
64
0
    _col_names.insert(_col_names.end(), TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.begin(),
65
0
                      TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.end());
66
67
    // https://issues.apache.org/jira/browse/HIVE-15190
68
0
    const orc::Type* orc_type_ptr = nullptr;
69
0
    RETURN_IF_ERROR(orc_reader->get_file_type(&orc_type_ptr));
70
0
    const auto& orc_type = *orc_type_ptr;
71
72
0
    for (auto idx = 0; idx < TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.size(); idx++) {
73
0
        table_info_node_ptr->add_children(TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE[idx],
74
0
                                          TransactionalHive::READ_ROW_COLUMN_NAMES[idx],
75
0
                                          std::make_shared<ScalarNode>());
76
0
    }
77
78
0
    auto row_orc_type = orc_type.getSubtype(TransactionalHive::ROW_OFFSET);
79
    // struct<operation:int,originalTransaction:bigint,bucket:int,rowId:bigint,currentTransaction:bigint,row:struct<id:int,name:string>>
80
0
    std::vector<std::string> row_names;
81
0
    std::map<std::string, uint64_t> row_names_map;
82
0
    for (uint64_t idx = 0; idx < row_orc_type->getSubtypeCount(); idx++) {
83
0
        const auto& file_column_name = row_orc_type->getFieldName(idx);
84
0
        row_names.emplace_back(file_column_name);
85
0
        row_names_map.emplace(file_column_name, idx);
86
0
    }
87
88
    // use name for match.
89
0
    for (const auto& slot : tuple_descriptor->slots()) {
90
0
        const auto& slot_name = slot->col_name();
91
92
0
        if (std::count(TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.begin(),
93
0
                       TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.end(), slot_name) > 0) {
94
0
            return Status::InternalError("xxxx");
95
0
        }
96
97
0
        if (row_names_map.contains(slot_name)) {
98
0
            std::shared_ptr<Node> child_node = nullptr;
99
0
            RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_name(
100
0
                    slot->type(), row_orc_type->getSubtype(row_names_map[slot_name]), child_node));
101
0
            auto file_column_name = fmt::format(
102
0
                    "{}.{}", TransactionalHive::ACID_COLUMN_NAMES[TransactionalHive::ROW_OFFSET],
103
0
                    slot_name);
104
0
            table_info_node_ptr->add_children(slot_name, file_column_name, child_node);
105
106
0
        } else {
107
0
            table_info_node_ptr->add_not_exist_children(slot_name);
108
0
        }
109
0
    }
110
111
0
    Status status = orc_reader->init_reader(
112
0
            &_col_names, col_name_to_block_idx, conjuncts, true, tuple_descriptor, row_descriptor,
113
0
            not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts, table_info_node_ptr);
114
0
    return status;
115
0
}
116
117
0
Status TransactionalHiveReader::get_next_block_inner(Block* block, size_t* read_rows, bool* eof) {
118
0
    for (const auto& i : TransactionalHive::READ_PARAMS) {
119
0
        DataTypePtr data_type = get_data_type_with_default_argument(
120
0
                DataTypeFactory::instance().create_data_type(i.type, false));
121
0
        MutableColumnPtr data_column = data_type->create_column();
122
0
        (*_col_name_to_block_idx)[i.column_lower_case] = static_cast<uint32_t>(block->columns());
123
0
        block->insert(
124
0
                ColumnWithTypeAndName(std::move(data_column), data_type, i.column_lower_case));
125
0
    }
126
0
    auto res = _file_format_reader->get_next_block(block, read_rows, eof);
127
0
    Block::erase_useless_column(block, block->columns() - TransactionalHive::READ_PARAMS.size());
128
0
    for (const auto& i : TransactionalHive::READ_PARAMS) {
129
0
        _col_name_to_block_idx->erase(i.column_lower_case);
130
0
    }
131
0
    return res;
132
0
}
133
134
0
Status TransactionalHiveReader::init_row_filters() {
135
0
    std::string data_file_path = _range.path;
136
    // the path in _range is remove the namenode prefix,
137
    // and the file_path in delete file is full path, so we should add it back.
138
0
    if (_params.__isset.hdfs_params && _params.hdfs_params.__isset.fs_name) {
139
0
        std::string fs_name = _params.hdfs_params.fs_name;
140
0
        if (!starts_with(data_file_path, fs_name)) {
141
0
            data_file_path = fs_name + data_file_path;
142
0
        }
143
0
    }
144
145
0
    auto* orc_reader = (OrcReader*)(_file_format_reader.get());
146
0
    std::vector<std::string> delete_file_col_names;
147
0
    int64_t num_delete_rows = 0;
148
0
    int64_t num_delete_files = 0;
149
0
    std::filesystem::path file_path(data_file_path);
150
151
    //See https://github.com/apache/hive/commit/ffee30e6267e85f00a22767262192abb9681cfb7#diff-5fe26c36b4e029dcd344fc5d484e7347R165
152
    // bucket_xxx_attemptId => bucket_xxx
153
    // bucket_xxx           => bucket_xxx
154
0
    auto remove_bucket_attemptId = [](const std::string& str) {
155
0
        re2::RE2 pattern("^bucket_\\d+_\\d+$");
156
157
0
        if (re2::RE2::FullMatch(str, pattern)) {
158
0
            size_t pos = str.rfind('_');
159
0
            if (pos != std::string::npos) {
160
0
                return str.substr(0, pos);
161
0
            }
162
0
        }
163
0
        return str;
164
0
    };
165
166
0
    SCOPED_TIMER(_transactional_orc_profile.delete_files_read_time);
167
0
    for (const auto& delete_delta :
168
0
         _range.table_format_params.transactional_hive_params.delete_deltas) {
169
0
        const std::string file_name = file_path.filename().string();
170
171
        //need opt.
172
0
        std::vector<std::string> delete_delta_file_names;
173
0
        for (const auto& x : delete_delta.file_names) {
174
0
            delete_delta_file_names.emplace_back(remove_bucket_attemptId(x));
175
0
        }
176
0
        auto iter = std::find(delete_delta_file_names.begin(), delete_delta_file_names.end(),
177
0
                              remove_bucket_attemptId(file_name));
178
0
        if (iter == delete_delta_file_names.end()) {
179
0
            continue;
180
0
        }
181
0
        auto delete_file =
182
0
                fmt::format("{}/{}", delete_delta.directory_location,
183
0
                            delete_delta.file_names[iter - delete_delta_file_names.begin()]);
184
185
0
        TFileRangeDesc delete_range;
186
        // must use __set() method to make sure __isset is true
187
0
        delete_range.__set_fs_name(_range.fs_name);
188
0
        delete_range.path = delete_file;
189
0
        delete_range.start_offset = 0;
190
0
        delete_range.size = -1;
191
0
        delete_range.file_size = -1;
192
193
0
        OrcReader delete_reader(_profile, _state, _params, delete_range, _MIN_BATCH_SIZE,
194
0
                                _state->timezone(), _io_ctx, _meta_cache, false);
195
196
0
        auto acid_info_node = std::make_shared<StructNode>();
197
0
        for (auto idx = 0; idx < TransactionalHive::DELETE_ROW_COLUMN_NAMES_LOWER_CASE.size();
198
0
             idx++) {
199
0
            auto const& table_column_name =
200
0
                    TransactionalHive::DELETE_ROW_COLUMN_NAMES_LOWER_CASE[idx];
201
0
            auto const& file_column_name = TransactionalHive::DELETE_ROW_COLUMN_NAMES[idx];
202
0
            acid_info_node->add_children(table_column_name, file_column_name,
203
0
                                         std::make_shared<ScalarNode>());
204
0
        }
205
206
0
        RETURN_IF_ERROR(delete_reader.init_reader(
207
0
                &TransactionalHive::DELETE_ROW_COLUMN_NAMES_LOWER_CASE,
208
0
                const_cast<std::unordered_map<std::string, uint32_t>*>(
209
0
                        &TransactionalHive::DELETE_COL_NAME_TO_BLOCK_IDX),
210
0
                {}, false, nullptr, nullptr, nullptr, nullptr, acid_info_node));
211
212
0
        std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
213
0
                partition_columns;
214
0
        std::unordered_map<std::string, VExprContextSPtr> missing_columns;
215
0
        RETURN_IF_ERROR(delete_reader.set_fill_columns(partition_columns, missing_columns));
216
217
0
        bool eof = false;
218
0
        while (!eof) {
219
0
            Block block;
220
0
            for (const auto& i : TransactionalHive::DELETE_ROW_PARAMS) {
221
0
                DataTypePtr data_type = DataTypeFactory::instance().create_data_type(i.type, false);
222
0
                MutableColumnPtr data_column = data_type->create_column();
223
0
                block.insert(ColumnWithTypeAndName(std::move(data_column), data_type,
224
0
                                                   i.column_lower_case));
225
0
            }
226
0
            eof = false;
227
0
            size_t read_rows = 0;
228
0
            RETURN_IF_ERROR(delete_reader.get_next_block(&block, &read_rows, &eof));
229
0
            if (read_rows > 0) {
230
0
                static int ORIGINAL_TRANSACTION_INDEX = 0;
231
0
                static int BUCKET_ID_INDEX = 1;
232
0
                static int ROW_ID_INDEX = 2;
233
0
                const auto& original_transaction_column = assert_cast<const ColumnInt64&>(
234
0
                        *block.get_by_position(ORIGINAL_TRANSACTION_INDEX).column);
235
0
                const auto& bucket_id_column = assert_cast<const ColumnInt32&>(
236
0
                        *block.get_by_position(BUCKET_ID_INDEX).column);
237
0
                const auto& row_id_column = assert_cast<const ColumnInt64&>(
238
0
                        *block.get_by_position(ROW_ID_INDEX).column);
239
240
0
                DCHECK_EQ(original_transaction_column.size(), read_rows);
241
0
                DCHECK_EQ(bucket_id_column.size(), read_rows);
242
0
                DCHECK_EQ(row_id_column.size(), read_rows);
243
244
0
                for (int i = 0; i < read_rows; ++i) {
245
0
                    Int64 original_transaction = original_transaction_column.get_int(i);
246
0
                    Int64 bucket_id = bucket_id_column.get_int(i);
247
0
                    Int64 row_id = row_id_column.get_int(i);
248
0
                    AcidRowID delete_row_id = {original_transaction, bucket_id, row_id};
249
0
                    _delete_rows.insert(delete_row_id);
250
0
                    ++num_delete_rows;
251
0
                }
252
0
            }
253
0
        }
254
0
        ++num_delete_files;
255
0
    }
256
0
    if (num_delete_rows > 0) {
257
0
        orc_reader->set_push_down_agg_type(TPushAggOp::NONE);
258
0
        orc_reader->set_delete_rows(&_delete_rows);
259
0
        COUNTER_UPDATE(_transactional_orc_profile.num_delete_files, num_delete_files);
260
0
        COUNTER_UPDATE(_transactional_orc_profile.num_delete_rows, num_delete_rows);
261
0
    }
262
0
    return Status::OK();
263
0
}
264
} // namespace doris