I just started using KSQL. When I run PRINT on a topic from the beginning, I get data in the format below.

   rowtime: 4/12/20, 9:00:05 AM MDT, key: {"messageId":null}, value: {"WHS":[{"Character Set":"UTF-8","action":"finished","Update-Date-Time":"2020-04-11 09:00:02:25","Number":0,"Abbr":"","Name":"","Name2":"","Country-Code":"","Addr-1":"","Addr-2":"","Addr-3":"","Addr-4":"","City":"","State":""}]}

But all the KSQL examples show data in the format below:

{"ROWTIME":1537436551210,"ROWKEY":"3375","rating_id":3375,"user_id":2,"stars":3,"route_id":6972,"rating_time":1537436551210,"channel":"web","message":"airport refurb looks great, will fly outta here more!"}

As a result, I'm not able to perform any operations on the data. For my topic, the format is reported as:

Key format: JSON or SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING

How can I get the data into the format used in the examples?

Thanks

  • I tried to create the stream with VALUE_FORMAT set to Avro, but it still doesn't work. Commented Apr 13, 2020 at 3:11

1 Answer

ksqlDB does not yet support JSON message keys (see the tracking GitHub issue).

However, you can still access the data, both in the key and the value. The JSON key is just a string after all!

The value, when reformatted, looks like this:

{
  "WHS":[
    {
      "Character Set":"UTF-8",
      "action":"finished",
      "Update-Date-Time":"2020-04-11 09:00:02:25",
      "Number":0,
      "Abbr":"",
      "Name":"",
      "Name2":"", 
      "Country-Code":"",
      "Addr-1":"",
      "Addr-2":"",
      "Addr-3":"",
      "Addr-4":"",
      "City":"",
      "State":""
    }
  ]
}

Assuming all rows share this format, ksqlDB can handle it easily.

To import your stream you should be able to run something like this:

-- assuming v0.9 of ksqlDB
create stream stuff 
  (
    ROWKEY STRING KEY,
    WHS ARRAY<
      STRUCT<
        `Character Set` STRING,
        action STRING,
        `Update-Date-Time` STRING,
        Number STRING,
        ... etc
      >
    >
   )
   WITH (kafka_topic='?', value_format='JSON');

The value column WHS is an array of structs (where there will be only one element), and the struct defines all the fields you need to access. Note that some field names need quoting because they contain characters that are invalid in unquoted identifiers, e.g. spaces and dashes.
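
Once the stream exists, a query along the following lines should pull individual fields out of both the key and the value. This is a minimal sketch, not a tested statement: it assumes the stream is named stuff as in the statement above, and the column aliases (message_id, whs_action, update_time) are made up for illustration. The array index is also an assumption: older ksqlDB releases index arrays from 0 and newer ones from 1, so check the documentation for your version and adjust WHS[1] if needed.

```sql
-- Sketch only: assumes the stream `stuff` was created as shown above.
-- NOTE: the array index base depends on the ksqlDB version
-- (0-based in older releases, 1-based in newer ones); adjust WHS[1] to match.
SELECT
  -- The key is a JSON document held as a plain string, so parse it on the fly.
  EXTRACTJSONFIELD(ROWKEY, '$.messageId') AS message_id,
  -- Struct fields are dereferenced with ->
  WHS[1]->action AS whs_action,
  -- Field names with dashes or spaces must be quoted with backticks.
  WHS[1]->`Update-Date-Time` AS update_time
FROM stuff
EMIT CHANGES;
```

EXTRACTJSONFIELD returns a string, so a CAST may be needed if the extracted key field should be treated as a number.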
