
from pyspark import SparkConf, SparkContext
import csv
from datetime import datetime, date

# Spark bootstrap: master "local", application name "Festivali".
# The setters modify the SparkConf in place, so they can be called one by one.
conf = SparkConf()
conf.setMaster("local")
conf.setAppName("Festivali")

# Shared context used by getRDDfromCSV to read the input files.
sc = SparkContext(conf=conf)

def getRDDfromCSV(fileName):
    """Load a CSV file as an RDD of parsed rows.

    The file is read line by line through the module-level SparkContext
    ``sc``; every partition's lines are then fed to ``csv.reader``, so each
    element of the result is a list of the string fields of one CSV record.

    Args:
        fileName: Path of the CSV file, passed unchanged to ``sc.textFile``.

    Returns:
        The RDD of parsed rows.
    """
    def parse_partition(lines):
        # One reader per partition: it consumes that partition's line iterator.
        return csv.reader(lines)

    text_rdd = sc.textFile(fileName)
    return text_rdd.mapPartitions(parse_partition)

def filtriranje(x):
    """Tell whether a festival year lies within a membership period.

    Only the years are compared, and both ends of the period are inclusive.

    Args:
        x: A joined record indexed as
            ``(key, (festival_year, (_, date_from, date_to)))``.
            ``festival_year`` must be convertible with ``int``;
            ``date_from`` is a ``YYYY-MM-DD`` string; ``date_to`` is either a
            ``YYYY-MM-DD`` string or the literal ``"NULL"``, in which case the
            current year is used as the end of the period.

    Returns:
        True if ``year(date_from) <= festival_year <= year(date_to)``,
        otherwise False.

    Raises:
        ValueError: If the year is not an integer or a date string is not a
            valid ``YYYY-MM-DD`` date.
    """
    godFestivala = int(x[1][0])
    period = x[1][1]

    # "%m" is the month directive. The earlier "%M" parsed the middle field as
    # minutes, so the month was never validated and the day was checked
    # against January.
    dateFrom = datetime.strptime(period[1], "%Y-%m-%d").year

    if period[2] != "NULL":
        dateTo = datetime.strptime(period[2], "%Y-%m-%d").year
    else:
        # Open-ended period: treat it as lasting until the current year.
        dateTo = date.today().year

    return dateFrom <= godFestivala <= dateTo

def nadjiMaksNastupa(x):
    """Check whether a record's count equals the current maximum.

    Args:
        x: An indexable record whose element at position 1 is the count.

    Returns:
        True if ``x[1]`` equals the module-level ``maksNastupa``, else False.
    """
    count = x[1]
    # maksNastupa is a module-level name and must be assigned before this runs.
    return True if count == maksNastupa else False

# Load the inputs as pair RDDs; the first tuple element is the join key.
#   lineup_festivala: (column 2, column 1)
#   sastav_benda:     (column 1, (column 0, column 2, column 3))
#   muzicari:         (column 0, (column 1, column 2))
# NOTE(review): the keys are presumably a band id for the first join and a
# musician id for the second - confirm against the CSV schemas.
lineup_festivala = getRDDfromCSV("../festivali/lineup_festivala.csv").map(lambda x: (x[2], x[1]))
sastav_benda = getRDDfromCSV("../festivali/sastav_benda.csv").map(lambda x: (x[1], (x[0], x[2], x[3])))
muzicari = getRDDfromCSV("../festivali/muzicari.csv").map(lambda x: (x[0], (x[1], x[2])))

# Pair every lineup row with the sastav_benda rows sharing its key and keep
# only the pairs accepted by filtriranje (festival year inside the period).
lineup_sastav = lineup_festivala.join(sastav_benda).filter(filtriranje)

# Count the surviving pairs per sastav_benda column 0. The result feeds two
# separate Spark jobs below, so it is cached to avoid recomputing the join.
lineup_sastav = lineup_sastav.map(lambda x: (x[1][1][0], 1)).reduceByKey(lambda x, y: x + y).cache()

# Highest count, computed by Spark instead of pulling every (key, count)
# pair to the driver. The zero value 0 is what an empty RDD yields.
# nadjiMaksNastupa reads this module-level name.
maksNastupa = lineup_sastav.values().fold(0, max)

# Keep only the keys that reach the maximum (there may be ties).
lineup_sastav = lineup_sastav.filter(nadjiMaksNastupa)

# Attach the two muzicari columns: (column 1, column 2, count).
izvrsni = lineup_sastav.join(muzicari).map(lambda x: (x[1][1][0], x[1][1][1], x[1][0]))

# csv.writer quotes fields that contain commas, quotes or newlines, which the
# former manual string concatenation wrote out as malformed rows.
# newline="" plus lineterminator="\n" keeps the "\n" row endings, and UTF-8
# matches the encoding Spark uses when reading the inputs.
with open("najviseNastupa.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f, lineterminator="\n")
    writer.writerows(izvrsni.collect())
