Apache Kafkan JavaWorld-johdannon ensimmäisellä puoliskolla kehitit muutaman pienen tuottajan / kuluttajan sovelluksen käyttämällä Kafkaa. Näistä harjoituksista sinun tulee tuntea Apache Kafka -viestintäjärjestelmän perusteet. Tässä toisessa puoliskossa opit käyttämään osioita levittämään kuormitusta ja laajentamaan sovellustasi vaakasuunnassa käsittelemällä jopa miljoonia viestejä päivässä. Opit myös, kuinka Kafka käyttää viestisiirtymiä monimutkaisen viestinkäsittelyn seuraamiseen ja hallintaan sekä kuinka suojata Apache Kafka -viestintäjärjestelmääsi vialta, jos kuluttaja menee alas. Kehitämme esimerkkisovelluksen osasta 1 sekä julkaisu-tilaus- että pisteestä pisteeseen -tapauksiin.
Väliseinät Apache Kafkassa
Kafkan aiheet voidaan jakaa osioihin. Esimerkiksi luodessasi aihetta nimeltä Demo, voit määrittää sen olevan kolme osiota. Palvelin luo kolme lokitiedostoa, yhden kutakin demo-osiota varten. Kun tuottaja julkaisi viestin aiheeseen, se määritti osiotunnuksen tälle viestille. Palvelin lisäsi sitten viestin vain kyseisen osion lokitiedostoon.
Jos aloitit sitten kaksi kuluttajaa, palvelin saattaa määrittää osiot 1 ja 2 ensimmäiselle kuluttajalle ja osio 3 toiselle kuluttajalle. Jokainen kuluttaja lukisi vain sille määritetyistä osioista. Kolme osiota varten määritetty esittelyaihe näkyy kuvassa 1.
Laajenna skenaariota kuvittelemalla Kafka-klusteri, jossa on kaksi välittäjää ja jotka sijaitsevat kahdessa koneessa. Kun osioit demoaiheen, määrität siinä olevan kaksi osiota ja kaksi kopiota. Tämän tyyppisessä kokoonpanossa Kafka-palvelin määrittäisi kaksi osiota klusterisi kahdelle välittäjälle. Jokainen välittäjä olisi yhden osion johtaja.
Kun tuottaja julkaisi viestin, se meni osionjohtajalle. Johtaja ottaisi viestin ja liittäisi sen paikallisen koneen lokitiedostoon. Toinen välittäjä kopioisi passiivi passiivisesti omalle koneelleen. Jos osionjohtaja menisi alas, toisesta välittäjästä tulisi uusi johtaja ja hän aloittaisi asiakaspyyntöjen palvelemisen. Samalla tavalla, kun kuluttaja lähetti pyynnön osioon, kyseinen pyyntö meni ensin osionjohtajalle, joka palautti pyydetyt viestit.
Osioinnin edut
Harkitse Kafka-pohjaisen viestijärjestelmän osioinnin etuja:
- Skaalautuvuus: Järjestelmässä, jossa on vain yksi osio, aiheeseen julkaistut viestit tallennetaan lokitiedostoon, joka on olemassa yhdellä koneella. Aiheen viestien lukumäärän on mahtuttava yhteen sitoutuslokitiedostoon, ja tallennettujen viestien koko ei voi koskaan olla suurempi kuin koneen levytila. Aiheen osioinnin avulla voit skaalata järjestelmääsi tallentamalla viestejä klusterin eri koneille. Jos haluat esimerkiksi tallentaa 30 gigatavua (Gt) viestejä esittelyaiheelle, voit rakentaa Kafka-klusterin, joka koostuu kolmesta koneesta, joista jokaisella on 10 Gt levytilaa. Sitten määrität aiheessa kolme osiota.
- Palvelimen ja kuorman tasapainotus: Jos sinulla on useita osioita, voit lähettää viestipyynnöt välittäjille. Esimerkiksi, jos sinulla oli aihe, joka käsitteli miljoona viestiä sekunnissa, voit jakaa sen 100 osioon ja lisätä 100 välittäjää klusteriin. Jokainen välittäjä olisi yhden osion johtaja ja vastaisi vain 10000 asiakaspyyntöön sekunnissa.
- Kuluttajan ja kuorman tasapainottaminen: Kuten palvelimen ja kuormituksen tasapainottaminen, useiden kuluttajien isännöinti eri koneella antaa sinun jakaa kuluttajakuorman. Oletetaan, että haluat kuluttaa miljoona viestiä sekunnissa aiheesta, jossa on 100 osiota. Voit luoda 100 kuluttajaa ja käyttää heitä rinnakkain. Kafka-palvelin osoitti yhden osion kullekin kuluttajalle, ja jokainen asiakas käsitteli 10000 viestiä rinnakkain. Koska Kafka osoittaa kunkin osion vain yhdelle kuluttajalle, osiossa kukin viesti kulutettaisiin järjestyksessä.
Kaksi tapaa osioida
Tuottaja on vastuussa siitä, mihin osioon viesti menee. Tuottajalla on kaksi vaihtoehtoa tämän tehtävän hallitsemiseksi:
- Mukautettu osioija: Voit luoda luokan, joka toteuttaa
org.apache.kafka.clients.producer.Partitioner
käyttöliittymä. Tämä tapaOsoitin
toteuttaa liiketoimintalogiikan päättääkseen, mihin viestit lähetetään. - DefaultPartitioner: Jos et luo mukautettua osioijaluokkaa, niin oletuksena
org.apache.kafka.clients.producer.internals.DefaultPartitioner
luokkaa käytetään. Oletusosioija on tarpeeksi hyvä useimmissa tapauksissa ja tarjoaa kolme vaihtoehtoa:- Manuaalinen: Kun luot a
ProducerRecord
, käytä ylikuormitettua rakentajaauusi ProducerRecord (topicName, partitionId, messageKey, message)
Määritä osion tunnus. - Hajautus (paikkakunnalle herkkä): Kun luot a
ProducerRecord
, määritä amessageKey
, soittamallauusi ProducerRecord (topicName, messageKey, message)
.DefaultPartitioner
käyttää avaimen tiivistettä varmistaakseen, että kaikki saman avaimen viestit menevät samalle tuottajalle. Tämä on helpoin ja yleisin lähestymistapa. - Ruiskutus (satunnainen kuormituksen tasapainotus): Jos et halua hallita mihin osioviesteihin menee, soita vain
uusi ProducerRecord (topicName, viesti)
luodaProducerRecord
. Tällöin osio lähettää viestejä kaikille osioille pyöreällä tavalla varmistaen tasapainoisen palvelimen kuormituksen.
- Manuaalinen: Kun luot a
Apache Kafka -sovelluksen osiointi
Osan 1 yksinkertaista tuottaja / kuluttaja-esimerkkiä varten käytimme a DefaultPartitioner
. Yritämme nyt luoda mukautetun osion. Oletetaan tässä esimerkissä, että meillä on vähittäiskauppasivusto, jolla kuluttajat voivat tilata tuotteita kaikkialta maailmasta. Käytön perusteella tiedämme, että suurin osa kuluttajista on joko Yhdysvalloissa tai Intiassa. Haluamme jakaa sovelluksemme lähettääksemme tilauksia Yhdysvalloista tai Intiasta omille kuluttajilleen, kun taas tilaukset mistä tahansa muualta menevät kolmannelle kuluttajalle.
Aloitetaan luomalla a CountryPartitioner
joka toteuttaa org.apache.kafka.clients.producer.Partitioner
käyttöliittymä. Meidän on toteutettava seuraavat menetelmät:
- Kafka soittaa määritä () kun alustamme
Osoitin
luokka, aKartta
kokoonpanon ominaisuuksista. Tämä menetelmä alustaa sovelluksen liiketoimintalogiikalle ominaiset toiminnot, kuten yhteyden muodostamisen tietokantaan. Tässä tapauksessa haluamme melko yleisen osion, joka viemaan nimi
omaisuutena. Voimme sitten käyttääconfigProperties.put ("osiot.0", "USA")
kartoittaa viestien kulku osioihin. Tulevaisuudessa voimme käyttää tätä muotoa muuttamaan maat, jotka saavat oman osionsa. -
Tuottaja
API-puhelut osio () kerran jokaiselle viestille. Tässä tapauksessa käytämme sitä lukemaan viestin ja jäsentämään maan nimen viestistä. Jos maan nimi oncountryToPartitionMap
, se palaaosioId
tallennettuKartta
. Jos ei, se hajauttaa maan arvon ja käyttää sitä laskemaan mihin osioon sen pitäisi mennä. - Kutsumme kiinni() sammuttaa osio. Tämän menetelmän käyttäminen varmistaa, että kaikki alustuksen aikana hankitut resurssit puhdistetaan sammutuksen aikana.
Huomaa, että kun Kafka soittaa määritä ()
, Kafkan tuottaja välittää kaikki valmistajalle määrittämämme ominaisuudet Osoitin
luokassa. On tärkeää, että luemme vain ne ominaisuudet, jotka alkavat osiot.
jäsentää ne saadaksesi osioId
ja tallenna tunnus countryToPartitionMap
.
Alla on mukautettu toteutus Osoitin
käyttöliittymä.
Listaus 1. CountryPartitioner
public class CountryPartitioner toteuttaa Partitioner {private static Map countryToPartitionMap; public void configure (Map configs) {System.out.println ("Inside CountryPartitioner.configure" + configs); countryToPartitionMap = uusi HashMap (); kohteelle (Map.Entry-merkintä: configs.entrySet ()) {if (entry.getKey (). startsWith ("osiot")) {String keyName = entry.getKey (); Merkkijonoarvo = (Merkkijono) entry.getValue (); System.out.println (keyName.substring (11)); int paritionId = Kokonaisluku.parseInt (avaimenimi.substring (11)); countryToPartitionMap.put (arvo, paritionId); }}} public int -osio (Merkkijono-aihe, Objektiavain, tavu [] keyBytes, Objektin arvo, tavu [] valueBytes, Cluster-klusteri) {Listaosastot = cluster.availablePartitionsForTopic (aihe); String valueStr = (merkkijono) arvo; String countryName = ((String) arvo) .split (":") [0]; if (countryToPartitionMap.containsKey (countryName)) {// Jos maa on määritetty tiettyyn osioon, palauta se return countryToPartitionMap.get (countryName); } else {// Jos yhtään maata ei ole määritetty tiettyyn osioon, jaa jäljellä olevien osioiden välillä int noOfPartitions = cluster.topics (). size (); return value.hashCode ()% noOfPartitions + countryToPartitionMap.size (); }} public void close () {}}
Tuottaja
luokka luettelossa 2 (alla) on hyvin samanlainen kuin yksinkertainen tuottajamme osasta 1, ja kaksi muutosta on merkitty lihavoituna:
- Asetamme config-ominaisuuden, jonka avain on yhtä suuri kuin arvo
ProducerConfig.PARTITIONER_CLASS_CONFIG
, joka vastaa meidän täysin pätevää nimeäCountryPartitioner
luokassa. Asetimme myösmaan nimi
ettäosioId
, siten kartoittamalla ominaisuudet, joille haluamme välittääCountryPartitioner
. - Ohitamme luokan esiintymän, joka toteuttaa
org.apache.kafka.clients.producer.Callback
käyttöliittymä toisena argumenttinatuottaja.lähetä ()
menetelmä. Kafka-asiakas soittaa sillevalmistuttua()
menetelmä, kun viesti on julkaistu onnistuneesti, liittämällä aRecordMetadata
esine. Voimme käyttää tätä objektia selvittääksesi, mihin osioon viesti lähetettiin, sekä julkaistulle viestille määritetyn siirtymän.
Listaus 2. Jaettu tuottaja
julkisen luokan tuottaja {yksityinen staattinen skanneri sisään; public static void main (String [] argv) heittää poikkeuksen {if (argv.length! = 1) {System.err.println ("Määritä 1 parametri"); System.exit (-1); } MerkkijononimiNimi = argv [0]; in = uusi skanneri (System.in); System.out.println ("Kirjoita viesti (lopeta kirjoittamalla exit)"); // Määritä tuottajaominaisuudet configProperties = uudet ominaisuudet (); configProperties.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "paikallinen isäntä: 9092"); configProperties.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); configProperties.put (ProducerConfig.PARTITIONER_CLASS_CONFIG, CountryPartitioner.class.getCanonicalName ()); configProperties.put ("osio.1", "USA"); configProperties.put ("osio.2", "Intia"); org.apache.kafka.clients.producer.Producer producer = uusi KafkaProducer (configProperties); Merkkijono = in.nextLine (); while (! line.equals ("exit")) {ProducerRecord rec = uusi ProducerRecord (aiheenNimi, nolla, rivi); producer.send (rec, uusi soittopyyntö () {public void onCompletion (RecordMetadata-metatiedot, poikkeuspoikkeus) {System.out.println ("Viesti lähetetty aiheeseen ->" + metadata.topic () + ", parition->" + metadata.partition () + "tallennettu offset->" + metatiedot.offset ()); ; }}); rivi = in.nextLine (); } sulje (); tuottaja.sulje (); }}
Osioiden määrittäminen kuluttajille
Kafka-palvelin takaa, että osio on osoitettu vain yhdelle kuluttajalle, mikä takaa viestien kulutuksen järjestyksen. Voit määrittää osion manuaalisesti tai antaa sen määrittää automaattisesti.
Jos liiketoimintalogiikka vaatii enemmän hallintaa, sinun on määritettävä osiot manuaalisesti. Tässä tapauksessa käytät KafkaConsumer.assign ()
välittää luettelo kullekin kuluttajalle kiinnostuneista osioista Kakfa-palvelimelle.
Oletus ja yleisin valinta on, että osiot määritetään automaattisesti. Tällöin Kafka-palvelin määrittää osion jokaiselle kuluttajalle ja määrittää osiot uudelleen mittakaavassa uusille kuluttajille.
Oletetaan, että luot uuden aiheen, jossa on kolme osiota. Kun aloitat uuden asiakkaan ensimmäisen kuluttajan, Kafka määrittää kaikki kolme osiota samalle kuluttajalle. Jos aloitat toisen kuluttajan, Kafka määrittelee kaikki osiot uudelleen ja määrittää yhden osion ensimmäiselle kuluttajalle ja loput kaksi osiota toiselle kuluttajalle. Jos lisäät kolmannen kuluttajan, Kafka määrittää osiot uudelleen niin, että jokaiselle kuluttajalle määritetään yksi osio. Lopuksi, jos aloitat neljännen ja viidennen kuluttajan, kolmella kuluttajista on määritetty osio, mutta muut eivät saa viestejä. Jos jokin kolmesta ensimmäisestä osiosta menee alas, Kafka käyttää samaa osiologiikkaa määrittäessään kyseisen kuluttajan osion uudelle kuluttajalle.