747 questions
0
votes
1
answer
49
views
How to emit keyed records for a compacted topic (SimpleStringSchema ClassCastException)?
I'm upgrading a PyFlink job to 2.0 and want to write to a compacted Kafka topic using the new KafkaSink. The stream produces (key, value) tuples (the key is a string, the value is a JSON payload). I configure ...
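A minimal PyFlink sketch of the kind of sink setup this question describes (the topic name and brokers are placeholders, not taken from the question). Note that both the key and the value serialization schemas are applied to the whole stream element, which is one common way a plain SimpleStringSchema on a (key, value) tuple ends up with a ClassCastException:

from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.kafka import (
    KafkaRecordSerializationSchema,
    KafkaSink,
)

# Both schemas below receive the entire stream element; keyed records
# therefore need elements the schemas can actually serialize (e.g. plain
# strings, or a custom serialization schema that extracts the key).
record_serializer = (
    KafkaRecordSerializationSchema.builder()
    .set_topic("compacted-topic")                        # placeholder topic
    .set_key_serialization_schema(SimpleStringSchema())
    .set_value_serialization_schema(SimpleStringSchema())
    .build()
)

sink = (
    KafkaSink.builder()
    .set_bootstrap_servers("localhost:9092")             # placeholder brokers
    .set_record_serializer(record_serializer)
    .build()
)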
0
votes
1
answer
42
views
High CPU usage from RowData serialization in Flink Table API despite ObjectReuse optimization
I have a Table API pipeline that does a 1-minute tumbling count aggregation over a set of 15 columns. A flame graph shows that most of the CPU (~40%) goes into serializing each row, despite using ...
0
votes
1
answer
59
views
Flink SQL with mini-batch seems to trigger only on checkpoint
I have the following config set for my job:
'table.exec.sink.upsert-materialize': 'NONE',
'table.exec.mini-batch.enabled': true,
'table.exec.mini-batch.allow-latency'...
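For reference, a minimal sketch of how these options are typically applied through the Table API configuration; the values below are placeholders, not the asker's:

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
config = t_env.get_config()

# Mini-batch buffers input rows and flushes them when the allowed latency
# elapses or when the batch size is reached.
config.set("table.exec.mini-batch.enabled", "true")
config.set("table.exec.mini-batch.allow-latency", "5 s")   # placeholder value
config.set("table.exec.mini-batch.size", "1000")           # placeholder value
config.set("table.exec.sink.upsert-materialize", "NONE")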
0
votes
0
answers
62
views
Flink SQL Job: com.starrocks.data.load.stream.exception.StreamLoadFailException: Could not get load state because
I'm encountering a Flink job failure and would appreciate any input on what might be misconfigured:
2025-07-28 17:30:52
org.apache.flink.runtime.JobException: Recovery is suppressed by ...
2
votes
1
answer
57
views
Files stuck as .inprogress, not rolling into final Parquet files
I'm running a Flink streaming job using the Table API, which reads from Kafka and writes to S3 (for now I'm using a local path to simulate S3). The job uses a filesystem connector to write data in ...
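A minimal, self-contained sketch of the usual prerequisite in this setup: with bulk formats such as Parquet, in-progress part files are only finalized on checkpoints, so checkpointing has to be enabled (the interval, path, and schema below are assumptions):

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.enable_checkpointing(60_000)  # assumed 1-minute checkpoint interval
t_env = StreamTableEnvironment.create(env)

# Bulk formats roll on checkpoint; without completed checkpoints the part
# files stay as .inprogress.
t_env.execute_sql("""
    CREATE TABLE parquet_sink (
        id STRING,
        ts TIMESTAMP(3)
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'file:///tmp/out',   -- assumed local path standing in for S3
        'format' = 'parquet'
    )
""")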
0
votes
0
answers
89
views
PyFlink Python UDF Fails in Remote Cluster from Jupyter Notebook – Connection Refused from Python Harness
I'm trying to develop a PyFlink streaming job using a Python UDF. When executing from Jupyter Notebook (local) with execution.target = remote, the job fails at the Python environment initialization ...
0
votes
1
answer
65
views
Flink SQL - window aggregation
I'm doing Flink SQL stream processing, trying to do some windowed aggregation ... but the job stops emitting new aggregations after some time (or after it catches up with all the ...
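Without asserting this is the cause here, one setting that is often checked when event-time windows stop firing is the source idle timeout, which keeps watermarks advancing when some Kafka partitions temporarily receive no data (the 30 s value is an assumption):

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
# Mark source splits as idle after 30 s of silence so the overall
# watermark can still advance and windows can close.
t_env.get_config().set("table.exec.source.idle-timeout", "30 s")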
0
votes
1
answer
39
views
Unable to run Flink Table API locally in version 1.19.2
I am trying to set up a local Flink job that uses the Table API to fetch data from a Kafka source and print it. Below is a snippet of the Flink job:
public static void main(String[] args) {
// ...
0
votes
1
answer
53
views
Flink SQL resulting in chained joins, which is causing state to bloat
I have this SQL:
select `from all tables`
FROM table1 i
LEFT JOIN table2 ip
ON i.tenantId = ip.tenantId AND i.id = ip.id
LEFT JOIN table2 t
ON i.tenantId = t.tenantId AND i.id = t.id
LEFT ...
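A common knob when multi-way streaming joins accumulate unbounded state is a state TTL; a minimal sketch, with the 12 h retention being an assumption (expired state means late updates for those keys are no longer joined correctly):

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
# Keep join state for at most 12 hours per key; a trade-off between
# state size and correctness for records arriving after the TTL.
t_env.get_config().set("table.exec.state.ttl", "12 h")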
0
votes
0
answers
73
views
Apache Iceberg table partitioning based on ID
Can I partition an Iceberg table on an ID that ranges into the millions, or is bucketing the better option?
I am pushing 40-50 million records from SQL, which has an ID identity column, using PyFlink. And then I want to ...
0
votes
1
answer
30
views
Always missing the latest record when using COUNT over partition
For this Flink Kafka SQL source definition:
CREATE TEMPORARY TABLE person
(
payload STRING,
`headers` MAP<STRING, BYTES> METADATA,
`record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
...
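A self-contained sketch of an event-time COUNT(*) OVER (PARTITION BY ...), using a datagen source as a stand-in for the Kafka table in the question (the column set and the watermark definition are assumptions). With event-time ordering, a row is only emitted once a later watermark passes its timestamp, which is one way the newest record can appear to be missing:

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

t_env.execute_sql("""
    CREATE TEMPORARY TABLE person (
        payload STRING,
        record_time TIMESTAMP_LTZ(3),
        WATERMARK FOR record_time AS record_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'datagen',       -- stand-in for the Kafka source
        'rows-per-second' = '1'
    )
""")

t_env.execute_sql("""
    SELECT
        payload,
        COUNT(*) OVER (
            PARTITION BY payload
            ORDER BY record_time
            RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS cnt
    FROM person
""").print()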
0
votes
0
answers
33
views
Flink SQL TaskExecutor Error: No Allocated Slots Despite Slot and Memory Configurations
I’ve been trying to create a table using the sqlserver-cdc connector in Flink with the following query:
CREATE TABLE files (
Id INT,
FileName STRING,
FileContent STRING,
CreatedAt TIMESTAMP(3),...
1
vote
0
answers
82
views
Getting an exception after submitting a PyFlink job
I am new to PyFlink. I am trying to submit a simple Python job in YARN application mode, but I get an error saying it cannot find the Python file word_count.py. Below are my environment and the exception log. ...
0
votes
1
answer
68
views
Unable to start a PyFlink job from a savepoint
I'm using Flink 1.20.0, and I'm trying to submit a PyFlink job and start it from an existing savepoint. I execute on the command line:
flink run --fromSavepoint s3a://.../1a4e1e73910e5d953183b8eb1cd6eb84/chk-1 -...
1
vote
1
answer
44
views
How do sqlExecute queries run in Apache Flink when triggered via a ProcessFunction? How are the SQL tasks managed?
Context:
So, I am trying to build a Flink application that runs rules dynamically. I have a rule stream to which SQL rules are written, which Flink reads from and executes. I have connected the ...
0
votes
2
answers
145
views
Flink SQL repeatedly parsing JSON problem
I have a Flink SQL query like this:
select
json_value(json_str, '$.key1') as key1,
json_value(json_str, '$.key2') as key2,
json_value(json_str, '$.key3') as key3,
json_value(json_str, '$....
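One hedged alternative to repeated JSON_VALUE calls (each of which re-parses the string): a Python scalar UDF that parses the payload once and returns a ROW with the needed keys. The function name and key names are assumptions for illustration.

import json

from pyflink.common import Row
from pyflink.table import DataTypes
from pyflink.table.udf import udf


@udf(result_type=DataTypes.ROW([
    DataTypes.FIELD("key1", DataTypes.STRING()),
    DataTypes.FIELD("key2", DataTypes.STRING()),
    DataTypes.FIELD("key3", DataTypes.STRING()),
]))
def parse_once(json_str: str) -> Row:
    # Parse the JSON document a single time and project the fields.
    d = json.loads(json_str)
    return Row(d.get("key1"), d.get("key2"), d.get("key3"))

After registering it with t_env.create_temporary_function("parse_once", parse_once), the returned row can be computed once in a subquery and its fields selected from there.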
0
votes
0
answers
23
views
Why does Flink throw parser error when adding additional elements to JSON_OBJECT?
I am trying to create a VIEW on a table:
CREATE TEMPORARY VIEW `transform_3` AS
SELECT
`first_name` AS `first_name`,
`id` AS `id`,
`event_time` AS `...
0
votes
0
answers
81
views
Flink and LAG function
I have created a table that reads from a Kafka topic. What I want is to sort by eventTime and add a new field that represents the previous value using the LAG function.
The problem comes when two ...
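A self-contained sketch of the LAG-over-event-time pattern described above, with a datagen stand-in for the Kafka table (all names are assumptions); in Flink SQL the OVER window must be ordered by a time attribute:

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

t_env.execute_sql("""
    CREATE TEMPORARY TABLE readings (
        sensor_id STRING,
        reading DOUBLE,
        eventTime TIMESTAMP(3),
        WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND
    ) WITH ('connector' = 'datagen', 'rows-per-second' = '1')
""")

t_env.execute_sql("""
    SELECT
        sensor_id,
        reading,
        LAG(reading) OVER (PARTITION BY sensor_id ORDER BY eventTime) AS prev_reading
    FROM readings
""").print()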
1
vote
1
answer
48
views
How to read state generated by Flink SQL code
I have this table:
/** mode('streaming')*/
CREATE OR REPLACE TABLE eoj_table (
`tenantId` string,
`id` string,
`name` string,
`headers` MAP<STRING, BYTES> METADATA ,
`hard_deleted` ...
0
votes
1
answer
78
views
Time Attribute Type for a TUMBLE with Apache Flink
I am getting the following exception in Flink:
The window function requires the timecol is a time attribute type, but is TIMESTAMP(3)
A little research on the internet tells me this problem is caused ...
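For reference, a self-contained sketch of the usual shape of a time attribute: the TUMBLE descriptor column needs a watermark (or must be a processing-time column), otherwise it is just a plain TIMESTAMP(3). All names here are assumptions:

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

t_env.execute_sql("""
    CREATE TEMPORARY TABLE events (
        id STRING,
        event_time TIMESTAMP(3),
        -- the watermark turns event_time into an event-time attribute
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH ('connector' = 'datagen', 'rows-per-second' = '1')
""")

t_env.execute_sql("""
    SELECT window_start, window_end, COUNT(*) AS cnt
    FROM TABLE(
        TUMBLE(TABLE events, DESCRIPTOR(event_time), INTERVAL '1' MINUTE))
    GROUP BY window_start, window_end
""").print()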
0
votes
0
answers
52
views
Apache Flink SQL API Temporary Table connector type
I am experimenting a little with the Flink SQL API; basically what I am trying to do is working, but I am stuck at one point.
I am reading some Kafka topics, and from the data I receive I have to do some ...
-1
votes
1
answer
42
views
Is key uniqueness enforced within partitions or across all partitions?
Question:
I am working with Apache Flink (Flink SQL) to manage Hudi tables, and I noticed that Hudi supports multiple index types. According to the official documentation on Index Types in Hudi, these ...
3
votes
0
answers
182
views
How to handle an array of objects in apache flink with the Table API
I'm consuming a Kafka topic with Flink using the Table API, pretty much like this:
CREATE TEMPORARY TABLE data_topic (
`hash` STRING,
`date` STRING,
`values` ARRAY<STRING>,
...
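A self-contained sketch of flattening the `values` array with CROSS JOIN UNNEST, using a datagen stand-in for the Kafka table in the question (the connector and data are assumptions; column names follow the excerpt):

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

t_env.execute_sql("""
    CREATE TEMPORARY TABLE data_topic (
        `hash` STRING,
        `date` STRING,
        `values` ARRAY<STRING>
    ) WITH ('connector' = 'datagen', 'rows-per-second' = '1')
""")

t_env.execute_sql("""
    SELECT d.`hash`, d.`date`, v
    FROM data_topic AS d
    CROSS JOIN UNNEST(d.`values`) AS t (v)
""").print()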
0
votes
0
answers
54
views
BQ query to Flink SQL
WITH shopping_items_agg AS (
SELECT
order_number,
SUM(s.quantity) AS total_quantity
FROM
`booking`,
UNNEST(shopping_items) AS s
WHERE TIMESTAMP_TRUNC(event_timestamp, DAY) = ...
0
votes
0
answers
74
views
Unable to insert data into a Kafka topic in Apache Flink using the Upsert Kafka connector in Python
I am working on building an Apache Flink pipeline where the source is SQL Server CDC and the sink is Upsert Kafka. Everything looks fine until, when executing the insert, I get the error:
Caused by: java.lang....
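For reference, a minimal sketch of an upsert-kafka sink definition in PyFlink; the schema, topic, and brokers are assumptions. The connector requires a PRIMARY KEY ... NOT ENFORCED plus explicit key.format and value.format options:

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

t_env.execute_sql("""
    CREATE TABLE kafka_upsert_sink (
        id INT,
        name STRING,
        PRIMARY KEY (id) NOT ENFORCED     -- becomes the Kafka record key
    ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = 'my-topic',                              -- assumed topic
        'properties.bootstrap.servers' = 'localhost:9092', -- assumed brokers
        'key.format' = 'json',
        'value.format' = 'json'
    )
""")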