be/src/exprs/function/function_jsonb_transform.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
#include <algorithm>
#include <cstdint>
#include <limits>
#include <string>
#include <utility>
#include <vector>

#include "common/status.h"
#include "core/data_type/data_type_jsonb.h"
#include "core/data_type/primitive_type.h"
#include "exprs/function/simple_function_factory.h"
#include "util/jsonb_document.h"
#include "util/jsonb_document_cast.h"
#include "util/jsonb_writer.h"
28 | | |
29 | | namespace doris { |
30 | | |
31 | | // Sort the keys of the JSON object and deduplicate the repeated keys, keeping the first one |
32 | 0 | void sort_json_object_keys(JsonbWriter& jsonb_writer, const JsonbValue* jsonb_value) { |
33 | 0 | if (jsonb_value->isObject()) { |
34 | 0 | std::vector<std::pair<StringRef, const JsonbValue*>> kvs; |
35 | 0 | const auto* obj_val = jsonb_value->unpack<ObjectVal>(); |
36 | 0 | for (auto it = obj_val->begin(); it != obj_val->end(); ++it) { |
37 | 0 | kvs.emplace_back(StringRef(it->getKeyStr(), it->klen()), it->value()); |
38 | 0 | } |
39 | | // sort by key |
40 | 0 | std::sort(kvs.begin(), kvs.end(), |
41 | 0 | [](const auto& left, const auto& right) { return left.first < right.first; }); |
42 | | // unique by key |
43 | 0 | kvs.erase(std::unique(kvs.begin(), kvs.end(), |
44 | 0 | [](const auto& left, const auto& right) { |
45 | 0 | return left.first == right.first; |
46 | 0 | }), |
47 | 0 | kvs.end()); |
48 | 0 | jsonb_writer.writeStartObject(); |
49 | 0 | for (const auto& kv : kvs) { |
50 | 0 | jsonb_writer.writeKey(kv.first.data, static_cast<uint8_t>(kv.first.size)); |
51 | 0 | sort_json_object_keys(jsonb_writer, kv.second); |
52 | 0 | } |
53 | 0 | jsonb_writer.writeEndObject(); |
54 | 0 | } else if (jsonb_value->isArray()) { |
55 | 0 | const auto* array_val = jsonb_value->unpack<ArrayVal>(); |
56 | 0 | jsonb_writer.writeStartArray(); |
57 | 0 | for (auto it = array_val->begin(); it != array_val->end(); ++it) { |
58 | 0 | sort_json_object_keys(jsonb_writer, &*it); |
59 | 0 | } |
60 | 0 | jsonb_writer.writeEndArray(); |
61 | 0 | } else { |
62 | | // scalar value |
63 | 0 | jsonb_writer.writeValue(jsonb_value); |
64 | 0 | } |
65 | 0 | } |
66 | | |
67 | | // Walk a JSONB object recursively and emit flat "<dot.path>": value entries |
68 | | // directly into `writer`. Members whose value is a non-empty object recurse; |
69 | | // every other shape (scalars, arrays, null literals, empty objects) is emitted |
70 | | // as an opaque leaf at its dot-joined path. The `prefix` buffer is reused |
71 | | // across the whole row — appended on descent and truncated on return — so no |
72 | | // path segment is ever re-allocated outside this single growing string. |
73 | | void flatten_json_object_into(JsonbWriter& jsonb_writer, const ObjectVal* obj, |
74 | 33 | std::string& prefix) { |
75 | 69 | for (auto it = obj->begin(); it != obj->end(); ++it) { |
76 | 36 | const auto* val = it->value(); |
77 | 36 | const size_t saved = prefix.size(); |
78 | 36 | if (!prefix.empty()) { |
79 | 22 | prefix.push_back('.'); |
80 | 22 | } |
81 | 36 | prefix.append(it->getKeyStr(), it->klen()); |
82 | | |
83 | 36 | if (val->isObject() && val->unpack<ObjectVal>()->numElem() > 0) { |
84 | 20 | flatten_json_object_into(jsonb_writer, val->unpack<ObjectVal>(), prefix); |
85 | 20 | } else { |
86 | 16 | jsonb_writer.writeKey(prefix.data(), static_cast<uint8_t>(prefix.size())); |
87 | 16 | jsonb_writer.writeValue(val); |
88 | 16 | } |
89 | 36 | prefix.resize(saved); |
90 | 36 | } |
91 | 33 | } |
92 | | |
93 | | // json_object_flatten: turn a nested JSONB object into a single-level JSONB |
94 | | // object whose keys are the dot-joined paths to each leaf (NiFi FlattenJson |
95 | | // "keep-arrays" semantics — arrays / scalars / nulls / empty objects stay as |
96 | | // opaque leaf values; only objects are walked). |
97 | | // {"a":{"b":2}} -> {"a.b":2} |
98 | | // {"a":[{"b":1}]} -> {"a":[{"b":1}]} |
99 | | // Top-level non-object values pass through unchanged. |
100 | 19 | void flatten_json_object(JsonbWriter& jsonb_writer, const JsonbValue* jsonb_value) { |
101 | 19 | if (!jsonb_value->isObject()) { |
102 | 6 | jsonb_writer.writeValue(jsonb_value); |
103 | 6 | return; |
104 | 6 | } |
105 | 13 | jsonb_writer.writeStartObject(); |
106 | 13 | std::string prefix; |
107 | 13 | flatten_json_object_into(jsonb_writer, jsonb_value->unpack<ObjectVal>(), prefix); |
108 | 13 | jsonb_writer.writeEndObject(); |
109 | 13 | } |
110 | | |
111 | | // Convert all numeric types in JSON to double type |
112 | 0 | void normalize_json_numbers_to_double(JsonbWriter& jsonb_writer, const JsonbValue* jsonb_value) { |
113 | 0 | if (jsonb_value->isObject()) { |
114 | 0 | jsonb_writer.writeStartObject(); |
115 | 0 | const auto* obj_val = jsonb_value->unpack<ObjectVal>(); |
116 | 0 | for (auto it = obj_val->begin(); it != obj_val->end(); ++it) { |
117 | 0 | jsonb_writer.writeKey(it->getKeyStr(), it->klen()); |
118 | 0 | normalize_json_numbers_to_double(jsonb_writer, it->value()); |
119 | 0 | } |
120 | 0 | jsonb_writer.writeEndObject(); |
121 | 0 | } else if (jsonb_value->isArray()) { |
122 | 0 | const auto* array_val = jsonb_value->unpack<ArrayVal>(); |
123 | 0 | jsonb_writer.writeStartArray(); |
124 | 0 | for (auto it = array_val->begin(); it != array_val->end(); ++it) { |
125 | 0 | normalize_json_numbers_to_double(jsonb_writer, &*it); |
126 | 0 | } |
127 | 0 | jsonb_writer.writeEndArray(); |
128 | 0 | } else { |
129 | | // scalar value |
130 | 0 | if (jsonb_value->isInt() || jsonb_value->isFloat() || jsonb_value->isDouble() || |
131 | 0 | jsonb_value->isDecimal()) { |
132 | 0 | double to; |
133 | 0 | CastParameters params; |
134 | 0 | params.is_strict = false; |
135 | 0 | JsonbCast::cast_from_json_to_float(jsonb_value, to, params); |
136 | 0 | NormalizeFloat(to); |
137 | 0 | jsonb_writer.writeDouble(to); |
138 | 0 | } else { |
139 | 0 | jsonb_writer.writeValue(jsonb_value); |
140 | 0 | } |
141 | 0 | } |
142 | 0 | } |
143 | | |
144 | | // Input jsonb, output jsonb |
145 | | template <typename Impl> |
146 | | class FunctionJsonbTransform : public IFunction { |
147 | | public: |
148 | | static constexpr auto name = Impl::name; |
149 | | |
150 | 6 | static FunctionPtr create() { return std::make_shared<FunctionJsonbTransform>(); }_ZN5doris22FunctionJsonbTransformINS_18SortJsonObjectKeysEE6createEv Line | Count | Source | 150 | 2 | static FunctionPtr create() { return std::make_shared<FunctionJsonbTransform>(); } |
_ZN5doris22FunctionJsonbTransformINS_28NormalizeJsonNumbersToDoubleEE6createEv Line | Count | Source | 150 | 2 | static FunctionPtr create() { return std::make_shared<FunctionJsonbTransform>(); } |
_ZN5doris22FunctionJsonbTransformINS_17JsonObjectFlattenEE6createEv Line | Count | Source | 150 | 2 | static FunctionPtr create() { return std::make_shared<FunctionJsonbTransform>(); } |
|
151 | | |
152 | 3 | String get_name() const override { return name; }_ZNK5doris22FunctionJsonbTransformINS_18SortJsonObjectKeysEE8get_nameB5cxx11Ev Line | Count | Source | 152 | 1 | String get_name() const override { return name; } |
_ZNK5doris22FunctionJsonbTransformINS_28NormalizeJsonNumbersToDoubleEE8get_nameB5cxx11Ev Line | Count | Source | 152 | 1 | String get_name() const override { return name; } |
_ZNK5doris22FunctionJsonbTransformINS_17JsonObjectFlattenEE8get_nameB5cxx11Ev Line | Count | Source | 152 | 1 | String get_name() const override { return name; } |
|
153 | | |
154 | 0 | DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { |
155 | 0 | return std::make_shared<DataTypeJsonb>(); |
156 | 0 | } Unexecuted instantiation: _ZNK5doris22FunctionJsonbTransformINS_18SortJsonObjectKeysEE20get_return_type_implERKSt6vectorISt10shared_ptrIKNS_9IDataTypeEESaIS7_EE Unexecuted instantiation: _ZNK5doris22FunctionJsonbTransformINS_28NormalizeJsonNumbersToDoubleEE20get_return_type_implERKSt6vectorISt10shared_ptrIKNS_9IDataTypeEESaIS7_EE Unexecuted instantiation: _ZNK5doris22FunctionJsonbTransformINS_17JsonObjectFlattenEE20get_return_type_implERKSt6vectorISt10shared_ptrIKNS_9IDataTypeEESaIS7_EE |
157 | | |
158 | 0 | size_t get_number_of_arguments() const override { return 1; }Unexecuted instantiation: _ZNK5doris22FunctionJsonbTransformINS_18SortJsonObjectKeysEE23get_number_of_argumentsEv Unexecuted instantiation: _ZNK5doris22FunctionJsonbTransformINS_28NormalizeJsonNumbersToDoubleEE23get_number_of_argumentsEv Unexecuted instantiation: _ZNK5doris22FunctionJsonbTransformINS_17JsonObjectFlattenEE23get_number_of_argumentsEv |
159 | | |
160 | | Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, |
161 | 0 | uint32_t result, size_t size) const override { |
162 | 0 | auto input_column = block.get_by_position(arguments[0]).column; |
163 | 0 | auto to_column = ColumnString::create(); |
164 | |
|
165 | 0 | const auto& input_jsonb_column = assert_cast<const ColumnString&>(*input_column); |
166 | |
|
167 | 0 | to_column->get_chars().reserve(input_jsonb_column.get_chars().size()); |
168 | 0 | to_column->get_offsets().reserve(input_jsonb_column.get_offsets().size()); |
169 | |
|
170 | 0 | JsonbWriter writer; |
171 | 0 | for (size_t i = 0; i < size; ++i) { |
172 | 0 | StringRef val = input_jsonb_column.get_data_at(i); |
173 | 0 | const JsonbDocument* doc = nullptr; |
174 | 0 | auto st = JsonbDocument::checkAndCreateDocument(val.data, val.size, &doc); |
175 | 0 | if (!st.ok() || !doc || !doc->getValue()) [[unlikely]] { |
176 | | // mayby be invalid jsonb, just insert default |
177 | | // invalid jsonb value may be caused by the default null processing |
178 | | // insert empty string |
179 | 0 | to_column->insert_default(); |
180 | 0 | continue; |
181 | 0 | } |
182 | 0 | const JsonbValue* value = doc->getValue(); |
183 | 0 | if (UNLIKELY(!value)) { |
184 | | // mayby be invalid jsonb, just insert default |
185 | | // invalid jsonb value may be caused by the default null processing |
186 | | // insert empty string |
187 | 0 | to_column->insert_default(); |
188 | 0 | continue; |
189 | 0 | } |
190 | | |
191 | 0 | writer.reset(); |
192 | |
|
193 | 0 | Impl::transform(writer, value); |
194 | |
|
195 | 0 | to_column->insert_data(writer.getOutput()->getBuffer(), writer.getOutput()->getSize()); |
196 | 0 | } |
197 | 0 | block.get_by_position(result).column = std::move(to_column); |
198 | 0 | return Status::OK(); |
199 | 0 | } Unexecuted instantiation: _ZNK5doris22FunctionJsonbTransformINS_18SortJsonObjectKeysEE12execute_implEPNS_15FunctionContextERNS_5BlockERKSt6vectorIjSaIjEEjm Unexecuted instantiation: _ZNK5doris22FunctionJsonbTransformINS_28NormalizeJsonNumbersToDoubleEE12execute_implEPNS_15FunctionContextERNS_5BlockERKSt6vectorIjSaIjEEjm Unexecuted instantiation: _ZNK5doris22FunctionJsonbTransformINS_17JsonObjectFlattenEE12execute_implEPNS_15FunctionContextERNS_5BlockERKSt6vectorIjSaIjEEjm |
200 | | }; |
201 | | |
202 | | struct SortJsonObjectKeys { |
203 | | static constexpr auto name = "sort_json_object_keys"; |
204 | 0 | static void transform(JsonbWriter& writer, const JsonbValue* value) { |
205 | 0 | sort_json_object_keys(writer, value); |
206 | 0 | } |
207 | | }; |
208 | | |
209 | | struct NormalizeJsonNumbersToDouble { |
210 | | static constexpr auto name = "normalize_json_numbers_to_double"; |
211 | 0 | static void transform(JsonbWriter& writer, const JsonbValue* value) { |
212 | 0 | normalize_json_numbers_to_double(writer, value); |
213 | 0 | } |
214 | | }; |
215 | | |
216 | | struct JsonObjectFlatten { |
217 | | static constexpr auto name = "json_object_flatten"; |
218 | 0 | static void transform(JsonbWriter& writer, const JsonbValue* value) { |
219 | 0 | flatten_json_object(writer, value); |
220 | 0 | } |
221 | | }; |
222 | | |
223 | | using FunctionSortJsonObjectKeys = FunctionJsonbTransform<SortJsonObjectKeys>; |
224 | | using FunctionNormalizeJsonNumbersToDouble = FunctionJsonbTransform<NormalizeJsonNumbersToDouble>; |
225 | | using FunctionJsonObjectFlatten = FunctionJsonbTransform<JsonObjectFlatten>; |
226 | | |
227 | 1 | void register_function_json_transform(SimpleFunctionFactory& factory) { |
228 | 1 | factory.register_function<FunctionSortJsonObjectKeys>(); |
229 | 1 | factory.register_function<FunctionNormalizeJsonNumbersToDouble>(); |
230 | 1 | factory.register_function<FunctionJsonObjectFlatten>(); |
231 | | |
232 | 1 | factory.register_alias(FunctionSortJsonObjectKeys::name, "sort_jsonb_object_keys"); |
233 | 1 | factory.register_alias(FunctionNormalizeJsonNumbersToDouble::name, |
234 | 1 | "normalize_jsonb_numbers_to_double"); |
235 | 1 | } |
236 | | |
237 | | } // namespace doris |