Coverage Report

Created: 2024-11-21 20:39

/root/doris/be/src/util/thrift_util.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "util/thrift_util.h"
19
20
#include <gen_cpp/Types_types.h>
21
#include <thrift/TOutput.h>
22
#include <thrift/protocol/TBinaryProtocol.h>
23
#include <thrift/transport/TSocket.h>
24
#include <thrift/transport/TTransportException.h>
25
// IWYU pragma: no_include <bits/chrono.h>
26
#include <chrono> // IWYU pragma: keep
27
#include <string>
28
29
#include "common/compiler_util.h" // IWYU pragma: keep
30
#include "common/logging.h"
31
#include "exec/tablet_info.h"
32
#include "olap/tablet_schema.h"
33
#include "util/thrift_server.h"
34
35
namespace apache::thrift::protocol {
36
class TProtocol;
37
} // namespace apache::thrift::protocol
38
39
// TCompactProtocol requires some #defines to work right.  They also define UNLIKELY
40
// so we need to undef this.
41
// TODO: is there a better include to use?
42
#ifdef UNLIKELY
43
#undef UNLIKELY
44
#endif
45
#ifndef SIGNED_RIGHT_SHIFT_IS
46
#define SIGNED_RIGHT_SHIFT_IS 1
47
#endif
48
49
#ifndef ARITHMETIC_RIGHT_SHIFT
50
#define ARITHMETIC_RIGHT_SHIFT 1
51
#endif
52
53
#include <thrift/protocol/TCompactProtocol.h>
54
55
#include <sstream>
56
#include <thread>
57
58
namespace doris {
59
60
// Builds a serializer backed by a freshly allocated TMemoryBuffer that is
// constructed with `initial_buffer_size`. `compact` picks the protocol that
// writes into that buffer: TCompactProtocol when true, TBinaryProtocol otherwise.
ThriftSerializer::ThriftSerializer(bool compact, int initial_buffer_size)
        : _mem_buffer(new apache::thrift::transport::TMemoryBuffer(initial_buffer_size)) {
    using apache::thrift::transport::TMemoryBuffer;
    namespace tproto = apache::thrift::protocol;

    if (!compact) {
        tproto::TBinaryProtocolFactoryT<TMemoryBuffer> binary_factory;
        _protocol = binary_factory.getProtocol(_mem_buffer);
        return;
    }

    tproto::TCompactProtocolFactoryT<TMemoryBuffer> compact_factory;
    _protocol = compact_factory.getProtocol(_mem_buffer);
}
72
73
std::shared_ptr<apache::thrift::protocol::TProtocol> create_deserialize_protocol(
74
59
        std::shared_ptr<apache::thrift::transport::TMemoryBuffer> mem, bool compact) {
75
59
    if (compact) {
76
59
        apache::thrift::protocol::TCompactProtocolFactoryT<apache::thrift::transport::TMemoryBuffer>
77
59
                tproto_factory;
78
59
        return tproto_factory.getProtocol(mem);
79
59
    } else {
80
0
        apache::thrift::protocol::TBinaryProtocolFactoryT<apache::thrift::transport::TMemoryBuffer>
81
0
                tproto_factory;
82
0
        return tproto_factory.getProtocol(mem);
83
0
    }
84
59
}
85
86
// Comparator for THostPorts. Thrift declares this (in gen-cpp/Types_types.h) but
87
// never defines it.
88
0
bool TNetworkAddress::operator<(const TNetworkAddress& that) const {
89
0
    if (this->hostname < that.hostname) {
90
0
        return true;
91
0
    } else if ((this->hostname == that.hostname) && (this->port < that.port)) {
92
0
        return true;
93
0
    }
94
95
0
    return false;
96
0
};
97
98
0
static void thrift_output_function(const char* output) {
99
0
    VLOG_QUERY << output;
100
0
}
101
102
0
void init_thrift_logging() {
103
0
    apache::thrift::GlobalOutput.setOutputFunction(thrift_output_function);
104
0
}
105
106
0
// Convenience wrapper around wait_for_server() that targets "localhost" on the
// port reported by `server`. Retry arguments are passed through unchanged.
Status wait_for_local_server(const ThriftServer& server, int num_retries, int retry_interval_ms) {
    const auto local_port = server.port();
    return wait_for_server("localhost", local_port, num_retries, retry_interval_ms);
}
109
110
0
// Probes `host`:`port` by opening (and immediately closing) a Thrift TSocket.
//
// Makes at most `num_retries` connection attempts, sleeping `retry_interval_ms`
// milliseconds between consecutive attempts. No sleep happens after the final
// failed attempt, so the failure is reported without an extra pointless delay.
//
// Returns Status::OK() as soon as one attempt connects; returns
// Status::InternalError("Server did not come up") when every attempt fails
// (including when `num_retries` is zero or negative, in which case no attempt
// is made).
Status wait_for_server(const std::string& host, int port, int num_retries, int retry_interval_ms) {
    for (int attempt = 1; attempt <= num_retries; ++attempt) {
        try {
            apache::thrift::transport::TSocket socket(host, port);
            // Timeout is in ms
            socket.setConnTimeout(500);
            socket.open();
            socket.close();
            return Status::OK();
        } catch (const apache::thrift::transport::TTransportException& e) {
            // A transport failure just means "not up yet"; log it and retry.
            VLOG_QUERY << "Connection failed: " << e.what();
        }

        if (attempt == num_retries) {
            // Out of attempts: skip the sleep and fall through to the error.
            VLOG_QUERY << "Giving up on Thrift server at " << host << ":" << port << " after "
                       << attempt << " failed attempts";
            break;
        }

        VLOG_QUERY << "Waiting " << retry_interval_ms << "ms for Thrift server at " << host << ":"
                   << port << " to come up, failed attempt " << attempt << " of "
                   << num_retries;
        std::this_thread::sleep_for(std::chrono::milliseconds(retry_interval_ms));
    }

    return Status::InternalError("Server did not come up");
}
134
135
0
// Writes the stream (operator<<) rendering of `address` into `*out`,
// replacing whatever `*out` held before. `out` is dereferenced unconditionally.
void t_network_address_to_string(const TNetworkAddress& address, std::string* out) {
    std::stringstream rendered;
    rendered << address;
    out->assign(rendered.str());
}
140
141
0
bool t_network_address_comparator(const TNetworkAddress& a, const TNetworkAddress& b) {
142
0
    int cmp = a.hostname.compare(b.hostname);
143
144
0
    if (cmp < 0) {
145
0
        return true;
146
0
    }
147
148
0
    if (cmp == 0) {
149
0
        return a.port < b.port;
150
0
    }
151
152
0
    return false;
153
0
}
154
155
0
std::string to_string(const TUniqueId& id) {
156
0
    return std::to_string(id.hi).append(std::to_string(id.lo));
157
0
}
158
159
0
// Decides, from the schema carried by `sink`, whether the sink is a partial
// update or uses an inverted index stored in the V1 file format.
//
// Returns:
//   - false if an OlapTableSchemaParam cannot be initialised from sink.schema;
//   - true  if that schema reports is_partial_update();
//   - otherwise, for the first index found whose type is INVERTED, whether
//     sink.schema.inverted_index_file_storage_format is V1;
//   - false if no INVERTED index exists.
bool _has_inverted_index_v1_or_partial_update(TOlapTableSink sink) {
    OlapTableSchemaParam schema;
    const bool schema_ready = schema.init(sink.schema).ok();
    if (!schema_ready) {
        return false;
    }
    if (schema.is_partial_update()) {
        return true;
    }
    for (const auto& index_schema : schema.indexes()) {
        for (const auto& index : index_schema->indexes) {
            if (index->index_type() != INVERTED) {
                continue;
            }
            // The first inverted index settles the answer: only the storage
            // format matters from here on.
            return sink.schema.inverted_index_file_storage_format ==
                   TInvertedIndexFileStorageFormat::V1;
        }
    }
    return false;
}
181
182
} // namespace doris