Apache Spark – Deo I

Apache Spark je open-source platforma za obradu velikih podataka. Spark je napisan u Scala programskom jeziku i pokreće se na Java virutelnoj mašini. Postoje dodatni moduli za mašinsko učenje, SQL, graf obradu i streaming. Razvijen je na Univerzitetu u Kaliforniji, da bi Apache Software Foundation nastavio njegov ravoj. Nastao je zbog ograničenja MapReduce paradigme. Nedostatak MapReduce-a je što se ulazni parametri, međurezultat i rezultat smeštaju na disk, što produžava vreme izvršavanja programa.

Iteracija:MapReduce-a

Spark je alat koji obradu podataka vrši u radnoj memoriji što ga čini veoma brzim. Međurezultat se takođe smešta u radu memoriji, ukoliko ima mesta.

Iteracija na Sparku

Ukoliko podaci ne mogu više da stanu u radnu memoriju, prebacuju se na disk. Spark će prvo pokušati da što više podataka smesti u radnu memoriju, pa tek onda ostalo na disk. Upravo zbog ove mogućnosti ima prednost u brzini. Spark dozvoljava da aplikacije pišemo u Java, Scala, ili Python programskom jeziku.

Koncepti Spark-a su:

  • RDD (Resilient Distributed Datasets) Spark-ova kolekcija podataka, u radnoj memoriji ili na disku, za koju su karakteristični paralelizam i oporavak od greške.
  • Operacije transformacije (Map, Filter, …) i akcija (count, collect, save…).

Pored osnovnog API-ja, tu su dodatne bibliotkeke koje su deo Spark-a i koje pružaju dodatnu mogućnost pri obradi i analizi velikih podataka. Biblioteke su:

  • Spark Streaming: se može koristiti ѕa obradu strimovanja podataka u realnom vremenu.
  • Spark SQL: omogućava korišćenje SQL upita u obradi podataka. Uvodi se još jedna apstrakcija podataka (DataFrame), što omogućava da izvor podataka bude raznovrsan. Na primer: JSON, Hive, relacione baze podataka, itd..
  • Spark MLlib: modul se koristi za razvoj alogritama mašinskog učenja na Spark-u i velikim količinama podataka. Omogućava klasifikaciju, regresiju, klasterovanje…
  • Spark GraphX: omogućava razvoj grafovskih algoritama. Ogoroman broj algoritama koji se često koriste su već implementirani u okviru biblioteke. Jedan od njih je Page Rank algoritam. Koristi još jedan nivo apstrakcije podataka – Resilient Distributed Property Graph. Da bi se podržao ovaj tip podataka uvodi se niz akcija (subgraph, joinVertices i aggregateMessages)

Resilient Distributed Datasets

Osnovu Sparka čine Resilient Distributed Datasets. RDD predstavlja osnovnu apstrakciju u Spark-u. RDD je strukutra podataka koja se ne može promeniti, pa se izvršavanjem neke operacije nad RDD-om generiše novi RDD. Za svaki RDD se čuvaju podaci od kojih je nastao tj. pravi se lanac zavisnosti. Svaki RDD u lancu zavisnosti ima funkciju za izračunavanje svojih podataka i pokazivač na svog RDD roditelja. To je dobro jer ukoliko mašina na klasteru iz tehničkih, ili nekih drugih, razloga nije u mogućnosti da izvrši svoj zadatak, deo posla sa te mašine se može izvršiti na drugoj bez uticaja na rad i rezultat ostalih mašina. Podržava lazy evaluation, što smanjuje vreme izvršavanja naše aplikacije. Spark neće odmah izvršiti neku transformaciju podataka dok ne vidi akciju. Nad RDD-om se mogu izvršavati transformacije i akcije.

Transformacije

RDD transformacije su lazy evaluated, pa će se izvršiti tek kada transformaciju izvrši neka akcija. Transformacije su zapravo fukncije koje se primenjuju na ulaz, a za rezultat daju novi RDD. Kada se izvrši neka transformacija dobija se novi RDD, bez promene starog. Postoje sledeće transformacije:

  • map(func) – ako je roditeljski RDD imao 10 elemenata, onda i rezultat posle ove transformacije mora biti 10.
  • flatMap(func) – za razliku od map-a, može stvoriti kao rezultat sekvencu izlaza za jedan ulaz.
  • filter(func) – vraća novi set podataka od elementa koji zadovoljavaju prosleljenu funkciju.
  • union(otherDataset): vraća nove podatke koji čine uniju ulaznih i prosleljenih vrednosti.
  • intersection(otherDataset) – vraća nove podatke koji čine presek ulaznih i prosleljenih vrednosti.
  • distinct() – vraća novi set podataka čiji su elementi jedinstveni
  • reduceByKey(func) – radi redukciju za prosleđeni skup parova (K, V), vraća vrednost za svaki ključ primenjujući prosleđenu funkciju koja mora biti tipa (V,V) kako bi vratila V.
  • sortByKey([ascending]) – skup parova (K, V), gde K implementira Ordered, vraća skup parova (K, V) sortirani u prolseđenom poretku.
  • join(otherDataset) – za skup parova (K, V) i (K,W) vraća skup od (K, (V, W)) parova.

Actions

Akcije su operacije na RDD-u koje ne vraćaju vrednosti. Kada se pozove neka akcija ona će izvršiti sve nadovezane transformacije tj. one su okidač za izvršavanje transformacija. Postoje sledeće akcije:

  • reduce(func) -redukcija
  • collect() – vraća sve elemente iz skupa podataka. Obično se primenjuje nakon trasnformacije filter.
  • count()– vraća broj elemenata u skupu.
  • first() – vraća prvi element u skupu
  • take(n) – vraća niz od prvih n elemenata u skupu
  • saveAsTextFile(path) – upisuje elemente iz skupa u teksutalni fajl prosleđen u argumentu.
  • saveAsObjectFile– koristeći serijalizaciju upisuje objekte u fajl.
  • foreach(func) – za svaki element primeni prosleđenu funkciju.

Lazy Evaluation

Lazy Evaluation nam pomaže da optimizujemo iskorišćenje memorije i vreme izvršavanja programa ukoliko je potrebno da Spark filtrira ogromnu količinu podataka. Recimo, želimo da pronađemo studenta u Srbiji sa prosekom 10.0 i onda od tog rezultata prvog takvog studenta. Zahvaljujući Lazy Evaluation osobini, Spark neće učitati ceo fajl, pa filtrirati po određenim kriterijumima i na kraju vratiti prvi rezultat, već će sačuvati sve naše transformacije i odogovoriti na akcije tj. učitaće prvog studetna sa određenim kirterijumom. Lazy Evaluation će se najbolje razumeti na primeru. Ukoliko imamo neki fajl i izvršimo naredbu:

val lines = sc.textFile("words.txt")

još uvek nismo učitali fajl. Fajl će biti učitan tek po zadavanju akcije. Sledećom naredbom uradićemo još jednu transformaciju:

val filtered = lines.filter(line => line.contains("word1"))

Naredbom:

filtered.first()

smo zadali akciju i tek sada će Spark izvršiti transformacije i zadate akcije.

Instalacija

Java

Pre instalacije potrebno je proveriti da li imamo instaliranu Javu. Prvo ćemo komandom

$ java -version

provereriti da li već postoji. Ako postoji, izlaz na konzoli je sličan ovome:

java version "1.7.0_91" OpenJDK Runtime Environment (IcedTea 2.6.3) (7u91-2.6.3-0ubuntu0.14.04.1) OpenJDK 64-Bit Server VM (build 24.91-b01, mixed mode)

Ukoliko sistem nema Javu, porebna je instalacija. Potrebno je preuzeti adekvatnu verziju Jave sa linka: http://www.oracle.com/technetwork/java/javase/downloads/index.html Preuzetu arhivu ćemo raspakovati sledećim komandama:

$ cd Downloads/ $ tar zxf jdk-7u91-linux-x64.gz $ ls jdk-7u91-linux-x64 jdk-7u91-linux-x64.gz

Kako bi bila dostupna svim korisnicima, komandom

# mv jdk-7u91-linux-x64 /usr/local/

ćemo je prebaciti u folder /usr/local. Kako ne bismo morali svaki put da kucamo celu putanju do Java programa potrebnno je izvršiti sledeće komande:

export JAVA_HOME=/usr/local/jdk-7u91-linux-x64 export PATH= $PATH:$JAVA_HOME/bin

kako bi putanja ostala zapamćena potrebno je izvršiti sledeću komandu:

$ source ~/.bashrc

Nakon ovoga, potrebno je proberiti da li je Java uspešno instalirana komandom iz prvog pasusa. Drugi način da se instalira nova verzija Jave (Java 8) moguć je samo na Ubuntu verzijama posle 14.10 komandom:

apt-get install openjdk-8-jdk

Scala

Nakon instalacije Jave, potrebno je instalirati Scalu. Scala je programski jezik koji kombinuje objektno-orijentisano, fukncionalno i konkurentno programiranje. Scala radi na Java virutelnoj mašini. Cilj Scala programskog jezika jeste da, u odnosu na Javu, razvoj softvera bude jednostavniji, elegantniji i bezbedniji. Scala može koristiti sve javine biblioteke. Spring i Hibernate rade odlično u kombinaciji sa Scalom. Takođe, editori kao što su Eclipse i Netbeans rade sa ovim programskim jezikom. Jedna od razlika između jave i skale je u strim operaciji nad kolekcijama. U Javi:

peoples.stream().filter( x -> x.firstName.equals(”Jon”)).collect(Collectors.toList())

u Scala programskom jeziku:

peoples.filter(_.firstName == "Jon”)

Kako bi Java manipulisala kolekcijom na način na koji to Scala radi, mora da implementira stream interfejs, kao i interfejs za rezultat. U Scala programskom jeziku je sve objekat. Čak su i brojevi i funkcije objekti. Funkcije možemo sačuvati u nekoj promenljivoj koju možemo proslediti kao argument neke druge funkcije. Dakle, podržava funkcionalno programiranje. Klase u Skali mogu imati parametre. Mnogi tradicionalni dizjan paterni su prirodno podržani u Scali. Na primer, singleton patern je podržan kroz definiciju objekta. Kada imamo više niti u programu potrebno je u trenutku dok mi menjamo neke podatke zaključati za druge niti, kako ih ne bi neka druga nit promenila. Za kompleksne programe je potrebno više brinuti o tome pa nam ovaj programski jezik omogućava da koristimo nepromenjene objekte (immutable objects). Ovi objketi se mogu koristiti bez obzira ko ih koristi. Scala je programski jezik koji svoj potencijal pruža pri konkurentnom i sinhronizovanom izvršavanju programa na više procesora Prvo ćemo proveriti komandom:

$scala -version

da li već postoji u našem sistemu. Ukoliko već postoji, izlaz je ova poruka:

Scala code runner version 2.9.2 -- Copyright 2002-2013, LAMP/EPFL

Ukoliko Scala nije instaliarana, potrebno je preuzeti odgovarajuću verziju sa linka: http://www.scala-lang.org/download/ Preuzetu arhivu ćemo raspakovati sledećim komandama:

$ cd Downloads/ $ tar zxf scala-2.11.8.tgz $ ls scala-2.11.8 scala-2.11.8.tgz

Kako bi bila dostupna svim korisnicima, komandom

# mv scala-2.11.8 /usr/local/

ćemo je prebaciti u folder /usr/local. Kako ne bismo morali svaki put da kucamo celu putanju potrebnno je izvršiti sledeće komande:

export PATH= $PATH:/user/local/scala-2.11.8/bin

kako bi putanja ostala zapamćena potrebno je izvršiti sledeću komandu:

$ source ~/.bashrc

Apache Spark

Prvi korak je preuzimanje adekvatne verzije sa Sparka. Nakon što se izabere i preuzme odgovarajuća verzija: spark-2.0.0-bin-hadoop2.7.tgz Preuzetu arhivu ćemo raspakovati sledećim komandama:

$ cd Downloads/ $ tar zxf spark-2.0.0-bin-hadoop2.7.tgz $ ls spark-2.0.0-bin-hadoop2.7 spark-2.0.0-bin-hadoop2.7.tgz

Kako bi bila dostupna svim korisnicima, komandom

# mv spark-2.0.0-bin-hadoop2.7 /usr/local/

ćemo je prebaciti u folder /usr/local. Kako ne bismo morali svaki put da kucamo celu putanju potrebnno je izvršiti sledeće komande:

export PATH= $PATH:/user/local/spark-2.0.0-bin-hadoop2.7/bin

kako bi putanja ostala zapamćena potrebno je izvršiti sledeću komandu:

$ source ~/.bashrc

Nakon toga, ukucati sledeću komadu:

$ spark-shell

Izvršavanje gore navedene komande rezultiraće sledećim izlazom:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). 16/07/18 22:48:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 16/07/18 22:48:06 WARN Utils: Your hostname, K53E resolves to a loopback address: 127.0.1.1; using 192.168.0.12 instead (on interface wlan0) 16/07/18 22:48:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address 16/07/18 22:48:09 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041. 16/07/18 22:48:11 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect. Spark context Web UI available at http://192.168.0.12:4041 Spark context available as 'sc' (master = local[*], app id = local-1471553290176). Spark session available as 'spark'. Welcome to ____              __ / __/__  ___ _____/ /__ _\ \/ _ \/ _ `/ __/  '_/ /___/ .__/\_,_/_/ /_/\_\   version 2.0.0 /_/ Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.7.0_91) Type in expressions to have them evaluated. Type :help for more information.

Gore navedeni izlaz je generisao Spark context pod skraćenicom sc. Spark context je suština Sparka. Najvažnija funkcionalnost (kreiranje RDD-ova) je u Spark context-u. Da bismo proverili da li nam sve radi kako treba možemo ukucati sledeće komande:

scala> sc.version res0: String = 2.0.0

ili

scala> sc.appName res1: String = Spark shell

Za izlazak iz Spark Shell prozora potrebno je ukucati sledeću komandu:

:quit

Korisni linkovi