Coverage Report

Created: 2026-03-15 01:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/es/es_scroll_parser.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/es/es_scroll_parser.h"
19
20
#include <absl/strings/substitute.h>
21
#include <cctz/time_zone.h>
22
#include <glog/logging.h>
23
#include <rapidjson/allocators.h>
24
#include <rapidjson/encodings.h>
25
#include <stdint.h>
26
#include <string.h>
27
28
// IWYU pragma: no_include <bits/chrono.h>
29
#include <chrono> // IWYU pragma: keep
30
#include <cstdlib>
31
#include <ostream>
32
#include <string>
33
34
#include "common/status.h"
35
#include "core/binary_cast.hpp"
36
#include "core/column/column.h"
37
#include "core/column/column_nullable.h"
38
#include "core/data_type/data_type_array.h"
39
#include "core/data_type/data_type_nullable.h"
40
#include "core/data_type/define_primitive_type.h"
41
#include "core/data_type/primitive_type.h"
42
#include "core/field.h"
43
#include "core/value/decimalv2_value.h"
44
#include "core/value/jsonb_value.h"
45
#include "core/value/vdatetime_value.h"
46
#include "rapidjson/document.h"
47
#include "rapidjson/rapidjson.h"
48
#include "rapidjson/stringbuffer.h"
49
#include "rapidjson/writer.h"
50
#include "runtime/descriptors.h"
51
#include "util/string_parser.hpp"
52
53
namespace doris {
54
#include "common/compile_check_begin.h"
55
56
// Keys looked up in the Elasticsearch scroll response.
// FIELD_HITS and FIELD_INNER_HITS intentionally share the value "hits".
// Declared constexpr so that the pointers themselves cannot be reassigned.
static constexpr const char* FIELD_SCROLL_ID = "_scroll_id";
static constexpr const char* FIELD_HITS = "hits";
static constexpr const char* FIELD_INNER_HITS = "hits";
static constexpr const char* FIELD_SOURCE = "_source";
static constexpr const char* FIELD_ID = "_id";
61
62
// get the original json data type
63
0
// Maps a rapidjson type tag to the human-readable label used in error messages.
// Both boolean tags share the label "True/False"; any tag not listed below
// yields "Unknown Type".
std::string json_type_to_string(rapidjson::Type type) {
    const char* label = "Unknown Type";
    switch (type) {
    case rapidjson::kNumberType:
        label = "Number";
        break;
    case rapidjson::kStringType:
        label = "Varchar/Char";
        break;
    case rapidjson::kArrayType:
        label = "Array";
        break;
    case rapidjson::kObjectType:
        label = "Object";
        break;
    case rapidjson::kNullType:
        label = "Null Type";
        break;
    case rapidjson::kFalseType:
    case rapidjson::kTrueType:
        label = "True/False";
        break;
    default:
        break;
    }
    return label;
}
82
83
// transfer rapidjson::Value to string representation
84
0
// Serializes `value` back to JSON text with a rapidjson Writer and returns the
// resulting text as a std::string.
std::string json_value_to_string(const rapidjson::Value& value) {
    rapidjson::StringBuffer serialized;
    rapidjson::Writer<rapidjson::StringBuffer> json_writer(serialized);
    value.Accept(json_writer);
    return std::string(serialized.GetString());
}
90
91
// Message templates for scan errors. ERROR_INVALID_COL_DATA carries a `{}`
// placeholder; the remaining templates carry positional `$0`, `$1`, ...
// placeholders that the caller is expected to substitute.
static const std::string ERROR_INVALID_COL_DATA =
        "Data source returned inconsistent column data. "
        "Expected value of type {} based on column metadata. This likely indicates a "
        "problem with the data source library.";
static const std::string ERROR_MEM_LIMIT_EXCEEDED =
        "DataSourceScanNode::$0() failed to allocate "
        "$1 bytes for $2.";
// The first literal ends with a space so that the substituted type name is not
// glued to "based" once the adjacent literals are concatenated.
static const std::string ERROR_COL_DATA_IS_ARRAY =
        "Data source returned an array for the type $0 "
        "based on column metadata.";
static const std::string INVALID_NULL_VALUE =
        "Invalid null value occurs: Non-null column `$0` contains NULL";
103
104
// Returns Status::RuntimeError from the enclosing function when `col.IsArray()`
// equals `is_array`. The message names the expected type, the JSON type that was
// actually found, and the serialized JSON value. `col` is evaluated more than once.
#define RETURN_ERROR_IF_COL_IS_ARRAY(col, type, is_array)                                 \
    do {                                                                                  \
        if ((col).IsArray() == (is_array)) {                                              \
            std::stringstream mismatch_msg;                                               \
            mismatch_msg << "Expected value of type: " << type_to_string(type)            \
                         << "; but found type: " << json_type_to_string((col).GetType())  \
                         << "; Document slice is : " << json_value_to_string(col);        \
            return Status::RuntimeError(mismatch_msg.str());                              \
        }                                                                                 \
    } while (false)
114
115
// Returns Status::RuntimeError from the enclosing function unless `col` is a JSON
// string. The message names the expected type, the JSON type that was actually
// found, and the serialized JSON value. `col` is evaluated more than once.
#define RETURN_ERROR_IF_COL_IS_NOT_STRING(col, type)                                      \
    do {                                                                                  \
        if (!(col).IsString()) {                                                          \
            std::stringstream mismatch_msg;                                               \
            mismatch_msg << "Expected value of type: " << type_to_string(type)            \
                         << "; but found type: " << json_type_to_string((col).GetType())  \
                         << "; Document source slice is : " << json_value_to_string(col); \
            return Status::RuntimeError(mismatch_msg.str());                              \
        }                                                                                 \
    } while (false)
125
126
// Returns Status::RuntimeError from the enclosing function unless `col` is a JSON
// number. The message names the expected type, the JSON type that was actually
// found, and the serialized JSON value. `col` is evaluated more than once.
#define RETURN_ERROR_IF_COL_IS_NOT_NUMBER(col, type)                                      \
    do {                                                                                  \
        if (!(col).IsNumber()) {                                                          \
            std::stringstream mismatch_msg;                                               \
            mismatch_msg << "Expected value of type: " << type_to_string(type)            \
                         << "; but found type: " << json_type_to_string((col).GetType())  \
                         << "; Document value is: " << json_value_to_string(col);         \
            return Status::RuntimeError(mismatch_msg.str());                              \
        }                                                                                 \
    } while (false)
136
137
// Returns Status::RuntimeError from the enclosing function when `result` differs
// from StringParser::PARSE_SUCCESS. The message names the expected type, the JSON
// type of `col`, and the serialized JSON value. `col` is evaluated more than once.
#define RETURN_ERROR_IF_PARSING_FAILED(result, col, type)                                 \
    do {                                                                                  \
        if ((result) != StringParser::PARSE_SUCCESS) {                                    \
            std::stringstream mismatch_msg;                                               \
            mismatch_msg << "Expected value of type: " << type_to_string(type)            \
                         << "; but found type: " << json_type_to_string((col).GetType())  \
                         << "; Document source slice is : " << json_value_to_string(col); \
            return Status::RuntimeError(mismatch_msg.str());                              \
        }                                                                                 \
    } while (false)
147
148
// Unconditionally returns Status::RuntimeError from the enclosing function; the
// caller decides when to invoke it. The message names the expected type, the JSON
// type of `col`, and the serialized JSON value. `col` is evaluated more than once.
#define RETURN_ERROR_IF_CAST_FORMAT_ERROR(col, type)                                  \
    do {                                                                              \
        std::stringstream mismatch_msg;                                               \
        mismatch_msg << "Expected value of type: " << type_to_string(type)            \
                     << "; but found type: " << json_type_to_string((col).GetType())  \
                     << "; Document slice is : " << json_value_to_string(col);        \
        return Status::RuntimeError(mismatch_msg.str());                              \
    } while (false)
156
157
template <typename T>
158
Status get_int_value(const rapidjson::Value& col, PrimitiveType type, void* slot,
159
0
                     bool pure_doc_value) {
160
0
    if (col.IsNumber()) {
161
0
        *reinterpret_cast<T*>(slot) = (T)(sizeof(T) < 8 ? col.GetInt() : col.GetInt64());
162
0
        return Status::OK();
163
0
    }
164
165
0
    if (pure_doc_value && col.IsArray() && !col.Empty()) {
166
0
        RETURN_ERROR_IF_COL_IS_NOT_NUMBER(col[0], type);
167
0
        *reinterpret_cast<T*>(slot) = (T)(sizeof(T) < 8 ? col[0].GetInt() : col[0].GetInt64());
168
0
        return Status::OK();
169
0
    }
170
171
0
    RETURN_ERROR_IF_COL_IS_ARRAY(col, type, true);
172
0
    RETURN_ERROR_IF_COL_IS_NOT_STRING(col, type);
173
174
0
    StringParser::ParseResult result;
175
0
    const std::string& val = col.GetString();
176
0
    size_t len = col.GetStringLength();
177
0
    T v = StringParser::string_to_int<T>(val.c_str(), len, &result);
178
0
    RETURN_ERROR_IF_PARSING_FAILED(result, col, type);
179
180
0
    if (sizeof(T) < 16) {
181
0
        *reinterpret_cast<T*>(slot) = v;
182
0
    } else {
183
0
        DCHECK(sizeof(T) == 16);
184
0
        memcpy(slot, &v, sizeof(v));
185
0
    }
186
187
0
    return Status::OK();
188
0
}
Unexecuted instantiation: _ZN5doris13get_int_valueIaEENS_6StatusERKN9rapidjson12GenericValueINS2_4UTF8IcEENS2_19MemoryPoolAllocatorINS2_12CrtAllocatorEEEEENS_13PrimitiveTypeEPvb
Unexecuted instantiation: _ZN5doris13get_int_valueIsEENS_6StatusERKN9rapidjson12GenericValueINS2_4UTF8IcEENS2_19MemoryPoolAllocatorINS2_12CrtAllocatorEEEEENS_13PrimitiveTypeEPvb
Unexecuted instantiation: _ZN5doris13get_int_valueIiEENS_6StatusERKN9rapidjson12GenericValueINS2_4UTF8IcEENS2_19MemoryPoolAllocatorINS2_12CrtAllocatorEEEEENS_13PrimitiveTypeEPvb
Unexecuted instantiation: _ZN5doris13get_int_valueIlEENS_6StatusERKN9rapidjson12GenericValueINS2_4UTF8IcEENS2_19MemoryPoolAllocatorINS2_12CrtAllocatorEEEEENS_13PrimitiveTypeEPvb
Unexecuted instantiation: _ZN5doris13get_int_valueInEENS_6StatusERKN9rapidjson12GenericValueINS2_4UTF8IcEENS2_19MemoryPoolAllocatorINS2_12CrtAllocatorEEEEENS_13PrimitiveTypeEPvb
189
190
template <PrimitiveType T>
191
Status get_date_value_int(const rapidjson::Value& col, PrimitiveType type, bool is_date_str,
192
                          typename PrimitiveTypeTraits<T>::CppType* slot,
193
0
                          const cctz::time_zone& time_zone) {
194
0
    constexpr bool is_datetime_v1 = T == TYPE_DATE || T == TYPE_DATETIME;
195
0
    typename PrimitiveTypeTraits<T>::CppType dt_val;
196
0
    if (is_date_str) {
197
0
        const std::string str_date = col.GetString();
198
0
        int str_length = col.GetStringLength();
199
0
        bool success = false;
200
0
        if (str_length > 19) {
201
0
            std::chrono::system_clock::time_point tp;
202
            // time_zone suffix pattern
203
            // Z/+08:00/-04:30
204
0
            RE2 time_zone_pattern(R"([+-]\d{2}:?\d{2}|Z)");
205
0
            bool ok = false;
206
0
            std::string fmt;
207
0
            re2::StringPiece value;
208
0
            if (time_zone_pattern.Match(str_date, 0, str_date.size(), RE2::UNANCHORED, &value, 1)) {
209
                // with time_zone info
210
                // YYYY-MM-DDTHH:MM:SSZ or YYYY-MM-DDTHH:MM:SS+08:00
211
                // or 2022-08-08T12:10:10.000Z or YYYY-MM-DDTHH:MM:SS-08:00
212
0
                fmt = "%Y-%m-%dT%H:%M:%E*S%Ez";
213
0
                cctz::time_zone ctz;
214
                // find time_zone by time_zone suffix string
215
0
                TimezoneUtils::find_cctz_time_zone(value.as_string(), ctz);
216
0
                ok = cctz::parse(fmt, str_date, ctz, &tp);
217
0
            } else {
218
                // without time_zone info
219
                // 2022-08-08T12:10:10.000
220
0
                fmt = "%Y-%m-%dT%H:%M:%E*S";
221
                // If the time without time_zone info, ES will assume it is UTC time.
222
                // So we parse it in Doris with UTC time zone.
223
0
                ok = cctz::parse(fmt, str_date, cctz::utc_time_zone(), &tp);
224
0
            }
225
0
            if (ok) {
226
                // The local time zone can change by session variable `time_zone`
227
                // We should use the user specified time zone, not the actual system local time zone.
228
0
                success = true;
229
0
                dt_val.from_unixtime(std::chrono::system_clock::to_time_t(tp), time_zone);
230
0
            }
231
0
        } else if (str_length == 19) {
232
            // YYYY-MM-DDTHH:MM:SS
233
0
            if (*(str_date.c_str() + 10) == 'T') {
234
0
                std::chrono::system_clock::time_point tp;
235
0
                const bool ok =
236
0
                        cctz::parse("%Y-%m-%dT%H:%M:%S", str_date, cctz::utc_time_zone(), &tp);
237
0
                if (ok) {
238
0
                    success = true;
239
0
                    dt_val.from_unixtime(std::chrono::system_clock::to_time_t(tp), time_zone);
240
0
                }
241
0
            } else {
242
                // YYYY-MM-DD HH:MM:SS
243
0
                success = dt_val.from_date_str(str_date.c_str(), str_length);
244
0
            }
245
246
0
        } else if (str_length == 13) {
247
            // string long like "1677895728000"
248
0
            int64_t time_long = std::atol(str_date.c_str());
249
0
            if (time_long > 0) {
250
0
                success = true;
251
0
                dt_val.from_unixtime(time_long / 1000, time_zone);
252
0
            }
253
0
        } else {
254
            // YYYY-MM-DD or others
255
0
            success = dt_val.from_date_str(str_date.c_str(), str_length);
256
0
        }
257
258
0
        if (!success) {
259
0
            RETURN_ERROR_IF_CAST_FORMAT_ERROR(col, type);
260
0
        }
261
262
0
    } else {
263
0
        dt_val.from_unixtime(col.GetInt64() / 1000, time_zone);
264
0
    }
265
0
    if constexpr (is_datetime_v1) {
266
0
        if (type == TYPE_DATE) {
267
0
            dt_val.cast_to_date();
268
0
        } else {
269
0
            dt_val.to_datetime();
270
0
        }
271
0
    }
272
273
0
    *slot = *reinterpret_cast<typename PrimitiveTypeTraits<T>::CppType*>(&dt_val);
274
0
    return Status::OK();
275
0
}
Unexecuted instantiation: _ZN5doris18get_date_value_intILNS_13PrimitiveTypeE25EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bPNS_19PrimitiveTypeTraitsIXT_EE7CppTypeERKN4cctz9time_zoneE
Unexecuted instantiation: _ZN5doris18get_date_value_intILNS_13PrimitiveTypeE26EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bPNS_19PrimitiveTypeTraitsIXT_EE7CppTypeERKN4cctz9time_zoneE
Unexecuted instantiation: _ZN5doris18get_date_value_intILNS_13PrimitiveTypeE11EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bPNS_19PrimitiveTypeTraitsIXT_EE7CppTypeERKN4cctz9time_zoneE
Unexecuted instantiation: _ZN5doris18get_date_value_intILNS_13PrimitiveTypeE12EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bPNS_19PrimitiveTypeTraitsIXT_EE7CppTypeERKN4cctz9time_zoneE
276
277
template <PrimitiveType T>
278
Status get_date_int(const rapidjson::Value& col, PrimitiveType type, bool pure_doc_value,
279
                    typename PrimitiveTypeTraits<T>::CppType* slot,
280
0
                    const cctz::time_zone& time_zone) {
281
    // this would happend just only when `enable_docvalue_scan = false`, and field has timestamp format date from _source
282
0
    if (col.IsNumber()) {
283
        // ES process date/datetime field would use millisecond timestamp for index or docvalue
284
        // processing date type field, if a number is encountered, Doris On ES will force it to be processed according to ms
285
        // Doris On ES needs to be consistent with ES, so just divided by 1000 because the unit for from_unixtime is seconds
286
0
        return get_date_value_int<T>(col, type, false, slot, time_zone);
287
0
    } else if (col.IsArray() && pure_doc_value && !col.Empty()) {
288
        // this would happened just only when `enable_docvalue_scan = true`
289
        // ES add default format for all field after ES 6.4, if we not provided format for `date` field ES would impose
290
        // a standard date-format for date field as `2020-06-16T00:00:00.000Z`
291
        // At present, we just process this string format date. After some PR were merged into Doris, we would impose `epoch_mills` for
292
        // date field's docvalue
293
0
        if (col[0].IsString()) {
294
0
            return get_date_value_int<T>(col[0], type, true, slot, time_zone);
295
0
        }
296
        // ES would return millisecond timestamp for date field, divided by 1000 because the unit for from_unixtime is seconds
297
0
        return get_date_value_int<T>(col[0], type, false, slot, time_zone);
298
0
    } else {
299
        // this would happened just only when `enable_docvalue_scan = false`, and field has string format date from _source
300
0
        RETURN_ERROR_IF_COL_IS_ARRAY(col, type, true);
301
0
        RETURN_ERROR_IF_COL_IS_NOT_STRING(col, type);
302
0
        return get_date_value_int<T>(col, type, true, slot, time_zone);
303
0
    }
304
0
}
Unexecuted instantiation: _ZN5doris12get_date_intILNS_13PrimitiveTypeE25EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bPNS_19PrimitiveTypeTraitsIXT_EE7CppTypeERKN4cctz9time_zoneE
Unexecuted instantiation: _ZN5doris12get_date_intILNS_13PrimitiveTypeE26EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bPNS_19PrimitiveTypeTraitsIXT_EE7CppTypeERKN4cctz9time_zoneE
Unexecuted instantiation: _ZN5doris12get_date_intILNS_13PrimitiveTypeE11EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bPNS_19PrimitiveTypeTraitsIXT_EE7CppTypeERKN4cctz9time_zoneE
Unexecuted instantiation: _ZN5doris12get_date_intILNS_13PrimitiveTypeE12EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bPNS_19PrimitiveTypeTraitsIXT_EE7CppTypeERKN4cctz9time_zoneE
305
template <PrimitiveType T>
306
Status fill_date_int(const rapidjson::Value& col, PrimitiveType type, bool pure_doc_value,
307
0
                     IColumn* col_ptr, const cctz::time_zone& time_zone) {
308
0
    typename PrimitiveTypeTraits<T>::CppType data;
309
0
    RETURN_IF_ERROR((get_date_int<T>(col, type, pure_doc_value, &data, time_zone)));
310
0
    col_ptr->insert_data(const_cast<const char*>(reinterpret_cast<char*>(&data)), 0);
311
0
    return Status::OK();
312
0
}
Unexecuted instantiation: _ZN5doris13fill_date_intILNS_13PrimitiveTypeE11EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bPNS_7IColumnERKN4cctz9time_zoneE
Unexecuted instantiation: _ZN5doris13fill_date_intILNS_13PrimitiveTypeE12EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bPNS_7IColumnERKN4cctz9time_zoneE
Unexecuted instantiation: _ZN5doris13fill_date_intILNS_13PrimitiveTypeE25EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bPNS_7IColumnERKN4cctz9time_zoneE
Unexecuted instantiation: _ZN5doris13fill_date_intILNS_13PrimitiveTypeE26EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bPNS_7IColumnERKN4cctz9time_zoneE
313
314
template <typename T>
315
Status get_float_value(const rapidjson::Value& col, PrimitiveType type, void* slot,
316
0
                       bool pure_doc_value) {
317
0
    static_assert(sizeof(T) == 4 || sizeof(T) == 8);
318
0
    if (col.IsNumber()) {
319
0
        *reinterpret_cast<T*>(slot) = (T)(sizeof(T) == 4 ? col.GetFloat() : col.GetDouble());
320
0
        return Status::OK();
321
0
    }
322
323
0
    if (pure_doc_value && col.IsArray() && !col.Empty()) {
324
0
        *reinterpret_cast<T*>(slot) = (T)(sizeof(T) == 4 ? col[0].GetFloat() : col[0].GetDouble());
325
0
        return Status::OK();
326
0
    }
327
328
0
    RETURN_ERROR_IF_COL_IS_ARRAY(col, type, true);
329
0
    RETURN_ERROR_IF_COL_IS_NOT_STRING(col, type);
330
331
0
    StringParser::ParseResult result;
332
0
    const std::string& val = col.GetString();
333
0
    size_t len = col.GetStringLength();
334
0
    T v = StringParser::string_to_float<T>(val.c_str(), len, &result);
335
0
    RETURN_ERROR_IF_PARSING_FAILED(result, col, type);
336
0
    *reinterpret_cast<T*>(slot) = v;
337
338
0
    return Status::OK();
339
0
}
Unexecuted instantiation: _ZN5doris15get_float_valueIfEENS_6StatusERKN9rapidjson12GenericValueINS2_4UTF8IcEENS2_19MemoryPoolAllocatorINS2_12CrtAllocatorEEEEENS_13PrimitiveTypeEPvb
Unexecuted instantiation: _ZN5doris15get_float_valueIdEENS_6StatusERKN9rapidjson12GenericValueINS2_4UTF8IcEENS2_19MemoryPoolAllocatorINS2_12CrtAllocatorEEEEENS_13PrimitiveTypeEPvb
340
341
template <typename T>
342
Status insert_float_value(const rapidjson::Value& col, PrimitiveType type, IColumn* col_ptr,
343
0
                          bool pure_doc_value, bool nullable) {
344
0
    static_assert(sizeof(T) == 4 || sizeof(T) == 8);
345
0
    if (col.IsNumber() && nullable) {
346
0
        T value = (T)(sizeof(T) == 4 ? col.GetFloat() : col.GetDouble());
347
0
        col_ptr->insert_data(const_cast<const char*>(reinterpret_cast<char*>(&value)), 0);
348
0
        return Status::OK();
349
0
    }
350
351
0
    if (pure_doc_value && col.IsArray() && !col.Empty() && nullable) {
352
0
        T value = (T)(sizeof(T) == 4 ? col[0].GetFloat() : col[0].GetDouble());
353
0
        col_ptr->insert_data(const_cast<const char*>(reinterpret_cast<char*>(&value)), 0);
354
0
        return Status::OK();
355
0
    }
356
357
0
    RETURN_ERROR_IF_COL_IS_ARRAY(col, type, true);
358
0
    RETURN_ERROR_IF_COL_IS_NOT_STRING(col, type);
359
360
0
    StringParser::ParseResult result;
361
0
    const std::string& val = col.GetString();
362
0
    size_t len = col.GetStringLength();
363
0
    T v = StringParser::string_to_float<T>(val.c_str(), len, &result);
364
0
    RETURN_ERROR_IF_PARSING_FAILED(result, col, type);
365
366
0
    col_ptr->insert_data(const_cast<const char*>(reinterpret_cast<char*>(&v)), 0);
367
368
0
    return Status::OK();
369
0
}
Unexecuted instantiation: _ZN5doris18insert_float_valueIdEENS_6StatusERKN9rapidjson12GenericValueINS2_4UTF8IcEENS2_19MemoryPoolAllocatorINS2_12CrtAllocatorEEEEENS_13PrimitiveTypeEPNS_7IColumnEbb
Unexecuted instantiation: _ZN5doris18insert_float_valueIfEENS_6StatusERKN9rapidjson12GenericValueINS2_4UTF8IcEENS2_19MemoryPoolAllocatorINS2_12CrtAllocatorEEEEENS_13PrimitiveTypeEPNS_7IColumnEbb
370
371
template <typename T>
372
Status insert_int_value(const rapidjson::Value& col, PrimitiveType type, IColumn* col_ptr,
373
0
                        bool pure_doc_value, bool nullable) {
374
0
    if (col.IsNumber()) {
375
0
        T value;
376
        // ES allows inserting float and double in int/long types.
377
        // To parse these numbers in Doris, we direct cast them to int types.
378
0
        if (col.IsDouble()) {
379
0
            value = static_cast<T>(col.GetDouble());
380
0
        } else if (col.IsFloat()) {
381
0
            value = static_cast<T>(col.GetFloat());
382
0
        } else {
383
0
            value = (T)(sizeof(T) < 8 ? col.GetInt() : col.GetInt64());
384
0
        }
385
0
        col_ptr->insert_data(const_cast<const char*>(reinterpret_cast<char*>(&value)), 0);
386
0
        return Status::OK();
387
0
    }
388
389
0
    auto parse_and_insert_data = [&](const rapidjson::Value& col_value) -> Status {
390
0
        StringParser::ParseResult result;
391
0
        std::string val = col_value.GetString();
392
        // ES allows inserting numbers and characters containing decimals in numeric types.
393
        // To parse these numbers in Doris, we remove the decimals here.
394
0
        size_t pos = val.find('.');
395
0
        if (pos != std::string::npos) {
396
0
            val = val.substr(0, pos);
397
0
        }
398
0
        size_t len = val.length();
399
0
        T v = StringParser::string_to_int<T>(val.c_str(), len, &result);
400
0
        RETURN_ERROR_IF_PARSING_FAILED(result, col_value, type);
401
402
0
        col_ptr->insert_data(const_cast<const char*>(reinterpret_cast<char*>(&v)), 0);
403
0
        return Status::OK();
404
0
    };
Unexecuted instantiation: _ZZN5doris16insert_int_valueIaEENS_6StatusERKN9rapidjson12GenericValueINS2_4UTF8IcEENS2_19MemoryPoolAllocatorINS2_12CrtAllocatorEEEEENS_13PrimitiveTypeEPNS_7IColumnEbbENKUlSB_E_clESB_
Unexecuted instantiation: _ZZN5doris16insert_int_valueIsEENS_6StatusERKN9rapidjson12GenericValueINS2_4UTF8IcEENS2_19MemoryPoolAllocatorINS2_12CrtAllocatorEEEEENS_13PrimitiveTypeEPNS_7IColumnEbbENKUlSB_E_clESB_
Unexecuted instantiation: _ZZN5doris16insert_int_valueIiEENS_6StatusERKN9rapidjson12GenericValueINS2_4UTF8IcEENS2_19MemoryPoolAllocatorINS2_12CrtAllocatorEEEEENS_13PrimitiveTypeEPNS_7IColumnEbbENKUlSB_E_clESB_
Unexecuted instantiation: _ZZN5doris16insert_int_valueIlEENS_6StatusERKN9rapidjson12GenericValueINS2_4UTF8IcEENS2_19MemoryPoolAllocatorINS2_12CrtAllocatorEEEEENS_13PrimitiveTypeEPNS_7IColumnEbbENKUlSB_E_clESB_
Unexecuted instantiation: _ZZN5doris16insert_int_valueInEENS_6StatusERKN9rapidjson12GenericValueINS2_4UTF8IcEENS2_19MemoryPoolAllocatorINS2_12CrtAllocatorEEEEENS_13PrimitiveTypeEPNS_7IColumnEbbENKUlSB_E_clESB_
405
406
0
    if (pure_doc_value && col.IsArray() && !col.Empty()) {
407
0
        if (col[0].IsNumber()) {
408
0
            T value = (T)(sizeof(T) < 8 ? col[0].GetInt() : col[0].GetInt64());
409
0
            col_ptr->insert_data(const_cast<const char*>(reinterpret_cast<char*>(&value)), 0);
410
0
            return Status::OK();
411
0
        } else {
412
0
            RETURN_ERROR_IF_COL_IS_ARRAY(col[0], type, true);
413
0
            RETURN_ERROR_IF_COL_IS_NOT_STRING(col[0], type);
414
0
            return parse_and_insert_data(col[0]);
415
0
        }
416
0
    }
417
418
0
    RETURN_ERROR_IF_COL_IS_ARRAY(col, type, true);
419
0
    RETURN_ERROR_IF_COL_IS_NOT_STRING(col, type);
420
0
    return parse_and_insert_data(col);
421
0
}
Unexecuted instantiation: _ZN5doris16insert_int_valueIaEENS_6StatusERKN9rapidjson12GenericValueINS2_4UTF8IcEENS2_19MemoryPoolAllocatorINS2_12CrtAllocatorEEEEENS_13PrimitiveTypeEPNS_7IColumnEbb
Unexecuted instantiation: _ZN5doris16insert_int_valueIsEENS_6StatusERKN9rapidjson12GenericValueINS2_4UTF8IcEENS2_19MemoryPoolAllocatorINS2_12CrtAllocatorEEEEENS_13PrimitiveTypeEPNS_7IColumnEbb
Unexecuted instantiation: _ZN5doris16insert_int_valueIiEENS_6StatusERKN9rapidjson12GenericValueINS2_4UTF8IcEENS2_19MemoryPoolAllocatorINS2_12CrtAllocatorEEEEENS_13PrimitiveTypeEPNS_7IColumnEbb
Unexecuted instantiation: _ZN5doris16insert_int_valueIlEENS_6StatusERKN9rapidjson12GenericValueINS2_4UTF8IcEENS2_19MemoryPoolAllocatorINS2_12CrtAllocatorEEEEENS_13PrimitiveTypeEPNS_7IColumnEbb
Unexecuted instantiation: _ZN5doris16insert_int_valueInEENS_6StatusERKN9rapidjson12GenericValueINS2_4UTF8IcEENS2_19MemoryPoolAllocatorINS2_12CrtAllocatorEEEEENS_13PrimitiveTypeEPNS_7IColumnEbb
422
423
template <PrimitiveType T>
424
Status handle_value(const rapidjson::Value& col, PrimitiveType sub_type, bool pure_doc_value,
425
0
                    typename PrimitiveTypeTraits<T>::CppType& val) {
426
    if constexpr (T == TYPE_TINYINT || T == TYPE_SMALLINT || T == TYPE_INT || T == TYPE_BIGINT ||
427
0
                  T == TYPE_LARGEINT) {
428
0
        RETURN_IF_ERROR(get_int_value<typename PrimitiveTypeTraits<T>::CppType>(col, sub_type, &val,
429
0
                                                                                pure_doc_value));
430
0
        return Status::OK();
431
0
    }
432
0
    if constexpr (T == TYPE_FLOAT) {
433
0
        RETURN_IF_ERROR(get_float_value<float>(col, sub_type, &val, pure_doc_value));
434
0
        return Status::OK();
435
0
    }
436
0
    if constexpr (T == TYPE_DOUBLE) {
437
0
        RETURN_IF_ERROR(get_float_value<double>(col, sub_type, &val, pure_doc_value));
438
0
        return Status::OK();
439
0
    }
440
0
    if constexpr (T == TYPE_STRING || T == TYPE_CHAR || T == TYPE_VARCHAR) {
441
0
        RETURN_ERROR_IF_COL_IS_ARRAY(col, sub_type, true);
442
0
        if (!col.IsString()) {
443
0
            val = json_value_to_string(col);
444
0
        } else {
445
0
            val = col.GetString();
446
0
        }
447
0
        return Status::OK();
448
0
    }
449
0
    if constexpr (T == TYPE_BOOLEAN) {
450
0
        if (col.IsBool()) {
451
0
            val = col.GetBool();
452
0
            return Status::OK();
453
0
        }
454
455
0
        if (col.IsNumber()) {
456
0
            val = static_cast<typename PrimitiveTypeTraits<T>::CppType>(col.GetInt());
457
0
            return Status::OK();
458
0
        }
459
460
0
        bool is_nested_str = false;
461
0
        if (pure_doc_value && col.IsArray() && !col.Empty() && col[0].IsBool()) {
462
0
            val = col[0].GetBool();
463
0
            return Status::OK();
464
0
        } else if (pure_doc_value && col.IsArray() && !col.Empty() && col[0].IsString()) {
465
0
            is_nested_str = true;
466
0
        } else if (pure_doc_value && col.IsArray()) {
467
0
            return Status::InternalError(ERROR_INVALID_COL_DATA, "BOOLEAN");
468
0
        }
469
470
0
        const rapidjson::Value& str_col = is_nested_str ? col[0] : col;
471
0
        const std::string& str_val = str_col.GetString();
472
0
        size_t val_size = str_col.GetStringLength();
473
0
        StringParser::ParseResult result;
474
0
        val = StringParser::string_to_bool(str_val.c_str(), val_size, &result);
475
0
        RETURN_ERROR_IF_PARSING_FAILED(result, str_col, sub_type);
476
0
        return Status::OK();
477
0
    }
478
0
    throw Exception(ErrorCode::INTERNAL_ERROR, "Un-supported type: {}", type_to_string(T));
479
0
}
Unexecuted instantiation: _ZN5doris12handle_valueILNS_13PrimitiveTypeE23EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_19PrimitiveTypeTraitsIXT_EE7CppTypeE
Unexecuted instantiation: _ZN5doris12handle_valueILNS_13PrimitiveTypeE3EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_19PrimitiveTypeTraitsIXT_EE7CppTypeE
Unexecuted instantiation: _ZN5doris12handle_valueILNS_13PrimitiveTypeE4EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_19PrimitiveTypeTraitsIXT_EE7CppTypeE
Unexecuted instantiation: _ZN5doris12handle_valueILNS_13PrimitiveTypeE5EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_19PrimitiveTypeTraitsIXT_EE7CppTypeE
Unexecuted instantiation: _ZN5doris12handle_valueILNS_13PrimitiveTypeE6EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_19PrimitiveTypeTraitsIXT_EE7CppTypeE
Unexecuted instantiation: _ZN5doris12handle_valueILNS_13PrimitiveTypeE7EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_19PrimitiveTypeTraitsIXT_EE7CppTypeE
Unexecuted instantiation: _ZN5doris12handle_valueILNS_13PrimitiveTypeE8EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_19PrimitiveTypeTraitsIXT_EE7CppTypeE
Unexecuted instantiation: _ZN5doris12handle_valueILNS_13PrimitiveTypeE9EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_19PrimitiveTypeTraitsIXT_EE7CppTypeE
Unexecuted instantiation: _ZN5doris12handle_valueILNS_13PrimitiveTypeE2EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_19PrimitiveTypeTraitsIXT_EE7CppTypeE
480
481
template <PrimitiveType T>
482
Status process_single_column(const rapidjson::Value& col, PrimitiveType sub_type,
483
0
                             bool pure_doc_value, Array& array) {
484
0
    typename PrimitiveTypeTraits<T>::CppType val;
485
0
    RETURN_IF_ERROR(handle_value<T>(col, sub_type, pure_doc_value, val));
486
0
    array.push_back(Field::create_field<T>(val));
487
0
    return Status::OK();
488
0
}
Unexecuted instantiation: _ZN5doris21process_single_columnILNS_13PrimitiveTypeE23EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_5ArrayE
Unexecuted instantiation: _ZN5doris21process_single_columnILNS_13PrimitiveTypeE3EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_5ArrayE
Unexecuted instantiation: _ZN5doris21process_single_columnILNS_13PrimitiveTypeE4EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_5ArrayE
Unexecuted instantiation: _ZN5doris21process_single_columnILNS_13PrimitiveTypeE5EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_5ArrayE
Unexecuted instantiation: _ZN5doris21process_single_columnILNS_13PrimitiveTypeE6EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_5ArrayE
Unexecuted instantiation: _ZN5doris21process_single_columnILNS_13PrimitiveTypeE7EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_5ArrayE
Unexecuted instantiation: _ZN5doris21process_single_columnILNS_13PrimitiveTypeE8EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_5ArrayE
Unexecuted instantiation: _ZN5doris21process_single_columnILNS_13PrimitiveTypeE9EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_5ArrayE
Unexecuted instantiation: _ZN5doris21process_single_columnILNS_13PrimitiveTypeE2EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_5ArrayE
489
490
template <PrimitiveType T>
491
Status process_column_array(const rapidjson::Value& col, PrimitiveType sub_type,
492
0
                            bool pure_doc_value, Array& array) {
493
0
    for (const auto& sub_col : col.GetArray()) {
494
0
        RETURN_IF_ERROR(process_single_column<T>(sub_col, sub_type, pure_doc_value, array));
495
0
    }
496
0
    return Status::OK();
497
0
}
Unexecuted instantiation: _ZN5doris20process_column_arrayILNS_13PrimitiveTypeE23EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_5ArrayE
Unexecuted instantiation: _ZN5doris20process_column_arrayILNS_13PrimitiveTypeE3EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_5ArrayE
Unexecuted instantiation: _ZN5doris20process_column_arrayILNS_13PrimitiveTypeE4EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_5ArrayE
Unexecuted instantiation: _ZN5doris20process_column_arrayILNS_13PrimitiveTypeE5EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_5ArrayE
Unexecuted instantiation: _ZN5doris20process_column_arrayILNS_13PrimitiveTypeE6EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_5ArrayE
Unexecuted instantiation: _ZN5doris20process_column_arrayILNS_13PrimitiveTypeE7EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_5ArrayE
Unexecuted instantiation: _ZN5doris20process_column_arrayILNS_13PrimitiveTypeE8EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_5ArrayE
Unexecuted instantiation: _ZN5doris20process_column_arrayILNS_13PrimitiveTypeE9EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_5ArrayE
Unexecuted instantiation: _ZN5doris20process_column_arrayILNS_13PrimitiveTypeE2EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_5ArrayE
498
499
template <PrimitiveType T>
500
Status process_column(const rapidjson::Value& col, PrimitiveType sub_type, bool pure_doc_value,
501
0
                      Array& array) {
502
0
    if (!col.IsArray()) {
503
0
        return process_single_column<T>(col, sub_type, pure_doc_value, array);
504
0
    } else {
505
0
        return process_column_array<T>(col, sub_type, pure_doc_value, array);
506
0
    }
507
0
}
Unexecuted instantiation: _ZN5doris14process_columnILNS_13PrimitiveTypeE23EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_5ArrayE
Unexecuted instantiation: _ZN5doris14process_columnILNS_13PrimitiveTypeE3EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_5ArrayE
Unexecuted instantiation: _ZN5doris14process_columnILNS_13PrimitiveTypeE4EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_5ArrayE
Unexecuted instantiation: _ZN5doris14process_columnILNS_13PrimitiveTypeE5EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_5ArrayE
Unexecuted instantiation: _ZN5doris14process_columnILNS_13PrimitiveTypeE6EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_5ArrayE
Unexecuted instantiation: _ZN5doris14process_columnILNS_13PrimitiveTypeE7EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_5ArrayE
Unexecuted instantiation: _ZN5doris14process_columnILNS_13PrimitiveTypeE8EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_5ArrayE
Unexecuted instantiation: _ZN5doris14process_columnILNS_13PrimitiveTypeE9EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_5ArrayE
Unexecuted instantiation: _ZN5doris14process_columnILNS_13PrimitiveTypeE2EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_5ArrayE
508
509
template <PrimitiveType T>
510
Status process_date_column(const rapidjson::Value& col, PrimitiveType sub_type, bool pure_doc_value,
511
0
                           Array& array, const cctz::time_zone& time_zone) {
512
0
    if (!col.IsArray()) {
513
0
        typename PrimitiveTypeTraits<T>::CppType data;
514
0
        RETURN_IF_ERROR((get_date_int<T>(col, sub_type, pure_doc_value, &data, time_zone)));
515
0
        array.push_back(Field::create_field<T>(data));
516
0
    } else {
517
0
        for (const auto& sub_col : col.GetArray()) {
518
0
            typename PrimitiveTypeTraits<T>::CppType data;
519
0
            RETURN_IF_ERROR((get_date_int<T>(sub_col, sub_type, pure_doc_value, &data, time_zone)));
520
0
            array.push_back(Field::create_field<T>(data));
521
0
        }
522
0
    }
523
0
    return Status::OK();
524
0
}
Unexecuted instantiation: _ZN5doris19process_date_columnILNS_13PrimitiveTypeE25EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_5ArrayERKN4cctz9time_zoneE
Unexecuted instantiation: _ZN5doris19process_date_columnILNS_13PrimitiveTypeE26EEENS_6StatusERKN9rapidjson12GenericValueINS3_4UTF8IcEENS3_19MemoryPoolAllocatorINS3_12CrtAllocatorEEEEES1_bRNS_5ArrayERKN4cctz9time_zoneE
525
526
// Converts `col` (a single JSON value or a JSON array of values) to JSONB and
// appends the result(s) to `array`.
//
// Each value is serialized back to JSON text with json_value_to_string and then
// parsed by JsonBinaryValue::from_json_string; the first parse error is returned.
// `sub_type` and `pure_doc_value` are not used here; they are kept so the
// signature matches the other process_*_column helpers.
Status process_jsonb_column(const rapidjson::Value& col, PrimitiveType sub_type,
                            bool pure_doc_value, Array& array) {
    // Single conversion path for both the scalar and the array case, so every
    // element is moved (not copied) into the resulting Field.
    auto append_one = [&array](const rapidjson::Value& item) -> Status {
        JsonBinaryValue jsonb_value;
        RETURN_IF_ERROR(jsonb_value.from_json_string(json_value_to_string(item)));
        JsonbField json(jsonb_value.value(), jsonb_value.size());
        array.push_back(Field::create_field<TYPE_JSONB>(std::move(json)));
        return Status::OK();
    };

    if (!col.IsArray()) {
        return append_one(col);
    }
    for (const auto& sub_col : col.GetArray()) {
        RETURN_IF_ERROR(append_one(sub_col));
    }
    return Status::OK();
}
543
544
// Converts the JSON value `col` into the elements of an ARRAY column.
//
// `sub_type` is the primitive type of the array elements and selects the
// conversion routine; converted elements are appended to `array`. `col` may be
// a JSON array or a single value (the helpers handle both). `time_zone` is only
// used for the date/datetime element types.
//
// Returns the first conversion error, or InternalError naming the element type
// when `sub_type` is not supported.
Status ScrollParser::parse_column(const rapidjson::Value& col, PrimitiveType sub_type,
                                  bool pure_doc_value, Array& array,
                                  const cctz::time_zone& time_zone) {
    switch (sub_type) {
    case TYPE_CHAR:
    case TYPE_VARCHAR:
    case TYPE_STRING:
        return process_column<TYPE_STRING>(col, sub_type, pure_doc_value, array);
    case TYPE_TINYINT:
        return process_column<TYPE_TINYINT>(col, sub_type, pure_doc_value, array);
    case TYPE_SMALLINT:
        return process_column<TYPE_SMALLINT>(col, sub_type, pure_doc_value, array);
    case TYPE_INT:
        return process_column<TYPE_INT>(col, sub_type, pure_doc_value, array);
    case TYPE_BIGINT:
        return process_column<TYPE_BIGINT>(col, sub_type, pure_doc_value, array);
    case TYPE_LARGEINT:
        return process_column<TYPE_LARGEINT>(col, sub_type, pure_doc_value, array);
    case TYPE_FLOAT:
        return process_column<TYPE_FLOAT>(col, sub_type, pure_doc_value, array);
    case TYPE_DOUBLE:
        return process_column<TYPE_DOUBLE>(col, sub_type, pure_doc_value, array);
    case TYPE_BOOLEAN:
        return process_column<TYPE_BOOLEAN>(col, sub_type, pure_doc_value, array);
    // date/datetime v2 is the default type for catalog table,
    // see https://github.com/apache/doris/pull/16304
    // No need to support date and datetime types.
    case TYPE_DATEV2:
        return process_date_column<TYPE_DATEV2>(col, sub_type, pure_doc_value, array, time_zone);
    case TYPE_DATETIMEV2:
        return process_date_column<TYPE_DATETIMEV2>(col, sub_type, pure_doc_value, array,
                                                    time_zone);
    case TYPE_JSONB:
        return process_jsonb_column(col, sub_type, pure_doc_value, array);
    default:
        LOG(ERROR) << "Do not support Array type: " << sub_type;
        // Name the offending type in the returned status so the failure can be
        // diagnosed from the query error alone, without access to the BE log.
        return Status::InternalError("Unsupported type: {}", type_to_string(sub_type));
    }
}
586
587
0
ScrollParser::ScrollParser(bool doc_value_mode) : _size(0), _line_index(0) {}
588
589
0
// Defaulted destructor: no cleanup beyond destroying the members is performed here.
ScrollParser::~ScrollParser() = default;
590
591
0
// Parses one Elasticsearch search/scroll response and buffers its hits.
//
// Expected shape: { "_scroll_id": "...", "hits": { "total": 2, "hits": [ {}, {} ] } }
//
// scroll_result: raw JSON text of the response.
// exactly_once:  when false the response must carry a string scroll id, which is
//                stored for get_scroll_id(); when true the scroll id is not read.
//
// On success get_size() reports the number of buffered hits (0 when the response
// has no inner hits array, which is how the end of scrolling is detected) and the
// read cursor is rewound to the first hit.
// Returns InternalError when the text is not valid JSON, is not a JSON object,
// lacks the required scroll id, or lacks the outer `hits` object.
Status ScrollParser::parse(const std::string& scroll_result, bool exactly_once) {
    // rely on `_size !=0 ` to determine whether scroll ends
    _size = 0;
    // Start reading the new batch from its first hit even if this parser is reused.
    _line_index = 0;
    _document_node.Parse(scroll_result.c_str(), scroll_result.length());
    if (_document_node.HasParseError()) {
        return Status::InternalError("Parsing json error, json is: {}", scroll_result);
    }
    // Member lookups below are only valid on a JSON object; rapidjson asserts otherwise.
    if (!_document_node.IsObject()) {
        return Status::InternalError("Scroll response is not a json object, json is: {}",
                                     scroll_result);
    }

    if (!exactly_once) {
        const auto scroll_member = _document_node.FindMember(FIELD_SCROLL_ID);
        if (scroll_member == _document_node.MemberEnd() || !scroll_member->value.IsString()) {
            LOG(WARNING) << "Document has not a scroll id field scroll response:" << scroll_result;
            return Status::InternalError("Document has not a scroll id field");
        }
        _scroll_id = scroll_member->value.GetString();
    }

    // { hits: { total : 2, "hits" : [ {}, {}, {} ]}}
    const auto outer_hits_member = _document_node.FindMember(FIELD_HITS);
    if (outer_hits_member == _document_node.MemberEnd() || !outer_hits_member->value.IsObject()) {
        // e.g. an error payload returned by Elasticsearch instead of a search result
        LOG(WARNING) << "Document has not a hits field scroll response:" << scroll_result;
        return Status::InternalError("Document has not a hits field");
    }
    const rapidjson::Value& outer_hits_node = outer_hits_member->value;

    // if has no inner hits, there has no data in this index
    const auto inner_hits_member = outer_hits_node.FindMember(FIELD_INNER_HITS);
    if (inner_hits_member == outer_hits_node.MemberEnd()) {
        return Status::OK();
    }
    // this happened just the end of scrolling
    if (!inner_hits_member->value.IsArray()) {
        return Status::OK();
    }
    _inner_hits_node.CopyFrom(inner_hits_member->value, _document_node.GetAllocator());
    // how many documents contains in this batch
    _size = _inner_hits_node.Size();
    return Status::OK();
}
624
625
0
int ScrollParser::get_size() const {
626
0
    return _size;
627
0
}
628
629
0
const std::string& ScrollParser::get_scroll_id() {
630
0
    return _scroll_id;
631
0
}
632
633
// Converts the next buffered hit into one row, appending exactly one value to
// each column in `columns`.
//
// tuple_desc:       slot descriptors; slot i is written to columns[i].
// columns:          destination columns, indexed like tuple_desc->slots().
// line_eof:         set to true when no buffered hit is left (nothing is appended),
//                   and to false after a row has been appended. It stays true when
//                   an error is returned.
// docvalue_context: maps a column name to the field name to read when the hit
//                   carries a `fields` member (doc_values mode).
// time_zone:        forwarded to the date/datetime conversion helpers.
//
// A field that is missing, JSON null, or (for string/decimal in doc_values mode)
// an empty array becomes NULL for nullable slots and a RuntimeError otherwise.
// Unsupported slot types and conversion failures are returned as errors so a
// partially filled row is never reported as success.
Status ScrollParser::fill_columns(const TupleDescriptor* tuple_desc,
                                  std::vector<MutableColumnPtr>& columns, bool* line_eof,
                                  const std::map<std::string, std::string>& docvalue_context,
                                  const cctz::time_zone& time_zone) {
    *line_eof = true;

    if (_size <= 0 || _line_index >= _size) {
        return Status::OK();
    }

    const rapidjson::Value& obj = _inner_hits_node[_line_index++];
    const bool pure_doc_value = obj.HasMember("fields");
    // obj may be neither have `_source` nor `fields` field.
    const rapidjson::Value* line = nullptr;
    if (obj.HasMember(FIELD_SOURCE)) {
        line = &obj[FIELD_SOURCE];
    } else if (pure_doc_value) {
        line = &obj["fields"];
    }

    const auto& slots = tuple_desc->slots();
    for (size_t i = 0; i < slots.size(); ++i) {
        const SlotDescriptor* slot_desc = slots[i];
        auto* col_ptr = columns[i].get();

        if (slot_desc->col_name() == FIELD_ID) {
            // actually this branch will not be reached, this is guaranteed by Doris FE.
            if (pure_doc_value) {
                return Status::RuntimeError("obtain `_id` is not supported in doc_values mode");
            }
            // obj[FIELD_ID] must not be NULL
            std::string _id = obj[FIELD_ID].GetString();
            col_ptr->insert_data(_id.data(), _id.length());
            continue;
        }

        const char* col_name = nullptr;
        if (pure_doc_value) {
            // Report a missing mapping as a Status instead of letting
            // std::map::at throw std::out_of_range.
            const auto mapped = docvalue_context.find(slot_desc->col_name());
            if (mapped == docvalue_context.end()) {
                std::string details = absl::Substitute(
                        "column $0 has no doc_values field mapping", slot_desc->col_name());
                return Status::RuntimeError(details);
            }
            col_name = mapped->second.c_str();
        } else {
            col_name = slot_desc->col_name().c_str();
        }

        // Appends NULL for a nullable slot; a non-nullable slot cannot hold a
        // missing value, so that case is an error.
        auto insert_null = [&]() -> Status {
            if (!slot_desc->is_nullable()) {
                std::string details = absl::Substitute(INVALID_NULL_VALUE, col_name);
                return Status::RuntimeError(details);
            }
            col_ptr->insert_data(nullptr, 0);
            return Status::OK();
        };

        // Look the field up once and reuse the result.
        const rapidjson::Value* col_value = nullptr;
        if (line != nullptr) {
            const auto member = line->FindMember(col_name);
            if (member != line->MemberEnd()) {
                col_value = &member->value;
            }
        }

        // when the column value is null, the subsequent type casting will report an error
        if (col_value == nullptr || col_value->IsNull()) {
            RETURN_IF_ERROR(insert_null());
            continue;
        }

        const rapidjson::Value& col = *col_value;
        auto type = slot_desc->type()->get_primitive_type();

        switch (type) {
        case TYPE_CHAR:
        case TYPE_VARCHAR:
        case TYPE_STRING: {
            // sometimes elasticsearch user post some not-string value to Elasticsearch Index.
            // because of reading value from _source, we can not process all json type and then just transfer the value to original string representation
            // this may be a tricky, but we can work around this issue
            std::string val;
            if (pure_doc_value) {
                if (col.Empty()) {
                    // An empty doc_values array carries no value; still append
                    // something so this column stays aligned with the others.
                    RETURN_IF_ERROR(insert_null());
                    break;
                } else if (!col[0].IsString()) {
                    val = json_value_to_string(col[0]);
                } else {
                    val = col[0].GetString();
                }
            } else {
                RETURN_ERROR_IF_COL_IS_ARRAY(col, type, true);
                if (!col.IsString()) {
                    val = json_value_to_string(col);
                } else {
                    val = col.GetString();
                }
            }
            col_ptr->insert_data(val.data(), val.length());
            break;
        }

        case TYPE_TINYINT: {
            RETURN_IF_ERROR(insert_int_value<int8_t>(col, type, col_ptr, pure_doc_value,
                                                     slot_desc->is_nullable()));
            break;
        }

        case TYPE_SMALLINT: {
            RETURN_IF_ERROR(insert_int_value<int16_t>(col, type, col_ptr, pure_doc_value,
                                                      slot_desc->is_nullable()));
            break;
        }

        case TYPE_INT: {
            RETURN_IF_ERROR(insert_int_value<int32_t>(col, type, col_ptr, pure_doc_value,
                                                      slot_desc->is_nullable()));
            break;
        }

        case TYPE_BIGINT: {
            RETURN_IF_ERROR(insert_int_value<int64_t>(col, type, col_ptr, pure_doc_value,
                                                      slot_desc->is_nullable()));
            break;
        }

        case TYPE_LARGEINT: {
            RETURN_IF_ERROR(insert_int_value<__int128>(col, type, col_ptr, pure_doc_value,
                                                       slot_desc->is_nullable()));
            break;
        }

        case TYPE_DOUBLE: {
            RETURN_IF_ERROR(insert_float_value<double>(col, type, col_ptr, pure_doc_value,
                                                       slot_desc->is_nullable()));
            break;
        }

        case TYPE_FLOAT: {
            RETURN_IF_ERROR(insert_float_value<float>(col, type, col_ptr, pure_doc_value,
                                                      slot_desc->is_nullable()));
            break;
        }

        case TYPE_BOOLEAN: {
            if (col.IsBool()) {
                int8_t val = col.GetBool();
                col_ptr->insert_data(reinterpret_cast<const char*>(&val), 0);
                break;
            }

            if (col.IsNumber()) {
                int8_t val = static_cast<int8_t>(col.GetInt());
                col_ptr->insert_data(reinterpret_cast<const char*>(&val), 0);
                break;
            }

            // In doc_values mode the value arrives wrapped in an array: accept a
            // leading bool directly, unwrap a leading string for parsing below,
            // and reject any other array content.
            bool is_nested_str = false;
            if (pure_doc_value && col.IsArray() && !col.Empty() && col[0].IsBool()) {
                int8_t val = col[0].GetBool();
                col_ptr->insert_data(reinterpret_cast<const char*>(&val), 0);
                break;
            } else if (pure_doc_value && col.IsArray() && !col.Empty() && col[0].IsString()) {
                is_nested_str = true;
            } else if (pure_doc_value && col.IsArray()) {
                return Status::InternalError(ERROR_INVALID_COL_DATA, "BOOLEAN");
            }

            const rapidjson::Value& str_col = is_nested_str ? col[0] : col;

            RETURN_ERROR_IF_COL_IS_ARRAY(col, type, true);

            // Parse straight from the JSON buffer so pointer and length always
            // describe the same bytes.
            const char* val = str_col.GetString();
            size_t val_size = str_col.GetStringLength();
            StringParser::ParseResult result;
            bool b = StringParser::string_to_bool(val, val_size, &result);
            RETURN_ERROR_IF_PARSING_FAILED(result, str_col, type);
            col_ptr->insert_data(reinterpret_cast<const char*>(&b), 0);
            break;
        }
        case TYPE_DECIMALV2: {
            DecimalV2Value data;

            if (col.IsDouble()) {
                data.assign_from_double(col.GetDouble());
            } else {
                std::string val;
                if (pure_doc_value) {
                    if (col.Empty()) {
                        // Same alignment rule as for strings: never skip the column.
                        RETURN_IF_ERROR(insert_null());
                        break;
                    } else if (!col[0].IsString()) {
                        val = json_value_to_string(col[0]);
                    } else {
                        val = col[0].GetString();
                    }
                } else {
                    RETURN_ERROR_IF_COL_IS_ARRAY(col, type, true);
                    if (!col.IsString()) {
                        val = json_value_to_string(col);
                    } else {
                        val = col.GetString();
                    }
                }
                data.parse_from_str(val.data(), static_cast<int32_t>(val.length()));
            }
            col_ptr->insert_data(reinterpret_cast<const char*>(&data), 0);
            break;
        }

        case TYPE_DATE:
            RETURN_IF_ERROR(
                    fill_date_int<TYPE_DATE>(col, type, pure_doc_value, col_ptr, time_zone));
            break;
        case TYPE_DATETIME:
            RETURN_IF_ERROR(
                    fill_date_int<TYPE_DATETIME>(col, type, pure_doc_value, col_ptr, time_zone));
            break;
        case TYPE_DATEV2:
            RETURN_IF_ERROR(
                    fill_date_int<TYPE_DATEV2>(col, type, pure_doc_value, col_ptr, time_zone));
            break;
        case TYPE_DATETIMEV2: {
            RETURN_IF_ERROR(
                    fill_date_int<TYPE_DATETIMEV2>(col, type, pure_doc_value, col_ptr, time_zone));
            break;
        }
        case TYPE_ARRAY: {
            Array array;
            const PrimitiveType sub_type =
                    assert_cast<const DataTypeArray*>(remove_nullable(slot_desc->type()).get())
                            ->get_nested_type()
                            ->get_primitive_type();
            RETURN_IF_ERROR(parse_column(col, sub_type, pure_doc_value, array, time_zone));
            col_ptr->insert(Field::create_field<TYPE_ARRAY>(array));
            break;
        }
        case TYPE_JSONB: {
            JsonBinaryValue jsonb_value;
            RETURN_IF_ERROR(jsonb_value.from_json_string(json_value_to_string(col)));
            JsonbField json(jsonb_value.value(), jsonb_value.size());
            col_ptr->insert(Field::create_field<TYPE_JSONB>(json));
            break;
        }
        default: {
            LOG(ERROR) << "Unsupported data type: " << type_to_string(type);
            DCHECK(false);
            // Without this return a release build would skip the column and
            // hand back a row whose columns have different lengths.
            return Status::InternalError("Unsupported data type: {}", type_to_string(type));
        }
        }
    }

    *line_eof = false;
    return Status::OK();
}
880
#include "common/compile_check_end.h"
881
} // namespace doris