
I'm trying to join two existing Kafka topics in KSQL. Here are some data samples from Kafka (actual values redacted because of the corporate environment):

device topic:

{
  "persistTime" : "2020-10-06T13:30:25.373Z",
  "previous" : {
    "device" : "REDACTED",
    "type" : "REDACTED",
    "group" : "REDACTED",
    "inventoryState" : "unknown",
    "managementState" : "registered",
    "communicationId" : "REDACTED",
    "manufacturer" : "",
    "description" : "",
    "model" : "",
    "location" : {
      "geo" : {
        "latitude" : "REDACTED",
        "longitude" : "REDACTED"
      },
      "address" : {
        "city" : "",
        "postalCode" : "",
        "street" : "",
        "houseNumber" : "",
        "floor" : "",
        "company" : "",
        "country" : "",
        "reference" : "",
        "timeZone" : "",
        "region" : "",
        "district" : ""
      },
      "logicalInstallationPoint" : ""
    },
    "tags" : [ ]
  },
  "current" : {
    "device" : "REDACTED",
    "type" : "REDACTED",
    "group" : "REDACTED",
    "inventoryState" : "unknown",
    "managementState" : "registered",
    "communicationId" : "REDACTED",
    "manufacturer" : "",
    "description" : "",
    "model" : "",
    "location" : {
      "geo" : {
        "latitude" : "REDACTED",
        "longitude" : "REDACTED"
      },
      "address" : {
        "city" : "",
        "postalCode" : "",
        "street" : "",
        "houseNumber" : "",
        "floor" : "",
        "company" : "",
        "country" : "",
        "reference" : "",
        "timeZone" : "",
        "region" : "",
        "district" : ""
      },
      "logicalInstallationPoint" : ""
    },
    "tags" : [ ]
  }
}

device-event topic (1st sample):

{
  "device" : "REDACTED",
  "event" : "403151",
  "firstOccurrenceTime" : "2020-09-30T11:03:50.000Z",
  "lastOccurrenceTime" : "2020-09-30T11:03:50.000Z",
  "occurrenceCount" : 1,
  "receiveTime" : "2020-09-30T11:03:50.000Z",
  "persistTime" : "2020-09-30T14:32:59.580Z",
  "state" : "open",
  "context" : {
    "2" : "25",
    "3" : "0",
    "4" : "60",
    "1" : "REDACTED"
  }
}

device-event topic (2nd sample):

{
  "device" : "REDACTED",
  "event" : "402004",
  "firstOccurrenceTime" : "2020-10-07T07:02:48Z",
  "lastOccurrenceTime" : "2020-10-07T07:02:48Z",
  "occurrenceCount" : 1,
  "receiveTime" : "2020-10-07T07:02:48Z",
  "persistTime" : "2020-10-07T07:15:28.533Z",
  "state" : "open",
  "context" : {
    "2" : "2020-10-07T07:02:48.0000000Z",
    "1" : "REDACTED"
  }
}

The issue I'm facing is that the number of keys inside context varies from message to message on the device-event topic.
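To make the variation concrete, here is a small Python sketch that lists the context keys of each message. Python is only used for illustration here, and the two strings are the samples above trimmed down to their context objects.

```python
import json
from typing import List

# The two device-event samples above, trimmed to their "context" objects.
samples = [
    '{"context": {"2": "25", "3": "0", "4": "60", "1": "REDACTED"}}',
    '{"context": {"2": "2020-10-07T07:02:48.0000000Z", "1": "REDACTED"}}',
]


def context_keys(raw: str) -> List[str]:
    """Return the sorted keys of the "context" object in one JSON message."""
    return sorted(json.loads(raw)["context"])


for raw in samples:
    print(context_keys(raw))
# ['1', '2', '3', '4']
# ['1', '2']
```

A schema that names "1" to "4" explicitly only matches the first shape exactly.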

I've tried the following two statements to create the events stream in ksqlDB:

CREATE STREAM "events"\
("device" VARCHAR, \
"event" VARCHAR, \
"firstOccurrenceTime" VARCHAR, \
"lastOccurrenceTime" VARCHAR, \
"occurrenceCount" INTEGER, \
"receiveTime" VARCHAR, \
"persistTime" VARCHAR, \
"state" VARCHAR, \
"context" ARRAY<STRING>) \
WITH (KAFKA_TOPIC='device-event', VALUE_FORMAT='JSON');

CREATE STREAM "events"\
("device" VARCHAR, \
"event" VARCHAR, \
"firstOccurrenceTime" VARCHAR, \
"lastOccurrenceTime" VARCHAR, \
"occurrenceCount" INTEGER, \
"receiveTime" VARCHAR, \
"persistTime" VARCHAR, \
"state" VARCHAR, \
"context" STRUCT\
<"1" VARCHAR, \
"2" VARCHAR, \
"3" VARCHAR, \
"4" VARCHAR>) \
WITH (KAFKA_TOPIC='ext_device-event_10195', VALUE_FORMAT='JSON');

The second statement only brings in records that have all four context keys present ("1", "2", "3" and "4").

How can I create the equivalent KSQL stream for the device-event Kafka topic so that messages with any set of context keys are included?

Answer

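You need to use a MAP rather than a STRUCT. A STRUCT has a fixed set of named fields declared up front, whereas a MAP holds whatever keys each message happens to contain.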
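A rough way to picture the difference, as a Python sketch: the dict below stands in for a ksqlDB MAP<VARCHAR, VARCHAR>. This is an analogy only, not how ksqlDB is implemented, and context_value is a hypothetical helper name. Whatever keys a message carries end up in the map, and looking up a key that isn't there gives None, the analogue of SQL null.

```python
import json
from typing import Dict, Optional


def context_value(raw: str, key: str) -> Optional[str]:
    """Analogue of "context"['<key>']: the value if the key is present, else None (null)."""
    # A missing or null "context" is treated as an empty map, so every lookup gives None.
    context: Dict[str, str] = json.loads(raw).get("context") or {}
    return context.get(key)


first = '{"event": "403151", "context": {"2": "25", "3": "0", "4": "60", "1": "REDACTED"}}'
second = '{"event": "402004", "context": {"2": "2020-10-07T07:02:48.0000000Z", "1": "REDACTED"}}'

for raw in (first, second):
    print(context_value(raw, "1"), context_value(raw, "3"))
# REDACTED 0
# REDACTED None
```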

BTW, you also don't need the \ line-continuation characters any more :)

Here's a working example using ksqlDB 0.12.

  • Load the sample data into a topic:

    kafkacat -b localhost:9092 -P -t events <<EOF
    { "device" : "REDACTED", "event" : "403151", "firstOccurrenceTime" : "2020-09-30T11:03:50.000Z", "lastOccurrenceTime" : "2020-09-30T11:03:50.000Z", "occurrenceCount" : 1, "receiveTime" : "2020-09-30T11:03:50.000Z", "persistTime" : "2020-09-30T14:32:59.580Z", "state" : "open", "context" : { "2" : "25", "3" : "0", "4" : "60", "1" : "REDACTED" } }
    { "device" : "REDACTED", "event" : "402004", "firstOccurrenceTime" : "2020-10-07T07:02:48Z", "lastOccurrenceTime" : "2020-10-07T07:02:48Z", "occurrenceCount" : 1, "receiveTime" : "2020-10-07T07:02:48Z", "persistTime" : "2020-10-07T07:15:28.533Z", "state" : "open", "context" : { "2" : "2020-10-07T07:02:48.0000000Z", "1" : "REDACTED" } }
    EOF
    
  • In ksqlDB, declare the stream:

    CREATE STREAM "events" (
        "device" VARCHAR,
        "event" VARCHAR,
        "firstOccurrenceTime" VARCHAR,
        "lastOccurrenceTime" VARCHAR,
        "occurrenceCount" INTEGER,
        "receiveTime" VARCHAR,
        "persistTime" VARCHAR,
        "state" VARCHAR,
        "context" MAP < VARCHAR, VARCHAR >
    ) WITH (KAFKA_TOPIC = 'events', VALUE_FORMAT = 'JSON');
    
  • Query the stream to check things work:

    ksql> SET 'auto.offset.reset' = 'earliest';
    Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
    
    ksql> SELECT "device", "event", "receiveTime", "state", "context" FROM "events" EMIT CHANGES;
    +----------+--------+--------------------------+--------+------------------------------------+
    |device    |event   |receiveTime               |state   |context                             |
    +----------+--------+--------------------------+--------+------------------------------------+
    |REDACTED  |403151  |2020-09-30T11:03:50.000Z  |open    |{1=REDACTED, 2=25, 3=0, 4=60}       |
    |REDACTED  |402004  |2020-10-07T07:02:48Z      |open    |{1=REDACTED, 2=2020-10-07T07:02:48.0|
    |          |        |                          |        |000000Z}                            |
    
  • Use the ['key'] syntax to access specific keys within the map; a key that is absent from a message comes back as null:

    ksql> SELECT "device", "event", "context", "context"['1'] AS CONTEXT_1, "context"['3'] AS CONTEXT_3 FROM "events" EMIT CHANGES;
    +-----------+--------+------------------------------------+-----------+-----------+
    |device     |event   |context                             |CONTEXT_1  |CONTEXT_3  |
    +-----------+--------+------------------------------------+-----------+-----------+
    |REDACTED   |403151  |{1=REDACTED, 2=25, 3=0, 4=60}       |REDACTED   |0          |
    |REDACTED   |402004  |{1=REDACTED, 2=2020-10-07T07:02:48.0|REDACTED   |null       |
    |           |        |000000Z}                            |           |           |
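The kafkacat step above relies on each message sitting on its own line: in producer mode (-P), kafkacat splits its input on newlines by default, so the pretty-printed samples from the question would otherwise be split into fragments. As a sketch (Python standard library only; to_json_lines is a hypothetical helper name), this re-emits a stream of pretty-printed JSON documents one per line, ready to feed into the same kafkacat command:

```python
import json


def to_json_lines(text: str) -> str:
    """Re-emit concatenated (possibly pretty-printed) JSON documents, one per line."""
    decoder = json.JSONDecoder()
    idx, lines = 0, []
    while True:
        # raw_decode does not skip leading whitespace, so skip it between documents.
        while idx < len(text) and text[idx].isspace():
            idx += 1
        if idx == len(text):
            break
        obj, idx = decoder.raw_decode(text, idx)
        lines.append(json.dumps(obj))  # single-line form with default separators
    return "\n".join(lines)


pretty = """{
  "device" : "REDACTED",
  "context" : {
    "2" : "25",
    "1" : "REDACTED"
  }
}
{
  "device" : "REDACTED",
  "context" : { "1" : "REDACTED" }
}"""

print(to_json_lines(pretty))
# {"device": "REDACTED", "context": {"2": "25", "1": "REDACTED"}}
# {"device": "REDACTED", "context": {"1": "REDACTED"}}
```

Key order is preserved, so the flattened lines match the originals apart from whitespace.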
    

Comments

This solved the problem I was having, thanks. Now I want to join the two topics (device and device-event) so that the location from the device topic is added to the device-event records. Check the updated question for the KSQL SELECT statement that I'm executing.
Hi @UltraAlkaline, if you have a new question, please post it as a new question and try to avoid adding another question to your existing one. :-) Stackoverflow is meant to be a Q&A site with one question and one (accepted) answer.
