Debezium Sunumu

Created by Berke Düzgün

Selamlar herkese

Debezium sunumuna hoşgeldiniz

Giriş

CDC'nin Tanımı ve Amacı


Down arrow

Debezium Nedir Sorusundan Önce CDC (Change Data Capture) Nedir Sorusuna Cevap Arayalım

Kritik verilerin bulunduğu veritabanı uygulamalarında, veriler üzerindeki değişiklikleri veri güvenliği amacıyla kaydetmemiz gerekir.

Bunun yapılması için çeşitli yöntemler bulunmaktadır: Tetikleyiciler(triggers), saklı yordamlar(stored procedures) vb. Bu yöntemlere ek olarak, SQL Server geliştiricileri bu yöntemlerden daha performanslı ve sisteme daha az yük getirmek amacıyla yeni bir çözüm ürettiler.

"CDC" Veritabanında insert, update ve delete işlemleri neticesinde nihai olarak öncesinden farklı olan, bir başka deyişle değişiklik gösteren verileri yakalamamızı, izlememizi ve işlem yapabilmemizi sağlayan bir tasarım modelidir.

Bu model sayesinde verilerin ilk ve son hallerini elde edebilir ve türlü operasyonlar gerçekleştirebiliriz. Tabi ki de CDC’yi kullanabilmek için ilgili veritabanı tarafından destekleniyor olması gerekmektedir.

Verisel değişiklikleri izleyebilmek için trigger?? 🤔

Tabi ki de trigger ile bu işlemleri gerçekleştirebilirsiniz. Lakin CDC ile ihtiyaca dönük operasyonları daha hızlı bir şekilde işleyebilirsiniz. Çünkü CDC verilere dair tüm bilgileri veritabanı loglarından toplayarak hizmet sağlamaktadır


Bu sebepten dolayı trigger’lara nazaran daha performanslı ve az maliyetli bir iş süreci sağlamış bulunmaktadır. Ayrıca CDC sayesinden, veritabanında istenilen tablonun tamamı yahut istek doğrultusunda bir kısmı üzerinde yapılan değişiklikler takip edilerek süreçteki verisel akış filtrelenebilmekte ve arzu edinildiği şekilde yönetilebilmektedir.

CDC

Devam Edelim


Down arrow

CDC (Change Data Capture) Özellikleri

CDC, kaynak sistemdeki değişiklikleri yakalayarak bu değişiklikleri hedef sistemlere aktarır.

Bu sayede, farklı sistemler arasında veri entegrasyonu ve senkronizasyonu sağlanır. Sistemler arasında güncel ve doğru veri akışı elde edilir.

CDC, veritabanındaki değişiklikleri gerçek zamanlı olarak takip eder ve hedef sistemlere anında aktarır. Bu sayede, veri entegrasyonu süreci hızlanır ve güncel veriler elde edilir.

CDC, log tabanlı veya tablo tabanlı yöntemler kullanarak değişiklikleri takip eder. Log tabanlı CDC, veritabanının log dosyalarını analiz ederek değişiklikleri yakalar. Tablo tabanlı CDC ise özel olarak tasarlanmış tablolar kullanarak değişiklikleri izler.

PostgreSQL ve CDC


Down arrow

PostgreSQL'in kendisi doğal olarak CDC özelliği sağlamaz. Ancak, PostgreSQL'in Write-Ahead Log (WAL) mekanizması, CDC için kullanılabilir.

PostgreSQL, yapılan değişiklikleri kaydetmek için WAL mekanizmasını kullanır. CDC, bu WAL loglarını analiz ederek yapılan değişiklikleri yakalar ve hedef sistemlere aktarır.

Ve Geldik Sunum Konumuza

PostgreSQL için üçüncü taraf CDC eklentileri mevcuttur. Bu eklentiler, CDC işlemlerini kolaylaştırır ve PostgreSQL ile diğer sistemler arasında veri entegrasyonunu sağlar. (Debezium, Londiste, Bucardo)

Debezium

Debezium'un Tanımı ve Amacı


Down arrow

Debezium Nedir ?

Debezium, açık kaynaklı bir Change Data Capture(CDC) aracıdır ve geniş bir kullanıcı topluluğu tarafından desteklenir. Bu sayede, geliştirme süreçleri hızlanır ve kullanıcılar arasında deneyim paylaşımı sağlanır

Debezium, veritabanında istenilen tabloda yahut o tablonun istenilen kolonlarında olan tüm verisel değişiklikleri yakalayarak Kafka’ya aktarır. Bu değişiklikleri yakalama süreci, Debezium’un çalışmadığı süreçlerde de pasif bir şekilde de gerçekleştirilebilmektedir.

Debezium, veritabanındaki değişiklikleri gerçek zamanlı olarak takip eder ve bu değişiklikleri çeşitli formatlarda (JSON, Avro, Protobuf vb.) hedef sistemlere aktarır. Bu sayede, sistemler arasında güncel ve doğru veri akışı sağlanır.

Peki bunları nasıl yapıyor?

Veritabanı’nın transaction log’unu okuyarak
Debezium ile Kafka Connect Apı yardımı ile Kafka Topic’e aktarılmasını sağlamaktayız. Bu bize yakaladığı verilerin dayanıklılığı ve güvenilirliğini de sağlamda yardımcı olur.

Şimdi demo uygulama üzerinden daha da ayrıntılara gireceğiz

Ama bu demoda kullandığımız toollara kısa bir ön bilgilendirme yapmak isterim

Kafka Nedir

Binlerce yazılım projesi ve şirket tarafından kullanılan, publisher/subscriber tabanlı olan open source distributed streaming platformudur. Temelde bilinmesi gereken Producer, Consumer, Broker ve Topic olmak üzere dört terminolojik terimi mevcuttur

  • Producer, publisher rolündedir. Subscriber’lara mesaj gönderir.
  • Consumer, subscriber rolündedir. Producer’ların ürettiği mesajları dinlemekte ve tüketmektedir.
  • Topic, Kafka’da mesajların tutulduğu yapıdır.
  • Broker, Topic’leri barındıran sunuculardır.
Debezium ile veritabanında yakalanan değişikliklerin Kafka’ya aktarılabilmesi için verilerin bunun anlayabileceği dilden bir formata dönüştürülmesi gerekmektedir. Bunun için Kafka Connect tool’unu kullanıyor olacağız.

Kafka Connect Nedir?

Kafka’nın veritabanlarına, consule yahut eleasticsearch gibi yapılara bağlanabilmesini ve metrikleri alabilmesini sağlayan bir tool’dur. Hususi olarak içeriğimiz çerçevesinde değerlendirirsek eğer Kafka ile Debezium arasında veri akışını sağlayacak olan köprü görevi gören yapılanmadır diyebiliriz.

Son olarak ZooKeeper Nedir?

Birçok yapılandırma bilgisinin, distributed mimariler tarafından şu veya bu şekilde kullanıldığını biliyoruz. Tabi bu bilgilerin yönetimi manuel bir şekilde yapıldığı taktirde uygulanmalarının zorluğu nedeniyle insani durumlardan kaynaklı hatalar kaçınılmaz olabilmektedir

Hadi diyelim doğru bir şekilde yönetildiler, bu sefer de uygulamalar dağıtıldığında yönetim karmaşası gibi durumlarla karşılaşılabilmektedir. İşte ZooKeeper, tüm bu riskleri gözardı edip güvenilir distributed yapılandırma yapmamızı sağlamaktadır.

Haliyle, Kafka’nın yapılandırma bilgilerini depolamak, adlandırmak, senkronizasyonunu sağlamak ve koordine etmek için ZooKeeper yazılımını kullanıyor olacağız

Debezium entegrasyonlarında ZooKeeper, temel kurulum dışında derinlemesine anlaşılmayı gerektirmemektedir.

Debezium İle PostgreSQL’de Change Data Capture(CDC) Nasıl Yapılır?

İlk olarak Debezium’u hangi veritabanıyla kullanırsak kullanalım temelde Kafka ve Debezium çalıştırılmalı ve ZooKeeper ile yapılandırmalar ayarlanmalıdır.

Tüm bu yazılımlarım ayağa kaldırılması için Docker’dan istifade ediyor olacağız. Şimdi gelin ihtiyacımız olan araçları hızlıca oluşturup, ayağa kaldıralım.

docker-compose-postgres.yaml


						version: '3.1'
						services:
						zookeeper:
							image: debezium/zookeeper
							ports:
							- "2181:2181"
							- "2888:2888"
							- "3888:3888"
						kafka:
							image: debezium/kafka
							ports:
							- "9092:9092"
							- "29092:29092"
							depends_on:
							- zookeeper
							environment:
							- ZOOKEEPER_CONNECT=zookeeper:2181
							- KAFKA_ADVERTISED_LISTENERS=LISTENER_EXT://localhost:29092,LISTENER_INT://kafka:9092
							- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=LISTENER_INT:PLAINTEXT,LISTENER_EXT:PLAINTEXT
							- KAFKA_LISTENERS=LISTENER_INT://0.0.0.0:9092,LISTENER_EXT://0.0.0.0:29092
							- KAFKA_INTER_BROKER_LISTENER_NAME=LISTENER_INT
						postgres:
							image: debezium/postgres
							ports:
							- 5432:5432
							environment:
							- POSTGRES_USER=postgres
							- POSTGRES_PASSWORD=123456
							- POSTGRES_DB=DebeziumDB
						connect:
							image: debezium/connect
							ports:
							- 8083:8083
							environment:
							- BOOTSTRAP_SERVERS=kafka:9092
							- GROUP_ID=1
							- CONFIG_STORAGE_TOPIC=my_connect_configs
							- OFFSET_STORAGE_TOPIC=my_connect_offsets
							- STATUS_STORAGE_TOPIC=my_connect_statuses
							depends_on:
							- zookeeper
							- kafka

					

Önceki slide bulunan docker compose dosyasına bakarsanız eğer ihtiyaç doğrultusundaki ilgili tüm araçları barındırmaktadır. Tüm image’lar da debezium’lu versiyonları kullanmamızın nedeni debezium ile çalışmaları için gerekli ayarlarının etkin gelmesindendir.

PostgreSQL’de ise debezium’un sağlıklı çalışabilmesi için wal_level konfigürasyonuna ‘logical’ değeri verilmelidir.

Bunun nedeni, PostgreSQL’de herhangi bir transaction yürütüldüğünde öncelikle bu işlem Write-ahead logging(WAL) adı verilen yerde gerçekleştirilmektedir. Debezium’da gerekli verileri WAL’dan toplayacağı için bu şekilde ayarlanması gerekmektedir

Haliyle debezium/postgres bu konfigürasyon temellerinde gelmektedir. Şimdi PostgreSQL sunucusunun yapılandırmalarına bir göz atıp öyle devam edelim.

Logical, WAL verilerini harici sistemlerin tüketebilmesi için gerekli izin konfigürasyonunu sağlar.

Pretty Code


							docker-compose -f docker-compose-postgres.yaml up
						

Şimdi bu docker compose dosyasını bu talimat eşliğinde çalıştıralım.

Tüm yüklemelerin bitip, container’ların ayağa kaldırıldığından emin olalım.

Pretty Code


							create  schema exampleschema
							set search_path TO exampleschema,public;
							create table exampleschema.exampletable (
								column1 int,
								column2 int,
								column3 varchar(150),
								primary key(column1)
							);
							alter table exampleschema.exampletable replica identity FULL;
							insert into exampleschema.exampletable values (1000, 2000, 'example value');
							insert into exampleschema.exampletable values (1001, 2001, 'example value 2');
						
PostgreSQL sunucusunda yukarıdaki docker-compose-postgres.yaml dosyasında oluşturulan ‘DebeziumDB’ veritabanı içerisinde aşağıdaki örnek tabloyu ve şemayı oluşturalım ve ardından örnek bir kaç veri insert edelim.

Şimdi ise debezzium connector’ın bizlere sunmuş olduğu API aracılığıyla dinlenecek olan veritabanı ve tablo bilgilerinin verilmesi gerekmektedir. Bunun için sonraki slaytdaki gibi bir .json dosyası oluşturarak gerekli bilgileri içerisinde tanımlamamız gerekmektedir.

register-postgres.json


							{
								"name": "example-connector",
								"config": {
									"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
									"tasks.max": "1",
									"database.hostname": "postgres",
									"database.port": "5432",
									"database.user": "postgres",
									"database.password": "123456",
									"database.dbname" : "DebeziumDB",
									"database.server.name": "exampleserver",
									"schema.include.list": "exampleschema",
									"table.whitelist": "exampleschema.exampletable"
								}
							}
						

Pretty Code


							curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres.json
						

Curl üzerinden yukarıdaki post isteğini göndermemiz yeterlidir.

Böylece bu son hamle eşliğinde bir connector tanımlayarak debezium’u etkinleştirmiş bulunmaktayız. Connector’ı etkinleştirme neticesinde yukarıdaki görselde tek satır halinde gelen result’ı doğru düzgün görmek isteyenler için formatlandırıp aşağı slayta bırakıyorum;

Pretty Code


							{
								"name":"example-connector",
								"config":{
								   "connector.class":"io.debezium.connector.postgresql.PostgresConnector",
								   "tasks.max":"1",
								   "database.hostname":"postgres",
								   "database.port":"5432",
								   "database.user":"postgres",
								   "database.password":"123456",
								   "database.dbname":"DebeziumDB",
								   "database.server.name":"exampleserver",
								   "schema.include.list":"exampleschema",
								   "table.whitelist":"exampleschema.exampletable",
								   "name":"example-connector"
								},
								"tasks":[
									
								],
								"type":"source"
							 }
						

Connector’ı etkinleştirme neticesinde result


							docker-compose -f docker-compose-postgres.yaml exec kafka /kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --from-beginning --property print.key=true --topic exampleserver.exampleschema.exampletable						

Ve şimdi ise debezium connector ile takip edilen veritabanı sunucusundaki ilgili tablodaki veri değişikliklerinin izlemesini gerçekleştirelim. Bunun için yukarıdaki docker talimatının verilmesi yeterlidir.

Evet, görüldüğü üzere PostgreSQL veritabanındaki yapılan değişiklikler yakalanmış bulunmaktadır. Hatta debezium connector bağlanmadan önce girilen örnek veriler bile önceki paragraflarda bildirildiği üzere debezium çalıştırıldıktan sonra yakalanmış ve elde edilmiştir

Şimdi gelen json verileri düzgün formata getirip, inceleyelim.

Bu kısım aslında demo'muzun en önemli kısmı

							{
								"schema":{
								   "type":"struct",
								   "fields":[
									  {
										 "type":"int32",
										 "optional":false,
										 "field":"column1"
									  }
								   ],
								   "optional":false,
								   "name":"exampleserver.exampleschema.exampletable.Key"
								},
								"payload":{
								   "column1":1000
								}
							 }{
								"schema":{
								   "type":"struct",
								   "fields":[
									  {
										 "type":"struct",
										 "fields":[
											{
											   "type":"int32",
											   "optional":false,
											   "field":"column1"
											},
											{
											   "type":"int32",
											   "optional":true,
											   "field":"column2"
											},
											{
											   "type":"string",
											   "optional":true,
											   "field":"column3"
											}
										 ],
										 "optional":true,
										 "name":"exampleserver.exampleschema.exampletable.Value",
										 "field":"before"
									  },
									  {
										 "type":"struct",
										 "fields":[
											{
											   "type":"int32",
											   "optional":false,
											   "field":"column1"
											},
											{
											   "type":"int32",
											   "optional":true,
											   "field":"column2"
											},
											{
											   "type":"string",
											   "optional":true,
											   "field":"column3"
											}
										 ],
										 "optional":true,
										 "name":"exampleserver.exampleschema.exampletable.Value",
										 "field":"after"
									  },
									  {
										 "type":"struct",
										 "fields":[
											{
											   "type":"string",
											   "optional":false,
											   "field":"version"
											},
											{
											   "type":"string",
											   "optional":false,
											   "field":"connector"
											},
											{
											   "type":"string",
											   "optional":false,
											   "field":"name"
											},
											{
											   "type":"int64",
											   "optional":false,
											   "field":"ts_ms"
											},
											{
											   "type":"string",
											   "optional":true,
											   "name":"io.debezium.data.Enum",
											   "version":1,
											   "parameters":{
												  "allowed":"true,last,false,incremental"
											   },
											   "default":"false",
											   "field":"snapshot"
											},
											{
											   "type":"string",
											   "optional":false,
											   "field":"db"
											},
											{
											   "type":"string",
											   "optional":true,
											   "field":"sequence"
											},
											{
											   "type":"string",
											   "optional":false,
											   "field":"schema"
											},
											{
											   "type":"string",
											   "optional":false,
											   "field":"table"
											},
											{
											   "type":"int64",
											   "optional":true,
											   "field":"txId"
											},
											{
											   "type":"int64",
											   "optional":true,
											   "field":"lsn"
											},
											{
											   "type":"int64",
											   "optional":true,
											   "field":"xmin"
											}
										 ],
										 "optional":false,
										 "name":"io.debezium.connector.postgresql.Source",
										 "field":"source"
									  },
									  {
										 "type":"string",
										 "optional":false,
										 "field":"op"
									  },
									  {
										 "type":"int64",
										 "optional":true,
										 "field":"ts_ms"
									  },
									  {
										 "type":"struct",
										 "fields":[
											{
											   "type":"string",
											   "optional":false,
											   "field":"id"
											},
											{
											   "type":"int64",
											   "optional":false,
											   "field":"total_order"
											},
											{
											   "type":"int64",
											   "optional":false,
											   "field":"data_collection_order"
											}
										 ],
										 "optional":true,
										 "field":"transaction"
									  }
								   ],
								   "optional":false,
								   "name":"exampleserver.exampleschema.exampletable.Envelope"
								},
								"payload":{
								   "before":null,
								   "after":{
									  "column1":1000,
									  "column2":2000,
									  "column3":"example value"
								   },
								   "source":{
									  "version":"1.8.1.Final",
									  "connector":"postgresql",
									  "name":"exampleserver",
									  "ts_ms":1647763176365,
									  "snapshot":"true",
									  "db":"DebeziumDB",
									  "sequence":"[null,\"23723656\"]",
									  "schema":"exampleschema",
									  "table":"exampletable",
									  "txId":557,
									  "lsn":23723656,
									  "xmin":null
								   },
								   "op":"r",
								   "ts_ms":1647763176368,
								   "transaction":null
								}
							 }
						

Yukarıdaki json datayı incelerseniz eğer 177 ile 200. satır aralığında değişiklik olan verileri içeren ‘payload’ alanı mevcuttur.

Yakalanan verilerin hangi aksiyona tabii tutulduğunu 198. satırdaki ‘op’ alanındaki c, u, d ve r değerleri ifade etmektedir.

  1. c Yakalanan verinin eklendiğini ifade eder.
  2. u Yakalanan verinin güncellendiğini ifade eder.
  3. d Yakalanan verinin silindiğini ifade eder.
  4. r Yakalanan verinin zaten var olduğunu ifade eder.
Gelen json data’da ki diğer alanlar ne?
  • schema; takip edilen tablonun yapısı ve kolonlarla ilgili bilgiler taşıyan kısımdır
  • payload; yukarıda görüdüğümüz üzere değişiklikler neticesinde akan verilere dair bilgi veren kısımdır.
  • before; değişiklik olan verilerin, değişiklik olmadan önceki hallerini taşıyan kısımdır. Insert haricinde update ve delete sorgularında dolu gelmektedir.
  • after; değişiklik olan verilerdeki son değerleri veren kısımdır. Delete hariç diğer sorgularda dolu gelmektedir.

								update exampleschema.exampletable
								set
								column1 = 5000,
								column2 = 6000,
								column3 = 'updated example value'
								where column1 = 1000
								

Şimdi test amaçlı yukarıdaki veritabanı sorgularını çalıştıralım.

Görüldüğü üzere çalıştırılan sorgu update sorgusu ise veritabanı açısından bu bir delete ve insert işlemi olduğu için önce ‘op’ değeri ‘d’ sonra da ‘c’ olan iki kayıt yakalanmaktadır. Haliyle ‘before’ ve ‘after’ alanlarından update yapılan sorguya dair verileri rahatlıkla yakalayabilmekteyiz.

Böylece PostgreSQL ile CDC çalışmasını gerçekleştirmiş olduk. Mevzu bahis, Debezium ile yakalanan verilerin Kafka üzerinden .NET Core ortamında okunmasına geldi

Bunun için aşağıdaki gibi Confluent.Kafka kütüphanesini kullanarak basit bir consumer oluşturabiliriz.


								using Confluent.Kafka;
 
								ConsumerConfig config = new()
								{
									GroupId = "exampleserver.exampleschema.exampletable",
									BootstrapServers = "localhost:29092",
									AutoOffsetReset = AutoOffsetReset.Earliest,
								 
								};
								 
								using IConsumer< Ignore, string> consumer = new ConsumerBuilder< Ignore, string>(config).Build();
								consumer.Subscribe("exampleserver.exampleschema.exampletable");
								 
								CancellationTokenSource source = new CancellationTokenSource();
								Console.CancelKeyPress += (_, e) =>
								{
									e.Cancel = true;
									source.Cancel();
								};
								while (true)
								{
									ConsumeResult< Ignore, string> result = consumer.Consume(source.Token);
									Console.WriteLine($"Topic Name : {result.TopicPartitionOffset}");
									Console.WriteLine($"Message : {result.Value}");
								}										
								

Debezium Documentation

Teşekkürler 🎉