Ohjelmointi

Kuinka käyttää Redistä reaaliaikaiseen suoratoistoon

Roshan Kumar on Redis Labsin vanhempi tuotepäällikkö.

Reaaliaikainen suoratoistodatan saanti on yleinen vaatimus monissa suurten datan käyttötapauksissa. IoT: n, verkkokaupan, tietoturvan, viestinnän, viihteen, rahoituksen ja vähittäiskaupan kaltaisilla aloilla, joilla niin paljon riippuu oikea-aikaisesta ja tarkasta dataan perustuvasta päätöksenteosta, reaaliaikainen tiedonkeruu ja analyysi ovat itse asiassa liiketoiminnan ytimessä.

Suoratoistodatan kerääminen, tallentaminen ja käsittely suurina määrinä ja suurella nopeudella aiheuttaa kuitenkin arkkitehtonisia haasteita. Tärkeä ensimmäinen askel reaaliaikaisen data-analyysin toimittamisessa on varmistaa, että nopeaa datavirtojen sieppaamiseen on käytettävissä riittävät verkko-, laskenta-, tallennus- ja muistiresurssit. Yrityksen ohjelmistopinon on kuitenkin vastattava fyysisen infrastruktuurin suorituskykyä. Muussa tapauksessa yritykset joutuvat kohtaamaan valtavan määrän tietoja tai mikä vielä pahempaa, puuttuvia tai puutteellisia tietoja.

Redisistä on tullut suosittu valinta tällaisille nopeille tiedonsiirtotilanteille. Kevyt muistitietokanta-alusta Redis saavuttaa miljoonien operaatioiden sekunnissa läpijuoksun millisekunnin aliviiveillä samalla kun se käyttää vain vähän resursseja. Se tarjoaa myös yksinkertaisia ​​toteutuksia, jotka mahdollistavat sen useat tietorakenteet ja toiminnot.

Tässä artikkelissa näytän, kuinka Redis Enterprise voi ratkaista yleisiä haasteita, jotka liittyvät suurten nopeiden datamäärien käsittelemiseen ja käsittelyyn. Käymme läpi kolme erilaista lähestymistapaa (mukaan lukien koodi) Twitter-syötteen reaaliaikaiseen käsittelyyn käyttämällä Redis Pub / Sub-, Redis Lists- ja Redis Sorted Sets -sovelluksia. Kuten näemme, kaikilla kolmella menetelmällä on merkitys tietojen nopeassa noutamisessa käyttötapauksesta riippuen.

Nopean tiedonsiirtoratkaisujen suunnittelun haasteet

Nopea tiedonsiirto sisältää usein useita erityyppisiä monimutkaisuuksia:

  • Suuret tietomäärät saapuvat joskus purskeina. Purskedata edellyttää ratkaisua, joka pystyy käsittelemään suuria tietomääriä minimaalisella viiveellä. Ihannetapauksessa sen pitäisi pystyä suorittamaan miljoonia kirjoja sekunnissa millisekunnin matalalla viiveellä ja käyttämällä vähäisiä resursseja.
  • Tiedot useista lähteistä. Tiedonsiirtoratkaisujen on oltava riittävän joustavia käsittelemään dataa monissa eri muodoissa, säilyttämään lähteen identiteetti tarvittaessa ja muuntamalla tai normalisoimalla reaaliajassa.
  • Tiedot, jotka on suodatettava, analysoitava tai toimitettava edelleen. Suurimmalla osalla tiedonsiirtoratkaisuista on yksi tai useampi tilaaja, joka kuluttaa tietoja. Nämä ovat usein erilaisia ​​sovelluksia, jotka toimivat samoissa tai eri paikoissa vaihtelevalla oletussarjalla. Tällaisissa tapauksissa tietokannan ei tarvitse vain muuttaa tietoja, vaan myös suodattaa tai yhdistää kuluttavien sovellusten vaatimuksista riippuen.
  • Tiedot tulevat maantieteellisesti hajautetuista lähteistä. Tässä skenaariossa on usein kätevää jakaa tiedonkeruusolmut ja sijoittaa ne lähteiden lähelle. Solmuista itsestään tulee osa nopeaa tiedonsiirtoratkaisua tietojen keräämiseksi, prosessoimiseksi, edelleenlähettämiseksi tai reitin muuttamiseksi.

Nopean tiedon käsittely Redisissä

Monet nopeaa tiedonsiirtoa tukevat ratkaisut ovat nykyään monimutkaisia, monipuolisia ja yksinkertaisia ​​vaatimuksia varten ylisuunniteltuja. Toisaalta Redis on erittäin kevyt, nopea ja helppo käyttää. Asiakkaiden käytettävissä yli 60 kielellä Redis voidaan helposti integroida suosittuihin ohjelmistopinoihin.

Redis tarjoaa tietorakenteita, kuten luettelot, sarjat, lajitellut joukot ja hajautukset, jotka tarjoavat yksinkertaisen ja monipuolisen tietojenkäsittelyn. Redis tuottaa yli miljoona luku- / kirjoitusoperaatiota sekunnissa, millisekunnin matalalla viiveellä vaatimattoman kokoisilla hyödykepilven ilmentymillä, mikä tekee siitä erittäin resurssitehokkaan suurille tietomäärille. Redis tukee myös viestipalveluja ja asiakaskirjastoja kaikilla suosituilla ohjelmointikielillä, joten se soveltuu hyvin nopeaan tiedonsiirtoon ja reaaliaikaiseen analytiikkaan. Redis Pub / Sub -komennot antavat sille mahdollisuuden toimia välittäjänä julkaisijoiden ja tilaajien välillä, ominaisuutta, jota käytetään usein ilmoitusten tai viestien lähettämiseen hajautettujen tietojen sisäänottosolmujen välillä.

Redis Enterprise parantaa Redistä saumattomalla skaalauksella, aina käytettävissä olevalla saatavuudella, automatisoidulla käyttöönotolla ja kyvyllä käyttää kustannustehokasta flash-muistia RAM-laajennuksena, jotta suurten tietojoukkojen käsittely voidaan suorittaa kustannustehokkaasti.

Seuraavissa osioissa hahmotellaan, miten Redis Enterprise -ohjelmaa voidaan käyttää yleisten tietojen saannin haasteisiin vastaamiseksi.

Redis Twitterin nopeudella

Rediksen yksinkertaisuuden havainnollistamiseksi tutkimme esimerkin nopeasta tiedonsiirtoratkaisusta, joka kerää viestejä Twitter-syötteestä. Tämän ratkaisun tavoitteena on käsitellä twiittejä reaaliajassa ja työntää ne putkea pitkin, kun niitä käsitellään.

Sitten ratkaisun saamat Twitter-tiedot kuluttavat useita prosessoreita. Kuten kuvassa 1 on esitetty, tämä esimerkki koskee kahta prosessoria - englanninkielistä Tweet-prosessoria ja vaikuttajan prosessoria. Jokainen prosessori suodattaa twiitit ja välittää ne omilla kanavillaan muille kuluttajille. Tämä ketju voi mennä niin pitkälle kuin ratkaisu vaatii. Esimerkissämme pysähdymme kuitenkin kolmannelle tasolle, jossa kootaan yhteen englantia puhuvien ja vaikuttajien suosittuja keskusteluja.

Redis Labs

Huomaa, että käytämme esimerkkiä Twitter-syötteiden käsittelystä tietojen saapumisnopeuden ja yksinkertaisuuden vuoksi. Huomaa myös, että Twitter-tiedot saavuttavat nopean tiedonsiirtomme yhden kanavan kautta. Monissa tapauksissa, kuten IoT, voi olla useita tietolähteitä, jotka lähettävät tietoja päävastaanottimeen.

Redis-sovelluksella on kolme mahdollista tapaa toteuttaa tämä ratkaisu: syödä Redis Pub / Sub -sisällöllä, syödä List-tietorakenteella tai syödä Lajiteltu joukko -datarakenteella. Tarkastellaan kaikkia näitä vaihtoehtoja.

Nieleminen Redis Pub / Sub: n kanssa

Tämä on nopein tiedonsiirron yksinkertaisin toteutus. Tämä ratkaisu käyttää Redisin Pub / Sub-ominaisuutta, jonka avulla sovellukset voivat julkaista ja tilata viestejä. Kuten kuvassa 2 on esitetty, kukin vaihe käsittelee datan ja julkaisee sen kanavalle. Seuraava vaihe tilaa kanavan ja vastaanottaa viestit jatkokäsittelyä tai suodatusta varten.

Redis Labs

Plussat

  • Helppo toteuttaa.
  • Toimii hyvin, kun tietolähteet ja prosessorit jaetaan maantieteellisesti.

Haittoja

  • Ratkaisu edellyttää, että kustantajat ja tilaajat ovat jatkuvasti päällä. Tilaajat menettävät tietoja pysäytettynä tai kun yhteys katkeaa.
  • Se vaatii lisää yhteyksiä. Ohjelma ei voi julkaista ja tilata samaa yhteyttä, joten jokainen välitietoprosessori vaatii kaksi yhteyttä - yhden tilaamaan ja toisen julkaisemaan. Jos suoritat Redistä DBaaS-alustalla, on tärkeää tarkistaa, onko paketillasi tai palvelutasollasi rajoituksia yhteyksien määrälle.

Huomautus yhteyksistä

Jos kanavaa tilaa useampi kuin yksi asiakas, Redis työntää dataa kullekin asiakkaalle lineaarisesti peräkkäin. Suuret tiedon hyötykuormat ja monet yhteydet voivat aiheuttaa viiveen julkaisijan ja sen tilaajien välillä. Vaikka oletusarvoinen enimmäismäärä yhteyksiä on 10000, sinun on testattava ja vertailtava, kuinka monta yhteyttä on hyötykuormallesi sopiva.

Redis ylläpitää asiakkaan ulostulopuskuria kullekin asiakkaalle. Pub / Sub-asiakaslähtöpuskurin oletusrajat asetetaan seuraavasti:

asiakas-tuotos-puskuriraja pubsub 32mb 8mb 60

Tällä asetuksella Redis pakottaa asiakkaita katkaisemaan yhteyden kahdessa tilanteessa: jos lähtöpuskuri kasvaa yli 32 Mt tai jos lähtöpuskurissa on 8 Mt tietoa jatkuvasti 60 sekunnin ajan.

Nämä ovat osoitus siitä, että asiakkaat kuluttavat tietoja hitaammin kuin ne julkaistaan. Jos tällainen tilanne ilmenee, yritä ensin optimoida kuluttajat siten, että he eivät lisää viivettä tietojen kulutuksen aikana. Jos huomaat, että asiakkaidesi yhteys katkeaa edelleen, voit nostaa client-output-buffer-limit pubsub kiinteistö redis.conf. Muista, että muutokset asetuksiin voivat lisätä viivettä julkaisijan ja tilaajan välillä. Kaikki muutokset on testattava ja todennettava perusteellisesti.

Koodisuunnittelu Redis Pub / Sub -ratkaisulle

Redis Labs

Tämä on yksinkertaisin tässä asiakirjassa kuvatuista kolmesta ratkaisusta. Tässä ovat tärkeät Java-luokat, jotka on toteutettu tälle ratkaisulle. Lataa lähdekoodi täydellä toteutuksella täältä: //github.com/redislabsdemo/IngestPubSub.

Tilaaja luokka on tämän mallin ydinluokka. Joka Tilaaja object ylläpitää uutta yhteyttä Redisiin.

luokka Tilaaja laajentaa JedisPubSub toteuttaa Runnable {

yksityinen merkkijono nimi;

yksityinen RedisConnection-yhteys = null;

yksityinen Jedis jedis = null;

yksityinen merkkijono SubscriberChannel;

public Subscriber (String subscriberName, String channelName) heittää poikkeuksen {

nimi = tilaajan nimi;

subscriberChannel = channelName;

Lanka t = uusi lanka (tämä);

t.start ();

       }

@Ohittaa

public void run () {

yrittää{

conn = RedisConnection.getRedisConnection ();

jedis = conn.getJedis ();

while (tosi) {

jedis.subscribe (tämä, tämä.subscriberChannel);

                      }

} saalis (poikkeus e) {

e.printStackTrace ();

              }

       }

@Ohittaa

public void onMessage (merkkijonokanava, merkkijonoviesti) {

super.onMessage (kanava, viesti);

       }

}

Kustantaja luokka ylläpitää erillistä yhteyttä Redisiin viestien julkaisemiseksi kanavalle.

public class Publisher {

RedisConnection yhteys = nolla;

Jedis jedis = nolla;

yksityinen String-kanava;

public Publisher (String channelName) heittää poikkeuksen {

kanava = kanavan nimi;

conn = RedisConnection.getRedisConnection ();

jedis = conn.getJedis ();

       }

public void publish (String msg) heittää poikkeuksen {

jedis.publish (kanava, msg);

       }

}

EnglishTweetFilter, VaikuttajaTweetFilter, HashTagCollectorja Vaikuttaja-kerääjä suodattimet ulottuvat Tilaaja, jonka avulla he voivat kuunnella saapuvia kanavia. Koska tilaamiseen ja julkaisemiseen tarvitaan erilliset Redis-yhteydet, jokaisella suodatinluokalla on oma Yhdistä uudelleen esine. Suodattimet kuuntelevat kanaviensa uusia viestejä silmukassa. Tässä on esimerkkikoodi EnglishTweetFilter luokka:

public class EnglishTweetFilter laajentaa tilaajaa

{

yksityinen RedisConnection-yhteys = null;

yksityinen Jedis jedis = null;

private String publisherChannel = null;

public EnglishTweetFilter (String name, String subscriberChannel, String publisherChannel) heittää poikkeuksen {

super (nimi, tilaajan kanava);

this.publisherChannel = publisherChannel;

conn = RedisConnection.getRedisConnection ();

jedis = conn.getJedis ();

       }

@Ohittaa

public void onMessage (String subscriberChannel, String-viesti) {

JsonParser jsonParser = uusi JsonParser ();

JsonElement jsonElement = jsonParser.parse (viesti);

JsonObject jsonObject = jsonElement.getAsJsonObject ();

// suodata viestejä: julkaise vain englanninkieliset twiitit

if (jsonObject.get (“lang”)! = null &&

jsonObject.get (“lang”). getAsString (). equals (“en”)) {

jedis.publish (publisherChannel, viesti);

              }

       }

}

Kustantaja luokassa on julkaisutapa, joka julkaisee viestit vaaditulle kanavalle.

public class Publisher {

.

.     

public void publish (String msg) heittää poikkeuksen {

jedis.publish (kanava, msg);

       }

.

}

Pääluokka lukee tietoja syötetystä virrasta ja lähettää sen Kaikki tiedot kanava. Tämän luokan päämenetelmä käynnistää kaikki suodatinobjektit.

julkinen luokka IngestPubSub

{

.

public void start () heittää poikkeuksen {

       .

       .

julkaisija = new Publisher (“AllData”);

englishFilter = new EnglishTweetFilter ("English Filter", "AllData",

”EnglishTweets”);

influencerFilter = uusi InfluencerTweetFilter ("Influencer-suodatin",

"AllData", "InfluencerTweets");

hashtagCollector = uusi HashTagCollector (“Hashtag Collector”,

”EnglishTweets”);

influencerCollector = uusi InfluencerCollector ("Influencer Collector",

”InfluencerTweets”);

       .

       .

}

Nieleminen Redis-luetteloilla

Redisin List-tietorakenne tekee jonotusratkaisun toteuttamisesta helppoa ja yksinkertaista. Tässä ratkaisussa tuottaja työntää jokaisen viestin jonon taakse, ja tilaaja kyselee jonoa ja vetää uusia viestejä toisesta päästä.

Redis Labs

Plussat

  • Tämä menetelmä on luotettava yhteyden katkeamisen yhteydessä. Kun data on työnnetty luetteloihin, se säilyy siellä, kunnes tilaajat lukevat sen. Tämä on totta, vaikka tilaajat pysäytettäisiin tai he menettäisivät yhteyden Redis-palvelimeen.
  • Tuottajat ja kuluttajat eivät vaadi yhteyttä keskenään.

Haittoja

  • Kun tiedot on vedetty luettelosta, ne poistetaan eikä niitä voi enää hakea. Ellei kuluttajat säilytä tietoja, ne menetetään heti niiden kulutuksen jälkeen.
  • Jokainen kuluttaja vaatii erillisen jonon, joka vaatii useiden kopioiden tallentamisen tiedoista.

Koodisuunnittelu Redis Lists -ratkaisulle

Redis Labs

Voit ladata Redis Lists -ratkaisun lähdekoodin täältä: //github.com/redislabsdemo/IngestList. Tämän ratkaisun pääluokat selitetään alla.

MessageList upottaa Redis-luettelon tietorakenteen. työntää() - menetelmä työntää uuden viestin jonon vasemmalle puolelle ja pop() odottaa uutta viestiä oikealta, jos jono on tyhjä.

public class MessageList {

suojattu merkkijono nimi = “MyList”; // Nimi

.

.     

public void push (String msg) heittää poikkeuksen {

jedis.lpush (nimi, msg); // Vasen painallus

       }

julkinen jousipop () heittää poikkeuksen {

palauta jedis.brpop (0, nimi) .toString ();

       }

.

.

}

MessageListener on abstrakti luokka, joka toteuttaa kuuntelijan ja julkaisijan logiikkaa. A MessageListener objekti kuuntelee vain yhtä luetteloa, mutta voi julkaista useille kanaville (MessageFilter esineitä). Tämä ratkaisu vaatii erillisen MessageFilter esine jokaiselle tilaajalle putkea pitkin.

luokan MessageListener toteuttaa Runnable {

yksityisen merkkijonon nimi = null;

yksityinen MessageList inboundList = null;

Map outBoundMsgFilters = uusi HashMap ();

.

.     

public void registerOutBoundMessageList (MessageFilter msgFilter) {

if (msgFilter! = null) {

if (outBoundMsgFilters.get (msgFilter.name) == null) {

outBoundMsgFilters.put (msgFilter.name, msgFilter);

                      }

              }

       }

.

.

@Ohittaa

public void run () {

.

while (tosi) {

Merkkijono msg = inboundList.pop ();

processMessage (msg);

                      }                                  

.

       }

.

suojattu void pushMessage (String msg) heittää poikkeuksen {

Set outBoundMsgNames = outBoundMsgFilters.keySet ();

kohteelle (String name: outBoundMsgNames) {

MessageFilter msgList = outBoundMsgFilters.get (nimi);

msgList.filterAndPush (msg);

              }

       }

}

MessageFilter on vanhempien luokka, joka helpottaa filterAndPush () menetelmä. Kun data virtaa sisäänmenojärjestelmän läpi, se suodatetaan tai muunnetaan usein ennen lähettämistä seuraavaan vaiheeseen. Luokat, jotka laajentavat MessageFilter luokka ohittaa filterAndPush () menetelmää ja toteuttavat oman logiikkansa työntääkseen suodatetun viestin seuraavaan luetteloon.

public class MessageFilter {

MessageList messageList = null;

.

.

public void filterAndPush (merkkijono msg) heittää poikkeuksen {

messageList.push (msg);

       }

.

.     

}

AllTweetsListener on esimerkkitoteutus a MessageListener luokassa. Tämä kuuntelee kaikkia twiittejä Kaikki tiedot ja julkaisee tiedot osoitteeseen EnglishTweetsFilter ja VaikuttajaSuodatin.

public class AllTweetsListener laajentaa MessageListener {

.

.     

public static void main (String [] args) heittää poikkeuksen {

MessageListener allTweetsProcessor = AllTweetsListener.getInstance ();

allTweetsProcessor.registerOutBoundMessageList (uusi

EnglishTweetsFilter ("EnglishTweetsFilter", "EnglishTweets"));

allTweetsProcessor.registerOutBoundMessageList (uusi

InfluencerFilter (”InfluencerFilter”, “Influencers”));

allTweetsProcessor.start ();

       }

.

.

}

EnglishTweetsFilter ulottuu MessageFilter. Tämä luokka toteuttaa logiikkaa valitsemaan vain ne twiitit, jotka on merkitty englanninkielisiksi tweeteiksi. Suodatin hylkää muut kuin englanninkieliset twiitit ja siirtää englanninkieliset twiitit seuraavaan luetteloon.

public class EnglishTweetsFilter laajentaa MessageFilter {

public EnglishTweetsFilter (String name, String listName) heittää poikkeuksen {

super (nimi, luettelonimi);

       }

@Ohittaa

public void filterAndPush (merkkijono) heittää poikkeuksen {

JsonParser jsonParser = uusi JsonParser ();

JsonElement jsonElement = jsonParser.parse (viesti);

JsonArray jsonArray = jsonElement.getAsJsonArray ();

JsonObject jsonObject = jsonArray.get (1) .getAsJsonObject ();

if (jsonObject.get (“lang”)! = null &&

jsonObject.get (“lang”). getAsString (). equals (“en”)) {

Jedis jedis = super.getJedisInstance ();

jos (jedis! = null) {

jedis.lpush (super.nimi, jsonObject.toString ());

                             }

              }

       }

}

$config[zx-auto] not found$config[zx-overlay] not found