Qcert.Translation.NNRCMRtoCldMR
Section NNRCMRToCldMR.
Context {fruntime:foreign_runtime}.
Context {fredop:foreign_reduce_op}.
Context {fcloudant:foreign_cloudant}.
Context {ftocloudant:foreign_to_cloudant}.
Context (h:list(string×string)).
Definition MRtoMapCld (mrmap: NNRCMR.map_fun) (collectflag:bool) (index:nat) : cld_map :=
let emit :=
if collectflag then CldEmitCollect O else CldEmitDist
in
match mrmap with
| MapDist (x, n) ⇒ mkMapCld (CldMapId (x, n)) emit
| MapDistFlatten (x, n) ⇒ mkMapCld (CldMapFlatten (x, n)) emit
| MapScalar (x, n) ⇒ mkMapCld (CldMapFlatten (x,n)) emit
end.
Definition pushReduce (avoiddb: list var) (r:(var×nnrc)) : cld_reduce × (var×cld_map) :=
let fresh_outputdb := fresh_var "output_" avoiddb in
let pushed_reduce := collectReduce (Some fresh_outputdb) in
let v_red := fst r in
let f_red := snd r in
let pushed_map := MRtoMapCld (MapScalar (v_red,NNRCUnop AColl f_red)) true 0 in
(pushed_reduce, (fresh_outputdb, pushed_map)).
Definition pushMR (dbinput dboutput:var) (pushed_map: cld_map) (mrempty:option nnrc) :=
mkMRCld dbinput pushed_map (Some (idReduce (Some dboutput))) mrempty.
Definition pushMRNoRed (dbinput:var) (pushed_map: cld_map) (mrempty:option nnrc) :=
mkMRCld dbinput pushed_map None mrempty.
Definition minMap :=
let x := "x"%string in
let map_fun := (x, NNRCUnop (ADot "min") (NNRCVar x)) in
mkMapCld (CldMapId map_fun) (CldEmitCollect O).
Definition minMR (stats_v out_v: var) :=
mkMRCld stats_v minMap (Some (idReduce (Some out_v))).
Definition minMRNoRed (stats_v: var) :=
mkMRCld stats_v minMap None.
Definition maxMap :=
let x := "x"%string in
let map_fun := (x, NNRCUnop (ADot "max") (NNRCVar x)) in
mkMapCld (CldMapId map_fun) (CldEmitCollect O).
Definition maxMR (stats_v out_v: var) :=
mkMRCld stats_v maxMap (Some (idReduce (Some out_v))).
Definition maxMRNoRed (stats_v: var) :=
mkMRCld stats_v maxMap None.
Definition MRtoReduceCld (v_key:var) (out_v:var) (avoiddb: list var) (mrr: NNRCMR.reduce_fun) (mrempty:option nnrc) :
bool × cld_reduce × (option cld_mr) × (list var) :=
match mrr with
| RedId ⇒ (false, idReduce (Some out_v), None, avoiddb)
| RedCollect redf ⇒
let '(pushed_red, (newdbout,pushed_map)) := pushReduce avoiddb redf in
let pushed_mr := pushMR newdbout out_v pushed_map mrempty in
(true, pushed_red, Some pushed_mr, newdbout::avoiddb)
| RedOp op ⇒
match op with
| RedOpForeign frop ⇒
(true, opReduce (foreign_to_cloudant_reduce_op frop) (Some out_v), None, avoiddb)
end
| RedSingleton ⇒ (true, idReduce (Some out_v), None, avoiddb)
end.
Definition MRtoMRCld (avoiddb: list var) (mr: mr) : (list cld_mr) × (list var) :=
let cld_input := mr_input mr in
let cld_output := mr_output mr in
let mrmap := mr_map mr in
let mrred := mr_reduce mr in
let mrempty := mr_reduce_empty h mr in
let v_key :=
match mrmap with
| MapDist (x, _) ⇒ x
| MapDistFlatten (x, _) ⇒ x
| MapScalar (x, _) ⇒ x
end
in
let '(collectflag, first_reduce,pushedmr,avoiddb') := MRtoReduceCld v_key cld_output avoiddb mrred mrempty in
let cld_map := MRtoMapCld mrmap collectflag 0 in
match pushedmr with
| None ⇒ (mkMRCld cld_input cld_map (Some first_reduce) mrempty :: nil, avoiddb')
| Some nextmr ⇒ ((mkMRCld cld_input cld_map (Some first_reduce) (Some (NNRCConst (dcoll nil)))) :: nextmr :: nil, avoiddb')
end.
Definition cld_data_of_localized_data (locd:ddata) : list data :=
match locd with
| Dlocal d ⇒ (d::nil)
| Ddistr dl ⇒ dl
end.
Lemma rmap_with_key (prefix:list nat) (i:nat) (v:var) (n:nnrc) (coll: list data) :
(rmap
(fun d : data × data ⇒
let (k, v0) := d in
match nnrc_core_eval h ((v, v0) :: nil) n with
| Some res ⇒ Some (k, res)
| None ⇒ None
end) (init_keys_aux prefix i coll)) =
lift (init_keys_aux prefix i) (rmap (fun d : data ⇒ nnrc_core_eval h ((v, d) :: nil) n) coll).
Lemma rmap_eval_through_init_keys (l:list data) (n:nnrc) (v:var) :
(rmap (fun d : data × data ⇒
let (k, v0) := d in
match nnrc_core_eval h ((v, v0) :: nil) n with
| Some res0 ⇒ Some (k, res0)
| None ⇒ None
end) (init_keys l))
= lift init_keys (rmap (fun d : data ⇒ nnrc_core_eval h ((v, d) :: nil) n) l).
Lemma MRtoMRCld_preserves_causally_consistent (init_unit:var) (mr:mr) :
mr.(mr_input) ≠ mr.(mr_output) →
mr.(mr_output) ≠ init_unit →
init_unit ≠ mr.(mr_input) →
cld_mr_chain_causally_consistent (fst (MRtoMRCld (mr.(mr_input) :: mr.(mr_output) :: init_unit :: nil) mr)) = true.
TODO: currently false
Fixpoint mr_chain_to_cld_mr_chain (avoiddb: list var) (l: list mr) : list cld_mr :=
match l with
| nil ⇒ nil
| mr :: l' ⇒
let (cld_mr,avoiddb') := (MRtoMRCld avoiddb mr) in
cld_mr ++ (mr_chain_to_cld_mr_chain avoiddb' l')
end.
Definition mr_last_to_cld_mr_last
(mr_last_closure:(list var × nnrc) × list (var × dlocalization))
: (list var × nnrc) × list var :=
let (fvs, mr_last) := fst mr_last_closure in
let vars_loc := snd mr_last_closure in
let cld_mr_last :=
List.fold_right
(fun fv k ⇒
match lookup equiv_dec vars_loc fv with
| None ⇒ k
| Some Vdistr ⇒
NNRCLet fv (NNRCVar fv) k
| Some Vlocal ⇒
NNRCLet fv
(NNRCEither (NNRCUnop ASingleton (NNRCVar fv))
fv (NNRCVar fv)
fv (NNRCConst dunit))
k
end)
mr_last fvs
in
((fvs, cld_mr_last), map fst vars_loc).
Definition NNRCMRtoNNRCMRCloudant (avoiddb: list var) (mrl: nnrcmr) : cld_mrl :=
mkMRCldChain
(mr_chain_to_cld_mr_chain avoiddb mrl.(mr_chain))
(mr_last_to_cld_mr_last mrl.(mr_last)).
Lemma MRtoMRCld_causally_consistent
avoiddb (mr:mr) :
In mr.(mr_input) avoiddb →
mr_causally_consistent mr mr = true →
∀ x,
forallb (fun y ⇒ cld_mr_input y ≠b mr.(mr_output)) x = true →
cld_mr_chain_causally_consistent x = true →
incl (mr.(mr_output) :: map cld_mr_input x) avoiddb →
cld_mr_chain_causally_consistent (x ++ fst (MRtoMRCld avoiddb mr)) = true.
Lemma MRtoMRCid_avoids avoiddb a :
let '(l0, l1) := MRtoMRCld avoiddb a in
incl (mr_input a :: nil) avoiddb →
incl (avoiddb ++ map cld_mr_input l0) l1.
Lemma MRtoMRCid_input_avoids avoiddb a :
let '(l0, l1) := MRtoMRCld avoiddb a in
Forall (fun x ⇒ cld_mr_input x = mr_input a ∨ ¬ In (cld_mr_input x) avoiddb) l0.
Lemma NNRCMRtoNNRCMRCloudant_causally_consistent avoiddb l :
mr_chain_causally_consistent l = true →
∀ x,
cld_mr_chain_causally_consistent x = true →
forallb (fun a ⇒ forallb (fun y : cld_mr ⇒ cld_mr_input y ≠b mr_output a) x) l = true →
incl (map mr_input l ++ map mr_output l ++ map cld_mr_input x) avoiddb →
cld_mr_chain_causally_consistent (x ++ mr_chain_to_cld_mr_chain avoiddb l) = true.
Definition NNRCMRtoNNRCMRCloudantTop (mrl: nnrcmr) : cld_mrl :=
let avoiddb := List.map mr_input mrl.(mr_chain) ++ List.map mr_output mrl.(mr_chain) in
NNRCMRtoNNRCMRCloudant avoiddb mrl.
Lemma NNRCMRtoNNRCMRCloudantTop_causally_consistent mrl :
nnrcmr_causally_consistent mrl = true →
cld_mr_chain_causally_consistent (NNRCMRtoNNRCMRCloudantTop mrl).(cld_mr_chain) = true.
End NNRCMRToCldMR.