Qcert.Backend.NNRCMRtoSpark
Section NNRCMRtoSpark.
Context {fruntime:foreign_runtime}.
Context {fredop:foreign_reduce_op}.
Context {ftojavascript:foreign_to_javascript}.
Context {ftospark:foreign_to_spark}.
Definition js_endl := eol_backslash_n.
Section sanitize.
Definition scalaAllowedIdentifierInitialCharacters := ["A";"B";"C";"D";"E";"F";"G";"H";"I";"J";"K";"L";"M";"N";"O";"P";"Q";"R";"S";"T";"U";"V";"W";"X";"Y";"Z";"a";"b";"c";"d";"e";"f";"g";"h";"i";"j";"k";"l";"m";"n";"o";"p";"q";"r";"s";"t";"u";"v";"w";"x";"y";"z"]%char.
Definition scalaAllowedIdentifierCharacters := ["A";"B";"C";"D";"E";"F";"G";"H";"I";"J";"K";"L";"M";"N";"O";"P";"Q";"R";"S";"T";"U";"V";"W";"X";"Y";"Z";"a";"b";"c";"d";"e";"f";"g";"h";"i";"j";"k";"l";"m";"n";"o";"p";"q";"r";"s";"t";"u";"v";"w";"x";"y";"z";"0";"1";"2";"3";"4";"5";"6";"7";"8";"9";"_"]%char.
Definition scalaIdentifierInitialCharacterToAdd := "X"%char.
Definition scalaIdentifierCharacterForReplacement := "X"%char.
Definition scalaIdentifierFixInitial (ident:string) : string
:= match ident with
| EmptyString ⇒
String scalaIdentifierInitialCharacterToAdd EmptyString
| String a _ ⇒
if in_dec ascii_dec a scalaAllowedIdentifierInitialCharacters
then ident
else String scalaIdentifierInitialCharacterToAdd ident
end.
Definition scalaIdentifierSanitizeChar (a:ascii)
:= if a == "$"%char
then "_"%char
else if in_dec ascii_dec a scalaAllowedIdentifierCharacters
then a
else scalaIdentifierCharacterForReplacement.
Definition scalaIdentifierSanitizeBody (ident:string)
:= map_string scalaIdentifierSanitizeChar ident.
Definition scalaIdentifierSanitize (ident:string)
:= scalaIdentifierFixInitial (scalaIdentifierSanitizeBody ident).
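For illustration, the sanitizer maps "$" to "_", replaces any other
disallowed character with "X", and prefixes "X" when the first character
cannot start a Scala identifier. A minimal sketch (the example names are
ours; both equalities hold by computation):
Example scalaIdentifierSanitize_dollar :
scalaIdentifierSanitize "my$var" = "my_var".
Proof. reflexivity. Qed.
Example scalaIdentifierSanitize_initial :
scalaIdentifierSanitize "1abc" = "X1abc".
Proof. reflexivity. Qed.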
Definition scalaSafeSeparator := "_".
Definition scalaAvoidList :=
["abstract"; "case"; "catch"; "class"; "def"; "do"; "else"; "extends"
; "false"; "final"; "finally"; "for"; "forSome"; "if"; "implicit"; "import"
; "lazy"; "match"; "new"; "null"; "object"; "override"
; "package"; "private"; "protected"; "return"; "sealed"; "super"
; "this"; "throw"; "trait"; "try"; "true"; "type"; "val"; "var"
; "while"; "with"; "yield"].
Definition nnrcmr_rename_local_for_js (mrl:nnrcmr)
:= nnrcmr_rename_local
jsSafeSeparator
jsIdentifierSanitize
jsAvoidList
mrl.
Definition nnrcmr_rename_graph_for_scala (mrl:nnrcmr)
:= nnrcmr_rename_graph
scalaSafeSeparator
scalaIdentifierSanitize
scalaAvoidList
mrl.
Definition nnrcmr_rename_for_spark (mrl:nnrcmr)
:= nnrcmr_rename_graph_for_scala
(nnrcmr_rename_local_for_js mrl).
End sanitize.
Section MRSpark.
Definition rdd_env_id (x: string) :=
"RDD"++(scalaIdentifierSanitize x).
Definition rdd_map_id (x: string) :=
"RDDMap_"++(scalaIdentifierSanitize x).
Definition rdd_reduce_id (x: string) :=
"RDDReduce_"++(scalaIdentifierSanitize x).
Definition attr_id (x: string) :=
"'"++x++"'".
Definition get_engine_func (scala_endl:string) :=
"var engine : Option[javax.script.ScriptEngine] = None" ++ scala_endl ++
"def get_engine() : javax.script.ScriptEngine = {" ++ scala_endl ++
" val factory = new javax.script.ScriptEngineManager" ++ scala_endl ++
" val new_engine = factory.getEngineByName(""JavaScript"")" ++ scala_endl ++
" load_harness(new_engine)" ++ scala_endl ++
" return (new_engine)" ++ scala_endl ++
"}" ++ scala_endl.
Definition array_of_js_func_v7 (scala_endl:string) :=
"def array_of_js(e: javax.script.ScriptEngine, js_arr: Object): Array[String] = {" ++ scala_endl ++
" e.put(""js_arr"", js_arr)" ++ scala_endl ++
" val jarr = e.eval(""js_arr.map(function(elem){ return JSON.stringify(elem); })"").asInstanceOf[jdk.nashorn.api.scripting.ScriptObjectMirror].to(classOf[jdk.nashorn.internal.runtime.ScriptObject])" ++ scala_endl ++
" val arr = jarr.getArray().asObjectArray()" ++ scala_endl ++
" val sarr = arr.map(_.toString())" ++ scala_endl ++
" return sarr" ++ scala_endl ++
"}" ++ scala_endl.
Definition array_of_js_func_v8 (scala_endl:string) :=
"def array_of_js(e: javax.script.ScriptEngine, js_arr: Object): Array[String] = {" ++ scala_endl ++
" e.put(""js_arr"", js_arr)" ++ scala_endl ++
" val jarr = e.eval(""js_arr.map(function(elem){ return JSON.stringify(elem); })"").asInstanceOf[jdk.nashorn.api.scripting.ScriptObjectMirror].to(classOf[jdk.nashorn.internal.runtime.ScriptObject])" ++ scala_endl ++
" val arr = jarr.getArray().asObjectArray()" ++ scala_endl ++
" val sarr = arr.map(_.toString())" ++ scala_endl ++
" return sarr" ++ scala_endl ++
"}" ++ scala_endl.
Definition array_of_js_func := array_of_js_func_v8.
Definition js_of_iterable_func (scala_endl:string) :=
"def js_of_iterable(coll: Iterable[String]): String = {" ++ scala_endl ++
" /* Hack! */" ++ scala_endl ++
" var values = ""[ """ ++ scala_endl ++
" var first = true" ++ scala_endl ++
" for (elem <- coll) {" ++ scala_endl ++
" if (!first) {" ++ scala_endl ++
" values = values + "", """ ++ scala_endl ++
" }" ++ scala_endl ++
" values = values + elem" ++ scala_endl ++
" first = false" ++ scala_endl ++
" }" ++ scala_endl ++
" values = values + "" ]""" ++ scala_endl ++
" values" ++ scala_endl ++
"}" ++ scala_endl.
Definition load_harness_from_file_func (scala_endl:string) (quotel:string) :=
"def load_harness(e:javax.script.ScriptEngine) {" ++ scala_endl ++
" e.eval(new java.io.FileReader(dataFile))" ++ scala_endl ++
" e.eval(new java.io.FileReader(harnessFile))" ++ scala_endl ++
"}" ++ scala_endl.
Definition load_env_defs (init: string) (scala_endl:string) (quotel:string) :=
"def mkWorld(e: javax.script.ScriptEngine) : Object = {" ++ scala_endl ++
" /* XXX Should be updated to use the new io format XXX */" ++ scala_endl ++
" e.eval(new java.io.FileReader(dataFile))" ++ scala_endl ++
" e.eval(""var world = mkWorld(input)"")" ++ scala_endl ++
" e.eval(""world["++(attr_id init)++"] = null"")" ++ scala_endl ++
" e.eval(""world"")" ++ scala_endl ++
"}" ++ scala_endl ++
"def loadScalar(e: javax.script.ScriptEngine, sc: SparkContext, world: Object, attr: String) : org.apache.spark.rdd.RDD[String] = {" ++ scala_endl ++
" e.put(""world"", world)" ++ scala_endl ++
" val coll = array_of_js(e, e.eval(""[ world[""+attr+""] ]""))" ++ scala_endl ++
" sc.parallelize(coll)" ++ scala_endl ++
"}" ++ scala_endl ++
"def loadDistributed(e: javax.script.ScriptEngine, sc: SparkContext, world: Object, attr: String): org.apache.spark.rdd.RDD[String] = {" ++ scala_endl ++
" e.put(""world"", world)" ++ scala_endl ++
" val coll = array_of_js(e, e.eval(""world[""+attr+""]""))" ++ scala_endl ++
" sc.parallelize(coll)" ++ scala_endl ++
"}" ++ scala_endl.
Lemma var_loc_eq_dec:
∀ x_loc y_loc : (var × dlocalization),
{ x_loc = y_loc } + { x_loc ≠ y_loc }.
Definition load_env (env_vars: list (var × dlocalization)) (scala_endl:string) (quotel:string) :=
let env_vars := List.nodup var_loc_eq_dec env_vars in
"val e = get_engine()" ++ scala_endl ++
"val world = mkWorld(e)" ++ scala_endl ++
List.fold_left
(fun acc (x_loc: var × dlocalization) ⇒
let (x, loc) := x_loc in
let unconsted_x := unConstVar x in
let load_func :=
match loc with
| Vlocal ⇒ "loadScalar"
| Vdistr ⇒ "loadDistributed"
end
in
acc ++
"val "++(rdd_env_id x)++" = "++load_func++"(e, sc, world, """++(attr_id unconsted_x)++""")" ++ scala_endl)
env_vars "".
Definition check_result_from_file_func (scala_endl:string) (quotel:string) :=
"def check_result(actual: String) {" ++ scala_endl ++
" val factory = new javax.script.ScriptEngineManager" ++ scala_endl ++
" val e = factory.getEngineByName(""JavaScript"")" ++ scala_endl ++
" load_harness(e)" ++ scala_endl ++
" e.eval(""var actual = ""+actual+"";"")" ++ scala_endl ++
" e.eval(""var expected = [[output]];"")" ++ scala_endl ++
" e.eval(""if (equal(expected, actual)) { print(\""OK\\n\""); } else { print(\""ERROR\\n\""); }"");" ++ scala_endl ++
"}" ++ scala_endl.
If the map-reduce has identifier id, argument name x, and body n, we want
to generate the following Scala code for MapDistFlatten:
  val RDDMap_id = RDD_input.flatMap(x => {
    val e = get_engine()
    val x_js = e.eval(x.asInstanceOf[String])
    e.put("x", x_js)
    array_of_js(e, e.eval("if (typeof map_id === 'undefined') {"+
                          "  function map_id(x) {"+
                          "    js(n)"+
                          "  }"+
                          "}"+
                          "map_id(x)"))
  })
Definition scala_of_mr_map (mr_id: string) (input: string) (mr_map: map_fun) (scala_endl:string) (quotel:string) :=
match mr_map with
| MapDist (map_v, map_f) ⇒
let map_v_js := "map_arg" in
let '(j, v, t) := nnrcToJSunshadow map_f 3 1 js_endl quotel (map_v::nil) ((map_v, map_v_js)::nil) in
"val "++(rdd_map_id mr_id)++" = "++(rdd_env_id input)++".map(x => {" ++ scala_endl ++
" val e = get_engine()" ++ scala_endl ++
" val x_js = e.eval(""var tmp = ""+x.asInstanceOf[String]+""; tmp"")" ++ scala_endl ++
" e.put(""x"", x_js)" ++ scala_endl ++
" e.eval(""if (typeof map_"++(mr_id)++" === 'undefined') {"++js_endl++"""+" ++ scala_endl ++
" "" function map_"++(mr_id)++"("++map_v_js++") {"++js_endl++"""+"++ scala_endl ++
" """++j++js_endl++"""+" ++ scala_endl ++
" "" return "++v++";"++js_endl++"""+" ++ scala_endl ++
" "" }"++js_endl++"""+" ++ scala_endl ++
" ""}"++js_endl++"""+" ++ scala_endl ++
" ""JSON.stringify(map_"++(mr_id)++"(x))"").asInstanceOf[String]" ++ scala_endl ++
"})" ++ scala_endl
| MapDistFlatten (map_v, map_f) ⇒
let map_v_js := "map_arg" in
let '(j, v, t) := nnrcToJSunshadow map_f 3 1 js_endl quotel (map_v::nil) ((map_v, map_v_js)::nil) in
"val "++(rdd_map_id mr_id)++" = "++(rdd_env_id input)++".flatMap(x => {" ++ scala_endl ++
" val e = get_engine()" ++ scala_endl ++
" val x_js = e.eval(""var tmp = ""+x.asInstanceOf[String]+""; tmp"")" ++ scala_endl ++
" e.put(""x"", x_js)" ++ scala_endl ++
" array_of_js(e, e.eval(""if (typeof map_"++(mr_id)++" === 'undefined') {"++js_endl++"""+" ++ scala_endl ++
" "" function map_"++(mr_id)++"("++map_v_js++") {"++js_endl++"""+"++ scala_endl ++
" """++j++js_endl++"""+" ++ scala_endl ++
" "" return "++v++";"++js_endl++"""+" ++ scala_endl ++
" "" }"++js_endl++"""+" ++ scala_endl ++
" ""}"++js_endl++"""+" ++ scala_endl ++
" ""map_"++(mr_id)++"(x)""))" ++ scala_endl ++
"})" ++ scala_endl
| MapScalar (map_v, map_f) ⇒
let map_v_js := "map_arg" in
let '(j, v, t) := nnrcToJSunshadow map_f 3 1 js_endl quotel (map_v::nil) ((map_v, map_v_js)::nil) in
"val "++(rdd_map_id mr_id)++" = "++(rdd_env_id input)++".flatMap(x => {" ++ scala_endl ++
" val e = get_engine()" ++ scala_endl ++
" val x_js = e.eval(""var tmp = ""+x.asInstanceOf[String]+""; tmp"")" ++ scala_endl ++
" e.put(""x"", x_js)" ++ scala_endl ++
" array_of_js(e, e.eval(""if (typeof map_"++(mr_id)++" === 'undefined') {"++js_endl++"""+" ++ scala_endl ++
" "" function map_"++(mr_id)++"("++map_v_js++") {"++js_endl++"""+"++ scala_endl ++
" """++j++js_endl++"""+" ++ scala_endl ++
" "" return "++v++";"++js_endl++"""+" ++ scala_endl ++
" "" }"++js_endl++"""+" ++ scala_endl ++
" ""}"++js_endl++"""+" ++ scala_endl ++
" ""map_"++(mr_id)++"(x)""))" ++ scala_endl ++
"})" ++ scala_endl
end.
Functions to compute statistics: statsReduce folds one element into a
running { count, sum, min, max } record (the empty string stands for "no
accumulator yet"), and statsRereduce merges two partial records, treating
the empty string as the neutral element.
Definition stats_funcs scala_endl :=
"def statsReduce (x: String, y: String): String = {" ++ scala_endl ++
" val e = get_engine()" ++ scala_endl ++
" var res:String = """"" ++ scala_endl ++
" if (x.equals("""")) {" ++ scala_endl ++
" val y_js = e.eval(""var tmp = ""+y+""; tmp"")" ++ scala_endl ++
" e.put(""y"", y_js)" ++ scala_endl ++
" res = e.eval(""var res = { 'count': 1, 'sum': y, 'min': y, 'max': y };"++js_endl++"""+" ++ scala_endl ++
" ""JSON.stringify(res)"").asInstanceOf[String]" ++ scala_endl ++
" } else {" ++ scala_endl ++
" val x_js = e.eval(""var tmp = ""+x+""; tmp"")" ++ scala_endl ++
" e.put(""x"", x_js)" ++ scala_endl ++
" val y_js = e.eval(""var tmp = ""+y+""; tmp"")" ++ scala_endl ++
" e.put(""y"", y_js)" ++ scala_endl ++
" res = e.eval(""var res = { 'count': x['count'] + 1,"++js_endl++"""+" ++ scala_endl ++
" "" 'sum': x['sum'] + y,"++js_endl++"""+" ++ scala_endl ++
" "" 'min': Math.min(x['min'], y),"++js_endl++"""+" ++ scala_endl ++
" "" 'max': Math.max(x['max'], y) };"++js_endl++"""+" ++ scala_endl ++
" ""JSON.stringify(res)"").asInstanceOf[String]" ++ scala_endl ++
" }" ++ scala_endl ++
" return res" ++ scala_endl ++
"}" ++ scala_endl ++
"def statsRereduce (x: String, y: String): String = {" ++ scala_endl ++
" val e = get_engine()" ++ scala_endl ++
" var res:String = """"" ++ scala_endl ++
" if (x.equals("""")) {" ++ scala_endl ++
" if (y.equals("""")) {" ++ scala_endl ++
" res = ""{ 'count': 0, 'sum': 0, 'min': 0, 'max': 0 }""" ++ scala_endl ++
" } else {" ++ scala_endl ++
" res = y" ++ scala_endl ++
" }" ++ scala_endl ++
" } else {" ++ scala_endl ++
" if (y.equals("""")) {" ++ scala_endl ++
" res = x" ++ scala_endl ++
" } else {" ++ scala_endl ++
" val x_js = e.eval(""var tmp = ""+x+""; tmp"")" ++ scala_endl ++
" e.put(""x"", x_js)" ++ scala_endl ++
" val y_js = e.eval(""var tmp = ""+y+""; tmp"")" ++ scala_endl ++
" e.put(""y"", y_js)" ++ scala_endl ++
" res = e.eval(""var res = { 'count': x['count'] + y['count'],"++js_endl++"""+" ++ scala_endl ++
" "" 'sum': x['sum'] + y['sum'],"++js_endl++"""+" ++ scala_endl ++
" "" 'min': Math.min(x['min'], y['min']),"++js_endl++"""+" ++ scala_endl ++
" "" 'max': Math.max(x['max'], y['max']) };"++js_endl++"""+" ++ scala_endl ++
" ""JSON.stringify(res)"").asInstanceOf[String]" ++ scala_endl ++
" }" ++ scala_endl ++
" }" ++ scala_endl ++
" return res" ++ scala_endl ++
"}" ++ scala_endl.
"def statsReduce (x: String, y: String): String = {" ++ scala_endl ++
" val e = get_engine()" ++ scala_endl ++
" var res:String = """"" ++ scala_endl ++
" if (x.equals("""")) {" ++ scala_endl ++
" val y_js = e.eval(""var tmp = ""+y+""; tmp"")" ++ scala_endl ++
" e.put(""y"", y_js)" ++ scala_endl ++
" res = e.eval(""var res = { 'count': 1, 'sum': y, 'min': y, 'max': y };"++js_endl++"""+" ++ scala_endl ++
" ""JSON.stringify(res)"").asInstanceOf[String]" ++ scala_endl ++
" } else {" ++ scala_endl ++
" val x_js = e.eval(""var tmp = ""+x+""; tmp"")" ++ scala_endl ++
" e.put(""x"", x_js)" ++ scala_endl ++
" val y_js = e.eval(""var tmp = ""+y+""; tmp"")" ++ scala_endl ++
" e.put(""y"", y_js)" ++ scala_endl ++
" res = e.eval(""var res = { 'count': x['count'] + 1,"++js_endl++"""+" ++ scala_endl ++
" "" 'sum': x['sum'] + y,"++js_endl++"""+" ++ scala_endl ++
" "" 'min': Math.min(x['min'], y),"++js_endl++"""+" ++ scala_endl ++
" "" 'max': Math.max(x['max'], y) };"++js_endl++"""+" ++ scala_endl ++
" ""JSON.stringify(res)"").asInstanceOf[String]" ++ scala_endl ++
" }" ++ scala_endl ++
" return res" ++ scala_endl ++
"}" ++ scala_endl ++
"def statsRereduce (x: String, y: String): String = {" ++ scala_endl ++
" val e = get_engine()" ++ scala_endl ++
" var res:String = """"" ++ scala_endl ++
" if (x.equals("""")) {" ++ scala_endl ++
" if (y.equals("""")) {" ++ scala_endl ++
" res = ""{ 'count': 0, 'sum': 0, 'min': 0, 'max': 0 }""" ++ scala_endl ++
" } else {" ++ scala_endl ++
" res = y" ++ scala_endl ++
" }" ++ scala_endl ++
" } else {" ++ scala_endl ++
" if (y.equals("""")) {" ++ scala_endl ++
" res = x" ++ scala_endl ++
" } else {" ++ scala_endl ++
" val x_js = e.eval(""var tmp = ""+x+""; tmp"")" ++ scala_endl ++
" e.put(""x"", x_js)" ++ scala_endl ++
" val y_js = e.eval(""var tmp = ""+y+""; tmp"")" ++ scala_endl ++
" e.put(""y"", y_js)" ++ scala_endl ++
" res = e.eval(""var res = { 'count': x['count'] + y['count'],"++js_endl++"""+" ++ scala_endl ++
" "" 'sum': x['sum'] + y['sum'],"++js_endl++"""+" ++ scala_endl ++
" "" 'min': Math.min(x['min'], y['min']),"++js_endl++"""+" ++ scala_endl ++
" "" 'max': Math.max(x['max'], y['max']) };"++js_endl++"""+" ++ scala_endl ++
" ""JSON.stringify(res)"").asInstanceOf[String]" ++ scala_endl ++
" }" ++ scala_endl ++
" }" ++ scala_endl ++
" return res" ++ scala_endl ++
"}" ++ scala_endl.
Case RedId (identity):
  val RDDReduce_id = RDDMap_id
Case RedCollect:
  val RDDReduce_id = (() => {
    val e = get_engine()
    val values = RDDMap_id.collect()
    val values_js = e.eval("var tmp = "+(js_of_iterable(values))+"; tmp")
    e.put("values", values_js)
    val res = e.eval("if (typeof reduce_id === 'undefined') {"+
                     "  function reduce_id(red_arg) { red_body; return v }"+
                     "}"+
                     "JSON.stringify(reduce_id(values))")
    sc.parallelize(Array(res))
  }: org.apache.spark.rdd.RDD[String]) ()
Definition scala_of_mr_reduce (mr_id: string) (mr_reduce: reduce_fun) (scala_endl:string) (quotel:string) :=
match mr_reduce with
| RedId ⇒
"var "++(rdd_reduce_id mr_id)++" = "++(rdd_map_id mr_id) ++ scala_endl
| RedCollect reduce ⇒
let (red_values_v, red_f) := reduce in
let red_values_v_js := "values" in
let '(j, v, t) := nnrcToJSunshadow red_f 1 1 js_endl quotel (red_values_v :: nil) ((red_values_v, red_values_v_js)::nil) in
"val "++(rdd_reduce_id mr_id)++" = (() => {" ++ scala_endl ++
" val e = get_engine()" ++ scala_endl ++
" val values = "++(rdd_map_id mr_id)++".collect()" ++ scala_endl ++
" val values_js = e.eval(""var tmp = ""+(js_of_iterable(values))+""; tmp"")" ++ scala_endl ++
" e.put(""values"", values_js)" ++ scala_endl ++
" val res = e.eval(""if (typeof reduce_"++(mr_id)++" === 'undefined') {""+" ++ scala_endl ++
" "" function reduce_"++(mr_id)++"("++red_values_v_js++") {"++js_endl++"""+"++ scala_endl ++
" """++j++js_endl++"""+" ++ scala_endl ++
" "" return "++v++";"++js_endl++"""+" ++ scala_endl ++
" "" }""+" ++ scala_endl ++
" ""}""+" ++ scala_endl ++
" ""JSON.stringify(reduce_"++(mr_id)++"(values))"").asInstanceOf[String]" ++ scala_endl ++
" sc.parallelize(Array(res))" ++ scala_endl ++
"}: org.apache.spark.rdd.RDD[String]) ()" ++ scala_endl
| RedOp rop ⇒
match rop with
| RedOpForeign frop ⇒
"val "++(rdd_reduce_id mr_id)++" = (() => {" ++ scala_endl ++
" val res = "++(rdd_map_id mr_id) ++ (foreign_to_spark_reduce_op frop scala_endl quotel) ++ scala_endl ++
" sc.parallelize(Array(res))" ++ scala_endl ++
"}: org.apache.spark.rdd.RDD[String]) ()" ++ scala_endl
end
| RedSingleton ⇒
"val "++(rdd_reduce_id mr_id)++" = "++(rdd_map_id mr_id) ++ scala_endl
end.
Definition scala_of_mr (mr_id: string) (mr: mr) (outputs: list var) (scala_endl:string) (quotel:string) :=
scala_of_mr_map mr_id mr.(mr_input) mr.(mr_map) scala_endl quotel ++
scala_of_mr_reduce mr_id mr.(mr_reduce) scala_endl quotel ++
if in_dec equiv_dec mr.(mr_output) outputs then
(rdd_env_id mr.(mr_output))++" = "++(rdd_env_id mr.(mr_output))++"++"++(rdd_reduce_id mr_id)
else
"var "++(rdd_env_id mr.(mr_output))++" = "++(rdd_reduce_id mr_id).
Definition scala_of_mr_chain (l:list mr) (scala_endl:string) (quotel:string) :=
let '(_, _, scala) :=
List.fold_left
(fun (acc: string × list var × string) mr ⇒
let '(mr_id, outputs, s) := acc in
let mr_scala := scala_of_mr mr_id mr outputs scala_endl quotel in
((append mr_id "_next"), mr.(mr_output)::outputs, s++scala_endl++mr_scala))
l ("something_", nil, "")
in
scala ++ scala_endl.
Fixpoint string_of_list sep l :=
match l with
| nil ⇒ ""
| s::nil ⇒ s
| s::l ⇒ s++sep++(string_of_list sep l)
end.
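For example (the example name is ours; it holds by computation):
Example string_of_list_example :
string_of_list ", " ("a"::"b"::"c"::nil) = "a, b, c".
Proof. reflexivity. Qed.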
Definition scala_of_mr_last (mr_last: ((list var × nnrc) × list (var × dlocalization))) (scala_endl:string) (quotel:string) :=
let params_js :=
List.map (fun x ⇒ "v"++x) (fst (fst mr_last))
in
let args_js :=
List.map
(fun x_loc ⇒
match x_loc with
| (x, Vlocal) ⇒ """+"++(rdd_env_id x)++".collect()(0)"++"+"""
| (x, Vdistr) ⇒ """+"++(rdd_env_id x)++".collect()"++"+"""
end)
(snd mr_last)
in
let result :=
let '(j, v, _) := nnrcToJSunshadow (snd (fst mr_last)) 3 1 js_endl quotel nil nil in
"e.eval(""(function ("++(string_of_list ", " params_js)++"){"++js_endl++"""+"++ scala_endl ++
" """++j++js_endl++"""+" ++ scala_endl ++
" "" return JSON.stringify("++v++");"++js_endl++"""+" ++ scala_endl ++
" ""})("++(string_of_list ", " args_js)++")"").asInstanceOf[String]"++ scala_endl
in
result.
Definition scala_of_nnrcmr (mrl:nnrcmr) (scala_endl:string) (quotel:string) :=
scala_of_mr_chain mrl.(mr_chain) scala_endl quotel ++
scala_of_mr_last mrl.(mr_last) scala_endl quotel.
Definition nnrcmrToSparkTopDataFromFile (test_name: string) (init: var) (l:nnrcmr) (scala_endl:string) (quotel: string) :=
"import org.apache.spark.SparkContext" ++ scala_endl ++
"import org.apache.spark.SparkContext._" ++ scala_endl ++
"import org.apache.spark.SparkConf" ++ scala_endl ++
"import java.io._" ++ scala_endl ++
scala_endl ++
"object "++test_name++" {" ++ scala_endl ++
scala_endl ++
"var harnessFile: String = """"" ++ scala_endl ++
"var dataFile: String = """"" ++ scala_endl ++
get_engine_func scala_endl ++
scala_endl ++
js_of_iterable_func scala_endl ++
scala_endl ++
array_of_js_func scala_endl ++
scala_endl ++
stats_funcs scala_endl ++
scala_endl ++
load_harness_from_file_func scala_endl quotel ++ scala_endl ++
scala_endl ++
load_env_defs init scala_endl quotel ++
scala_endl ++
check_result_from_file_func scala_endl quotel ++
scala_endl ++
"def run(sc: SparkContext): String = {" ++ scala_endl ++
get_engine_func scala_endl ++ scala_endl ++
load_env l.(mr_inputs_loc) scala_endl quotel ++ scala_endl ++
scala_of_nnrcmr l scala_endl quotel ++
"}" ++ scala_endl ++
scala_endl ++
"def main(args: Array[String]) {" ++ scala_endl ++
" if (args.length != 2 && args.length != 3) {" ++ scala_endl ++
" println(""Expected arguments: harness.js, test.js.io [and output.io]"")" ++ scala_endl ++
" sys.exit(1)" ++ scala_endl ++
" }" ++ scala_endl ++
" harnessFile = args(0)" ++ scala_endl ++
" dataFile = args(1)" ++ scala_endl ++
" val conf = new SparkConf().setAppName("""++test_name++""")" ++ scala_endl ++
" val sc = new SparkContext(conf)" ++ scala_endl ++
" val json_result = run(sc)" ++ scala_endl ++
" if (args.length == 3) {" ++ scala_endl ++
" val pw = new PrintWriter(new File(args(2)))" ++ scala_endl ++
" pw.write(json_result)" ++ scala_endl ++
" pw.close" ++ scala_endl ++
" }" ++ scala_endl ++
" println(json_result)" ++ scala_endl ++
" if (args.length ==2)" ++ scala_endl ++
" check_result(json_result)" ++ scala_endl ++
"}" ++ scala_endl ++
"}" ++ scala_endl.
End MRSpark.
Definition nnrcmrToSparkTopDataFromFileTop (test_name: string) (init: var) (l:nnrcmr) : string :=
nnrcmrToSparkTopDataFromFile test_name init l eol_newline "'".
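From the generated main above, the program expects the JavaScript harness
and the data file as arguments, with an optional output file. A usage
sketch (the spark-submit invocation and jar name are assumptions, not part
of this file):
  spark-submit --class TestName test.jar harness.js test.js.io [output.io]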
End NNRCMRtoSpark.