Apache Kafka üzerinde konular (topic) vasıtası ile veriler gönderebilir ve bu verileri okuyabiliriz.
Bu işlem için bir çok programlama dilinde çözümler olduğu gibi örnek olması açısından Python3 üzerinde python3-kafka paketi ile denemelerimizi yapacağım.
Bir önceki yazımızdaki docker konteynırı üzerinde yapabileceğiniz gibi aşağıda belirli ayarları yaparak kendi sunucunuzla da iletime geçebilirsiniz.
Python için Kafka kütüphanesini yükleme
Python için üzerinde bir çok kafka kütüphanesi olmasına rağmen en çok kullanılanlardan birisi olan python3-kafka paketini kullanacağım. Kurulum için aşağıdaki komutun yazılması gerekecektir
sudo apt install python3-kafka
Bu adımdan sonra python üzerinde kafka kütüphanesini kullanabilir olacağız.
Üretici ile veri yazma
Bu adımda "uretici.py" isimli basit bir dosya oluşturabiliriz. İsmi istediğiniz gibi verebilirsiniz ama kavram olarak üretici (producer) Kafka anlatımlarında kullanıldığı için bu ismi kullandım. Bu dosya içeriği aşağıdaki gibi yapılabilir.
from kafka import KafkaProducer
import json
producer = KafkaProducer(
bootstrap_servers='localhost:9092',
value_serializer=lambda v: json.dumps(v).encode('ascii')
)
producer.send(
'ornekinsanlar',
value=
{
"name": "Ali Yazar",
"yas": "12",
"dogumyeri": "Zonguldak",
"detay": "iyi bayramlar!"
}
)
producer.flush()
Yazılan uygulamayı incelersek "ornekinsanlar" olarak yazılmış ifadenin Kafka üzerindeki bir konu (topic) olduğunu anlayabiliriz.
Üst kısımda KafkaProducer olarak da hangi sunucu adresine hangi port üzerinden erişilebileceğini sormakta. Docker üzerinde örnek yapıyorsanız aynı şekilde (localhost:9092) bırakmanız yeterli olacaktır.
Ayrıca Kafka'nın SSL desteği bulunduğu için normal şartlarda SSL ayarlarının da yapılması gerektiğini söylemem gerekir. Fakat örnek olması açısından bu ayarları es geçiyoruz.
Üretici uygulamamızın Kafka ile haberleşmesi sonrasında aşağıdaki gibi bir JSON formatının Kafka'ya gönderilmesi sağlanmakta.
{
"name": "Ali Yazar",
"yas": "12",
"dogumyeri": "Zonguldak",
"detay": "iyi bayramlar!"
}
Bu kısımdaki içerik uygulamanızın ihtiyaçlarını anlatmakta. İstediğiniz gibi oluşturabilirsiniz.
Veri yazma işlemi
Dosyayı oluşturdu isek aşağıdaki gibi dosyanın içerisine yazdığımız json verisini Kafka'ya gönderebiliriz. İçeriğini değiştirerek birkaç kere çalıştırdığınızda peş peşe eklendiğini göreceksiniz.
python3 uretici.py
Tüketici ile veri okuma
Yazdığınız verilerin okunması kısmı da oldukça basit olmaktadır.
Bu adımda "tuketici.py" isimli basit bir dosya oluşturabiliriz. İsmi istediğiniz gibi verebilirsiniz ama kavram olarak tüketici (consumer) Kafka anlatımlarında kullanıldığı için bu ismi kullandım. Bu dosya içeriği aşağıdaki gibi yapılabilir.
from kafka import KafkaConsumer
from pprint import pprint
if __name__ == '__main__':
consumer = KafkaConsumer('ornekinsanlar', bootstrap_servers="localhost:9092",
enable_auto_commit=False, auto_offset_reset="earliest")
pprint(consumer.metrics())
for msg in consumer:
pprint(msg)
Yazılan uygulamayı incelersek "ornekinsanlar" olarak yazılmış ifadenin Kafka üzerindeki bir konu (topic) olduğunu anlayabiliriz.
Benzer şekilde KafkaProducer olarak da hangi sunucu adresine hangi port üzerinden erişilebileceğini sormakta. Docker üzerinde örnek yapıyorsanız aynı şekilde (localhost:9092) bırakmanız yeterli olacaktır.
İlk olarak bağlantı kurduğumuz Kafka ile temel bilgileri çekmek için "consumer.metrics()" fonksiyonunu kullanıyoruz.
Daha sonrasında ise çektiğimiz konu (topic) içerisindeki verilerin for döngüsü içerisinde verilerinin tamamının çekileceğini görebiliriz.
python3 tuketici.py
Uygulamanın çıktısı aşağıdaki gibi olacaktır.
ConsumerRecord(topic='ornekinsanlar', partition=0, offset=0, timestamp=1657528880472, timestamp_type=0, key=None, value=b'{"name": "Ali Bir", "yas": "21", "dogumyeri": "Ankara", "detay": "iyi bayramlar!"}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=87, serialized_header_size=-1)
ConsumerRecord(topic='ornekinsanlar', partition=0, offset=1, timestamp=1657529032891, timestamp_type=0, key=None, value=b'{"name": "Ali Yazar", "yas": "12", "dogumyeri": "Zonguldak", "detay": "iyi bayramlar!"}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=87, serialized_header_size=-1)
Nam et ipsa scientia potestas est