Ohjelmointi

Kuinka luoda tilallisia suoratoistosovelluksia Apache Flinkin avulla

Fabian Hueske on Apache Flink -projektin sitouttaja ja PMC-jäsen ja Data Artisans -yrityksen perustaja.

Apache Flink on kehys tilallisten suoratoistosovellusten toteuttamiseen ja niiden suorittamiseen mittakaavassa laskentaryhmässä. Edellisessä artikkelissa tarkastelimme, mikä tilanmukainen suoratoisto on, mitä käyttötapauksia se käsittelee ja miksi sinun tulisi toteuttaa ja suorittaa suoratoistosovelluksesi Apache Flinkin avulla.

Tässä artikkelissa esitän esimerkkejä kahdesta tilan suoran käsittelyn yleisestä käyttötapauksesta ja keskustelen siitä, miten ne voidaan toteuttaa Flinkin avulla. Ensimmäinen käyttötapaus on tapahtumapohjaiset sovellukset, so. Sovellukset, jotka nauttivat jatkuvista tapahtumavirroista ja soveltavat näihin tapahtumiin jonkin verran liiketoimintalogiikkaa. Toinen on suoratoistoanalytiikan käyttötapaus, jossa esitän kaksi Flinkin SQL-sovellusliittymällä toteutettua analyyttistä kyselyä, jotka yhdistävät suoratoistotiedot reaaliajassa. Me Data Artisansissa tarjoamme kaikkien esimerkkien lähdekoodin julkisessa GitHub-arkistossa.

Ennen kuin tutustumme esimerkkien yksityiskohtiin, esitän tapahtumasuunnan, jonka esimerkkisovellukset käyttävät, ja selitän, kuinka voit suorittaa tarjoamamme koodin.

Virta taksimatkan tapahtumista

Esimerkkisovelluksemme perustuvat julkiseen tietojoukkoon taksimatkoista, jotka tapahtuivat New Yorkissa vuonna 2013. Vuoden 2015 DEBS (ACM International Conference on Distributed Event-Based Systems) Grand Challenge -järjestäjät järjestivät uudelleen alkuperäisen tietojoukon ja muuttivat sen yksi CSV-tiedosto, josta luemme seuraavat yhdeksän kenttää.

  • Mitali - taksin MD5-summatunnus
  • Hack_license - taksiluvan MD5-summatunnus
  • Noutopäivämäärä - aika, jolloin matkustajat noutettiin
  • Dropoff_datetime - aika, jolloin matkustajat pudotettiin
  • Noutopituus - noutopaikan pituusaste
  • Noutokerroin - noutopaikan leveysaste
  • Dropoff_longitude - pudotuspaikan pituusaste
  • Dropoff_latitude - pudotuspaikan leveysaste
  • Yhteensä_määrä — yhteensä maksettu dollareina

CSV-tiedosto tallentaa tietueet pudotusajan määritteen nousevassa järjestyksessä. Tiedostoa voidaan siten pitää järjestettynä lokina tapahtumista, jotka julkaistiin matkan päättyessä. Jotta voit suorittaa GitHubissa tarjoamiamme esimerkkejä, sinun on ladattava DEBS-haasteen tietojoukko Google Drivesta.

Kaikki esimerkkisovellukset lukevat CSV-tiedoston peräkkäin ja nauttivat sen taksimatkatapahtumana. Siitä lähtien sovellukset käsittelevät tapahtumia aivan kuten minkä tahansa muun virran, ts. Kuten virran, joka syötetään lokipohjaisesta julkaisu-tilausjärjestelmästä, kuten Apache Kafka tai Kinesis. Itse asiassa tiedoston (tai minkä tahansa muun tyyppisen pysyvän datan) lukeminen ja sen käsittely virtana on kulmakivi Flinkin lähestymistavalle erä- ja suoratoiston yhtenäistämisessä.

Suoritetaan Flink-esimerkkejä

Kuten aiemmin mainittiin, julkaisimme esimerkkisovellusten lähdekoodin GitHub-arkistossa. Kehotamme teitä haarautumaan ja kloonaamaan arkisto. Esimerkit voidaan suorittaa helposti valitsemastasi IDE: stä; sinun ei tarvitse määrittää ja määrittää Flink-klusteria niiden suorittamiseksi. Tuo ensin esimerkkien lähdekoodi Maven-projektina. Suorita sitten sovelluksen pääluokka ja ilmoita datatiedoston tallennussijainti (katso yllä linkki tietojen lataamiseen) ohjelman parametrina.

Kun olet käynnistänyt sovelluksen, se käynnistää paikallisen, upotetun Flink-instanssin sovelluksen JVM-prosessissa ja lähettää sovelluksen suorittamaan sen. Näet joukon lokilauseita, kun Flink on käynnissä ja työn tehtävät ajoitetaan. Kun sovellus on käynnissä, sen lähtö kirjoitetaan vakiotulosteeseen.

Tapahtumapohjaisen sovelluksen rakentaminen Flinkiin

Anna nyt keskustella ensimmäisestä käyttötapauksestamme, joka on tapahtumapohjainen sovellus. Tapahtumavetoiset sovellukset nauttivat tapahtumavirroista, suorittavat laskutoimituksia tapahtumien vastaanottamisen yhteydessä ja voivat lähettää uusia tapahtumia tai laukaista ulkoisia toimintoja. Useita tapahtumapohjaisia ​​sovelluksia voidaan muodostaa yhdistämällä ne yhteen tapahtumalokijärjestelmien kautta, samalla tavalla kuin suuret järjestelmät voidaan muodostaa mikropalveluista. Tapahtumaohjatut sovellukset, tapahtumalokit ja sovellustilan tilannekuvat (Flinkissä tunnetaan nimellä savepointit) muodostavat erittäin tehokkaan suunnittelumallin, koska voit palauttaa heidän tilansa ja toistaa heidän syötteensä vikojen palauttamiseksi, virheen korjaamiseksi tai siirtämiseksi sovellus toiseen klusteriin.

Tässä artikkelissa tarkastellaan tapahtumavetoista sovellusta, joka tukee palvelua, joka valvoo taksinkuljettajien työaikaa. Vuonna 2016 NYC Taxi and Limousine Commission päätti rajoittaa taksinkuljettajien työajan 12 tunnin vuoroihin ja vaatia vähintään kahdeksan tunnin tauon ennen seuraavan vuoron aloittamista. Vuori alkaa ensimmäisen ajon alusta. Siitä lähtien kuljettaja voi aloittaa uudet ajot 12 tunnin kuluessa. Sovelluksemme seuraa kuljettajien ajoa, merkitsee heidän 12 tunnin ikkunansa päättymisajan (eli ajankohdan, jolloin he voivat aloittaa viimeisen matkan) ja liputtaa ajoa, joka rikkoo asetusta. Löydät tämän esimerkin koko lähdekoodin GitHub-arkistostamme.

Sovelluksemme on toteutettu Flinkin DataStream API: lla ja a KeyedProcessFunction. DataStream-sovellusliittymä on toimiva sovellusliittymä, joka perustuu kirjoitettujen tietovirtojen käsitteeseen. A DataStream on tyypillisen tapahtumavirran looginen esitys T. Virta käsitellään soveltamalla siihen toimintoa, joka tuottaa toisen, mahdollisesti erityyppisen datavirran. Flink käsittelee suoratoistoja rinnakkain jakamalla tapahtumia osioiden suoratoistoon ja soveltamalla eri esiintymiä funktioita kuhunkin osioon.

Seuraava koodinpätkä näyttää seurantasovelluksemme korkean tason kulun.

// syö taksimatkoja.

DataStream rides = TaxiRides.getRides (env, inputPath);

DataStream ilmoitukset = ratsastukset

// osiovirta ajokorttitunnuksen mukaan

.keyBy (r -> r.licenseId)

// tarkkaile ajo-tapahtumia ja luo ilmoituksia

.process (uusi MonitorWorkTime ());

// tulosta ilmoitukset

ilmoitukset.tulosta ();

Sovellus alkaa kuluttaa taksimatkan tapahtumia. Esimerkissämme tapahtumat luetaan tekstitiedostosta, jäsennetään ja tallennetaan TaxiRide POJO-objektit. Reaalimaailman sovellus nielaisi tapahtumat yleensä viestijonosta tai tapahtumalokista, kuten Apache Kafka tai Pravega. Seuraava vaihe on näppäillä TaxiRide tapahtumia licenseId kuljettajan. avain operation osioi ilmoitetun kentän virran siten, että kaikki saman avaimen tapahtumat käsitellään saman rinnakkaisen seuraavan toiminnon esiintymässä. Meidän tapauksessamme osioimme licenseId koska haluamme seurata kunkin kuljettajan työaikaa.

Seuraavaksi sovellamme MonitorWorkTime toiminto osioitu TaxiRide Tapahtumat. Toiminto seuraa ajoja kuljettajaa kohden ja valvoo niiden vuoroja ja taukoja. Se lähettää tyyppisiä tapahtumia Tuple2, jossa jokainen pari edustaa ilmoitusta, joka sisältää kuljettajan ajokorttitunnuksen ja viestin. Lopuksi sovelluksemme lähettää viestit tulostamalla ne vakiotulosteeseen. Reaalimaailman sovellus kirjoittaisi ilmoitukset ulkoiseen viestiin tai tallennusjärjestelmään, kuten Apache Kafka, HDFS tai tietokantajärjestelmään, tai laukaisi ulkoisen puhelun ja työntäisi ne välittömästi ulos.

Nyt kun olemme keskustelleet sovelluksen kokonaisvirrasta, katsotaanpa MonitorWorkTime -toiminto, joka sisältää suurimman osan sovelluksen todellisesta liiketoimintalogiikasta. MonitorWorkTime toiminto on tilallinen KeyedProcessFunction että syö TaxiRide tapahtumia ja päästöjä Tuple2 tietueet. KeyedProcessFunction käyttöliittymässä on kaksi tapaa käsitellä tietoja: processElement () ja onTimer (). processElement () menetelmä kutsutaan jokaiselle saapuvalle tapahtumalle. onTimer () menetelmää kutsutaan, kun aiemmin rekisteröity ajastin laukeaa. Seuraava katkelma näyttää MonitorWorkTime toiminto ja kaikki, mikä ilmoitetaan käsittelymenetelmien ulkopuolella.

julkinen staattinen luokka MonitorWorkTime

laajentaa KeyedProcessFunction-toimintoa {

// aikavakio millisekunteina

yksityinen staattinen lopullinen pitkä ALLOWED_WORK_TIME = 12 * 60 * 60 * 1000; // 12 tuntia

yksityinen staattinen lopullinen pitkä REQ_BREAK_TIME = 8 * 60 * 60 * 1000; // 8 tuntia

yksityinen staattinen lopullinen pitkä CLEAN_UP_INTERVAL = 28 * 60 * 60 * 1000; // 24 tuntia

yksityinen ohimenevä DateTimeFormatter-muotoilija;

// tilaa kahva tallentaa vuoron aloitusajan

ValueState shiftStart;

@Ohittaa

public void open (Configuration conf) {

// rekisteröi valtion kahva

shiftStart = getRuntimeContext (). getState (

uusi ValueStateDescriptor (“shiftStart”, Types.LONG));

// alustaa aikamuotoilu

this.formatter = DateTimeFormat.forPattern ("vvvv-kk-pp HH: mm: ss");

  }

// processElement () ja onTimer () käsitellään yksityiskohtaisesti jäljempänä.

}

Funktio ilmoittaa muutaman vakion aikaväleille millisekunteina, aikamuotoilijan ja tilakahvan avainkäyttötilalle, jota Flink hallinnoi. Hallittu tila tarkistetaan säännöllisesti ja palautetaan automaattisesti vikatilanteessa. Avaintila on järjestetty avainta kohti, mikä tarkoittaa, että funktio säilyttää yhden arvon kahvaa ja avainta kohden. Meidän tapauksessamme MonitorWorkTime toiminto ylläpitää a Pitkä arvo jokaiselle avaimelle, ts. jokaiselle licenseId. shiftStart osavaltio tallentaa kuljettajan vuoron alkamisajan. Tilakahva alustetaan avata() menetelmä, jota kutsutaan kerran ennen ensimmäisen tapahtuman käsittelyä.

Katsotaanpa nyt processElement () menetelmä.

@Ohittaa

public void processElement (

TaxiRide-ratsastus,

Konteksti ctx,

Keräilijä ulos) heittää poikkeuksen {

// etsi viimeisen vuoron aloitusaika

Pitkät aloitukset = shiftStart.value ();

jos (startTs == null ||

startTs <ride.pickUpTime - (ALLOWED_WORK_TIME + REQ_BREAK_TIME)) {

// tämä on uuden vuoron ensimmäinen ajo.

startTs = ride.pickUpTime;

shiftStart.update (startTs);

long endTs = startTs + ALLOWED_WORK_TIME;

out.collect (Tuple2.of (ride.licenseId,

"Voit ottaa vastaan ​​uusia matkustajia, kunnes" + formatter.print (endTs)));

// rekisteröi ajastin puhdistaaksesi tilan 24 tunnissa

ctx.timerService (). registerEventTimeTimer (startTs + CLEAN_UP_INTERVAL);

} else if (startTs <ride.pickUpTime - ALLOWED_WORK_TIME) {

// tämä matka alkoi sallitun työajan päätyttyä.

// se on sääntöjen vastaista!

out.collect (Tuple2.of (ride.licenseId,

"Tämä ratsastus rikkoi työaikamääräyksiä."));

  }

}

processElement () menetelmä vaaditaan kullekin TaxiRide tapahtuma. Ensin menetelmä hakee kuljettajan vaihdon alkamisajan tilakahvasta. Jos tila ei sisällä aloitusaikaa (startTs == nolla) tai jos viimeinen vuoro alkoi yli 20 tuntia (ALLOWED_WORK_TIME + REQ_BREAK_TIME) aikaisempaa kuin nykyinen ajo, nykyinen ajo on uuden vaihdon ensimmäinen ajo. Kummassakin tapauksessa toiminto aloittaa uuden vaihdon päivittämällä vuoron alkamisajan nykyisen ajon aloitusaikaan, lähettää kuljettajalle viestin uuden vaihdon päättymisajasta ja rekisteröi ajastimen puhdistaakseen tila 24 tunnissa.

Jos nykyinen ajo ei ole uuden vaihdon ensimmäinen ajo, toiminto tarkistaa, rikkooko se työaikamääräyksiä, eli onko se alkanut yli 12 tuntia myöhemmin kuin kuljettajan nykyisen vuoron alku. Jos näin on, toiminto lähettää viestin, joka ilmoittaa kuljettajalle rikkomuksesta.

processElement () menetelmä MonitorWorkTime toiminto rekisteröi ajastimen tilan puhdistamiseksi 24 tuntia vuoron alkamisen jälkeen. Tilan poistaminen, jota ei enää tarvita, on tärkeää, jotta vältetään vuotavan tilan aiheuttama tilakoon kasvu. Ajastin laukeaa, kun sovelluksen aika ylittää ajastimen aikaleiman. Siinä vaiheessa onTimer () menetelmää kutsutaan. Samoin kuin tilassa, ajastimia ylläpidetään avainta kohden, ja toiminto laitetaan liittyvän avaimen kontekstiin ennen onTimer () menetelmää kutsutaan. Siksi kaikki tilan käyttöoikeudet ohjataan avaimeen, joka oli aktiivinen ajastimen rekisteröinnin yhteydessä.

Katsotaanpa onTimer () menetelmä MonitorWorkTime.

@Ohittaa

public void onTimer (

pitkät ajastimet,

OnTimerContext ctx,

Keräilijä ulos) heittää poikkeuksen {

// poista siirtotila, jos uutta siirtoa ei jo aloitettu.

Pitkät aloitukset = shiftStart.value ();

jos (startTs == timerTs - CLEAN_UP_INTERVAL) {

shiftStart.clear ();

  }

}

processElement () method rekisteröi ajastimet 24 tunnin ajaksi sen jälkeen kun vuoro on alkanut puhdistaa tilaa, jota ei enää tarvita. Tilan puhdistaminen on ainoa logiikka, jonka onTimer () menetelmä toteuttaa. Kun ajastin laukeaa, tarkistamme, aloittiinko kuljettaja uuden vaihdon sillä välin, eli muuttuiko vaihteen alkamisaika. Jos näin ei ole, tyhjennämme kuljettajan vaihtotilan.

$config[zx-auto] not found$config[zx-overlay] not found