Coverage Report

Created: 2026-04-22 22:57

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/group_commit/wal/wal_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "wal_reader.h"
19
20
#include <absl/strings/str_split.h>
21
22
#include "agent/be_exec_version_manager.h"
23
#include "common/logging.h"
24
#include "core/block/block.h"
25
#include "cpp/sync_point.h"
26
#include "load/group_commit/wal/wal_manager.h"
27
#include "runtime/runtime_state.h"
28
29
namespace doris {
30
0
// Binds the reader to `state` and caches the WAL id whose file this reader will open.
WalReader::WalReader(RuntimeState* state) : _state(state) {
    // `_state` has already been set by the member initializer list above.
    _wal_id = _state->wal_id();
}
33
34
0
// Initializes the reader for the given output tuple layout: remembers the descriptor,
// asks the WAL manager for the file path of `_wal_id`, and opens that file.
// Returns the first failure from the path lookup or from opening the file.
Status WalReader::init_reader(const TupleDescriptor* tuple_descriptor) {
    _tuple_descriptor = tuple_descriptor;
    auto exec_env = _state->exec_env();
    RETURN_IF_ERROR(exec_env->wal_mgr()->get_wal_path(_wal_id, _wal_path));
    _wal_reader = std::make_shared<doris::WalFileReader>(_wal_path);
    // Propagate the open status directly; an OK status is returned as-is.
    return _wal_reader->init();
}
41
42
// ---- Unified init_reader(ReaderInitContext*) overrides ----
43
44
0
// Unified-init hook: takes the output tuple descriptor from the WalInitContext, then
// resolves the file path for `_wal_id` through the WAL manager and opens it.
// Returns the first failure from the path lookup or from opening the file.
Status WalReader::_open_file_reader(ReaderInitContext* ctx) {
    _tuple_descriptor = checked_context_cast<WalInitContext>(ctx)->output_tuple_descriptor;
    auto exec_env = _state->exec_env();
    RETURN_IF_ERROR(exec_env->wal_mgr()->get_wal_path(_wal_id, _wal_path));
    _wal_reader = std::make_shared<doris::WalFileReader>(_wal_path);
    // Propagate the open status directly; an OK status is returned as-is.
    return _wal_reader->init();
}
52
53
0
// No WAL-specific initialization is performed at this step. The WAL file itself is
// opened in _open_file_reader(), and the header is read in _get_columns_impl().
Status WalReader::_do_init_reader(ReaderInitContext* /*unused_ctx*/) {
    return Status::OK();
}
56
57
0
// Reads the next serialized block from the WAL file and converts it into the layout of
// `block`.
//
// The WAL block stores its columns in header order. _column_pos_map maps a column
// unique id to that position. For each slot of _tuple_descriptor (in order), the
// matching WAL column is looked up by col_unique_id(). It is then inserted into the
// result at the slot's index, using the type and name of the output block's column at
// that index.
//
// On end of file: sets *read_rows = 0 and *eof = true, and returns OK.
// Otherwise: replaces the contents of *block and sets *read_rows to its row count.
// Returns InternalError when the header/block/descriptor column counts disagree, when a
// slot's unique id is not present in the WAL header, or when a WAL column is missing.
Status WalReader::_do_get_next_block(Block* block, size_t* read_rows, bool* eof) {
    // Read the serialized source block.
    PBlock pblock;
    auto st = _wal_reader->read_block(pblock);
    if (st.is<ErrorCode::END_OF_FILE>()) {
        LOG(INFO) << "read eof on wal:" << _wal_path;
        *read_rows = 0;
        *eof = true;
        return Status::OK();
    }
    if (!st.ok()) {
        LOG(WARNING) << "Failed to read wal on path = " << _wal_path;
        return st;
    }
    // A block without an explicit exec version is checked as version 0.
    int be_exec_version = pblock.has_be_exec_version() ? pblock.be_exec_version() : 0;
    if (!BeExecVersionManager::check_be_exec_version(be_exec_version)) {
        return Status::DataQualityError("check be exec version fail when reading wal file {}",
                                        _wal_path);
    }
    Block src_block;
    size_t uncompressed_size = 0;
    int64_t uncompressed_time = 0;
    RETURN_IF_ERROR(src_block.deserialize(pblock, &uncompressed_size, &uncompressed_time));

    // Convert to the destination layout. `block` is not modified until the final swap,
    // so a reference to its column descriptions is sufficient (no copy needed).
    const auto& output_block_columns = block->get_columns_with_type_and_name();
    size_t output_block_column_size = output_block_columns.size();
    TEST_SYNC_POINT_CALLBACK("WalReader::set_column_id_count", &_column_id_count);
    TEST_SYNC_POINT_CALLBACK("WalReader::set_out_block_column_size", &output_block_column_size);
    if (_column_id_count != src_block.columns() ||
        output_block_column_size != _tuple_descriptor->slots().size()) {
        return Status::InternalError(
                "not equal wal _column_id_count={} vs wal block columns size={}, "
                "output block columns size={} vs tuple_descriptor size={}",
                std::to_string(_column_id_count), std::to_string(src_block.columns()),
                std::to_string(output_block_column_size),
                std::to_string(_tuple_descriptor->slots().size()));
    }

    Block dst_block;
    int index = 0;
    for (auto* slot_desc : _tuple_descriptor->slots()) {
        // Use find() instead of operator[]: operator[] would insert position 0 for an id
        // missing from the WAL header and silently copy column 0 under this slot's type.
        auto pos_it = _column_pos_map.find(slot_desc->col_unique_id());
        if (pos_it == _column_pos_map.end()) {
            return Status::InternalError(
                    "read wal {} fail, column unique id {} not found in wal header", _wal_path,
                    slot_desc->col_unique_id());
        }
        auto pos = pos_it->second;
        if (pos >= src_block.columns()) {
            return Status::InternalError("read wal {} fail, pos {}, columns size {}", _wal_path,
                                         pos, src_block.columns());
        }
        ColumnPtr column_ptr = src_block.get_by_position(pos).column;
        if (!column_ptr) {
            return Status::InternalError("read wal {} fail, column at pos {} is null", _wal_path,
                                         pos);
        }
        // A nullable slot must receive a nullable column. A non-null column is
        // guaranteed here, because make_nullable dereferences its argument.
        if (slot_desc->is_nullable() && !column_ptr->is_nullable()) {
            column_ptr = make_nullable(column_ptr);
        }
        dst_block.insert(index, ColumnWithTypeAndName(std::move(column_ptr),
                                                      output_block_columns[index].type,
                                                      output_block_columns[index].name));
        index++;
    }
    block->swap(dst_block);
    *read_rows = block->rows();
    VLOG_DEBUG << "read block rows:" << *read_rows;

    return Status::OK();
}
117
118
0
// Reads the WAL header and builds the column layout used by _do_get_next_block.
//
// The header's column list is a comma-separated sequence of column unique ids. Tokens
// that are empty or whitespace-only are skipped. The i-th id is recorded in
// _column_pos_map as (id -> i), and _column_id_count is set to the number of ids.
// If a tuple descriptor has been set, every slot whose unique id appears in the header
// is added to `name_to_type` as (slot name -> slot data type).
//
// Returns the error from read_header(), or InvalidArgument when a token is not a
// well-formed integer id.
Status WalReader::_get_columns_impl(std::unordered_map<std::string, DataTypePtr>* name_to_type) {
    std::string col_ids;
    RETURN_IF_ERROR(_wal_reader->read_header(_version, col_ids));
    std::vector<std::string> column_id_vector =
            absl::StrSplit(col_ids, ",", absl::SkipWhitespace());
    _column_id_count = column_id_vector.size();
    // Rebuild from scratch so the map always describes the header just read.
    _column_pos_map.clear();
    int64_t pos = 0;
    for (const auto& col_id_str : column_id_vector) {
        // std::stoll (unlike std::strtoll) throws std::invalid_argument when there are no
        // digits and std::out_of_range on overflow. This prevents a malformed id from
        // silently becoming 0.
        long long col_id = 0;
        size_t parsed = 0;
        try {
            col_id = std::stoll(col_id_str, &parsed, 10);
        } catch (const std::logic_error& e) { // std::invalid_argument or std::out_of_range
            return Status::InvalidArgument("Invalid format, bad column id '{}' in wal {}: {}",
                                           col_id_str, _wal_path, e.what());
        }
        // Reject trailing garbage such as "12abc"; trailing whitespace is tolerated, like
        // the leading whitespace std::stoll already skips.
        if (col_id_str.find_first_not_of(" \t\r\n", parsed) != std::string::npos) {
            return Status::InvalidArgument("Invalid format, bad column id '{}' in wal {}",
                                           col_id_str, _wal_path);
        }
        _column_pos_map.emplace(col_id, pos);
        pos++;
    }
    // Report WAL columns so on_before_init_reader does not mark them as missing.
    if (_tuple_descriptor) {
        for (auto* slot_desc : _tuple_descriptor->slots()) {
            if (_column_pos_map.contains(slot_desc->col_unique_id())) {
                name_to_type->emplace(slot_desc->col_name(), slot_desc->get_data_type_ptr());
            }
        }
    }
    return Status::OK();
}
144
145
} // namespace doris