Coverage Report

Created: 2026-03-15 17:28

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/table/transactional_hive_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/table/transactional_hive_reader.h"
19
20
#include <re2/re2.h>
21
22
#include "core/data_type/data_type_factory.hpp"
23
#include "format/orc/vorc_reader.h"
24
#include "format/table/table_format_reader.h"
25
#include "format/table/transactional_hive_common.h"
26
27
namespace doris {
28
#include "common/compile_check_begin.h"
29
30
namespace io {
31
struct IOContext;
32
} // namespace io
33
class VExprContext;
34
} // namespace doris
35
36
namespace doris {
37
38
TransactionalHiveReader::TransactionalHiveReader(std::unique_ptr<GenericReader> file_format_reader,
39
                                                 RuntimeProfile* profile, RuntimeState* state,
40
                                                 const TFileScanRangeParams& params,
41
                                                 const TFileRangeDesc& range, io::IOContext* io_ctx,
42
                                                 FileMetaCache* meta_cache)
43
0
        : TableFormatReader(std::move(file_format_reader), state, profile, params, range, io_ctx,
44
0
                            meta_cache) {
45
0
    static const char* transactional_hive_profile = "TransactionalHiveProfile";
46
0
    ADD_TIMER(_profile, transactional_hive_profile);
47
0
    _transactional_orc_profile.num_delete_files =
48
0
            ADD_CHILD_COUNTER(_profile, "NumDeleteFiles", TUnit::UNIT, transactional_hive_profile);
49
0
    _transactional_orc_profile.num_delete_rows =
50
0
            ADD_CHILD_COUNTER(_profile, "NumDeleteRows", TUnit::UNIT, transactional_hive_profile);
51
0
    _transactional_orc_profile.delete_files_read_time =
52
0
            ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", transactional_hive_profile);
53
0
}
54
55
Status TransactionalHiveReader::init_reader(
56
        const std::vector<std::string>& column_names,
57
        std::unordered_map<std::string, uint32_t>* col_name_to_block_idx,
58
        const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
59
        const RowDescriptor* row_descriptor,
60
        const VExprContextSPtrs* not_single_slot_filter_conjuncts,
61
0
        const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
62
0
    _col_name_to_block_idx = col_name_to_block_idx;
63
0
    auto* orc_reader = static_cast<OrcReader*>(_file_format_reader.get());
64
0
    _col_names.insert(_col_names.end(), column_names.begin(), column_names.end());
65
0
    _col_names.insert(_col_names.end(), TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.begin(),
66
0
                      TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.end());
67
68
    // https://issues.apache.org/jira/browse/HIVE-15190
69
0
    const orc::Type* orc_type_ptr = nullptr;
70
0
    RETURN_IF_ERROR(orc_reader->get_file_type(&orc_type_ptr));
71
0
    const auto& orc_type = *orc_type_ptr;
72
73
0
    for (auto idx = 0; idx < TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.size(); idx++) {
74
0
        table_info_node_ptr->add_children(TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE[idx],
75
0
                                          TransactionalHive::READ_ROW_COLUMN_NAMES[idx],
76
0
                                          std::make_shared<ScalarNode>());
77
0
    }
78
79
0
    auto row_orc_type = orc_type.getSubtype(TransactionalHive::ROW_OFFSET);
80
    // struct<operation:int,originalTransaction:bigint,bucket:int,rowId:bigint,currentTransaction:bigint,row:struct<id:int,name:string>>
81
0
    std::vector<std::string> row_names;
82
0
    std::map<std::string, uint64_t> row_names_map;
83
0
    for (uint64_t idx = 0; idx < row_orc_type->getSubtypeCount(); idx++) {
84
0
        const auto& file_column_name = row_orc_type->getFieldName(idx);
85
0
        row_names.emplace_back(file_column_name);
86
0
        row_names_map.emplace(file_column_name, idx);
87
0
    }
88
89
    // use name for match.
90
0
    for (const auto& slot : tuple_descriptor->slots()) {
91
0
        const auto& slot_name = slot->col_name();
92
93
0
        if (std::count(TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.begin(),
94
0
                       TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.end(), slot_name) > 0) {
95
0
            return Status::InternalError("xxxx");
96
0
        }
97
98
0
        if (row_names_map.contains(slot_name)) {
99
0
            std::shared_ptr<Node> child_node = nullptr;
100
0
            RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_name(
101
0
                    slot->type(), row_orc_type->getSubtype(row_names_map[slot_name]), child_node));
102
0
            auto file_column_name = fmt::format(
103
0
                    "{}.{}", TransactionalHive::ACID_COLUMN_NAMES[TransactionalHive::ROW_OFFSET],
104
0
                    slot_name);
105
0
            table_info_node_ptr->add_children(slot_name, file_column_name, child_node);
106
107
0
        } else {
108
0
            table_info_node_ptr->add_not_exist_children(slot_name);
109
0
        }
110
0
    }
111
112
0
    Status status = orc_reader->init_reader(
113
0
            &_col_names, col_name_to_block_idx, conjuncts, true, tuple_descriptor, row_descriptor,
114
0
            not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts, table_info_node_ptr);
115
0
    return status;
116
0
}
117
118
0
Status TransactionalHiveReader::get_next_block_inner(Block* block, size_t* read_rows, bool* eof) {
119
0
    for (const auto& i : TransactionalHive::READ_PARAMS) {
120
0
        DataTypePtr data_type = get_data_type_with_default_argument(
121
0
                DataTypeFactory::instance().create_data_type(i.type, false));
122
0
        MutableColumnPtr data_column = data_type->create_column();
123
0
        (*_col_name_to_block_idx)[i.column_lower_case] = static_cast<uint32_t>(block->columns());
124
0
        block->insert(
125
0
                ColumnWithTypeAndName(std::move(data_column), data_type, i.column_lower_case));
126
0
    }
127
0
    auto res = _file_format_reader->get_next_block(block, read_rows, eof);
128
0
    Block::erase_useless_column(block, block->columns() - TransactionalHive::READ_PARAMS.size());
129
0
    for (const auto& i : TransactionalHive::READ_PARAMS) {
130
0
        _col_name_to_block_idx->erase(i.column_lower_case);
131
0
    }
132
0
    return res;
133
0
}
134
135
0
Status TransactionalHiveReader::init_row_filters() {
136
0
    std::string data_file_path = _range.path;
137
    // the path in _range is remove the namenode prefix,
138
    // and the file_path in delete file is full path, so we should add it back.
139
0
    if (_params.__isset.hdfs_params && _params.hdfs_params.__isset.fs_name) {
140
0
        std::string fs_name = _params.hdfs_params.fs_name;
141
0
        if (!starts_with(data_file_path, fs_name)) {
142
0
            data_file_path = fs_name + data_file_path;
143
0
        }
144
0
    }
145
146
0
    auto* orc_reader = (OrcReader*)(_file_format_reader.get());
147
0
    std::vector<std::string> delete_file_col_names;
148
0
    int64_t num_delete_rows = 0;
149
0
    int64_t num_delete_files = 0;
150
0
    std::filesystem::path file_path(data_file_path);
151
152
    //See https://github.com/apache/hive/commit/ffee30e6267e85f00a22767262192abb9681cfb7#diff-5fe26c36b4e029dcd344fc5d484e7347R165
153
    // bucket_xxx_attemptId => bucket_xxx
154
    // bucket_xxx           => bucket_xxx
155
0
    auto remove_bucket_attemptId = [](const std::string& str) {
156
0
        re2::RE2 pattern("^bucket_\\d+_\\d+$");
157
158
0
        if (re2::RE2::FullMatch(str, pattern)) {
159
0
            size_t pos = str.rfind('_');
160
0
            if (pos != std::string::npos) {
161
0
                return str.substr(0, pos);
162
0
            }
163
0
        }
164
0
        return str;
165
0
    };
166
167
0
    SCOPED_TIMER(_transactional_orc_profile.delete_files_read_time);
168
0
    for (const auto& delete_delta :
169
0
         _range.table_format_params.transactional_hive_params.delete_deltas) {
170
0
        const std::string file_name = file_path.filename().string();
171
172
        //need opt.
173
0
        std::vector<std::string> delete_delta_file_names;
174
0
        for (const auto& x : delete_delta.file_names) {
175
0
            delete_delta_file_names.emplace_back(remove_bucket_attemptId(x));
176
0
        }
177
0
        auto iter = std::find(delete_delta_file_names.begin(), delete_delta_file_names.end(),
178
0
                              remove_bucket_attemptId(file_name));
179
0
        if (iter == delete_delta_file_names.end()) {
180
0
            continue;
181
0
        }
182
0
        auto delete_file =
183
0
                fmt::format("{}/{}", delete_delta.directory_location,
184
0
                            delete_delta.file_names[iter - delete_delta_file_names.begin()]);
185
186
0
        TFileRangeDesc delete_range;
187
        // must use __set() method to make sure __isset is true
188
0
        delete_range.__set_fs_name(_range.fs_name);
189
0
        delete_range.path = delete_file;
190
0
        delete_range.start_offset = 0;
191
0
        delete_range.size = -1;
192
0
        delete_range.file_size = -1;
193
194
0
        OrcReader delete_reader(_profile, _state, _params, delete_range, _MIN_BATCH_SIZE,
195
0
                                _state->timezone(), _io_ctx, _meta_cache, false);
196
197
0
        auto acid_info_node = std::make_shared<StructNode>();
198
0
        for (auto idx = 0; idx < TransactionalHive::DELETE_ROW_COLUMN_NAMES_LOWER_CASE.size();
199
0
             idx++) {
200
0
            auto const& table_column_name =
201
0
                    TransactionalHive::DELETE_ROW_COLUMN_NAMES_LOWER_CASE[idx];
202
0
            auto const& file_column_name = TransactionalHive::DELETE_ROW_COLUMN_NAMES[idx];
203
0
            acid_info_node->add_children(table_column_name, file_column_name,
204
0
                                         std::make_shared<ScalarNode>());
205
0
        }
206
207
0
        RETURN_IF_ERROR(delete_reader.init_reader(
208
0
                &TransactionalHive::DELETE_ROW_COLUMN_NAMES_LOWER_CASE,
209
0
                const_cast<std::unordered_map<std::string, uint32_t>*>(
210
0
                        &TransactionalHive::DELETE_COL_NAME_TO_BLOCK_IDX),
211
0
                {}, false, nullptr, nullptr, nullptr, nullptr, acid_info_node));
212
213
0
        std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
214
0
                partition_columns;
215
0
        std::unordered_map<std::string, VExprContextSPtr> missing_columns;
216
0
        RETURN_IF_ERROR(delete_reader.set_fill_columns(partition_columns, missing_columns));
217
218
0
        bool eof = false;
219
0
        while (!eof) {
220
0
            Block block;
221
0
            for (const auto& i : TransactionalHive::DELETE_ROW_PARAMS) {
222
0
                DataTypePtr data_type = DataTypeFactory::instance().create_data_type(i.type, false);
223
0
                MutableColumnPtr data_column = data_type->create_column();
224
0
                block.insert(ColumnWithTypeAndName(std::move(data_column), data_type,
225
0
                                                   i.column_lower_case));
226
0
            }
227
0
            eof = false;
228
0
            size_t read_rows = 0;
229
0
            RETURN_IF_ERROR(delete_reader.get_next_block(&block, &read_rows, &eof));
230
0
            if (read_rows > 0) {
231
0
                static int ORIGINAL_TRANSACTION_INDEX = 0;
232
0
                static int BUCKET_ID_INDEX = 1;
233
0
                static int ROW_ID_INDEX = 2;
234
0
                const auto& original_transaction_column = assert_cast<const ColumnInt64&>(
235
0
                        *block.get_by_position(ORIGINAL_TRANSACTION_INDEX).column);
236
0
                const auto& bucket_id_column = assert_cast<const ColumnInt32&>(
237
0
                        *block.get_by_position(BUCKET_ID_INDEX).column);
238
0
                const auto& row_id_column = assert_cast<const ColumnInt64&>(
239
0
                        *block.get_by_position(ROW_ID_INDEX).column);
240
241
0
                DCHECK_EQ(original_transaction_column.size(), read_rows);
242
0
                DCHECK_EQ(bucket_id_column.size(), read_rows);
243
0
                DCHECK_EQ(row_id_column.size(), read_rows);
244
245
0
                for (int i = 0; i < read_rows; ++i) {
246
0
                    Int64 original_transaction = original_transaction_column.get_int(i);
247
0
                    Int64 bucket_id = bucket_id_column.get_int(i);
248
0
                    Int64 row_id = row_id_column.get_int(i);
249
0
                    AcidRowID delete_row_id = {original_transaction, bucket_id, row_id};
250
0
                    _delete_rows.insert(delete_row_id);
251
0
                    ++num_delete_rows;
252
0
                }
253
0
            }
254
0
        }
255
0
        ++num_delete_files;
256
0
    }
257
0
    if (num_delete_rows > 0) {
258
0
        orc_reader->set_push_down_agg_type(TPushAggOp::NONE);
259
0
        orc_reader->set_delete_rows(&_delete_rows);
260
0
        COUNTER_UPDATE(_transactional_orc_profile.num_delete_files, num_delete_files);
261
0
        COUNTER_UPDATE(_transactional_orc_profile.num_delete_rows, num_delete_rows);
262
0
    }
263
0
    return Status::OK();
264
0
}
265
#include "common/compile_check_end.h"
266
} // namespace doris