Module NNRCMR


NNRCMR is a language for describing chains of Map/Reduce jobs, followed by a local computation over their results.

Summary:

Section NNRCMR.
  Require Import String.
  Require Import List.
  Require Import EquivDec.
  Require Import Utils.
  Require Import BasicRuntime.
  Require Import cNNRCRuntime.
  Require Import ForeignReduceOps.

  Local Open Scope list_scope.

  (* Section-wide implicit parameters: the foreign runtime (its data type is
     the one used for values in this file) and the foreign reduce operators
     wrapped by RedOpForeign. *)
  Context {fruntime:foreign_runtime}.
  Context {fredop:foreign_reduce_op}.

  (* Variable names are plain strings. *)
  Definition var := string.
  (* Map/reduce environment: an association list from variable names to
     localized data (ddata, i.e. either Dlocal or Ddistr). *)
  Definition nrcmr_env := list (var * ddata).

Abstract Syntax

  
Named Nested Relational Calculus + Map Reduce

Different kinds of map functions:

  (* Map functions. Every constructor carries a pair (x, body): a variable
     name and an NNRC expression; mr_map_eval evaluates body with x bound to
     the value being mapped.
     - MapDist: expects a distributed input and maps each of its elements.
     - MapDistFlatten: like MapDist, then the mapped results are flattened.
     - MapScalar: expects a local input; body must evaluate to a collection. *)
  Inductive map_fun :=
  | MapDist : var * nnrc -> map_fun
  | MapDistFlatten : var * nnrc -> map_fun
  | MapScalar : var * nnrc -> map_fun.

Different kinds of reduce functions:
  (* Reduce operators. The only operator wraps a foreign reduce operator;
     reduce_op_eval interprets it with foreign_reduce_op_interp. *)
  Inductive reduce_op :=
  | RedOpForeign : foreign_reduce_op_type -> reduce_op.
  
  (* Reduce functions (their semantics is given by mr_reduce_eval):
     - RedId: keeps the mapped values as a distributed collection.
     - RedCollect (x, body): evaluates body with x bound to the collection of
       all mapped values; the result is local.
     - RedOp op: applies the reduce operator op to the mapped values; the
       result is local.
     - RedSingleton: requires exactly one mapped value and returns it as
       local data. *)
  Inductive reduce_fun :=
  | RedId : reduce_fun
  | RedCollect : (var * nnrc) -> reduce_fun
  | RedOp : reduce_op -> reduce_fun
  | RedSingleton : reduce_fun.

A map-reduce job mr is applied to a collection fetched from the environment at the location mr.(mr_input), and its result is stored in the environment at the location mr.(mr_output). The function mr.(mr_map) is applied to the input to produce a collection of type coll B. Then, the function mr.(mr_reduce) is applied to all the elements generated by the map.
  (* A single map/reduce job: where to read the input, where to store the
     output, and the map and reduce functions to run in between. *)
  Record mr := mkMR
    { mr_input: var; (* Name of the environment variable the map/reduce reads from. *)
      mr_output: var; (* Name of the environment variable the result is stored under. *)
      mr_map: map_fun; (* Function applied to the input; it produces a collection. *)
      mr_reduce: reduce_fun }. (* Function applied to the collection produced by the map. *)

An NNRCMR expression is a chain of map-reduce jobs, followed by a last expression describing the final, local computation.
  
  (* A complete NNRCMR program.
     mr_last has the shape ((params, body), args): mr_last_eval looks up the
     variables listed in args in the map/reduce environment, binds the
     resulting values to params, and evaluates the NNRC expression body in
     that environment. *)
  Record nnrcmr := mkMRChain
    { mr_inputs_loc: vdbindings; (* Input variables with their localizations *)
      mr_chain: list mr; (* Map/Reduce chain, evaluated left to right *)
      mr_last: (list var * nnrc) * list (var * dlocalization); }. (* Last expression: ((params, body), args) *)

Sanitization

  
This section contains support functions used to emit code that follows the lexical constraints imposed by Spark.

  Section sanitize_local.
    (* Parameters of the local renaming. All three are handed to
       nnrc_pick_name; presumably sep is a separator used when building fresh
       names, renamer proposes a new name from an old one, and avoid lists
       names that must not be picked -- confirm against nnrc_pick_name. *)
    Context (sep:string).
    Context (renamer:string->string).
    Context (avoid:list var).

    Definition nnrc_rename_in1
               (oldvar:var) (e:nnrc) : (var*nnrc)
      := let newvar := nnrc_pick_name sep renamer avoid oldvar e in
         (newvar, nnrc_rename_lazy e oldvar newvar).

    Definition nnrc_rename_in2
               (oldvar1 oldvar2:var) (e:nnrc) : (var*var*nnrc)
      := let newvar1 := nnrc_pick_name sep renamer (oldvar2::avoid) oldvar1 e in
         let newvar2 := nnrc_pick_name sep renamer (newvar1::avoid) oldvar2 e in
         let e'1 := nnrc_rename_lazy e oldvar1 newvar1 in
         let e'2 := nnrc_rename_lazy e'1 oldvar2 newvar2 in
         (newvar1, newvar2, e'2).

    Definition rename_map_fun (m:map_fun) : map_fun
      := match m with
         | MapDist (v, e) => MapDist (nnrc_rename_in1 v e)
         | MapDistFlatten (v,e) => MapDistFlatten (nnrc_rename_in1 v e)
         | MapScalar (v,e) => MapScalar (nnrc_rename_in1 v e)
         end.

    Definition rename_reduce_fun (r:reduce_fun) : reduce_fun
      := match r with
         | RedId => RedId
         | RedCollect (v,e) => RedCollect (nnrc_rename_in1 v e)
         | RedOp op => RedOp op
         | RedSingleton => RedSingleton
         end.

    Definition mr_rename_local (mr0:mr) : mr
      := mkMR mr0.(mr_input) mr0.(mr_output)
                                   (rename_map_fun mr0.(mr_map))
                                   (rename_reduce_fun mr0.(mr_reduce)).

    Definition nnrcmr_rename_local (mrl:nnrcmr)
      := mkMRChain
           mrl.(mr_inputs_loc)
           (map mr_rename_local mrl.(mr_chain))
           mrl.(mr_last).
    
  End sanitize_local.

  Section sanitize_dbs.
    Definition nnrcmr_inoutlist (mrl:nnrcmr)
      := insertion_sort StringOrder.lt_dec
                        (flat_map
                           (fun x => x.(mr_input)::x.(mr_output)::nil)
                           mrl.(mr_chain)).
    
    Definition mr_subst_io substlist (mr0:mr) : mr
      := mkMR (substlist_subst substlist mr0.(mr_input))
              (substlist_subst substlist mr0.(mr_output))
              mr0.(mr_map) mr0.(mr_reduce).

    Definition nnrcmr_subst_io substlist (mrl:nnrcmr) : nnrcmr
      := let chain := (map (mr_subst_io substlist) mrl.(mr_chain)) in
         let last :=
             let (params, n) := fst mrl.(mr_last) in
             let args := snd mrl.(mr_last) in
             let params := List.map (substlist_subst substlist) params in
             let n := nnrc_substlist_subst substlist n in
             let args :=
                 List.map
                   (fun x_loc => (substlist_subst substlist (fst x_loc), snd x_loc))
                   args
             in
             ((params, n), args)
         in
         mkMRChain
           mrl.(mr_inputs_loc)
           chain
           last.

    (* Parameters of the input/output renaming: renamer proposes a new name
       for each variable, sep is passed to fresh_var_from, and avoid is
       appended to the list of names passed to fresh_var_from. *)
    Context (sep:string).
    Context (renamer:string->string).
    Context (avoid:list var).

    Fixpoint nnrcmr_mk_rename_list (leave_alone:list var) (names:list var) (acc:list (var*var))
      := match names with
         | nil => acc
         | x::l => if (in_dec string_dec x leave_alone)
                   then nnrcmr_mk_rename_list leave_alone l ((x,x)::acc)
                   else
                     let proposedname := renamer x in
                     let newname :=
                         fresh_var_from sep proposedname
                                        (l++(map snd acc)++avoid) in
                     nnrcmr_mk_rename_list leave_alone l ((x,newname)::acc)
         end.

    Definition nnrcmr_rename_graph (mrl:nnrcmr)
      := let leave_alone := List.map fst (mrl.(mr_inputs_loc)) in
         let names := nnrcmr_inoutlist mrl in
         let substlist_rev := nnrcmr_mk_rename_list leave_alone names nil in
         let substlist := rev substlist_rev in
         nnrcmr_subst_io substlist mrl.

  End sanitize_dbs.

Well-formedness

  

  (* Boolean test mr1.(mr_input) <>b mr2.(mr_output): reading <>b as boolean
     disequality, it holds when the input of mr1 is not the output of mr2. *)
  Definition mr_causally_consistent (mr1 mr2:mr) : bool
    := mr1.(mr_input) <>b mr2.(mr_output).

  (* Check mr_causally_consistent over the chain with forallb_ordpairs_refl.
     NOTE(review): presumably this tests every ordered pair of jobs,
     including each job paired with itself -- confirm against the definition
     of forallb_ordpairs_refl. *)
  Definition mr_chain_causally_consistent (mr_chain:list mr) : bool
    := forallb_ordpairs_refl mr_causally_consistent mr_chain.

  (* Causal consistency of a program is that of its map/reduce chain. *)
  Definition nnrcmr_causally_consistent (mrl:nnrcmr) : bool
    := mr_chain_causally_consistent mrl.(mr_chain).

  (* A one-parameter function (x, body) is closed: every free variable of
     body is the parameter x. *)
  Definition function_with_no_free_vars (f: var * nnrc) :=
    (forall (x: var), In x (nnrc_free_vars (snd f)) -> x = fst f).

  (* A two-parameter function ((x1, x2), body) is closed: the two parameters
     are distinct and every free variable of body is one of them. *)
  Definition function2_with_no_free_vars (f: (var * var) * nnrc) :=
    (fst (fst f)) <> (snd (fst f)) /\
    (forall x, In x (nnrc_free_vars (snd f)) -> x = (fst (fst f)) \/ x = (snd (fst f))).

  Definition map_well_formed map :=
    match map with
    | MapDist f => function_with_no_free_vars f
    | MapDistFlatten f => function_with_no_free_vars f
    | MapScalar f => function_with_no_free_vars f
    end.

  Definition reduce_well_formed red :=
    match red with
    | RedId => True
    | RedCollect f => function_with_no_free_vars f
    | RedOp op => True
    | RedSingleton => True
    end.

  (* A job is well formed when both its map and its reduce are. *)
  Definition mr_well_formed mr :=
    map_well_formed mr.(mr_map) /\ reduce_well_formed mr.(mr_reduce).

  (* A chain is well formed when every job in it is well formed. *)
  Definition mr_chain_well_formed (l: list mr) :=
    forall mr, In mr l -> mr_well_formed mr.

  (* Well-formedness of a program is that of its chain; the last expression
     is not constrained here. *)
  Definition nnrcmr_well_formed (mrl: nnrcmr) :=
    mr_chain_well_formed mrl.(mr_chain).


  Definition mr_input_localized mr :=
    match mr.(mr_map) with
    | MapDist _ => (mr.(mr_input), Vdistr)
    | MapDistFlatten _ => (mr.(mr_input), Vdistr)
    | MapScalar _ => (mr.(mr_input), Vlocal)
    end.

  Definition mr_output_localized mr :=
    match mr.(mr_reduce) with
    | RedId => (mr.(mr_output), Vdistr)
    | RedCollect _ => (mr.(mr_output), Vlocal)
    | RedOp op => (mr.(mr_output), Vlocal)
    | RedSingleton => (mr.(mr_output), Vlocal)
    end.

  Definition map_well_localized map d :=
    match map, d with
    | MapDist _, Ddistr _ => True
    | MapDistFlatten _, Ddistr _ => True
    | MapScalar _, Dlocal _ => True
    | _, _ => False
    end.

Evaluation Semantics



  Section Semantics.

  (* Brand relation handed to nnrc_core_eval and foreign_reduce_op_interp by
     the evaluation functions of this section. *)
  Context (h:brand_relation_t).

Map

  

  (* Evaluate the map function [map] on the input [input_d] and return the
     list of mapped values, or None on failure.
     - MapDist: the input must be distributed; the body is evaluated on each
       element (via rmap) with the map variable bound to that element.
     - MapDistFlatten: as MapDist, then the mapped results go through
       rflatten.
     - MapScalar: the input must be local; the body is evaluated once on it
       and must produce a collection (dcoll), whose elements are returned.
     An input with the wrong localization yields None. *)
  Definition mr_map_eval (map: map_fun) (input_d: ddata) : option (list data) :=
    match map with
    | MapDist f =>
      (* Body of the map, evaluated in an environment with a single binding. *)
      let f_map (d:data) : option data :=
          let (doc, body) := f in
          nnrc_core_eval h ((doc,d)::nil) body
      in
      match input_d with
      | Ddistr coll =>
        rmap f_map coll
      | Dlocal d => None
      end
    | MapDistFlatten f =>
      let f_map (d:data) : option data :=
          let (doc, body) := f in
          nnrc_core_eval h ((doc,d)::nil) body
      in
      match input_d with
      | Ddistr coll =>
        let nested_coll := rmap f_map coll in
        olift rflatten nested_coll
      | Dlocal d => None
      end
    | MapScalar f =>
      let f_map (d:data) : option data :=
          let (doc, body) := f in
          nnrc_core_eval h ((doc,d)::nil) body
      in
      match input_d with
      | Ddistr coll => None
      | Dlocal d =>
        (* The scalar map must itself return a collection. *)
        match f_map d with
        | Some (dcoll coll) => Some coll
        | _ => None
        end
      end
    end.


Reduce

  

  (* Evaluate a reduce operator on the list of mapped values [dl] by
     delegating to foreign_reduce_op_interp. *)
  Definition reduce_op_eval (rop:reduce_op) (dl:list data) : option data
    := match rop with
       | RedOpForeign op
         => foreign_reduce_op_interp h op dl
       end.
  
  (* Evaluate the reduce function [reduce] on the mapped values [values_v].
     - RedId: returns the values as a distributed collection.
     - RedCollect (x, body): evaluates body with x bound to the collection
       of all values; the result is wrapped as local data.
     - RedOp op: evaluates op on the values with reduce_op_eval; the result
       is wrapped as local data.
     - RedSingleton: succeeds only when there is exactly one value, which is
       returned as local data. *)
  Definition mr_reduce_eval (reduce: reduce_fun) (values_v: list data) : option (ddata) :=
    match reduce with
    | RedId => Some (Ddistr values_v)
    | RedCollect f =>
      let (values_arg, body) := f in
      let v := nnrc_core_eval h ((values_arg, dcoll values_v) :: nil) body in
      lift (fun res => Dlocal res) v
    | RedOp op =>
      lift (fun res => Dlocal res) (reduce_op_eval op values_v)
    | RedSingleton =>
      match values_v with
      | d :: nil => Some (Dlocal d)
      | _ => None
      end
    end.


Map/Reduce

  

  (* Evaluate one job on the input data [d]: run the map, then feed the
     mapped values to the reduce. The input/output names of the job are not
     used here. *)
  Definition mr_eval (mr:mr) (d: ddata) : option (ddata) :=
    let map_result := mr_map_eval mr.(mr_map) d in
    olift (mr_reduce_eval mr.(mr_reduce)) map_result.

  (* mr_eval depends only on the map and reduce functions of a job: two jobs
     that differ only in their input/output names evaluate identically on
     the same data. *)
  Lemma mr_eval_ignores_env map_fun red :
    forall (mr_input1 mr_input2 mr_output1 mr_output2:var),
    forall (d:ddata),
      mr_eval (mkMR mr_input1 mr_output1 map_fun red) d =
      mr_eval (mkMR mr_input2 mr_output2 map_fun red) d.
Proof.
    (* Both sides compute to the same term. *)
    reflexivity.
  Qed.


Map/Reduce Chain

  

  (* Record the result [d] of a job under the name [x] by consing a new
     binding in front of [env]; None when the merge is not allowed.
     - Distributed result: if x is unbound, bind it to the result; if x is
       already bound to a distributed collection coll', bind it to the
       concatenation of the new collection and coll'; if x is bound to local
       data, fail.
     - Local result: allowed only when x is unbound. *)
  Definition merge_env (x: var) (d: ddata) (env: nrcmr_env) : option nrcmr_env :=
    match d with
    | Ddistr coll =>
      match lookup equiv_dec env x with
      | None => Some ((x, Ddistr coll)::env)
      | Some (Ddistr coll') => Some ((x, Ddistr (coll ++ coll') )::env)
      | Some _ => None
      end
    | Dlocal v =>
      match lookup equiv_dec env x with
      | None => Some ((x, Dlocal v)::env)
      | Some d' => None
      end
    end.

  (* Evaluate a chain of jobs from left to right, starting in [env].
     The accumulator is a pair (current environment, result of the last
     job); it starts with the dummy result Some (Ddistr nil). For each job,
     the input is looked up in the current environment, the job is
     evaluated with mr_eval, and its result is merged into the environment
     under the job's output name with merge_env. If any of these steps
     fails, the result becomes None (with the environment as it was before
     the failing job) and all remaining jobs are skipped. *)
  Definition mr_chain_eval (env:nrcmr_env) (l:list mr) : nrcmr_env * option ddata :=
    List.fold_left
      (fun acc mr =>
         match acc with
         | (env', None) => (env', None)
         | (env', Some _) =>
           let oinput_d := lookup equiv_dec env' mr.(mr_input) in
           let ores := olift (mr_eval mr) oinput_d in
           match ores, olift (fun res => merge_env mr.(mr_output) res env') ores with
           | Some res, Some env'' => (env'', Some res)
           | _, _ => (env', None)
           end
         end)
      l (env, Some (Ddistr nil)).

  Fixpoint build_nnrc_env' (mr_env:nrcmr_env) (params: list var) (args: list (var * dlocalization)) :=
    match (params, args) with
    | (nil, nil) => Some nil
    | (x1::params, (x2, loc)::args) =>
      match lookup equiv_dec mr_env x2 with
      | Some loc_d =>
        lift (fun nnrc_env => (x1, unlocalize_data loc_d) :: nnrc_env) (build_nnrc_env' mr_env params args)
      | None => None
      end
    | _=> None
    end.

  (* Pair the formal parameter names [form] with the effective values [eff]
     using zip, when those values are available.
     NOTE(review): presumably None when eff is None or when zip itself
     fails -- confirm against the definitions of olift and zip. *)
  Definition nnrc_env_build (form:list var) (eff: option (list data)): option bindings :=
    olift (zip form) eff.

  (* Look up every argument name of [args] in [mr_env] and unlocalize the
     value found; the localizations attached to the arguments are ignored.
     NOTE(review): presumably None as soon as one name is unbound -- confirm
     against lift_map. *)
  Definition effective_params_from_bindings (args:list (var * dlocalization)) (mr_env:nrcmr_env) : option (list data) :=
    lift_map
      (fun (v : var) =>
         lift unlocalize_data (lookup equiv_dec mr_env v))
      (map fst args).
  
  Definition build_nnrc_env (mr_env:nrcmr_env) (params: list var) (args: list (var * dlocalization)) :=
    let eff_params := effective_params_from_bindings args mr_env in
    nnrc_env_build params eff_params.
  
  (* Evaluate the last expression ((params, n), args) of a program: build
     the NNRC environment binding params to the values of args found in
     [mr_env], then evaluate n in it. None when the environment cannot be
     built or when the evaluation of n fails. *)
  Definition mr_last_eval (mr_env:nrcmr_env) (mr_last: (list var * nnrc) * list (var * dlocalization)) :=
    let (params, n) := fst mr_last in
    let args := snd mr_last in
    let nnrc_env := build_nnrc_env mr_env params args in
    olift (fun env => nnrc_core_eval h env n) nnrc_env.

  (* Evaluate a whole program: run the chain from [env]; if it succeeds,
     evaluate the last expression in the resulting environment. The result
     of the last job itself is discarded; only the environment is used. *)
  Definition nnrcmr_eval (env:nrcmr_env) (mrl:nnrcmr) : option data :=
    match mr_chain_eval env mrl.(mr_chain) with
    | (mr_env, Some _) =>
      mr_last_eval mr_env mrl.(mr_last)
    | (_, None) => None
    end.

  (* If evaluating the prefix l1 from env succeeds and yields the
     environment env1, then evaluating l1 ++ r :: l2 from env gives the same
     result as evaluating r :: l2 from env1. *)
  Lemma mr_chain_eval_split (env: nrcmr_env) (l1:list mr) (r: mr) (l2: list mr):
    forall env1 loc_d1,
      mr_chain_eval env l1 = (env1, Some loc_d1) ->
      mr_chain_eval env (l1 ++ (r :: l2)) = mr_chain_eval env1 (r :: l2).
Proof.
    intros env1 loc_d1.
    unfold mr_chain_eval.
    (* Split the fold over l1 ++ r :: l2 into a fold over l1 followed by a
       fold over r :: l2. *)
    rewrite fold_left_app.
    (* Abstract over the result of the fold over the prefix l1. *)
    generalize (fold_left
        (fun (acc : list (var * ddata) * option ddata)
           (mr0 : NNRCMR.mr) =>
         let (env', y) := acc in
         match y with
         | Some _ =>
             match
               olift (mr_eval mr0) (lookup equiv_dec env' (mr_input mr0))
             with
             | Some res =>
                 match
                   olift
                     (fun res0 : ddata =>
                      merge_env (mr_output mr0) res0 env')
                     (Some res)
                 with
                 | Some env'' => (env'', Some res)
                 | None => (env', None)
                 end
             | None => (env', None)
             end
         | None => (env', None)
         end) l1 (env, Some (Ddistr nil))) as res1.
    intros res1 Hres1.
    rewrite Hres1.
    (* The fold step only tests that the previous result is Some _, so
       starting from Some loc_d1 or from Some (Ddistr nil) is the same. *)
    reflexivity.
  Qed.

  (* If evaluating l1 ++ l2 succeeds, then evaluating the prefix l1 alone
     succeeds as well. *)
  Lemma mr_chain_eval_progress (env:nrcmr_env) (l1:list mr) (l2:list mr):
    forall env' loc_d,
      mr_chain_eval env (l1 ++ l2) = (env', Some loc_d) ->
      exists env1 loc_d1,
        mr_chain_eval env l1 = (env1, Some loc_d1).
Proof.
    intros env' loc_d.
    unfold mr_chain_eval.
    rewrite fold_left_app.
    (* Case analysis on the result of the fold over the prefix l1. *)
    case (fold_left
        (fun (acc : list (var * ddata) * option ddata)
           (mr : mr) =>
         let (env'0, y) := acc in
         match y with
         | Some _ =>
             match
               olift (mr_eval mr) (lookup equiv_dec env'0 (mr_input mr))
             with
             | Some res =>
                 match
                   olift
                     (fun res0 : ddata =>
                      merge_env (mr_output mr) res0 env'0)
                     (Some res)
                 with
                 | Some env'' => (env'', Some res)
                 | None => (env'0, None)
                 end
             | None => (env'0, None)
             end
         | None => (env'0, None)
         end) l1 (env, Some (Ddistr nil))).
    intros env1 o1.
    destruct o1; simpl.
    - Case "Some"%string.
      (* The prefix succeeded: its result is the witness. *)
      intros.
      exists env1.
      exists d.
      reflexivity.
    - Case "None"%string.
      (* The prefix failed: folding over l2 keeps the result at None, which
         contradicts the hypothesis. *)
      induction l2;
        simpl; try congruence.
      intro Hl2.
      apply (IHl2 Hl2).
  Qed.

  End Semantics.

A 'lazy' semantics


In the alternative 'lazy' semantics, evaluation of a given map/reduce isn't triggered unless there is at least one value in its input collection.

This alternative semantics provides a model that is closer to the run-time behavior of some targets, e.g., Cloudant.
  
  Section SemanticsLazy.

  (* Brand relation passed to the evaluation functions defined in the
     Semantics section (mr_map_eval, mr_reduce_eval, mr_last_eval,
     mr_chain_eval). *)
  Context (h:brand_relation_t).

  (* Lazy evaluation of one job: same as mr_eval, except that the result is
     None when the input is an empty distributed collection, and also when
     the map succeeds but produces no value (the reduce is then not run). *)
  Definition mr_lazy_eval (mr:mr) (d: ddata) : option (ddata) :=
    match d with
    | Ddistr nil => None
    | _ =>
      match mr_map_eval h mr.(mr_map) d with
      | Some nil => None
      | map_result =>
        olift (mr_reduce_eval h mr.(mr_reduce)) map_result
      end
    end.

  (* Lazy evaluation of a chain: identical to mr_chain_eval, except that
     each job is evaluated with mr_lazy_eval. A job that is not triggered
     therefore makes the result None and stops the chain. *)
  Definition mr_chain_lazy_eval (env:nrcmr_env) (l:list mr) : nrcmr_env * option ddata :=
    List.fold_left
      (fun (acc: nrcmr_env * option ddata) mr =>
         match acc with
         | (env', None) => (env', None)
         | (env', Some _) =>
           let oinput_d := lookup equiv_dec env' mr.(mr_input) in
           let ores := olift (mr_lazy_eval mr) oinput_d in
           match ores, olift (fun res => merge_env mr.(mr_output) res env') ores with
           | Some res, Some env'' => (env'', Some res)
           | _, _ => (env', None)
           end
       end)
      l (env, Some (Ddistr nil)).

  (* Lazy evaluation of a whole program: run the chain lazily, then evaluate
     the last expression (with the regular mr_last_eval) in the resulting
     environment. *)
  Definition nnrcmr_lazy_eval (env:nrcmr_env) (mrl:nnrcmr) : option data :=
    match mr_chain_lazy_eval env mrl.(mr_chain) with
    | (mr_env, Some _) =>
      mr_last_eval h mr_env mrl.(mr_last)
    | (_, None) => None
    end.

  (* Lazy counterpart of mr_chain_eval_split: if the lazy evaluation of the
     prefix l1 from env succeeds with environment env1, then lazily
     evaluating l1 ++ r :: l2 from env gives the same result as lazily
     evaluating r :: l2 from env1. *)
  Lemma mr_chain_lazy_eval_split (env: nrcmr_env) (l1:list mr) (r: mr) (l2: list mr):
    forall env1 loc_d1,
      mr_chain_lazy_eval env l1 = (env1, Some loc_d1) ->
      mr_chain_lazy_eval env (l1 ++ (r :: l2)) = mr_chain_lazy_eval env1 (r :: l2).
Proof.
    intros env1 loc_d1.
    unfold mr_chain_lazy_eval.
    rewrite fold_left_app.
    (* Abstract over the result of the fold over the prefix l1. *)
    generalize (fold_left
        (fun (acc : nrcmr_env * option ddata) (mr0 : NNRCMR.mr) =>
         let (env', o) := acc in
         match o with
         | Some _ =>
             match
               olift (mr_lazy_eval mr0)
                 (lookup equiv_dec env' (mr_input mr0))
             with
             | Some res =>
                 match
                   olift
                     (fun res0 : ddata =>
                      merge_env (mr_output mr0) res0 env')
                     (Some res)
                 with
                 | Some env'' => (env'', Some res)
                 | None => (env', None)
                 end
             | None => (env', None)
             end
         | None => (env', None)
         end) l1 (env, Some (Ddistr nil))) as res1.
    intros res1 Hres1.
    rewrite Hres1.
    (* The fold step only tests that the previous result is Some _. *)
    reflexivity.
  Qed.

  (* Lazy counterpart of mr_chain_eval_progress: if the lazy evaluation of
     l1 ++ l2 succeeds, then the lazy evaluation of the prefix l1 succeeds
     as well. *)
  Lemma mr_chain_lazy_eval_progress (env:nrcmr_env) (l1:list mr) (l2:list mr):
    forall env' loc_d,
      mr_chain_lazy_eval env (l1 ++ l2) = (env', Some loc_d) ->
      exists env1 loc_d1,
        mr_chain_lazy_eval env l1 = (env1, Some loc_d1).
Proof.
    intros env' loc_d.
    unfold mr_chain_lazy_eval.
    rewrite fold_left_app.
    (* Case analysis on the result of the fold over the prefix l1. *)
    case (fold_left
        (fun (acc : nrcmr_env * option ddata) (mr : mr) =>
         let (env'0, o) := acc in
         match o with
         | Some _ =>
             match
               olift (mr_lazy_eval mr) (lookup equiv_dec env'0 (mr_input mr))
             with
             | Some res =>
                 match
                   olift
                     (fun res0 : ddata =>
                      merge_env (mr_output mr) res0 env'0)
                     (Some res)
                 with
                 | Some env'' => (env'', Some res)
                 | None => (env'0, None)
                 end
             | None => (env'0, None)
             end
         | None => (env'0, None)
         end) l1 (env, Some (Ddistr nil))).
    intros env1 o1.
    destruct o1; simpl.
    - Case "Some"%string.
      (* The prefix succeeded: its result is the witness. *)
      intros.
      exists env1.
      exists d.
      reflexivity.
    - Case "None"%string.
      (* The prefix failed: folding over l2 keeps the result at None, which
         contradicts the hypothesis. *)
      induction l2;
        simpl; try congruence.
      intro Hl2.
      apply (IHl2 Hl2).
  Qed.

  (* Soundness of the lazy semantics with respect to the regular one:
     whenever the lazy evaluation of a chain succeeds, the regular
     evaluation returns the same environment and the same result. *)
  Lemma mr_chain_lazy_eval_correct (env:nrcmr_env) (l:list mr):
    forall env' res,
      mr_chain_lazy_eval env l = (env', Some res) ->
      mr_chain_eval h env l = (env', Some res).
Proof.
    (* Induction from the right: the chain is either nil or l ++ x :: nil. *)
    induction l using @rev_ind.
    - Case "l = nil"%string.
      intros env' res.
      unfold mr_chain_lazy_eval.
      unfold mr_chain_eval.
      simpl.
      congruence.
    - Case "l <> nil"%string.
      intros env' res Hlazy.
      (* The lazy evaluation of the prefix l succeeds ... *)
      generalize (mr_chain_lazy_eval_progress env l (x::nil) _ _ Hlazy).
      intros Hlazy_l.
      inversion Hlazy_l as [env1 Htmp]; clear Hlazy_l.
      inversion Htmp as [loc_d1 Hlazy_l]; clear Htmp.
      (* ... so both evaluations can be restarted from env1 on the last job
         x, using the induction hypothesis for the regular one. *)
      rewrite (mr_chain_eval_split h env l x nil env1 loc_d1 (IHl env1 loc_d1 Hlazy_l)).
      rewrite (mr_chain_lazy_eval_split env l x nil env1 loc_d1 Hlazy_l) in Hlazy.
      unfold mr_chain_lazy_eval in Hlazy; simpl in *.
      unfold mr_chain_eval; simpl in *.
      destruct (lookup equiv_dec env1 (mr_input x));
        simpl in *; try congruence.
      unfold mr_lazy_eval in Hlazy; simpl in *.
      unfold mr_eval; simpl in *.
      (* Case analysis on the localization of the input of x. *)
      destruct d; simpl in *.
      + destruct (mr_map_eval h (mr_map x) (Dlocal d));
        simpl in *; try congruence.
        destruct l0;
        simpl in *; try congruence.
        assumption.
      + destruct l0;
        simpl in *; try congruence.
        destruct (mr_map_eval h (mr_map x) (Ddistr (d :: l0)));
        simpl in *; try congruence.
        destruct l1;
        simpl in *; try congruence.
        assumption.
  Qed.

  End SemanticsLazy.

  Section ReduceEmpty.

    (* Brand relation passed to the evaluation functions used in this
       section (reduce_op_eval, mr_chain_eval, mr_chain_lazy_eval,
       nnrc_core_eval). *)
    Context (h:brand_relation_t).

    (* Build an NNRC expression from the reduce (and, for RedSingleton, the
       map) of a job, with the empty collection substituted for the bound
       variable:
       - RedId: the constant empty collection.
       - RedCollect (x, n): n where x is replaced by the constant empty
         collection.
       - RedOp op: the constant obtained by evaluating op on the empty list
         (None if that evaluation fails).
       - RedSingleton: only defined when the map is MapScalar (x, n), in
         which case it is n where x is replaced by the constant empty
         collection; None for the other maps. *)
    Definition mr_reduce_empty (mr: mr): option nnrc :=
      match mr.(mr_reduce) with
      | RedId =>
        Some (NNRCConst (dcoll nil))
      | RedCollect (x, n) =>
        Some (nnrc_subst n x (NNRCConst (dcoll nil)))
      | RedOp op =>
        lift (fun d => NNRCConst d) (reduce_op_eval h op nil)
      | RedSingleton =>
        match mr.(mr_map) with
        | MapScalar (x, n) =>
          Some (nnrc_subst n x (NNRCConst (dcoll nil)))
        | _ => None
        end
      end.

    (* Extract the result of the last job from the outcome of a chain
       evaluation, with its localization removed by unlocalize_data; the
       environment component is ignored. None when the chain failed. *)
    Definition get_mr_chain_result (res: nrcmr_env * option ddata) : option data :=
      match res with
      | (_, Some ld) => Some (unlocalize_data ld)
      | (_, None) => None
      end.

    Lemma mr_reduce_empty_correct:
      forall (l:list mr) (mr:mr) (env:nrcmr_env),
      forall res,
        get_mr_chain_result (mr_chain_eval h env (l ++ mr::nil)) = Some (normalize_data h res) ->
        (exists pre_res, snd (mr_chain_lazy_eval h env l) = Some pre_res) ->
        snd (mr_chain_lazy_eval h env (l ++ mr::nil)) = None ->
        olift (fun n => nnrc_core_eval h nil n) (mr_reduce_empty mr) = Some (normalize_data h res).
Proof.
      induction l.
      - Case "l = nil"%string.
        intros mr env res.
        simpl.
        intros Heval_mr_get_res Hlazy_nil Hlazy_mr_snd.
        unfold mr_reduce_empty.
        destruct mr; simpl in *.
        unfold mr_chain_eval in Heval_mr_get_res; simpl in *.
        unfold mr_chain_lazy_eval in Hlazy_mr_snd; simpl in *.
        destruct (lookup equiv_dec env mr_input0);
          simpl in *; try congruence.
        unfold mr_eval in Heval_mr_get_res; simpl in *.
        unfold mr_lazy_eval in Hlazy_mr_snd; simpl in *.
        destruct d; simpl in *.
        + SCase "scalar"%string.
          destruct (mr_map_eval h mr_map0 (Dlocal d));
            simpl in *; try congruence.
          destruct l; simpl in *.
          * SSCase "coll = nil"%string.
            destruct mr_reduce0.
            SSSCase "RedId"%string. {
              simpl in *.
              destruct (match
                              @lookup var ddata
                                (@equiv_dec var (@eq var)
                                   (@eq_equivalence var) string_eqdec) env
                                mr_output0 return (option nrcmr_env)
                            with
                            | Some l =>
                                match l return (option nrcmr_env) with
                                | Dlocal _ => @None nrcmr_env
                                | Ddistr coll' =>
                                    @Some (list (prod var ddata))
                                      (@cons (prod var ddata)
                                         (@pair var ddata mr_output0
                                            (Ddistr coll')) env)
                                end
                            | None =>
                                @Some (list (prod var ddata))
                                  (@cons (prod var ddata)
                                     (@pair var ddata mr_output0
                                        (Ddistr
                                           (@nil
                                              (@data
                                                 (@foreign_runtime_data
                                                 fruntime))))) env)
                            end);
                simpl in *; try congruence.
            }
            SSSCase "RedCollect"%string. {
              simpl in *.
              destruct p; simpl in *.
              assert (nnrc_core_eval h nil (nnrc_subst n v (NNRCConst (dcoll nil))) =
                      nnrc_core_eval h ((v, dcoll nil) :: nil) n) as Heq;
                [ | rewrite Heq in *; clear Heq ]. {
                rewrite (nnrc_core_eval_cons_subst_disjoint _ (NNRCConst (dcoll nil)));
                try reflexivity.
                simpl.
                unfold disjoint.
                simpl; intros;
                congruence.
              }
              destruct (nnrc_core_eval h ((v, dcoll nil) :: nil) n);
                simpl in *; try congruence.
              destruct (match
                              @lookup var
                                (@ddata
                                   (@foreign_runtime_data fruntime))
                                (@equiv_dec var (@eq var)
                                   (@eq_equivalence var) string_eqdec) env
                                mr_output0 return (option nrcmr_env)
                            with
                            | Some _ => @None nrcmr_env
                            | None =>
                                @Some
                                  (list
                                     (prod var
                                        (@ddata
                                           (@foreign_runtime_data fruntime))))
                                  (@cons
                                     (prod var
                                        (@ddata
                                           (@foreign_runtime_data fruntime)))
                                     (@pair var
                                        (@ddata
                                           (@foreign_runtime_data fruntime))
                                        mr_output0
                                        (@Dlocal
                                           (@foreign_runtime_data fruntime)
                                           d0)) env)
                            end);
                simpl in *; try congruence.
            }
            SSSCase "RedOp"%string. {
              simpl in *.
              destruct (reduce_op_eval h r nil);
                simpl in *; try congruence.
              destruct (match
                              @lookup var
                                (@ddata
                                   (@foreign_runtime_data fruntime))
                                (@equiv_dec var (@eq var)
                                   (@eq_equivalence var) string_eqdec) env
                                mr_output0 return (option nrcmr_env)
                            with
                            | Some _ => @None nrcmr_env
                            | None =>
                                @Some
                                  (list
                                     (prod var
                                        (@ddata
                                           (@foreign_runtime_data fruntime))))
                                  (@cons
                                     (prod var
                                        (@ddata
                                           (@foreign_runtime_data fruntime)))
                                     (@pair var
                                        (@ddata
                                           (@foreign_runtime_data fruntime))
                                        mr_output0
                                        (@Dlocal
                                           (@foreign_runtime_data fruntime)
                                           d0)) env)
                            end);
                simpl in *; try congruence.
              inversion Heval_mr_get_res.
              rewrite normalize_idem.
              reflexivity.
            }
            SSSCase "RedCollect"%string. {
              simpl in *.
              congruence.
            }
          * SSCase "coll <> nil"%string. {
              destruct (mr_reduce_eval h mr_reduce0 (d0 :: l));
              simpl in *; try congruence.
              destruct (merge_env mr_output0 d1 env);
                simpl in *; try congruence.
            }
        + SCase "distributed"%string.
          destruct l; simpl in *.
          * SSCase "coll = nil"%string.
            destruct mr_map0.
            SSSCase "MapDist"%string. {
              simpl in *.
              destruct mr_reduce0.
              SSSSCase "RedId"%string. {
                simpl in *.
                destruct (lookup equiv_dec env mr_output0);
                  simpl in *; try congruence.
                destruct (match d return (option nrcmr_env) with
                            | Dlocal _ => @None nrcmr_env
                            | Ddistr coll' =>
                                @Some (list (prod var ddata))
                                  (@cons (prod var ddata)
                                     (@pair var ddata mr_output0
                                        (Ddistr coll')) env)
                            end);
                  simpl in *; try congruence.
              }
              SSSSCase "RedCollect"%string. {
                simpl in *.
                destruct p0; simpl in *.
                assert (nnrc_core_eval h nil (nnrc_subst n v (NNRCConst (dcoll nil))) =
                        nnrc_core_eval h ((v, dcoll nil) :: nil) n) as Heq;
                  [ | rewrite Heq in *; clear Heq ]. {
                  rewrite (nnrc_core_eval_cons_subst_disjoint _ (NNRCConst (dcoll nil)));
                  try reflexivity.
                  simpl.
                  unfold disjoint.
                  simpl; intros;
                  congruence.
                }
              destruct (nnrc_core_eval h ((v, dcoll nil) :: nil) n);
                simpl in *; try congruence.
              destruct (match
                              @lookup var
                                (@ddata
                                   (@foreign_runtime_data fruntime))
                                (@equiv_dec var (@eq var)
                                   (@eq_equivalence var) string_eqdec) env
                                mr_output0 return (option nrcmr_env)
                            with
                            | Some _ => @None nrcmr_env
                            | None =>
                                @Some
                                  (list
                                     (prod var
                                        (@ddata
                                           (@foreign_runtime_data fruntime))))
                                  (@cons
                                     (prod var
                                        (@ddata
                                           (@foreign_runtime_data fruntime)))
                                     (@pair var
                                        (@ddata
                                           (@foreign_runtime_data fruntime))
                                        mr_output0
                                        (@Dlocal
                                           (@foreign_runtime_data fruntime)
                                           d)) env)
                            end);
                simpl in *; try congruence.
              }
              SSSSCase "RedOp"%string. {
                simpl in *.
                destruct (reduce_op_eval h r nil);
                  simpl in *; try congruence.
                destruct (match
                                @lookup var
                                  (@ddata
                                     (@foreign_runtime_data fruntime))
                                  (@equiv_dec var (@eq var)
                                     (@eq_equivalence var) string_eqdec) env
                                  mr_output0 return (option nrcmr_env)
                              with
                              | Some _ => @None nrcmr_env
                              | None =>
                                  @Some
                                    (list
                                       (prod var
                                          (@ddata
                                             (@foreign_runtime_data fruntime))))
                                    (@cons
                                       (prod var
                                          (@ddata
                                             (@foreign_runtime_data fruntime)))
                                       (@pair var
                                          (@ddata
                                             (@foreign_runtime_data fruntime))
                                          mr_output0
                                          (@Dlocal
                                             (@foreign_runtime_data fruntime)
                                             d)) env)
                              end);
                  simpl in *; try congruence.
                inversion Heval_mr_get_res.
                rewrite normalize_idem.
                reflexivity.
              }
              SSSSCase "RedSingleton"%string. {
                simpl in *.
                congruence.
              }
            }
            SSSCase "MapDistFlatten"%string. {
              simpl in *.
              destruct mr_reduce0.
              SSSSCase "RedId"%string. {
                simpl in *.
                destruct (lookup equiv_dec env mr_output0);
                  simpl in *; try congruence.
                destruct (match d return (option nrcmr_env) with
                                | Dlocal _ => @None nrcmr_env
                                | Ddistr coll' =>
                                    @Some (list (prod var ddata))
                                      (@cons (prod var ddata)
                                         (@pair var ddata mr_output0
                                            (Ddistr coll')) env)
                                end);
                  simpl in *; try congruence.
              }
              SSSSCase "RedCollect"%string. {
                simpl in *.
                destruct p0; simpl in *.
                assert (nnrc_core_eval h nil (nnrc_subst n v (NNRCConst (dcoll nil))) =
                        nnrc_core_eval h ((v, dcoll nil) :: nil) n) as Heq;
                  [ | rewrite Heq in *; clear Heq ]. {
                  rewrite (nnrc_core_eval_cons_subst_disjoint _ (NNRCConst (dcoll nil)));
                  try reflexivity.
                  simpl.
                  unfold disjoint.
                  simpl; intros;
                  congruence.
                }
                destruct (nnrc_core_eval h ((v, dcoll nil) :: nil) n);
                  simpl in *; try congruence.
                destruct (match
                                @lookup var
                                  (@ddata
                                     (@foreign_runtime_data fruntime))
                                  (@equiv_dec var (@eq var)
                                     (@eq_equivalence var) string_eqdec) env
                                  mr_output0 return (option nrcmr_env)
                              with
                              | Some _ => @None nrcmr_env
                              | None =>
                                  @Some
                                    (list
                                       (prod var
                                          (@ddata
                                             (@foreign_runtime_data fruntime))))
                                    (@cons
                                       (prod var
                                          (@ddata
                                             (@foreign_runtime_data fruntime)))
                                       (@pair var
                                          (@ddata
                                             (@foreign_runtime_data fruntime))
                                          mr_output0
                                          (@Dlocal
                                             (@foreign_runtime_data fruntime)
                                             d)) env)
                              end);
                  simpl in *; try congruence.
              }
              SSSSCase "RedOp"%string. {
                simpl in *.
                destruct (reduce_op_eval h r nil);
                  simpl in *; try congruence.
                destruct (match
                                @lookup var
                                  (@ddata
                                     (@foreign_runtime_data fruntime))
                                  (@equiv_dec var (@eq var)
                                     (@eq_equivalence var) string_eqdec) env
                                  mr_output0 return (option nrcmr_env)
                              with
                              | Some _ => @None nrcmr_env
                              | None =>
                                  @Some
                                    (list
                                       (prod var
                                          (@ddata
                                             (@foreign_runtime_data fruntime))))
                                    (@cons
                                       (prod var
                                          (@ddata
                                             (@foreign_runtime_data fruntime)))
                                       (@pair var
                                          (@ddata
                                             (@foreign_runtime_data fruntime))
                                          mr_output0
                                          (@Dlocal
                                             (@foreign_runtime_data fruntime)
                                             d)) env)
                              end);
                  simpl in *; try congruence.
                inversion Heval_mr_get_res.
                rewrite normalize_idem.
                reflexivity.
              }
              SSSSCase "RedSingleton"%string. {
                simpl in *.
                congruence.
              }
            }
            SSSCase "MapScalar"%string.
            simpl in *.
            congruence.
          * SSCase "coll = nil"%string.
            destruct (mr_map_eval h mr_map0 (Ddistr (d :: l)));
              simpl in *; try congruence.
            destruct l0;
            [ | destruct (mr_reduce_eval h mr_reduce0 (d0 :: l0));
                  simpl in *; try congruence;
                  destruct (merge_env mr_output0 d1 env);
                  simpl in *; try congruence ].
            destruct mr_reduce0.
            SSSSCase "RedId"%string. {
              simpl in *.
              destruct (lookup equiv_dec env mr_output0);
                simpl in *; try congruence.
              destruct (match d0 return (option nrcmr_env) with
                              | Dlocal _ => @None nrcmr_env
                              | Ddistr coll' =>
                                  @Some (list (prod var ddata))
                                    (@cons (prod var ddata)
                                       (@pair var ddata mr_output0
                                          (Ddistr coll')) env)
                              end);
                simpl in *; try congruence.
            }
            SSSSCase "RedCollect"%string. {
              simpl in *.
              destruct p; simpl in *.
              assert (nnrc_core_eval h nil (nnrc_subst n v (NNRCConst (dcoll nil))) =
                      nnrc_core_eval h ((v, dcoll nil) :: nil) n) as Heq;
                [ | rewrite Heq in *; clear Heq ]. {
                rewrite (nnrc_core_eval_cons_subst_disjoint _ (NNRCConst (dcoll nil)));
                try reflexivity.
                simpl.
                unfold disjoint.
                simpl; intros;
                congruence.
              }
              destruct (nnrc_core_eval h ((v, dcoll nil) :: nil) n);
                simpl in *; try congruence.
              destruct (match
                              @lookup var
                                (@ddata
                                   (@foreign_runtime_data fruntime))
                                (@equiv_dec var (@eq var)
                                   (@eq_equivalence var) string_eqdec) env
                                mr_output0 return (option nrcmr_env)
                            with
                            | Some _ => @None nrcmr_env
                            | None =>
                                @Some
                                  (list
                                     (prod var
                                        (@ddata
                                           (@foreign_runtime_data fruntime))))
                                  (@cons
                                     (prod var
                                        (@ddata
                                           (@foreign_runtime_data fruntime)))
                                     (@pair var
                                        (@ddata
                                           (@foreign_runtime_data fruntime))
                                        mr_output0
                                        (@Dlocal
                                           (@foreign_runtime_data fruntime)
                                           d0)) env)
                            end);
                simpl in *; try congruence.

            }
            SSSSCase "RedOp"%string. {
              simpl in *.
              destruct (reduce_op_eval h r nil);
                simpl in *; try congruence.
              destruct (match
                              @lookup var
                                (@ddata
                                   (@foreign_runtime_data fruntime))
                                (@equiv_dec var (@eq var)
                                   (@eq_equivalence var) string_eqdec) env
                                mr_output0 return (option nrcmr_env)
                            with
                            | Some _ => @None nrcmr_env
                            | None =>
                                @Some
                                  (list
                                     (prod var
                                        (@ddata
                                           (@foreign_runtime_data fruntime))))
                                  (@cons
                                     (prod var
                                        (@ddata
                                           (@foreign_runtime_data fruntime)))
                                     (@pair var
                                        (@ddata
                                           (@foreign_runtime_data fruntime))
                                        mr_output0
                                        (@Dlocal
                                           (@foreign_runtime_data fruntime)
                                           d0)) env)
                            end);
                simpl in *; try congruence.
              inversion Heval_mr_get_res.
              rewrite normalize_idem.
              reflexivity.
            }
            SSSSCase "RedSingleton"%string. {
              simpl in *.
              congruence.
            }
      - Case "l <> nil"%string.
        intros mr env res.
        intros Heval_a_l_mr_get_result Hlazy_a_l_snd_exists Hlazy_a_l_mr_snd.
        inversion Hlazy_a_l_snd_exists as [pre_res Hlazy_a_l_snd].
        specialize (IHl mr).
        assert (a :: l = (a :: nil) ++ l) as Heq;
          [ reflexivity | rewrite Heq in *; clear Heq].
        assert (((a :: nil) ++ l) ++ mr :: nil = (a :: nil) ++ (l ++ mr :: nil)) as Heq;
          [ reflexivity | rewrite Heq in *; clear Heq].
        case_eq (mr_chain_eval h env ((a :: nil) ++ l ++ mr :: nil)).
        intros env_eval_a_l_mr o Heval_a_l_mr.
        destruct o;
        [ | rewrite Heval_a_l_mr in Heval_a_l_mr_get_result; simpl in *; congruence ].
        rename d into loc_res.
        assert (unlocalize_data loc_res = normalize_data h res) as Hres_loc_res;
          [ rewrite Heval_a_l_mr in Heval_a_l_mr_get_result; simpl in *; congruence
          | rewrite <- Hres_loc_res in * ].
        clear Heval_a_l_mr_get_result.
        case_eq (mr_chain_lazy_eval h env ((a :: nil) ++ l)).
        intros env_lazy_a_l o Hlazy_a_l.
        destruct o;
        [ | rewrite Hlazy_a_l in Hlazy_a_l_snd; simpl in *; congruence ].
        assert (d = pre_res) as Heq;
          [ rewrite Hlazy_a_l in Hlazy_a_l_snd; simpl in *; congruence
          | rewrite Heq in *; clear Heq d ].
        clear Hlazy_a_l_snd.
        case_eq (mr_chain_lazy_eval h env ((a :: nil) ++ l ++ mr :: nil)).
        intros env_lazy_a_l_mr o Hlazy_a_l_mr.
        destruct o;
        [ rewrite Hlazy_a_l_mr in Hlazy_a_l_mr_snd; simpl in *; congruence | ].
        clear Hlazy_a_l_mr_snd.
        generalize (mr_chain_lazy_eval_progress h env (a::nil) l _ _ Hlazy_a_l).
        intros Hlazy_a.
        inversion Hlazy_a as [env_a Htmp]; clear Hlazy_a.
        inversion Htmp as [loc_d_a Hlazy_a]; clear Htmp.
        specialize (IHl env_a res).
        generalize (mr_chain_lazy_eval_correct _ _ _ _ _ Hlazy_a).
        intros Heval_a.
        assert (get_mr_chain_result (mr_chain_eval h env_a (l ++ mr :: nil)) = Some (normalize_data h res)) as Heval_l_mr_snd. {
          destruct l.
          + assert (mr_chain_eval h env ((a :: nil) ++ nil ++ mr :: nil) =
                    mr_chain_eval h env ((a :: nil) ++ mr :: nil)) as Heq;
            [ reflexivity | rewrite Heq in *; clear Heq ].
            rewrite (mr_chain_eval_split h env _ _ _ _ _ Heval_a) in Heval_a_l_mr.
            simpl in *.
            rewrite Heval_a_l_mr.
            simpl.
            rewrite Hres_loc_res.
            reflexivity.
          + assert (((a :: nil) ++ (m :: l) ++ mr :: nil) =
                    ((a :: nil) ++ (m :: l ++ mr :: nil))) as Heq;
            [ reflexivity | rewrite Heq in *; clear Heq ].
            rewrite (mr_chain_eval_split h env _ _ _ _ _ Heval_a) in Heval_a_l_mr.
            simpl in *.
            rewrite Heval_a_l_mr.
            rewrite <- Hres_loc_res.
            reflexivity.
        }
        specialize (IHl Heval_l_mr_snd).
        assert (exists pre_res, snd (mr_chain_lazy_eval h env_a l) = Some pre_res) as Hlazy_l_snd. {
          destruct l.
          + simpl.
            exists (Ddistr nil).
            reflexivity.
          + exists pre_res.
            assert ((a :: nil) ++ (m :: l) ++ mr :: nil =
                    (a :: nil) ++ m :: (l ++ mr :: nil)) as Heq;
            [ reflexivity | rewrite Heq in *; clear Heq ].
            rewrite (mr_chain_lazy_eval_split h env _ _ _ _ _ Hlazy_a) in Hlazy_a_l.
            simpl in *.
            rewrite Hlazy_a_l.
            reflexivity.
        }
        specialize (IHl Hlazy_l_snd).
        assert (snd (mr_chain_lazy_eval h env_a (l ++ mr :: nil)) = None) as Hlazy_l_mr_snd. {
          destruct l.
          + assert ((a :: nil) ++ nil ++ mr :: nil =
                    (a :: nil) ++ mr :: nil) as Heq;
            [ reflexivity | rewrite Heq in *; clear Heq ].
            rewrite (mr_chain_lazy_eval_split h env _ _ _ _ _ Hlazy_a) in Hlazy_a_l_mr.
            simpl in *.
            rewrite Hlazy_a_l_mr.
            reflexivity.
          + assert (((a :: nil) ++ (m :: l) ++ mr :: nil) =
                    ((a :: nil) ++ (m :: l ++ mr :: nil))) as Heq;
            [ reflexivity | rewrite Heq in *; clear Heq ].
            rewrite (mr_chain_lazy_eval_split h env _ _ _ _ _ Hlazy_a) in Hlazy_a_l_mr.
            simpl in *.
            rewrite Hlazy_a_l_mr.
            reflexivity.
        }
        specialize (IHl Hlazy_l_mr_snd).
        simpl in *.
        rewrite IHl.
        rewrite Hres_loc_res.
        reflexivity.
    Qed.



  End ReduceEmpty.

NNRCMR Library


The following are built-in Map/Reduce jobs and helper functions which can be useful for translation, optimization, etc.
  
  Section mr_library.


The function (fun d => d)
    Definition id_function :=
      let d := "x"%string in
      (d, NNRCVar d).

    Lemma id_function_no_free_vars: function_with_no_free_vars id_function.
Proof.
      intros x.
      unfold id_function.
      simpl.
      intros H; destruct H.
      - congruence.
      - contradiction.
    Qed.
    
The function (fun d => coll d)
    Definition coll_function :=
      let d := "x"%string in
      (d, NNRCUnop AColl (NNRCVar d)).

    Lemma coll_function_no_free_vars: function_with_no_free_vars coll_function.
Proof.
      intros x.
      unfold coll_function.
      simpl.
      intros H; destruct H.
      - congruence.
      - contradiction.
    Qed.

The function (fun v => d)
    Definition constant_function (d:data) :=
      let v := "x"%string in
      (v, NNRCConst d).

    Lemma constant_function_no_free_vars (d:data): function_with_no_free_vars (constant_function d).
Proof.
      intros x.
      unfold constant_function.
      contradiction.
    Qed.

Map-Reduce job that transforms a scalar collection into a distributed one
    Definition mr_scalar_to_distributed (input: var) (output: var) :=
      mkMR
        input
        output
        (MapScalar id_function)
        RedId.

Map-Reduce job that flattens its input into its output
    Definition mr_flatten_scalar (input: var) (output: var) :=
      mkMR
        input
        output
        (MapDistFlatten id_function)
        RedId.

    Lemma mr_flatten_scalar_wf (init: var) (output: var):
      mr_well_formed (mr_flatten_scalar init output).
Proof.
      unfold mr_flatten_scalar.
      unfold mr_well_formed.
      simpl.
      repeat split; try congruence.
      apply id_function_no_free_vars.
    Qed.

    Lemma mr_scalar_to_distributed_wf (init: var) (output: var):
      mr_well_formed (mr_scalar_to_distributed init output).
Proof.
      unfold mr_scalar_to_distributed.
      unfold mr_well_formed.
      simpl.
      repeat split; try congruence.
      apply id_function_no_free_vars.
    Qed.

  End mr_library.

End NNRCMR.