CREATE TABLE server_logs_v1_k (
userid STRING,
log_time TIMESTAMP_LTZ(3),
WATERMARK FOR log_time AS log_time - INTERVAL '5' SECONDS
) WITH (
'connector' = 'kafka',
'topic-pattern' = 'server_logs_v1',
'properties.bootstrap.servers' = 'localhost:29092,localhost:39092',
'properties.group.id' = 'serverlogs_v1_local_cg_1',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601',
'scan.topic-partition-discovery.interval'= '60000',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
);
I have the above table and I am trying to this query
select userid, count(userid) from server_logs_v1_k group by userid,session(log_time, INTERVAL '10' SECOND);
I have 3 message in kafka topic server_logs_v1
{"userid": "1","log_time": "2023-07-07T16:43:00.000Z"}
{"userid": "2","log_time": "2023-07-07T16:43:00.000Z"}
{"userid": "3","log_time": "2023-07-07T16:43:00.000Z"}
I am NOT seeing result for
select userid, count(userid) from server_logs_v1_k group by userid,session(log_time, INTERVAL '10' SECOND);
but if i remove the session window function and use select userid, count(userid) from server_logs_v1_k group by userid i see result
Flink version is 1.16.0
How to do I get session window aggregation to work with kafka connector.