Apache Ignite – Deo II

U prvom delu smo mogli pročitati o tome šta je zapravo Apache Ignite kao i o nekim njegovim karakteristikama. U ovom delu ćemo na praktičnim primerima pokazati neke od mogućnosti koje Ignite pruža svojim korisnicima.

Preduslovi

Apache ignite je zvanično testiran na:

JDK-u:

  • Oracle JDK 7 i više

  • Open JDK 7 i više

  • IBM JDK 7 i više

Sledećim operativnim sistemima:

  • Linux (sve verzije)
  • Mac OSX (10.6 i više)
  • Windows (XP i više)
  • Windows Server (2008 i više)
  • Oracle Solaris

ISA:

  • x86, x64, SPARC, PowerPC

Mreža:

  • Bez ograničenja(10G preporuka)

Instalacija

U ovom primeru za pokretanje ćemo koristiti Eclipse IDE sa M2Eclipse dodatkom. Da bi počeli sa korišćenjem Apache Ignite-a potrebno je :

  1. Instalirati Eclipse IDE
  2. Instalirati M2Eclipse dodatak

Preuzimanje pomoću Eclipse okruženja

U nastavku teksta ćemo podrazumevati da je čitalac uspešno instalirao Elcipse IDE i M2Eclipse dodatak.

Potrebno je otvoriti Eclipse IDE i klikom na file meni odabrati opciju new, zatim other i na kraju Maven project. Dalje je potrebno pratiti wizard koji će na kraju kreirati Maven projekat. Kada je projekat kreiran otvaramo pom.xml fajl, u ovom fajlu se registruju zavisnosti koje naš projekat koristi. Ignite zahteva samo ignite-core zavisnost. Obično je potrebno dodati ignite-spring za XML konfiguraciju zasnovanu na springu i ignite-indexing za SQL upite.

Potrebno je dodati sledeći kod u pom.xml fajlu zameniti ${ignite-version} sa verzijom Ignite-a


    org.apache.ignite
    ignite-core
    ${ignite.version}


    org.apache.ignite
    ignite-spring
    ${ignite.version}


    org.apache.ignite
    ignite-indexing
    ${ignite.version}

Pokretanje prvog Ignite projekata

U kreiranom projektu dodajemo sledeću klasu:

import java.util.Scanner;

import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;

public class UvodniPrimer {

    @SuppressWarnings("resource")
    public static void main(String[] args) {
        try (Ignite ignite = Ignition.start()) {
            System.out.println("Hello world");
            new Scanner(System.in).nextLine();
        }
    }

}

Ignite čvor se može pokrenuti sa podrazumevanom konfiguracijom ili prosleđivanjem konfiguracionog fajla. Moguće je pokrenuti neograničen broj čvorova i svaki od njih će automatski otkriti ostale čvorove. Poziv funkcije Ignition.start() bez argumenata znači da se čvor pokreće sa podrazumevanom konfiguracijom. U narednim primerima ćemo koristiti i konfuguracioni fajl.

Pokretanje sa podrazumevanom konfiguracijom

Nakon kreiranja prethodno pomenute klase, da bismo pokrenuli Ignite čvor potrebno je da startujemo Maven projekat. Maven projekat se startuje kao i bilo koji drugi projekat u Eclipse okruženju. Izlaz bi trebao da izgleda ovako :

[17:15:52] __________ ________________
[17:15:52] / _/ ___/ |/ / _/_ __/ __/
[17:15:52] _/ // (7 7 // / / / / _/
[17:15:52] /___/\___/_/|_/___/ /_/ /___/
[17:15:52]
[17:15:52] ver. 2.4.0#20180305-sha1:aa342270
[17:15:52] 2018 Copyright(C) Apache Software Foundation
[17:15:52]
[17:15:52] OS: Windows 10 10.0 amd64
[17:15:52] VM information: Java(TM) SE Runtime Environment 1.8.0_111-b14 Oracle Corporation Java HotSpot(TM) 64-Bit Server VM 25.111-b14
[17:15:52] Initial heap size is 128MB (should be no less than 512MB, use -Xms512m -Xmx512m).
[17:15:55] Ignite node started OK (id=0e313b9e)
[17:15:55] Topology snapshot [ver=1, servers=1, clients=0, CPUs=8, offheap=1.6GB, heap=1.8GB]
[17:15:55] Data Regions Configured:
[17:15:55] ^– default [initSize=256.0 MiB, maxSize=1.6 GiB, persistenceEnabled=false]
Hello world

Posebno je potrebno obratiti pažnju na sledeće linije:

[17:15:55] Topology snapshot [ver=1, servers=1, clients=0, CPUs=8, offheap=1.6GB, heap=1.8GB]
Hello world

Hello world je izlaz naše aplikacije. Vrednost servers=1 nam govori koliko čvorova je pokrenuto. Ako bi pokrenuli još jednu instancu ovog projekta, neprekidajuću postojeću, izlaz bi izgledao ovako:

Topology snapshot [ver=2, servers=2, clients=0, CPUs=8, offheap=3.2GB, heap=3.5GB]

Ovo nam pokazuje osobinu Ignite nodova da se automatski međusobno detektuju.

Prosleđivanje konfiguracionog fajla

Da bi prosledeili konfiguracioni fajl eksplicitno, možemo metodi Ignition.start() proslediti kao argument putanju do konfiguracionog fajla. Na primer:

try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml"))

Putanja može biti apsolutna ili relativna.

SQL primer

Za početak ćemo napraviti dve tabele i dva indeksa. Imaćemo tabele Grad i Osoba. Više ljudi može da se nalazi u jednom gradu. Možemo da kolociramo objekte osoba sa objektima gradova u kojima se nalaze, ovo radimo pomoću komande WITH. Kreiramo pomenutu SQL šemu sledećim koodom:

// Registrujemo JDBC driver.
Class.forName("org.apache.ignite.IgniteJdbcThinDriver");

// Otvaramo JDBC konekciju.
Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1/");

// Kreiramo tabele.
try (Statement stmt = conn.createStatement()) 
{


        stmt.executeUpdate("CREATE TABLE Grad(" + 
        " id LONG PRIMARY KEY, naziv VARCHAR) " +
        " WITH \"template=replicated\"");


        stmt.executeUpdate("CREATE TABLE Osoba(" +
        " id LONG, ime VARCHAR, city_id LONG, " +
        " PRIMARY KEY (id, city_id)) " +
        " WITH \"backups=1, affinityKey=city_id\"");

        // Kreiramo index nad tabelom Grad.
        stmt.executeUpdate("CREATE INDEX idx_grad_naziv ON Grad(naziv)");

        // Kreiramo index nad tabelom Osoba.
        stmt.executeUpdate("CREATE INDEX idx_osoba_ime ON Osoba(ime)");
 }

Sledeći korak je popunjavanje tabela:

IgniteCache gradCache = ignite.cache("SQL_PUBLIC_GRAD");
IgniteCache osobaCache = ignite.cache("SQL_PUBLIC_OSOBA");

//dodajemo nove unose
SqlFieldsQuery query = new SqlFieldsQuery(
"INSERT INTO Grad (id, naziv) VALUES (?, ?)");

gradCache.query(query.setArgs(1, "Kragujevac")).getAll();
gradCache.query(query.setArgs(2, "Beograd")).getAll();
gradCache.query(query.setArgs(3, "Novi Sad")).getAll();

query = new SqlFieldsQuery(
  "INSERT INTO Osoba (id, ime, grad_id) VALUES (?, ?, ?)");

osobaCache.query(query.setArgs(1, "Stepa Stepanovic", 3)).getAll();
osobaCache.query(query.setArgs(2, "Zivojin Misic", 2)).getAll();
osobaCache.query(query.setArgs(3, "Nikola Pasic", 1)).getAll();
osobaCache.query(query.setArgs(4, "Pavle Jurisic", 2)).getAll();

Sada možemo da izvršavamo upite nad unetim podacima. Na primer možemo za svaku osobu naći grad u kome živi. Ovo postižemo na sledeći način:

query = new SqlFieldsQuery("SELECT o.ime, g.naziv " +
" FROM Osoba o, Grad g WHERE o.grad_id = g.id");

FieldsQueryCursor cursor = gradCache.query(query);

Iterator iterator = cursor.iterator();


while (iterator.hasNext())
 {
    List row = iterator.next();
    System.out.println(row.get(0) + ", " + row.get(1));
 }

Pokretanjem projekta dobijamo:

Zivojin Misic, Beograd
Pavle Jurisic, Beograd
Nikola Pasic, Kragujevac
Stepa Stepanovic, Novi Sad

Izvršavanje proračuna sa Ignite-om

Napisaćemo grid aplikaciju za brojanje karaktera u rečenici ne uzimajući u obzir blanko karaktere. Podelićemo rečenicu na reči i za svaku reč ćemo kreirati po jedan posao koji će brojati karaktere u prosleđenoj reči, na kraju ćemo samo sabrati rezultate svih poslova.

  try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
              Collection pozivi = new ArrayList();

              // Prolazimo kroz sve reci u recenici i kreiramo callable poslove.
              for (final String rec : "Ignite je odlican".split(" "))
                  pozivi.add(rec::length);

              // Izvršavamo poslove na gridu
              Collection res = ignite.compute().call(pozivi);

              //Sabiramo sve rezultate
              int sum = res.stream().mapToInt(Integer::intValue).sum();

              System.out.println("Ukupno karaktera '" + sum + "'.");
            }

Za sledeći primer koristićemo objekte koje samo prethodno kreirali u SQL-u. Pretpostavimo da se u Beogradu desila vremenska nepogoda i da želimo da obavestimo sve ljude koji se trenutno nalaze u Beogradu o tome. To možemo da uradimo na sledeći način:

Ignite ignite = Ignition.start();
long gradId = 2; // Id  Beograda
//šaljemo logiku na cvor na kome se nalaze podaci o Beogradu i njegovim stanovnicima
ignite.compute().affinityRun("SQL_PUBLIC_GRAD", gradId, new IgniteRunnable() {
  @IgniteInstanceResource
  Ignite ignite;
  @Override
  public void run() {
    // pristupamo kešu sa osobama.
    IgniteCache ljudi= ignite.cache(
        "SQL_PUBLIC_OSOBA").withKeepBinary();
    ScanQuery query = new ScanQuery ();

    try (QueryCursor cursor =
           ljudi.query(query)) {

      // Prolazimo kroz lokalne podatke čvora
      for (Cache.Entry entry : cursor) {
        BinaryObject osobaKey = entry.getKey();

        // Biramo samo stanovnike Beograda
        if (osobaKey.field("GRAD_ID") == cityId) {
            person = entry.getValue();

            // Poslati poruku upozorenja osobi.
        }
      }
    }
  }
}

U ovom primeru smo koristili metodu affinityRun() kojoj smo prosledili SQL_PUBLIC_GRAD keš, gradId i IgniteRunnable. Ovim smo obezbedili da se cela logika izvršava na čvoru na kome se nalaze potrebni podaci. Ovakav pristup otklanja potrebu za skupim procesima kao što su serijalizacija i kretanje podataka preko mreže

Kao poslednji primer u ovom delu pokazaćemo računanje broja π Monte Carlovom metodom. Monte Carlova metoda se ukratko može opisati na sledeći način:

  1. Nacrtajmo kvadrat na zemlji, zatim upišimo krug u njemu
  2. Ravnomerno prospimo neke objekte jednake veličine (zrna peska ili pirinča) preko kvadrata
  3. Prebroji se broj zrna u krugu i ukupan broj zrna, odnos dva prebrajanja je procena odnosa dve površine tj. π/4. Pomnoži se rezultat sa 4, čime se dobija π.

U ovoj proceduri domen ulaza je kvadrat koji opisujemo oko kruga. Generišemo broj ulaza prosipanjem preko kvadrata, zatim izvršimo proračune za svaki ulaz (pitamo se da li je zrno palo u krug). Napokon, sažimamo rezultate da bi dobili konačan rezultat, aproksimaciju π. Ovde imamo dve bitne napomene: prvo, ako zrna nisu ravnomerno raspodeljena, onda je naša aproksimacija nepotpuna. Drugo, mora da postoji veliki broj ulaza. Aproksimacija je generalno loša samo ako je par zrna palo u ceo kvadrat. U proseku kvalitet procene se povećava sa većim brojem zrna u kvadratu. Upotreba Monte Karlo metode zahteva veliku količinu brojeva, što je prouzrokovalo formiranje generatora pseudobrojeva čijom upotrebom je proračunavanje olakšano.

        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
              Collection pozivi = new ArrayList();
              int brojPokusaja = 10;
              //Svaki posao ce imati 100 pokusaja.
              for (int i=0; i

Upotreba Data Grid-a

Sada ćemo napisati nekoliko mini primera koji će upisivati u i čitati vrednosti sa distribuiranog keša, osim toga pokažaćemo i način na koji možemo da izvršimo osnovne transakcije.

Primer 1 put i get:

try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
    IgniteCache cache = ignite.getOrCreateCache("MojKes");

    // čuvamo ključeve u kešu, svaka vrednost će biti smeštena na drugom čvoru
    for (int i = 0; i < 10; i++)
        cache.put(i, Integer.toString(i));

    for (int i = 0; i < 10; i++)
        System.out.println("Got [key=" + i + ", val=" + cache.get(i) + ']');
}

Primer 2 atomske operacije:

// Upisuje ako ne postoji i vraća prethodnu vrednost.
Integer oldVal = cache.getAndPutIfAbsent("Hello", 11);

//Upisuje ako ne postoji i vraća boolean indikator da li je operacija uspešno izvršena.
boolean success = cache.putIfAbsent("World", 22);

// Prepisuje ako postoji i vraća staru vrednost
oldVal = cache.getAndReplace("Hello", 11);

// Prepisuje ako postoji i vraća boolean indikator da li je operacija uspešno završena.
success = cache.replace("World", 22);

// Zameni vrednost ako postoji .
success = cache.replace("World", 2, 22);

// Obriši ako postoji
success = cache.remove("Hello", 1);

Primer 3 transakcije:

try (Transaction tx = ignite.transactions().txStart()) {
    Integer hello = cache.get("Hello");

    if (hello == 1)
        cache.put("Hello", 11);

    cache.put("World", 22);

    tx.commit();
}

Primer 4 lockovi:

//zaključaj keš ključ "Hello".
Lock lock = cache.lock("Hello");

lock.lock();

try {
    cache.put("Hello", 11);
    cache.put("World", 22);
}
finally {
    lock.unlock();
}

Ignite Service Grid

Ignite Service Grid je koristan za otpremanje mikro servisa na klasteru. Ignite pruža podršku za zadatke u vezi životnog ciklusa servisa i obezbeđuje jednostavn način pozivanja servisa iz aplikacije.

Kao primer ćemo kreirati servis koji će vratiti trenutnu vremensku prognozu za odabrani grad. Prvo kreiramo interfejs servisa sa jednom API metodom. Interfejs mora da nasleđuje org.apache.ignite.services.Service.

import org.apache.ignite.services.Service;

public interface WeatherService extends Service {
    /**
     * Slanje trenutne temperature za odabrani grad.
     *
     * @param kodZemlje kod zemlje u kojoj se grad nalazi (ISO 3166 kodovi).
     * @param nazivGrada trazeni grad.
     * @return trenutnu temperaturu u odabranom gradu u JSON formatu.
     * @throws Izuzetak ako do toga dodje.
     */
    String getCurrentTemperature(String kodZemlje, String nazivGrada)
        throws Exception;
}

Servis će se povezazi na vremenski kanal da bi dobio najnovije informacije o vremenu. Implementacija servisa bi mogla da izgleda ovako:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import org.apache.ignite.services.ServiceContext;


public class WeatherServiceImpl implements WeatherService {
    /** URL servisa koji ce nam vratiti temperaturu. */
    private static final String WEATHER_URL = "http://samples.openweathermap.org/data/2.5/weather?";

    /** app id. */
    private static final String appId = "ca7345b4a1ef8c037f7749c09fcbf808";

    /** {@inheritDoc}. */
    @Override public void init(ServiceContext ctx) throws Exception {
        System.out.println("Servis inicijalizovan!");
    }

    /** {@inheritDoc}. */
    @Override public void execute(ServiceContext ctx) throws Exception {
        System.out.println("Servis startovan!");
    }

    /** {@inheritDoc}. */
    @Override public void cancel(ServiceContext ctx) {
        System.out.println("Servis zaustavljen!");
    }

    /** {@inheritDoc}. */
    @Override public String getCurrentTemperature(String nazivGrada,
        String kodZemlje) throws Exception {

        System.out.println(">>> Trazena vremenska prognoza za [grad="
            + nazivGrada+ ", kod zemlje=" + kodZemlje+ "]");

        String connStr = WEATHER_URL + "q=" + nazivGrada+ ","
            + kodZemlje+ "&appid=" + appId;

        URL url = new URL(connStr);

        HttpURLConnection conn = null;

        try {

            conn = (HttpURLConnection) url.openConnection();

            conn.setRequestMethod("GET");

            conn.connect();

            // čitamo podatke sa servera.
            try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream()))) {

                String line;
                StringBuilder builder = new StringBuilder();

                while ((line = reader.readLine()) != null)
                    builder.append(line);

                return builder.toString();
            }
        } finally {
            if (conn != null)
                conn.disconnect();
        }
    }
}

Na kraju potrebno je otpremiti servis na klaster. Nakon toga servis se može pozvati iz aplikacije. Zarad jednostavnosti otpremanje i poziv servisa ćemo raditi iz iste aplikacije.

import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;

public class ServiceGridExample {

    public static void main(String[] args) throws Exception {
        try (Ignite ignite = Ignition.start()) {


            ignite.services().deployClusterSingleton("WeatherService",
               new WeatherServiceImpl());


            WeatherService service =ignite.services().service("WeatherService");

            String forecast = service.getCurrentTemperature("London", "UK");

            System.out.println("Trazena vremenska prognoza:" + forecast);
        }
    }
}

Indeksi

Indeksi kao i polja nad kojima se mogu izvršavati upiti mogu da se konfigurišu uz pomoć @QuerySqlField anotacije. U sledećem primeru demonstriramo na koji način to možemo uraditi:

public class Osobaimplements Serializable {
  /** indeksno polje, vidljivo sql enginu */
        @QuerySqlField (index = true)
  private long id;

  /** polje nad kojim se može izvršavati upt, vidljivo sql enginu*/
  @QuerySqlField
  private String ime;

  /**neće biti vidljivo sql enginu*/
  private int starost;

  /**
   * indeksno polje sortirano u opadajućem poretku
   * vidljivo SQL enginu
   */
  @QuerySqlField(index = true, descending = true)
  private float plata;
}

I id i plata su indeksna polja. Polje id će biti sortirano u rastućem poretku ,što je podrazumevano ponašanje, dok će polje plata biti soritano u opadajućem poretku.

Ako želimo da polje bude dostupno SQL enginu ali ne želimo da ono bude član indeksa potrebno je izostaviti deo index= true.

Ako želimo da polje bude potpuno nevidljivo SQL enginu potrebno je da izostavimo anotaciju.

Registrovanje tipova indeksa

Nakon definisanja indeksa i polja potrebno je da ih registrujemo u SQL enginu. Da bi smo obavestili Ignite koje tipove želimo da indeksiramo, možemo da metodi CacheConfiguration.setIndexedTypes prosledimo key-value par.

// Priprema konfiguracije
CacheConfiguration ccfg = new CacheConfiguration();

// Registrovanje tipova indeksa
ccfg.setIndexedTypes(Long.class, Osoba.class);

Metod prima par tipova – prva klasa će se koristiti kao ključ a druga kao vrednost.

Grupni indeksi

Da bi smo kreirali indeks nad više polja koristimo @QuerySqlField.Group anotaciju.

public class Person implements Serializable {

@QuerySqlField(orderedGroups={@QuerySqlField.Group(
name = "starost_plata_idx", order = 0, descending = true)})
private int starost;

@QuerySqlField(index = true, orderedGroups={@QuerySqlField.Group(
name = "starost_plata_idx", order = 3)})
private double plata;
}

Isplativost indeksa

Pre nego što kreiramo indeks trebalo bi da imamo par stvari na umu. Indeksi nisu besplatni. Svaki od njih zauzima memoriju, osim toga, svaki put kada izvršimo naredbu ažuriranja nad tabelom moramo ažurirati i indeks. Još jedna stvar na koju trebamo da obratimo pažnju je da prilikom procesa optimizacije može da se desi da se izabere pogrešan indeks i samim tim dobijamo usporavanje izvršavanja upita umesto ubrzanja. Indeksiranje svega je loša strategija!

Zaključak

Kao što smo videli Ignite je moćno sredstvo za skladištenje i procesiranje velike količine podataka. Velika prednost Ignite-a je to što mu se može pristupiti iz velikog broja programskih jezika (Java, C#, C++) kao i iz svih najčešće korišćenih operativnih sistema. Ignite takođe dozvoljava korišćenje SQL-a kao query jezika što dodatno olakšava korišćenje ovog alata. Osim toga u Ignite-u je moguće kreirati usluge kojima se može pristupati iz bilo koje aplikacije. Sve ovo uz garntovanu skalabilnost, konzistentnost, trajnost, otpornost na otkaze i veliku fleksibilnost čini Ignite idealnim izborom za bilo koji sistema u kome se koriste velike količine podataka.

Literatura