Apache Kafka ile veri yazma ve okuma

Ali Orhun Akkirman - Jul 11 '22 - - Dev Community

Apache Kafka üzerinde konular (topic) vasıtası ile veriler gönderebilir ve bu verileri okuyabiliriz.

Bu işlem için bir çok programlama dilinde çözümler olduğu gibi örnek olması açısından Python3 üzerinde python3-kafka paketi ile denemelerimizi yapacağım.

Bir önceki yazımızdaki docker konteynırı üzerinde yapabileceğiniz gibi aşağıda belirli ayarları yaparak kendi sunucunuzla da iletime geçebilirsiniz.

Python için Kafka kütüphanesini yükleme

Python için üzerinde bir çok kafka kütüphanesi olmasına rağmen en çok kullanılanlardan birisi olan python3-kafka paketini kullanacağım. Kurulum için aşağıdaki komutun yazılması gerekecektir

sudo apt install python3-kafka
Enter fullscreen mode Exit fullscreen mode

Bu adımdan sonra python üzerinde kafka kütüphanesini kullanabilir olacağız.

Üretici ile veri yazma

Bu adımda "uretici.py" isimli basit bir dosya oluşturabiliriz. İsmi istediğiniz gibi verebilirsiniz ama kavram olarak üretici (producer) Kafka anlatımlarında kullanıldığı için bu ismi kullandım. Bu dosya içeriği aşağıdaki gibi yapılabilir.

from kafka import KafkaProducer
import json

producer = KafkaProducer(
 bootstrap_servers='localhost:9092',
 value_serializer=lambda v: json.dumps(v).encode('ascii')
)

producer.send(
 'ornekinsanlar',
 value=
    {
     "name": "Ali Yazar",
     "yas": "12",
     "dogumyeri": "Zonguldak",
     "detay": "iyi bayramlar!"
     }
)
producer.flush()

Enter fullscreen mode Exit fullscreen mode

Yazılan uygulamayı incelersek "ornekinsanlar" olarak yazılmış ifadenin Kafka üzerindeki bir konu (topic) olduğunu anlayabiliriz.

Üst kısımda KafkaProducer olarak da hangi sunucu adresine hangi port üzerinden erişilebileceğini sormakta. Docker üzerinde örnek yapıyorsanız aynı şekilde (localhost:9092) bırakmanız yeterli olacaktır.

Ayrıca Kafka'nın SSL desteği bulunduğu için normal şartlarda SSL ayarlarının da yapılması gerektiğini söylemem gerekir. Fakat örnek olması açısından bu ayarları es geçiyoruz.

Üretici uygulamamızın Kafka ile haberleşmesi sonrasında aşağıdaki gibi bir JSON formatının Kafka'ya gönderilmesi sağlanmakta.

    {
     "name": "Ali Yazar",
     "yas": "12",
     "dogumyeri": "Zonguldak",
     "detay": "iyi bayramlar!"
     }
Enter fullscreen mode Exit fullscreen mode

Bu kısımdaki içerik uygulamanızın ihtiyaçlarını anlatmakta. İstediğiniz gibi oluşturabilirsiniz.

Veri yazma işlemi

Dosyayı oluşturdu isek aşağıdaki gibi dosyanın içerisine yazdığımız json verisini Kafka'ya gönderebiliriz. İçeriğini değiştirerek birkaç kere çalıştırdığınızda peş peşe eklendiğini göreceksiniz.

python3 uretici.py
Enter fullscreen mode Exit fullscreen mode

Tüketici ile veri okuma

Yazdığınız verilerin okunması kısmı da oldukça basit olmaktadır.

Bu adımda "tuketici.py" isimli basit bir dosya oluşturabiliriz. İsmi istediğiniz gibi verebilirsiniz ama kavram olarak tüketici (consumer) Kafka anlatımlarında kullanıldığı için bu ismi kullandım. Bu dosya içeriği aşağıdaki gibi yapılabilir.

from kafka import KafkaConsumer
from pprint import pprint

if __name__ == '__main__':
    consumer = KafkaConsumer('ornekinsanlar', bootstrap_servers="localhost:9092",
                             enable_auto_commit=False, auto_offset_reset="earliest")
    pprint(consumer.metrics())
    for msg in consumer:
        pprint(msg)
Enter fullscreen mode Exit fullscreen mode

Yazılan uygulamayı incelersek "ornekinsanlar" olarak yazılmış ifadenin Kafka üzerindeki bir konu (topic) olduğunu anlayabiliriz.

Benzer şekilde KafkaProducer olarak da hangi sunucu adresine hangi port üzerinden erişilebileceğini sormakta. Docker üzerinde örnek yapıyorsanız aynı şekilde (localhost:9092) bırakmanız yeterli olacaktır.

İlk olarak bağlantı kurduğumuz Kafka ile temel bilgileri çekmek için "consumer.metrics()" fonksiyonunu kullanıyoruz.

Daha sonrasında ise çektiğimiz konu (topic) içerisindeki verilerin for döngüsü içerisinde verilerinin tamamının çekileceğini görebiliriz.

python3 tuketici.py
Enter fullscreen mode Exit fullscreen mode

Uygulamanın çıktısı aşağıdaki gibi olacaktır.

ConsumerRecord(topic='ornekinsanlar', partition=0, offset=0, timestamp=1657528880472, timestamp_type=0, key=None, value=b'{"name": "Ali Bir", "yas": "21", "dogumyeri": "Ankara", "detay": "iyi bayramlar!"}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=87, serialized_header_size=-1)
ConsumerRecord(topic='ornekinsanlar', partition=0, offset=1, timestamp=1657529032891, timestamp_type=0, key=None, value=b'{"name": "Ali Yazar", "yas": "12", "dogumyeri": "Zonguldak", "detay": "iyi bayramlar!"}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=87, serialized_header_size=-1)
Enter fullscreen mode Exit fullscreen mode

Nam et ipsa scientia potestas est


Terabox Video Player