Ohjelmointi

Suunniteltu reaaliaikaiseen käyttöön: Suuret dataviestit Apache Kafkan kanssa, osa 2

Apache Kafkan JavaWorld-johdannon ensimmäisellä puoliskolla kehitit muutaman pienen tuottajan / kuluttajan sovelluksen käyttämällä Kafkaa. Näistä harjoituksista sinun tulee tuntea Apache Kafka -viestintäjärjestelmän perusteet. Tässä toisessa puoliskossa opit käyttämään osioita levittämään kuormitusta ja laajentamaan sovellustasi vaakasuunnassa käsittelemällä jopa miljoonia viestejä päivässä. Opit myös, kuinka Kafka käyttää viestisiirtymiä monimutkaisen viestinkäsittelyn seuraamiseen ja hallintaan sekä kuinka suojata Apache Kafka -viestintäjärjestelmääsi vialta, jos kuluttaja menee alas. Kehitämme esimerkkisovelluksen osasta 1 sekä julkaisu-tilaus- että pisteestä pisteeseen -tapauksiin.

Väliseinät Apache Kafkassa

Kafkan aiheet voidaan jakaa osioihin. Esimerkiksi luodessasi aihetta nimeltä Demo, voit määrittää sen olevan kolme osiota. Palvelin luo kolme lokitiedostoa, yhden kutakin demo-osiota varten. Kun tuottaja julkaisi viestin aiheeseen, se määritti osiotunnuksen tälle viestille. Palvelin lisäsi sitten viestin vain kyseisen osion lokitiedostoon.

Jos aloitit sitten kaksi kuluttajaa, palvelin saattaa määrittää osiot 1 ja 2 ensimmäiselle kuluttajalle ja osio 3 toiselle kuluttajalle. Jokainen kuluttaja lukisi vain sille määritetyistä osioista. Kolme osiota varten määritetty esittelyaihe näkyy kuvassa 1.

Laajenna skenaariota kuvittelemalla Kafka-klusteri, jossa on kaksi välittäjää ja jotka sijaitsevat kahdessa koneessa. Kun osioit demoaiheen, määrität siinä olevan kaksi osiota ja kaksi kopiota. Tämän tyyppisessä kokoonpanossa Kafka-palvelin määrittäisi kaksi osiota klusterisi kahdelle välittäjälle. Jokainen välittäjä olisi yhden osion johtaja.

Kun tuottaja julkaisi viestin, se meni osionjohtajalle. Johtaja ottaisi viestin ja liittäisi sen paikallisen koneen lokitiedostoon. Toinen välittäjä kopioisi passiivi passiivisesti omalle koneelleen. Jos osionjohtaja menisi alas, toisesta välittäjästä tulisi uusi johtaja ja hän aloittaisi asiakaspyyntöjen palvelemisen. Samalla tavalla, kun kuluttaja lähetti pyynnön osioon, kyseinen pyyntö meni ensin osionjohtajalle, joka palautti pyydetyt viestit.

Osioinnin edut

Harkitse Kafka-pohjaisen viestijärjestelmän osioinnin etuja:

  1. Skaalautuvuus: Järjestelmässä, jossa on vain yksi osio, aiheeseen julkaistut viestit tallennetaan lokitiedostoon, joka on olemassa yhdellä koneella. Aiheen viestien lukumäärän on mahtuttava yhteen sitoutuslokitiedostoon, ja tallennettujen viestien koko ei voi koskaan olla suurempi kuin koneen levytila. Aiheen osioinnin avulla voit skaalata järjestelmääsi tallentamalla viestejä klusterin eri koneille. Jos haluat esimerkiksi tallentaa 30 gigatavua (Gt) viestejä esittelyaiheelle, voit rakentaa Kafka-klusterin, joka koostuu kolmesta koneesta, joista jokaisella on 10 Gt levytilaa. Sitten määrität aiheessa kolme osiota.
  2. Palvelimen ja kuorman tasapainotus: Jos sinulla on useita osioita, voit lähettää viestipyynnöt välittäjille. Esimerkiksi, jos sinulla oli aihe, joka käsitteli miljoona viestiä sekunnissa, voit jakaa sen 100 osioon ja lisätä 100 välittäjää klusteriin. Jokainen välittäjä olisi yhden osion johtaja ja vastaisi vain 10000 asiakaspyyntöön sekunnissa.
  3. Kuluttajan ja kuorman tasapainottaminen: Kuten palvelimen ja kuormituksen tasapainottaminen, useiden kuluttajien isännöinti eri koneella antaa sinun jakaa kuluttajakuorman. Oletetaan, että haluat kuluttaa miljoona viestiä sekunnissa aiheesta, jossa on 100 osiota. Voit luoda 100 kuluttajaa ja käyttää heitä rinnakkain. Kafka-palvelin osoitti yhden osion kullekin kuluttajalle, ja jokainen asiakas käsitteli 10000 viestiä rinnakkain. Koska Kafka osoittaa kunkin osion vain yhdelle kuluttajalle, osiossa kukin viesti kulutettaisiin järjestyksessä.

Kaksi tapaa osioida

Tuottaja on vastuussa siitä, mihin osioon viesti menee. Tuottajalla on kaksi vaihtoehtoa tämän tehtävän hallitsemiseksi:

  • Mukautettu osioija: Voit luoda luokan, joka toteuttaa org.apache.kafka.clients.producer.Partitioner käyttöliittymä. Tämä tapa Osoitin toteuttaa liiketoimintalogiikan päättääkseen, mihin viestit lähetetään.
  • DefaultPartitioner: Jos et luo mukautettua osioijaluokkaa, niin oletuksena org.apache.kafka.clients.producer.internals.DefaultPartitioner luokkaa käytetään. Oletusosioija on tarpeeksi hyvä useimmissa tapauksissa ja tarjoaa kolme vaihtoehtoa:
    1. Manuaalinen: Kun luot a ProducerRecord, käytä ylikuormitettua rakentajaa uusi ProducerRecord (topicName, partitionId, messageKey, message) Määritä osion tunnus.
    2. Hajautus (paikkakunnalle herkkä): Kun luot a ProducerRecord, määritä a messageKey, soittamalla uusi ProducerRecord (topicName, messageKey, message). DefaultPartitioner käyttää avaimen tiivistettä varmistaakseen, että kaikki saman avaimen viestit menevät samalle tuottajalle. Tämä on helpoin ja yleisin lähestymistapa.
    3. Ruiskutus (satunnainen kuormituksen tasapainotus): Jos et halua hallita mihin osioviesteihin menee, soita vain uusi ProducerRecord (topicName, viesti) luoda ProducerRecord. Tällöin osio lähettää viestejä kaikille osioille pyöreällä tavalla varmistaen tasapainoisen palvelimen kuormituksen.

Apache Kafka -sovelluksen osiointi

Osan 1 yksinkertaista tuottaja / kuluttaja-esimerkkiä varten käytimme a DefaultPartitioner. Yritämme nyt luoda mukautetun osion. Oletetaan tässä esimerkissä, että meillä on vähittäiskauppasivusto, jolla kuluttajat voivat tilata tuotteita kaikkialta maailmasta. Käytön perusteella tiedämme, että suurin osa kuluttajista on joko Yhdysvalloissa tai Intiassa. Haluamme jakaa sovelluksemme lähettääksemme tilauksia Yhdysvalloista tai Intiasta omille kuluttajilleen, kun taas tilaukset mistä tahansa muualta menevät kolmannelle kuluttajalle.

Aloitetaan luomalla a CountryPartitioner joka toteuttaa org.apache.kafka.clients.producer.Partitioner käyttöliittymä. Meidän on toteutettava seuraavat menetelmät:

  1. Kafka soittaa määritä () kun alustamme Osoitin luokka, a Kartta kokoonpanon ominaisuuksista. Tämä menetelmä alustaa sovelluksen liiketoimintalogiikalle ominaiset toiminnot, kuten yhteyden muodostamisen tietokantaan. Tässä tapauksessa haluamme melko yleisen osion, joka vie maan nimi omaisuutena. Voimme sitten käyttää configProperties.put ("osiot.0", "USA") kartoittaa viestien kulku osioihin. Tulevaisuudessa voimme käyttää tätä muotoa muuttamaan maat, jotka saavat oman osionsa.
  2. Tuottaja API-puhelut osio () kerran jokaiselle viestille. Tässä tapauksessa käytämme sitä lukemaan viestin ja jäsentämään maan nimen viestistä. Jos maan nimi on countryToPartitionMap, se palaa osioId tallennettu Kartta. Jos ei, se hajauttaa maan arvon ja käyttää sitä laskemaan mihin osioon sen pitäisi mennä.
  3. Kutsumme kiinni() sammuttaa osio. Tämän menetelmän käyttäminen varmistaa, että kaikki alustuksen aikana hankitut resurssit puhdistetaan sammutuksen aikana.

Huomaa, että kun Kafka soittaa määritä (), Kafkan tuottaja välittää kaikki valmistajalle määrittämämme ominaisuudet Osoitin luokassa. On tärkeää, että luemme vain ne ominaisuudet, jotka alkavat osiot.jäsentää ne saadaksesi osioIdja tallenna tunnus countryToPartitionMap.

Alla on mukautettu toteutus Osoitin käyttöliittymä.

Listaus 1. CountryPartitioner

 public class CountryPartitioner toteuttaa Partitioner {private static Map countryToPartitionMap; public void configure (Map configs) {System.out.println ("Inside CountryPartitioner.configure" + configs); countryToPartitionMap = uusi HashMap (); kohteelle (Map.Entry-merkintä: configs.entrySet ()) {if (entry.getKey (). startsWith ("osiot")) {String keyName = entry.getKey (); Merkkijonoarvo = (Merkkijono) entry.getValue (); System.out.println (keyName.substring (11)); int paritionId = Kokonaisluku.parseInt (avaimenimi.substring (11)); countryToPartitionMap.put (arvo, paritionId); }}} public int -osio (Merkkijono-aihe, Objektiavain, tavu [] keyBytes, Objektin arvo, tavu [] valueBytes, Cluster-klusteri) {Listaosastot = cluster.availablePartitionsForTopic (aihe); String valueStr = (merkkijono) arvo; String countryName = ((String) arvo) .split (":") [0]; if (countryToPartitionMap.containsKey (countryName)) {// Jos maa on määritetty tiettyyn osioon, palauta se return countryToPartitionMap.get (countryName); } else {// Jos yhtään maata ei ole määritetty tiettyyn osioon, jaa jäljellä olevien osioiden välillä int noOfPartitions = cluster.topics (). size (); return value.hashCode ()% noOfPartitions + countryToPartitionMap.size (); }} public void close () {}} 

Tuottaja luokka luettelossa 2 (alla) on hyvin samanlainen kuin yksinkertainen tuottajamme osasta 1, ja kaksi muutosta on merkitty lihavoituna:

  1. Asetamme config-ominaisuuden, jonka avain on yhtä suuri kuin arvo ProducerConfig.PARTITIONER_CLASS_CONFIG, joka vastaa meidän täysin pätevää nimeä CountryPartitioner luokassa. Asetimme myös maan nimi että osioId, siten kartoittamalla ominaisuudet, joille haluamme välittää CountryPartitioner.
  2. Ohitamme luokan esiintymän, joka toteuttaa org.apache.kafka.clients.producer.Callback käyttöliittymä toisena argumenttina tuottaja.lähetä () menetelmä. Kafka-asiakas soittaa sille valmistuttua() menetelmä, kun viesti on julkaistu onnistuneesti, liittämällä a RecordMetadata esine. Voimme käyttää tätä objektia selvittääksesi, mihin osioon viesti lähetettiin, sekä julkaistulle viestille määritetyn siirtymän.

Listaus 2. Jaettu tuottaja

 julkisen luokan tuottaja {yksityinen staattinen skanneri sisään; public static void main (String [] argv) heittää poikkeuksen {if (argv.length! = 1) {System.err.println ("Määritä 1 parametri"); System.exit (-1); } MerkkijononimiNimi = argv [0]; in = uusi skanneri (System.in); System.out.println ("Kirjoita viesti (lopeta kirjoittamalla exit)"); // Määritä tuottajaominaisuudet configProperties = uudet ominaisuudet (); configProperties.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "paikallinen isäntä: 9092"); configProperties.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");  configProperties.put (ProducerConfig.PARTITIONER_CLASS_CONFIG, CountryPartitioner.class.getCanonicalName ()); configProperties.put ("osio.1", "USA"); configProperties.put ("osio.2", "Intia");  org.apache.kafka.clients.producer.Producer producer = uusi KafkaProducer (configProperties); Merkkijono = in.nextLine (); while (! line.equals ("exit")) {ProducerRecord rec = uusi ProducerRecord (aiheenNimi, nolla, rivi); producer.send (rec, uusi soittopyyntö () {public void onCompletion (RecordMetadata-metatiedot, poikkeuspoikkeus) {System.out.println ("Viesti lähetetty aiheeseen ->" + metadata.topic () + ", parition->" + metadata.partition () + "tallennettu offset->" + metatiedot.offset ()); ; }}); rivi = in.nextLine (); } sulje (); tuottaja.sulje (); }} 

Osioiden määrittäminen kuluttajille

Kafka-palvelin takaa, että osio on osoitettu vain yhdelle kuluttajalle, mikä takaa viestien kulutuksen järjestyksen. Voit määrittää osion manuaalisesti tai antaa sen määrittää automaattisesti.

Jos liiketoimintalogiikka vaatii enemmän hallintaa, sinun on määritettävä osiot manuaalisesti. Tässä tapauksessa käytät KafkaConsumer.assign () välittää luettelo kullekin kuluttajalle kiinnostuneista osioista Kakfa-palvelimelle.

Oletus ja yleisin valinta on, että osiot määritetään automaattisesti. Tällöin Kafka-palvelin määrittää osion jokaiselle kuluttajalle ja määrittää osiot uudelleen mittakaavassa uusille kuluttajille.

Oletetaan, että luot uuden aiheen, jossa on kolme osiota. Kun aloitat uuden asiakkaan ensimmäisen kuluttajan, Kafka määrittää kaikki kolme osiota samalle kuluttajalle. Jos aloitat toisen kuluttajan, Kafka määrittelee kaikki osiot uudelleen ja määrittää yhden osion ensimmäiselle kuluttajalle ja loput kaksi osiota toiselle kuluttajalle. Jos lisäät kolmannen kuluttajan, Kafka määrittää osiot uudelleen niin, että jokaiselle kuluttajalle määritetään yksi osio. Lopuksi, jos aloitat neljännen ja viidennen kuluttajan, kolmella kuluttajista on määritetty osio, mutta muut eivät saa viestejä. Jos jokin kolmesta ensimmäisestä osiosta menee alas, Kafka käyttää samaa osiologiikkaa määrittäessään kyseisen kuluttajan osion uudelle kuluttajalle.

$config[zx-auto] not found$config[zx-overlay] not found