Coverage Report

Created: 2026-03-17 02:57

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/writer/vjdbc_table_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "vjdbc_table_writer.h"
19
20
#include <gen_cpp/DataSinks_types.h>
21
#include <stdint.h>
22
23
#include <sstream>
24
25
#include "common/logging.h"
26
#include "core/block/block.h"
27
#include "exprs/vexpr.h"
28
#include "exprs/vexpr_context.h"
29
#include "runtime/runtime_state.h"
30
#include "util/jdbc_utils.h"
31
32
namespace doris {
33
34
0
std::map<std::string, std::string> VJdbcTableWriter::_build_writer_params(const TDataSink& t_sink) {
35
0
    const TJdbcTableSink& t_jdbc_sink = t_sink.jdbc_table_sink;
36
0
    std::map<std::string, std::string> params;
37
38
0
    params["jdbc_url"] = t_jdbc_sink.jdbc_table.jdbc_url;
39
0
    params["jdbc_user"] = t_jdbc_sink.jdbc_table.jdbc_user;
40
0
    params["jdbc_password"] = t_jdbc_sink.jdbc_table.jdbc_password;
41
0
    params["jdbc_driver_class"] = t_jdbc_sink.jdbc_table.jdbc_driver_class;
42
    // Resolve jdbc_driver_url to absolute file:// URL
43
0
    std::string driver_url;
44
0
    auto resolve_st =
45
0
            JdbcUtils::resolve_driver_url(t_jdbc_sink.jdbc_table.jdbc_driver_url, &driver_url);
46
0
    if (!resolve_st.ok()) {
47
0
        LOG(WARNING) << "Failed to resolve JDBC driver URL: " << resolve_st.to_string();
48
0
        driver_url = t_jdbc_sink.jdbc_table.jdbc_driver_url;
49
0
    }
50
0
    params["jdbc_driver_url"] = driver_url;
51
52
0
    params["jdbc_driver_checksum"] = t_jdbc_sink.jdbc_table.jdbc_driver_checksum;
53
0
    params["insert_sql"] = t_jdbc_sink.insert_sql;
54
0
    params["use_transaction"] = t_jdbc_sink.use_transaction ? "true" : "false";
55
0
    params["catalog_id"] = std::to_string(t_jdbc_sink.jdbc_table.catalog_id);
56
0
    params["connection_pool_min_size"] =
57
0
            std::to_string(t_jdbc_sink.jdbc_table.connection_pool_min_size);
58
0
    params["connection_pool_max_size"] =
59
0
            std::to_string(t_jdbc_sink.jdbc_table.connection_pool_max_size);
60
0
    params["connection_pool_max_wait_time"] =
61
0
            std::to_string(t_jdbc_sink.jdbc_table.connection_pool_max_wait_time);
62
0
    params["connection_pool_max_life_time"] =
63
0
            std::to_string(t_jdbc_sink.jdbc_table.connection_pool_max_life_time);
64
0
    params["connection_pool_keep_alive"] =
65
0
            t_jdbc_sink.jdbc_table.connection_pool_keep_alive ? "true" : "false";
66
67
0
    return params;
68
0
}
69
70
VJdbcTableWriter::VJdbcTableWriter(const TDataSink& t_sink,
71
                                   const VExprContextSPtrs& output_expr_ctxs,
72
                                   std::shared_ptr<Dependency> dep,
73
                                   std::shared_ptr<Dependency> fin_dep)
74
0
        : AsyncResultWriter(output_expr_ctxs, dep, fin_dep),
75
0
          _writer_params(_build_writer_params(t_sink)),
76
0
          _use_transaction(t_sink.jdbc_table_sink.use_transaction) {}
77
78
0
Status VJdbcTableWriter::open(RuntimeState* state, RuntimeProfile* operator_profile) {
79
0
    _writer = std::make_unique<VJniFormatTransformer>(
80
0
            state, _vec_output_expr_ctxs, "org/apache/doris/jdbc/JdbcJniWriter", _writer_params);
81
0
    return _writer->open();
82
0
}
83
84
0
Status VJdbcTableWriter::write(RuntimeState* state, Block& block) {
85
0
    Block output_block;
86
0
    RETURN_IF_ERROR(_projection_block(block, &output_block));
87
88
0
    if (output_block.rows() == 0) {
89
0
        return Status::OK();
90
0
    }
91
92
0
    return _writer->write(output_block);
93
0
}
94
95
0
Status VJdbcTableWriter::finish(RuntimeState* state) {
96
0
    if (!_use_transaction || !_writer) {
97
0
        return Status::OK();
98
0
    }
99
100
    // Transaction commit is handled in JdbcJniWriter.close() on the Java side.
101
    // When useTransaction=true, close() calls conn.commit() before closing the connection.
102
0
    return Status::OK();
103
0
}
104
105
0
Status VJdbcTableWriter::close(Status s) {
106
0
    if (_writer) {
107
0
        return _writer->close();
108
0
    }
109
0
    return Status::OK();
110
0
}
111
112
} // namespace doris