(** NNRCMR is a language to describe chains of Map/Reduce jobs, followed by
    some local computation over the result.

    Summary:
    - Language: NNRCMR (Named Nested Relational Calculus with Map/Reduce)
    - Languages translating to NNRCMR: NNRC
    - Languages translating from NNRCMR: NNRC, DNNRC, CldMR, SparkRDD *)
Require Import String.
Require Import List.
Require Import EquivDec.
Require Import Utils.
Require Import DataRuntime.
Require Import cNNRCRuntime.
Require Import ForeignReduceOps.
Section NNRCMR.

Local Open Scope list_scope.

(* The whole development is parameterized by a foreign runtime and by
   foreign reduce operators. *)
Context {fruntime:foreign_runtime}.
Context {fredop:foreign_reduce_op}.
(** * Abstract Syntax *)

(** Named Nested Relational Calculus + Map Reduce *)

(** Different kinds of map functions:
    - [MapDist]: an [A -> B] function applied to each element of the input distributed collection.
    - [MapDistFlatten]: an [A -> coll B] function applied to each element of the input distributed collection; the result of the map is flattened.
    - [MapScalar]: an [A -> coll B] function applied to the scalar input. *)
(* A map function is a bound variable together with the NNRC body
   evaluated for each value it is applied to. *)
Inductive map_fun :=
| MapDist : var * nnrc -> map_fun
| MapDistFlatten : var * nnrc -> map_fun
| MapScalar : var * nnrc -> map_fun.
(** Different kinds of reduce functions:
    - [RedId]: returns the (flattened) collection computed by the map, as a distributed collection.
    - [RedCollect red]: applies the function [red] of type [coll A -> B]
      to the collection of all the elements computed by the map.
    - [RedOp]: applies a built-in reduction operator that returns a scalar.
    - [RedSingleton]: assumes that the input of the reduce is a
      list with one element and returns this element. *)
(* Built-in reduction operators: currently only foreign ones. *)
Inductive reduce_op :=
| RedOpForeign : foreign_reduce_op_type -> reduce_op.
(* Reduce functions; only [RedCollect] carries an NNRC function
   (bound variable and body). *)
Inductive reduce_fun :=
| RedId : reduce_fun
| RedCollect : (var * nnrc) -> reduce_fun
| RedOp : reduce_op -> reduce_fun
| RedSingleton : reduce_fun.
(** A map-reduce job [mr] is applied to a collection fetched in the
    environment at the location [mr.(mr_input)], and the result is stored
    in the environment at the location [mr.(mr_output)].

    A function [mr.(mr_map)] is applied to the input to produce a
    collection of type [coll B].
    Then, the function [mr.(mr_reduce)] is applied to all the
    elements generated by the map. *)
Record mr :=
  mkMR
    { (* Collection on which the map/reduce is applied. *)
      mr_input : var;
      (* Collection where the result of the map/reduce is stored. *)
      mr_output : var;
      (* Function applied on the input, which generates a collection. *)
      mr_map : map_fun;
      (* Function applied on the collection generated by the map. *)
      mr_reduce : reduce_fun }.
(** An NNRCMR expression is a chain of map-reduce jobs, with a last expression describing the final/local computation. *)
Record nnrcmr :=
  mkMRChain
    { (* Input variables with their localizations. *)
      mr_inputs_loc : vdbindings;
      (* Map/Reduce chain. *)
      mr_chain : list mr;
      (* Last expression: its parameters and body, together with the
         environment variables (and their localizations) whose values are
         bound to those parameters. *)
      mr_last : (list var * nnrc) * list (var * dlocalization) }.
(** * Sanitization *)

(** This section contains support functions used to emit code that
    follows the lexical constraints imposed by Spark. *)
Section sanitize_local.

(* Separator handed to [nnrc_pick_name]. *)
Context (sep:string).
(* Renaming function handed to [nnrc_pick_name]. *)
Context (renamer:string->string).
(* Names that freshly picked names must avoid. *)
Context (avoid:list var).
(* Picks a new name for [oldvar] with [nnrc_pick_name], then renames
   [oldvar] to that name in [e]. Returns the new name and the renamed body. *)
Definition nnrc_rename_in1 (oldvar:var) (e:nnrc) : (var*nnrc) :=
  let fresh := nnrc_pick_name sep renamer avoid oldvar e in
  (fresh, nnrc_rename_lazy e oldvar fresh).
(* Two-variable version of [nnrc_rename_in1]. The first new name avoids
   [oldvar2] and the second avoids the first new name, so the two renamings
   cannot interfere. [oldvar1] is renamed first, then [oldvar2]. *)
Definition nnrc_rename_in2 (oldvar1 oldvar2:var) (e:nnrc) : (var*var*nnrc) :=
  let fresh1 := nnrc_pick_name sep renamer (oldvar2::avoid) oldvar1 e in
  let fresh2 := nnrc_pick_name sep renamer (fresh1::avoid) oldvar2 e in
  let e_step1 := nnrc_rename_lazy e oldvar1 fresh1 in
  let e_step2 := nnrc_rename_lazy e_step1 oldvar2 fresh2 in
  (fresh1, fresh2, e_step2).
(* Renames the bound variable of a map function with [nnrc_rename_in1],
   preserving the kind of map. *)
Definition rename_map_fun (m:map_fun) : map_fun :=
  match m with
  | MapDist (v, e) => MapDist (nnrc_rename_in1 v e)
  | MapDistFlatten (v, e) => MapDistFlatten (nnrc_rename_in1 v e)
  | MapScalar (v, e) => MapScalar (nnrc_rename_in1 v e)
  end.
(* Renames the bound variable of a [RedCollect] function; the other
   reducers carry no function and are returned unchanged. *)
Definition rename_reduce_fun (r:reduce_fun) : reduce_fun :=
  match r with
  | RedId => RedId
  | RedCollect (v, e) => RedCollect (nnrc_rename_in1 v e)
  | RedOp op => RedOp op
  | RedSingleton => RedSingleton
  end.
(* Renames the local (bound) variables of a job's map and reduce
   functions; the input and output locations are kept. *)
Definition mr_rename_local (mr0:mr) : mr :=
  mkMR (mr_input mr0)
       (mr_output mr0)
       (rename_map_fun (mr_map mr0))
       (rename_reduce_fun (mr_reduce mr0)).
(* Applies [mr_rename_local] to every job of the chain; the inputs and the
   last expression are left untouched. *)
Definition nnrcmr_rename_local (mrl:nnrcmr) :=
  mkMRChain (mr_inputs_loc mrl)
            (map mr_rename_local (mr_chain mrl))
            (mr_last mrl).

End sanitize_local.
Section sanitize_dbs.

(* Input and output locations of all jobs of the chain, sorted with
   [insertion_sort] on [StringOrder.lt_dec]. (Whether duplicates are
   dropped depends on [insertion_sort]; presumably yes for a strict
   order. Confirm.) *)
Definition nnrcmr_inoutlist (mrl:nnrcmr) :=
  let io_of_job (job:mr) := mr_input job :: mr_output job :: nil in
  insertion_sort StringOrder.lt_dec (flat_map io_of_job (mr_chain mrl)).
(* Substitutes the input and output locations of a job according to
   [substlist]; the map and reduce functions are kept. *)
Definition mr_subst_io substlist (mr0:mr) : mr :=
  mkMR (substlist_subst substlist (mr_input mr0))
       (substlist_subst substlist (mr_output mr0))
       (mr_map mr0)
       (mr_reduce mr0).
(* Applies [substlist] to:
   - the input/output locations of every job of the chain;
   - the parameter names of the last expression;
   - the body of the last expression (with [nnrc_substlist_subst]);
   - the environment variables passed as arguments to the last expression.
   Localizations and [mr_inputs_loc] are left unchanged. *)
Definition nnrcmr_subst_io substlist (mrl:nnrcmr) : nnrcmr :=
  let new_chain := map (mr_subst_io substlist) (mr_chain mrl) in
  let new_last :=
      let (old_params, old_body) := fst (mr_last mrl) in
      let old_args := snd (mr_last mrl) in
      let new_params := List.map (substlist_subst substlist) old_params in
      let new_body := nnrc_substlist_subst substlist old_body in
      let new_args :=
          List.map
            (fun x_loc => (substlist_subst substlist (fst x_loc), snd x_loc))
            old_args
      in
      ((new_params, new_body), new_args)
  in
  mkMRChain (mr_inputs_loc mrl) new_chain new_last.
(* Separator handed to [fresh_var_from]. *)
Context (sep:string).
(* Proposes a new name for each variable that gets renamed. *)
Context (renamer:string->string).
(* Names that newly generated names must not collide with. *)
Context (avoid:list var).
(* Builds, in reverse order (accumulated in [acc]), a substitution list
   covering every variable of [names]:
   - a variable in [leave_alone] is mapped to itself;
   - any other variable [x] is mapped to a name produced by
     [fresh_var_from] from [renamer x]. That name avoids the names still to
     be processed, the names already produced, the [leave_alone] names and
     [avoid].
   The [leave_alone] names are avoided explicitly because they are kept
   unchanged but need not occur in [names]. An example is an input that is
   used only by the last expression. Without this, a renamed variable could
   be given the same name as such an input. *)
Fixpoint nnrcmr_mk_rename_list (leave_alone:list var) (names:list var) (acc:list (var*var)) :=
  match names with
  | nil => acc
  | x::l =>
    if (in_dec string_dec x leave_alone)
    then nnrcmr_mk_rename_list leave_alone l ((x,x)::acc)
    else
      let proposedname := renamer x in
      let newname :=
          fresh_var_from sep proposedname
                         (l ++ (map snd acc) ++ leave_alone ++ avoid)
      in
      nnrcmr_mk_rename_list leave_alone l ((x,newname)::acc)
  end.
(* Renames the input/output locations of the chain (and the matching
   variables of the last expression). Variables listed in [mr_inputs_loc]
   keep their names. [nnrcmr_mk_rename_list] accumulates its result in
   reverse, so the list is reversed before being applied. *)
Definition nnrcmr_rename_graph (mrl:nnrcmr) :=
  let kept := List.map fst (mr_inputs_loc mrl) in
  let substlist :=
      rev (nnrcmr_mk_rename_list kept (nnrcmr_inoutlist mrl) nil)
  in
  nnrcmr_subst_io substlist mrl.

End sanitize_dbs.
(** * Well-formedness *)
(* [mr_causally_consistent mr1 mr2] is true when the input location of
   [mr1] differs from the output location of [mr2].
   [<>b] is the boolean disequality of Coq.Classes.EquivDec ([nequiv_decb]).
   It must be written as a single token: separating [<>] from [b] would make
   [b] an unbound identifier applied to [mr2.(mr_output)]. *)
Definition mr_causally_consistent (mr1 mr2:mr) : bool :=
  mr1.(mr_input) <>b mr2.(mr_output).
(* Checks [mr_causally_consistent] on the ordered pairs of jobs of the
   chain, as enumerated by [forallb_ordpairs_refl]. *)
Definition mr_chain_causally_consistent (mr_chain:list mr) : bool :=
  forallb_ordpairs_refl mr_causally_consistent mr_chain.
(* Causal consistency of an NNRCMR is that of its chain. *)
Definition nnrcmr_causally_consistent (mrl:nnrcmr) : bool :=
  mr_chain_causally_consistent (mr_chain mrl).
(* A one-argument function is closed when its body uses no global
   variables and its only free variable is its parameter. *)
Definition function_with_no_free_vars (f:var * nnrc) :=
  nnrc_global_vars (snd f) = nil
  /\ (forall (x:var), In x (nnrc_free_vars (snd f)) -> x = fst f).
(* Two-argument version: no global variables, distinct parameters, and
   every free variable of the body is one of the two parameters. *)
Definition function2_with_no_free_vars (f:(var * var) * nnrc) :=
  nnrc_global_vars (snd f) = nil
  /\ fst (fst f) <> snd (fst f)
  /\ (forall x,
         In x (nnrc_free_vars (snd f)) ->
         x = fst (fst f) \/ x = snd (fst f)).
(* Every kind of map function must be closed. *)
Definition map_well_formed map :=
  match map with
  | MapDist f | MapDistFlatten f | MapScalar f => function_with_no_free_vars f
  end.
(* Only [RedCollect] carries a function, which must be closed; the other
   reducers are always well formed. *)
Definition reduce_well_formed red :=
  match red with
  | RedCollect f => function_with_no_free_vars f
  | RedId | RedOp _ | RedSingleton => True
  end.
(* A job is well formed when both its map and its reduce are. *)
Definition mr_well_formed mr :=
  map_well_formed (mr_map mr) /\ reduce_well_formed (mr_reduce mr).
(* A chain is well formed when each of its jobs is. *)
Definition mr_chain_well_formed (l:list mr) :=
  forall mr, In mr l -> mr_well_formed mr.
(* Well-formedness of an NNRCMR only constrains its chain. *)
Definition nnrcmr_well_formed (mrl:nnrcmr) :=
  mr_chain_well_formed (mr_chain mrl).
(* The input location of a job with the localization its map expects:
   local for [MapScalar], distributed otherwise. *)
Definition mr_input_localized mr :=
  match mr_map mr with
  | MapScalar _ => (mr_input mr, Vlocal)
  | MapDist _ | MapDistFlatten _ => (mr_input mr, Vdistr)
  end.
(* The output location of a job with the localization its reduce
   produces: distributed for [RedId], local otherwise. *)
Definition mr_output_localized mr :=
  match mr_reduce mr with
  | RedId => (mr_output mr, Vdistr)
  | RedCollect _ | RedOp _ | RedSingleton => (mr_output mr, Vlocal)
  end.
(* A map accepts its input only when the localizations agree:
   [MapDist]/[MapDistFlatten] need distributed data and [MapScalar] needs
   local data. *)
Definition map_well_localized map (d:ddata) :=
  match map, d with
  | MapDist _, Ddistr _
  | MapDistFlatten _, Ddistr _
  | MapScalar _, Dlocal _ => True
  | _, _ => False
  end.
(** * Environment support for NNRCMR *)
Section nnrcmr_env.

(** This translation supposes that the environment contains a unit value,
    which is used to trigger the computation of NNRC expressions without
    free variables.

    [load_init_env initunit env] takes the environment [env] used to
    evaluate an NNRC expression. It returns the environment used to execute
    the translation of this expression as a map-reduce chain, by adding to
    [env] an entry [initunit] bound to the unit value, stored as local
    data. *)
Definition load_init_env (initunit:var) (env:dbindings) : dbindings :=
  (initunit, Dlocal dunit) :: env.

(* Looking up [initunit] in the loaded environment finds the unit value. *)
Lemma load_env_lookup_initunit initunit env:
  lookup equiv_dec (load_init_env initunit env) initunit = Some (Dlocal dunit).
Proof.
  simpl.
  match_destr.
  congruence.
Qed.

(* Environments manipulated by map-reduce chains: variables bound to
   localized data. *)
Definition nnrcmr_env := list (var * ddata).

End nnrcmr_env.
(** * Evaluation Semantics *)
Section Semantics.

(* Brand relation handed to [nnrc_core_eval] and foreign reduce operators. *)
Context (h:brand_relation_t).

(* Empty localized and plain constant environments. *)
Definition empty_dcenv : dbindings := nil.
Definition empty_cenv : bindings := nil.
(** ** Map *)
(* Evaluates the map phase of a job on its localized input.
   - [MapDist f]: [f] is applied with [lift_map] to the elements of a
     distributed input.
   - [MapDistFlatten f]: the same, then the nested result is flattened with
     [oflatten].
   - [MapScalar f]: [f] is applied to a local input and must produce a
     collection ([dcoll]), whose elements are returned.
   Each function is evaluated with [nnrc_core_eval], in an environment that
   binds only its parameter and with an empty constant environment.
   The result is [None] when the localization of the input does not match
   the kind of map, or when evaluation fails. *)
Definition mr_map_eval (map:map_fun) (input_d:ddata) : option (list data) :=
  match map with
  | MapDist f =>
    let f_map (d:data) : option data :=
        let (doc, body) := f in
        nnrc_core_eval h empty_cenv ((doc, d)::nil) body
    in
    match input_d with
    | Ddistr coll => lift_map f_map coll
    | Dlocal d => None
    end
  | MapDistFlatten f =>
    let f_map (d:data) : option data :=
        let (doc, body) := f in
        nnrc_core_eval h empty_cenv ((doc, d)::nil) body
    in
    match input_d with
    | Ddistr coll =>
      let nested_coll := lift_map f_map coll in
      olift oflatten nested_coll
    | Dlocal d => None
    end
  | MapScalar f =>
    let f_map (d:data) : option data :=
        let (doc, body) := f in
        nnrc_core_eval h empty_cenv ((doc, d)::nil) body
    in
    match input_d with
    | Ddistr coll => None
    | Dlocal d =>
      match f_map d with
      | Some (dcoll coll) => Some coll
      | _ => None
      end
    end
  end.
(** ** Reduce *)
(* Applies a built-in reduction operator to a list of values by delegating
   to the foreign interpreter. *)
Definition reduce_op_eval (rop:reduce_op) (dl:list data) : option data :=
  match rop with
  | RedOpForeign op => foreign_reduce_op_interp h op dl
  end.
(* Evaluates the reduce phase on the values produced by the map.
   - [RedId] returns the values as a distributed collection.
   - [RedCollect] evaluates its body with its parameter bound to the whole
     collection; the result is local.
   - [RedOp] applies a built-in operator; the result is local.
   - [RedSingleton] requires exactly one value and returns it as local
     data; otherwise it fails. *)
Definition mr_reduce_eval (reduce:reduce_fun) (values_v:list data) : option (ddata) :=
  match reduce with
  | RedId => Some (Ddistr values_v)
  | RedCollect f =>
    let (values_arg, body) := f in
    let v := nnrc_core_eval h empty_cenv ((values_arg, dcoll values_v) :: nil) body in
    lift (fun res => Dlocal res) v
  | RedOp op =>
    lift (fun res => Dlocal res) (reduce_op_eval op values_v)
  | RedSingleton =>
    match values_v with
    | d :: nil => Some (Dlocal d)
    | _ => None
    end
  end.
(** ** Map/Reduce *)
(* Runs one job on its input: the map phase first, then, if it succeeded,
   the reduce phase on the mapped values. *)
Definition mr_eval (mr:mr) (d:ddata) : option (ddata) :=
  let mapped := mr_map_eval (mr_map mr) d in
  olift (mr_reduce_eval (mr_reduce mr)) mapped.
(* [mr_eval] depends only on the map and reduce functions of a job, not on
   its input/output locations. *)
Lemma mr_eval_ignores_env map_fun red :
  forall (mr_input1 mr_input2 mr_output1 mr_output2:var),
  forall (d:ddata),
    mr_eval (mkMR mr_input1 mr_output1 map_fun red) d =
    mr_eval (mkMR mr_input2 mr_output2 map_fun red) d.
Proof.
  reflexivity.
Qed.
(** ** Map/Reduce Chain *)
(* Stores the result [d] of a job at location [x] in [env]:
   - distributed data is added when [x] is unbound. If [x] is bound to
     distributed data, a new binding holding the concatenation of the new
     and old collections is prepended. Any other existing binding makes the
     merge fail;
   - local data is added only when [x] is unbound; otherwise the merge
     fails.
   New bindings are prepended. Presumably [lookup] returns the first match,
   so the new binding shadows the old one. Confirm. *)
Definition merge_env (x:var) (d:ddata) (env:nnrcmr_env) : option nnrcmr_env :=
  match d with
  | Ddistr coll =>
    match lookup equiv_dec env x with
    | None => Some ((x, Ddistr coll)::env)
    | Some (Ddistr coll') => Some ((x, Ddistr (coll ++ coll'))::env)
    | Some _ => None
    end
  | Dlocal v =>
    match lookup equiv_dec env x with
    | None => Some ((x, Dlocal v)::env)
    | Some d' => None
    end
  end.
(* Runs the jobs of [l] in order, threading the environment. Each job reads
   its input from the current environment, and its result is merged at its
   output location with [merge_env]. The first failure (missing input,
   failed evaluation or failed merge) stops the chain: the environment at
   that point is returned together with [None]. An empty chain succeeds
   with result [Ddistr nil]. *)
Definition mr_chain_eval (env:nnrcmr_env) (l:list mr) : nnrcmr_env * option ddata :=
  List.fold_left
    (fun acc mr =>
       match acc with
       | (env', None) => (env', None)
       | (env', Some _) =>
         let oinput_d := lookup equiv_dec env' mr.(mr_input) in
         let ores := olift (mr_eval mr) oinput_d in
         match ores, olift (fun res => merge_env mr.(mr_output) res env') ores with
         | Some res, Some env'' => (env'', Some res)
         | _, _ => (env', None)
         end
       end)
    l (env, Some (Ddistr nil)).
(* Recursive construction of the NNRC environment for the last expression.
   Each parameter is paired with the unlocalized value of the matching
   argument's environment variable; the localization annotation is not
   used. The construction fails when the lists have different lengths or
   when a variable is missing from [mr_env]. *)
Fixpoint build_nnrc_env' (mr_env:nnrcmr_env) (params:list var) (args:list (var * dlocalization)) :=
  match (params, args) with
  | (nil, nil) => Some nil
  | (x1::params, (x2, loc)::args) =>
    match lookup equiv_dec mr_env x2 with
    | Some loc_d =>
      lift (fun nnrc_env => (x1, unlocalize_data loc_d) :: nnrc_env)
           (build_nnrc_env' mr_env params args)
    | None => None
    end
  | _ => None
  end.
(* Zips the formal parameters with the effective values, if both are
   available. *)
Definition nnrc_env_build (form:list var) (eff:option (list data)) : option bindings :=
  olift (zip form) eff.
(* Looks up each argument variable in [mr_env] and unlocalizes the value
   found. The result is combined with [lift_map]. *)
Definition effective_params_from_bindings (args:list (var * dlocalization)) (mr_env:nnrcmr_env) : option (list data) :=
  let fetch (x:var) := lift unlocalize_data (lookup equiv_dec mr_env x) in
  lift_map fetch (map fst args).
(* NNRC environment for the last expression: [params] zipped with the
   values of [args] fetched from [mr_env]. *)
Definition build_nnrc_env (mr_env:nnrcmr_env) (params:list var) (args:list (var * dlocalization)) :=
  nnrc_env_build params (effective_params_from_bindings args mr_env).
(* Evaluates the last expression: its parameters are bound with
   [build_nnrc_env] to the values of its arguments in [mr_env], then the
   body is evaluated with [nnrc_core_eval]. *)
Definition mr_last_eval (mr_env:nnrcmr_env) (mr_last:(list var * nnrc) * list (var * dlocalization)) :=
  let (params, n) := fst mr_last in
  let args := snd mr_last in
  let nnrc_env := build_nnrc_env mr_env params args in
  olift (fun env => nnrc_core_eval h empty_cenv env n) nnrc_env.
(* Evaluates an NNRCMR: runs the chain and, if it succeeds, evaluates the
   last expression in the resulting environment. *)
Definition nnrcmr_eval (env:nnrcmr_env) (mrl:nnrcmr) : option data :=
  match mr_chain_eval env (mr_chain mrl) with
  | (_, None) => None
  | (final_env, Some _) => mr_last_eval final_env (mr_last mrl)
  end.
(* If [l1] alone evaluates successfully from [env] to [env1], then
   evaluating [l1 ++ r :: l2] from [env] is the same as evaluating
   [r :: l2] from [env1].
   NOTE(review): the proof script is missing from this copy of the file;
   the next declaration follows Proof. directly. *)
Lemma mr_chain_eval_split (
env:
nnrcmr_env) (
l1:
list mr) (
r:
mr) (
l2:
list mr):
forall env1 loc_d1,
mr_chain_eval env l1 = (
env1,
Some loc_d1) ->
mr_chain_eval env (
l1 ++ (
r ::
l2)) =
mr_chain_eval env1 (
r ::
l2).
Proof.
(* If a chain [l1 ++ l2] evaluates successfully, then its prefix [l1] also
   evaluates successfully.
   NOTE(review): the proof script is missing from this copy of the file;
   End Semantics follows Proof. directly. *)
Lemma mr_chain_eval_progress (
env:
nnrcmr_env) (
l1:
list mr) (
l2:
list mr):
forall env'
loc_d,
mr_chain_eval env (
l1 ++
l2) = (
env',
Some loc_d) ->
exists env1 loc_d1,
mr_chain_eval env l1 = (
env1,
Some loc_d1).
Proof.
End Semantics.
(** * A lazy semantics *)

(** In this alternative, lazy semantics, evaluation of a given
    map/reduce is not triggered unless there is at least one value in its
    input collection (and it also fails when the map produces no value).

    This alternative semantics provides a model that is closer to
    the run-time behavior of some targets, e.g., Cloudant. *)
Section SemanticsLazy.

(* Brand relation handed to the underlying map/reduce evaluators. *)
Context (h:brand_relation_t).
(* Lazy version of [mr_eval]: fails ([None]) on an empty distributed input
   and when the map succeeds with no value. Otherwise it behaves like
   [mr_eval]. *)
Definition mr_lazy_eval (mr:mr) (d:ddata) : option (ddata) :=
  match d with
  | Ddistr nil => None
  | _ =>
    match mr_map_eval h mr.(mr_map) d with
    | Some nil => None
    | map_result => olift (mr_reduce_eval h mr.(mr_reduce)) map_result
    end
  end.
(* Same chaining as [mr_chain_eval], but each job is run with
   [mr_lazy_eval]. The first failure stops the chain and returns the
   environment at that point with [None]. *)
Definition mr_chain_lazy_eval (env:nnrcmr_env) (l:list mr) : nnrcmr_env * option ddata :=
  List.fold_left
    (fun (acc:nnrcmr_env * option ddata) mr =>
       match acc with
       | (env', None) => (env', None)
       | (env', Some _) =>
         let oinput_d := lookup equiv_dec env' mr.(mr_input) in
         let ores := olift (mr_lazy_eval mr) oinput_d in
         match ores, olift (fun res => merge_env mr.(mr_output) res env') ores with
         | Some res, Some env'' => (env'', Some res)
         | _, _ => (env', None)
         end
       end)
    l (env, Some (Ddistr nil)).
(* Lazy version of [nnrcmr_eval]: the chain is run with
   [mr_chain_lazy_eval], and the last expression is evaluated as usual. *)
Definition nnrcmr_lazy_eval (env:nnrcmr_env) (mrl:nnrcmr) : option data :=
  match mr_chain_lazy_eval env (mr_chain mrl) with
  | (_, None) => None
  | (final_env, Some _) => mr_last_eval h final_env (mr_last mrl)
  end.
(* Lazy counterpart of [mr_chain_eval_split].
   NOTE(review): the proof script is missing from this copy of the file;
   the next lemma follows Proof. directly. *)
Lemma mr_chain_lazy_eval_split (
env:
nnrcmr_env) (
l1:
list mr) (
r:
mr) (
l2:
list mr):
forall env1 loc_d1,
mr_chain_lazy_eval env l1 = (
env1,
Some loc_d1) ->
mr_chain_lazy_eval env (
l1 ++ (
r ::
l2)) =
mr_chain_lazy_eval env1 (
r ::
l2).
Proof.
(* Lazy counterpart of [mr_chain_eval_progress]: if [l1 ++ l2] evaluates
   successfully, then so does [l1].
   NOTE(review): the proof script is missing from this copy of the file;
   the next lemma follows Proof. directly. *)
Lemma mr_chain_lazy_eval_progress (
env:
nnrcmr_env) (
l1:
list mr) (
l2:
list mr):
forall env'
loc_d,
mr_chain_lazy_eval env (
l1 ++
l2) = (
env',
Some loc_d) ->
exists env1 loc_d1,
mr_chain_lazy_eval env l1 = (
env1,
Some loc_d1).
Proof.
(* Whenever the lazy chain evaluation succeeds, the eager evaluation
   succeeds with the same final environment and result.
   NOTE(review): the proof script is missing from this copy of the file;
   End SemanticsLazy follows Proof. directly. *)
Lemma mr_chain_lazy_eval_correct (
env:
nnrcmr_env) (
l:
list mr):
forall env'
res,
mr_chain_lazy_eval env l = (
env',
Some res) ->
mr_chain_eval h env l = (
env',
Some res).
Proof.
End SemanticsLazy.
Section ReduceEmpty.

(* Brand relation handed to the evaluators used in this section. *)
Context (h:brand_relation_t).
(* An NNRC expression standing for the result of a job whose map produced
   no value:
   - [RedId]: the empty collection;
   - [RedCollect (x, n)]: [n] with the empty collection substituted for [x];
   - [RedOp op]: the constant obtained by applying [op] to the empty list,
     if that succeeds;
   - [RedSingleton]: only supported after a [MapScalar (x, n)], as [n] with
     the empty collection substituted for [x]; [None] otherwise. *)
Definition mr_reduce_empty (mr:mr) : option nnrc :=
  match mr.(mr_reduce) with
  | RedId => Some (NNRCConst (dcoll nil))
  | RedCollect (x, n) => Some (nnrc_subst n x (NNRCConst (dcoll nil)))
  | RedOp op => lift (fun d => NNRCConst d) (reduce_op_eval h op nil)
  | RedSingleton =>
    match mr.(mr_map) with
    | MapScalar (x, n) => Some (nnrc_subst n x (NNRCConst (dcoll nil)))
    | _ => None
    end
  end.
(* Extracts the unlocalized result of a chain evaluation, if any. *)
Definition get_mr_chain_result (res:nnrcmr_env * option ddata) : option data :=
  match res with
  | (_, None) => None
  | (_, Some ld) => Some (unlocalize_data ld)
  end.
(* Suppose that:
   - the eager evaluation of [l ++ mr::nil] yields [normalize_data h res];
   - the lazy evaluation of [l] succeeds;
   - the lazy evaluation of [l ++ mr::nil] fails.
   Then evaluating [mr_reduce_empty mr] in empty environments yields the
   same result.
   NOTE(review): the proof script is missing from this copy of the file;
   End ReduceEmpty follows Proof. directly. *)
Lemma mr_reduce_empty_correct:
forall (
l:
list mr) (
mr:
mr) (
env:
nnrcmr_env),
forall res,
get_mr_chain_result (
mr_chain_eval h env (
l ++
mr::
nil)) =
Some (
normalize_data h res) ->
(
exists pre_res,
snd (
mr_chain_lazy_eval h env l) =
Some pre_res) ->
snd (
mr_chain_lazy_eval h env (
l ++
mr::
nil)) =
None ->
olift (
fun n =>
nnrc_core_eval h empty_cenv nil n) (
mr_reduce_empty mr) =
Some (
normalize_data h res).
Proof.
End ReduceEmpty.
(** * NNRCMR Library *)

(** The following are built-in Map/Reduces that can be useful for
    translation, optimization, etc. *)
Section mr_library.

(** The function [fun d => d]. *)
(* The parameter is named by the one-character string x. The literal must
   not contain anything else, such as a line break, because it is the bound
   variable name of the function. *)
Definition id_function :=
  let d := "x"%string in
  (d, NNRCVar d).
(* [id_function] uses no global variables, and its only free variable is
   its parameter. *)
Lemma id_function_no_free_vars:
  function_with_no_free_vars id_function.
Proof.
  split; [reflexivity| ].
  intros x.
  unfold id_function.
  simpl.
  intros H; destruct H.
  - congruence.
  - contradiction.
Qed.
(** The function [fun d => coll d], which wraps its argument in a singleton bag. *)
(* Builds a singleton bag from its argument with [OpBag]. The parameter is
   named by the one-character string x. The literal must not contain
   anything else, such as a line break. *)
Definition coll_function :=
  let d := "x"%string in
  (d, NNRCUnop OpBag (NNRCVar d)).
(* [coll_function] uses no global variables, and its only free variable is
   its parameter. *)
Lemma coll_function_no_free_vars:
  function_with_no_free_vars coll_function.
Proof.
  split; [reflexivity| ].
  intros x.
  unfold coll_function.
  simpl.
  intros H; destruct H.
  - congruence.
  - contradiction.
Qed.
(** The constant function [fun v => d]. *)
(* The function that ignores its parameter and returns the constant [d].
   The parameter is named by the one-character string x. The literal must
   not contain anything else, such as a line break. *)
Definition constant_function (d:data) :=
  let v := "x"%string in
  (v, NNRCConst d).
(* [constant_function d] satisfies [function_with_no_free_vars].
   NOTE(review): the proof script is missing from this copy of the file;
   the next declaration follows Proof. directly. *)
Lemma constant_function_no_free_vars (
d:
data):
function_with_no_free_vars (
constant_function d).
Proof.
(** Map-Reduce job that transforms a scalar collection into a distributed one. *)
(* Reads a local collection at [input] and stores its elements as a
   distributed collection at [output]. This uses the identity scalar map
   and [RedId]. *)
Definition mr_scalar_to_distributed (input:var) (output:var) :=
  mkMR input output (MapScalar id_function) RedId.
(** Map-Reduce job that flattens its input into its output. *)
(* Flattens the distributed collection at [input], whose elements are
   collections, into a distributed collection at [output]. This uses the
   identity function with [MapDistFlatten], then [RedId]. *)
Definition mr_flatten_scalar (input:var) (output:var) :=
  mkMR input output (MapDistFlatten id_function) RedId.
(* [mr_flatten_scalar] builds well-formed jobs.
   NOTE(review): the proof script is missing from this copy of the file;
   the next lemma follows Proof. directly. *)
Lemma mr_flatten_scalar_wf (
init:
var) (
output:
var):
mr_well_formed (
mr_flatten_scalar init output).
Proof.
(* [mr_scalar_to_distributed] builds well-formed jobs.
   NOTE(review): the proof script is missing from this copy of the file;
   End mr_library follows Proof. directly. *)
Lemma mr_scalar_to_distributed_wf (
init:
var) (
output:
var):
mr_well_formed (
mr_scalar_to_distributed init output).
Proof.
End mr_library.
Section Top.

(* Brand relation handed to [nnrcmr_eval]. *)
Context (h:brand_relation_t).

(* Top-level evaluation: the constant environment [cenv] is extended by
   [load_init_env] with the unit entry [init_vinit], then [q] is evaluated
   with [nnrcmr_eval]. *)
Definition nnrcmr_eval_top (init_vinit:var) (q:nnrcmr) (cenv:dbindings) : option data :=
  let start_env := load_init_env init_vinit cenv in
  nnrcmr_eval h start_env q.

End Top.

End NNRCMR.