Coverage Report

Created: 2024-11-21 13:02

/root/doris/be/src/exec/rowid_fetcher.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/rowid_fetcher.h"
19
20
#include <brpc/callback.h>
21
#include <butil/endpoint.h>
22
#include <fmt/format.h>
23
#include <gen_cpp/data.pb.h>
24
#include <gen_cpp/internal_service.pb.h>
25
#include <gen_cpp/olap_file.pb.h>
26
#include <gen_cpp/types.pb.h>
27
#include <glog/logging.h>
28
#include <stddef.h>
29
#include <stdint.h>
30
31
#include <algorithm>
32
#include <cstdint>
33
#include <ostream>
34
#include <string>
35
#include <unordered_map>
36
#include <utility>
37
#include <vector>
38
39
#include "bthread/countdown_event.h"
40
#include "common/config.h"
41
#include "common/consts.h"
42
#include "common/exception.h"
43
#include "exec/tablet_info.h" // DorisNodesInfo
44
#include "olap/olap_common.h"
45
#include "olap/tablet_schema.h"
46
#include "olap/utils.h"
47
#include "runtime/descriptors.h"
48
#include "runtime/exec_env.h"      // ExecEnv
49
#include "runtime/runtime_state.h" // RuntimeState
50
#include "runtime/types.h"
51
#include "util/brpc_client_cache.h" // BrpcClientCache
52
#include "util/defer_op.h"
53
#include "vec/columns/column.h"
54
#include "vec/columns/column_nullable.h"
55
#include "vec/columns/column_string.h"
56
#include "vec/common/assert_cast.h"
57
#include "vec/common/string_ref.h"
58
#include "vec/core/block.h" // Block
59
#include "vec/data_types/data_type_factory.hpp"
60
#include "vec/data_types/serde/data_type_serde.h"
61
#include "vec/jsonb/serialize.h"
62
63
namespace doris {
64
65
0
/// Obtains one brpc stub per backend node listed in the fetch option and
/// appends it to `_stubs`.
///
/// @return Status::OK() when a stub was obtained for every node. Returns an
///         InternalError naming the host/port of the first node whose stub
///         could not be obtained; stubs appended before that node stay in
///         `_stubs`.
Status RowIDFetcher::init() {
    DorisNodesInfo nodes_info;
    nodes_info.setNodes(_fetch_option.t_fetch_opt.nodes_info);
    // Bind by const reference: a by-value structured binding copies the whole
    // (node id, node info) entry on every iteration for no benefit.
    for (const auto& [node_id, node_info] : nodes_info.nodes_info()) {
        auto client = ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
                node_info.host, node_info.brpc_port);
        if (!client) {
            LOG(WARNING) << "Get rpc stub failed, host=" << node_info.host
                         << ", port=" << node_info.brpc_port;
            return Status::InternalError("RowIDFetcher failed to init rpc client, host={}, port={}",
                                         node_info.host, node_info.brpc_port);
        }
        _stubs.push_back(client);
    }
    return Status::OK();
}
81
82
0
/// Builds the multiget request that is sent to every backend.
///
/// The request carries the tuple descriptor, every slot except the row-id
/// slot, one PRowLocation per entry of `row_locs`, the column descriptions
/// from the fetch option, the query id, the BE exec version and the
/// fetch-row-store flag.
///
/// @param row_locs string column whose i-th value is read as the raw bytes of
///                 a GlobalRowLoacation.
/// @return the populated request.
PMultiGetRequest RowIDFetcher::_init_fetch_request(const vectorized::ColumnString& row_locs) const {
    PMultiGetRequest request;
    const auto& tuple_desc = _fetch_option.desc;
    tuple_desc->to_protobuf(request.mutable_desc());

    // Every slot is forwarded except the row-id column itself.
    for (SlotDescriptor* slot_desc : tuple_desc->slots()) {
        if (slot_desc->col_name() != BeConsts::ROWID_COL) {
            slot_desc->to_protobuf(request.add_slots());
        }
    }

    const size_t num_locations = row_locs.size();
    for (size_t row = 0; row < num_locations; ++row) {
        const StringRef raw = row_locs.get_data_at(row);
        // TODO: When transferring data between machines with different byte orders (endianness),
        // not performing proper handling may lead to issues in parsing and exchanging the data.
        const auto* loc = reinterpret_cast<const GlobalRowLoacation*>(raw.data);
        // Fill the repeated-field element in place.
        PRowLocation* pb_loc = request.add_row_locs();
        pb_loc->set_tablet_id(loc->tablet_id);
        pb_loc->set_rowset_id(loc->row_location.rowset_id.to_string());
        pb_loc->set_segment_id(loc->row_location.segment_id);
        pb_loc->set_ordinal_id(loc->row_location.row_id);
    }

    // Set column desc
    for (const TColumn& thrift_column : _fetch_option.t_fetch_opt.column_desc) {
        TabletColumn tablet_column(thrift_column);
        tablet_column.to_schema_pb(request.add_column_desc());
    }

    const auto& runtime_state = _fetch_option.runtime_state;
    PUniqueId* pb_query_id = request.mutable_query_id();
    pb_query_id->set_hi(runtime_state->query_id().hi);
    pb_query_id->set_lo(runtime_state->query_id().lo);
    request.set_be_exec_version(runtime_state->be_exec_version());
    request.set_fetch_row_store(_fetch_option.t_fetch_opt.fetch_row_store);
    return request;
}
116
117
0
// Completion callback attached to each multiget RPC: signals `counter` once
// so the thread waiting on it can proceed when all RPCs have reported.
static void fetch_callback(bthread::CountdownEvent* counter) {
    counter->signal();
}
120
121
/// Merges the multiget responses of all backends into `output_block` and
/// collects the row locations each response reports.
///
/// `output_block` is cleared first. If any controller reports an RPC failure
/// the first such error is returned and nothing is merged. Responses are then
/// merged in order:
///  - when `request.fetch_row_store()` is set, each binary row of the response
///    is decoded into `output_block` via JsonbSerializeUtil::jsonb_to_block;
///  - otherwise the response's serialized block is deserialized and its rows
///    are appended column by column.
///
/// @param request      the request that was sent (only `fetch_row_store` is read).
/// @param rsps         one response per backend.
/// @param cntls        one controller per backend.
/// @param output_block receives the merged rows.
/// @param rows_id      receives the row locations of every merged response,
///                     appended in response order.
/// @return Status::OK() on success; the RPC/response error, a deserialization
///         error, or an InternalError when a response is inconsistent (row
///         location count differs from its row count, or column counts differ
///         between responses).
Status RowIDFetcher::_merge_rpc_results(const PMultiGetRequest& request,
                                        const std::vector<PMultiGetResponse>& rsps,
                                        const std::vector<brpc::Controller>& cntls,
                                        vectorized::Block* output_block,
                                        std::vector<PRowLocation>* rows_id) const {
    output_block->clear();
    for (const auto& cntl : cntls) {
        if (cntl.Failed()) {
            LOG(WARNING) << "Failed to fetch meet rpc error:" << cntl.ErrorText()
                         << ", host:" << cntl.remote_side();
            return Status::InternalError(cntl.ErrorText());
        }
    }

    const auto& slots = _fetch_option.desc->slots();
    vectorized::DataTypeSerDeSPtrs serdes;
    std::unordered_map<uint32_t, uint32_t> col_uid_to_idx;
    std::vector<std::string> default_values(slots.size());

    auto merge_function = [&](const PMultiGetResponse& resp) -> Status {
        Status st(Status::create(resp.status()));
        if (!st.ok()) {
            LOG(WARNING) << "Failed to fetch " << st.to_string();
            return st;
        }
        rows_id->insert(rows_id->end(), resp.row_locs().begin(), resp.row_locs().end());

        // Merge binary rows
        if (request.fetch_row_store()) {
            // The response comes from a remote node: report an inconsistent
            // payload as an error instead of aborting the whole process.
            if (resp.row_locs().size() != resp.binary_row_data_size()) {
                return Status::InternalError(
                        "Mismatched multiget response, row locs count {}, binary rows count {}",
                        resp.row_locs().size(), resp.binary_row_data_size());
            }
            if (output_block->is_empty_column()) {
                *output_block = vectorized::Block(slots, 1);
            }
            // Serdes, uid->index mapping and default values are built lazily,
            // once, on the first row-store response.
            if (serdes.empty() && col_uid_to_idx.empty()) {
                serdes = vectorized::create_data_type_serdes(slots);
                for (size_t i = 0; i < slots.size(); ++i) {
                    col_uid_to_idx[slots[i]->col_unique_id()] = static_cast<uint32_t>(i);
                    default_values[i] = slots[i]->col_default_value();
                }
            }
            for (int i = 0; i < resp.binary_row_data_size(); ++i) {
                vectorized::JsonbSerializeUtil::jsonb_to_block(
                        serdes, resp.binary_row_data(i).data(), resp.binary_row_data(i).size(),
                        col_uid_to_idx, *output_block, default_values);
            }
            return Status::OK();
        }

        // Merge partial blocks
        vectorized::Block partial_block;
        RETURN_IF_ERROR(partial_block.deserialize(resp.block()));
        if (partial_block.is_empty_column()) {
            return Status::OK();
        }
        if (static_cast<size_t>(resp.row_locs().size()) != partial_block.rows()) {
            return Status::InternalError(
                    "Mismatched multiget response, row locs count {}, block rows {}",
                    resp.row_locs().size(), partial_block.rows());
        }
        if (output_block->is_empty_column()) {
            // First non-empty response: adopt its block as the output.
            output_block->swap(partial_block);
        } else if (partial_block.columns() != output_block->columns()) {
            return Status::Error<ErrorCode::INTERNAL_ERROR>(
                    "Merge block not match, self:[{}], input:[{}], ", output_block->dump_types(),
                    partial_block.dump_types());
        } else {
            for (size_t i = 0; i < output_block->columns(); ++i) {
                output_block->get_by_position(i).column->assume_mutable()->insert_range_from(
                        *partial_block.get_by_position(i)
                                 .column->convert_to_full_column_if_const()
                                 .get(),
                        0, partial_block.rows());
            }
        }
        return Status::OK();
    };

    for (const auto& resp : rsps) {
        RETURN_IF_ERROR(merge_function(resp));
    }
    return Status::OK();
}
197
198
0
bool _has_char_type(const TypeDescriptor& desc) {
199
0
    switch (desc.type) {
200
0
    case TYPE_CHAR:
201
0
        return true;
202
0
    case TYPE_ARRAY:
203
0
    case TYPE_MAP:
204
0
    case TYPE_STRUCT:
205
0
        for (int idx = 0; idx < desc.children.size(); ++idx) {
206
0
            if (_has_char_type(desc.children[idx])) {
207
0
                return true;
208
0
            }
209
0
        }
210
0
        return false;
211
0
    default:
212
0
        return false;
213
0
    }
214
0
}
215
216
/// Fetches the rows identified by `column_row_ids` from all backends and
/// returns them in `res_block`, ordered like `column_row_ids`.
///
/// Steps:
///  1. Build one multiget request and send it asynchronously to every stub,
///     each with a timeout of `config::fetch_rpc_timeout_seconds`; wait until
///     every RPC has invoked its completion callback.
///  2. Merge all responses into `res_block` and collect the returned row
///     locations.
///  3. Permute `res_block` so that row i corresponds to `column_row_ids[i]`.
///  4. Strip the zero suffix of CHAR-typed columns (including CHAR nested in
///     ARRAY/MAP/STRUCT).
///
/// @param column_row_ids column of row ids; after removing nullability it must
///                       be a ColumnString whose values are the raw bytes of a
///                       GlobalRowLoacation.
/// @param res_block      receives the fetched rows.
/// @return Status::OK() on success; an error when an RPC or merge fails, when
///         fewer locations are returned than requested, when a requested
///         location is absent from the responses, or when the resulting block
///         has inconsistent row counts.
Status RowIDFetcher::fetch(const vectorized::ColumnPtr& column_row_ids,
                           vectorized::Block* res_block) {
    CHECK(!_stubs.empty());
    PMultiGetRequest mget_req = _init_fetch_request(assert_cast<const vectorized::ColumnString&>(
            *vectorized::remove_nullable(column_row_ids).get()));
    std::vector<PMultiGetResponse> resps(_stubs.size());
    std::vector<brpc::Controller> cntls(_stubs.size());
    bthread::CountdownEvent counter(_stubs.size());
    for (size_t i = 0; i < _stubs.size(); ++i) {
        cntls[i].set_timeout_ms(config::fetch_rpc_timeout_seconds * 1000);
        auto callback = brpc::NewCallback(fetch_callback, &counter);
        _stubs[i]->multiget_data(&cntls[i], &mget_req, &resps[i], callback);
    }
    // Block until every RPC has signalled completion; the request, responses
    // and controllers above must stay alive until then.
    counter.wait();

    // Merge
    const size_t num_input_rows = column_row_ids->size();
    std::vector<PRowLocation> rows_locs;
    // At least one location per requested row is expected back.
    rows_locs.reserve(num_input_rows);
    RETURN_IF_ERROR(_merge_rpc_results(mget_req, resps, cntls, res_block, &rows_locs));
    if (rows_locs.size() < num_input_rows) {
        return Status::InternalError("Miss matched return row loc count {}, expected {}, input {}",
                                     rows_locs.size(), res_block->rows(), num_input_rows);
    }
    // Final sort by row_ids sequence, since row_ids is already sorted if need
    // Map each returned location to its index in rows_locs, which is used
    // below as the row index into res_block (last occurrence wins).
    std::map<GlobalRowLoacation, size_t> positions;
    for (size_t i = 0; i < rows_locs.size(); ++i) {
        RowsetId rowset_id;
        rowset_id.init(rows_locs[i].rowset_id());
        GlobalRowLoacation grl(rows_locs[i].tablet_id(), rowset_id, rows_locs[i].segment_id(),
                               rows_locs[i].ordinal_id());
        positions[grl] = i;
    }
    // TODO remove this warning code
    if (positions.size() < rows_locs.size()) {
        LOG(WARNING) << "contains duplicated row entry";
    }
    vectorized::IColumn::Permutation permutation;
    permutation.reserve(num_input_rows);
    for (size_t i = 0; i < num_input_rows; ++i) {
        const auto* location =
                reinterpret_cast<const GlobalRowLoacation*>(column_row_ids->get_data_at(i).data);
        // Look up without inserting: operator[] would silently map a location
        // that no backend returned to row 0 and yield wrong data.
        const auto pos = positions.find(*location);
        if (pos == positions.end()) {
            return Status::InternalError(
                    "Requested row location at index {} is missing from fetch result, input {}", i,
                    num_input_rows);
        }
        permutation.push_back(pos->second);
    }
    for (size_t i = 0; i < res_block->columns(); ++i) {
        res_block->get_by_position(i).column =
                res_block->get_by_position(i).column->permute(permutation, permutation.size());
    }
    // Check row consistency
    RETURN_IF_CATCH_EXCEPTION(res_block->check_number_of_rows());
    // shrink for char type
    std::vector<size_t> char_type_idx;
    for (size_t i = 0; i < _fetch_option.desc->slots().size(); i++) {
        const auto& column_desc = _fetch_option.desc->slots()[i];
        const TypeDescriptor& type_desc = column_desc->type();
        if (_has_char_type(type_desc)) {
            char_type_idx.push_back(i);
        }
    }
    res_block->shrink_char_type_column_suffix_zero(char_type_idx);
    VLOG_DEBUG << "dump block:" << res_block->dump_data(0, 10);
    return Status::OK();
}
278
279
} // namespace doris