Kafka Streams Notları

2023-08-31

Kafka Notları

-> Bir event yani bir record 3 birimden oluşur: Key, Value ve Timestamp. Bunlara trinity denir.
-> Event streaming kullanılan alanlar: Kredi kartı fraud, intrusion detection, internet of things, financial industry, sharing data in real-time gibi alanlardır.

 

Kafka Brokers

-> RabbitMq veya Active MQ’dan farklı olarak, Kafka brokerler mesajları storage’da tutar. Key-value ve bazı metadataları byte formatında saklarlar.
-> Brokerları bir cluster içerisinde deploy ederiz ve brokerlar veriyi sağlıklı bir şekilde tutmak için replicate işlemi yapar.
-> Brokerlar, Kafka ekosisteminde storage kısmını ifade eder. Ayrıca producer veya consumer’dan gelen requestleri yönetir.

 

Kafka Streams

-> Kafka için native stream processing kütüphanesidir. Yani Kafka içerisinde hazır olarak gelir.
-> Kafka broker içerisinde ÇALIŞMAZ.
-> Amacı event data üzerinde transformation, joins, aggregations gibi işlemlerin yapılmasını sağlar.

 

ksqlDB

-> Event streaming veritabanıdır. Arka planda Kafka Streams’i kullanır.
-> SQL sorgusu yazarak event streaming operasyonları yapmamızı sağlar.

 

Kafka Brokers

-> Data management işlemlerinden sorumludur: Retention ve replication işlemlerini yapar.
-> Client’lardan gelen isteklere cevap verir.
-> Producer record’ları liste şeklinde broker’a gönderir. Producer’ın kullandığı kütüphane yani API daima collection yani liste ile çalışır.
-> Broker gelen kayıtları bir topic içerisinde tutar. Topic’ler bir veya birden fazla partition’dan oluşur. 
-> Örneğin 3 partition’dan oluşan bir topic için üç tane klasör oluşturulur. Bu klasörler log.dirs ile belirtilen dizin içerisinde yer alır. log.dirs dizini /var/kafka/topic-data değeri aldığı zaman broker purchases isimli topic için aşağıdaki gibi dizinleri create eder.

/var/kafka/topic-data/purchases-0
├── 00000000000000000000.index
├── 00000000000000000000.log
├── 00000000000000000000.timeindex
└── leader-epoch-checkpoint
/var/kafka/topic-data/purchases-1
├── 00000000000000000000.index
├── 00000000000000000000.log
├── 00000000000000000000.timeindex
└── leader-epoch-checkpoint
/var/kafka/topic-data/purchases-2
├── 00000000000000000000.index
├── 00000000000000000000.log
├── 00000000000000000000.timeindex
└── leader-epoch-checkpoint

-> Örnekte görüldüğü gibi topic isminde bir klasör yerine topic partition sayısı kadar klasör oluşturulmaktadır. Topic adını logical group adı olarak ifade edebiliriz.
-> Kafka broker record’ları key değerlerine göre topic’lere yönlendirir. Tek topic varsa bütün record’lar aynı topic’e yazılır. Birden fazla topic varsa, gelen record’ların key değerlerine göre hash alınıp topic sayısı kadar modulo işlemi yapılır. Örneğin 3 farklı key değeri varsa ve 2 partitiondan oluşan bir topic ise her keyin hash değeri hesaplanır ve mod 2 şeklinde işlem yapılır. Eğer record’ların key değerleri null ise Kafka broker eşit bir şekilde bu record’ları topiclere böler.

 

Consume Records

-> Verinin consume edilmesi RabbitMQ’nun aksine kayıtlı olan veriyi etkilemez. Hatırlarsak RabbitMQ’da consume edilen mesaj siliniyordu. Verinin ne zamana kadar tutulacağı topic’in retention süresine göre belirlenir.
-> Producer ve consumer birbirinden habersiz bir şekilde çalışmaktadır.
-> Consumer okumak istediği veriyi belirtirken topic adını, offset değerini ve partition bilgisini broker’a iletir. Partition değeri belirtilmediği taktirde tüm partition’lardan veri okunur.

 

Topic ve Partitions

-> Bir topic’in kapasitesi tek broker üzerindeki available disk space ile sınırlandırılmamıştır. 
-> Broker kayıtları partition’lara map ederken nasıl karar vermektedir? Producer kayıtları broker’a gönderdiği zaman hangi partition’a yazılacağı 3 farklı yolla belirlenebilir:
•Gelen record’da partition bilgisi yer almışsa ilgili partition’a yazılır.
•Record’da partition bilgisi yoksa yukarıda belirtildiği gibi record’un key değerinin hash’i alınıp partition sayısı kadar modulo şeklinde hesaplanır. Örnek olarak:
oHash değeri 5 ve partition sayısı 1 ise: 5%1 = 0 yani partition 0’a yazılır.
oHash değer 5 ve partition sayısı 2 ise 5%2 = 1 yani partition 1’e yazılır.
oHash değer 5 ve partition sayısı 3 ise 5%3 = 2 yani partition 2’ye yazılır.
-> Hem key değeri hem de partition belirtilmezse Round-Robin algoritması ise partition bilgisi belirlenir.

 

Offsets

-> Broker gelen record’u partition’a eklediği zaman o record’a bir unique key diğer bir ifade ile bir offset bilgisi ekler. Offset dosya sisteminde record’un tutulduğu yeri ifade eder.
-> Her gelen record’un offset değeri bir önceki record’dan 1 fazladır. 
-> Offset bilgisi sayesinde topic içerisindeki record’lar sıralı bir şekilde tutulmaktadır. Ancak record’ların timestamp değerine göre kesin sıralıdır denemez; çünkü record’un timestamp değeri ile partition’a yazıldığı süre farklı olabilir.
-> Broker consumer için son okunan record’un offset bilgisini kaydeder. Hangi consumer’ın hangi offsette kaldığı bilgisi bir topic’e yazılır. Bu topic’in adı: topic_adi_consumer_offsets şeklinde belirlenir.  Bu sayede bir sonraki record’u consumer’a göndermek için bu topic’teki son offset değerine bakılır.

 

Bazı Broker Komutları

-> Topic Create: kafka-topics --create --topic first-topic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1. Not: Topic isimleri genelde 2 kelimeden oluşur ve tire işareti ile ayrılır.
-> Produce Record: kafka-console-producer --topic first-topic --broker-list localhost:9092 –property parse.key=true key.seperator=":" Not: key.seperator “:” değerine sahip olduğu için key:value şeklinde değerler girilir. Ayrıca parse.key true değeri ile de key değerlerini gireceğimizi ifade ettik.
-> Consume Record: kafka-console-consumer --topic first-topic --bootstrap-server localhost:9092 --from-beginning --property print.key=true --property key.seperator="-". Not: Bu parametrelere ek olarak --partition parametresi kullanarak spesifik bir partition’dan okuma işlemi de yapabiliriz.

 

Segments

-> Kafka broker’a veri geldikçe partition’un büyüklüğü sonsuza kadar gider mi?
-> Böyle bir olayın yaşanmaması için broker dosyaları discrete partlara ayırır. 
-> Önceki notlarda broker’ın record’ları partition’lara append ettiğini ifade etmiştik. Aslında broker gelen record’ları active segment’e yazar. Varsayılan olarak 1GB boyutuna ulaşıldığında broker yeni bir segment oluşturur ve artık aktif segment bu olur. Segmentlerin dosya adı last-offset değerine göre belirlenir. Yani aşağıdaki gibi bir klasör yapısı vardır. Bu dosyaların isimlerine dikkat edersek 0 35 gibi bitmektedir. 35 burada bir offset değerdir. Bu dosyalar bir partition klasöründeki dosyaları ifade eder.

00000000000000000000.log
00000000000000000000.index
00000000000000000000.timeindex
00000000000000000035.log
00000000000000000035.index
00000000000000000035.timeindex

-> Aktif segment oluştuğu zaman önceki segment read-only modda açılır, aktif segment ise read-write modda açılır. 
-> Segment’in boyutu log.segment.bytes ayarı ile belirlenmiştir. Varsayılan olarak 1GB’tır. Bu ayara ek olarak log.roll.ms ve log.roll.hours ayarları da kullanılmaktadır. log.roll.ms ayarının varsayılan değeri yoktur. log.roll.hours ayarının varsayılan değeri 168 saat yani 7 gündür. Özetle yeni bir segmentin oluşması için 7 gün veya 1gb koşulunun gerçekleşmesi gerekmektedir.

 

Data Retention

-> Broker 4 ayar bilgisine bakarak bir record’un silinmesine karar verir. Öncelik sırası aşağıdan yukarı doğrudur. Yani log.retention.ms ayarı değiştirilirse log.retention.hours değerine bakılmaz.
olog.retention.hours: Varsayılan 168 saattir.
olog.retention.minutes: Varsayılan null
olog.retention.ms: Varsayılan null
olog.retention.bytes: Varsayılan -1 
-> Silme işlemi bir için bir job çalışır. Silinecek record segment’ler içerisinde taranır. Record’un timestamp değeri yukarıdaki ayarlarda belirtilen değerlerden daha eski ise bu record silinir. Eğer segment içerisindeki son record’un timestamp değeri bu değerlerden daha eski ise segment komple silinir.


Compacted Topics

-> Bir topic create ederken --config parametresinde cleanup.policy=compact şeklinde kullanım sağlanırsa bu topic’teki veriler silinmez. Örnek topic create komutu şu şekilde olur: bin/kafka-topics.sh --create --topic my-compacted-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 2 --config cleanup.policy=compact
-> Compacted topic içerisinde aynı keye sahip iki değer olduğu zaman eskisi silinir son güncel değer kalır ve yeni bir değer gelmedikçe sürekli tutulur. Gelen yeni değer null ise tombstone kayıt olarak işaretlenir ve bu key’e ait hiçbir değer kalmaz!
-> log.cleanup.policy değeri compact şeklinde belirtilir. Örneğin __consumer_offsets topic compact moddadır. Not: Kafka’da internal topic’ler __topic_adi şeklinde isimlendirilir.
-> log.cleanup.policy üç farklı değer alabilir: delete, compact ve delete,compact. Üçüncü koşulda eski segment’ler retention time ve size değerine göre silinirken tutulan segmentler compacted moda dönüştürülür.
-> Compacted modda olan bir topic’te bir zaman aralığında aynı keye sahip birden fazla değer olabilir. Hatta tombstone olmuş veri delete.retention.ms ayarı ile belirtilen süre kadar, varsayılan 24 saat, tutulmaktadır. Compaction işlemi dirty ratio veya inaktif segmentlerdeki kayıtlara göre yapılmaktadır.
-> Bir consumer tombstone durumuna gelmiş bir kaydı anlayabilmesi için 24 saat içerisinde consume işlemi yapması gerekir. Aksi taktirde o verinin silindiğini bilemez. 
-> Kafka Streams’de state store mekanizması için oluşturulan topic’lerin modu da compact’tır.
-> Compact mod aktif edildiği zaman, manager ve cleaner thread’ler yaratılır. Bu thread sayısı log.cleaner.threads ayarı ile belirlenir. Thread’ler çalışmaya başladığında in-aktif olan segmentler yani read-only moddaki segmentler incelenir. Bu segmentler içerisindeki record’ların timestamp değerlerine göre en güncel record’lar yeni bir dosyaya yazılır. Örneğin key değeri 1 olan iki record’un güncel hali yeni dosyaya yazılır. Aynı şekilde diğer in-aktif partition’lar da incelenir ve yeni dosyalara yazılır. Son adımda bu yeni dosya ile eski dosyalar replace edilir.
-> Gelelim asıl soruya. Normalde bir partition’un in-aktif olması için ya dosya oluştuktan sonra 7 gün geçmeli veya dosyanın boyutu 1GB olmalıdır. Bunun anlamı küçük topic’ler için clean-up policy uygulanabilmesi için 7 gün sürenin geçmesi gerekiyor. Bu sorunu çözmek için segment.ms = 5000 şeklinde bir ayar yapılarak her 5 saniyede yeni bir segment oluşması sağlanabilir. Ayrıca min.cleanable.dirty.ratio =0.001 diyerek log temizleme işleminin sürekli çalışması sağlanır. Bu işlemleri yaptığımız zaman ilgili topic’te verilerin unique olarak tutulması garanti edilir. Not: Kafka Streams’de table’larda unique veri tutulmaktadır. Bu verilerin yazıldığı topic ayarlarına bakabilirsiniz.

 

Leaders ve Followers

-> Bir kafka broker kümesi içerisinde tek bir lider ve onu takip eden bir veya birden fazla follower’lar yer alır. 
-> Topic’lerin partitionları makinelere dağıtılır.
-> Bir partition’un tek bir lider broker’ı olur. Yani bir partitionu yönetecek broker tektir. 
-> 3 partition’dan oluşan bir topic’te her partition için farklı bir broker lider olmaktadır. Bundan dolayı partition sayısı broker sayısından fazla OLAMAZ.
-> Follower olan broker’lar leader broker’ın consumer’ları olur. Yani veriyi lider broker’dan okuyup storage’larına yazarlar.
-> Replication factor maksimum broker sayısı kadar değer alabilir. 3 broker varsa replication factor de maksimum 3 seçilir. 
-> Follower brokerlar ile leader broker’ın verileri eşitlendiğinde in-sync replica yani ISR durumu gerçekleşmiş olur.
-> replica.lag.time.max.ms ayarı ile follower’ın ISR’den atılması gerekip gerekmediğine karar verilir. Bunun varsayılan değeri 10.000 milisaniyedir. Bir bir follower 10 saniye içerisinde leader’i ile aynı data’ya sahip olamazsa ISR havuzundan çıkartılır. Böyle bir durumda topic’e yazan producer’a acks=all durumunda veri yazıyorsa hata alınır ve kayıt işlemi gerçekleşmez.
-> ISR’den atılan bir broker lider olarak seçilemez.
-> ISR’den atılan broker lider broker ile verilerini sekronize ettikten sonra tekrar ISR’ye girer.
-> replica.lag.max.messages ayarı ile out-of-sync ayarı yapılabilmektedir. Bu ayar varsayılan olarak -1’dir.
-> Bir broker’a gelen istek sayısı, broker’ın karşılayabileceği sayıdan fazla ise quee’ye eklenir.

 

Replication ve Acknowledgements

-> Producer üç farklı ack cevabı bekleyebilir: none, some ve all. All ack bilgisinin gönderilebilmesi için bütün broker’ların sync durumda olması gerekir. Bu verinin başarılı bir şekilde replica olduğu anlamına gelir. Yani record başarılı bir şekilde kaydedilmiştir.
-> min.insync.replicas yani minimum in-sync replica değeri varsayılan 1 olduğu için replication-factor değeri 3 olsa bile acks = all durumunda, veri tam sekronize olmasa bile producer’a ack bilgisi gider. Bundan dolayı replication factor 3 olması durumunda min.insync.replicas değerini de 2 yapmalıyız. Yani follower sayısı kaç ise o değer verilir.

 

Schema Registration

-> Canlı ortamdayken auto.register.schemes = false yapılmalıdır.
-> Kafka streams ile çalışırken hem serializer hem de deserializer olarak görev yapan Serdes sınıfı kullanılır.

 

Kafka Clients

-> En önemli nokta producer ile consumer birbiri ile hiçbir şekilde bağlantılı değildir. Yani birbirlerinden habersizdir. Veriler byte olarak tutulur ve içinde ne yazdığını kafka broker bilemez!
-> Klasik veri gönderirken ProducerRecord sınıfı kullanılır. send() metodu ile record’lar batch şeklinde gönderilir. Bunun anlamı send metodu bir liste parametresi alıyor demek değildir! Veriler belirli aralıklarla gönderilmesinden dolayı batch olarak gönderiliyor diye ifade edilmektedir. Örnek:

final Map<String, Object> producerConfigs = new HashMap<>();
producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
producerConfigs.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
producerConfigs.put(ProducerConfig.ACKS_CONFIG, "1");
producerConfigs.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000);
producerConfigs.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
producerConfigs.put("topic.name", TOPIC_NAME);
return producerConfigs;

try (
       Producer<String, ProductTransaction> producer = new KafkaProducer<>(
               producerConfigs)) { 
   while (keepProducing) {
       Collection<ProductTransaction> purchases = salesDataSource.fetch(); 
       purchases.forEach(purchase -> {
           ProducerRecord<String, ProductTransaction> producerRecord =
                   new ProducerRecord<>(topicName, purchase.getCustomerName(),
                           purchase); 
           producer.send(producerRecord,
                   (RecordMetadata metadata, Exception exception) -> {    
                       if (exception != null) {   
                           LOG.error("Error producing records ", exception);
                       } else {
                           LOG.info("Produced record at offset {} with timestamp {}", 
                                   metadata.offset(), metadata.timestamp());
                       }
                   });
       });
   }
}

1.İfade de try bloku içerisinde bir producer nesnesi oluşturulur. try bloku bizim için işimiz bittiğinde kaynakların kapatılmasını sağlar.
2.İfade de örnek bir veritabanı gibi bir yerden kayıtlar çekilir
3.İfade de bu kayıtlardan tek-tek producerRecord nesnesi oluşturulur.
4.İfade de send metodu ile kayıt broker’a gönderilir. Yukarıda bahsedildiği gibi hemen gönderilmez!

-> Bu örnekte acks=0 ise producerConfigs değişkeninde ayarlanmıştır, offset değeri -1 döner; çünkü lider broker verinin sekronize olup olmadığına bakmaksızın başarılı diye bilgiyi iletir. Acks = 0 ifadesi “fire and forget” olarak ifade edilmektedir. Acks = 1 değerine sahip olduğu zaman record’un sadece leader broker tarafından kaydedilmesi yeterli anlamına gelir; ancak full replica olup olmadığı bilinemez! Acks = all olduğu zaman, -ki en güvenilir veri kayıt işlemi bu şekilde sağlanır, record ancak tüm broker’lar tarafından kaydedildiğinde producer’a mesaj döner. Burada unutulmaması gereken min.insync.replicas follower broker sayısı kaç ise, topic oluşturma aşamasında belirtilmelidir veya global olarak broker’ın server.properties dosyasında belirlenmelidir. Örneğin topic create ederken şu şekilde belirlenir: bin/kafka-topics.sh --create --topic my-topic –bootstra-server localhost:9092 --partitions 3 --replication-factor 3 --config min.insync.replicas=2
-> min.insync.replicas değerinin 3 yerine minimum 2 şeklinde belirlenmesinin nedeni şudur: Veri producer tarafından üretildiği zaman lider broker tarafından her halükarda kaydedilir. Kaydedilen bu veri replication factor 3 olduğu için minimum 2 follower tarafından da kaydedilmesi gereklidir. 2 değeri o yüzden follower sayısına denk gelir.
-> delivery.timeout.ms = 1000x60x2 yani varsayılan olarak 2 dakikadır. Bazen network vb durumlardan dolayı göndermeye çalıştığımız record broker’a iletilemeyebilir. Ancak bu tarz iletim hataları basit hatalardır. retries ayarındaki değer kadar, örneğimizde Integer.MAX_VALUE olarak ayarlanmıştır, tekrar gönderilmeye çalışılır. Eğer ciddi bir hata varsa 2 dakikalık süre beklenmeden producer’a hata mesajı döndürülür.
-> send() metodu asenkron çalışan bir metodtur ve veriler bir buffer’a yazılıp topluca gönderilmektedir. Diyelim ki ilk batch buffer doldu ve broker’a gönderildi. Bu metod asenkron çalıştığı için o ilk gönderilen kayıtların sonucu beklenmeden ikinci bir batch buffer doldurulup gönderilir. İlk kayıtları gönderme aşamasında bir sorun oluşup ikinci kayıtlar başarılı bir şekilde gönderilirse record’ların sıralaması bozulur. Ayrıca ilk kayıtları tekrar göndermeye çalışıldığı için, retries sayısı kadar, veriler dublicate de olabilir. Kafka sıralama problemini önlemek için max.in.flight.requests.per.connection ayarını belirlememize imkan tanır. Bu değer varsayılan olarak 5’tir. Bu değer belirlendiği zaman kayıtların sıralaması bozulmaz! Dublicate probleminin çözümü ise idempotent producer kullanmaktır. enable.idempotence=true yapıldığı zaman sıralama da korunmuş olur. Idempotence true olduğu zaman max.in.flight.requests.per.connection değeri 5 veya 5’ten küçük olmalıdır. Varsayılan olarak 5 olduğu için bu değeri değiştirmemize gerek yoktur. Örnek ayarlar şu şekilde yapılmalıdır:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("enable.idempotence", "true"); // Idempotent producer özelliği
props.put("acks", "all"); // Tüm brokerlardan onay beklemek (acknowledge)

Producer<String, String> producer = new KafkaProducer<>(props);

 

Kafka Delivery Semantics

-> Üç tane türü vardır: at-least-once, at-most-once ve exactly-once. Kafka varsayılan olarak “at least once” değerini sağlar. Örneğin bir record gönderildikten sonra hata alınırsa, retry değeri Integer.MAX_VALUE olduğu için aynı record tekrar gönderilmeye çalışılır. İlk denemedeki hata çözülüp record kaydedilirse aynı recorddan birden fazla olabilir. Aynı şekilde consumer bu recordu consume edip işlem yapmaya çalıştığında, örneğin veritabanına kayıt yaparken hata aldığında, son offset değerini commit’leyemez. Bundan dolayı aynı kayıt tekrar consumer tarafından consume edilir. Her ne kadar duplicate record olsa da veri kaybı olmaz.
-> at most once durumunda ise acks = 0 iken gerçekleşebilir. Yani paket kaybı olabilir. 
-> Exactly once durumunda ise Kafka transaction kullanarak bir recordun sadece 1 kez kaydedildiğini garanti eder.

 

Timestamps

-> Bir topic create edilirken message.timestamp.type değerini ayarlayabiliriz. Bu değer iki farklı değer alabilir: create time veya log append time. Varsayılan değer create time’dır.
-> Producer tarafından bir timestamp değeri sağlanabilmesi için bu değerin varsayılan olarak yani create time şeklinde kalması gerekir.
-> Yukarıdaki producer örneğinde kullanılan ProducerRecord sınıfında timestamp isminde bir field vardır. Bu field’e bir değer atamazsak, o anki işletim sisteminin zamanı atanmış olur. Örneğin veritabanından çekilen bir kaydın createdAt sütun değerini Long türüne çevirip bu timestamp field’e set edebiliriz.
-> Broker bu timestamp değerine göre işlemler yapmaktadır. Örneğin silinecek kayıtları kontrol ederken süresinin dolup dolmadığını bu timestamp değerine göre belirler. 
-> Broker’a ek olarak Kafka Streams ve ksqlDB ortamlarında da timestamp ağırlıklı olarak kullanılmaktadır.

 

Kafka Consumers

-> Consumer poll call olarak bilinen bir işlem yaparak subscribe olduğu topic’ten verileri çeker. Örnek:

try (final Consumer<String, ProductTransaction> consumer = new KafkaConsumer<>(
               consumerConfigs)) {

  consumer.subscribe(topicNames);   
   while (keepConsuming) {
       ConsumerRecords<String, ProductTransaction> consumerRecords =
               consumer.poll(Duration.ofSeconds(5)); 
       consumerRecords.forEach(record -> {     
           ProductTransaction pt = record.value();
           LOG.info("Sale for {} with product {} for a total sale of {}",
                   record.key(),
                   pt.getProductName(),
                   pt.getQuantity() * pt.getPrice());
       });
   }
}

2. satırda yer alan subscribe metodu bir collection almaktadır. Yani bir veya birden fazla topic’e subscribe yapılması sağlanır.
-> Yukarıdaki kod satırında yer alan consumerConfigs değişkeni map türünden bir değişken olup aşağıdaki değerlerin set edilmesi gerekmektedir.
1.bootstrap.servers,
2.max.poll.interval.ms: Varsayılan olarak 5 dakikadır (milisaniye cinsinden belirlenir). 5 dakika boyunca consumer broker’a istek göndermezse consumer grup koordinator bu consumer’ı gruptan çıkarır. Bu işlemin ardından rebalance yapılıp diğer consumer’lara partition ataması gerçekleştirilir.
3.group.id: Random bir string değer alır. Aynı değere sahip consumer’lar o grubun üyesi olurlar.
4.enable.auto.commit: Varsayılan değer true’dur. Eğer false olursa commit işlemi manuel yapılmalıdır. Her commit’ten sonra broker offset değerini 1 artırır.
5.auto.commit.interval.ms: Varsayılan değer 5 saniyedir. Bu değeri yapılan işlemin süresine göre değiştirmek gerekir. Örneğin consume edilen kayıtları bir veritabanına yazıyorsak ya da 5 saniyeden daha uzun süren bir işlem yapıyorsak, bu değerin artırılması gerekmektedir.
6.auto.offset.reset: 3 farklı değer alabilir. Earliest, latest ve none. none değeri olduğu zaman ilgili consumer grupta hiç offset yoksa hata almayı sağlar. Bu hatayı yakalayarak bazı kontroller yapılabilir. Yani consume edilmek istenen topic’lerin hiç consume edilip edilmediğini bu şekilde anlayabiliriz. Varsayılan değer latest’dır.
7.Son iki config ayarları key.deserializer.class ve value.deserializer.class’tır.

Örnek bir ayar değişkeni:

static Map<String, Object> getConsumerConfigs() {
   final Map<String, Object> consumerConfigs = new HashMap<>();
   consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
   consumerConfigs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000);
   consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "product-transaction-group");
   consumerConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
   consumerConfigs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
   consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
   consumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
   consumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
   consumerConfigs.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
   consumerConfigs.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
   consumerConfigs.put("topic.names", TOPIC_NAME);
   return consumerConfigs;
}


Pool Interval

-> KafkaConsumer.poll(Duration.ofSeconds(5)) ifadesi maksimum 5 saniye içerisinde consumer’ın broker’dan veri çekmesi gerektiğini ifade eder. Yani timeout değerdir. 5 saniye boyunca broker’dan gelmesini bekler.
-> max.poll.interval.ms ayarı, varsayılan olarak 5 dakikadır, 5 dakika olarak belirlendiğinde foreach içerisindeki gelen record’ları işlemesi bu süre içerisinde işlemesi gerekmektedir. Eğer bu süre içerisinde offset commit’lenmez ise yukarıda bahsedildiği gibi bu consumer dead olarak işaretlenir ve consumer gruptan çıkarılır.
-> max.poll.records: Varsayılan olarak 500’e ayarlanmıştır. Yani poll işleminde maksimum 500 tane kayıt dönecek anlamına gelir. Eğer her bir kayıt için arka planda başka işlemler yapılacaksa örneğin db ye insert işlemi gibi, bu değer düşürülebilir. Çünkü gelen 500 kayıt maksimum 5 dakika içerisinde işlenmesi gerekmektedir.
-> group.id: Aynı grup id değerine sahip consumer app’ler aynı grupta yer alır. Her bir consumer ancak ve ancak tek bir partition’dan consume işlemi yapabilir. Eğer topic partition sayısı 1 ise, bir grup içerisindeki diğer bütün consumer’lardan sadece 1 tanesi aktif olarak consume işlemini gerçekleştirir. Diğer kalanlar idle durumunda bekletilir. Eğer 3 tane partition varsa ve consumer grup içerisinde 3 tane consumer varsa paralel bir şekilde topic’i consume eder. Partition sayısı 6 ise ve 3 tane consumer varsa, aynı group.id değerine sahip, her bir consumer 2 partition’u consume eder. 
-> Örnek olarak 3 tane consumer varsa ve bunlardan biri gruptan düşerse, gruptan düşen consumer’ın consume ettiği partitionlar re-partition yapılarak diğer consumer’lara atanır. Bu işleme rebalance denir. Aynı şekilde aynı group.id ye sahip sahip bir consumer gruba katılınca da rebalance yapılır.
-> Aktif consumer sayısı partition sayısından fazla olamaz. Yani iki consumer aynı anda tek partition’u consume edemez. Diğer consumer idle durumunda bekler.
-> Hatırlarsak bir consumer’ın gruptan düşmesi için gereken maksimum bekleme süresi yani max.poll.interval.ms değeri varsayılan olarak 5 dakika idi. Dead olan bir consumer’ın belirlenmesi ise bu süreden çok daha kısadır. Aslında bir consumer veriyi consume ederken KafkaConsumer sınıfı içerisinde başka bir thread her 10 saniyede bir “heartbeat” dediğimiz sinyaller gönderir. Broker’a heartbeat istekleri gelmezse max.poll.interval.ms değerindeki 5 dakika süresi beklenmeden sorunlu olan consumer gruptan atılır. 10 saniye değeri ise session.timeout.ms ayarı ile belirlenmektedir. Varsayılan 10 saniyedir.

 


Consumer Group Leader ve Group Coordinator

-> Group coordinator bir broker; group leader ise bir consumer’dır. Bir gruptaki her consumer group coordinator ile iletişim kurabilirken, group coordinator sadece group lideri ile iletişim kurar.

 

At-least-once ve At-most-once

-> Bir consumer auto offset değeri latest olarak konfigure edilmişse ve poll ettiği veriyi işlerken bir sorunla karşılaşıp kapanırsa veya offset değerini commit edemeden hata alırsa ve ilgili consumer tekrardan aktif olursa aynı offset değerine sahip record’u tekrar consume etmek zorunda kalır. Burada görüldüğü üzere iki kez aynı kaydı consume etme problemi vardır.  Yani record at-least-once olarak gelmiştir. 
-> Yukarıda bahsedilen sorunun yaşanmaması için enable.auto.commit değeri false yapılır ve consume edilen record’larla ilgili herhangi bir işlem yapmadan direk olarak offset  commit’lenir. Ancak bu durumda da gelen veri at-most-once durumunda olur. Böyle bir işlemin sonucunda paket kaybı söz konusu olabilir.

 

Exactly Once Delivery

-> KafkaProducer’un exactly once message gönderimi ile ilgili idempotent, bir işlemin birden çok kez tekrarlanmasına rağmen sonucunun değişmemesi anlamına gelir, ve transactional producer olmak üzere iki türü vardır. 
-> Idempotent producer bir mesajın sıralı ve sadece tek bir kez gönderileceğini garanti eder. 
-> Transactional producer ise mesajların birden fazla topic’lere topluca iletilmesini veya hiçbirinin iletilmemesini garanti eder.

 

Idempotent Producer

-> enable.idempotence = true olarak ayarlanır, max.inflight.requests.per.connection değeri maksimum 5 olarak ayarlanır, varsayılan değeri 5’tir, ACKS = all yapılır ve retries değeri de Integer.MAX_VALUE yapılır. Bu işlemlerden sonra producer artık mesajları sıralı ve sadece 1 kez gönderir. Yani exactly-once-delivery sağlanmış olur. Not: Apache Kafka 3.0 sürümünden sonra idemponent producer ayarları varsayılan olmuştur. İlgili ayarlar aşağıdaki gibi olmalıdır:

Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "somehost:9092");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ...);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ...);
producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); 
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); 
producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

-> Kayıtların sıralı bir şekilde gönderilmesinin sağlanması için bir producer’a unique bir id değeri atanır. Bu id ye PID yani unique producer id denir. Ayrıca producer’ın mesajlarını sırayla takip edilmesi için de sequence number kullanılır.
-> Sequence number sıfırdan başlayıp her batch mesajına atanır. (Hatırlarsak producer mesajları tek tek değil batch buffer’a kaydedip topluca göndermektedir.)

 

Transactional Producer

-> Birden çok partition’a tek bir transaction içerisinde yazmaya imkan tanır. Bir hata ile karşılaşıldığı zaman kayıtlar rollback olur.
-> Transaction producer kullanabilmek için, transactional.id değerini set etmemiz gerekmektedir.
-> Bir broker aynı producer instance’dan gelen farklı session isteklerini belirleyebilmesi için transactional.id değerini kullanır. Örneğin bir producer uygulaması verileri broker’a gönderirken sorunla karşılaşıp down olursa ve bu app’ten yeni bir instance üretilip aynı transactional.id değeri de set edilirse ve önceki down olan app de tekrar ayağa kalkarsa eskisini broker zombi app olarak işaretler. Broker her bir transaction için epoch sayısını 1 artırır. Eski down olan app’in açmış olduğu transaction’ın epoch değeri ile yeni app instance’ın epoch değeri aynı transaction id için farklı olacaktır. Bu sayede eski epoch değerine sahip olan producer zombi producer olduğu için ondan gelen transaction isteği bloklanır ve bunun sonucunda ProducerFencedException meydana gelir.
-> Epoch sayısı initTransactions() metodu çağrıldığı zaman 1 artırılır.
-> commit() veya abort() metodları transaction.timeout.ms süresi içerisinde çağrılmazsa, varsayılan değer 1 dakikadır, transaction coordinator yani broker tarafından transaction işlemi zorla iptal edilir. 
-> Transaction log kayıtları da topic’e yazılır.

Örnek:

HashMap<String, Object> producerProps = new HashMap<>();
producerProps.put("transactional.id", "set-a-unique-transactional-id"); 
Producer<String, String> producer = new KafkaProducer<>(producerProps);
producer.initTransactions();

try {
   producer.beginTransaction(); 
   producer.send(topic, "key", "value"); 
   producer.commitTransaction();   
} catch (ProducerFencedException | OutOfOrderSequenceException
        | AuthorizationException e) {  
   producer.close();
} catch (KafkaException e) {     
   producer.abortTransaction();
   // safe to retry at this point 
}

 

Consumers in Transactions

-> Consumer ayarları yapılırken bir ayar kullanılmaktadır: isolation.level ayarı varsayılan olarak read_committed değerine sahiptir. Bunun anlamı şudur: Sadece transaction işlemi tamamlanan record’ları getir ve başarılı bir şekilde kaydedilmemiş kayıtların çekilmesini sağlar. isolation.level değeri şu şekilde ayarlanır:

consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

 

Kafka Connect

-> Veritabanında bir tabloda CDC (Change Data Capture) aktif edilerek, yapılan güncellemeleri Kafka topic’e Kafka Connect ile direk aktarabiliriz.
-> İki tür connector vardır. Source connector veriyi başka yerden alıp Kafka’ya yazar. Sink connector ise Kafka’dan alıp başka yere yazmayı sağlar. Örneğin bir topic’teki kayıtları Elastic Search’e aktarabiliriz.
-> Kafka Connect’te Single Message Transform (SMT) özelliği bulunmaktadır. Bu özellik sayesinde her bir mesajın topic’e yazılmadan veya topic’ten çıktıktan sonra bazı alanlarını silebilir veya güncelleyebiliriz. Örneğin veritabanından okumuş olduğumuz kullanıcı bilgileri içerisinde kişisel verileri MaskField SMT ile mask’layıp topic’e yazdırabiliriz.  
-> Kafka Connect distributed ve standalone olmak üzere iki farklı şekilde deploy edilebilir. Prod ortamında distributed kullanılması daha uygun olacaktır.
-> Standalone modda tek bir worker bulunur. Her bir worker içerisinden birden fazla connector bulunur ve her connector içerisinde de bir veya birden fazla Task bulunur.

 


-> Distributed modda ise aşağıdaki gibi Task’lar klonlanır. 
 


-> Hata olması durumunda diğer connector’a aktarılır.

 

 

Connect Converter

-> Connect converter ile mesajlar Kafka’ya yazılmadan önce SMT ile her mesajı güncelleyebiliriz. 
-> SMT sayesinde filtreleme, flatten işlemi ( iç içe geçmiş verileri basitleştirme) veya bir alanın silinmesi gibi işlemler sağlanmaktadır. Bu işlemlerin yapılabilmesi için ValueToKey, ExtractField, MaskField gibi SMT’ler kullanılır.
-> DLQ (Dead Letter Queue): Kafka connect ile verileri veritabından ya da başka bir ortamdan çekip topic’e yazarken veya topicteki kayıtları sink connector ile başka bir ortama aktarırken bazı record’larda hatalar oluşabilir. Oluşan hataların bir log’ta tutulması canlı sistemler için çok önemlidir.  Bunun kafka connector’da  errors.tolerance ayarı yapılmalıdır. Bu işlemi gerçekleştirmek için değeri “all” yapılmalı ve errors.deadletterqueue.topic.name ile topic.replication.factor ayarları da aşağıdaki gibi belirlenmelidir.

"errors.tolerance":"all",  
"errors.deadletterqueue.topic.name":"orientation_student_dlq", 
"errors.deadletterqueue.context.headers.enable": "true", 
"errors.deadletterqueue.topic.replication.factor": "1"

 

 


Kafka Streams

-> Temel olarak 3 birimden oluşur. Source node, processor node ve sink node. Source node record’ları broker’dan consume etmeyi sağlar. Processor node’lar gelen record’u işler, düzenler, değiştirir vb. işlemleri yapar. Sink node işlenen veriyi yeni bir topic’e yazmayı sağlar. Kafka Streams içerisindeki her bir metod aslında topology’de bir node üretilmesine neden olur. Bu node’ların türleri ise source, processor ve sink node’tur.
-> Kafka’nın native stream processing kütüphanesidir. Kafka broker içerisinde çalışmaz, client uygulama şeklinde bağlanır. 
-> Streams aslında klasik consumer veya producer oluşturmadan bir abstraction sağlayarak bu işlemlerin daha kolay yapılmasını sağlar. Önceki bölümlerde örnekleri verilmiş olan işlemler Kafka Streams ile çok daha kolay şekilde ve minimum ayar ile yapılabilir.

 

Streams DSL (Domain Specific Language)

-> Kafka Streams’in API kısmını ifade eder. Yani Kafka Streams’e özgü işlemlerin nasıl yapılacağını belirler. 
-> Bu API’nin temelinde KStreams ve KTable sınıfıları bulunur.
-> Bu sınıflardaki metodlar fluent interface style programlama mantığı ile yazılmıştır. Yani genelde metodların dönüş tipler KStreams ve KTable için de KTable olmaktadır.

 

Hello World Uygulaması

-> Stream’ler bir topology mantığı ile değerlendirilir. Örneğin record ya da diğer bir adıyla event’ları bir source topic’ten okuyup value değerini uppercase’e dönüştüren uygulamanın topology’si şu şekilde oluşur:

 

Kod Örneği:

@Override
public Topology topology(Properties streamProperties) {

   Serde<String> stringSerde = Serdes.String();
   StreamsBuilder builder = new StreamsBuilder();

   KStream<String, String> simpleFirstStream = builder.stream("src-topic",
           Consumed.with(Serdes.String(), Serdes.String()));
   KStream<String, String> upperCasedStream = simpleFirstStream.mapValues(value -> value.toUpperCase());

   upperCasedStream.print(Printed.toSysOut());
   upperCasedStream.to("out-topic", Produced.with(stringSerde, stringSerde));

   return builder.build(streamProperties);
}

Yukarıdaki kodda görüldüğü üzere ilk node yani source node: simpleFirstStream ifadesidir. Sonra upperCasedStream nesnesi ile bir tane processor node üretilir. upperCasedStream.to() metodu ile de topology içerisine sink node eklenmiş olur.
-> Birden fazla topic’ten okumak için List.of gibi collection da kullanabiliriz. Ayrıca regex pattern kullanımına da izin verdiği için Pattern.compile(“topic[A-C]”) gibi ifadeler de yazılabilmektedir.
-> Consume edilen source topic’ler silindiğinde veya yukarıdaki regex koşulunu sağlayan bir topic broker’da oluşturulduğunda otomatik olarak kodu değiştirmeden okuma işlemine devam edebiliriz.
-> Ancak consume edilen topic’lerdeki kayıtların key türü aynı olmalıdır.  Yani biri integer diğeri string türünde key’ler olamaz!
-> Source node’dan processing node üretme işlemi yukarıdaki örnekte görüldüğü gibi mapValues() metodu çağrılarak yapıldı. ve mapValues() metodu sonucunda da yine aynı nesne yani Kstreams nesnesi dönmüş oldu. 
-> Bu kod parçasında sadece value değeri güncellendi, key değeri ile ilgili henüz bir işlem yapılmamıştır.

Şimdi yukarıdaki Topology nesnesi dönen metodu bir main metodu içerisinde kullanalım:

Properties streamProperties = new Properties();
streamProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "yelling_app_id");
streamProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");


Topology topology = topology(streamProperties);
LOG.info("Topology description {}", topology.describe());

try (KafkaStreams kafkaStreams = new KafkaStreams(topology, streamProperties)) {
   LOG.info("Hello World Yelling App Started");
   kafkaStreams.start();
   LOG.info("Shutting down the Yelling APP now");
}

-> application.id değeri “yelling_app_id” olarak belirlendi. Bu ilk kısımda bahsetmiş olduğumuz consumer grubunun ID’sidir. Bu app’ten birden fazla çalıştırdığımızda her bir app bir consumer olarak Kafka tarafından yönetilir ve bu consumerlardan oluşan yapı tek bir consumer grup olarak değerlendirilir.

 

Aggregation Operations

-> Diyelim ki bir KStream nesnemiz var ve bu nesne ile kullanıcıların skorlarını key değerine göre gruplayıp hesaplamamız gerekiyor. Bu işlemi gerçekleştirmek için KStream sınıfının groupByKey() metodu kullanılır. Aşağıdaki kodu inceleyelim:

public Topology topology(final Properties streamProperties) {
   StreamsBuilder builder = new StreamsBuilder();
   KStream<String, Double> pokerScoreStream = builder.stream("poker-game",
           Consumed.with(Serdes.String(), Serdes.Double()));
   pokerScoreStream
           .groupByKey()
           .reduce(Double::sum,
                   Materialized.with(Serdes.String(), Serdes.Double()))
           .toStream()
           .peek((key, value) -> LOG.info("key[{}] value[{}]", key, value))
           .to("total-scores",
                   Produced.with(Serdes.String(), Serdes.Double()));
   return builder.build();
}

-> groupByKey() metodu aggregate, count ve reduce metodlarını barındıran KGroupedStream nesnesini üretir. 
-> groupByKey() metoduna ek groupBy() isminde bir metod daha vardır. Bu metod ile varsayılan key değeri yerine value içerisindeki bir field’in veya farklı bir değerin key olarak ayarlanmasını sağlayabiliriz. Örnek:

pokerScoreStream.groupBy((key, value) -> value > 0 ? "winners" : "losers")

-> reduce() metodu KTable nesnesi döner. KTable nesnesinden Kstream nesnesi üretmek için toStream() metodu kullanılır. peek() metodu ara katmanda record’ların son durumlarını kontrol etmek için kullanılan bir metod’tur.
-> Materialized nesnesi state store da tutulacak ara-değerlerin ayarlarının yapılmasını sağlar. Çünkü tahmin edileceği üzere bazı aggregation işlemlerinde önceki kayıtların değerlerine ihtiyaç duyulmaktadır. Önceki kayıtların değerleri state store dediğimiz kafka broker’lar içerisinde özel dosyalar içerisinde tutulur. Yani bunu bir topic gibi değil farklı bir tür olarak değerlendirmek gerekir. 
-> Çok önemli: Kafka Streams’ler stateful operasyonlarının sonuçlarını belirli aralıklarla state store dosyalarına kaydeder. Sonuçları kısa bir süreliğine bir cache içerisinde tutar. Cache’in flush olması için 30 saniye geçmeli veya buffer’ın 10mb seviyesine gelmesi gerekir. 
-> Son aşamada hesaplanan veriler to() metodu ile belirtilen “total-scores” isimli topic’e kaydedilir.

 


KStream flatMap() metodu

-> mapValues ile key değeri değiştirelemediği için map() metodu kullanabiliriz. Bu iki metodda da 10 tane record ile ilgili işlem yapılmışsa sonuç olarak yine 10 tane record çıkar. Ancak 1 tane record girip birden fazla değer üretilmek isteniyorsa yani 1:N ilişkisi isteniyorsa flatMap() metodu kullanılır. flatMap() ifadesindeki flat yeni üretilen değerlerin düzleştirilmesi anlamına gelir.
-> map() metodu KeyValue değeri döner. 
-> Kafka Streams’te print, foreach, process, to gibi terminal metodlar yer almaktadır. Bu metodların dönüş tipi void olduğundan dolayı chain yapısının son temsilcisi olur.
-> filter() metodu ile filtreleme işlemi yapılmaktadır. 
-> split() metodu ile kayıtlar branch’lere bölünür. Örneğin aktif kayıtların “active_records” isimli bir topic’e yazılması ve pasif olan kayıtların “passive_records” isimli bir topic’e yazılması sağlanabilir.

 

Altın Kural

-> Stream processing sonucunda veriler bir topic’e yazdırılacaksa otomatik topic oluşturma yerine manuel topic oluşturmak daha doğru olur. Manuel topic oluştururken retention time, partition sayısı, replication factor gibi önemli parametreleri belirtebiliriz.

 

Dynamic Routing of Messages

-> Topic’lere mesajları dinamik olarak yönlendirmek için TopicNameExtractor interface’ini implement etmemiz gerekmektedir. Bu function interface’in extract metodu topic adını döndermeyi sağlar.

@Override
public String extract(String key,
                     Purchase value,
                     RecordContext recordContext) {
   String department = value.getDepartment();
   if (department.equals("coffee")
           || department.equals("electronics")) { ❶
       return department;
   } else {
       return "purchases";             ❷
   }
}

 

Naming Topology Nodes

-> Kafka Streams’de her bir node’a spesifik olarak bir isim verilmediği zaman otomatik olarak isim verilir.
-> Node’lardan oluşan topology bilgisine erişmek için streamsBuilder.build().describe() metodu kullanılır. Bu metod TopologyDescription nesnesini döndürür. System.out.println(description.toString()); şeklinde kullanıldığında aşağıdaki gibi topology yapısını görebiliriz:

Topologies:
  Sub-topology: 0
   Source: KSTREAM-SOURCE-0000000000 (topics: [src-topic])  ❶
     --> KSTREAM-MAPVALUES-0000000001  ❷
   Processor: KSTREAM-MAPVALUES-0000000001 (stores: []) ❸
     --> KSTREAM-SINK-0000000002
     <-- KSTREAM-SOURCE-0000000000  ❹

   Sink: KSTREAM-SINK-0000000002 (topic: out-topic) ❺
     <-- KSTREAM-MAPVALUES-0000000001

-> Node isimleri üretilirken, örnekte görüldüğü gibi, metod adına global olarak artan sayı eklenir. 
-> Kafka Stream DSL API’de neredeyse tüm metodlar paramtre olarak ekstra Named nesnesi alır. Bu parametre ile node isimleri belirtilmektedir. Bu sayede topology daha okunaklı ve daha güvenli olur; çünkü var olan bir topology güncellendiği zaman yeniden isimlendirme yapılır ve bunun sonucunda bazı state store dosyaları silinebilir. Bu konu detaylı bir konu olduğu için Confluent sitesinden detaylı bilgiye erişebilirsiniz.
Örnek kullanım:

KStream<String, String> simpleFirstStream = builder.stream("src-topic",
       Consumed.with(stringSerde, stringSerde)
               .withName("Application-Input"));

KStream<String, String> upperCasedStream = simpleFirstStream.mapValues((key, value) -> value.toUpperCase(),
       Named.as("Convert_to_Yelling"));

upperCasedStream.print(Printed.<String, String>toSysOut().withName("Console_Printer"));
upperCasedStream.to("out-topic", Produced.with(stringSerde, stringSerde)
       .withName("Application-Output"));

 

Streams ve State Store Kavramları

-> Şimdiye kadar stateless stream işlemlerle ilgili örnekleri incelemiş olduk. Şimdi state gerektiren işlemleri inceleyeceğiz. Yani stream verisi işlenirken kullanıcının kaç kez login olduğu gibi hesaplama gerektiren işlemler yapıldığında bu hesaplamaların bir state’te tutulması gerekmektedir. Örnek olarak aşağıdaki gibi bir işlem yapıldığında state tutulmasına gerek yoktur:

public boolean numberIsOnePredicate (Widget widget) {
   return widget.number == 1;
}

-> State gerektiren bir işlem için şu şekilde örnek verebiliriz:

public int count(Widget widget) {

   int widgetCount = hashMap.compute(widget.id,
           (key, value) -> (value == null) ? 1 : value + 1);

   return widgetCount;
}

-> reduce() operasyonu stateful bir operasyondur; çünkü toplama işleminin sonucu için önceki toplanan değere ihtiyaç duyulmaktadır. Örnek:

pokerScoreStream
       .groupByKey()
.reduce(Double::sum,
       Materialized.with(Serdes.String(), Serdes.Double()))
       .toStream() 
.to("total-scores",
       Produced.with(Serdes.String(), Serdes.Double()));

1.reduce() metodunun sonucunda KStream yerine KTable döner. KTable’dan tekrar stream nesnesi üretmek için toStream() metodu kullanılmaktadır.
2.groupByKey veya groupBy() metodu ile verinin gruplanması gerekmektedir. Bu iki metod’un dönderdiği değer KGroupedStream nesnesidir. Bu nesnede aggregate, count ve reduce metodları yer almaktadır.
3.Materialized sınıfı state  store’da verilerin tutulması ve okunması için gerekli olan serializer ve deserializer işlemlerini ve state store yapılandırmasını sağlar. Benzer sınıflar okuma işlemleri için Consumed ve yazma işlemleri için Produced sınıfıdır.

-> Aggregation sonucunu Kafka topic’e yazılırken stateless ile statefull processor’lar arasında bir fark vardır. Stateful processor bir cache mekanizması kullanır. Bunu önceki sayfalarda açıklamıştık. Her stateful işleminde hesaplanan değerin hemen yazılması için cache disable edilmelidir.

 

Not: Bütün stateful operasyonlarda sonucun gecikmesinin nedeni cache mekanizmasıdır!

 

Aggregation vs Reducing

-> reducing metodun aynı tür veri döndöndürmek gereklidir, aggregate() metodu ile farklı bir tür veri dönebiliriz.
-> aggregate() metodunun sonucunda dönülecek olan yeni türün ilk hesaplada belirlenebilmesi için, initial value’ya ihtiyaç duyulmaktadır. reduce() metodunda ise ilk değer zaten ilk value olarak kullanılır.
-> aggregate() metodunun ikinci parametre türü Aggregator interface’dir. Bu interface de apply() metodu implement edilmektedir.

 

Repartitioning the Data

-> Kafka Streams’te var olan bir record’un yani mesajın key değeri değiştiği zaman re-partition işlemi yapılır. Hatırlarsak Kafka Broker mesajları topic partitionlarına yazmadan önce key değerine göre mod işlemi yapıp ilgili mesajın nereye yazılacağını belirliyordu. Bundan dolayı key değeri değiştiği zaman re-partition tetiklenir.
-> Bu işlem Kafka Streams tarafından çok basit bir şekilde gerçekleştirilir. Önce mesajlar başka bir topic’e produce edilir, produce edilme aşamasında mesajlar yeni topic’te key değerine göre partition’lara yazılacaktır, ve stream operasyonunun devam edebilmesi için tekrar consume edilir.
-> Kafka Streams bu işlemi yapmak için ara katmanda bir sink node ekler, sink node producer görevi görür, ve bir source node ekleyip mesajları yeniden kullanmak için consume eder.

 


Stream – Stream Joins

-> İki stream içerisindeki verilerin join yapılabilmesi için her iki stream’deki mesajın key değerleri ve türü aynı olmalıdır! Kafka Streams, key değerine göre mesajları birleştirmektedir. Not: value değerlerinin türü farklı veya aynı olabilir.
-> JoinWindows aracılığı ile belirli kriterlerin karşılanmasını zorunlu hale getirebiliriz. Örneğin şu koşul gerçekleşirse join işlemi yapılsın denebilir: Bir müşteri bir ürün satın almışsa ve aynı müşteri yarım saat içerisinde başka bir ürün alırsa ikinci ürün için %50 indirimi kazanmaya şansını yakalayacaktır. Bu koşulun sağlanıp sağlanmadığını kontrol etmek ve eğer koşul sağlanıyorsa indirimin uygulanmasını sağlamak için JoinWindows sınıfı kullanılmalıdır. Bu tarz örneklerin basit bir şekilde yapılabilmesi için join işleminin sonucu indirim-kazanan-musteriler isminde yeni bir topic’e yazılıp başka bir app bunu consume edebilir.
-> Join işleminde left ve right side olmak üzere iki taraf bulunur. Join işleminin gerçekleşmesi için Kafka Streams yeni join proccessor’ları topology’e ekler. 
-> Her join processor kendi state store’unu yönetirken karşı tarafın da state store ismini tutar. Not: İsimlendirme işlemi state store’da manuel isim verilmediği zaman otomatik olarak belirlendiğinden bahsetmiştik.
-> İsim bilgisini değiştirmek için join() metodunun 4. parametresinde StreamJoined nesnesi kullanılır.  

@Override
public Topology topology(Properties streamProperties) {
   StreamsBuilder builder = new StreamsBuilder();
   Serde<CoffeePurchase> coffeeSerde = SerdeUtil.protobufSerde(CoffeePurchase.class);
   Serde<RetailPurchase> retailSerde = SerdeUtil.protobufSerde(RetailPurchase.class);
   Serde<Promotion> promotionSerde = SerdeUtil.protobufSerde(Promotion.class);
   Serde<String> stringSerde = Serdes.String();

   ValueJoiner<CoffeePurchase, RetailPurchase, Promotion> purchaseJoiner = new PurchaseJoiner();
   JoinWindows thirtyMinuteWindow = JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(30));

   KStream<String, CoffeePurchase> coffeePurchaseKStream = builder.stream("coffee-purchase", Consumed.with(stringSerde, coffeeSerde));
   KStream<String, RetailPurchase> retailPurchaseKStream = builder.stream("retail-purchase", Consumed.with(stringSerde, retailSerde));

   KStream<String, Promotion> promotionStream = coffeePurchaseKStream.join(retailPurchaseKStream, purchaseJoiner,
           thirtyMinuteWindow,
           StreamJoined.with(stringSerde,
                   coffeeSerde,
                   retailSerde).withName("purchase-join").withStoreName("purchase-join-store"));

   promotionStream.peek((key, value) -> LOG.info("key[{}] value[{}]", key, value))
           .to("promotion-output", Produced.with(stringSerde, promotionSerde));

   return builder.build();
}


-> Yeni bir record geldiğinde, processor node’u bu kaydı kendi state store’una ekler ve join işleminin karşı tarafındaki stream içerisinde aynı key değerine sahip ve JoinWindows ile belirtilen zaman aralığı karşılanıyorsa bulunan kayıt left processor node tarafından çekilir. 
-> ValueJoiner<V1, V2, R> interface ile join olan kayıtların value değerleri alınıp yeni bir value nesnesi üretilebilir. join() metodunun ikinci parametresi ValueJoiner sınıfı almaktadır.

 

State Store’a Detaylı Bakış

-> State store’daki mesajların saklanması için persistent ve in-memory olmak üzere iki farklı mod kullanılabilir. Varsayılan olarak persistent’tir. Her iki modda da kayıtlar, changelog topic’ler sayesinde durable’dır.
-> Önceki örneklerde görüldüğü üzere mesajlar hem persistent store’a hem de change-log topic’e yazılır. Burada bir noktayı iyi bilmek gerekiyor: Persistent, local store, ya da diğer bir ifade ile STATE STORE BİR TOPIC DEĞİLDİR; change-log topicler tarafından yönetilen local dosyalardır!
-> Change-log topic’ler compact modda oluşturulduğu için veri kaybı yaşanmaz!
-> Bilinmesi gereken çok önemli bir nokta vardır: Kafka Streams node’ların her hesapladığı data’yı direk state store ve change-log topic’e yazmaz. Arada cache mekanizması kullanarak her 30 saniye veya 10mb boyutuna geldiğinde veriler state-store dosyalarına ve change-log topic’e yazılır. 30 saniye dolmadan bir sorun çıkarsa ya da state-store local dosyaları bozulursa, Kafka Streams change-log topic’teki son kayda göre state-store restore eder.
-> Persisten-store yani local store da veriler key-value şeklinde tutulmaktadır. Kafka burada RocksDB kullanır. 
-> İki farklı tutma yöntemi olduğundan bahsetmiştik. Persistent ve in-memory. Bir sorun olma durumunda örneğin restart işlemi gibi, in-memory’deki kayıtlar kaybolur. Böyle durumda state state bilgisi change-log topicten restore edilir. Persistent türü seçildiyse restart işlemlerinden etkilenmez; ancak tüm state store yapısı kaybolursa veya veri corruption olursa aşağıdaki şekilde restore işlemi yapılır:
ooffset bilgisinin tutulduğu checkpoint dosyası, bu dosya her bir persistent store özelliğine sahip state-store’lar için otomatik üretilir, içindeki offset bilgisi kullanılarak change-log topic’ten ilgili kayıt okunur.
oEğer offset bilgisi geçerli değilse, consume edilen ana topic’ten full consume işlemi yapılarak state store yeniden yaratılır.
-> Özetle state store verisi sağlam ve tutarlı bir veridir.

 

State Store Location On The File System

-> StreamsConfig.STATE_DIR_CONFIG değişkeni ile state store’ların nerede tutulacağını belirtebiliriz. Belirtilmediği zaman JVM’nin geçici dizininde “kafka-streams” diye bir klasör yaratılır ve tüm state store’lar burada tutulur. Bundan dolayı bu ayar için sabit bir path vermek sağlıklı olacaktır.
-> “kafka-streams” klasörü altında application-id ye göre alt bir dizin yaratılır. Örnek: “kafka-streams/test-application”
-> Bir topology’de her sub-topology id ve consume edilen topic’in partition numarasına göre bir dizin daha yaratılır. Örnek: “0_0”
-> Bir sonraki dizinin adı ise “rocksdb” dir.
-> En son dizin ise Materialized nesnesi ile isim belirtilmezse auto-increment artan 10 haneli ve sıfırdan başlayan bir sayı ile isimlendirilir.
-> Örnek dizin: /var/tmp/kafka-streams/test-application/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-0000000001
-> Bu örnek isimlendirmede görüldüğü üzere her bir processor node için yani aggregate node’u için bir state store oluşturulmuştur. Hatırlarsak processor node demek ara işlemleri yapan node demektir. Aslında Kafka Streams DSL API’de her bir metod için bir node üretilir! 
-> change-log topic’in adı ise kolay bir şekilde Kafka broker’da topicler bölümünde görülür. Örnek bir topic adı: “test-application-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog”
-> Bu şekilde karışık isimler görmek yerine Materialized nesnesi ile isimlendirme yapabiliriz. Ayrıca isimlendirme sayesinde stream topology değişse bile state store verisi değişmemiş olur. Örnek: aStream.groupByKey().count(Materialized.as(“counting-store”))
-> Changelog topic normalde compacted moddadır. Bu modu “delete, compact” şeklinde ayarlarsak bu topic’in aşırı büyümesini engeller. Bu moda geçmek için cleanup policy mekanizması kullanılır. Örnek bir kod:

Map<String, String> changeLogConfigs = new HashMap<>();
changeLogConfigs.put("cleanup.policy", "compact,delete");

builder.stream("input")
        .groupByKey()
        .count(Materialized.as("counting-store")
                .withLoggingEnabled(changeLogConfigs))
.toStream();

 

Windowing ve Time Stamps

-> KTable kısaca bir change-log stream’dir. Aggregation işlemlerinin sonucunda KTable nesnesi döner. 
-> KTable’da bir key’e ait son güncel veri tutulmaktadır. KStreams’de ise bir key’e ait birden fazla değer bulanabilir.
-> KTable’ı bir lookup table olarak düşünüp, stream-table join ile ilgili record’u zenginleştirebiliriz. Stream-stream join işlemlerinde foreign key kullanılamamasına karşılık table-table join işlemlerinde foreign key de kullanılabilmektedir.
-> KTable verileri herhangi bir topic’te tutulmaz. aggregate operasyonlarını yöneten processor node’lar için bir state store tutulduğunu ifade etmiştik. KTable verileri bu state store’larda tutulur. StateStore sınıfı ile state durumu takip edilir ve varsayılan olarak in-memory değil persistent’tır. State store’lar için ifade edilen bütün bilgiler KTable için de geçerlidir. 
-> KTable’da tutulan kayıtları bir topic’e yazdırmak için toStream().to(“results-topic”) şeklinde bir stream’e dönüştürülüp o stream’den topic’e yazdırılmalıdır!
-> KStream gibi KTable’da consume ettiği topic partition sayısına göre dağıtılan task’ler üzerine dağıtılır.
-> KTable sınıfında yer alan metodlar: filter, filterNot, mapValues ve transformValues

 

Tombstone Kavramı

-> Value değeri null olduğu zaman veya filter, filterNot metodları kullanıldığı zaman filtrelenen ve null olan mesajlar tombstone marker konularak yeni bir tabloya yönlendirilir.


Global KTable

-> Hatırlarsak topic’ler partitionlardan oluşmaktaydı. Bir consumer sadece consume ettiği topic’in tek bir partition’undan veriyi okur. Örnek olarak topic 3 partition’dan oluşursa ve 3 tane aynı uygulama varsa her bir consumer bu partition’lardan veriyi okur. Haliyle KTable içerisinde tek bir partition record’ları yer alır. 
-> GlobalKTable de bütün partition’lardaki kayıtlar yer alır. Yani bu tabloda tutulan veri topic’in bütün verileridir! 
-> Bir topic’teki bütün kayıtların tutulmasını istiyorsak ve bu sayede consumer uygulamalarının hepsinde bu veriyi kullanmak istiyorsak GlobalKTable oluşturulmalıdır; çünkü topic’in bütün verisine sahiptir.
-> Eğer topic tek bir partitiondan oluşursa GlobalKTable ihtiyaç duyulmaz. Böyle bir durumda KTable’da tutulan veri ile GlobalKTable’da tutulan veri aynıdır.

 

Stream – Table Join İşlemleri

-> Kural 1: Stream tarafı KStream yani her zaman join işleminin sol tarafındadır.
-> Kural 2: Window kullanılmaya gerek yoktur.
-> Kural 3: Sadece stream’deki değişiklik join işlemini tetikler. Yani tablodaki değişiklik join işlemini tetiklemez.

 

Table – Table Join İşlemleri

-> Her iki tarafta meydana gelen güncelleme join işlemini tetikler.
-> Foreign key join işlemi yapabiliriz. Bu özellik stream’lerde yoktur.

 

Windowing Kavramı

-> KStream ve kısmen KTable’da veriler sürekli artmaktadır.  Stream’deki toplam veri sayısını count() ile bulabiliriz; ancak bazı durumlarda bu sayı işimize yaramaz.
-> Bazı durumlarda belirli zaman aralıklarına göre stream’i bölüp o zaman aralığındaki veriyi işleme ihtiyacı duyabiliriz. Örneğin: her dakika sisteme kaç kişi login olmuş sayısını bulmak için stream’i dakikalara bölmek gerekmektedir. Bu bölme işlemi windowing olarak ifade edilir. 
-> Kafka Streams’de 4 farklı şekilde windowing yapabiliriz. Bunlar hopping, tumbling, session ve sliding’tir.

-> Tumbling Window verileri sabit zaman dilimlerine böler. Veriler bu zaman aralıklarında ayrı ayrı işlenir. Örneğin stream’i saatlere bölerek her saat üretilen veriden çıkarımlar yapabiliriz. Örnek kod:

KStream<String, String> countStream = builder.stream(inputTopic,
       Consumed.with(stringSerde,stringSerde));
countStream.groupByKey()
       .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(1, Duration.ofSeconds(30)))
       .count(Materialized.as("Tumbling-window-counting-store"))
       .toStream()
       .peek(printKV("Tumbling Window results"))
       .map((windowedKey, value) -> KeyValue.pair(windowedKey.key(), value))
       .to(outputTopic, Produced.with(stringSerde, longSerde));


Bu örnekte her dakika count işlemi yapılır ve üretilen sonuçlar output topic’e kaydedilir. TimeWindows.ofSizeAndGrace ismindeki metodun ikinci parametresi grace period değeridir.
-> Grace period out-of-order veriler için gecikme süresini ifade eder. Yani topic’te tutulan record’ların timestamp değerine göre sıralaması bozulabilir. Bunun en önemli nedeni timestamp verisinin manuel ayarlanmasıdır. Örnek olarak veritabandsın’daki kayıtları bir topic’e yazdırırken timestamp değerini value değerinden çektirirsek out-of-order sorunu ile karşılaşabiliriz. Burada grace değer ile bu out-of-order süresinin maksimum kaç saniye tolere edileceği belirtilmiş olur. Bu örnekte 30 saniye belirtilmiş olduğu için maksimum 30 saniye kayıtların timestamp değerinde sapma tolere edilir.
-> Hopping Window belirli bir zaman aralığında ve belirli bir adım (hop) uzunluğunda pencereler oluşturmayı sağlar. Hop 5 saniye ve window 1 dakika ise her 5 saniyede 1 dakikalık window’lar oluşturulur. Bunun sonucunda iki window’daki bazı kayıtlar aynı olabilir. Hop değerini TimeWindows sınıfının advanceBy() metodu ile sağlarız. İlk kayıt geldiği zaman 0-5 dakikalık içerik olmayacağı için sadece 1 dakikalık veri olur. Sonra yeniden pencere oluşturulur ve bu pencere yani window’un aralığı 1-6 arası olur. 2 dakika sonra pencere aralığı 2-7 olur.  Yani kayıtlar overlap olur.  Örnek bir kullanım amacı bir e-ticaret sitesinde bir ürün detay sayfasında ortalama tıklama sayısını hesaplamak için kullanılabilir.
-> Session Window’da in-active zaman belirtilir. Yani 15 dakikalık in-active değeri varsa iki kayıt arasındaki timestamp farkı 15 dakikadan az olursa bu kayıtlar aynı window içerisinde yer alır. Yani web uygulamasındaki session mantığı gibi çalışmaktadır. Bu window yapısını kullanarak bir kullanıcının 15 dakika içerisinde yapmış olduğu tüm kayıtları user_id ye göre gruplayıp bulabiliriz. in-active time süresi aşıldığında ise yeni bir window oluşturulur ve kayıtlar bu yeni window içerisinde yer alır.
-> Sliding Window: Önce bir tane pencere açılır ve o pencere içerisinde bir kayıt eklenir. İkinci kayıt ile ilk kayıt timestamp değerine göre karşılaştırılır. SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(30)) şeklinde ayarlanan bir sliding window içerisine ikinci kaydın girebilmesi için timestamp farkı 30 saniyeyi geçmemelidir. Bu sayede 30 saniye içerisinde birbiri ile ilişkili işlemlerin gerçekleşip gerçekleşmediğini belirleyebiliriz. Yani bu işlemler aynı zamanda join operasyonları ile de kullanıldığı için fraud detection sağlanabilir. Sliding Window’da pencereler dinamik açılır. Yani 30 saniye süresi aşıldığı zaman yeni bir window açılır. Örnek bir fraud detection kodu:

 

Transactions.groupByKey()      .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5))) // 5 dakikalık kayan pencere
       .aggregate(
               () -> 0.0,
               (key, transaction, total) -> total + transaction,
               Materialized.with(Serdes.String(), Serdes.Double())
       )
       .toStream()
       .foreach((key, total) -> {
           // Hareketli ortalamanın üzerindeki işlemleri tespit et
           if (total >= 100.0) {
               System.out.println("Sahtekarlık tespit edildi! Toplam: " + total + " zaman aralığı: " + key.window());
           }
       });

Bu kod’da yer alan aggregate metodunun ilk parametresi aggregate sonucunun yani toplam harcanan sonucun ilk değerini ifade ediyor. Yani başlangıçta 0.0 TL olarak belirtildi. Sonra her bir transaction işlemi yani ödenen tutar bilgisi ile toplam tutar toplanıp sonucu total’e eşitleniyor. foreach içerisinde ise her bir window’da yapılan ödeme miktarı 100 TL’den fazla ise fraud işlemi tespit edilmiş oluyor. Örnekten de görüleceği üzere toStream() metodu window sonuçlarını bir stream’e dönüştürür.

 

Suppress Kavramı

-> Window işlemi kullanılırken kayıtların sürekli güncellenmesi bir değişkende tutulması problemini ortaya çıkarmaktadır. Bu sorunu önlemek için suppress kullanılır. Örnek:

KStream<String, String> countStream = builder.stream(inputTopic,
       Consumed.with(stringSerde,stringSerde));
countStream.groupByKey()
       .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
       .count(Materialized.as("Tumbling-window-suppressed-final-counting-store"))
       // BufferConfig.withMaxRecords(10_000).shutDownWhenFull()) use this instead to control the shutdown
       .suppress(untilWindowCloses(maxRecords(10_000).shutDownWhenFull()))
       .toStream()
       .peek(printKV("Tumbling Window suppressed-final results"))
       .map((windowedKey, value) -> KeyValue.pair(windowedKey.key(), value))
       .to(outputTopic, Produced.with(stringSerde, longSerde));

Eğer bu şekilde bir kullanım sağlamazsak window’lama işlemi bitmeden memory exception yaşanabilir. Bu kod ile window koşulunu değiştiriyoruz. Yani window için belirlenen süre bitmeden kayıt sayısına göre window kapatılması sağlanır.

 

Timestemps in Streams


-> Timestamp’leri üç gruba ayırabiliriz: Event time, Ingestion time ve Processing time
-> Event time eğer kayıt içerisinde timestamp field’ı varsa o değer kullanılır, eğer böyle bir alan yoksa record’un producer tarafından yaratıldığı zamanı ifade eder. Varsayılan değer producer’ın create time’ıdır.
-> Ingestion Time (Log-append-time): Kafka broker’ı tarafından set edilir.
-> Processing Time: Record bir processing pipeline’den geçerken set edilir. Daha doğrusu record consume edilirken set edilir.
-> Bunlara ek olarak TimeStampExtractor sınıfı ile custom timestamp set edilir.

 

Custom Timestamp Extractor

-> Bir record içerisindeki herhangi bir field’i örneğin createdAt, updatedAt gibi timestamp olarak ayarlamak için Consumed.withTimestampExtractor() metodunu kullanırız. Yani recordları consume etmeden önce timestamp belirlemek mümkündür. Evet burada consume etmeden önce timestamp dan bahsediyoruz. Çünkü consume edilirken bir kaydın timestamp değeri değiştirilip sonra stream işlemleri de yapmak mümkündür. Örnek:

public class TransactionTimestampExtractor implements TimestampExtractor {

   @Override
   public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
       Purchase purchasePurchaseTransaction = (Purchase) record.value();
       return purchasePurchaseTransaction.getPurchaseDate().getTime();
   }
}

KStream<String, String> inputTopic = new StreamsBuilder().stream(
       "input-topic",
       Consumed.with(Serdes.String(), Serdes.String()).withTimestampExtractor(new CustomTimestampExtractor())
);

-> Bir diğer seçenek global olarak timestampt extractor sınıfını kullanmaktır. DEFAULT_TIME_STAMP_EXTRACTOR_CLASS_CONFIG değerini aşağıdaki gibi kullanabiliriz:

public class SimpleKafkaStreamsApp {
   public static void main(String[] args) {
       Properties config = new Properties();
       config.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-streams-app");
       config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "your-bootstrap-servers");
       config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
       config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
       config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TransactionTimestampExtractor.class.getName());

       StreamsBuilder builder = new StreamsBuilder();

       KStream<String, String> inputTopic = builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));


       inputTopic.foreach((key, value) -> {
           System.out.println("Key: " + key + ", Value: " + value);

       });

       KafkaStreams streams = new KafkaStreams(builder.build(), config);
       streams.start();

 
       Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
   }
}

Not: DEFAULT_TIME_STAMP_EXTRACTOR_CLASS_CONFIG ayarının varsayılan değeri FailOnInvalidTimestamp.class sınıfıdır.

 

Streamtime

-> Bir recordun delay değerini ölçmek için kullanılan hep ileri doğru hareket eden zamandır. Delay değerini hesaplamanın nedeni, windowing işleminde grace ayarı ile belirlenen zaman farkını kontrol etmektir. Eğer delay değeri ayarlanan grace süresinden daha az ise record aynı window içerisine eklenir; aksi durumda yeni window da yer alır.
-> En büyük timestamp’tir.
-> Record’un out-of-order olup olmadığını belirlemeye yarar. Stream time değeri aşağıdaki şekilde görüldüğü üzere record’un timestamp değerine set edilir. Eğer out-of-order olmuş bir paket gelirse herhangi bir işlem yapılmaz.

 

 

ksqlDB

-> Stream oluşturma işlemi aşağıdaki gibi SQL sorgusuna benzer bir sorgu ile gerçekleştirilir:

create
stream abc_stream (cumle VARCHAR) 
WITH (kafka_topic='cumle-satirlar', partitions = 1, value_format='KAFKA');

-> Veri Ekleme işlemi

INSERT INTO abc_stream(cumle) VALUES('Cümle 1');
INSERT INTO abc_stream(cumle) VALUES('Cümle 2');
INSERT INTO abc_stream(cumle) VALUES('Cümle 3');

-> Push Queries: Oluşturulan stream den veri çekmek istediğimizde bir sorgu yazılır. Bu sorgu stream sorgusu olduğundan dolayı sorgunun sonucu veri akışı ksqlDB client uygulamasında yani terminalinde PING sorgusu gibi akmaya devam eder. Bu veri akışı işlemine ksqlDB server’ından ksqlDB client’ına push işlemi denir. Yani veriler sunucudan client’a sürekli push edilir.
-> Yeni bir stream oluşturma işlemi aşağıdaki gibi yapılabilir. Bu örnekte ilk stream’den yeni bir stream üretildi. Yeni stream’de base stream’deki kayıtlar upper case olarak consume edilir.

CREATE STREAM yelling AS
SELECT UCASE(cumle) as SHOUT
FROM abc_stream EMIT CHANGES;


-> Timestamp bilgisini ayarlama işlemi aşağıdaki gibi çok kolaydır:

WITH (kafka_topic='user_activity',
   partitions=4,
   value_format='JSON',
   timestamp ='created_at',
   timestamp_format='yyyy-MM-dd HH:mm:ss');

-> Producer’ın varsayılan timestamp değerini kullanmak için, ksqlDB ROWTIME isminde her bir record için eklemiş olduğu field’i kullanmalıyız.
-> ksqlDB de topic yoksa create edilir. Eğer var olan bir topic kafka_topic alanında yazılırsa ve yukarıdaki gibi partitions,timestamp vb. özellikleri de ekstradan belirtilse bile var olan topic ayarları güncellenmez! Bu ayarlar sadece ilk topic create edileceği zaman kullanılır. Hatırlarsak genel kural topic’lerin öncesinde manuel olarak create edilmesidir. Auto-create özelliği fazla tavsiye edilmemektedir. 
-> key_format ve value_format değerleri şu türlerden birisi olabilir: JSON, JSON_SR, AVRO, PROTOBUF, NONE, KAFKA, DELIMITED, PROTOBUF_NOSR
-> Avro veya Protobuf kullanılacağı zaman ksqlDB’nin ksql.schema.registry.url property değeri docker kurulumunda set edilmelidir.
-> Stream create ederken WITH içerisinde value_schema_id değerini ayarlayabiliriz. Eğer ayarlanmazsa son sürüm kullanılır.
-> AVRO veya Protobuf formatı kullanıldığında schema registry’de key ve value şemasının id bilgisi şu şekilde belirlenir: topic-adi-key ve topic-adi-value. Bu id değerlerine sahip bir şema bulunduğunda, ksqlDB o şema tanımına göre veriyi deserialize yapar.
-> Sütun isimleri de de-serialize edildikten sonra ilgili sınıfın field alanları ile temsil edilir. 
-> GROUP BY keyword ile bir sorgu yazılırken CREATE STREAM ifadesi KULLANILAMAZ. Hatırlarsak GROUP BY veriyi key’e göre grupladığı için işlemin sonunda KGroupTable nesnesi üretiliyordu. Aynı şekilde ksqlDB’de de gruplama yapıldığında bir tablo oluşur. Örnek sorgu aşağıdaki gibi yazılmalıdır:

CREATE TABLE activity_leaders AS
SELECT last_name, SUM(steps)
FROM user_activity
GROUP BY last_name EMIT CHANGES;

-> Direk Table oluşturmak için şu sorgu çalıştırılmalıdır:

CREATE TABLE user_activity_table
(
   user_id    INT PRIMARY KEY,
   first_name VARCHAR,
   last_name  VARCHAR,
   activity   VARCHAR,
   event_time VARCHAR,
   steps      INT

) WITH (kafka_topic = 'user_activity',
     partitions4,
     key_format = 'KAFKA'
     value_format = 'JSON',
     timestamp = 'event_time',
     timestamp_format = 'yyyy-MM-dd HH:mm:ss'
);

-> Diyelim ki last_name’e göre gruplamak mantıksız geldi ve tabloyu aynı isimde ve farklı ayarlarla yeniden oluşturmak istiyoruz. Bunun için öncelikle var olan table silinir ve yeniden oluşturulur. Silme işlemi için şu şekilde bir sorgu yazılır:

drop table activity_leaders DELETE topic;

Not: DELETE topic ifadesi zorunlu değildir ancak arka planda gereksiz topic kalmaması için bu şekilde kullanmak daha doğrudur. Topic asenkron bir şekilde silinir yani direk silinmez.

-> Composite keys: Table oluştururken group by ifadesinden sonra birden fazla field belirtebiliriz. O zaman bu tablo’nun key’i composite key olur yani birden fazla key’den oluşur.

CREATE TABLE activity_leaders AS
SELECT first_name, last_name, activity, SUM(steps)
FROM user_activity
GROUP BY first_name, last_name, activity EMIT CHANGES;

Not: Bu sorgu çalıştırıldığında KEY formatı KAFKA yani basit tip olduğu için “Key format doesn’t support schema” hatası alınır. Composite key JSON gibi, AVRO gibi türe sahip olmalıdır. Sorguyu şu şekilde düzeltirsek başarılı bir şekilde execute edilir:

CREATE TABLE activity_leaders WITH (KEY_FORMAT='JSON') AS
SELECT first_name, last_name, activity, SUM(steps)
FROM user_activity
GROUP BY first_name, last_name, activity EMIT CHANGES;


Not: Yukarıdaki ifade görüldüğü üzere first_name, last_name ve activity field’leri bir key field oluşturmak için bir araya geldiğinden dolayı ve SELECT sorgusu sadece değeri VALUE’den getirdiği için ilgili sütun değerlerini göremeyiz. Bundan dolayı AS_VALUE isimli bir keyword kullanılmalıdır. Örnek olarak select AS_VALUE ( first_name) şeklinde kullanabiliriz.

 

Persistent, Push ve Pull Query Kavramları

-> ksqlDB’de üç tip query vardır: Push bir diğer adıyla continues query, pull query ve persistent query
-> Persistent query CREATE STREAM veya CREATE TABLE gibi query’leri ifade eder. Bunlar veriyi topic’e yazar. Eğer topic yoksa oluşturulur varsa oluşturulmaz.
-> Push query kavramına daha önce değinmiştik. Ek olarak sadece şu denebilir: ROWTIME, ROWPARTITION ve ROWOFFSET değerleri de yer alır.
-> Push query durdurmak için LIMIT keyword’u kullanılır. O limit değerine geldiğinde query sona erer. EMIT CHANGES LIMIT 10 dediğimizde ekrana 10 tane kayıt yazılır ve kapanır.
-> Pull Query select last_name, activity from activity_leaders where key_1=’Smith’ gibi nokta atışı sorgular için kullanılan bir terimdir . burada key_1 ile composite key’in ilk sütunu ifade edilir.

 

Creating Streams and Tables

-> Şimdiye kadar incelemiş olduğumuz ksqlDB sorgularında key değeri hiç kullanılmadı. Sadece value değerini kullandık. Bu tarz sorgularda key değeri belirtilmediği için, null olarak belirlenmiştir. 
-> Key’i create stream veya create table sorgularında tanımlamak için şu şekilde bir kod yazılmalıdır:

CREATE
STREAM user_activity (user_id INT KEY,
….) WITH (
…. key_format='KAFKA')

-> Bir tabloyu direk bir topic’ten oluşturmak için şu şekilde bir sorgu yazılmalıdır:

CREATE TABLE user_activity_table
(
   user_id    INT PRIMARY KEY,
   first_name VARCHAR
)
   WITH (kafka_topic = 'user_activity',
       Partitions = 4,
       key_format = 'KAFKA',
       value_format = 'JSON')


Not: Table create ederken primary key zorunludur. Eğer group by kullanılmazsa select ile table create edemeyiz. Yukarıdaki örnekte direk bir topic’ten table oluşturulduğu için primary key belirtilmesi gerekmektedir. Ayrıca bu key user_activity isimli yeni topic için key alanı olarak belirlenecektir! Eğer WITH ifadesi ile kafka_topic değeri belirtilmeseydi oluşturulacak topic adı user_activity_table olacaktı!

 

 


Schema Registry Integration

-> ksqlDB key-format veya value-format değerleri Protobuf, Avro gibi bir değer aldığında otomatik olarak field’ları schema registry’den algılar. Bu sayede CREATE TABLE user_activity_table (user_id int primary key) with… sorgusu ile table üretildiği zaman otomatik olarak diğer alanlar da table de gözükür. Hatta ve hatta key_format değeri de Avro veya Protobuf ise primary key bile belirtilmenden key alanı da çıkarılır. Yani sorgu CREATE TABLE user_activity_table with(….) şekline dönüşür.
-> Select … AS şeklinde oluşturulan bir sorguda value_format=’Protobuf’ veya Avro olduğunda ksqlDB, sub-stream veya table değerini schema registry’e otomatik olarak kaydeder.  
-> Protobuf to Avro veya Avro to Protobuf veya JSON dönüşümü işlemleri SELECT … AS sorgularında mümkündür. BASE stream ya da TABLE’dan yeni bir table üretilip value_format değerini base stream’den farklı belirtebiliriz. Bu özellik ile Avro formatını desteklemeyen client’lara JSON formatında veri sağlanabilir.


CREATE TABLE activity_count WITH (value_format = 'JSON')
AS
select last_name,
      count(activity) as activity_count
from user_activity
group by last_name EMIT CHANGES;

 

JOIN Streams ve Table İşlemleri

-> İki stream’in join işlemi yapılabilmesi için iki kural vardır: Key değerleri aynı olmalı ve partition sayıları aynı olmalıdır!

CREATE STREAM coffee_purchase_stream (custId VARCHAR KEY,
                                     drink VARCHAR,
                                     drinkSize VARCHAR,
                                     price DOUBLE,
                                     purchaseDate BIGINT)
    WITH (kafka_topic = 'coffee-purchase',
          partitions = 1,
          value_format = 'PROTOBUF',
          timestamp = 'purchaseDate'
    );


CREATE STREAM store_purchase_stream(custId VARCHAR KEY,
                             credit_card VARCHAR,
                                   purchaseDate BIGINT,
                                   storeId VARCHAR,
                                   total DOUBLE)
    WITH (kafka_topic = 'store-purchase',
          partitions = 1,
          value_format = 'PROTOBUF',
          timestamp = 'purchaseDate'
    );


CREATE STREAM customer-rewards-stream AS
SELECT c.custId AS customerId, 
      s.total as amount,
      CASE
        WHEN s.total < 25.00 THEN 15
                 WHEN s.total < 50.00 THEN 50
                 ELSE 75
          END AS reward_points
FROM coffee-purchase-stream c
   INNER JOIN store-purchase-stream s
   WITHIN 30 MINUTES GRACE PERIOD 2 MINUTES
ON c.custId = s.custId


Kafka Streams ile ksqlDB Arasındaki Table Farkı


Kafka Streams ve ksqlDB, farklı veri işleme araçlarıdır ve farklı yöntemlerle çalışırlar. İkisi arasındaki bazı önemli farklar şunlardır:
1.Veri İşleme Yaklaşımı:
•Kafka Streams: Kafka Streams, uygulamanın kod tabanında doğrudan işlem yürütür. Stateful işlem için durum deposu (state store) kullanır ve işlemleri belirli bir zaman penceresi içinde veya gerçek zamanlı olarak yapabilir.
•ksqlDB: ksqlDB daha yüksek seviyede bir SQL motoru gibidir. KSQL sorgularını çalıştırarak veriyi işleyebilirsiniz. ksqlDB'de "tablo" terimi, bir Kafka topic üzerine yapılmış bir sorgunun sonucunu ifade eder ve bu sonuçlar bir topic’te kaydedilir.
2.State Depolama:
•Kafka Streams: Kafka Streams, durum depolama (state store) adında bir mekanizma kullanarak veriyi saklar. KTable oluşturduğunuzda, bunun verileri bu durum deposunda tutulur.
•ksqlDB: ksqlDB, verileri yerel bir durum deposunda saklamaz. KSQL tabloları, altta yatan Kafka topic'e dayalıdır ve sorgu sonuçlarını dinamik olarak oluşturur.
3.Tablo ve Topic İlişkisi:
•Kafka Streams: Bir KTable bir durum deposundaki (state store) verilerin bir temsili olarak düşünülebilir. Ancak KTable'daki verileri sürekli bir Kafka topic olarak düşünmek yanıltıcı olabilir. Kafka topic ve KTable arasında bağlantı bulunur, ancak KTable kendisi bir topic değildir.
•ksqlDB: ksqlDB'de bir tablo oluşturduğunuzda, altta yatan bir Kafka topic oluşturulur.

Kısacası, ksqlDB SQL sorgularını bir stream uygulamasına dönüştüren java uygulaması gibi ifade edebiliriz. SQL ifadelerini ilgili KStream veya KTable yapısına yani stream node topology’sine dönüştürüp çalıştırır.

Not: ksqlDB server durduğu zaman veri akışı da durur.
 

© 2019 Tüm Hakları Saklıdır. Codesenior.COM