Created by Berke Düzgün
Debezium sunumuna hoşgeldiniz
CDC'nin Tanımı ve Amacı
Kritik verilerin bulunduğu veritabanı uygulamalarında, veriler üzerindeki değişiklikleri veri güvenliği amacıyla kaydetmemiz gerekir.
Bunun yapılması için çeşitli yöntemler bulunmaktadır: Tetikleyiciler(triggers), saklı yordamlar(stored procedures) vb. Bu yöntemlere ek olarak, SQL Server geliştiricileri bu yöntemlerden daha performanslı ve sisteme daha az yük getirmek amacıyla yeni bir çözüm ürettiler.
"CDC" Veritabanında insert, update ve delete işlemleri neticesinde nihai olarak öncesinden farklı olan, bir başka deyişle değişiklik gösteren verileri yakalamamızı, izlememizi ve işlem yapabilmemizi sağlayan bir tasarım modelidir.
Bu model sayesinde verilerin ilk ve son hallerini elde edebilir ve türlü operasyonlar gerçekleştirebiliriz. Tabi ki de CDC’yi kullanabilmek için ilgili veritabanı tarafından destekleniyor olması gerekmektedir.
Tabi ki de trigger ile bu işlemleri gerçekleştirebilirsiniz. Lakin CDC ile ihtiyaca dönük operasyonları daha hızlı bir şekilde işleyebilirsiniz. Çünkü CDC verilere dair tüm bilgileri veritabanı loglarından toplayarak hizmet sağlamaktadır
CDC, kaynak sistemdeki değişiklikleri yakalayarak bu değişiklikleri hedef sistemlere aktarır.
Bu sayede, farklı sistemler arasında veri entegrasyonu ve senkronizasyonu sağlanır. Sistemler arasında güncel ve doğru veri akışı elde edilir.
CDC, log tabanlı veya tablo tabanlı yöntemler kullanarak değişiklikleri takip eder. Log tabanlı CDC, veritabanının log dosyalarını analiz ederek değişiklikleri yakalar. Tablo tabanlı CDC ise özel olarak tasarlanmış tablolar kullanarak değişiklikleri izler.
Debezium'un Tanımı ve Amacı
Binlerce yazılım projesi ve şirket tarafından kullanılan, publisher/subscriber tabanlı olan open source distributed streaming platformudur. Temelde bilinmesi gereken Producer, Consumer, Broker ve Topic olmak üzere dört terminolojik terimi mevcuttur
Kafka’nın veritabanlarına, consule yahut eleasticsearch gibi yapılara bağlanabilmesini ve metrikleri alabilmesini sağlayan bir tool’dur. Hususi olarak içeriğimiz çerçevesinde değerlendirirsek eğer Kafka ile Debezium arasında veri akışını sağlayacak olan köprü görevi gören yapılanmadır diyebiliriz.
Birçok yapılandırma bilgisinin, distributed mimariler tarafından şu veya bu şekilde kullanıldığını biliyoruz. Tabi bu bilgilerin yönetimi manuel bir şekilde yapıldığı taktirde uygulanmalarının zorluğu nedeniyle insani durumlardan kaynaklı hatalar kaçınılmaz olabilmektedir
Haliyle, Kafka’nın yapılandırma bilgilerini depolamak, adlandırmak, senkronizasyonunu sağlamak ve koordine etmek için ZooKeeper yazılımını kullanıyor olacağız
İlk olarak Debezium’u hangi veritabanıyla kullanırsak kullanalım temelde Kafka ve Debezium çalıştırılmalı ve ZooKeeper ile yapılandırmalar ayarlanmalıdır.
Tüm bu yazılımlarım ayağa kaldırılması için Docker’dan istifade ediyor olacağız. Şimdi gelin ihtiyacımız olan araçları hızlıca oluşturup, ayağa kaldıralım.
version: '3.1'
services:
zookeeper:
image: debezium/zookeeper
ports:
- "2181:2181"
- "2888:2888"
- "3888:3888"
kafka:
image: debezium/kafka
ports:
- "9092:9092"
- "29092:29092"
depends_on:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS=LISTENER_EXT://localhost:29092,LISTENER_INT://kafka:9092
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=LISTENER_INT:PLAINTEXT,LISTENER_EXT:PLAINTEXT
- KAFKA_LISTENERS=LISTENER_INT://0.0.0.0:9092,LISTENER_EXT://0.0.0.0:29092
- KAFKA_INTER_BROKER_LISTENER_NAME=LISTENER_INT
postgres:
image: debezium/postgres
ports:
- 5432:5432
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=123456
- POSTGRES_DB=DebeziumDB
connect:
image: debezium/connect
ports:
- 8083:8083
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
depends_on:
- zookeeper
- kafka
Önceki slide bulunan docker compose dosyasına bakarsanız eğer ihtiyaç doğrultusundaki ilgili tüm araçları barındırmaktadır. Tüm image’lar da debezium’lu versiyonları kullanmamızın nedeni debezium ile çalışmaları için gerekli ayarlarının etkin gelmesindendir.
PostgreSQL’de ise debezium’un sağlıklı çalışabilmesi için wal_level konfigürasyonuna ‘logical’ değeri verilmelidir.
Bunun nedeni, PostgreSQL’de herhangi bir transaction yürütüldüğünde öncelikle bu işlem Write-ahead logging(WAL) adı verilen yerde gerçekleştirilmektedir. Debezium’da gerekli verileri WAL’dan toplayacağı için bu şekilde ayarlanması gerekmektedir
Haliyle debezium/postgres bu konfigürasyon temellerinde gelmektedir. Şimdi PostgreSQL sunucusunun yapılandırmalarına bir göz atıp öyle devam edelim.
docker-compose -f docker-compose-postgres.yaml up
Şimdi bu docker compose dosyasını bu talimat eşliğinde çalıştıralım.
create schema exampleschema
set search_path TO exampleschema,public;
create table exampleschema.exampletable (
column1 int,
column2 int,
column3 varchar(150),
primary key(column1)
);
alter table exampleschema.exampletable replica identity FULL;
insert into exampleschema.exampletable values (1000, 2000, 'example value');
insert into exampleschema.exampletable values (1001, 2001, 'example value 2');
PostgreSQL sunucusunda yukarıdaki docker-compose-postgres.yaml dosyasında oluşturulan ‘DebeziumDB’ veritabanı içerisinde aşağıdaki örnek tabloyu ve şemayı oluşturalım ve ardından örnek bir kaç veri insert edelim.
Şimdi ise debezzium connector’ın bizlere sunmuş olduğu API aracılığıyla dinlenecek olan veritabanı ve tablo bilgilerinin verilmesi gerekmektedir. Bunun için sonraki slaytdaki gibi bir .json dosyası oluşturarak gerekli bilgileri içerisinde tanımlamamız gerekmektedir.
{
"name": "example-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "123456",
"database.dbname" : "DebeziumDB",
"database.server.name": "exampleserver",
"schema.include.list": "exampleschema",
"table.whitelist": "exampleschema.exampletable"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres.json
Curl üzerinden yukarıdaki post isteğini göndermemiz yeterlidir.
Böylece bu son hamle eşliğinde bir connector tanımlayarak debezium’u etkinleştirmiş bulunmaktayız. Connector’ı etkinleştirme neticesinde yukarıdaki görselde tek satır halinde gelen result’ı doğru düzgün görmek isteyenler için formatlandırıp aşağı slayta bırakıyorum;
{
"name":"example-connector",
"config":{
"connector.class":"io.debezium.connector.postgresql.PostgresConnector",
"tasks.max":"1",
"database.hostname":"postgres",
"database.port":"5432",
"database.user":"postgres",
"database.password":"123456",
"database.dbname":"DebeziumDB",
"database.server.name":"exampleserver",
"schema.include.list":"exampleschema",
"table.whitelist":"exampleschema.exampletable",
"name":"example-connector"
},
"tasks":[
],
"type":"source"
}
Connector’ı etkinleştirme neticesinde result
docker-compose -f docker-compose-postgres.yaml exec kafka /kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --from-beginning --property print.key=true --topic exampleserver.exampleschema.exampletable
Ve şimdi ise debezium connector ile takip edilen veritabanı sunucusundaki ilgili tablodaki veri değişikliklerinin izlemesini gerçekleştirelim. Bunun için yukarıdaki docker talimatının verilmesi yeterlidir.
{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"column1"
}
],
"optional":false,
"name":"exampleserver.exampleschema.exampletable.Key"
},
"payload":{
"column1":1000
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"column1"
},
{
"type":"int32",
"optional":true,
"field":"column2"
},
{
"type":"string",
"optional":true,
"field":"column3"
}
],
"optional":true,
"name":"exampleserver.exampleschema.exampletable.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"column1"
},
{
"type":"int32",
"optional":true,
"field":"column2"
},
{
"type":"string",
"optional":true,
"field":"column3"
}
],
"optional":true,
"name":"exampleserver.exampleschema.exampletable.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false,incremental"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":true,
"field":"sequence"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"exampleserver.exampleschema.exampletable.Envelope"
},
"payload":{
"before":null,
"after":{
"column1":1000,
"column2":2000,
"column3":"example value"
},
"source":{
"version":"1.8.1.Final",
"connector":"postgresql",
"name":"exampleserver",
"ts_ms":1647763176365,
"snapshot":"true",
"db":"DebeziumDB",
"sequence":"[null,\"23723656\"]",
"schema":"exampleschema",
"table":"exampletable",
"txId":557,
"lsn":23723656,
"xmin":null
},
"op":"r",
"ts_ms":1647763176368,
"transaction":null
}
}
Yukarıdaki json datayı incelerseniz eğer 177 ile 200. satır aralığında değişiklik olan verileri içeren ‘payload’ alanı mevcuttur.
Yakalanan verilerin hangi aksiyona tabii tutulduğunu 198. satırdaki ‘op’ alanındaki c, u, d ve r değerleri ifade etmektedir.
update exampleschema.exampletable
set
column1 = 5000,
column2 = 6000,
column3 = 'updated example value'
where column1 = 1000
Şimdi test amaçlı yukarıdaki veritabanı sorgularını çalıştıralım.
using Confluent.Kafka;
ConsumerConfig config = new()
{
GroupId = "exampleserver.exampleschema.exampletable",
BootstrapServers = "localhost:29092",
AutoOffsetReset = AutoOffsetReset.Earliest,
};
using IConsumer< Ignore, string> consumer = new ConsumerBuilder< Ignore, string>(config).Build();
consumer.Subscribe("exampleserver.exampleschema.exampletable");
CancellationTokenSource source = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true;
source.Cancel();
};
while (true)
{
ConsumeResult< Ignore, string> result = consumer.Consume(source.Token);
Console.WriteLine($"Topic Name : {result.TopicPartitionOffset}");
Console.WriteLine($"Message : {result.Value}");
}