That sounds impossible at first.
After a brief look at the source code, it turns out that the results are cached.
Source code versions:
- flink: 1.20
- calcite: master
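Flink's JSON_VALUE is mapped to a Calcite function. The excerpt below is the neighbouring JSON_QUERY definition; the relevant part is the runtimeDeferred() marker.
Source: BuiltInFunctionDefinitions.java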
    public static final BuiltInFunctionDefinition JSON_QUERY =
            BuiltInFunctionDefinition.newBuilder()
                    .name("JSON_QUERY")
                    .kind(SCALAR)
                    .callSyntax(JsonFunctionsCallSyntax.JSON_QUERY)
                    .inputTypeStrategy(
                            sequence(
                                    logical(LogicalTypeFamily.CHARACTER_STRING),
                                    and(logical(LogicalTypeFamily.CHARACTER_STRING), LITERAL),
                                    TYPE_LITERAL,
                                    symbol(JsonQueryWrapper.class),
                                    SpecificInputTypeStrategies.JSON_QUERY_ON_EMPTY_ERROR_BEHAVIOUR,
                                    SpecificInputTypeStrategies
                                            .JSON_QUERY_ON_EMPTY_ERROR_BEHAVIOUR))
                    .outputTypeStrategy(forceNullable(argument(2)))
                    .runtimeDeferred()
                    .build();
...
    /**
     * Specifies that this {@link BuiltInFunctionDefinition} will be mapped to a Calcite
     * function.
     */
    public Builder runtimeDeferred() {
        // This method is just a marker method for clarity. It is equivalent to calling
        // neither {@link #runtimeProvided} nor {@link #runtimeClass}.
        return this;
    }
Calcite's JSON functions are stateful: the parsed input is cached in a Guava LoadingCache.
Source: calcite/runtime/JsonFunctions.java
    // org/apache/calcite/util/BuiltInMethod.java
    JSON_VALUE(JsonFunctions.StatefulFunction.class, "jsonValue",
        String.class, String.class,
        SqlJsonValueEmptyOrErrorBehavior.class, Object.class,
        SqlJsonValueEmptyOrErrorBehavior.class, Object.class),
...
    // org/apache/calcite/runtime/JsonFunctions.java
    // jsonValue(String, String, ...) is invoked via reflection
    public @Nullable Object jsonValue(String input,
        String pathSpec,
        SqlJsonValueEmptyOrErrorBehavior emptyBehavior,
        Object defaultValueOnEmpty,
        SqlJsonValueEmptyOrErrorBehavior errorBehavior,
        Object defaultValueOnError) {
      return jsonValue(
          jsonApiCommonSyntaxWithCache(input, pathSpec),
          emptyBehavior,
          defaultValueOnEmpty,
          errorBehavior,
          defaultValueOnError);
    }
    // jsonApiCommonSyntaxWithCache: the cache key is the raw JSON input string
    private final LoadingCache<String, JsonValueContext> cache =
        CacheBuilder.newBuilder()
            .maximumSize(FUNCTION_LEVEL_CACHE_MAX_SIZE.value())
            .build(CacheLoader.from(JsonFunctions::jsonValueExpression));

    public JsonPathContext jsonApiCommonSyntaxWithCache(String input,
        String pathSpec) {
      return jsonApiCommonSyntax(cache.getUnchecked(input), pathSpec);
    }
    // JsonFunctions::jsonValueExpression (the cache loader: parses the input once)
    public static JsonValueContext jsonValueExpression(String input) {
      try {
        return JsonValueContext.withJavaObj(dejsonize(input));
      } catch (Exception e) {
        return JsonValueContext.withException(e);
      }
    }

    // dejsonize; JSON_PATH_JSON_PROVIDER is defined as:
    // JacksonJsonProvider JSON_PATH_JSON_PROVIDER = new JacksonJsonProvider()
    public static @Nullable Object dejsonize(String input) {
      return JSON_PATH_JSON_PROVIDER.parse(input);
    }
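To make the effect of that cache concrete, here is a minimal stand-alone sketch of the same pattern. It is not Calcite's code: `BoundedParseCache` is a hypothetical class, an access-ordered `LinkedHashMap` stands in for Guava's `LoadingCache`, and `String::length` stands in for the expensive JSON parse. It only shows that an input string seen before is not parsed again, and that a size limit of 0 effectively disables caching.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in for a per-call-site cache; not Calcite's class. */
public class BoundedParseCache<V> {
    private final Function<String, V> loader;
    private final Map<String, V> cache;
    private int loads = 0; // how many times the "expensive" loader actually ran

    public BoundedParseCache(int maxSize, Function<String, V> loader) {
        this.loader = loader;
        // accessOrder=true gives LRU ordering; removeEldestEntry caps the size.
        this.cache = new LinkedHashMap<String, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, V> eldest) {
                return size() > maxSize;
            }
        };
    }

    /** Returns the cached value for input, loading it only on a miss. */
    public V get(String input) {
        V value = cache.get(input);
        if (value == null) {
            value = loader.apply(input);
            loads++;
            cache.put(input, value);
        }
        return value;
    }

    public int loads() {
        return loads;
    }

    public static void main(String[] args) {
        String[] rows = {"{\"a\":1}", "{\"a\":1}", "{\"b\":2}", "{\"a\":1}"};

        BoundedParseCache<Integer> cached = new BoundedParseCache<>(1_000, String::length);
        for (String row : rows) {
            cached.get(row);
        }
        System.out.println("loads with cache: " + cached.loads()); // 2 distinct inputs

        BoundedParseCache<Integer> uncached = new BoundedParseCache<>(0, String::length);
        for (String row : rows) {
            uncached.get(row);
        }
        System.out.println("loads with size 0: " + uncached.loads()); // every call loads
    }
}
```

If the rows in your stream repeat the same JSON strings, most calls are cache hits, which would explain a lower CPU cost than expected.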
FUNCTION_LEVEL_CACHE_MAX_SIZE defaults to 1_000. You could change it through the calcite.function.cache.maxSize system property and compare CPU usage.
Its Javadoc:
/**
* The maximum number of items in a function-level cache.
*
* <p>A few SQL functions have expensive processing that, if its results are
* cached, can be reused by future calls to the function. One such function
* is {@code RLIKE}, whose arguments are a regular expression and a string.
* The regular expression needs to be compiled to a
* {@link java.util.regex.Pattern}. Compilation is expensive, and within a
* particular query, the arguments are often the same string, or a small
* number of distinct strings, so caching makes sense.
*
* <p>Therefore, functions such as {@code RLIKE}, {@code SIMILAR TO},
* {@code PARSE_URL}, {@code PARSE_TIMESTAMP}, {@code FORMAT_DATE} have a
* function-level cache. The cache is created in the code generated for the
* query, at the call site of the function, and expires when the query has
* finished executing. Such caches do not need time-based expiration, but
* we need to cap the size of the cache to deal with scenarios such as a
* billion-row table where every row has a distinct regular expression.
*
* <p>Because of how Calcite generates and executes code in Enumerable
* convention, each function object is used from a single thread. Therefore,
* non thread-safe objects such as {@link java.text.DateFormat} can be safely
* cached.
*
* <p>The value of this parameter limits the size of every function-level
* cache in Calcite. The default value is 1,000.
*/
public static final CalciteSystemProperty<Integer> FUNCTION_LEVEL_CACHE_MAX_SIZE =
intProperty("calcite.function.cache.maxSize", 1_000, v -> v >= 0);
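To experiment with the limit, the property name from the declaration above can be passed as a JVM flag, for example `-Dcalcite.function.cache.maxSize=0`. The sketch below only illustrates how a property of this shape (integer, default 1_000, must be >= 0) can be read. `CacheSizeProperty` and `read` are hypothetical names, and falling back to the default for missing or invalid values is my assumption, not a quote of Calcite's implementation.

```java
import java.util.function.IntPredicate;

/** Hypothetical helper; mirrors the shape of the declaration, not Calcite's code. */
public class CacheSizeProperty {
    static final String KEY = "calcite.function.cache.maxSize";

    /** Reads an int system property, using defaultValue if it is unset or invalid. */
    static int read(String key, int defaultValue, IntPredicate valid) {
        String raw = System.getProperty(key);
        if (raw == null) {
            return defaultValue;
        }
        try {
            int value = Integer.parseInt(raw.trim());
            // Assumption: values rejected by the checker fall back to the default.
            return valid.test(value) ? value : defaultValue;
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }

    public static void main(String[] args) {
        // Run with: java -Dcalcite.function.cache.maxSize=0 CacheSizeProperty
        System.out.println(read(KEY, 1_000, v -> v >= 0));
    }
}
```

A JVM flag is the safer route for this kind of test because such a value is typically read once, when the holding class is loaded, so setting it later from job code may have no effect.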