Ohjelmointi

Suunniteltu reaaliaikaiseen käyttöön: Suuri dataviestintä Apache Kafkan kanssa, osa 1

Suuren dataliikkeen alkaessa se keskittyi enimmäkseen eräkäsittelyyn. Hajautetut tietojen tallennus- ja kyselytyökalut, kuten MapReduce, Hive ja Pig, on suunniteltu käsittelemään tietoja erissä eikä jatkuvasti. Yritykset tekisivät useita töitä joka ilta poimimaan tietoja tietokannasta, sitten analysoimaan, muuntamaan ja lopulta tallentamaan tiedot. Viime aikoina yritykset ovat löytäneet tiedon ja tapahtumien analysoinnin ja käsittelyn voiman kuten ne tapahtuvat, ei vain muutaman tunnin välein. Useimmat perinteiset viestintäjärjestelmät eivät kuitenkaan laajene käsittelemään suuria tietoja reaaliajassa. Joten LinkedInin insinöörit rakensivat ja hankkivat avoimen lähdekoodin Apache Kafkan: hajautetun viestintäkehyksen, joka täyttää big data -vaatimukset skaalaamalla hyödykelaitteita.

Viime vuosina Apache Kafka on noussut ratkaisemaan erilaisia ​​käyttötapauksia. Yksinkertaisimmassa tapauksessa se voi olla yksinkertainen puskuri sovelluslokien tallentamiseen. Yhdessä Spark Streamingin kaltaisen tekniikan kanssa sitä voidaan käyttää datamuutosten seuraamiseen ja tietojen käsittelyyn ennen niiden tallentamista lopulliseen määränpäähän. Kafkan ennakoiva tila tekee siitä tehokkaan työkalun petosten havaitsemiseksi, kuten luottokorttitapahtuman oikeellisuuden tarkistamiseksi, kun se tapahtuu, eikä odottaen eräkäsittelyä tunteja myöhemmin.

Tämä kaksiosainen opetusohjelma esittelee Kafkan aloittaen siitä, kuinka se asennetaan ja suoritetaan kehitysympäristössäsi. Saat yleiskatsauksen Kafkan arkkitehtuurista ja sen jälkeen johdannon valmiiden Apache Kafka -viestintäjärjestelmien kehittämiseen. Lopuksi rakennat mukautetun tuottaja- / kuluttajasovelluksen, joka lähettää ja kuluttaa viestejä Kafka-palvelimen kautta. Opetusohjelman toisella puoliskolla opit jakamaan ja ryhmittelemään viestejä sekä hallitsemaan, mitä viestejä Kafkan kuluttaja kuluttaa.

Mikä on Apache Kafka?

Apache Kafka on viestintäjärjestelmä, joka on rakennettu mittakaavaksi suurille tiedoille. Samoin kuin Apache ActiveMQ tai RabbitMq, Kafka mahdollistaa eri alustoille rakennettujen sovellusten kommunikoinnin asynkronisen viestien välityksen kautta. Mutta Kafka eroaa näistä perinteisemmistä viestintäjärjestelmistä keskeisillä tavoilla:

  • Se on suunniteltu mittakaavassa vaakasuoraan lisäämällä lisää hyödykepalvelimia.
  • Se tarjoaa paljon suuremman läpimenon sekä tuottaja- että kuluttajaprosesseille.
  • Sitä voidaan käyttää tukemaan sekä erä- että reaaliaikaisia ​​käyttötapauksia.
  • Se ei tue JMS: ää, Java: n viestikeskeistä väliohjelmiston sovellusliittymää.

Apache Kafkan arkkitehtuuri

Ennen kuin tutustumme Kafkan arkkitehtuuriin, sinun tulisi tietää sen perustermit:

  • A tuottaja on prosessi, joka voi julkaista viestin aiheeseen.
  • a kuluttaja on prosessi, joka voi tilata yhden tai useamman aiheen ja kuluttaa aiheisiin julkaistuja viestejä.
  • A aiheluokka on syötteen nimi, johon viestit julkaistaan.
  • A välittäjä on prosessi, joka toimii yhdellä koneella.
  • A klusteri on ryhmä välittäjiä, jotka työskentelevät yhdessä.

Apache Kafkan arkkitehtuuri on hyvin yksinkertainen, mikä voi johtaa parempaan suorituskykyyn ja suorituskykyyn joissakin järjestelmissä. Jokainen Kafkan aihe on kuin yksinkertainen lokitiedosto. Kun tuottaja julkaisee viestin, Kafka-palvelin liittää sen tietyn aiheensa lokitiedoston loppuun. Palvelin antaa myös offset, joka on numero, jota käytetään kunkin viestin pysyvään tunnistamiseen. Viestien määrän kasvaessa kunkin siirtymän arvo kasvaa; esimerkiksi jos tuottaja julkaisee kolme viestiä, ensimmäinen saattaisi saada siirron 1, toinen 2 ja kolmas 3.

Kun Kafka-asiakas käynnistyy ensimmäisen kerran, se lähettää vetopyynnön palvelimelle ja pyytää hakemaan kaikki tietyn aiheen viestit, joiden siirtymäarvo on suurempi kuin 0. Palvelin tarkistaa kyseisen aiheen lokitiedoston ja palauttaa kolme uutta viestiä . Kuluttaja käsittelee viestit ja lähettää sitten pyynnön viesteistä, joissa on siirtymä korkeampi yli 3 ja niin edelleen.

Kafkassa asiakas on vastuussa siirtomäärän muistamisesta ja viestien noutamisesta. Kafka-palvelin ei seuraa tai hallitse viestien kulutusta. Oletusarvoisesti Kafka-palvelin säilyttää viestin seitsemän päivän ajan. Palvelimen taustalanka tarkistaa ja poistaa seitsemän päivää tai vanhemmat viestit. Kuluttaja voi käyttää viestejä niin kauan kuin ne ovat palvelimella. Se pystyy lukemaan viestin useita kertoja ja jopa lukemaan viestejä päinvastaisessa järjestyksessä. Mutta jos kuluttaja ei noudata viestiä ennen kuin seitsemän päivää on kulunut, hän kaipaa viestiä.

Kafkan vertailuarvot

LinkedInin ja muiden yritysten käyttämä tuotanto on osoittanut, että asianmukaisella kokoonpanolla Apache Kafka pystyy käsittelemään satoja gigatavuja dataa päivittäin. Vuonna 2011 kolme LinkedIn-insinööriä käytti vertailutestausta osoittaakseen, että Kafka pystyi saavuttamaan paljon suuremman suorituskyvyn kuin ActiveMQ ja RabbitMQ.

Apache Kafkan pika-asennus ja esittely

Rakennamme mukautetun sovelluksen tähän opetusohjelmaan, mutta aloitetaan asentamalla ja testaamalla Kafka-ilmentymä valmiiden tuottajien ja kuluttajien kanssa.

  1. Käy Kafkan lataussivulla ja asenna uusin versio (0,9 tämän kirjoituksen jälkeen).
  2. Pura binääritiedostot a ohjelmisto / kafka kansio. Nykyisessä versiossa se on ohjelmisto / kafka_2.11-0.9.0.0.
  3. Muuta nykyinen hakemistosi osoittamaan uuteen kansioon.
  4. Käynnistä Zookeeper-palvelin suorittamalla komento: bin / zookeeper-server-start.sh config / zookeeper.properties.
  5. Käynnistä Kafka-palvelin suorittamalla: bin / kafka-server-start.sh config / server.properties.
  6. Luo testausaihe, jota voit käyttää testaamiseen: bin / kafka-topics.sh --create --zookeeper localhost: 2181 --replication-factor 1 --partitions 1 --topic javaworld.
  7. Käynnistä yksinkertainen konsolikuluttaja, joka voi kuluttaa tiettyyn aiheeseen julkaistuja viestejä, kuten javaworld: bin / kafka-console-consumer.sh --zookeeper localhost: 2181 - aihe javaworld - alusta alkaen.
  8. Käynnistä yksinkertainen tuottajakonsoli, joka voi julkaista viestejä testiaiheeseen: bin / kafka-console-producer.sh --broker-list localhost: 9092 --topic javaworld.
  9. Yritä kirjoittaa yksi tai kaksi viestiä tuottajakonsoliin. Viestiesi pitäisi näkyä kuluttajakonsolissa.

Esimerkkisovellus Apache Kafkan kanssa

Olet nähnyt, kuinka Apache Kafka toimii laatikosta. Seuraavaksi kehitetään mukautettu tuottaja / kuluttajasovellus. Tuottaja hakee käyttäjän syötteet konsolista ja lähettää jokaisen uuden rivin viestinä Kafka-palvelimelle. Kuluttaja hakee tietyn aiheen viestit ja tulostaa ne konsolille. Tuottaja- ja kuluttajakomponentit ovat tässä tapauksessa omia toteutuksiasi kafka-console-producer.sh ja kafka-console-consumer.sh.

Aloitetaan luomalla a Producer.java luokassa. Tämä asiakasluokka sisältää logiikkaa käyttäjän syötteen lukemiseen konsolista ja sen lähettämiseksi viestinä Kafka-palvelimelle.

Määritämme tuottajan luomalla objektin java.util.Kiinteistöt luokka ja sen ominaisuuksien asettaminen. ProducerConfig-luokka määrittelee kaikki käytettävissä olevat ominaisuudet, mutta Kafkan oletusarvot ovat riittäviä useimpiin käyttötarkoituksiin. Oletuskokoonpanoa varten meidän on asetettava vain kolme pakollista ominaisuutta:

  • BOOTSTRAP_SERVERS_CONFIG
  • KEY_SERIALIZER_CLASS_CONFIG
  • VALUE_SERIALIZER_CLASS_CONFIG

BOOTSTRAP_SERVERS_CONFIG (bootstrap.palvelimet) asettaa luettelon isäntäkoneista: porttiparit, joita käytetään muodostamaan ensimmäiset yhteydet Kakfa - klusteriin isäntä1: portti1, isäntä2: portti2, ... muoto. Vaikka Kafka-klusterissamme olisi enemmän kuin yksi välittäjä, meidän on määritettävä vain ensimmäisen välittäjän arvo isäntä: portti. Kafka-asiakas käyttää tätä arvoa löytöpuhelun soittamiseen välittäjälle, joka palauttaa luettelon kaikista klusterin välittäjistä. On hyvä määrittää useampi kuin yksi välittäjä BOOTSTRAP_SERVERS_CONFIG, joten jos ensimmäinen välittäjä on poissa, asiakas voi kokeilla muita välittäjiä.

Kafka-palvelin odottaa viestejä tavu [] avain, tavu [] arvo muoto. Kafkan asiakaspuolen kirjasto sallii jokaisen avaimen ja arvon muuntamisen sijaan käyttää ystävällisempiä tyyppejä, kuten Merkkijono ja int viestien lähettämistä varten. Kirjasto muuntaa nämä sopivaan tyyppiin. Esimerkiksi esimerkkisovelluksessa ei ole viestikohtaista avainta, joten käytämme tyhjä avainta varten. Arvoksi käytämme a Merkkijono, joka on käyttäjän syöttämä tieto konsoliin.

Määritä viestinäppäin, asetamme arvon KEY_SERIALIZER_CLASS_CONFIG on org.apache.kafka.common.serialization.ByteArraySerializer. Tämä toimii, koska tyhjä ei tarvitse muuntaa tavu[]. Varten viestin arvo, asetamme VALUE_SERIALIZER_CLASS_CONFIG on org.apache.kafka.common.serialization.StringSerializer, koska kyseinen luokka tietää kuinka muuntaa a Merkkijono osaksi a tavu[].

Mukautetut avain / arvo-objektit

Samanlainen kuin StringSerializer, Kafka toimittaa sarjoittajia muille primitiiville, kuten int ja pitkä. Jotta voisimme käyttää mukautettua objektia avaimeemme tai arvoon, meidän on luotava luokan toteutus org.apache.kafka.common.serialization.Serializer. Voisimme sitten lisätä logiikkaa sarjan sarjoimiseksi tavu[]. Meidän olisi myös käytettävä vastaavaa deserializeria kuluttajakoodissamme.

Kafkan tuottaja

Täyttämisen jälkeen Ominaisuudet luokassa tarvittavat kokoonpano-ominaisuudet, voimme käyttää sitä luomaan objektin KafkaTuottaja. Aina kun haluamme lähettää viestin Kafka-palvelimelle sen jälkeen, luomme objektin ProducerRecord ja soita KafkaTuottajaon lähettää() menetelmällä kyseisen tietueen kanssa viestin lähettämiseksi. ProducerRecord ottaa kaksi parametria: sen aiheen nimi, jolle viesti tulisi julkaista, ja varsinainen viesti. Älä unohda soittaa Producer.close () menetelmä, kun olet valmis käyttämään tuottajaa:

Listaus 1. KafkaProducer

 julkisen luokan tuottaja {yksityinen staattinen skanneri sisään; public static void main (String [] argv) heittää poikkeuksen {if (argv.length! = 1) {System.err.println ("Määritä 1 parametri"); System.exit (-1); } MerkkijononimiNimi = argv [0]; in = uusi skanneri (System.in); System.out.println ("Kirjoita viesti (lopeta kirjoittamalla exit)"); // Määritä tuottajaominaisuudet configProperties = uudet ominaisuudet (); configProperties.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "paikallinen isäntä: 9092"); configProperties.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); org.apache.kafka.clients.producer.Producer producer = uusi KafkaProducer (configProperties); Merkkijono = in.nextLine (); while (! line.equals ("exit")) {ProducerRecord rec = uusi ProducerRecord (aiheenNimi, rivi); tuottaja.lähetä (rec); rivi = in.nextLine (); } sulje (); tuottaja.sulje (); }} 

Viestin kuluttajan määrittäminen

Seuraavaksi luomme yksinkertaisen kuluttajan, joka tilaa aiheen. Aina kun uusi viesti julkaistaan ​​aiheeseen, se lukee kyseisen viestin ja tulostaa sen konsolille. Kuluttajakoodi on melko samanlainen kuin tuottajakoodi. Aloitetaan luomalla objekti java.util.Kiinteistöt, asettamalla sen kuluttajakohtaiset ominaisuudet ja käyttämällä sitä sitten uuden objektin luomiseen KafkaKuluttaja. ConsumerConfig-luokka määrittää kaikki ominaisuudet, jotka voimme asettaa. Pakollisia ominaisuuksia on vain neljä:

  • BOOTSTRAP_SERVERS_CONFIG (bootstrap.palvelimet)
  • KEY_DESERIALIZER_CLASS_CONFIG (avain.deserializer)
  • VALUE_DESERIALIZER_CLASS_CONFIG (arvo.deserializer)
  • GROUP_ID_CONFIG (käynnistyshihna.palvelimet)

Aivan kuten teimme tuottajaluokassa, käytämme BOOTSTRAP_SERVERS_CONFIG määrittää isäntä / portti parit kuluttajaluokalle. Tämän kokoonpanon avulla voimme muodostaa ensimmäiset yhteydet Kakfa-klusteriin isäntä1: portti1, isäntä2: portti2, ... muoto.

Kuten aiemmin totesin, Kafka-palvelin odottaa viestejä sisään tavu[] avain ja tavu[] arvomuodot, ja sillä on oma toteutustapa erilaisten tiedostojen sarjallisuuteen tavu[]. Aivan kuten teimme tuottajan kanssa, kuluttajapuolella meidän on käytettävä mukautettua deserializeria muuntamiseen tavu[] takaisin oikeaan tyyppiin.

Esimerkkisovelluksessa tiedämme, että tuottaja käyttää ByteArraySerializer avaimen ja StringSerializer arvolle. Asiakkaan puolella meidän on siis käytettävä org.apache.kafka.common.serialization.ByteArrayDeserializer avaimen ja org.apache.kafka.common.serialization.StringDeserializer arvolle. Näiden luokkien asettaminen arvoiksi KEY_DESERIALIZER_CLASS_CONFIG ja VALUE_DESERIALIZER_CLASS_CONFIG avulla kuluttaja voi deserialisoida tavu[] tuottajan lähettämät koodatut tyypit.

Lopuksi meidän on asetettava GROUP_ID_CONFIG. Tämän tulisi olla ryhmän nimi merkkijonomuodossa. Selitän lisää tästä kokoonpanosta minuutissa. Katsokaa nyt Kafkan kuluttajaa, jolla on neljä pakollista ominaisuutta:

$config[zx-auto] not found$config[zx-overlay] not found