
I want to try out the MATCH_RECOGNIZE operator in Flink SQL from the SQL Client. For this, I have set up the source table as follows:

# A typical table source definition looks like:
 - name: TaxiRides
   type: source
   update-mode: append
   connector: 
       type: filesystem
       path: "/home/bitnami/Match_Recognize/TaxiRide.csv"
   format: 
       type: csv
       fields:
           - name: rideId
             type: LONG
           - name: taxiId
             type: LONG
           - name: isStart
             type: BOOLEAN
           - name: lon
             type: FLOAT
           - name: lat
             type: FLOAT
           - name: rideTime
             type: TIMESTAMP
           - name: psgCnt
             type: INT
       line-delimiter: "\n"
       field-delimiter: ","

   schema:
    - name: rideId
      type: LONG
    - name: taxiId
      type: LONG
    - name: isStart
      type: BOOLEAN
    - name: lon
      type: FLOAT
    - name: lat
      type: FLOAT
    - name: rideTime
      type: TIMESTAMP
      rowtime:
        timestamps:
          type: "from-field"
          from: "eventTime"
        watermarks:
          type: "periodic-bounded"
          delay: "60000"
    - name: psgCnt
      type: INT

When I start the SQL Client session, I get the following error:

Exception in thread "main" org.apache.flink.table.client.SqlClientException: The configured environment is invalid. Please check your environment files again.
        at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:147)
        at org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:194)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
        at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:562)
        at org.apache.flink.table.client.gateway.local.LocalExecutor.validateSession(LocalExecutor.java:382)
        at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:144)
        ... 2 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: No factory supports all properties.

So, my question is: is it possible to read the source stream as a table from a file, or does it have to come from Kafka?

UPDATE: I am using Flink version 1.9.1

  • If you want to read from a file, you could use a batch job by writing a script which waits till the file is complete and then starts the batch job. For streaming, it's always better to use Kafka. Commented Feb 13, 2020 at 13:57
  • @AnuragAnand, thanks for your reply. However, there is already a CSVAppendTableSource. So why isn't it possible to configure the table source like that, or is it because the SQL Client is currently still in beta? Commented Feb 13, 2020 at 14:22
  • Could you please add your query? Commented Feb 18, 2020 at 11:32

1 Answer


Unfortunately, you are hitting a limitation of the CSV filesystem connector: it does not support rowtime attributes.

In 1.10 we started work on expressing watermarks and time attributes in a slightly different way. For reference, see https://issues.apache.org/jira/browse/FLINK-14320.

You can try creating a table with a DDL statement containing a WATERMARK declaration, as described here: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-table It works with the Blink planner only, though (the Blink planner is the default implementation in the SQL Client starting from the 1.10 release).
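
For example, a DDL version of the table above might look roughly like the sketch below. The property keys are my assumption based on the legacy connector property keys documented for Flink 1.10, so double-check them against the documentation for your version; note that a rowtime column must be declared as TIMESTAMP(3):

CREATE TABLE TaxiRides (
  rideId   BIGINT,
  taxiId   BIGINT,
  isStart  BOOLEAN,
  lon      FLOAT,
  lat      FLOAT,
  rideTime TIMESTAMP(3),
  psgCnt   INT,
  -- rowtime attribute with a 60-second bounded watermark, replacing the
  -- rowtime/watermarks section of the YAML environment file
  WATERMARK FOR rideTime AS rideTime - INTERVAL '60' SECOND
) WITH (
  'connector.type' = 'filesystem',
  'connector.path' = 'file:///home/bitnami/Match_Recognize/TaxiRide.csv',
  'format.type'    = 'csv'
);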

Another option you have is reading from Kafka with the CSV format, as sketched below.
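
In that case the column list and the WATERMARK declaration stay the same as in the sketch above, and only the WITH clause changes. The topic name and broker address below are placeholders, and the property keys are again my assumption for the legacy 1.10 syntax:

) WITH (
  'connector.type'    = 'kafka',
  'connector.version' = 'universal',
  'connector.topic'   = 'taxi-rides',   -- hypothetical topic name
  'connector.properties.bootstrap.servers' = 'localhost:9092',
  'connector.startup-mode' = 'earliest-offset',
  'format.type' = 'csv'
);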

BTW, this particular exception message was improved in Flink 1.10: Flink now tells you which properties are problematic.
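
For completeness, once a table with a proper rowtime attribute exists, a MATCH_RECOGNIZE query over it could look like the sketch below. The pattern and measures are purely illustrative (the original question does not include the query); this one pairs each ride-start event with the next ride-end event per taxi:

SELECT *
FROM TaxiRides
MATCH_RECOGNIZE (
  PARTITION BY taxiId
  ORDER BY rideTime              -- must be the rowtime attribute
  MEASURES
    S.rideId   AS rideId,
    S.rideTime AS startTime,
    E.rideTime AS endTime
  AFTER MATCH SKIP PAST LAST ROW
  PATTERN (S E)
  DEFINE
    S AS S.isStart = TRUE,
    E AS E.isStart = FALSE
) AS T;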
