(* Module NNRCMRtoCldMR *)


Section NNRCMRToCldMR.
  Require Import String.
  Require Import List.
  Require Import Sorting.Mergesort.
  Require Import EquivDec.
  Require Import Utils.
  Require Import BasicRuntime.
  Require Import NNRCRuntime.
  Require Import NNRCMRRuntime.
  Require Import ForeignCloudant.
  Require Import ForeignToCloudant.
  Require Import CldMRUtil.
  Require Import CldMR.
  
  Local Open Scope list_scope.

  Context {fruntime:foreign_runtime}.
  Context {fredop:foreign_reduce_op}.
  Context {fcloudant:foreign_cloudant}.
  Context {ftocloudant:foreign_to_cloudant}.

  Context (h:list(string*string)).


  Definition MRtoMapCld (mrmap: NNRCMR.map_fun) (collectflag:bool) (index:nat) : cld_map :=
    let emit :=
        if collectflag then CldEmitCollect O else CldEmitDist
    in
    match mrmap with
    | MapDist (x, n) => mkMapCld (CldMapId (x, n)) emit
    | MapDistFlatten (x, n) => mkMapCld (CldMapFlatten (x, n)) emit
    | MapScalar (x, n) => mkMapCld (CldMapFlatten (x,n)) emit
    end.

  Definition pushReduce (avoiddb: list var) (r:(var*nnrc)) : cld_reduce * (var*cld_map) :=
    let fresh_outputdb := fresh_var "output_" avoiddb in
    let pushed_reduce := collectReduce (Some fresh_outputdb) in
    let v_red := fst r in
    let f_red := snd r in
    let pushed_map := MRtoMapCld (MapScalar (v_red,NNRCUnop AColl f_red)) true 0 in
    (pushed_reduce, (fresh_outputdb, pushed_map)).

  Definition pushMR (dbinput dboutput:var) (pushed_map: cld_map) (mrempty:option nnrc) :=
    mkMRCld dbinput pushed_map (Some (idReduce (Some dboutput))) mrempty.
  
  Definition pushMRNoRed (dbinput:var) (pushed_map: cld_map) (mrempty:option nnrc) :=
    mkMRCld dbinput pushed_map None mrempty.

  Definition minMap :=
    let x := "x"%string in
    let map_fun := (x, NNRCUnop (ADot "min") (NNRCVar x)) in
    mkMapCld (CldMapId map_fun) (CldEmitCollect O).

  Definition minMR (stats_v out_v: var) :=
    mkMRCld stats_v minMap (Some (idReduce (Some out_v))).

  Definition minMRNoRed (stats_v: var) :=
    mkMRCld stats_v minMap None.

  Definition maxMap :=
    let x := "x"%string in
    let map_fun := (x, NNRCUnop (ADot "max") (NNRCVar x)) in
    mkMapCld (CldMapId map_fun) (CldEmitCollect O).

  Definition maxMR (stats_v out_v: var) :=
    mkMRCld stats_v maxMap (Some (idReduce (Some out_v))).

  Definition maxMRNoRed (stats_v: var) :=
    mkMRCld stats_v maxMap None.

  (** Translate an NNRCMR reduce function into Cloudant terms.

      The result is a 4-tuple:
      - a boolean, later passed as the collect flag to [MRtoMapCld];
      - the Cloudant reduce to use for the step;
      - an optional extra step to be run after it;
      - the list of database names to avoid, extended with any fresh
        name introduced here.

      [out_v] is the output the translated reduce (or the extra step)
      writes to.  [v_key] is not used by the body.  [mrempty] is only
      used for the extra step created in the [RedCollect] case. *)
  Definition MRtoReduceCld (v_key:var) (out_v:var) (avoiddb: list var) (mrr: NNRCMR.reduce_fun) (mrempty:option nnrc) :
    bool * cld_reduce * (option cldmr_step) * (list var) :=
    match mrr with
    (* Identity reduce: no collect, no extra step. *)
    | RedId => (false, idReduce (Some out_v), None, avoiddb)
    | RedCollect redf =>
      (* The reduce body is pushed into a second step: the first step
         collects into a fresh database, the second one maps over it
         and writes to [out_v]. *)
      let '(pushed_red, (newdbout,pushed_map)) := pushReduce avoiddb redf in
      let pushed_mr := pushMR newdbout out_v pushed_map mrempty in
      (* The fresh database name is recorded so it is not picked again. *)
      (true, pushed_red, Some pushed_mr, newdbout::avoiddb)
    | RedOp op =>
      match op with
      | RedOpForeign frop =>
        (* Foreign reduce operators are converted by the foreign layer. *)
        (true, opReduce (foreign_to_cloudant_reduce_op frop) (Some out_v), None, avoiddb)
      end
    | RedSingleton => (true, idReduce (Some out_v), None, avoiddb)
    end.

  (** Translate a single NNRCMR map-reduce [mr] into one or two Cloudant
      steps, returning them together with the updated list of database
      names to avoid.

      One step is produced when [MRtoReduceCld] yields no extra step;
      otherwise the translated step is followed by that extra step. *)
  Definition MRtoMRCld (avoiddb: list var) (mr: mr) : (list cldmr_step) * (list var) :=
    let cld_input := mr_input mr in
    let cld_output := mr_output mr in
    let mrmap := mr_map mr in
    let mrred := mr_reduce mr in
    let mrempty := mr_reduce_empty h mr in
    (* Variable bound by the map function; handed to [MRtoReduceCld]. *)
    let v_key :=
        match mrmap with
        | MapDist (x, _) => x
        | MapDistFlatten (x, _) => x
        | MapScalar (x, _) => x
        end
    in
    let '(collectflag, first_reduce,pushedmr,avoiddb') := MRtoReduceCld v_key cld_output avoiddb mrred mrempty in
    let cld_map := MRtoMapCld mrmap collectflag 0 in
    match pushedmr with
    | None => (mkMRCld cld_input cld_map (Some first_reduce) mrempty :: nil, avoiddb')
    (* With an extra step, the first step gets the empty collection as its
       default; [mrempty] was already given to the extra step. *)
    | Some nextmr => ((mkMRCld cld_input cld_map (Some first_reduce) (Some (NNRCConst (dcoll nil)))) :: nextmr :: nil, avoiddb')
    end.
  
  Definition cld_data_of_localized_data (locd:ddata) : list data :=
    match locd with
    | Dlocal d => (d::nil)
    | Ddistr dl => dl
    end.


  (** Evaluating [n] on the value component of every keyed element of
      [init_keys_aux prefix i coll] (keeping the key) gives the same result
      as first evaluating [n] on every element of [coll] and then applying
      [init_keys_aux prefix i] to the successful result. *)
  Lemma rmap_with_key (prefix:list nat) (i:nat) (v:var) (n:nnrc) (coll: list data) :
    (rmap
       (fun d : data * data =>
          let (k, v0) := d in
          match nnrc_core_eval h ((v, v0) :: nil) n with
          | Some res => Some (k, res)
          | None => None
          end) (init_keys_aux prefix i coll)) =
    lift (init_keys_aux prefix i) (rmap (fun d : data => nnrc_core_eval h ((v, d) :: nil) n) coll).
Proof.
    (* The index changes in the inductive step, so it is generalized first. *)
    revert i.
    induction coll; try reflexivity; simpl; intros.
    destruct (nnrc_core_eval h ((v, a) :: nil) n); try reflexivity; simpl.
    rewrite (IHcoll (S i)); clear IHcoll.
    destruct ((rmap (fun d0 : data => nnrc_core_eval h ((v, d0) :: nil) n) coll)); reflexivity.
  Qed.

  (** Specialization of [rmap_with_key] to [init_keys]: evaluating [n]
      under the keys commutes with [init_keys]. *)
  Lemma rmap_eval_through_init_keys (l:list data) (n:nnrc) (v:var) :
    (rmap (fun d : data * data =>
             let (k, v0) := d in
             match nnrc_core_eval h ((v, v0) :: nil) n with
             | Some res0 => Some (k, res0)
             | None => None
             end) (init_keys l))
    = lift init_keys (rmap (fun d : data => nnrc_core_eval h ((v, d) :: nil) n) l).
Proof.
    (* After unfolding, the goal is an instance of [rmap_with_key]. *)
    unfold init_keys.
    apply rmap_with_key.
  Qed.

  Lemma MRtoMRCld_preserves_causally_consistent (init_unit:var) (mr:mr) :
    mr.(mr_input) <> mr.(mr_output) ->
    mr.(mr_output) <> init_unit ->
    init_unit <> mr.(mr_input) ->
    cldmr_chain_causally_consistent (fst (MRtoMRCld (mr.(mr_input) :: mr.(mr_output) :: init_unit :: nil) mr)) = true.
Proof.
    intros.
    unfold MRtoMRCld.
    unfold MRtoReduceCld.
    destruct mr; simpl in *.
    destruct mr_reduce;
      unfold
        cldmr_chain_causally_consistent,
      cldmr_step_causally_consistent,
      nequiv_decb, equiv_decb; simpl.
    - match_destr.
      congruence.
    - destruct (equiv_dec mr_input mr_output); [ congruence | ].
TODO: currently false
      admit.
    - destruct r; simpl;
      match_destr;
      try congruence.
    - match_destr; congruence.
  Admitted.



  (** Translate a chain of NNRCMR map-reduces into a chain of Cloudant
      steps by translating each element with [MRtoMRCld] and appending
      the results in order.  The avoid list returned for one element is
      the one used for the rest of the chain. *)
  Fixpoint mr_chain_to_cldmr_chain (avoiddb: list var) (l: list mr) : list cldmr_step :=
    match l with
    | nil => nil
    | mr :: l' =>
      (* Here [cldmr_step] is a local name bound to the list of steps
         produced for [mr]. *)
      let (cldmr_step,avoiddb') := (MRtoMRCld avoiddb mr) in
      cldmr_step ++ (mr_chain_to_cldmr_chain avoiddb' l')
    end.

  Definition mr_last_to_cldmr_last
             (mr_last_closure:(list var * nnrc) * list (var * dlocalization))
    : (list var * nnrc) * list var :=
    let (fvs, mr_last) := fst mr_last_closure in
    let vars_loc := snd mr_last_closure in
    let cldmr_last :=
        List.fold_right
          (fun fv k =>
             match lookup equiv_dec vars_loc fv with
             | None => k
             | Some Vdistr =>
               NNRCLet fv (NNRCVar fv) k
             | Some Vlocal =>
               NNRCLet fv
                      (NNRCEither (NNRCUnop ASingleton (NNRCVar fv))
                                 fv (NNRCVar fv)
                                 fv (NNRCConst dunit))
                      k
             end)
          mr_last fvs
    in
    ((fvs, cldmr_last), map fst vars_loc).

  Definition NNRCMRtoNNRCMRCloudant (avoiddb: list var) (mrl: nnrcmr) : cldmr :=
    mkMRCldChain
      (mr_chain_to_cldmr_chain avoiddb mrl.(mr_chain))
      (mr_last_to_cldmr_last mrl.(mr_last)).

  Require Import Bool.

  (** Appending the translation of [mr] to a causally consistent chain [x]
      keeps the chain causally consistent, provided that
      - the input of [mr] is in [avoiddb];
      - [mr] is causally consistent with itself;
      - no step of [x] reads from the output of [mr];
      - the output of [mr] and the inputs of [x] are all in [avoiddb]. *)
  Lemma MRtoMRCld_causally_consistent
        avoiddb (mr:mr) :
    In mr.(mr_input) avoiddb ->
    mr_causally_consistent mr mr = true ->
    forall x,
    forallb (fun y => cldmr_step_input y <>b mr.(mr_output)) x = true ->
    cldmr_chain_causally_consistent x = true ->
    incl (mr.(mr_output) :: map cldmr_step_input x) avoiddb ->
         cldmr_chain_causally_consistent (x ++ fst (MRtoMRCld avoiddb mr)) = true.
Proof.
    intros.
    unfold MRtoMRCld.
    unfold MRtoReduceCld.
    destruct mr; simpl in *.
    (* One goal per reduce kind. *)
    destruct mr_reduce;
      unfold
        cldmr_chain_causally_consistent,
      cldmr_step_causally_consistent,
      mr_causally_consistent,
      nequiv_decb, equiv_decb in *; simpl.
    - apply forallb_ordpairs_refl_app_cons; trivial.
    - (* Collect case: two steps are appended, and the freshness of the
         intermediate database rules out the conflicting equalities. *)
      rewrite app_cons_cons_expand.
      apply forallb_ordpairs_refl_app_cons; simpl.
      + apply forallb_ordpairs_refl_app_cons; simpl in *; trivial.
        * match_destr.
          rewrite e in H.
          apply fresh_var_fresh in H.
          intuition.
        * rewrite forallb_forall in *.
          intros aa inn.
          match_destr.
          unfold incl in H3.
          specialize (H3 (cldmr_step_input aa)).
          rewrite e in H3.
          eelim fresh_var_fresh.
          apply H3.
          simpl.
          rewrite in_map_iff.
          eauto.
      + match_destr.
        rewrite <- e in H3.
        unfold incl in H3.
        simpl in H3.
        eelim fresh_var_fresh.
        intuition.
      + simpl in *.
        match_destr_in H0.
        rewrite forallb_app; simpl. rewrite H1.
        match_destr; simpl.
        congruence.
        intuition.
    - destruct r;
      try solve [ apply forallb_ordpairs_refl_app_cons; simpl; trivial ].
    - apply forallb_ordpairs_refl_app_cons; simpl; trivial.
  Qed.

  (** If the input of [a] is in [avoiddb], then the avoid list returned by
      [MRtoMRCld avoiddb a] contains [avoiddb] as well as the inputs of
      all the steps it produced. *)
  Lemma MRtoMRCid_avoids avoiddb a :
    let '(l0, l1) := MRtoMRCld avoiddb a in
    incl (mr_input a :: nil) avoiddb ->
    incl (avoiddb ++ map cldmr_step_input l0) l1.
Proof.
    unfold MRtoMRCld, MRtoReduceCld.
    destruct a; simpl.
    (* One goal per reduce kind; each is closed by membership reasoning. *)
    destruct mr_reduce; simpl.
    - unfold incl; simpl; intros.
      repeat rewrite in_app_iff in H0.
      intuition.
    - unfold incl; simpl; intros.
      repeat rewrite in_app_iff in H0; simpl in H0.
      intuition.
    - destruct r; simpl;
      unfold incl; simpl; intros;
      repeat rewrite in_app_iff in H0; simpl in H0;
      intuition.
    - unfold incl; simpl; intros.
      repeat rewrite in_app_iff in H0; simpl in H0.
      intuition.
  Qed.

  (** Every step produced by [MRtoMRCld avoiddb a] either reads from the
      input of [a] or reads from a database that is not in [avoiddb]. *)
  Lemma MRtoMRCid_input_avoids avoiddb a :
    let '(l0, l1) := MRtoMRCld avoiddb a in
    Forall (fun x => cldmr_step_input x = mr_input a \/ ~ In (cldmr_step_input x) avoiddb) l0.
Proof.
    (* Makes [fresh_var_fresh] available to [eauto] below. *)
    Hint Resolve fresh_var_fresh.
    unfold MRtoMRCld, MRtoReduceCld.
    destruct a; simpl.
    destruct mr_reduce; simpl.
    - eauto.
    - (* Collect case: the first step reads the original input, the second
         one reads the fresh database. *)
      econstructor; simpl; [intuition | ].
      econstructor; simpl; [| intuition ].
      eauto.
    - destruct r;
      eauto.
    - eauto.
  Qed.
  
  Lemma NNRCMRtoNNRCMRCloudant_causally_consistent avoiddb l :
    mr_chain_causally_consistent l = true ->
    forall x,
      cldmr_chain_causally_consistent x = true ->
      forallb (fun a => forallb (fun y : cldmr_step => cldmr_step_input y <>b mr_output a) x) l = true ->
      incl (map mr_input l ++ map mr_output l ++ map cldmr_step_input x) avoiddb ->
      cldmr_chain_causally_consistent (x ++ mr_chain_to_cldmr_chain avoiddb l) = true.
Proof.
    unfold mr_chain_causally_consistent, cldmr_chain_causally_consistent.
    revert avoiddb.
    induction l; intros avoiddb lcc x xcc avoided.
    { rewrite app_nil_r; trivial. }
    simpl in *.
    repeat rewrite andb_true_iff in * .
    intuition.
    destruct l.
    - admit. (* XXXXXXXXXXXXX *)
 rewrite app_nil_r. *) apply MRtoMRCldLast_causually_consistent; trivial. *) simpl in *. *) unfold incl in *; simpl in *; intuition. *)    - match_case; intros.
       rewrite <- app_ass.
       apply IHl.
       + simpl in * .
         repeat rewrite andb_true_iff in * .
         intuition.
       + generalize (MRtoMRCld_causally_consistent avoiddb a).
         rewrite H0; simpl; intros HH.
         apply HH; clear HH; simpl; trivial.
         * unfold incl in *; simpl in *.
           intuition.
         * unfold incl in *.
           simpl.
           intros aa inn.
           specialize (H aa).
           repeat (simpl in H; rewrite in_app_iff in H).
           intuition.
       + simpl in *.
         repeat rewrite andb_true_iff in *.
         rewrite forallb_app.
         intuition.
         * generalize (MRtoMRCid_input_avoids avoiddb a).
           rewrite H0.
           rewrite Forall_forall, forallb_forall.
           intros HH ? inn2.
           specialize (HH _ inn2).
           unfold nequiv_decb, equiv_decb.
           match_destr.
           rewrite e in * .
           { destruct HH as [eqq|inn3].
             - unfold mr_causally_consistent in H3.
               unfold nequiv_decb, equiv_decb in H3.
               match_destr_in H3.
               congruence.
             - elim inn3.
               apply (H (mr_output m)).
               repeat (simpl; rewrite in_app_iff); intuition.
           }
         * rewrite forallb_forall in H8 |- *.
           intros ? inn.
           specialize (H8 _ inn).
           rewrite forallb_app.
           split; trivial.
           generalize (MRtoMRCid_input_avoids avoiddb a).
           rewrite H0.
           rewrite Forall_forall, forallb_forall.
           intros HH ? inn2.
           specialize (HH _ inn2).
           unfold nequiv_decb, equiv_decb.
           match_destr.
           rewrite e in * .
           { destruct HH as [eqq|inn3].
             - rewrite forallb_forall in H9.
               specialize (H9 _ inn).
               unfold mr_causally_consistent in H9.
               unfold nequiv_decb, equiv_decb in H9.
               match_destr_in H9.
               congruence.
             - elim inn3.
               apply (H (mr_output x0)).
               cut (In (mr_output x0) (map mr_output l)).
               + repeat (simpl; rewrite in_app_iff); intuition.
               + rewrite in_map_iff. eauto.
           }
       + generalize (MRtoMRCid_avoids avoiddb a).
         rewrite H0; intros inc.
         rewrite <- inc; clear inc; unfold incl in *; simpl in *;
           intros aa inn;
           specialize (H aa);
             repeat (rewrite in_app_iff in H; simpl in H).
         * rewrite in_app_iff.
           rewrite map_app in inn.
           repeat (rewrite in_app_iff in inn; simpl in inn).
           intuition.
         * intuition.
  Admitted.


  Definition NNRCMRtoNNRCMRCloudantTop (mrl: nnrcmr) : cldmr :=
    let avoiddb := List.map mr_input mrl.(mr_chain) ++ List.map mr_output mrl.(mr_chain) in
    NNRCMRtoNNRCMRCloudant avoiddb mrl.

  Lemma NNRCMRtoNNRCMRCloudantTop_causally_consistent mrl :
    nnrcmr_causally_consistent mrl = true ->
    cldmr_chain_causally_consistent (NNRCMRtoNNRCMRCloudantTop mrl).(cldmr_chain) = true.
Proof.
    intros cc.
    unfold NNRCMRtoNNRCMRCloudantTop.
   generalize (NNRCMRtoNNRCMRCloudantInit_causally_consistent *)                 (List.map mr_input l ++ List.map mr_output l) l cc nil); simpl; *)     intros HH. *)   apply HH. *)   - reflexivity. *)   - rewrite forallb_forall; intuition. *)   - rewrite app_nil_r. reflexivity. *) Qed. *)  Admitted.

End NNRCMRToCldMR.