0 votes
1 answer
49 views

I'm upgrading a PyFlink job to 2.0 and want to write to a Kafka compacted topic using the new KafkaSink. The stream produces (key, value) tuples (key is a string, value is a JSON payload). I configure ...
Sudhakar
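A minimal sketch of the KafkaSink builder this question refers to, as exposed in PyFlink (the broker, topic, and use of SimpleStringSchema are placeholders/assumptions); both serialization schemas are applied to the whole stream element, which is usually the tricky part for keyed writes:

```python
# Hypothetical sketch only: a PyFlink KafkaSink with key and value serializers.
# "localhost:9092" and "compacted-topic" are placeholder values.
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.kafka import (
    KafkaRecordSerializationSchema,
    KafkaSink,
)

record_serializer = (
    KafkaRecordSerializationSchema.builder()
    .set_topic("compacted-topic")
    # Both schemas receive the full element, so a (key, value) tuple stream
    # typically has to be mapped into the shape each schema expects beforehand.
    .set_key_serialization_schema(SimpleStringSchema())
    .set_value_serialization_schema(SimpleStringSchema())
    .build()
)

sink = (
    KafkaSink.builder()
    .set_bootstrap_servers("localhost:9092")
    .set_record_serializer(record_serializer)
    .build()
)
```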
0 votes
1 answer
42 views

I have a Table API pipeline that does a 1-minute Tumbling Count aggregation over a set of 15 columns. FlameGraph shows that most of the CPU (~40%) goes into serializing each row, despite using ...
Tomás Cerdá
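For orientation, the shape of a 1-minute tumbling-window aggregation in the PyFlink Table API looks roughly like this; the table and column names below are made up:

```python
# Rough sketch of a 1-minute tumbling-window count; "orders", "ts", "user_id"
# and "amount" are made-up names, and the table is assumed to be registered.
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col, lit
from pyflink.table.window import Tumble

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
orders = t_env.from_path("orders")

result = (
    orders
    .window(Tumble.over(lit(1).minutes).on(col("ts")).alias("w"))
    .group_by(col("w"), col("user_id"))
    .select(
        col("user_id"),
        col("w").end.alias("window_end"),
        col("amount").count.alias("cnt"),
    )
)
```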
0 votes
1 answer
59 views

I have the following config set for my job: 'table.exec.sink.upsert-materialize': 'NONE', 'table.exec.mini-batch.enabled': true, 'table.exec.mini-batch.allow-latency'...
hitesh • 389
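For context, these options are set on the table config; a minimal sketch (the values shown are examples, not recommendations):

```python
# Sketch of where the options quoted above are set in PyFlink;
# the latency and batch-size values are arbitrary examples.
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
config = t_env.get_config()
config.set("table.exec.sink.upsert-materialize", "NONE")
config.set("table.exec.mini-batch.enabled", "true")
config.set("table.exec.mini-batch.allow-latency", "5 s")
config.set("table.exec.mini-batch.size", "5000")
```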
0 votes
0 answers
62 views

I'm encountering a Flink job failure and would appreciate any input on what might be misconfigured: 2025-07-28 17:30:52 org.apache.flink.runtime.JobException: Recovery is suppressed by ...
iman soltani
2 votes
1 answer
57 views

I'm running a Flink streaming job using the Table API, which reads from Kafka and writes to S3 (for now I'm using a local path to simulate S3). The job uses a filesystem connector to write data in ...
Tuan Duy
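A rough sketch of a filesystem sink of that kind, declared through SQL from the Table API; the path, columns, format, and the rolling/commit options are placeholders:

```python
# Hypothetical filesystem sink DDL; the local path stands in for S3, and the
# columns, format and rolling/commit options are placeholders.
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.execute_sql("""
    CREATE TABLE fs_sink (
        id STRING,
        payload STRING,
        dt STRING
    ) PARTITIONED BY (dt) WITH (
        'connector' = 'filesystem',
        'path' = 'file:///tmp/output',
        'format' = 'json',
        'sink.rolling-policy.rollover-interval' = '1 min',
        'sink.partition-commit.policy.kind' = 'success-file'
    )
""")
```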
0 votes
0 answers
89 views

I'm trying to develop a PyFlink streaming job using a Python UDF. When executing from Jupyter Notebook (local) with execution.target = remote, the job fails at the Python environment initialization ...
Apicha • 33
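For reference, a minimal PyFlink job with a Python UDF, together with the interpreter options that Python environment initialization depends on; the executable paths are placeholders for whatever exists on the client and on the cluster:

```python
# Sketch of a small Python UDF plus the interpreter settings involved in
# Python environment initialization; both paths are placeholders.
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.udf import udf

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
# Interpreter used on the client side (where the job is composed) ...
t_env.get_config().set("python.client.executable", "/usr/bin/python3")
# ... and the interpreter used by the Python workers on the cluster.
t_env.get_config().set("python.executable", "/usr/bin/python3")

@udf(result_type=DataTypes.STRING())
def shout(s: str) -> str:
    return s.upper()

t_env.create_temporary_function("shout", shout)
```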
0 votes
1 answer
65 views

I'm doing Flink SQL stream processing, trying to do some windowing aggregation ... but the job stops emitting new aggregations after some time (or after it catches up with all the ...
JPG • 1,138
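One knob that is often relevant when watermark-driven windows stop firing is the source idle timeout; a hedged sketch of where it is set (the value is only an example):

```python
# Window results are emitted only once the watermark passes the window end,
# so an idle source split can stall emission. The option below (value is an
# example) lets the watermark advance past idle splits.
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().set("table.exec.source.idle-timeout", "1 min")
```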
0 votes
1 answer
39 views

I am trying to set up a local Flink job that uses the Table API to fetch data from a Kafka source and print it. Below is a snippet of the Flink job: public static void main(String[] args) { // ...
Ruoethren Pugunisparam
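The overall shape of such a job (sketched here in PyFlink rather than Java for brevity; topic, broker, columns, and format are placeholders) is roughly:

```python
# Rough "Kafka source -> print sink" Table API sketch; all connector values
# and columns are placeholders.
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.execute_sql("""
    CREATE TABLE kafka_source (
        id STRING,
        payload STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'input-topic',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'local-test',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
    )
""")
t_env.execute_sql(
    "CREATE TABLE print_sink (id STRING, payload STRING) WITH ('connector' = 'print')"
)
t_env.execute_sql("INSERT INTO print_sink SELECT id, payload FROM kafka_source").wait()
```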
0 votes
1 answer
53 views

I have this SQL: select `from all tables` FROM table1 i LEFT JOIN table2 ip ON i.tenantId = ip.tenantId AND i.id = ip.id LEFT JOIN table2 t ON i.tenantId = t.tenantId AND i.id = t.id LEFT ...
hitesh • 389
0 votes
0 answers
73 views

Can I partition an Iceberg table on an ID that ranges into the millions, or is bucketing the better option? I am pushing 40-50 million records from SQL (which has an ID identity column) using PyFlink, and then I want to ...
user3692929
0 votes
1 answer
30 views

For this Flink Kafka SQL source definition: CREATE TEMPORARY TABLE person ( payload STRING, `headers` MAP<STRING, BYTES> METADATA, `record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp', ...
王子1986 • 3,659
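For reference, a complete version of a Kafka source with those metadata columns might look like the following; the connector options and the 'raw' format are assumptions:

```python
# Hypothetical full DDL for a Kafka source exposing headers and the record
# timestamp as metadata columns; topic, brokers and format are placeholders.
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.execute_sql("""
    CREATE TEMPORARY TABLE person (
        payload STRING,
        `headers` MAP<STRING, BYTES> METADATA,
        `record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'person',
        'properties.bootstrap.servers' = 'localhost:9092',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'raw'
    )
""")
```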
0 votes
0 answers
33 views

I’ve been trying to create a table using the sqlserver-cdc connector in Flink with the following query: CREATE TABLE files ( Id INT, FileName STRING, FileContent STRING, CreatedAt TIMESTAMP(3),...
Mushegh Hovhannisyan
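A hedged sketch of what a complete definition of that kind can look like; every connection value is a placeholder, and the exact option set (for example whether the schema goes into 'table-name' or a separate option) depends on the Flink CDC connector version:

```python
# Hypothetical sqlserver-cdc source DDL; hostname, credentials, database and
# table names are placeholders. Depending on the connector version the schema
# may be part of 'table-name' (e.g. 'dbo.files') or a separate option.
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.execute_sql("""
    CREATE TABLE files (
        Id INT,
        FileName STRING,
        FileContent STRING,
        CreatedAt TIMESTAMP(3),
        PRIMARY KEY (Id) NOT ENFORCED
    ) WITH (
        'connector' = 'sqlserver-cdc',
        'hostname' = 'localhost',
        'port' = '1433',
        'username' = 'flink_user',
        'password' = '***',
        'database-name' = 'mydb',
        'table-name' = 'dbo.files'
    )
""")
```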
1 vote
0 answers
82 views

I am new to PyFlink. I am trying to submit a simple Python job in YARN application mode, but I get an error saying it cannot find the Python file word_count.py. Below are my environment and the exception log. ...
Alvin Kam
0 votes
1 answer
68 views

I'm using Flink 1.20.0 and trying to submit a PyFlink job starting from an existing savepoint. I execute on the command line: flink run --fromSavepoint s3a://.../1a4e1e73910e5d953183b8eb1cd6eb84/chk-1 -...
Rinze • 834
1 vote
1 answer
44 views

Context: I am trying to build a Flink application that runs rules dynamically. I have a rule stream to which SQL rules are written, which Flink reads from and executes. I have connected the ...
Sai Ashrritth Patnana
0 votes
2 answers
145 views

I have a Flink SQL query like this: select json_value(json_str, '$.key1') as key1, json_value(json_str, '$.key2') as key2, json_value(json_str, '$.key3') as key3, json_value(json_str, '$....
xinfa • 1
0 votes
0 answers
23 views

I am trying to create a VIEW on a table: CREATE TEMPORARY VIEW `transform_3` AS SELECT `first_name` AS `first_name`, `id` AS `id`, `event_time` AS `...
md1980 • 349
0 votes
0 answers
81 views

I have created a table that reads from a Kafka topic. What I want is to sort by eventTime and add a new field that represents the previous value using the LAG function. The problem comes when two ...
Guille • 2,320
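For orientation, the usual shape of such a query is below; the table and column names are assumptions, and in streaming mode the ORDER BY column of the OVER window has to be a time attribute:

```python
# Sketch of LAG over event time; "events", "id", "eventTime" and "val" are
# assumed names. Ties on eventTime are the complication described above.
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
result = t_env.sql_query("""
    SELECT
        id,
        eventTime,
        val,
        LAG(val) OVER (PARTITION BY id ORDER BY eventTime) AS prev_val
    FROM events
""")
```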
1 vote
1 answer
48 views

I have this table: /** mode('streaming')*/ CREATE OR REPLACE TABLE eoj_table ( `tenantId` string, `id` string, `name` string, `headers` MAP<STRING, BYTES> METADATA , `hard_deleted` ...
hitesh • 389
0 votes
1 answer
78 views

I am getting the following exception in Flink: "The window function requires the timecol is a time attribute type, but is TIMESTAMP(3)". A little research on the internet tells me this problem is cause ...
posthumecaver
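For context, a TIMESTAMP(3) column only becomes a rowtime attribute once a watermark is declared on it; a minimal hypothetical example (names and the out-of-orderness interval are placeholders):

```python
# Minimal sketch: declaring a WATERMARK turns the TIMESTAMP(3) column into a
# time attribute that window functions accept. Names and the 5-second bound
# are placeholders; 'datagen' is used only to keep the example self-contained.
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.execute_sql("""
    CREATE TABLE events (
        id STRING,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'datagen'
    )
""")
```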
0 votes
0 answers
52 views

I am experimenting a little with the Flink SQL API; basically what I am trying to do is working, but I am stuck at one point. I am reading some Kafka topics, and from the data I receive I have to do some ...
posthumecaver
-1 votes
1 answer
42 views

Question: I am working with Apache Flink (Flink SQL) to manage Hudi tables, and I noticed that Hudi supports multiple index types. According to the official documentation on Index Types in Hudi, these ...
wancrin potter
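As a point of reference, Hudi's Flink options are passed through the table's WITH clause; the sketch below is an assumption to verify against the Hudi Flink configuration reference for your version (in particular the 'index.type' value):

```python
# Hypothetical Hudi table DDL from Flink SQL; the path and columns are
# placeholders, and 'index.type' = 'BUCKET' is an assumption that should be
# checked against the Hudi Flink configuration docs for your version.
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.execute_sql("""
    CREATE TABLE hudi_tbl (
        id STRING,
        name STRING,
        ts TIMESTAMP(3),
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'hudi',
        'path' = 'file:///tmp/hudi_tbl',
        'table.type' = 'MERGE_ON_READ',
        'index.type' = 'BUCKET'
    )
""")
```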
3 votes
0 answers
182 views

I'm consuming a Kafka topic with Flink using the Table API, pretty much like this: CREATE TEMPORARY TABLE data_topic ( `hash` STRING, `date` STRING, `values` ARRAY<STRING>, ...
pfeigl • 587
0 votes
0 answers
54 views

WITH shopping_items_agg AS ( SELECT order_number, SUM(s.quantity) AS total_quantity FROM `booking`, UNNEST(shopping_items) AS s WHERE TIMESTAMP_TRUNC(event_timestamp, DAY) = ...
Rahul Bera
0 votes
0 answers
74 views

I am building an Apache Flink pipeline where the source is SQL Server CDC and the sink is upsert Kafka. Everything looks fine until, when executing the insert, I get the error: Caused by: java.lang....
KS Bhale
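For reference, an upsert-kafka sink needs a primary key plus key/value formats; a hypothetical sketch (topic, brokers, and columns are placeholders):

```python
# Hypothetical upsert-kafka sink DDL: requires a PRIMARY KEY and explicit
# key.format / value.format. All names and addresses are placeholders.
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.execute_sql("""
    CREATE TABLE files_upsert_sink (
        Id INT,
        FileName STRING,
        PRIMARY KEY (Id) NOT ENFORCED
    ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = 'files-upserts',
        'properties.bootstrap.servers' = 'localhost:9092',
        'key.format' = 'json',
        'value.format' = 'json'
    )
""")
```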
