From one of the source systems I received the event payloads below.

I created Stream1 for the JSON payload below.

Event JSON 1

{
 "event": {
  "header": {
   "name":"abc",
   "version":"1.0",
   "producer":"123",
   "channel":"lab",
   "countryCode":"US"
  },
  "body":{
   "customerIdentifiers":[
    {"customerIdentifier":"1234","customerIdType":"cc"},
    {"customerIdentifier":"234","customerIdType":"id"}
   ],
   "accountIdentifiers":[
    {"accountIdentifier":"123","accountIdType":"no"},
    {"accountIdentifier":"Primary","accountIdType":"da"}
   ],
   "eventDetails":{
    "offeramount":"40000",
    "apr":"2.6%",
    "minpayment":"400",
    "status":"Approved"
   }
  }
 }
}

Event JSON 2

{
 "event": {
  "header": {
   "name":"abc",
   "version":"1.0",
   "producer":"123",
   "channel":"lab",
   "countryCode":"US"
  },
  "body":{
   "customerIdentifiers":[
    {"customerIdentifier":"1234","customerIdType":"cc"},
    {"customerIdentifier":"234","customerIdType":"id"}
   ],
   "accountIdentifiers":[
    {"accountIdentifier":"123","accountIdType":"no"},
    {"accountIdentifier":"Primary","accountIdType":"da"}
   ],
   "eventDetails":{
    "offeramount":"70000",
    "apr":"3.6%",
    "minpayment":"600",
    "status":"Rejected"
   }
  }
 }
}

I have created an aggregation table on the above Stream1:

CREATE TABLE EVENT_TABLE AS 
  SELECT 
    avg(minpayment) as Avg_MinPayment, 
    avg(apr) AS Avg_APr, 
    avg(offeramount) AS Avgofferamount , 
    status 
  FROM STREAM1 
  GROUP BY status 
  EMIT CHANGES;

Status   | Avg_MinPayment | Avg_APr | Avgofferamount
----------------------------------------------------
Approved | 400            | 2.6%    | 40000
Rejected | 600            | 3.6%    | 70000

I got the above result from the KTable, and the JSON on the KTable topic looks like this:

Aggregate JSON1

PRINT 'EVENT_TABLE';

{
  "Status" : "Approved", 
  "Avg_Minpayment" : "400", 
  "Avg_APr" : "2.6%", 
  "offeramount" : "40000"
}

Aggregate JSON2

{
  "Status" : "Rejected", 
  "Avg_Minpayment" : "600", 
  "Avg_APr" : "3.6%", 
  "offeramount" : "70000"
}

But I have to construct and publish the final target JSON on the output topic in the format below. I have to add the header and body to Aggregate JSON1 and Aggregate JSON2.

{
 "event":{
  "header":{
   "name":"abc",
   "version":"1.0",
   "producer":"123",
   "channel":"lab",
   "countryCode":"US"
  },
  "body":{
   "Key":[
    {"Status":"approved","Avg_Minpayment":"400","Avg_APr":"2.6%","offeramount":"40000"},
    {"Status":"rejected","Avg_Minpayment":"600","Avg_APr":"3.6%","offeramount":"70000"}
   ]
  }
 }
}
  • Please format your code to make things easier for users in the future =) Commented Aug 12, 2020 at 10:37
  • It would also help if your example worked. For example, your sql uses avg(salary), but there is no salary in your source data. Commented Aug 12, 2020 at 10:45
  • Unfortunately, I don't have enough information to comment more. Your required output seems to be grouping multiple inputs together. But you've not explained how you want them grouped, or provided sufficient example input. Provide a more detailed description of your problem and I may be able to help. Commented Aug 12, 2020 at 10:47
  • @AndrewCoates That is sample data, so I may have missed the salary. The main problem is that I get aggregation data like this {"Status" : "Approved", "Avg_Sal" : "10000", "Avg_APr" : "3.6%", "offeramount" : "40000"} and I want to add the header and repeated attributes to the final output JSON. How can I build that JSON? Commented Aug 12, 2020 at 14:04
  • For the header attributes I can add any static values; it is just for structure: {"header":{"name":"abc","version":"1.0","producer":"123","channel":"lab","countryCode":"US"}.... I want to add this header to my aggregated results JSON. Is there any way to concatenate, or any solution to structure the whole JSON in streams? Commented Aug 12, 2020 at 14:13

1 Answer

It's not terribly clear what you're trying to achieve, given that your example SQL won't produce the example output from the example input. In fact, your example SQL would fail with unknown column errors.

Something like the following would generate your example output:

CREATE TABLE EVENT_TABLE AS 
  SELECT 
    status,
    avg(eventDetails->minpayment) as Avg_MinPayment, 
    avg(eventDetails->apr) AS Avg_APr, 
    avg(eventDetails->offeramount) AS Avgofferamount
  FROM STREAM1 
  GROUP BY status 
  EMIT CHANGES;
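
In the payload shown in the question, the fields being averaged sit under event -> body -> eventDetails and arrive as JSON strings, so AVG cannot consume them as they are. Below is a minimal sketch of one way to spell that out. It assumes STREAM1 is declared over a JSON topic with a single top-level event STRUCT column; the topic name source_events is hypothetical, and only the fields the aggregation needs are declared (the header and the identifier arrays could be declared in the same way).

```sql
-- Assumed declaration of STREAM1; 'source_events' is a hypothetical topic name.
-- Only the eventDetails fields are declared, all as VARCHAR because the
-- payload carries them as JSON strings.
CREATE STREAM STREAM1 (
  event STRUCT<
    body STRUCT<
      eventDetails STRUCT<
        offeramount VARCHAR,
        apr VARCHAR,
        minpayment VARCHAR,
        status VARCHAR
      >
    >
  >
) WITH (
  KAFKA_TOPIC = 'source_events',
  VALUE_FORMAT = 'JSON'
);

-- Same aggregation as above, with full paths into the struct and explicit
-- casts. The '%' suffix is stripped from apr before casting to DOUBLE.
CREATE TABLE EVENT_TABLE AS
  SELECT
    event->body->eventDetails->status AS status,
    AVG(CAST(event->body->eventDetails->minpayment AS DOUBLE)) AS Avg_MinPayment,
    AVG(CAST(REPLACE(event->body->eventDetails->apr, '%', '') AS DOUBLE)) AS Avg_APr,
    AVG(CAST(event->body->eventDetails->offeramount AS DOUBLE)) AS Avgofferamount
  FROM STREAM1
  GROUP BY event->body->eventDetails->status
  EMIT CHANGES;
```

Because the values are cast to DOUBLE, the averages come out as numbers rather than strings, so the apr average would no longer carry the % sign.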

Next, your example output...

Status   | Avg_MinPayment | Avg_APr | Avgofferamount
----------------------------------------------------
Approved | 400            | 2.6%    | 40000
Rejected | 600            | 3.6%    | 70000

...is outputting one row per status. Yet, the output you say you want to achieve ...

{
 "event":{
  "header":{
   "name":"abc",
   "version":"1.0",
   "producer":"123",
   "channel":"lab",
   "countryCode":"US"
  },
  "body":{
   "Key":[
    {"Status":"approved","Avg_Minpayment":"400","Avg_APr":"2.6%","offeramount":"40000"},
    {"Status":"rejected","Avg_Minpayment":"600","Avg_APr":"3.6%","offeramount":"70000"}
   ]
  }
 }
}

...contains both statuses, i.e. it's combining both of your example input messages into a single output.

If I'm understanding you correctly, and you do indeed want to output the above JSON, then:

You would first need to include the event information. But which event information? If you know they're always going to be the same, then you can use:

CREATE TABLE EVENT_TABLE AS 
  SELECT 
    status,
    latest_by_offset(event) as event,
    avg(eventDetails->minpayment) as Avg_MinPayment, 
    avg(eventDetails->apr) AS Avg_APr, 
    avg(eventDetails->offeramount) AS Avgofferamount
  FROM STREAM1 
  GROUP BY status 
  EMIT CHANGES;

The latest_by_offset aggregate function will capture the event information from the last message it saw. Though I'm not convinced this is what you want. Could you not be getting other rejected and approved messages with different event information? If it is the event information that identifies which messages should be grouped together, then something like this might give you something close to what you want:

CREATE TABLE EVENT_TABLE AS 
  SELECT 
    event,
    collect_list(eventDetails) as body
  FROM STREAM1 
  GROUP BY event 
  EMIT CHANGES;

If this is close, then you may want to use the STRUCT constructor and AS_VALUE function to restructure your output. For example:

CREATE TABLE EVENT_TABLE AS 
  SELECT 
    event as key,
    AS_VALUE(event) as event,
    STRUCT(
      keys := collect_list(eventDetails)
    ) as body
  FROM STREAM1 
  GROUP BY event 
  EMIT CHANGES;