I've been following the documentation at https://debezium.io/documentation/reference/2.4/connectors/sqlserver.html#sqlserver-ad-hoc-snapshots

I registered the source connector to perform an ad-hoc snapshot of the 'dbo.customers' table with the 'WHERE' condition last_name = 'Walker', by writing '{"data-collections": ["dbo.customers"],"type":"incremental","additional-conditions":"last_name=Walker"}' to the signaling table. However, the connector still captured and snapshotted all rows from the 'customers' table, not the 1 row I expected.

I don't know which step I missed.

Here are my configuration steps:

  1. Populate DB
CREATE DATABASE testDB;
GO
USE testDB;
EXEC sys.sp_cdc_enable_db;
CREATE TABLE customers (
  id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE
);
INSERT INTO customers(first_name,last_name,email)
  VALUES ('Sally','Thomas','[email protected]');
INSERT INTO customers(first_name,last_name,email)
  VALUES ('George','Bailey','[email protected]');
INSERT INTO customers(first_name,last_name,email)
  VALUES ('Edward','Walker','[email protected]');
INSERT INTO customers(first_name,last_name,email)
  VALUES ('Anne','Kretchmar','[email protected]');
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers', @role_name = NULL, @supports_net_changes = 0;
  2. Create the signaling table and add a signal record
CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);
INSERT INTO dbo.debezium_signal (id, type, data) 
VALUES ('ad-hoc-1','execute-snapshot','{"data-collections": ["dbo.customers"],"type":"incremental","additional-conditions":"last_name=Walker"}');
  3. Configure the source connector and start it
{
    "name": "customer-adhoc",
    "config": {
        "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
        "tasks.max" : "1",
        "topic.prefix" : "CDC",
        "database.hostname" : "sqlserver12",
        "database.port" : "1433",
        "database.user" : "sa",
        "database.password" : "Password!",
        "database.names" : "testDB",
        "snapshot.mode": "initial",
        "schema.history.internal.kafka.bootstrap.servers" : "kafka12:9092",
        "schema.history.internal.kafka.topic": "schema-changes.inventory",
        "include.schema.changes": "true",
        "database.encrypt": "false",
        "table.include.list": "dbo.customers,dbo.debezium_signal",
        "column.mask.with.0.chars": "testDB.dbo.customers.first_name, testDB.dbo.customers.last_name",
        "schema.history.internal.store.only.captured.tables.ddl": "true",
        "schema.history.internal.store.only.captured.databases.ddl": "true",
        "incremental.snapshot.allow.schema.changes" : "true" ,
        "key.converter.apicurio.registry.auto-register": "true",
        "key.converter.apicurio.registry.find-latest": "true",
        "value.converter.apicurio.registry.auto-register": "true",
        "value.converter.apicurio.registry.find-latest": "true",
        "schema.name.adjustment.mode": "avro",
        "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "value.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy",
        "key.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy",
        "key.converter.apicurio.registry.url": "http://****:8080/apis/registry/v2",
        "value.converter.apicurio.registry.url": "http://****:8080/apis/registry/v2",
        "signal.data.collection": "testDB.dbo.debezium_signal",
        "signal.kafka.topic":"CDC.dbz-signal",
        "kafka.consumer.offset.commit.enabled": "true",
        "signal.kafka.groupId": "customer-kafka-signal",
        "signal.kafka.bootstrap.servers": "kafka12:9092"
    }
}
  • Snapshot, in all databases and the English language, means creating a copy of something, not retrieving the changes. From your description, Debezium did what you asked Commented Apr 3, 2024 at 8:05
  • BTW SQL Server has a lot of ways to track changes and replicate data for quite some time. There's snapshot, transactional and merge replication, since the 1990s. If you want to find out what changed since the last sync there's the cheap change tracking mechanism that's available in all versions and editions since 2000 or 2005. There's also the more resource-intensive CDC used by Debezium for some reason. If you want to replicate changes between SQL Server databases you don't need Debezium at all Commented Apr 3, 2024 at 8:09
  • The docs said: 'Ad hoc snapshot signals specify the tables to include in the snapshot. The snapshot can capture the entire contents of the database, or capture only a subset of the tables in the database. Also, the snapshot can capture a subset of the contents of the table(s) in the database'. So FYI, I think the snapshot result should be just 1 row in 'customers' table which is "'Edward','Walker','[email protected]'" Commented Apr 3, 2024 at 8:12
  • And yet you get everything, which means your assumptions are wrong. In this case, there are no additional-condition objects at all. From the docs, Each additional condition is an object that has data-collection and filter parameters. Commented Apr 3, 2024 at 8:23
  • I edited the value of 'additional-conditions' to [{"data-collection": "dbo.customers" ,"filter":"last_name='Walker'"}]. So far, my understanding is that an ad-hoc snapshot with filter conditions captures all rows in a table that match those conditions, which act like 'WHERE' conditions. Am I right? Commented Apr 3, 2024 at 8:46

2 Answers

There are 2 types of Debezium snapshot:

  • Initial snapshot, controlled by the snapshot.mode parameter. You have it set to initial, which takes a full snapshot when the connector first starts, that is, when no offsets have been recorded for it yet. That's why all rows were captured at the beginning.
  • Incremental ad hoc snapshot, triggered through the signal table. Each time you insert a signal, you get the requested set of data. So it is easy to check: just insert a request into the signal table while the connector is running. If it does not work, review the format of your request. For instance, "additional-conditions" should follow the format in the documentation. Or you can insert a signal without any conditions first and then, if that works, add "additional-conditions".
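
As a rough sketch of what a corrected request might look like, the signal below uses the object form of "additional-conditions" quoted in the comments (one object per table, with data-collection and filter). The id 'ad-hoc-2' is a made-up value, and the table name is written the way the asker wrote it; as I understand the docs, names are expected in the same format as signal.data.collection, so 'testDB.dbo.customers' may be the form you need.

```sql
-- Sketch only: insert while the connector is already running.
-- 'ad-hoc-2' is an arbitrary, hypothetical signal id.
-- Single quotes inside a T-SQL string literal are doubled: ''Walker''.
-- If nothing is snapshotted, try the fully-qualified name
-- testDB.dbo.customers in both places (assumption, check the docs).
INSERT INTO dbo.debezium_signal (id, type, data)
VALUES (
  'ad-hoc-2',
  'execute-snapshot',
  '{"data-collections": ["dbo.customers"], "type": "incremental", "additional-conditions": [{"data-collection": "dbo.customers", "filter": "last_name=''Walker''"}]}'
);
```

Compared with the signal in the question, the filter is an object inside an array rather than a bare string, and the string value Walker is quoted as it would be in a WHERE clause.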

I solved my issue, thanks to Artem and Panagiotis Kanavos. With the connector's "snapshot.mode" set to "initial", all rows in the table were captured. A snapshot can be limited to a subset of a source table's data, as mentioned before. What caused the full snapshot was "initial" (I was careless and did not pay attention to this option).

So, here's my procedure:

  1. Set "snapshot.mode" to "schema-only" (only changes from now on will be captured).
  2. Restart the connector.
  3. Add a signal record to the signaling table.

The ad-hoc snapshot works like a charm.
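
Before adding the signal record in step 3, it can help to preview which rows the filter should select, so there is something to compare the snapshot events against. This is just a sanity check I'd suggest, not part of Debezium itself; it assumes the table and data from the question.

```sql
USE testDB;

-- Preview the rows a filter of last_name='Walker' should match.
-- With the sample data from the question, only the
-- 'Edward','Walker' row is expected.
SELECT id, first_name, last_name, email
FROM dbo.customers
WHERE last_name = 'Walker';
```

If the ad-hoc snapshot emits more rows than this query returns, the filter in the signal is probably not being applied.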

1 Comment

Hi! I'm just making sure: if I created the connector with initial as the snapshot mode, can I change the mode and have everything remain the same (topic message-wise)? And when you used the signal while in initial mode, did you just get all the rows for the table? I ask because I saw that signaling is not supported in initial mode. Thanks in advance!
