Qcert.NNRCMR.Lang.NNRCMR
Section NNRCMR.
Context {fruntime:foreign_runtime}.
Context {fredop:foreign_reduce_op}.
Definition var := string.
Definition nrcmr_env := list (var × ddata).
Named Nested Relational Calculus + Map Reduce
Inductive map_fun :=
| MapDist : var × nnrc → map_fun
| MapDistFlatten : var × nnrc → map_fun
| MapScalar : var × nnrc → map_fun.
Inductive reduce_op :=
| RedOpForeign : foreign_reduce_op_type → reduce_op.
Inductive reduce_fun :=
| RedId : reduce_fun
| RedCollect : (var × nnrc) → reduce_fun
| RedOp : reduce_op → reduce_fun
| RedSingleton : reduce_fun.
Record mr := mkMR
{ mr_input: var;
mr_output: var;
mr_map: map_fun;
mr_reduce: reduce_fun }.
Record nnrcmr := mkMRChain
{ mr_inputs_loc: vdbindings;
mr_chain: list mr;
mr_last: (list var × nnrc) × list (var × dlocalization); }.
Section sanitize_local.
Context (sep:string).
Context (renamer:string→string).
Context (avoid:list var).
Definition nnrc_rename_in1
(oldvar:var) (e:nnrc) : (var×nnrc)
:= let newvar := nnrc_pick_name sep renamer avoid oldvar e in
(newvar, nnrc_rename_lazy e oldvar newvar).
Definition nnrc_rename_in2
(oldvar1 oldvar2:var) (e:nnrc) : (var×var×nnrc)
:= let newvar1 := nnrc_pick_name sep renamer (oldvar2::avoid) oldvar1 e in
let newvar2 := nnrc_pick_name sep renamer (newvar1::avoid) oldvar2 e in
let e'1 := nnrc_rename_lazy e oldvar1 newvar1 in
let e'2 := nnrc_rename_lazy e'1 oldvar2 newvar2 in
(newvar1, newvar2, e'2).
Definition rename_map_fun (m:map_fun) : map_fun
:= match m with
| MapDist (v, e) ⇒ MapDist (nnrc_rename_in1 v e)
| MapDistFlatten (v,e) ⇒ MapDistFlatten (nnrc_rename_in1 v e)
| MapScalar (v,e) ⇒ MapScalar (nnrc_rename_in1 v e)
end.
Definition rename_reduce_fun (r:reduce_fun) : reduce_fun
:= match r with
| RedId ⇒ RedId
| RedCollect (v,e) ⇒ RedCollect (nnrc_rename_in1 v e)
| RedOp op ⇒ RedOp op
| RedSingleton ⇒ RedSingleton
end.
Definition mr_rename_local (mr0:mr) : mr
:= mkMR mr0.(mr_input) mr0.(mr_output)
(rename_map_fun mr0.(mr_map))
(rename_reduce_fun mr0.(mr_reduce)).
Definition nnrcmr_rename_local (mrl:nnrcmr)
:= mkMRChain
mrl.(mr_inputs_loc)
(map mr_rename_local mrl.(mr_chain))
mrl.(mr_last).
End sanitize_local.
Section sanitize_dbs.
Definition nnrcmr_inoutlist (mrl:nnrcmr)
:= insertion_sort StringOrder.lt_dec
(flat_map
(fun x ⇒ x.(mr_input)::x.(mr_output)::nil)
mrl.(mr_chain)).
Definition mr_subst_io substlist (mr0:mr) : mr
:= mkMR (substlist_subst substlist mr0.(mr_input))
(substlist_subst substlist mr0.(mr_output))
mr0.(mr_map) mr0.(mr_reduce).
Definition nnrcmr_subst_io substlist (mrl:nnrcmr) : nnrcmr
:= let chain := (map (mr_subst_io substlist) mrl.(mr_chain)) in
let last :=
let (params, n) := fst mrl.(mr_last) in
let args := snd mrl.(mr_last) in
let params := List.map (substlist_subst substlist) params in
let n := nnrc_substlist_subst substlist n in
let args :=
List.map
(fun x_loc ⇒ (substlist_subst substlist (fst x_loc), snd x_loc))
args
in
((params, n), args)
in
mkMRChain
mrl.(mr_inputs_loc)
chain
last.
Context (sep:string).
Context (renamer:string→string).
Context (avoid:list var).
Fixpoint nnrcmr_mk_rename_list (leave_alone:list var) (names:list var) (acc:list (var×var))
:= match names with
| nil ⇒ acc
| x::l ⇒ if (in_dec string_dec x leave_alone)
then nnrcmr_mk_rename_list leave_alone l ((x,x)::acc)
else
let proposedname := renamer x in
let newname :=
fresh_var_from sep proposedname
(l++(map snd acc)++avoid) in
nnrcmr_mk_rename_list leave_alone l ((x,newname)::acc)
end.
Definition nnrcmr_rename_graph (mrl:nnrcmr)
:= let leave_alone := List.map fst (mrl.(mr_inputs_loc)) in
let names := nnrcmr_inoutlist mrl in
let substlist_rev := nnrcmr_mk_rename_list leave_alone names nil in
let substlist := rev substlist_rev in
nnrcmr_subst_io substlist mrl.
End sanitize_dbs.
Definition mr_causally_consistent (mr1 mr2:mr) : bool
:= mr1.(mr_input) ≠b mr2.(mr_output).
Definition mr_chain_causally_consistent (mr_chain:list mr) : bool
:= forallb_ordpairs_refl mr_causally_consistent mr_chain.
Definition nnrcmr_causally_consistent (mrl:nnrcmr) : bool
:= mr_chain_causally_consistent mrl.(mr_chain).
Definition function_with_no_free_vars (f: var × nnrc) :=
(∀ (x: var), In x (nnrc_free_vars (snd f)) → x = fst f).
Definition function2_with_no_free_vars (f: (var × var) × nnrc) :=
(fst (fst f)) ≠ (snd (fst f)) ∧
(∀ x, In x (nnrc_free_vars (snd f)) → x = (fst (fst f)) ∨ x = (snd (fst f))).
Definition map_well_formed map :=
match map with
| MapDist f ⇒ function_with_no_free_vars f
| MapDistFlatten f ⇒ function_with_no_free_vars f
| MapScalar f ⇒ function_with_no_free_vars f
end.
Definition reduce_well_formed red :=
match red with
| RedId ⇒ True
| RedCollect f ⇒ function_with_no_free_vars f
| RedOp op ⇒ True
| RedSingleton ⇒ True
end.
Definition mr_well_formed mr :=
map_well_formed mr.(mr_map) ∧ reduce_well_formed mr.(mr_reduce).
Definition mr_chain_well_formed (l: list mr) :=
∀ mr, In mr l → mr_well_formed mr.
Definition nnrcmr_well_formed (mrl: nnrcmr) :=
mr_chain_well_formed mrl.(mr_chain).
Definition mr_input_localized mr :=
match mr.(mr_map) with
| MapDist _ ⇒ (mr.(mr_input), Vdistr)
| MapDistFlatten _ ⇒ (mr.(mr_input), Vdistr)
| MapScalar _ ⇒ (mr.(mr_input), Vlocal)
end.
Definition mr_output_localized mr :=
match mr.(mr_reduce) with
| RedId ⇒ (mr.(mr_output), Vdistr)
| RedCollect _ ⇒ (mr.(mr_output), Vlocal)
| RedOp op ⇒ (mr.(mr_output), Vlocal)
| RedSingleton ⇒ (mr.(mr_output), Vlocal)
end.
Definition map_well_localized map d :=
match map, d with
| MapDist _, Ddistr _ ⇒ True
| MapDistFlatten _, Ddistr _ ⇒ True
| MapScalar _, Dlocal _ ⇒ True
| _, _ ⇒ False
end.
Section Semantics.
Context (h:brand_relation_t).
Definition mr_map_eval (map: map_fun) (input_d: ddata) : option (list data) :=
match map with
| MapDist f ⇒
let f_map (d:data) : option data :=
let (doc, body) := f in
nnrc_core_eval h ((doc,d)::nil) body
in
match input_d with
| Ddistr coll ⇒
rmap f_map coll
| Dlocal d ⇒ None
end
| MapDistFlatten f ⇒
let f_map (d:data) : option data :=
let (doc, body) := f in
nnrc_core_eval h ((doc,d)::nil) body
in
match input_d with
| Ddistr coll ⇒
let nested_coll := rmap f_map coll in
olift rflatten nested_coll
| Dlocal d ⇒ None
end
| MapScalar f ⇒
let f_map (d:data) : option data :=
let (doc, body) := f in
nnrc_core_eval h ((doc,d)::nil) body
in
match input_d with
| Ddistr coll ⇒ None
| Dlocal d ⇒
match f_map d with
| Some (dcoll coll) ⇒ Some coll
| _ ⇒ None
end
end
end.
Definition reduce_op_eval (rop:reduce_op) (dl:list data) : option data
:= match rop with
| RedOpForeign op
⇒ foreign_reduce_op_interp h op dl
end.
Definition mr_reduce_eval (reduce: reduce_fun) (values_v: list data) : option (ddata) :=
match reduce with
| RedId ⇒ Some (Ddistr values_v)
| RedCollect f ⇒
let (values_arg, body) := f in
let v := nnrc_core_eval h ((values_arg, dcoll values_v) :: nil) body in
lift (fun res ⇒ Dlocal res) v
| RedOp op ⇒
lift (fun res ⇒ Dlocal res) (reduce_op_eval op values_v)
| RedSingleton ⇒
match values_v with
| d :: nil ⇒ Some (Dlocal d)
| _ ⇒ None
end
end.
Definition mr_eval (mr:mr) (d: ddata) : option (ddata) :=
let map_result := mr_map_eval mr.(mr_map) d in
olift (mr_reduce_eval mr.(mr_reduce)) map_result.
Lemma mr_eval_ignores_env map_fun red :
∀ (mr_input1 mr_input2 mr_output1 mr_output2:var),
∀ (d:ddata),
mr_eval (mkMR mr_input1 mr_output1 map_fun red) d =
mr_eval (mkMR mr_input2 mr_output2 map_fun red) d.
Definition merge_env (x: var) (d: ddata) (env: nrcmr_env) : option nrcmr_env :=
match d with
| Ddistr coll ⇒
match lookup equiv_dec env x with
| None ⇒ Some ((x, Ddistr coll)::env)
| Some (Ddistr coll') ⇒ Some ((x, Ddistr (coll ++ coll') )::env)
| Some _ ⇒ None
end
| Dlocal v ⇒
match lookup equiv_dec env x with
| None ⇒ Some ((x, Dlocal v)::env)
| Some d' ⇒ None
end
end.
Definition mr_chain_eval (env:nrcmr_env) (l:list mr) : nrcmr_env × option ddata :=
List.fold_left
(fun acc mr ⇒
match acc with
| (env', None) ⇒ (env', None)
| (env', Some _) ⇒
let oinput_d := lookup equiv_dec env' mr.(mr_input) in
let ores := olift (mr_eval mr) oinput_d in
match ores, olift (fun res ⇒ merge_env mr.(mr_output) res env') ores with
| Some res, Some env'' ⇒ (env'', Some res)
| _, _ ⇒ (env', None)
end
end)
l (env, Some (Ddistr nil)).
Fixpoint build_nnrc_env' (mr_env:nrcmr_env) (params: list var) (args: list (var × dlocalization)) :=
match (params, args) with
| (nil, nil) ⇒ Some nil
| (x1::params, (x2, loc)::args) ⇒
match lookup equiv_dec mr_env x2 with
| Some loc_d ⇒
lift (fun nnrc_env ⇒ (x1, unlocalize_data loc_d) :: nnrc_env) (build_nnrc_env' mr_env params args)
| None ⇒ None
end
| _⇒ None
end.
Definition nnrc_env_build (form:list var) (eff: option (list data)): option bindings :=
olift (zip form) eff.
Definition effective_params_from_bindings (args:list (var × dlocalization)) (mr_env:nrcmr_env) : option (list data) :=
lift_map
(fun (v : var) ⇒
lift unlocalize_data (lookup equiv_dec mr_env v))
(map fst args).
Definition build_nnrc_env (mr_env:nrcmr_env) (params: list var) (args: list (var × dlocalization)) :=
let eff_params := effective_params_from_bindings args mr_env in
nnrc_env_build params eff_params.
Definition mr_last_eval (mr_env:nrcmr_env) (mr_last: (list var × nnrc) × list (var × dlocalization)) :=
let (params, n) := fst mr_last in
let args := snd mr_last in
let nnrc_env := build_nnrc_env mr_env params args in
olift (fun env ⇒ nnrc_core_eval h env n) nnrc_env.
Definition nnrcmr_eval (env:nrcmr_env) (mrl:nnrcmr) : option data :=
match mr_chain_eval env mrl.(mr_chain) with
| (mr_env, Some _) ⇒
mr_last_eval mr_env mrl.(mr_last)
| (_, None) ⇒ None
end.
Lemma mr_chain_eval_split (env: nrcmr_env) (l1:list mr) (r: mr) (l2: list mr):
∀ env1 loc_d1,
mr_chain_eval env l1 = (env1, Some loc_d1) →
mr_chain_eval env (l1 ++ (r :: l2)) = mr_chain_eval env1 (r :: l2).
Lemma mr_chain_eval_progress (env:nrcmr_env) (l1:list mr) (l2:list mr):
∀ env' loc_d,
mr_chain_eval env (l1 ++ l2) = (env', Some loc_d) →
∃ env1 loc_d1,
mr_chain_eval env l1 = (env1, Some loc_d1).
End Semantics.
Section SemanticsLazy.
Context (h:brand_relation_t).
Definition mr_lazy_eval (mr:mr) (d: ddata) : option (ddata) :=
match d with
| Ddistr nil ⇒ None
| _ ⇒
match mr_map_eval h mr.(mr_map) d with
| Some nil ⇒ None
| map_result ⇒
olift (mr_reduce_eval h mr.(mr_reduce)) map_result
end
end.
Definition mr_chain_lazy_eval (env:nrcmr_env) (l:list mr) : nrcmr_env × option ddata :=
List.fold_left
(fun (acc: nrcmr_env × option ddata) mr ⇒
match acc with
| (env', None) ⇒ (env', None)
| (env', Some _) ⇒
let oinput_d := lookup equiv_dec env' mr.(mr_input) in
let ores := olift (mr_lazy_eval mr) oinput_d in
match ores, olift (fun res ⇒ merge_env mr.(mr_output) res env') ores with
| Some res, Some env'' ⇒ (env'', Some res)
| _, _ ⇒ (env', None)
end
end)
l (env, Some (Ddistr nil)).
Definition nnrcmr_lazy_eval (env:nrcmr_env) (mrl:nnrcmr) : option data :=
match mr_chain_lazy_eval env mrl.(mr_chain) with
| (mr_env, Some _) ⇒
mr_last_eval h mr_env mrl.(mr_last)
| (_, None) ⇒ None
end.
Lemma mr_chain_lazy_eval_split (env: nrcmr_env) (l1:list mr) (r: mr) (l2: list mr):
∀ env1 loc_d1,
mr_chain_lazy_eval env l1 = (env1, Some loc_d1) →
mr_chain_lazy_eval env (l1 ++ (r :: l2)) = mr_chain_lazy_eval env1 (r :: l2).
Lemma mr_chain_lazy_eval_progress (env:nrcmr_env) (l1:list mr) (l2:list mr):
∀ env' loc_d,
mr_chain_lazy_eval env (l1 ++ l2) = (env', Some loc_d) →
∃ env1 loc_d1,
mr_chain_lazy_eval env l1 = (env1, Some loc_d1).
Lemma mr_chain_lazy_eval_correct (env:nrcmr_env) (l:list mr):
∀ env' res,
mr_chain_lazy_eval env l = (env', Some res) →
mr_chain_eval h env l = (env', Some res).
End SemanticsLazy.
Section ReduceEmpty.
Context (h:brand_relation_t).
Definition mr_reduce_empty (mr: mr): option nnrc :=
match mr.(mr_reduce) with
| RedId ⇒
Some (NNRCConst (dcoll nil))
| RedCollect (x, n) ⇒
Some (nnrc_subst n x (NNRCConst (dcoll nil)))
| RedOp op ⇒
lift (fun d ⇒ NNRCConst d) (reduce_op_eval h op nil)
| RedSingleton ⇒
match mr.(mr_map) with
| MapScalar (x, n) ⇒
Some (nnrc_subst n x (NNRCConst (dcoll nil)))
| _ ⇒ None
end
end.
Definition get_mr_chain_result (res: nrcmr_env × option ddata) : option data :=
match res with
| (_, Some ld) ⇒ Some (unlocalize_data ld)
| (_, None) ⇒ None
end.
Lemma mr_reduce_empty_correct:
∀ (l:list mr) (mr:mr) (env:nrcmr_env),
∀ res,
get_mr_chain_result (mr_chain_eval h env (l ++ mr::nil)) = Some (normalize_data h res) →
(∃ pre_res, snd (mr_chain_lazy_eval h env l) = Some pre_res) →
snd (mr_chain_lazy_eval h env (l ++ mr::nil)) = None →
olift (fun n ⇒ nnrc_core_eval h nil n) (mr_reduce_empty mr) = Some (normalize_data h res).
End ReduceEmpty.
Section mr_library.
The function (fun d => d)
Definition id_function :=
let d := "x"%string in
(d, NNRCVar d).
Lemma id_function_no_free_vars: function_with_no_free_vars id_function.
let d := "x"%string in
(d, NNRCVar d).
Lemma id_function_no_free_vars: function_with_no_free_vars id_function.
The function (fun d => coll d)
Definition coll_function :=
let d := "x"%string in
(d, NNRCUnop AColl (NNRCVar d)).
Lemma coll_function_no_free_vars: function_with_no_free_vars coll_function.
let d := "x"%string in
(d, NNRCUnop AColl (NNRCVar d)).
Lemma coll_function_no_free_vars: function_with_no_free_vars coll_function.
The function (fun v => d)
Definition constant_function (d:data) :=
let v := "x"%string in
(v, NNRCConst d).
Lemma constant_function_no_free_vars (d:data): function_with_no_free_vars (constant_function d).
let v := "x"%string in
(v, NNRCConst d).
Lemma constant_function_no_free_vars (d:data): function_with_no_free_vars (constant_function d).
Map-Reduce job that transform a scalar collection into a distribued one
Definition mr_scalar_to_distributed (input: var) (output: var) :=
mkMR
input
output
(MapScalar id_function)
RedId.
mkMR
input
output
(MapScalar id_function)
RedId.
Map-Reduce job that flatten its input into its output
Definition mr_flatten_scalar (input: var) (output: var) :=
mkMR
input
output
(MapDistFlatten id_function)
RedId.
Lemma mr_flatten_scalar_wf (init: var) (output: var):
mr_well_formed (mr_flatten_scalar init output).
Lemma mr_scalar_to_distributed_wf (init: var) (output: var):
mr_well_formed (mr_scalar_to_distributed init output).
End mr_library.
End NNRCMR.
mkMR
input
output
(MapDistFlatten id_function)
RedId.
Lemma mr_flatten_scalar_wf (init: var) (output: var):
mr_well_formed (mr_flatten_scalar init output).
Lemma mr_scalar_to_distributed_wf (init: var) (output: var):
mr_well_formed (mr_scalar_to_distributed init output).
End mr_library.
End NNRCMR.