Norėdami baigti šią pamoką, turite turėti aktyvų „Kafka“ diegimą savo kompiuteryje. Perskaitykite „Apache Kafka“ diegimas „Ubuntu“, kad sužinotumėte, kaip tai padaryti.
„Apache Kafka“ „Python“ kliento diegimas
Kad galėtume pradėti dirbti su „Apache Kafka“ programoje „Python“, turime įdiegti „Apache Kafka“ programą „Python“. Tai galima padaryti naudojant pip („Python“ paketo rodyklė). Čia yra komanda tai pasiekti:
pip3 įdiekite kafka-pythonTai bus greitas įdiegimas terminale:
„Python Kafka“ kliento diegimas naudojant PIP
Dabar, kai turime aktyvų „Apache Kafka“ diegimą ir „Python Kafka“ klientą, esame pasirengę pradėti koduoti.
Gamintojo kūrimas
Pirmas dalykas, kurį reikia paskelbti pranešimuose „Kafka“, yra gamintojo programa, galinti siųsti pranešimus „Kafka“ temomis.
Atkreipkite dėmesį, kad „Kafka“ gamintojai yra asinchroniniai pranešimų gamintojai. Tai reiškia, kad operacijos, atliktos skelbiant pranešimą „Kafka Topic“ skaidinyje, nėra blokuojamos. Kad viskas būtų paprasta, šiai pamokai parašysime paprastą JSON leidėją.
Norėdami pradėti, sukurkite „Kafka Producer“ egzempliorių:
iš kafka importo KafkaProducerimportuoti Json
importo atspaudas
gamintojas = KafkaProducer (
bootstrap_servers = 'localhost: 9092',
value_serializer = lambda v: json.sąvartynai (v).koduoti ('utf-8'))
Atributas „bootstrap_servers“ informuoja apie „Kafka“ serverio pagrindinį kompiuterį ir prievadą. Atributas „value_serializer“ skirtas tik JSON susidariusių JSON reikšmių serizavimui.
Norėdami žaisti su „Kafka Producer“, pabandykime išspausdinti metriką, susijusią su „Producer“ ir „Kafka“ grupe:
metrika = gamintojas.metrika ()pprint.spauda (metrika)
Dabar pamatysime:
Kafka Mterics
Dabar pagaliau pabandykime nusiųsti žinutę į „Kafka“ eilę. Paprastas JSON objektas bus geras pavyzdys:
gamintojas.siųsti ('linuxhint', 'topic': 'kafka')The linuxhint yra temos skaidinys, kuriuo bus siunčiamas JSON objektas. Vykdydami scenarijų, negausite jokio išvesties, nes pranešimas tiesiog išsiųstas į temos skaidinį. Atėjo laikas parašyti vartotojui, kad galėtume išbandyti savo programą.
Padaryti vartotoją
Dabar mes esame pasirengę užmegzti naują ryšį kaip vartotojo programa ir gauti pranešimus iš „Kafka“ temos. Pradėkite nuo naujo vartotojo egzemplioriaus sukūrimo:
iš kafka importo KafkaConsumeriš kafka importo TopicPartition
spausdinti ('Užmegzti ryšį.„)
vartotojas = KafkaConsumer (bootstrap_servers = 'localhost: 9092')
Dabar priskirkite temą šiam ryšiui ir galimą poslinkio vertę.
spausdinti ('Temos priskyrimas.„)vartotojas.priskirti ([TopicPartition ('linuxhint', 2)])
Galiausiai mes pasirengę atsispausdinti mssage:
spausdinti ('Gaunamas pranešimas.„)pranešimui vartotojui:
spausdinti ("OFFSET:" + str (pranešimas [0]) + "\ t MSG:" + str (message))
Per tai gausime visų paskelbtų pranešimų „Kafka“ vartotojų temų skyriuje sąrašą. Šios programos rezultatas bus:
„Kafka“ vartotojas
Norint greitai sužinoti, čia yra visas „Producer“ scenarijus:
iš kafka importo KafkaProducerimportuoti Json
importo atspaudas
gamintojas = KafkaProducer (
bootstrap_servers = 'localhost: 9092',
value_serializer = lambda v: json.sąvartynai (v).koduoti ('utf-8'))
gamintojas.siųsti ('linuxhint', 'topic': 'kafka')
# metrika = gamintojas.metrika ()
# pprint.spauda (metrika)
Ir čia yra visa vartotojų programa, kurią naudojome:
iš kafka importo KafkaConsumeriš kafka importo TopicPartition
spausdinti ('Užmegzti ryšį.„)
vartotojas = KafkaConsumer (bootstrap_servers = 'localhost: 9092')
spausdinti ('Temos priskyrimas.„)
vartotojas.priskirti ([TopicPartition ('linuxhint', 2)])
spausdinti ('Gaunamas pranešimas.„)
pranešimui vartotojui:
spausdinti ("OFFSET:" + str (pranešimas [0]) + "\ t MSG:" + str (message))
Išvada
Šioje pamokoje mes pažvelgėme į tai, kaip galime įdiegti „Apache Kafka“ ir pradėti jį naudoti savo „Python“ programose. Mes parodėme, kaip lengva atlikti paprastas užduotis, susijusias su „Kafka“ sistemoje „Python“, su demonstruojamu „Kafka Client for Python“.