Būtina sąlyga
Norėdami skaityti duomenis iš „Kafka“, turite įdiegti reikiamą pitono biblioteką. „Python3“ šioje mokymo programoje naudojamas vartotojui ir gamintojui rašyti. Jei paketas prieš jūsų „Linux“ operacinę sistemą dar nebuvo įdiegtas, prieš diegdami „Pyfon“ „Kafka“ biblioteką turite įdiegti „pip“. python3-kafka yra naudojamas šioje pamokoje norint skaityti duomenis iš „Kafka“. Norėdami įdiegti biblioteką, vykdykite šią komandą.
$ pip įdiekite python3-kafkaSkaitykite paprastus tekstinius duomenis iš „Kafka“
Iš gamintojo gali būti siunčiami įvairių tipų duomenys tam tikra tema, kurią vartotojas gali perskaityti. Kaip paprastus tekstinius duomenis galima siųsti ir gauti iš „Kafka“ naudojant gamintoją ir vartotoją, parodyta šioje šios pamokos dalyje.
Sukurkite failą pavadinimu gamintojas1.py su šiuo python scenarijumi. KafkaGamintojas modulis importuojamas iš „Kafka“ bibliotekos. Brokerių sąrašas turi būti apibrėžtas gamintojo objekto inicijavimo metu, kad būtų galima prisijungti prie „Kafka“ serverio. Numatytasis „Kafka“ prievadas yra9092 m". „bootstrap_servers“ argumentas naudojamas norint apibrėžti pagrindinio kompiuterio pavadinimą su uostu. "„First_Topic“'yra nustatytas kaip temos pavadinimas, kuriuo teksto pranešimas bus išsiųstas iš gamintojo. Kitas, paprastas tekstinis pranešimas “Sveiki iš Kafkos'siunčiamas naudojant siųsti () metodas KafkaGamintojas prie temos “„First_Topic“".
gamintojas1.py:
# Importuokite „KafkaProducer“ iš „Kafka“ bibliotekosiš kafka importo KafkaProducer
# Apibrėžkite serverį su uostu
bootstrap_servers = ['localhost: 9092']
# Apibrėžkite temos pavadinimą, kuriame bus paskelbtas pranešimas
topicName = 'First_Topic'
# Inicijuoti gamintojo kintamąjį
gamintojas = KafkaProducer (įkrovos_pagalbos_superverčiai = įkrovos_apsaugos_serveriai)
# Paskelbkite tekstą apibrėžtoje temoje
gamintojas.siųsti (topicName, b'Hello from kafka… ')
# Spausdinti pranešimą
spausdinti („Pranešimas išsiųstas“)
Sukurkite failą pavadinimu vartotojas1.py su tokiu pitono scenarijumi. „Kafka“ vartotojas modulis yra importuojamas iš „Kafka“ bibliotekos, kad būtų galima skaityti duomenis iš „Kafka“. sys modulis yra naudojamas scenarijaus nutraukimui. Vartotojo scenarijuje naudojamas tas pats gamintojo kompiuterio vardas ir prievado numeris, norint nuskaityti duomenis iš „Kafka“. Vartotojo ir gamintojo temos pavadinimas turi būti tas pats, kuris yra "Pirmoji tema". Toliau vartotojo objektas inicijuojamas trimis argumentais. Temos pavadinimas, grupės ID ir serverio informacija. dėl ciklas naudojamas čia norint perskaityti „Kafka“ gamintojo atsiųstą tekstą.
vartotojas1.py:
# Importuokite „KafkaConsumer“ iš „Kafka“ bibliotekosiš kafka importo KafkaConsumer
# Importuoti sys modulį
importuoti sistemas
# Apibrėžkite serverį su uostu
bootstrap_servers = ['localhost: 9092']
# Apibrėžkite temos pavadinimą, iš kur gausite pranešimą
topicName = 'First_Topic'
# Inicijuoti vartotojo kintamąjį
vartotojas = KafkaConsumer (topicName, group_id = 'group1', bootstrap_servers =
bootstrap_servers)
# Skaitykite ir atsispausdinkite vartotojo pranešimą
vartotojo žinutei:
spausdinti ("Temos pavadinimas =% s, Pranešimas =% s"% (žin.tema, praneš.vertė))
# Nutraukti scenarijų
sys.išėjimas ()
Išvestis:
Paleiskite šią komandą iš vieno terminalo, kad įvykdytumėte gamintojo scenarijų.
$ python3 gamintojas1.pyIšsiuntus pranešimą, pasirodys toks išvestis.
Paleiskite šią komandą iš kito terminalo, kad įvykdytumėte vartotojo scenarijų.
$ python3 vartotojas1.pyIšvestyje rodomas temos pavadinimas ir teksto pranešimas, kurį atsiuntė gamintojas.
Skaitydami JSON suformatuotus „Kafka“ duomenis
JSON formato duomenis gali siųsti „Kafka“ gamintojas, o „Kafka“ vartotojas gali juos perskaityti naudodamasis Json python modulis. Kaip JSON duomenys gali būti nuoseklūs ir išjungti iš eilės prieš siunčiant ir gaunant duomenis naudojant „python-kafka“ modulį, parodyta šioje mokymo programos dalyje.
Sukurkite python scenarijų pavadinimu gamintojas2.py su tokiu scenarijumi. Kitas modulis, pavadintas JSON, yra importuojamas su KafkaGamintojas modulį čia. value_serializer argumentas naudojamas su bootstrap_servers argumentas čia inicializuoti „Kafka“ gamintojo objektą. Šis argumentas rodo, kad JSON duomenys bus užkoduoti naudojant „utf-8simbolių rinkinys siuntimo metu. Tada JSON suformatuoti duomenys siunčiami į pavadintą temą JSONtopika.
gamintojas2.py:
# Importuokite „KafkaProducer“ iš „Kafka“ bibliotekosiš kafka importo KafkaProducer
# Importuokite JSON modulį, kad duomenys būtų nuoseklūs
importuoti Json
# Inicializuokite gamintojo kintamąjį ir nustatykite JSON kodavimo parametrą
gamintojas = KafkaProducer (bootstrap_servers =
['localhost: 9092'], value_serializer = lambda v: json.sąvartynai (v).koduoti ('utf-8'))
# Siųsti duomenis JSON formatu
gamintojas.siųsti ('JSONtopic', 'name': 'fahmida', 'email': '[email protected]')
# Spausdinti pranešimą
spausdinti („Pranešimas išsiųstas JSONtopic“)
Sukurkite python scenarijų pavadinimu vartotojas2.py su tokiu scenarijumi. „Kafka“ vartotojas, sys ir JSON moduliai importuojami į šį scenarijų. „Kafka“ vartotojas modulis naudojamas skaityti JSON suformatuotiems duomenims iš „Kafka“. JSON modulis naudojamas užkoduotų JSON duomenų, siunčiamų iš „Kafka“ gamintojo, dekodavimui. Sys modulis naudojamas scenarijaus nutraukimui. value_deserializer argumentas naudojamas su bootstrap_servers apibrėžti, kaip bus dekoduojami JSON duomenys. Kitas, dėl kilpa naudojama atspausdinti visus vartotojų įrašus ir JSON duomenis, paimtus iš „Kafka“.
vartotojas2.py:
# Importuokite „KafkaConsumer“ iš „Kafka“ bibliotekosiš kafka importo KafkaConsumer
# Importuoti sys modulį
importuoti sistemas
# Importuokite „Json“ modulį, kad duomenys būtų nuoseklūs
importuoti Json
# Inicializuokite vartotojo kintamąjį ir nustatykite JSON dekodavimo ypatybę
vartotojas = KafkaConsumer ('JSONtopic', bootstrap_servers = ['localhost: 9092'],
value_deserializer = lambda m: json.apkrovos (m.iššifruoti („utf-8“)))
# Skaitykite „kafka“ duomenis
pranešimui vartotojui:
spausdinti ("Vartotojo įrašai: \ n")
spausdinti (pranešimas)
spausdinti ("\ nSkaitymas iš JSON duomenų \ n")
spausdinti ("Vardas:", pranešimas [6] ['vardas'])
spausdinti („El. paštas:“, pranešimas [6] [„el. paštas“])
# Nutraukti scenarijų
sys.išėjimas ()
Išvestis:
Paleiskite šią komandą iš vieno terminalo, kad įvykdytumėte gamintojo scenarijų.
$ python3 gamintojas2.pyIšsiuntę JSON duomenis, scenarijus išspausdins šį pranešimą.
Paleiskite šią komandą iš kito terminalo, kad įvykdytumėte vartotojo scenarijų.
$ python3 vartotojas2.pyŠi išvestis pasirodys paleidus scenarijų.
Išvada:
Duomenis iš „Kafka“ galima siųsti ir gauti įvairiais formatais, naudojant „python“. Duomenys taip pat gali būti saugomi duomenų bazėje ir gauti iš duomenų bazės naudojant „Kafka“ ir „python“. Aš namuose, ši pamoka padės pitono vartotojui pradėti dirbti su Kafka.