Coverage Report

Created: 2026-03-15 20:53

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/stream_load/stream_load_context.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/stream_load/stream_load_context.h"
19
20
#include <gen_cpp/BackendService_types.h>
21
#include <rapidjson/document.h>
22
#include <rapidjson/encodings.h>
23
#include <rapidjson/prettywriter.h>
24
#include <rapidjson/rapidjson.h>
25
#include <rapidjson/stringbuffer.h>
26
#include <rapidjson/writer.h>
27
28
#include <new>
29
#include <sstream>
30
31
#include "common/logging.h"
32
33
namespace doris {
34
#include "common/compile_check_begin.h"
35
using namespace ErrorCode;
36
37
0
std::string StreamLoadContext::to_json() const {
38
0
    rapidjson::StringBuffer s;
39
0
    rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
40
41
0
    writer.StartObject();
42
    // txn id
43
0
    writer.Key("TxnId");
44
0
    writer.Int64(txn_id);
45
46
    // label
47
0
    writer.Key("Label");
48
0
    writer.String(label.c_str());
49
50
    // comment
51
0
    writer.Key("Comment");
52
0
    writer.String(load_comment.c_str());
53
54
0
    if (!group_commit) {
55
0
        writer.Key("TwoPhaseCommit");
56
0
        std::string need_two_phase_commit = two_phase_commit ? "true" : "false";
57
0
        writer.String(need_two_phase_commit.c_str());
58
0
    } else {
59
0
        writer.Key("GroupCommit");
60
0
        writer.Bool(true);
61
0
    }
62
63
    // status
64
0
    writer.Key("Status");
65
0
    switch (status.code()) {
66
0
    case OK:
67
0
        writer.String("Success");
68
0
        break;
69
0
    case PUBLISH_TIMEOUT:
70
0
        writer.String("Publish Timeout");
71
0
        break;
72
0
    case LABEL_ALREADY_EXISTS:
73
0
        writer.String("Label Already Exists");
74
0
        writer.Key("ExistingJobStatus");
75
0
        writer.String(existing_job_status.c_str());
76
0
        break;
77
0
    default:
78
0
        writer.String("Fail");
79
0
        break;
80
0
    }
81
    // msg
82
0
    writer.Key("Message");
83
0
    if (status.ok()) {
84
0
        writer.String("OK");
85
0
    } else {
86
0
        writer.String(status.to_string_no_stack().c_str());
87
0
    }
88
    // number_load_rows
89
0
    writer.Key("NumberTotalRows");
90
0
    writer.Int64(number_total_rows);
91
0
    writer.Key("NumberLoadedRows");
92
0
    writer.Int64(number_loaded_rows);
93
0
    writer.Key("NumberFilteredRows");
94
0
    writer.Int64(number_filtered_rows);
95
0
    writer.Key("NumberUnselectedRows");
96
0
    writer.Int64(number_unselected_rows);
97
0
    writer.Key("LoadBytes");
98
0
    writer.Int64(receive_bytes);
99
0
    writer.Key("LoadTimeMs");
100
0
    writer.Int64(load_cost_millis);
101
0
    if (!group_commit) {
102
0
        writer.Key("BeginTxnTimeMs");
103
0
        writer.Int64(begin_txn_cost_nanos / 1000000);
104
0
    }
105
0
    writer.Key("StreamLoadPutTimeMs");
106
0
    writer.Int64(stream_load_put_cost_nanos / 1000000);
107
0
    writer.Key("ReadDataTimeMs");
108
0
    writer.Int64(read_data_cost_nanos / 1000000);
109
0
    writer.Key("WriteDataTimeMs");
110
0
    writer.Int64(write_data_cost_nanos / 1000000);
111
0
    writer.Key("ReceiveDataTimeMs");
112
0
    writer.Int64((receive_and_read_data_cost_nanos - read_data_cost_nanos) / 1000000);
113
0
    if (!group_commit) {
114
0
        writer.Key("CommitAndPublishTimeMs");
115
0
        writer.Int64(commit_and_publish_txn_cost_nanos / 1000000);
116
0
    }
117
118
0
    if (!error_url.empty()) {
119
0
        writer.Key("ErrorURL");
120
0
        writer.String(error_url.c_str());
121
0
    }
122
0
    if (!first_error_msg.empty()) {
123
0
        writer.Key("FirstErrorMsg");
124
0
        writer.String(first_error_msg.c_str());
125
0
    }
126
0
    writer.EndObject();
127
0
    return s.GetString();
128
0
}
129
130
0
std::string StreamLoadContext::prepare_stream_load_record(const std::string& stream_load_record) {
131
0
    rapidjson::Document document;
132
0
    if (document.Parse(stream_load_record.data(), stream_load_record.length()).HasParseError()) {
133
0
        LOG(WARNING) << "prepare stream load record failed. failed to parse json returned to "
134
0
                        "client. label="
135
0
                     << label;
136
0
        return "";
137
0
    }
138
0
    rapidjson::Document::AllocatorType& allocator = document.GetAllocator();
139
140
0
    rapidjson::Value cluster_value(rapidjson::kStringType);
141
0
    cluster_value.SetString(auth.cluster.c_str(),
142
0
                            static_cast<rapidjson::SizeType>(auth.cluster.size()));
143
0
    if (!cluster_value.IsNull()) {
144
0
        document.AddMember("cluster", cluster_value, allocator);
145
0
    }
146
147
0
    rapidjson::Value db_value(rapidjson::kStringType);
148
0
    db_value.SetString(db.c_str(), static_cast<rapidjson::SizeType>(db.size()));
149
0
    if (!db_value.IsNull()) {
150
0
        document.AddMember("Db", db_value, allocator);
151
0
    }
152
153
0
    rapidjson::Value table_value(rapidjson::kStringType);
154
0
    table_value.SetString(table.c_str(), static_cast<rapidjson::SizeType>(table.size()));
155
0
    if (!table_value.IsNull()) {
156
0
        document.AddMember("Table", table_value, allocator);
157
0
    }
158
159
0
    rapidjson::Value user_value(rapidjson::kStringType);
160
0
    user_value.SetString(auth.user.c_str(), static_cast<rapidjson::SizeType>(auth.user.size()));
161
0
    if (!user_value.IsNull()) {
162
0
        document.AddMember("User", user_value, allocator);
163
0
    }
164
165
0
    rapidjson::Value client_ip_value(rapidjson::kStringType);
166
0
    client_ip_value.SetString(auth.user_ip.c_str(),
167
0
                              static_cast<rapidjson::SizeType>(auth.user_ip.size()));
168
0
    if (!client_ip_value.IsNull()) {
169
0
        document.AddMember("ClientIp", client_ip_value, allocator);
170
0
    }
171
172
0
    rapidjson::Value comment_value(rapidjson::kStringType);
173
0
    comment_value.SetString(load_comment.c_str(),
174
0
                            static_cast<rapidjson::SizeType>(load_comment.size()));
175
0
    if (!comment_value.IsNull()) {
176
0
        document.AddMember("Comment", comment_value, allocator);
177
0
    }
178
179
0
    document.AddMember("StartTime", start_millis, allocator);
180
0
    document.AddMember("FinishTime", start_millis + load_cost_millis, allocator);
181
0
    rapidjson::StringBuffer buffer;
182
0
    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
183
0
    document.Accept(writer);
184
0
    return buffer.GetString();
185
0
}
186
187
void StreamLoadContext::parse_stream_load_record(const std::string& stream_load_record,
188
0
                                                 TStreamLoadRecord& stream_load_item) {
189
0
    rapidjson::Document document;
190
0
    std::stringstream ss;
191
0
    if (document.Parse(stream_load_record.data(), stream_load_record.length()).HasParseError()) {
192
0
        LOG(WARNING) << "failed to parse json from rocksdb.";
193
0
        return;
194
0
    }
195
196
0
    if (document.HasMember("Label")) {
197
0
        const rapidjson::Value& label = document["Label"];
198
0
        stream_load_item.__set_label(label.GetString());
199
0
        ss << "Label: " << label.GetString();
200
0
    }
201
202
0
    if (document.HasMember("Db")) {
203
0
        const rapidjson::Value& db = document["Db"];
204
0
        stream_load_item.__set_db(db.GetString());
205
0
        ss << ", Db: " << db.GetString();
206
0
    }
207
208
0
    if (document.HasMember("Table")) {
209
0
        const rapidjson::Value& table = document["Table"];
210
0
        stream_load_item.__set_tbl(table.GetString());
211
0
        ss << ", Table: " << table.GetString();
212
0
    }
213
214
0
    if (document.HasMember("User")) {
215
0
        const rapidjson::Value& user = document["User"];
216
0
        stream_load_item.__set_user(user.GetString());
217
0
        ss << ", User: " << user.GetString();
218
0
    }
219
220
0
    if (document.HasMember("ClientIp")) {
221
0
        const rapidjson::Value& client_ip = document["ClientIp"];
222
0
        stream_load_item.__set_user_ip(client_ip.GetString());
223
0
        ss << ", ClientIp: " << client_ip.GetString();
224
0
    }
225
226
0
    if (document.HasMember("Status")) {
227
0
        const rapidjson::Value& status = document["Status"];
228
0
        stream_load_item.__set_status(status.GetString());
229
0
        ss << ", Status: " << status.GetString();
230
0
    }
231
232
0
    if (document.HasMember("Message")) {
233
0
        const rapidjson::Value& message = document["Message"];
234
0
        stream_load_item.__set_message(message.GetString());
235
0
        ss << ", Message: " << message.GetString();
236
0
    }
237
238
0
    if (document.HasMember("ErrorURL")) {
239
0
        const rapidjson::Value& error_url = document["ErrorURL"];
240
0
        stream_load_item.__set_url(error_url.GetString());
241
0
        ss << ", ErrorURL: " << error_url.GetString();
242
0
    } else {
243
0
        stream_load_item.__set_url("N/A");
244
0
        ss << ", ErrorURL: N/A";
245
0
    }
246
247
0
    if (document.HasMember("NumberTotalRows")) {
248
0
        const rapidjson::Value& total_rows = document["NumberTotalRows"];
249
0
        stream_load_item.__set_total_rows(total_rows.GetInt64());
250
0
        ss << ", NumberTotalRows: " << total_rows.GetInt64();
251
0
    }
252
253
0
    if (document.HasMember("NumberLoadedRows")) {
254
0
        const rapidjson::Value& loaded_rows = document["NumberLoadedRows"];
255
0
        stream_load_item.__set_loaded_rows(loaded_rows.GetInt64());
256
0
        ss << ", NumberLoadedRows: " << loaded_rows.GetInt64();
257
0
    }
258
259
0
    if (document.HasMember("NumberFilteredRows")) {
260
0
        const rapidjson::Value& filtered_rows = document["NumberFilteredRows"];
261
0
        stream_load_item.__set_filtered_rows(filtered_rows.GetInt64());
262
0
        ss << ", NumberFilteredRows: " << filtered_rows.GetInt64();
263
0
    }
264
265
0
    if (document.HasMember("NumberUnselectedRows")) {
266
0
        const rapidjson::Value& unselected_rows = document["NumberUnselectedRows"];
267
0
        stream_load_item.__set_unselected_rows(unselected_rows.GetInt64());
268
0
        ss << ", NumberUnselectedRows: " << unselected_rows.GetInt64();
269
0
    }
270
271
0
    if (document.HasMember("LoadBytes")) {
272
0
        const rapidjson::Value& load_bytes = document["LoadBytes"];
273
0
        stream_load_item.__set_load_bytes(load_bytes.GetInt64());
274
0
        ss << ", LoadBytes: " << load_bytes.GetInt64();
275
0
    }
276
277
0
    if (document.HasMember("StartTime")) {
278
0
        const rapidjson::Value& start_time = document["StartTime"];
279
0
        stream_load_item.__set_start_time(start_time.GetInt64());
280
0
        ss << ", StartTime: " << start_time.GetInt64();
281
0
    }
282
283
0
    if (document.HasMember("FinishTime")) {
284
0
        const rapidjson::Value& finish_time = document["FinishTime"];
285
0
        stream_load_item.__set_finish_time(finish_time.GetInt64());
286
0
        ss << ", FinishTime: " << finish_time.GetInt64();
287
0
    }
288
289
0
    if (document.HasMember("Comment")) {
290
0
        const rapidjson::Value& comment_value = document["Comment"];
291
0
        stream_load_item.__set_comment(comment_value.GetString());
292
0
        ss << ", Comment: " << comment_value.GetString();
293
0
    }
294
295
0
    if (document.HasMember("FirstErrorMsg")) {
296
0
        const rapidjson::Value& first_error_msg = document["FirstErrorMsg"];
297
0
        stream_load_item.__set_first_error_msg(first_error_msg.GetString());
298
0
        ss << ", FirstErrorMsg: " << first_error_msg.GetString();
299
0
    } else {
300
0
        stream_load_item.__set_first_error_msg("N/A");
301
0
        ss << ", FirstErrorMsg: N/A";
302
0
    }
303
304
0
    VLOG(1) << "parse json from rocksdb. " << ss.str();
305
0
}
306
307
/*
308
 * The old mini load result format is as follows:
309
 * (which defined in src/util/json_util.cpp)
310
 *
311
 * {
312
 *      "status" : "Success"("Fail"),
313
 *      "msg"    : "xxxx"
314
 * }
315
 *
316
 */
317
0
std::string StreamLoadContext::to_json_for_mini_load() const {
318
0
    rapidjson::StringBuffer s;
319
0
    rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
320
0
    writer.StartObject();
321
322
    // status
323
0
    bool show_ok = true;
324
0
    writer.Key("status");
325
0
    switch (status.code()) {
326
0
    case OK:
327
0
        writer.String("Success");
328
0
        break;
329
0
    case PUBLISH_TIMEOUT:
330
        // treat PUBLISH_TIMEOUT as OK in mini load
331
0
        writer.String("Success");
332
0
        break;
333
0
    default:
334
0
        writer.String("Fail");
335
0
        show_ok = false;
336
0
        break;
337
0
    }
338
    // msg
339
0
    writer.Key("msg");
340
0
    if (status.ok() || show_ok) {
341
0
        writer.String("OK");
342
0
    } else {
343
0
        writer.String(status.to_string_no_stack().c_str());
344
0
    }
345
0
    writer.EndObject();
346
0
    return s.GetString();
347
0
}
348
349
7
std::string StreamLoadContext::brief(bool detail) const {
350
7
    std::stringstream ss;
351
7
    ss << "id=" << id << ", job_id=" << job_id << ", txn_id=" << txn_id << ", label=" << label
352
7
       << ", elapse(s)=" << (UnixMillis() - start_millis) / 1000;
353
7
    if (detail) {
354
0
        switch (load_src_type) {
355
0
        case TLoadSourceType::KAFKA:
356
0
            if (kafka_info != nullptr) {
357
0
                ss << ", kafka"
358
0
                   << ", brokers: " << kafka_info->brokers << ", topic: " << kafka_info->topic
359
0
                   << ", partition: ";
360
0
                for (auto& entry : kafka_info->begin_offset) {
361
0
                    ss << "[" << entry.first << ": " << entry.second << "]";
362
0
                }
363
0
            }
364
0
            break;
365
0
        default:
366
0
            break;
367
0
        }
368
0
    }
369
7
    return ss.str();
370
7
}
371
372
0
bool StreamLoadContext::is_mow_table() const {
373
0
    return put_result.__isset.pipeline_params && put_result.pipeline_params.__isset.is_mow_table &&
374
0
           put_result.pipeline_params.is_mow_table;
375
0
}
376
377
#include "common/compile_check_end.h"
378
} // namespace doris