Coverage Report

Created: 2025-04-16 14:29

/root/doris/be/src/exec/data_sink.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/apache/impala/blob/branch-2.9.0/be/src/exec/data-sink.cc
19
// and modified by Doris
20
21
#include "exec/data_sink.h"
22
23
#include <gen_cpp/DataSinks_types.h>
24
#include <gen_cpp/PaloInternalService_types.h>
25
#include <glog/logging.h>
26
27
#include <map>
28
#include <memory>
29
#include <ostream>
30
#include <string>
31
32
#include "common/config.h"
33
#include "runtime/query_context.h"
34
#include "runtime/query_statistics.h"
35
#include "vec/sink/async_writer_sink.h"
36
#include "vec/sink/group_commit_block_sink.h"
37
#include "vec/sink/multi_cast_data_stream_sink.h"
38
#include "vec/sink/vdata_stream_sender.h"
39
#include "vec/sink/vhive_table_sink.h"
40
#include "vec/sink/viceberg_table_sink.h"
41
#include "vec/sink/vmemory_scratch_sink.h"
42
#include "vec/sink/volap_table_sink.h"
43
#include "vec/sink/volap_table_sink_v2.h"
44
#include "vec/sink/vresult_file_sink.h"
45
#include "vec/sink/vresult_sink.h"
46
47
namespace doris {
48
class DescriptorTbl;
49
class TExpr;
50
51
5
// Base constructor shared by every sink: remembers the row descriptor of the
// rows this sink consumes and gives the sink its own QueryStatistics object
// (held by shared_ptr so it can also be handed out via get_query_statistics_ptr()).
DataSink::DataSink(const RowDescriptor& desc) : _row_desc(desc) {
    auto statistics = std::make_shared<QueryStatistics>();
    _query_statistics = std::move(statistics);
}
54
55
0
std::shared_ptr<QueryStatistics> DataSink::get_query_statistics_ptr() {
56
0
    return _query_statistics;
57
0
}
58
59
// Creates the DataSink described by `thrift_sink` (non-pipeline execution path).
//
// @param pool         object pool forwarded to the sinks that take one.
// @param thrift_sink  thrift description of the sink; `type` selects the implementation.
// @param output_exprs output expressions forwarded to the sink constructor.
// @param params       fragment exec params; supplies `sender_id` and `destinations`.
// @param row_desc     descriptor of the rows the sink consumes.
// @param state        runtime state of the fragment instance.
// @param sink         out: receives the created sink. For sink types that build
//                     nothing here (EXPORT_SINK) it is left as the caller passed it.
// @param desc_tbl     descriptor table forwarded to VResultFileSink.
// @return OK on success; an error Status when the thrift description is missing
//         the sink-specific struct, the sink type is unsupported in this
//         build/config, the sink constructor reports an error, or init() fails.
//
// When a sink is created, init() is invoked on it and its query statistics are
// registered with the query context (when one exists). prepare() is NOT called here.
Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
                                  const std::vector<TExpr>& output_exprs,
                                  const TPlanFragmentExecParams& params,
                                  const RowDescriptor& row_desc, RuntimeState* state,
                                  std::unique_ptr<DataSink>* sink, DescriptorTbl& desc_tbl) {
    switch (thrift_sink.type) {
    case TDataSinkType::DATA_STREAM_SINK: {
        if (!thrift_sink.__isset.stream_sink) {
            return Status::InternalError("Missing data stream sink.");
        }
        // TODO: figure out good buffer size based on size of output row
        *sink = std::make_unique<vectorized::VDataStreamSender>(state, pool, params.sender_id,
                                                                row_desc, thrift_sink.stream_sink,
                                                                params.destinations);
        break;
    }
    case TDataSinkType::RESULT_SINK: {
        if (!thrift_sink.__isset.result_sink) {
            return Status::InternalError("Missing data buffer sink.");
        }

        // Only an explicitly requested ARROW_FLIGHT_PROTOCAL sink gets the arrow
        // flight buffer size. An unset `type` denotes the default MySQL protocol
        // (NOTE(review): VResultSink normalizes an unset type to MYSQL_PROTOCAL —
        // confirm against vresult_sink.cpp), so it keeps the regular buffer size.
        int result_sink_buffer_size_rows = vectorized::RESULT_SINK_BUFFER_SIZE;
        if (thrift_sink.result_sink.__isset.type &&
            thrift_sink.result_sink.type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) {
            result_sink_buffer_size_rows = config::arrow_flight_result_sink_buffer_size_rows;
        }

        // TODO: figure out good buffer size based on size of output row
        *sink = std::make_unique<vectorized::VResultSink>(
                row_desc, output_exprs, thrift_sink.result_sink, result_sink_buffer_size_rows);
        break;
    }
    case TDataSinkType::RESULT_FILE_SINK: {
        if (!thrift_sink.__isset.result_file_sink) {
            return Status::InternalError("Missing result file sink.");
        }

        // TODO: figure out good buffer size based on size of output row
        // With destinations, the result file sink is not the top sink and must
        // forward data downstream; otherwise it is the top sink.
        if (params.__isset.destinations && !params.destinations.empty()) {
            *sink = std::make_unique<vectorized::VResultFileSink>(
                    state, pool, params.sender_id, row_desc, thrift_sink.result_file_sink,
                    params.destinations, output_exprs, desc_tbl);
        } else {
            *sink = std::make_unique<vectorized::VResultFileSink>(row_desc, output_exprs);
        }
        break;
    }
    case TDataSinkType::MEMORY_SCRATCH_SINK: {
        if (!thrift_sink.__isset.memory_scratch_sink) {
            return Status::InternalError("Missing data buffer sink.");
        }
        *sink = std::make_unique<vectorized::MemoryScratchSink>(row_desc, output_exprs);
        break;
    }
    case TDataSinkType::MYSQL_TABLE_SINK: {
#ifdef DORIS_WITH_MYSQL
        if (!thrift_sink.__isset.mysql_table_sink) {
            return Status::InternalError("Missing data buffer sink.");
        }
        *sink = std::make_unique<vectorized::VMysqlTableSink>(row_desc, output_exprs);
        break;
#else
        return Status::InternalError(
                "Don't support MySQL table, you should rebuild Doris with WITH_MYSQL option ON");
#endif
    }
    case TDataSinkType::ODBC_TABLE_SINK: {
        if (!thrift_sink.__isset.odbc_table_sink) {
            return Status::InternalError("Missing data odbc sink.");
        }
        *sink = std::make_unique<vectorized::VOdbcTableSink>(row_desc, output_exprs);
        break;
    }
    case TDataSinkType::JDBC_TABLE_SINK: {
        if (!thrift_sink.__isset.jdbc_table_sink) {
            return Status::InternalError("Missing data jdbc sink.");
        }
        if (!config::enable_java_support) {
            return Status::InternalError(
                    "Jdbc table sink is not enabled, you can change be config "
                    "enable_java_support to true and restart be.");
        }
        *sink = std::make_unique<vectorized::VJdbcTableSink>(row_desc, output_exprs);
        break;
    }
    case TDataSinkType::EXPORT_SINK: {
        RETURN_ERROR_IF_NON_VEC;
        break;
    }
    case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK:
    case TDataSinkType::OLAP_TABLE_SINK: {
        DCHECK(thrift_sink.__isset.olap_table_sink);
        // The V2 sink (memtable on sink node) is only used when the table has no
        // inverted index and the load is not a partial update.
        if (state->query_options().enable_memtable_on_sink_node &&
            !_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) {
            *sink = std::make_unique<vectorized::VOlapTableSinkV2>(pool, row_desc, output_exprs);
        } else {
            *sink = std::make_unique<vectorized::VOlapTableSink>(pool, row_desc, output_exprs);
        }
        break;
    }
    case TDataSinkType::HIVE_TABLE_SINK: {
        if (!thrift_sink.__isset.hive_table_sink) {
            return Status::InternalError("Missing hive table sink.");
        }
        *sink = std::make_unique<vectorized::VHiveTableSink>(pool, row_desc, output_exprs);
        break;
    }
    case TDataSinkType::ICEBERG_TABLE_SINK: {
        if (!thrift_sink.__isset.iceberg_table_sink) {
            return Status::InternalError("Missing iceberg table sink.");
        }
        *sink = std::make_unique<vectorized::VIcebergTableSink>(pool, row_desc, output_exprs);
        break;
    }
    case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
        Status status = Status::OK();
        DCHECK(thrift_sink.__isset.olap_table_sink);
#ifndef NDEBUG
        DCHECK(state->get_query_ctx() != nullptr);
        state->get_query_ctx()->query_mem_tracker->is_group_commit_load = true;
#endif
        // The constructor reports failures through the `status` out-parameter.
        *sink = std::make_unique<vectorized::GroupCommitBlockSink>(pool, row_desc, output_exprs,
                                                                   &status);
        RETURN_IF_ERROR(status);
        break;
    }
    case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
        return Status::NotSupported("MULTI_CAST_DATA_STREAM_SINK only support in pipeline engine");
    }
    default: {
        std::stringstream error_msg;
        const auto it = _TDataSinkType_VALUES_TO_NAMES.find(thrift_sink.type);
        if (it != _TDataSinkType_VALUES_TO_NAMES.end()) {
            error_msg << it->second;
        } else {
            // Include the raw enum value so an unmapped type is still identifiable.
            error_msg << "Unknown data sink type " << static_cast<int>(thrift_sink.type);
        }
        error_msg << " not implemented.";
        return Status::InternalError(error_msg.str());
    }
    }

    if (*sink != nullptr) {
        RETURN_IF_ERROR((*sink)->init(thrift_sink));
        if (state->get_query_ctx()) {
            state->get_query_ctx()->register_query_statistics((*sink)->get_query_statistics_ptr());
        }
    }

    return Status::OK();
}
220
221
// Creates, initializes and prepares the DataSink described by `thrift_sink`
// (pipeline execution path).
//
// @param pool            object pool forwarded to the sinks that take one.
// @param thrift_sink     thrift description of the sink; `type` selects the implementation.
// @param output_exprs    output expressions forwarded to the sink constructor.
// @param params          pipeline fragment params; supplies `destinations`.
// @param local_param_idx index into `params.local_params` selecting this instance's
//                        per-instance params (its `sender_id`).
// @param row_desc        descriptor of the rows the sink consumes.
// @param state           runtime state of the fragment instance.
// @param sink            out: receives the created sink. For sink types that build
//                        nothing here (EXPORT_SINK) it is left as the caller passed it.
// @param desc_tbl        descriptor table forwarded to VResultFileSink.
// @return OK on success; an error Status when `local_param_idx` is out of range,
//         the thrift description is missing the sink-specific struct, the sink type
//         is unsupported in this build/config, the sink constructor reports an
//         error, or init()/prepare() fails.
//
// When a sink is created, init() and then prepare() are invoked on it, and its
// query statistics are registered with the query context (when one exists).
Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
                                  const std::vector<TExpr>& output_exprs,
                                  const TPipelineFragmentParams& params,
                                  const size_t& local_param_idx, const RowDescriptor& row_desc,
                                  RuntimeState* state, std::unique_ptr<DataSink>* sink,
                                  DescriptorTbl& desc_tbl) {
    // operator[] with an out-of-range index is undefined behavior; fail cleanly instead.
    if (local_param_idx >= params.local_params.size()) {
        return Status::InternalError("local_param_idx " + std::to_string(local_param_idx) +
                                     " is out of range, local_params size is " +
                                     std::to_string(params.local_params.size()));
    }
    const auto& local_params = params.local_params[local_param_idx];

    switch (thrift_sink.type) {
    case TDataSinkType::DATA_STREAM_SINK: {
        if (!thrift_sink.__isset.stream_sink) {
            return Status::InternalError("Missing data stream sink.");
        }
        // TODO: figure out good buffer size based on size of output row
        *sink = std::make_unique<vectorized::VDataStreamSender>(state, pool, local_params.sender_id,
                                                                row_desc, thrift_sink.stream_sink,
                                                                params.destinations);
        break;
    }
    case TDataSinkType::RESULT_SINK: {
        if (!thrift_sink.__isset.result_sink) {
            return Status::InternalError("Missing data buffer sink.");
        }

        // Only an explicitly requested ARROW_FLIGHT_PROTOCAL sink gets the arrow
        // flight buffer size. An unset `type` denotes the default MySQL protocol
        // (NOTE(review): VResultSink normalizes an unset type to MYSQL_PROTOCAL —
        // confirm against vresult_sink.cpp), so it keeps the regular buffer size.
        int result_sink_buffer_size_rows = vectorized::RESULT_SINK_BUFFER_SIZE;
        if (thrift_sink.result_sink.__isset.type &&
            thrift_sink.result_sink.type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) {
            result_sink_buffer_size_rows = config::arrow_flight_result_sink_buffer_size_rows;
        }

        // TODO: figure out good buffer size based on size of output row
        *sink = std::make_unique<vectorized::VResultSink>(
                row_desc, output_exprs, thrift_sink.result_sink, result_sink_buffer_size_rows);
        break;
    }
    case TDataSinkType::RESULT_FILE_SINK: {
        if (!thrift_sink.__isset.result_file_sink) {
            return Status::InternalError("Missing result file sink.");
        }

        // TODO: figure out good buffer size based on size of output row
        // With destinations, the result file sink is not the top sink and must
        // forward data downstream; otherwise it is the top sink.
        if (params.__isset.destinations && !params.destinations.empty()) {
            *sink = std::make_unique<vectorized::VResultFileSink>(
                    state, pool, local_params.sender_id, row_desc, thrift_sink.result_file_sink,
                    params.destinations, output_exprs, desc_tbl);
        } else {
            *sink = std::make_unique<vectorized::VResultFileSink>(row_desc, output_exprs);
        }
        break;
    }
    case TDataSinkType::MEMORY_SCRATCH_SINK: {
        if (!thrift_sink.__isset.memory_scratch_sink) {
            return Status::InternalError("Missing data buffer sink.");
        }
        *sink = std::make_unique<vectorized::MemoryScratchSink>(row_desc, output_exprs);
        break;
    }
    case TDataSinkType::MYSQL_TABLE_SINK: {
#ifdef DORIS_WITH_MYSQL
        if (!thrift_sink.__isset.mysql_table_sink) {
            return Status::InternalError("Missing data buffer sink.");
        }
        *sink = std::make_unique<vectorized::VMysqlTableSink>(row_desc, output_exprs);
        break;
#else
        return Status::InternalError(
                "Don't support MySQL table, you should rebuild Doris with WITH_MYSQL option ON");
#endif
    }
    case TDataSinkType::ODBC_TABLE_SINK: {
        if (!thrift_sink.__isset.odbc_table_sink) {
            return Status::InternalError("Missing data odbc sink.");
        }
        *sink = std::make_unique<vectorized::VOdbcTableSink>(row_desc, output_exprs);
        break;
    }
    case TDataSinkType::JDBC_TABLE_SINK: {
        if (!thrift_sink.__isset.jdbc_table_sink) {
            return Status::InternalError("Missing data jdbc sink.");
        }
        if (!config::enable_java_support) {
            return Status::InternalError(
                    "Jdbc table sink is not enabled, you can change be config "
                    "enable_java_support to true and restart be.");
        }
        *sink = std::make_unique<vectorized::VJdbcTableSink>(row_desc, output_exprs);
        break;
    }
    case TDataSinkType::EXPORT_SINK: {
        RETURN_ERROR_IF_NON_VEC;
        break;
    }
    case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK:
    case TDataSinkType::OLAP_TABLE_SINK: {
        DCHECK(thrift_sink.__isset.olap_table_sink);
        // The V2 sink (memtable on sink node) is only used when the table has no
        // inverted index and the load is not a partial update.
        if (state->query_options().enable_memtable_on_sink_node &&
            !_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) {
            *sink = std::make_unique<vectorized::VOlapTableSinkV2>(pool, row_desc, output_exprs);
        } else {
            *sink = std::make_unique<vectorized::VOlapTableSink>(pool, row_desc, output_exprs);
        }
        break;
    }
    case TDataSinkType::HIVE_TABLE_SINK: {
        if (!thrift_sink.__isset.hive_table_sink) {
            return Status::InternalError("Missing hive table sink.");
        }
        *sink = std::make_unique<vectorized::VHiveTableSink>(pool, row_desc, output_exprs);
        break;
    }
    case TDataSinkType::ICEBERG_TABLE_SINK: {
        if (!thrift_sink.__isset.iceberg_table_sink) {
            return Status::InternalError("Missing iceberg table sink.");
        }
        *sink = std::make_unique<vectorized::VIcebergTableSink>(pool, row_desc, output_exprs);
        break;
    }
    case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
        DCHECK(thrift_sink.__isset.multi_cast_stream_sink);
        DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0);
        // One streamer shared by all downstream sinks of the multi-cast.
        auto multi_cast_data_streamer = std::make_shared<pipeline::MultiCastDataStreamer>(
                row_desc, pool, thrift_sink.multi_cast_stream_sink.sinks.size());
        *sink = std::make_unique<vectorized::MultiCastDataStreamSink>(multi_cast_data_streamer);
        break;
    }
    case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
        Status status = Status::OK();
        DCHECK(thrift_sink.__isset.olap_table_sink);
#ifndef NDEBUG
        DCHECK(state->get_query_ctx() != nullptr);
        state->get_query_ctx()->query_mem_tracker->is_group_commit_load = true;
#endif
        // The constructor reports failures through the `status` out-parameter.
        *sink = std::make_unique<vectorized::GroupCommitBlockSink>(pool, row_desc, output_exprs,
                                                                   &status);
        RETURN_IF_ERROR(status);
        break;
    }
    default: {
        std::stringstream error_msg;
        const auto it = _TDataSinkType_VALUES_TO_NAMES.find(thrift_sink.type);
        if (it != _TDataSinkType_VALUES_TO_NAMES.end()) {
            error_msg << it->second;
        } else {
            // Include the raw enum value so an unmapped type is still identifiable.
            error_msg << "Unknown data sink type " << static_cast<int>(thrift_sink.type);
        }
        error_msg << " not implemented.";
        return Status::InternalError(error_msg.str());
    }
    }

    if (*sink != nullptr) {
        RETURN_IF_ERROR((*sink)->init(thrift_sink));
        RETURN_IF_ERROR((*sink)->prepare(state));
        if (state->get_query_ctx()) {
            state->get_query_ctx()->register_query_statistics((*sink)->get_query_statistics_ptr());
        }
    }

    return Status::OK();
}
390
391
5
// Default initialization hook. The base class reads nothing from the thrift
// description and always succeeds.
Status DataSink::init([[maybe_unused]] const TDataSink& thrift_sink) {
    return Status::OK();
}
394
395
5
// Default prepare hook. The base class has nothing to set up, so it ignores
// `state` and reports success.
Status DataSink::prepare([[maybe_unused]] RuntimeState* state) {
    return Status::OK();
}
398
} // namespace doris