
I have a Flink SQL query like this:

select
    json_value(json_str, '$.key1') as key1,
    json_value(json_str, '$.key2') as key2,
    json_value(json_str, '$.key3') as key3,
    json_value(json_str, '$.key4') as key4
from input_table

When I ran this SQL, I found that Flink actually parses this string four times, and the CPU usage is very high. Is there a way to parse it once and reuse the JSON object to save CPU?

My Flink Version: 1.16

  • Did you try it in the latest version, 1.20? It would also be interesting to hear the analysis you did to determine that it is being parsed four times. Commented Mar 3 at 17:30

2 Answers


At first glance it sounds impossible.

I briefly read the source code, and it turns out that the results are already cached.

source code version:

  • flink: 1.20
  • calcite: master

Flink's json_value is mapped to a Calcite function. Source: BuiltInFunctionDefinitions.java (the JSON_QUERY definition below follows the same pattern):

public static final BuiltInFunctionDefinition JSON_QUERY =
        BuiltInFunctionDefinition.newBuilder()
                .name("JSON_QUERY")
                .kind(SCALAR)
                .callSyntax(JsonFunctionsCallSyntax.JSON_QUERY)
                .inputTypeStrategy(
                        sequence(
                                logical(LogicalTypeFamily.CHARACTER_STRING),
                                and(logical(LogicalTypeFamily.CHARACTER_STRING), LITERAL),
                                TYPE_LITERAL,
                                symbol(JsonQueryWrapper.class),
                                SpecificInputTypeStrategies.JSON_QUERY_ON_EMPTY_ERROR_BEHAVIOUR,
                                SpecificInputTypeStrategies
                                        .JSON_QUERY_ON_EMPTY_ERROR_BEHAVIOUR))
                .outputTypeStrategy(forceNullable(argument(2)))
                .runtimeDeferred()
                .build();

...
/**
 * Specifies that this {@link BuiltInFunctionDefinition} will be mapped to a Calcite
 * function.
 */
public Builder runtimeDeferred() {
    // This method is just a marker method for clarity. It is equivalent to calling
    // neither {@link #runtimeProvided} nor {@link #runtimeClass}.
    return this;
}

The Calcite JSON functions are stateful (results are cached in a LoadingCache); see calcite/runtime/JsonFunctions.java:

// org/apache/calcite/util/BuiltInMethod.java
JSON_VALUE(JsonFunctions.StatefulFunction.class, "jsonValue",
    String.class, String.class,
    SqlJsonValueEmptyOrErrorBehavior.class, Object.class,
    SqlJsonValueEmptyOrErrorBehavior.class, Object.class),

...
// org/apache/calcite/runtime/JsonFunctions.java
// jsonValue(String, String, ...) is invoked via reflection
public @Nullable Object jsonValue(String input,
    String pathSpec,
    SqlJsonValueEmptyOrErrorBehavior emptyBehavior,
    Object defaultValueOnEmpty,
    SqlJsonValueEmptyOrErrorBehavior errorBehavior,
    Object defaultValueOnError) {
  return jsonValue(
      jsonApiCommonSyntaxWithCache(input, pathSpec),
      emptyBehavior,
      defaultValueOnEmpty,
      errorBehavior,
      defaultValueOnError);
}

// jsonApiCommonSyntaxWithCache
private final LoadingCache<String, JsonValueContext> cache =
    CacheBuilder.newBuilder()
        .maximumSize(FUNCTION_LEVEL_CACHE_MAX_SIZE.value())
        .build(CacheLoader.from(JsonFunctions::jsonValueExpression));

public JsonPathContext jsonApiCommonSyntaxWithCache(String input,
    String pathSpec) {
  return jsonApiCommonSyntax(cache.getUnchecked(input), pathSpec);
}

// JsonFunctions::jsonValueExpression
public static JsonValueContext jsonValueExpression(String input) {
  try {
    return JsonValueContext.withJavaObj(dejsonize(input));
  } catch (Exception e) {
    return JsonValueContext.withException(e);
  }
}

// dejsonize. JSON_PATH_JSON_PROVIDER def:
//     JacksonJsonProvider JSON_PATH_JSON_PROVIDER = new JacksonJsonProvider()
public static @Nullable Object dejsonize(String input) {
  return JSON_PATH_JSON_PROVIDER.parse(input);
}

The default value of FUNCTION_LEVEL_CACHE_MAX_SIZE is 1,000; you could try adjusting it and measuring the CPU usage (see the configuration sketch after the doc comment below).

doc:

/**
 * The maximum number of items in a function-level cache.
 *
 * <p>A few SQL functions have expensive processing that, if its results are
 * cached, can be reused by future calls to the function. One such function
 * is {@code RLIKE}, whose arguments are a regular expression and a string.
 * The regular expression needs to be compiled to a
 * {@link java.util.regex.Pattern}. Compilation is expensive, and within a
 * particular query, the arguments are often the same string, or a small
 * number of distinct strings, so caching makes sense.
 *
 * <p>Therefore, functions such as {@code RLIKE}, {@code SIMILAR TO},
 * {@code PARSE_URL}, {@code PARSE_TIMESTAMP}, {@code FORMAT_DATE} have a
 * function-level cache. The cache is created in the code generated for the
 * query, at the call site of the function, and expires when the query has
 * finished executing. Such caches do not need time-based expiration, but
 * we need to cap the size of the cache to deal with scenarios such as a
 * billion-row table where every row has a distinct regular expression.
 *
 * <p>Because of how Calcite generates and executes code in Enumerable
 * convention, each function object is used from a single thread. Therefore,
 * non thread-safe objects such as {@link java.text.DateFormat} can be safely
 * cached.
 *
 * <p>The value of this parameter limits the size of every function-level
 * cache in Calcite. The default value is 1,000.
 */
public static final CalciteSystemProperty<Integer> FUNCTION_LEVEL_CACHE_MAX_SIZE =
    intProperty("calcite.function.cache.maxSize", 1_000, v -> v >= 0);
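FUNCTION_LEVEL_CACHE_MAX_SIZE is read from the calcite.function.cache.maxSize JVM system property (see the intProperty call above). A minimal sketch of how it could be raised, assuming your Flink job really goes through Calcite's JsonFunctions as described and that you pass task manager JVM options via flink-conf.yaml; 10000 is only an illustrative value:

# flink-conf.yaml (illustrative)
env.java.opts.taskmanager: -Dcalcite.function.cache.maxSize=10000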


1 Comment

select json_map['key1'] as key1, json_map['key2'] as key2, json_map['key3'] as key3 ... json_map['key100'] as key100 from ( select json_map from input_table, lateral table(pass_through(json_tuple(json_str))) as T(json_map) ) — I used a lateral table to solve my problem, thanks

I finally solved my problem with this approach:

select
    json_map['key1'] as key1,
    json_map['key2'] as key2,
    json_map['key3'] as key3,
    ...
    json_map['key100'] as key100
from (
    select json_map
    from input_table, lateral table(json_tuple(json_str)) as T(json_map)
)

This is the json_tuple function:

import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.table.functions.TableFunction;

// The class name and wrapper below are assumed for illustration; only the
// eval method comes from the original answer.
public class JsonTuple extends TableFunction<Map<String, String>> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    public void eval(String jsonStr, String... keys) {
        HashMap<String, String> result = new HashMap<>(keys.length);
        if (jsonStr == null || jsonStr.isEmpty()) {
            collect(result);
            return;
        }
        try {
            // Parse the JSON string once and look up every requested key in the tree.
            JsonNode jsonNode = objectMapper.readTree(jsonStr);
            for (String key : keys) {
                JsonNode valueNode = jsonNode.get(key);
                if (valueNode != null) {
                    result.put(key, valueNode.asText());
                }
            }
            collect(result);
        } catch (Exception e) {
            // On malformed JSON, emit an empty map instead of failing the job.
            collect(result);
        }
    }
}
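For completeness, a minimal sketch of how such a table function could be registered so the query above can call it as json_tuple; the class name JsonTuple and the setup code are illustrative assumptions, not part of the original answer:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RegisterJsonTuple {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Make the function available to SQL under the name used in the query above.
        tEnv.createTemporarySystemFunction("json_tuple", JsonTuple.class);

        // input_table would be defined elsewhere (e.g. via tEnv.executeSql with a
        // CREATE TABLE statement); the LATERAL TABLE query above then parses each
        // json_str only once per row.
    }
}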

