Coverage Report

Created: 2026-04-13 22:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/stream_load/stream_load_context.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/stream_load/stream_load_context.h"
19
20
#include <gen_cpp/BackendService_types.h>
21
#include <rapidjson/document.h>
22
#include <rapidjson/encodings.h>
23
#include <rapidjson/prettywriter.h>
24
#include <rapidjson/rapidjson.h>
25
#include <rapidjson/stringbuffer.h>
26
#include <rapidjson/writer.h>
27
28
#include <new>
29
#include <sstream>
30
31
#include "common/logging.h"
32
33
namespace doris {
34
using namespace ErrorCode;
35
36
0
std::string StreamLoadContext::to_json() const {
37
0
    rapidjson::StringBuffer s;
38
0
    rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
39
40
0
    writer.StartObject();
41
    // txn id
42
0
    writer.Key("TxnId");
43
0
    writer.Int64(txn_id);
44
45
    // label
46
0
    writer.Key("Label");
47
0
    writer.String(label.c_str());
48
49
    // comment
50
0
    writer.Key("Comment");
51
0
    writer.String(load_comment.c_str());
52
53
0
    if (!group_commit) {
54
0
        writer.Key("TwoPhaseCommit");
55
0
        std::string need_two_phase_commit = two_phase_commit ? "true" : "false";
56
0
        writer.String(need_two_phase_commit.c_str());
57
0
    } else {
58
0
        writer.Key("GroupCommit");
59
0
        writer.Bool(true);
60
0
        writer.Key("GroupCommitMode");
61
0
        writer.String(group_commit_mode.c_str());
62
0
        writer.Key("LoadId");
63
0
        writer.String(id.to_string().c_str());
64
0
    }
65
66
    // status
67
0
    writer.Key("Status");
68
0
    switch (status.code()) {
69
0
    case OK:
70
0
        writer.String("Success");
71
0
        break;
72
0
    case PUBLISH_TIMEOUT:
73
0
        writer.String("Publish Timeout");
74
0
        break;
75
0
    case LABEL_ALREADY_EXISTS:
76
0
        writer.String("Label Already Exists");
77
0
        writer.Key("ExistingJobStatus");
78
0
        writer.String(existing_job_status.c_str());
79
0
        break;
80
0
    default:
81
0
        writer.String("Fail");
82
0
        break;
83
0
    }
84
    // msg
85
0
    writer.Key("Message");
86
0
    if (status.ok()) {
87
0
        writer.String("OK");
88
0
    } else {
89
0
        writer.String(status.to_string_no_stack().c_str());
90
0
    }
91
    // number_load_rows
92
0
    writer.Key("NumberTotalRows");
93
0
    writer.Int64(number_total_rows);
94
0
    writer.Key("NumberLoadedRows");
95
0
    writer.Int64(number_loaded_rows);
96
0
    writer.Key("NumberFilteredRows");
97
0
    writer.Int64(number_filtered_rows);
98
0
    writer.Key("NumberUnselectedRows");
99
0
    writer.Int64(number_unselected_rows);
100
0
    writer.Key("LoadBytes");
101
0
    writer.Int64(receive_bytes);
102
0
    writer.Key("LoadTimeMs");
103
0
    writer.Int64(load_cost_millis);
104
0
    if (!group_commit) {
105
0
        writer.Key("BeginTxnTimeMs");
106
0
        writer.Int64(begin_txn_cost_nanos / 1000000);
107
0
    }
108
0
    writer.Key("StreamLoadPutTimeMs");
109
0
    writer.Int64(stream_load_put_cost_nanos / 1000000);
110
0
    writer.Key("ReadDataTimeMs");
111
0
    writer.Int64(read_data_cost_nanos / 1000000);
112
0
    writer.Key("WriteDataTimeMs");
113
0
    writer.Int64(write_data_cost_nanos / 1000000);
114
0
    writer.Key("ReceiveDataTimeMs");
115
0
    writer.Int64((receive_and_read_data_cost_nanos - read_data_cost_nanos) / 1000000);
116
0
    if (!group_commit) {
117
0
        writer.Key("CommitAndPublishTimeMs");
118
0
        writer.Int64(commit_and_publish_txn_cost_nanos / 1000000);
119
0
    }
120
121
0
    if (!error_url.empty()) {
122
0
        writer.Key("ErrorURL");
123
0
        writer.String(error_url.c_str());
124
0
    }
125
0
    if (!first_error_msg.empty()) {
126
0
        writer.Key("FirstErrorMsg");
127
0
        writer.String(first_error_msg.c_str());
128
0
    }
129
0
    writer.EndObject();
130
0
    return s.GetString();
131
0
}
132
133
0
std::string StreamLoadContext::prepare_stream_load_record(const std::string& stream_load_record) {
134
0
    rapidjson::Document document;
135
0
    if (document.Parse(stream_load_record.data(), stream_load_record.length()).HasParseError()) {
136
0
        LOG(WARNING) << "prepare stream load record failed. failed to parse json returned to "
137
0
                        "client. label="
138
0
                     << label;
139
0
        return "";
140
0
    }
141
0
    rapidjson::Document::AllocatorType& allocator = document.GetAllocator();
142
143
0
    rapidjson::Value cluster_value(rapidjson::kStringType);
144
0
    cluster_value.SetString(auth.cluster.c_str(),
145
0
                            static_cast<rapidjson::SizeType>(auth.cluster.size()));
146
0
    if (!cluster_value.IsNull()) {
147
0
        document.AddMember("cluster", cluster_value, allocator);
148
0
    }
149
150
0
    rapidjson::Value db_value(rapidjson::kStringType);
151
0
    db_value.SetString(db.c_str(), static_cast<rapidjson::SizeType>(db.size()));
152
0
    if (!db_value.IsNull()) {
153
0
        document.AddMember("Db", db_value, allocator);
154
0
    }
155
156
0
    rapidjson::Value table_value(rapidjson::kStringType);
157
0
    table_value.SetString(table.c_str(), static_cast<rapidjson::SizeType>(table.size()));
158
0
    if (!table_value.IsNull()) {
159
0
        document.AddMember("Table", table_value, allocator);
160
0
    }
161
162
0
    rapidjson::Value user_value(rapidjson::kStringType);
163
0
    user_value.SetString(auth.user.c_str(), static_cast<rapidjson::SizeType>(auth.user.size()));
164
0
    if (!user_value.IsNull()) {
165
0
        document.AddMember("User", user_value, allocator);
166
0
    }
167
168
0
    rapidjson::Value client_ip_value(rapidjson::kStringType);
169
0
    client_ip_value.SetString(auth.user_ip.c_str(),
170
0
                              static_cast<rapidjson::SizeType>(auth.user_ip.size()));
171
0
    if (!client_ip_value.IsNull()) {
172
0
        document.AddMember("ClientIp", client_ip_value, allocator);
173
0
    }
174
175
0
    rapidjson::Value comment_value(rapidjson::kStringType);
176
0
    comment_value.SetString(load_comment.c_str(),
177
0
                            static_cast<rapidjson::SizeType>(load_comment.size()));
178
0
    if (!comment_value.IsNull()) {
179
0
        document.AddMember("Comment", comment_value, allocator);
180
0
    }
181
182
0
    document.AddMember("StartTime", start_millis, allocator);
183
0
    document.AddMember("FinishTime", start_millis + load_cost_millis, allocator);
184
0
    rapidjson::StringBuffer buffer;
185
0
    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
186
0
    document.Accept(writer);
187
0
    return buffer.GetString();
188
0
}
189
190
void StreamLoadContext::parse_stream_load_record(const std::string& stream_load_record,
191
0
                                                 TStreamLoadRecord& stream_load_item) {
192
0
    rapidjson::Document document;
193
0
    std::stringstream ss;
194
0
    if (document.Parse(stream_load_record.data(), stream_load_record.length()).HasParseError()) {
195
0
        LOG(WARNING) << "failed to parse json from rocksdb.";
196
0
        return;
197
0
    }
198
199
0
    if (document.HasMember("Label")) {
200
0
        const rapidjson::Value& label = document["Label"];
201
0
        stream_load_item.__set_label(label.GetString());
202
0
        ss << "Label: " << label.GetString();
203
0
    }
204
205
0
    if (document.HasMember("Db")) {
206
0
        const rapidjson::Value& db = document["Db"];
207
0
        stream_load_item.__set_db(db.GetString());
208
0
        ss << ", Db: " << db.GetString();
209
0
    }
210
211
0
    if (document.HasMember("Table")) {
212
0
        const rapidjson::Value& table = document["Table"];
213
0
        stream_load_item.__set_tbl(table.GetString());
214
0
        ss << ", Table: " << table.GetString();
215
0
    }
216
217
0
    if (document.HasMember("User")) {
218
0
        const rapidjson::Value& user = document["User"];
219
0
        stream_load_item.__set_user(user.GetString());
220
0
        ss << ", User: " << user.GetString();
221
0
    }
222
223
0
    if (document.HasMember("ClientIp")) {
224
0
        const rapidjson::Value& client_ip = document["ClientIp"];
225
0
        stream_load_item.__set_user_ip(client_ip.GetString());
226
0
        ss << ", ClientIp: " << client_ip.GetString();
227
0
    }
228
229
0
    if (document.HasMember("Status")) {
230
0
        const rapidjson::Value& status = document["Status"];
231
0
        stream_load_item.__set_status(status.GetString());
232
0
        ss << ", Status: " << status.GetString();
233
0
    }
234
235
0
    if (document.HasMember("Message")) {
236
0
        const rapidjson::Value& message = document["Message"];
237
0
        stream_load_item.__set_message(message.GetString());
238
0
        ss << ", Message: " << message.GetString();
239
0
    }
240
241
0
    if (document.HasMember("ErrorURL")) {
242
0
        const rapidjson::Value& error_url = document["ErrorURL"];
243
0
        stream_load_item.__set_url(error_url.GetString());
244
0
        ss << ", ErrorURL: " << error_url.GetString();
245
0
    } else {
246
0
        stream_load_item.__set_url("N/A");
247
0
        ss << ", ErrorURL: N/A";
248
0
    }
249
250
0
    if (document.HasMember("NumberTotalRows")) {
251
0
        const rapidjson::Value& total_rows = document["NumberTotalRows"];
252
0
        stream_load_item.__set_total_rows(total_rows.GetInt64());
253
0
        ss << ", NumberTotalRows: " << total_rows.GetInt64();
254
0
    }
255
256
0
    if (document.HasMember("NumberLoadedRows")) {
257
0
        const rapidjson::Value& loaded_rows = document["NumberLoadedRows"];
258
0
        stream_load_item.__set_loaded_rows(loaded_rows.GetInt64());
259
0
        ss << ", NumberLoadedRows: " << loaded_rows.GetInt64();
260
0
    }
261
262
0
    if (document.HasMember("NumberFilteredRows")) {
263
0
        const rapidjson::Value& filtered_rows = document["NumberFilteredRows"];
264
0
        stream_load_item.__set_filtered_rows(filtered_rows.GetInt64());
265
0
        ss << ", NumberFilteredRows: " << filtered_rows.GetInt64();
266
0
    }
267
268
0
    if (document.HasMember("NumberUnselectedRows")) {
269
0
        const rapidjson::Value& unselected_rows = document["NumberUnselectedRows"];
270
0
        stream_load_item.__set_unselected_rows(unselected_rows.GetInt64());
271
0
        ss << ", NumberUnselectedRows: " << unselected_rows.GetInt64();
272
0
    }
273
274
0
    if (document.HasMember("LoadBytes")) {
275
0
        const rapidjson::Value& load_bytes = document["LoadBytes"];
276
0
        stream_load_item.__set_load_bytes(load_bytes.GetInt64());
277
0
        ss << ", LoadBytes: " << load_bytes.GetInt64();
278
0
    }
279
280
0
    if (document.HasMember("StartTime")) {
281
0
        const rapidjson::Value& start_time = document["StartTime"];
282
0
        stream_load_item.__set_start_time(start_time.GetInt64());
283
0
        ss << ", StartTime: " << start_time.GetInt64();
284
0
    }
285
286
0
    if (document.HasMember("FinishTime")) {
287
0
        const rapidjson::Value& finish_time = document["FinishTime"];
288
0
        stream_load_item.__set_finish_time(finish_time.GetInt64());
289
0
        ss << ", FinishTime: " << finish_time.GetInt64();
290
0
    }
291
292
0
    if (document.HasMember("Comment")) {
293
0
        const rapidjson::Value& comment_value = document["Comment"];
294
0
        stream_load_item.__set_comment(comment_value.GetString());
295
0
        ss << ", Comment: " << comment_value.GetString();
296
0
    }
297
298
0
    if (document.HasMember("FirstErrorMsg")) {
299
0
        const rapidjson::Value& first_error_msg = document["FirstErrorMsg"];
300
0
        stream_load_item.__set_first_error_msg(first_error_msg.GetString());
301
0
        ss << ", FirstErrorMsg: " << first_error_msg.GetString();
302
0
    } else {
303
0
        stream_load_item.__set_first_error_msg("N/A");
304
0
        ss << ", FirstErrorMsg: N/A";
305
0
    }
306
307
0
    VLOG(1) << "parse json from rocksdb. " << ss.str();
308
0
}
309
310
/*
311
 * The old mini load result format is as follows:
312
 * (which defined in src/util/json_util.cpp)
313
 *
314
 * {
315
 *      "status" : "Success"("Fail"),
316
 *      "msg"    : "xxxx"
317
 * }
318
 *
319
 */
320
0
std::string StreamLoadContext::to_json_for_mini_load() const {
321
0
    rapidjson::StringBuffer s;
322
0
    rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
323
0
    writer.StartObject();
324
325
    // status
326
0
    bool show_ok = true;
327
0
    writer.Key("status");
328
0
    switch (status.code()) {
329
0
    case OK:
330
0
        writer.String("Success");
331
0
        break;
332
0
    case PUBLISH_TIMEOUT:
333
        // treat PUBLISH_TIMEOUT as OK in mini load
334
0
        writer.String("Success");
335
0
        break;
336
0
    default:
337
0
        writer.String("Fail");
338
0
        show_ok = false;
339
0
        break;
340
0
    }
341
    // msg
342
0
    writer.Key("msg");
343
0
    if (status.ok() || show_ok) {
344
0
        writer.String("OK");
345
0
    } else {
346
0
        writer.String(status.to_string_no_stack().c_str());
347
0
    }
348
0
    writer.EndObject();
349
0
    return s.GetString();
350
0
}
351
352
7
std::string StreamLoadContext::brief(bool detail) const {
353
7
    std::stringstream ss;
354
7
    ss << "id=" << id << ", job_id=" << job_id << ", txn_id=" << txn_id << ", label=" << label
355
7
       << ", elapse(s)=" << (UnixMillis() - start_millis) / 1000;
356
7
    if (detail) {
357
0
        switch (load_src_type) {
358
0
        case TLoadSourceType::KAFKA:
359
0
            if (kafka_info != nullptr) {
360
0
                ss << ", kafka"
361
0
                   << ", brokers: " << kafka_info->brokers << ", topic: " << kafka_info->topic
362
0
                   << ", partition: ";
363
0
                for (auto& entry : kafka_info->begin_offset) {
364
0
                    ss << "[" << entry.first << ": " << entry.second << "]";
365
0
                }
366
0
            }
367
0
            break;
368
0
        default:
369
0
            break;
370
0
        }
371
0
    }
372
7
    return ss.str();
373
7
}
374
375
0
bool StreamLoadContext::is_mow_table() const {
376
0
    return put_result.__isset.pipeline_params && put_result.pipeline_params.__isset.is_mow_table &&
377
0
           put_result.pipeline_params.is_mow_table;
378
0
}
379
380
} // namespace doris