Coverage Report

Created: 2026-03-18 09:47

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/stream_load/stream_load_context.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/stream_load/stream_load_context.h"
19
20
#include <gen_cpp/BackendService_types.h>
21
#include <rapidjson/document.h>
22
#include <rapidjson/encodings.h>
23
#include <rapidjson/prettywriter.h>
24
#include <rapidjson/rapidjson.h>
25
#include <rapidjson/stringbuffer.h>
26
#include <rapidjson/writer.h>
27
28
#include <new>
29
#include <sstream>
30
31
#include "common/logging.h"
32
33
namespace doris {
34
#include "common/compile_check_begin.h"
35
using namespace ErrorCode;
36
37
0
std::string StreamLoadContext::to_json() const {
38
0
    rapidjson::StringBuffer s;
39
0
    rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
40
41
0
    writer.StartObject();
42
    // txn id
43
0
    writer.Key("TxnId");
44
0
    writer.Int64(txn_id);
45
46
    // label
47
0
    writer.Key("Label");
48
0
    writer.String(label.c_str());
49
50
    // comment
51
0
    writer.Key("Comment");
52
0
    writer.String(load_comment.c_str());
53
54
0
    if (!group_commit) {
55
0
        writer.Key("TwoPhaseCommit");
56
0
        std::string need_two_phase_commit = two_phase_commit ? "true" : "false";
57
0
        writer.String(need_two_phase_commit.c_str());
58
0
    } else {
59
0
        writer.Key("GroupCommit");
60
0
        writer.Bool(true);
61
0
        writer.Key("GroupCommitMode");
62
0
        writer.String(group_commit_mode.c_str());
63
0
        writer.Key("LoadId");
64
0
        writer.String(id.to_string().c_str());
65
0
    }
66
67
    // status
68
0
    writer.Key("Status");
69
0
    switch (status.code()) {
70
0
    case OK:
71
0
        writer.String("Success");
72
0
        break;
73
0
    case PUBLISH_TIMEOUT:
74
0
        writer.String("Publish Timeout");
75
0
        break;
76
0
    case LABEL_ALREADY_EXISTS:
77
0
        writer.String("Label Already Exists");
78
0
        writer.Key("ExistingJobStatus");
79
0
        writer.String(existing_job_status.c_str());
80
0
        break;
81
0
    default:
82
0
        writer.String("Fail");
83
0
        break;
84
0
    }
85
    // msg
86
0
    writer.Key("Message");
87
0
    if (status.ok()) {
88
0
        writer.String("OK");
89
0
    } else {
90
0
        writer.String(status.to_string_no_stack().c_str());
91
0
    }
92
    // number_load_rows
93
0
    writer.Key("NumberTotalRows");
94
0
    writer.Int64(number_total_rows);
95
0
    writer.Key("NumberLoadedRows");
96
0
    writer.Int64(number_loaded_rows);
97
0
    writer.Key("NumberFilteredRows");
98
0
    writer.Int64(number_filtered_rows);
99
0
    writer.Key("NumberUnselectedRows");
100
0
    writer.Int64(number_unselected_rows);
101
0
    writer.Key("LoadBytes");
102
0
    writer.Int64(receive_bytes);
103
0
    writer.Key("LoadTimeMs");
104
0
    writer.Int64(load_cost_millis);
105
0
    if (!group_commit) {
106
0
        writer.Key("BeginTxnTimeMs");
107
0
        writer.Int64(begin_txn_cost_nanos / 1000000);
108
0
    }
109
0
    writer.Key("StreamLoadPutTimeMs");
110
0
    writer.Int64(stream_load_put_cost_nanos / 1000000);
111
0
    writer.Key("ReadDataTimeMs");
112
0
    writer.Int64(read_data_cost_nanos / 1000000);
113
0
    writer.Key("WriteDataTimeMs");
114
0
    writer.Int64(write_data_cost_nanos / 1000000);
115
0
    writer.Key("ReceiveDataTimeMs");
116
0
    writer.Int64((receive_and_read_data_cost_nanos - read_data_cost_nanos) / 1000000);
117
0
    if (!group_commit) {
118
0
        writer.Key("CommitAndPublishTimeMs");
119
0
        writer.Int64(commit_and_publish_txn_cost_nanos / 1000000);
120
0
    }
121
122
0
    if (!error_url.empty()) {
123
0
        writer.Key("ErrorURL");
124
0
        writer.String(error_url.c_str());
125
0
    }
126
0
    if (!first_error_msg.empty()) {
127
0
        writer.Key("FirstErrorMsg");
128
0
        writer.String(first_error_msg.c_str());
129
0
    }
130
0
    writer.EndObject();
131
0
    return s.GetString();
132
0
}
133
134
0
std::string StreamLoadContext::prepare_stream_load_record(const std::string& stream_load_record) {
135
0
    rapidjson::Document document;
136
0
    if (document.Parse(stream_load_record.data(), stream_load_record.length()).HasParseError()) {
137
0
        LOG(WARNING) << "prepare stream load record failed. failed to parse json returned to "
138
0
                        "client. label="
139
0
                     << label;
140
0
        return "";
141
0
    }
142
0
    rapidjson::Document::AllocatorType& allocator = document.GetAllocator();
143
144
0
    rapidjson::Value cluster_value(rapidjson::kStringType);
145
0
    cluster_value.SetString(auth.cluster.c_str(),
146
0
                            static_cast<rapidjson::SizeType>(auth.cluster.size()));
147
0
    if (!cluster_value.IsNull()) {
148
0
        document.AddMember("cluster", cluster_value, allocator);
149
0
    }
150
151
0
    rapidjson::Value db_value(rapidjson::kStringType);
152
0
    db_value.SetString(db.c_str(), static_cast<rapidjson::SizeType>(db.size()));
153
0
    if (!db_value.IsNull()) {
154
0
        document.AddMember("Db", db_value, allocator);
155
0
    }
156
157
0
    rapidjson::Value table_value(rapidjson::kStringType);
158
0
    table_value.SetString(table.c_str(), static_cast<rapidjson::SizeType>(table.size()));
159
0
    if (!table_value.IsNull()) {
160
0
        document.AddMember("Table", table_value, allocator);
161
0
    }
162
163
0
    rapidjson::Value user_value(rapidjson::kStringType);
164
0
    user_value.SetString(auth.user.c_str(), static_cast<rapidjson::SizeType>(auth.user.size()));
165
0
    if (!user_value.IsNull()) {
166
0
        document.AddMember("User", user_value, allocator);
167
0
    }
168
169
0
    rapidjson::Value client_ip_value(rapidjson::kStringType);
170
0
    client_ip_value.SetString(auth.user_ip.c_str(),
171
0
                              static_cast<rapidjson::SizeType>(auth.user_ip.size()));
172
0
    if (!client_ip_value.IsNull()) {
173
0
        document.AddMember("ClientIp", client_ip_value, allocator);
174
0
    }
175
176
0
    rapidjson::Value comment_value(rapidjson::kStringType);
177
0
    comment_value.SetString(load_comment.c_str(),
178
0
                            static_cast<rapidjson::SizeType>(load_comment.size()));
179
0
    if (!comment_value.IsNull()) {
180
0
        document.AddMember("Comment", comment_value, allocator);
181
0
    }
182
183
0
    document.AddMember("StartTime", start_millis, allocator);
184
0
    document.AddMember("FinishTime", start_millis + load_cost_millis, allocator);
185
0
    rapidjson::StringBuffer buffer;
186
0
    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
187
0
    document.Accept(writer);
188
0
    return buffer.GetString();
189
0
}
190
191
void StreamLoadContext::parse_stream_load_record(const std::string& stream_load_record,
192
0
                                                 TStreamLoadRecord& stream_load_item) {
193
0
    rapidjson::Document document;
194
0
    std::stringstream ss;
195
0
    if (document.Parse(stream_load_record.data(), stream_load_record.length()).HasParseError()) {
196
0
        LOG(WARNING) << "failed to parse json from rocksdb.";
197
0
        return;
198
0
    }
199
200
0
    if (document.HasMember("Label")) {
201
0
        const rapidjson::Value& label = document["Label"];
202
0
        stream_load_item.__set_label(label.GetString());
203
0
        ss << "Label: " << label.GetString();
204
0
    }
205
206
0
    if (document.HasMember("Db")) {
207
0
        const rapidjson::Value& db = document["Db"];
208
0
        stream_load_item.__set_db(db.GetString());
209
0
        ss << ", Db: " << db.GetString();
210
0
    }
211
212
0
    if (document.HasMember("Table")) {
213
0
        const rapidjson::Value& table = document["Table"];
214
0
        stream_load_item.__set_tbl(table.GetString());
215
0
        ss << ", Table: " << table.GetString();
216
0
    }
217
218
0
    if (document.HasMember("User")) {
219
0
        const rapidjson::Value& user = document["User"];
220
0
        stream_load_item.__set_user(user.GetString());
221
0
        ss << ", User: " << user.GetString();
222
0
    }
223
224
0
    if (document.HasMember("ClientIp")) {
225
0
        const rapidjson::Value& client_ip = document["ClientIp"];
226
0
        stream_load_item.__set_user_ip(client_ip.GetString());
227
0
        ss << ", ClientIp: " << client_ip.GetString();
228
0
    }
229
230
0
    if (document.HasMember("Status")) {
231
0
        const rapidjson::Value& status = document["Status"];
232
0
        stream_load_item.__set_status(status.GetString());
233
0
        ss << ", Status: " << status.GetString();
234
0
    }
235
236
0
    if (document.HasMember("Message")) {
237
0
        const rapidjson::Value& message = document["Message"];
238
0
        stream_load_item.__set_message(message.GetString());
239
0
        ss << ", Message: " << message.GetString();
240
0
    }
241
242
0
    if (document.HasMember("ErrorURL")) {
243
0
        const rapidjson::Value& error_url = document["ErrorURL"];
244
0
        stream_load_item.__set_url(error_url.GetString());
245
0
        ss << ", ErrorURL: " << error_url.GetString();
246
0
    } else {
247
0
        stream_load_item.__set_url("N/A");
248
0
        ss << ", ErrorURL: N/A";
249
0
    }
250
251
0
    if (document.HasMember("NumberTotalRows")) {
252
0
        const rapidjson::Value& total_rows = document["NumberTotalRows"];
253
0
        stream_load_item.__set_total_rows(total_rows.GetInt64());
254
0
        ss << ", NumberTotalRows: " << total_rows.GetInt64();
255
0
    }
256
257
0
    if (document.HasMember("NumberLoadedRows")) {
258
0
        const rapidjson::Value& loaded_rows = document["NumberLoadedRows"];
259
0
        stream_load_item.__set_loaded_rows(loaded_rows.GetInt64());
260
0
        ss << ", NumberLoadedRows: " << loaded_rows.GetInt64();
261
0
    }
262
263
0
    if (document.HasMember("NumberFilteredRows")) {
264
0
        const rapidjson::Value& filtered_rows = document["NumberFilteredRows"];
265
0
        stream_load_item.__set_filtered_rows(filtered_rows.GetInt64());
266
0
        ss << ", NumberFilteredRows: " << filtered_rows.GetInt64();
267
0
    }
268
269
0
    if (document.HasMember("NumberUnselectedRows")) {
270
0
        const rapidjson::Value& unselected_rows = document["NumberUnselectedRows"];
271
0
        stream_load_item.__set_unselected_rows(unselected_rows.GetInt64());
272
0
        ss << ", NumberUnselectedRows: " << unselected_rows.GetInt64();
273
0
    }
274
275
0
    if (document.HasMember("LoadBytes")) {
276
0
        const rapidjson::Value& load_bytes = document["LoadBytes"];
277
0
        stream_load_item.__set_load_bytes(load_bytes.GetInt64());
278
0
        ss << ", LoadBytes: " << load_bytes.GetInt64();
279
0
    }
280
281
0
    if (document.HasMember("StartTime")) {
282
0
        const rapidjson::Value& start_time = document["StartTime"];
283
0
        stream_load_item.__set_start_time(start_time.GetInt64());
284
0
        ss << ", StartTime: " << start_time.GetInt64();
285
0
    }
286
287
0
    if (document.HasMember("FinishTime")) {
288
0
        const rapidjson::Value& finish_time = document["FinishTime"];
289
0
        stream_load_item.__set_finish_time(finish_time.GetInt64());
290
0
        ss << ", FinishTime: " << finish_time.GetInt64();
291
0
    }
292
293
0
    if (document.HasMember("Comment")) {
294
0
        const rapidjson::Value& comment_value = document["Comment"];
295
0
        stream_load_item.__set_comment(comment_value.GetString());
296
0
        ss << ", Comment: " << comment_value.GetString();
297
0
    }
298
299
0
    if (document.HasMember("FirstErrorMsg")) {
300
0
        const rapidjson::Value& first_error_msg = document["FirstErrorMsg"];
301
0
        stream_load_item.__set_first_error_msg(first_error_msg.GetString());
302
0
        ss << ", FirstErrorMsg: " << first_error_msg.GetString();
303
0
    } else {
304
0
        stream_load_item.__set_first_error_msg("N/A");
305
0
        ss << ", FirstErrorMsg: N/A";
306
0
    }
307
308
0
    VLOG(1) << "parse json from rocksdb. " << ss.str();
309
0
}
310
311
/*
312
 * The old mini load result format is as follows:
313
 * (which is defined in src/util/json_util.cpp)
314
 *
315
 * {
316
 *      "status" : "Success"("Fail"),
317
 *      "msg"    : "xxxx"
318
 * }
319
 *
320
 */
321
0
std::string StreamLoadContext::to_json_for_mini_load() const {
322
0
    rapidjson::StringBuffer s;
323
0
    rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
324
0
    writer.StartObject();
325
326
    // status
327
0
    bool show_ok = true;
328
0
    writer.Key("status");
329
0
    switch (status.code()) {
330
0
    case OK:
331
0
        writer.String("Success");
332
0
        break;
333
0
    case PUBLISH_TIMEOUT:
334
        // treat PUBLISH_TIMEOUT as OK in mini load
335
0
        writer.String("Success");
336
0
        break;
337
0
    default:
338
0
        writer.String("Fail");
339
0
        show_ok = false;
340
0
        break;
341
0
    }
342
    // msg
343
0
    writer.Key("msg");
344
0
    if (status.ok() || show_ok) {
345
0
        writer.String("OK");
346
0
    } else {
347
0
        writer.String(status.to_string_no_stack().c_str());
348
0
    }
349
0
    writer.EndObject();
350
0
    return s.GetString();
351
0
}
352
353
7
std::string StreamLoadContext::brief(bool detail) const {
354
7
    std::stringstream ss;
355
7
    ss << "id=" << id << ", job_id=" << job_id << ", txn_id=" << txn_id << ", label=" << label
356
7
       << ", elapse(s)=" << (UnixMillis() - start_millis) / 1000;
357
7
    if (detail) {
358
0
        switch (load_src_type) {
359
0
        case TLoadSourceType::KAFKA:
360
0
            if (kafka_info != nullptr) {
361
0
                ss << ", kafka"
362
0
                   << ", brokers: " << kafka_info->brokers << ", topic: " << kafka_info->topic
363
0
                   << ", partition: ";
364
0
                for (auto& entry : kafka_info->begin_offset) {
365
0
                    ss << "[" << entry.first << ": " << entry.second << "]";
366
0
                }
367
0
            }
368
0
            break;
369
0
        default:
370
0
            break;
371
0
        }
372
0
    }
373
7
    return ss.str();
374
7
}
375
376
0
bool StreamLoadContext::is_mow_table() const {
377
0
    return put_result.__isset.pipeline_params && put_result.pipeline_params.__isset.is_mow_table &&
378
0
           put_result.pipeline_params.is_mow_table;
379
0
}
380
381
#include "common/compile_check_end.h"
382
} // namespace doris