NNRCMR is a language for describing chains of Map/Reduce jobs, followed by a local computation over their results.


Section NNRCMR.
  Require Import String.
  Require Import List.
  Require Import EquivDec.
  Require Import Utils.
  Require Import BasicRuntime.
  Require Import cNNRCRuntime.
  Require Import ForeignReduceOps.

  Local Open Scope list_scope.

  Context {fruntime:foreign_runtime}.
  Context {fredop:foreign_reduce_op}.

  (* Variable names are plain strings. *)
  Definition var := string.
  (* A map/reduce environment: an association list from variable names to
     [ddata] values. *)
  Definition nrcmr_env := list (var * ddata).

Abstract Syntax

Named Nested Relational Calculus + Map Reduce

Different kinds of map functions:

  (* The three kinds of map step. Every constructor carries a pair made of a
     variable name and an [nnrc] body. In [mr_map_eval], [MapDist] and
     [MapDistFlatten] only accept a [Ddistr] input, whereas [MapScalar] only
     accepts a [Dlocal] input. *)
  Inductive map_fun :=
  | MapDist : var * nnrc -> map_fun
  | MapDistFlatten : var * nnrc -> map_fun
  | MapScalar : var * nnrc -> map_fun.

Different kinds of reduce functions:
  (* Reduce operators. The only constructor wraps a foreign reduce operator
     of type [foreign_reduce_op_type]. *)
  Inductive reduce_op :=
  | RedOpForeign : foreign_reduce_op_type -> reduce_op.
  (* The kinds of reduce step, as interpreted by [mr_reduce_eval]:
     - [RedId] returns the mapped values unchanged as a [Ddistr];
     - [RedCollect] evaluates its body with its variable bound to the
       collection of mapped values and returns a [Dlocal];
     - [RedOp] applies a [reduce_op] to the mapped values and returns a
       [Dlocal];
     - [RedSingleton] succeeds only on exactly one mapped value, returned as
       a [Dlocal]. *)
  Inductive reduce_fun :=
  | RedId : reduce_fun
  | RedCollect : (var * nnrc) -> reduce_fun
  | RedOp : reduce_op -> reduce_fun
  | RedSingleton : reduce_fun.

A map-reduce job mr is applied to a collection fetched from the environment at the location mr.(mr_input), and its result is stored in the environment at the location mr.(mr_output). The function mr.(mr_map) is applied to the input to produce a collection of type coll B. Then, the function mr.(mr_reduce) is applied to all the elements generated by the map.
  (* A single map/reduce job: the name of its input, the name of its output,
     and the map and reduce functions to run. *)
  Record mr := mkMR
    { mr_input: var; (* Name of the collection on which the map/reduce is applied. *)
      mr_output: var; (* Name under which the result of the map/reduce is stored. *)
      mr_map: map_fun; (* Function applied to the input; it generates a collection. *)
      mr_reduce: reduce_fun }. (* Function applied to the collection generated by the map. *)

An NNRCMR expression is a chain of map-reduce jobs, with a last expression describing the final, local computation.
  (* A complete NNRCMR program: the localized inputs, the chain of map/reduce
     jobs, and the closing expression. [mr_last] pairs a function (formal
     parameters and an [nnrc] body) with the list of localized variables used
     as its actual arguments (see [mr_last_eval]). *)
  Record nnrcmr := mkMRChain
    { mr_inputs_loc: vdbindings; (* Input variables with their localizations *)
      mr_chain: list mr; (* Map/Reduce chain *)
      mr_last: (list var * nnrc) * list (var * dlocalization); }. (* Last expression *)


This section contains support functions for emitting code that follows the lexical constraints imposed by Spark.

  Section sanitize_local.
    Context (sep:string).
    Context (renamer:string->string).
    Context (avoid:list var).

    Definition nnrc_rename_in1
               (oldvar:var) (e:nnrc) : (var*nnrc)
      := let newvar := nnrc_pick_name sep renamer avoid oldvar e in
         (newvar, nnrc_rename_lazy e oldvar newvar).

    Definition nnrc_rename_in2
               (oldvar1 oldvar2:var) (e:nnrc) : (var*var*nnrc)
      := let newvar1 := nnrc_pick_name sep renamer (oldvar2::avoid) oldvar1 e in
         let newvar2 := nnrc_pick_name sep renamer (newvar1::avoid) oldvar2 e in
         let e'1 := nnrc_rename_lazy e oldvar1 newvar1 in
         let e'2 := nnrc_rename_lazy e'1 oldvar2 newvar2 in
         (newvar1, newvar2, e'2).

    Definition rename_map_fun (m:map_fun) : map_fun
      := match m with
         | MapDist (v, e) => MapDist (nnrc_rename_in1 v e)
         | MapDistFlatten (v,e) => MapDistFlatten (nnrc_rename_in1 v e)
         | MapScalar (v,e) => MapScalar (nnrc_rename_in1 v e)

    Definition rename_reduce_fun (r:reduce_fun) : reduce_fun
      := match r with
         | RedId => RedId
         | RedCollect (v,e) => RedCollect (nnrc_rename_in1 v e)
         | RedOp op => RedOp op
         | RedSingleton => RedSingleton

    Definition mr_rename_local (mr0:mr) : mr
      := mkMR mr0.(mr_input) mr0.(mr_output)
                                   (rename_map_fun mr0.(mr_map))
                                   (rename_reduce_fun mr0.(mr_reduce)).

    (* Applies [mr_rename_local] to every job of the chain.
       NOTE(review): this definition looks truncated in this copy of the
       file: [mkMRChain] builds a record with three fields (mr_inputs_loc,
       mr_chain, mr_last) but only the chain argument is present, and the
       terminating period is missing. Restore from the reference source. *)
    Definition nnrcmr_rename_local (mrl:nnrcmr)
      := mkMRChain
           (map mr_rename_local mrl.(mr_chain))
  End sanitize_local.

  Section sanitize_dbs.
    (* Sorted list (via [insertion_sort] with [StringOrder.lt_dec]) built
       from the input and output names of map/reduce jobs.
       NOTE(review): this definition looks truncated in this copy of the
       file: the function that applies the lambda to the jobs of [mrl] and
       the terminating period are missing. Restore from the reference
       source. *)
    Definition nnrcmr_inoutlist (mrl:nnrcmr)
      := insertion_sort StringOrder.lt_dec
                           (fun x => x.(mr_input)::x.(mr_output)::nil)
    Definition mr_subst_io substlist (mr0:mr) : mr
      := mkMR (substlist_subst substlist mr0.(mr_input))
              (substlist_subst substlist mr0.(mr_output))
              mr0.(mr_map) mr0.(mr_reduce).

    (* Applies the substitution [substlist] to a whole program: to every job
       of the chain through [mr_subst_io], and to the parameters, body and
       argument names of the last expression.
       NOTE(review): this definition looks truncated in this copy of the
       file: the [let args := ...] binding has a lambda but nothing that
       applies it to [args], the closing [in]s, the final record
       construction and the terminating period are missing. Restore from
       the reference source. *)
    Definition nnrcmr_subst_io substlist (mrl:nnrcmr) : nnrcmr
      := let chain := (map (mr_subst_io substlist) mrl.(mr_chain)) in
         let last :=
             let (params, n) := fst mrl.(mr_last) in
             let args := snd mrl.(mr_last) in
             let params := (substlist_subst substlist) params in
             let n := nnrc_substlist_subst substlist n in
             let args :=
                   (fun x_loc => (substlist_subst substlist (fst x_loc), snd x_loc))
             ((params, n), args)

    Context (sep:string).
    Context (renamer:string->string).
    Context (avoid:list var).

    Fixpoint nnrcmr_mk_rename_list (leave_alone:list var) (names:list var) (acc:list (var*var))
      := match names with
         | nil => acc
         | x::l => if (in_dec string_dec x leave_alone)
                   then nnrcmr_mk_rename_list leave_alone l ((x,x)::acc)
                     let proposedname := renamer x in
                     let newname :=
                         fresh_var_from sep proposedname
                                        (l++(map snd acc)++avoid) in
                     nnrcmr_mk_rename_list leave_alone l ((x,newname)::acc)

    Definition nnrcmr_rename_graph (mrl:nnrcmr)
      := let leave_alone := fst (mrl.(mr_inputs_loc)) in
         let names := nnrcmr_inoutlist mrl in
         let substlist_rev := nnrcmr_mk_rename_list leave_alone names nil in
         let substlist := rev substlist_rev in
         nnrcmr_subst_io substlist mrl.

  End sanitize_dbs.



  Definition mr_causally_consistent (mr1 mr2:mr) : bool
    := mr1.(mr_input) <>b mr2.(mr_output).

  (* Checks [mr_causally_consistent] over the jobs of a chain, using
     [forallb_ordpairs_refl] to enumerate the pairs of jobs to test. *)
  Definition mr_chain_causally_consistent (mr_chain:list mr) : bool
    := forallb_ordpairs_refl mr_causally_consistent mr_chain.

  Definition nnrcmr_causally_consistent (mrl:nnrcmr) : bool
    := mr_chain_causally_consistent mrl.(mr_chain).

  (* A one-parameter function (parameter, body) is closed when every free
     variable of its body is the parameter itself. *)
  Definition function_with_no_free_vars (f: var * nnrc) :=
    (forall (x: var), In x (nnrc_free_vars (snd f)) -> x = fst f).

  (* A two-parameter function ((param1, param2), body) is closed when its two
     parameters are distinct and every free variable of its body is one of
     them. *)
  Definition function2_with_no_free_vars (f: (var * var) * nnrc) :=
    (fst (fst f)) <> (snd (fst f)) /\
    (forall x, In x (nnrc_free_vars (snd f)) -> x = (fst (fst f)) \/ x = (snd (fst f))).

  Definition map_well_formed map :=
    match map with
    | MapDist f => function_with_no_free_vars f
    | MapDistFlatten f => function_with_no_free_vars f
    | MapScalar f => function_with_no_free_vars f

  Definition reduce_well_formed red :=
    match red with
    | RedId => True
    | RedCollect f => function_with_no_free_vars f
    | RedOp op => True
    | RedSingleton => True

  Definition mr_well_formed mr :=
    map_well_formed mr.(mr_map) /\ reduce_well_formed mr.(mr_reduce).

  (* A chain is well formed when every job occurring in it is well formed. *)
  Definition mr_chain_well_formed (l: list mr) :=
    forall mr, In mr l -> mr_well_formed mr.

  Definition nnrcmr_well_formed (mrl: nnrcmr) :=
    mr_chain_well_formed mrl.(mr_chain).

  Definition mr_input_localized mr :=
    match mr.(mr_map) with
    | MapDist _ => (mr.(mr_input), Vdistr)
    | MapDistFlatten _ => (mr.(mr_input), Vdistr)
    | MapScalar _ => (mr.(mr_input), Vlocal)

  Definition mr_output_localized mr :=
    match mr.(mr_reduce) with
    | RedId => (mr.(mr_output), Vdistr)
    | RedCollect _ => (mr.(mr_output), Vlocal)
    | RedOp op => (mr.(mr_output), Vlocal)
    | RedSingleton => (mr.(mr_output), Vlocal)

  Definition map_well_localized map d :=
    match map, d with
    | MapDist _, Ddistr _ => True
    | MapDistFlatten _, Ddistr _ => True
    | MapScalar _, Dlocal _ => True
    | _, _ => False

Evaluation Semantics

  Section Semantics.

  Context (h:brand_relation_t).



  Definition mr_map_eval (map: map_fun) (input_d: ddata) : option (list data) :=
    match map with
    | MapDist f =>
      let f_map (d:data) : option data :=
          let (doc, body) := f in
          nnrc_core_eval h ((doc,d)::nil) body
      match input_d with
      | Ddistr coll =>
        rmap f_map coll
      | Dlocal d => None
    | MapDistFlatten f =>
      let f_map (d:data) : option data :=
          let (doc, body) := f in
          nnrc_core_eval h ((doc,d)::nil) body
      match input_d with
      | Ddistr coll =>
        let nested_coll := rmap f_map coll in
        olift rflatten nested_coll
      | Dlocal d => None
    | MapScalar f =>
      let f_map (d:data) : option data :=
          let (doc, body) := f in
          nnrc_core_eval h ((doc,d)::nil) body
      match input_d with
      | Ddistr coll => None
      | Dlocal d =>
        match f_map d with
        | Some (dcoll coll) => Some coll
        | _ => None



  Definition reduce_op_eval (rop:reduce_op) (dl:list data) : option data
    := match rop with
       | RedOpForeign op
         => foreign_reduce_op_interp h op dl
  Definition mr_reduce_eval (reduce: reduce_fun) (values_v: list data) : option (ddata) :=
    match reduce with
    | RedId => Some (Ddistr values_v)
    | RedCollect f =>
      let (values_arg, body) := f in
      let v := nnrc_core_eval h ((values_arg, dcoll values_v) :: nil) body in
      lift (fun res => Dlocal res) v
    | RedOp op =>
      lift (fun res => Dlocal res) (reduce_op_eval op values_v)
    | RedSingleton =>
      match values_v with
      | d :: nil => Some (Dlocal d)
      | _ => None



  Definition mr_eval (mr:mr) (d: ddata) : option (ddata) :=
    let map_result := mr_map_eval mr.(mr_map) d in
    olift (mr_reduce_eval mr.(mr_reduce)) map_result.

  Lemma mr_eval_ignores_env map_fun red :
    forall (mr_input1 mr_input2 mr_output1 mr_output2:var),
    forall (d:ddata),
      mr_eval (mkMR mr_input1 mr_output1 map_fun red) d =
      mr_eval (mkMR mr_input2 mr_output2 map_fun red) d.

Map/Reduce Chain


  Definition merge_env (x: var) (d: ddata) (env: nrcmr_env) : option nrcmr_env :=
    match d with
    | Ddistr coll =>
      match lookup equiv_dec env x with
      | None => Some ((x, Ddistr coll)::env)
      | Some (Ddistr coll') => Some ((x, Ddistr (coll ++ coll') )::env)
      | Some _ => None
    | Dlocal v =>
      match lookup equiv_dec env x with
      | None => Some ((x, Dlocal v)::env)
      | Some d' => None

  Definition mr_chain_eval (env:nrcmr_env) (l:list mr) : nrcmr_env * option ddata :=
      (fun acc mr =>
         match acc with
         | (env', None) => (env', None)
         | (env', Some _) =>
           let oinput_d := lookup equiv_dec env' mr.(mr_input) in
           let ores := olift (mr_eval mr) oinput_d in
           match ores, olift (fun res => merge_env mr.(mr_output) res env') ores with
           | Some res, Some env'' => (env'', Some res)
           | _, _ => (env', None)
      l (env, Some (Ddistr nil)).

  Fixpoint build_nnrc_env' (mr_env:nrcmr_env) (params: list var) (args: list (var * dlocalization)) :=
    match (params, args) with
    | (nil, nil) => Some nil
    | (x1::params, (x2, loc)::args) =>
      match lookup equiv_dec mr_env x2 with
      | Some loc_d =>
        lift (fun nnrc_env => (x1, unlocalize_data loc_d) :: nnrc_env) (build_nnrc_env' mr_env params args)
      | None => None
    | _=> None

  (* Pairs the formal parameters [form] with the effective values [eff]
     using [zip]; yields [None] when [eff] is [None] or when [zip] fails. *)
  Definition nnrc_env_build (form:list var) (eff: option (list data)): option bindings :=
    olift (zip form) eff.

  (* Computes the values of the actual arguments: each argument name is
     looked up in [mr_env] and the data found is unlocalized.
     NOTE(review): the head of the body looks truncated in this copy of the
     file: the lambda and [(map fst args)] are not applied to anything. A
     function of type (A -> option B) -> list A -> option (list B) is
     expected here (presumably [rmap], as used in [mr_map_eval]) - confirm
     against the reference source. *)
  Definition effective_params_from_bindings (args:list (var * dlocalization)) (mr_env:nrcmr_env) : option (list data) :=
      (fun (v : var) =>
         lift unlocalize_data (lookup equiv_dec mr_env v))
      (map fst args).
  Definition build_nnrc_env (mr_env:nrcmr_env) (params: list var) (args: list (var * dlocalization)) :=
    let eff_params := effective_params_from_bindings args mr_env in
    nnrc_env_build params eff_params.
  Definition mr_last_eval (mr_env:nrcmr_env) (mr_last: (list var * nnrc) * list (var * dlocalization)) :=
    let (params, n) := fst mr_last in
    let args := snd mr_last in
    let nnrc_env := build_nnrc_env mr_env params args in
    olift (fun env => nnrc_core_eval h env n) nnrc_env.

  Definition nnrcmr_eval (env:nrcmr_env) (mrl:nnrcmr) : option data :=
    match mr_chain_eval env mrl.(mr_chain) with
    | (mr_env, Some _) =>
      mr_last_eval mr_env mrl.(mr_last)
    | (_, None) => None

  (* If evaluating the prefix [l1] from [env] succeeds and yields the
     environment [env1], then evaluating [l1 ++ r :: l2] from [env] is the
     same as evaluating [r :: l2] from [env1].
     NOTE(review): the proof script below looks truncated in this copy of
     the file (no Proof/Qed, missing match arms inside the generalized
     term). Restore from the reference source. *)
  Lemma mr_chain_eval_split (env: nrcmr_env) (l1:list mr) (r: mr) (l2: list mr):
    forall env1 loc_d1,
      mr_chain_eval env l1 = (env1, Some loc_d1) ->
      mr_chain_eval env (l1 ++ (r :: l2)) = mr_chain_eval env1 (r :: l2).
    intros env1 loc_d1.
    unfold mr_chain_eval.
    rewrite fold_left_app.
    generalize (fold_left
        (fun (acc : list (var * ddata) * option ddata)
           (mr0 : =>
         let (env', y) := acc in
         match y with
         | Some _ =>
               olift (mr_eval mr0) (lookup equiv_dec env' (mr_input mr0))
             | Some res =>
                     (fun res0 : ddata =>
                      merge_env (mr_output mr0) res0 env')
                     (Some res)
                 | Some env'' => (env'', Some res)
                 | None => (env', None)
             | None => (env', None)
         | None => (env', None)
         end) l1 (env, Some (Ddistr nil))) as res1.
    intros res1 Hres1.
    rewrite Hres1.

  (* If evaluating [l1 ++ l2] from [env] succeeds, then evaluating the prefix
     [l1] alone from [env] also succeeds.
     NOTE(review): the proof script below looks truncated in this copy of
     the file (no Proof/Qed, missing match arms inside the case term).
     Restore from the reference source. *)
  Lemma mr_chain_eval_progress (env:nrcmr_env) (l1:list mr) (l2:list mr):
    forall env' loc_d,
      mr_chain_eval env (l1 ++ l2) = (env', Some loc_d) ->
      exists env1 loc_d1,
        mr_chain_eval env l1 = (env1, Some loc_d1).
    intros env' loc_d.
    unfold mr_chain_eval.
    rewrite fold_left_app.
    case (fold_left
        (fun (acc : list (var * ddata) * option ddata)
           (mr : mr) =>
         let (env'0, y) := acc in
         match y with
         | Some _ =>
               olift (mr_eval mr) (lookup equiv_dec env'0 (mr_input mr))
             | Some res =>
                     (fun res0 : ddata =>
                      merge_env (mr_output mr) res0 env'0)
                     (Some res)
                 | Some env'' => (env'', Some res)
                 | None => (env'0, None)
             | None => (env'0, None)
         | None => (env'0, None)
         end) l1 (env, Some (Ddistr nil))).
    intros env1 o1.
    destruct o1; simpl.
    - Case "Some"%string.
      exists env1.
      exists d.
    - Case "None"%string.
      induction l2;
        simpl; try congruence.
      intro Hl2.
      apply (IHl2 Hl2).

  End Semantics.

A 'lazy' semantics

In the alternative 'lazy' semantics, evaluation of a given map/reduce isn't triggered unless there is at least one value in its input collection.

This alternative semantics provides a model which is closer to the run-time behavior of some targets, e.g., for Cloudant.
  Section SemanticsLazy.

  Context (h:brand_relation_t).

  Definition mr_lazy_eval (mr:mr) (d: ddata) : option (ddata) :=
    match d with
    | Ddistr nil => None
    | _ =>
      match mr_map_eval h mr.(mr_map) d with
      | Some nil => None
      | map_result =>
        olift (mr_reduce_eval h mr.(mr_reduce)) map_result

  Definition mr_chain_lazy_eval (env:nrcmr_env) (l:list mr) : nrcmr_env * option ddata :=
      (fun (acc: nrcmr_env * option ddata) mr =>
         match acc with
         | (env', None) => (env', None)
         | (env', Some _) =>
           let oinput_d := lookup equiv_dec env' mr.(mr_input) in
           let ores := olift (mr_lazy_eval mr) oinput_d in
           match ores, olift (fun res => merge_env mr.(mr_output) res env') ores with
           | Some res, Some env'' => (env'', Some res)
           | _, _ => (env', None)
      l (env, Some (Ddistr nil)).

  Definition nnrcmr_lazy_eval (env:nrcmr_env) (mrl:nnrcmr) : option data :=
    match mr_chain_lazy_eval env mrl.(mr_chain) with
    | (mr_env, Some _) =>
      mr_last_eval h mr_env mrl.(mr_last)
    | (_, None) => None

  (* If the lazy evaluation of the prefix [l1] from [env] succeeds and yields
     the environment [env1], then the lazy evaluation of [l1 ++ r :: l2]
     from [env] is the same as the lazy evaluation of [r :: l2] from [env1].
     NOTE(review): the proof script below looks truncated in this copy of
     the file (no Proof/Qed, missing match arms inside the generalized
     term). Restore from the reference source. *)
  Lemma mr_chain_lazy_eval_split (env: nrcmr_env) (l1:list mr) (r: mr) (l2: list mr):
    forall env1 loc_d1,
      mr_chain_lazy_eval env l1 = (env1, Some loc_d1) ->
      mr_chain_lazy_eval env (l1 ++ (r :: l2)) = mr_chain_lazy_eval env1 (r :: l2).
    intros env1 loc_d1.
    unfold mr_chain_lazy_eval.
    rewrite fold_left_app.
    generalize (fold_left
        (fun (acc : nrcmr_env * option ddata) (mr0 : =>
         let (env', o) := acc in
         match o with
         | Some _ =>
               olift (mr_lazy_eval mr0)
                 (lookup equiv_dec env' (mr_input mr0))
             | Some res =>
                     (fun res0 : ddata =>
                      merge_env (mr_output mr0) res0 env')
                     (Some res)
                 | Some env'' => (env'', Some res)
                 | None => (env', None)
             | None => (env', None)
         | None => (env', None)
         end) l1 (env, Some (Ddistr nil))) as res1.
    intros res1 Hres1.
    rewrite Hres1.

  (* If the lazy evaluation of [l1 ++ l2] from [env] succeeds, then the lazy
     evaluation of the prefix [l1] alone from [env] also succeeds.
     NOTE(review): the proof script below looks truncated in this copy of
     the file (no Proof/Qed, missing match arms inside the case term).
     Restore from the reference source. *)
  Lemma mr_chain_lazy_eval_progress (env:nrcmr_env) (l1:list mr) (l2:list mr):
    forall env' loc_d,
      mr_chain_lazy_eval env (l1 ++ l2) = (env', Some loc_d) ->
      exists env1 loc_d1,
        mr_chain_lazy_eval env l1 = (env1, Some loc_d1).
    intros env' loc_d.
    unfold mr_chain_lazy_eval.
    rewrite fold_left_app.
    case (fold_left
        (fun (acc : nrcmr_env * option ddata) (mr : mr) =>
         let (env'0, o) := acc in
         match o with
         | Some _ =>
               olift (mr_lazy_eval mr) (lookup equiv_dec env'0 (mr_input mr))
             | Some res =>
                     (fun res0 : ddata =>
                      merge_env (mr_output mr) res0 env'0)
                     (Some res)
                 | Some env'' => (env'', Some res)
                 | None => (env'0, None)
             | None => (env'0, None)
         | None => (env'0, None)
         end) l1 (env, Some (Ddistr nil))).
    intros env1 o1.
    destruct o1; simpl.
    - Case "Some"%string.
      exists env1.
      exists d.
    - Case "None"%string.
      induction l2;
        simpl; try congruence.
      intro Hl2.
      apply (IHl2 Hl2).

  (* Soundness of the lazy semantics with respect to the regular one:
     whenever the lazy evaluation of a chain succeeds with some environment
     and result, the regular evaluation of the same chain yields the same
     environment and result. The proof goes by induction on the chain from
     its end ([rev_ind]), using the split and progress lemmas.
     NOTE(review): the proof script below looks truncated in this copy of
     the file (no Proof/Qed, some closing tactics missing). Restore from
     the reference source. *)
  Lemma mr_chain_lazy_eval_correct (env:nrcmr_env) (l:list mr):
    forall env' res,
      mr_chain_lazy_eval env l = (env', Some res) ->
      mr_chain_eval h env l = (env', Some res).
    induction l using @rev_ind.
    - Case "l = nil"%string.
      intros env' res.
      unfold mr_chain_lazy_eval.
      unfold mr_chain_eval.
    - Case "l <> nil"%string.
      intros env' res Hlazy.
      generalize (mr_chain_lazy_eval_progress env l (x::nil) _ _ Hlazy).
      intros Hlazy_l.
      inversion Hlazy_l as [env1 Htmp]; clear Hlazy_l.
      inversion Htmp as [loc_d1 Hlazy_l]; clear Htmp.
      rewrite (mr_chain_eval_split h env l x nil env1 loc_d1 (IHl env1 loc_d1 Hlazy_l)).
      rewrite (mr_chain_lazy_eval_split env l x nil env1 loc_d1 Hlazy_l) in Hlazy.
      unfold mr_chain_lazy_eval in Hlazy; simpl in *.
      unfold mr_chain_eval; simpl in *.
      destruct (lookup equiv_dec env1 (mr_input x));
        simpl in *; try congruence.
      unfold mr_lazy_eval in Hlazy; simpl in *.
      unfold mr_eval; simpl in *.
      destruct d; simpl in *.
      + destruct (mr_map_eval h (mr_map x) (Dlocal d));
        simpl in *; try congruence.
        destruct l0;
        simpl in *; try congruence.
      + destruct l0;
        simpl in *; try congruence.
        destruct (mr_map_eval h (mr_map x) (Ddistr (d :: l0)));
        simpl in *; try congruence.
        destruct l1;
        simpl in *; try congruence.

  End SemanticsLazy.

  Section ReduceEmpty.

    Context (h:brand_relation_t).

    Definition mr_reduce_empty (mr: mr): option nnrc :=
      match mr.(mr_reduce) with
      | RedId =>
        Some (NNRCConst (dcoll nil))
      | RedCollect (x, n) =>
        Some (nnrc_subst n x (NNRCConst (dcoll nil)))
      | RedOp op =>
        lift (fun d => NNRCConst d) (reduce_op_eval h op nil)
      | RedSingleton =>
        match mr.(mr_map) with
        | MapScalar (x, n) =>
          Some (nnrc_subst n x (NNRCConst (dcoll nil)))
        | _ => None

    Definition get_mr_chain_result (res: nrcmr_env * option ddata) : option data :=
      match res with
      | (_, Some ld) => Some (unlocalize_data ld)
      | (_, None) => None

    Lemma mr_reduce_empty_correct:
      forall (l:list mr) (mr:mr) (env:nrcmr_env),
      forall res,
        get_mr_chain_result (mr_chain_eval h env (l ++ mr::nil)) = Some (normalize_data h res) ->
        (exists pre_res, snd (mr_chain_lazy_eval h env l) = Some pre_res) ->
        snd (mr_chain_lazy_eval h env (l ++ mr::nil)) = None ->
        olift (fun n => nnrc_core_eval h nil n) (mr_reduce_empty mr) = Some (normalize_data h res).
      induction l.
      - Case "l = nil"%string.
        intros mr env res.
        intros Heval_mr_get_res Hlazy_nil Hlazy_mr_snd.
        unfold mr_reduce_empty.
        destruct mr; simpl in *.
        unfold mr_chain_eval in Heval_mr_get_res; simpl in *.
        unfold mr_chain_lazy_eval in Hlazy_mr_snd; simpl in *.
        destruct (lookup equiv_dec env mr_input0);
          simpl in *; try congruence.
        unfold mr_eval in Heval_mr_get_res; simpl in *.
        unfold mr_lazy_eval in Hlazy_mr_snd; simpl in *.
        destruct d; simpl in *.
        + SCase "scalar"%string.
          destruct (mr_map_eval h mr_map0 (Dlocal d));
            simpl in *; try congruence.
          destruct l; simpl in *.
          * SSCase "coll = nil"%string.
            destruct mr_reduce0.
            SSSCase "RedId"%string. {
              simpl in *.
              destruct (match
                              @lookup var ddata
                                (@equiv_dec var (@eq var)
                                   (@eq_equivalence var) string_eqdec) env
                                mr_output0 return (option nrcmr_env)
                            | Some l =>
                                match l return (option nrcmr_env) with
                                | Dlocal _ => @None nrcmr_env
                                | Ddistr coll' =>
                                    @Some (list (prod var ddata))
                                      (@cons (prod var ddata)
                                         (@pair var ddata mr_output0
                                            (Ddistr coll')) env)
                            | None =>
                                @Some (list (prod var ddata))
                                  (@cons (prod var ddata)
                                     (@pair var ddata mr_output0
                                                 fruntime))))) env)
                simpl in *; try congruence.
            SSSCase "RedCollect"%string. {
              simpl in *.
              destruct p; simpl in *.
              assert (nnrc_core_eval h nil (nnrc_subst n v (NNRCConst (dcoll nil))) =
                      nnrc_core_eval h ((v, dcoll nil) :: nil) n) as Heq;
                [ | rewrite Heq in *; clear Heq ]. {
                rewrite (nnrc_core_eval_cons_subst_disjoint _ (NNRCConst (dcoll nil)));
                try reflexivity.
                unfold disjoint.
                simpl; intros;
              destruct (nnrc_core_eval h ((v, dcoll nil) :: nil) n);
                simpl in *; try congruence.
              destruct (match
                              @lookup var
                                   (@foreign_runtime_data fruntime))
                                (@equiv_dec var (@eq var)
                                   (@eq_equivalence var) string_eqdec) env
                                mr_output0 return (option nrcmr_env)
                            | Some _ => @None nrcmr_env
                            | None =>
                                     (prod var
                                           (@foreign_runtime_data fruntime))))
                                     (prod var
                                           (@foreign_runtime_data fruntime)))
                                     (@pair var
                                           (@foreign_runtime_data fruntime))
                                           (@foreign_runtime_data fruntime)
                                           d0)) env)
                simpl in *; try congruence.
            SSSCase "RedOp"%string. {
              simpl in *.
              destruct (reduce_op_eval h r nil);
                simpl in *; try congruence.
              destruct (match
                              @lookup var
                                   (@foreign_runtime_data fruntime))
                                (@equiv_dec var (@eq var)
                                   (@eq_equivalence var) string_eqdec) env
                                mr_output0 return (option nrcmr_env)
                            | Some _ => @None nrcmr_env
                            | None =>
                                     (prod var
                                           (@foreign_runtime_data fruntime))))
                                     (prod var
                                           (@foreign_runtime_data fruntime)))
                                     (@pair var
                                           (@foreign_runtime_data fruntime))
                                           (@foreign_runtime_data fruntime)
                                           d0)) env)
                simpl in *; try congruence.
              inversion Heval_mr_get_res.
              rewrite normalize_idem.
            SSSCase "RedCollect"%string. {
              simpl in *.
          * SSCase "coll <> nil"%string. {
              destruct (mr_reduce_eval h mr_reduce0 (d0 :: l));
              simpl in *; try congruence.
              destruct (merge_env mr_output0 d1 env);
                simpl in *; try congruence.
        + SCase "distributed"%string.
          destruct l; simpl in *.
          * SSCase "coll = nil"%string.
            destruct mr_map0.
            SSSCase "MapDist"%string. {
              simpl in *.
              destruct mr_reduce0.
              SSSSCase "RedId"%string. {
                simpl in *.
                destruct (lookup equiv_dec env mr_output0);
                  simpl in *; try congruence.
                destruct (match d return (option nrcmr_env) with
                            | Dlocal _ => @None nrcmr_env
                            | Ddistr coll' =>
                                @Some (list (prod var ddata))
                                  (@cons (prod var ddata)
                                     (@pair var ddata mr_output0
                                        (Ddistr coll')) env)
                  simpl in *; try congruence.
              SSSSCase "RedCollect"%string. {
                simpl in *.
                destruct p0; simpl in *.
                assert (nnrc_core_eval h nil (nnrc_subst n v (NNRCConst (dcoll nil))) =
                        nnrc_core_eval h ((v, dcoll nil) :: nil) n) as Heq;
                  [ | rewrite Heq in *; clear Heq ]. {
                  rewrite (nnrc_core_eval_cons_subst_disjoint _ (NNRCConst (dcoll nil)));
                  try reflexivity.
                  unfold disjoint.
                  simpl; intros;
              destruct (nnrc_core_eval h ((v, dcoll nil) :: nil) n);
                simpl in *; try congruence.
              destruct (match
                              @lookup var
                                   (@foreign_runtime_data fruntime))
                                (@equiv_dec var (@eq var)
                                   (@eq_equivalence var) string_eqdec) env
                                mr_output0 return (option nrcmr_env)
                            | Some _ => @None nrcmr_env
                            | None =>
                                     (prod var
                                           (@foreign_runtime_data fruntime))))
                                     (prod var
                                           (@foreign_runtime_data fruntime)))
                                     (@pair var
                                           (@foreign_runtime_data fruntime))
                                           (@foreign_runtime_data fruntime)
                                           d)) env)
                simpl in *; try congruence.
              SSSSCase "RedOp"%string. {
                simpl in *.
                destruct (reduce_op_eval h r nil);
                  simpl in *; try congruence.
                destruct (match
                                @lookup var
                                     (@foreign_runtime_data fruntime))
                                  (@equiv_dec var (@eq var)
                                     (@eq_equivalence var) string_eqdec) env
                                  mr_output0 return (option nrcmr_env)
                              | Some _ => @None nrcmr_env
                              | None =>
                                       (prod var
                                             (@foreign_runtime_data fruntime))))
                                       (prod var
                                             (@foreign_runtime_data fruntime)))
                                       (@pair var
                                             (@foreign_runtime_data fruntime))
                                             (@foreign_runtime_data fruntime)
                                             d)) env)
                  simpl in *; try congruence.
                inversion Heval_mr_get_res.
                rewrite normalize_idem.
              SSSSCase "RedSingleton"%string. {
                simpl in *.
            SSSCase "MapDistFlatten"%string. {
              simpl in *.
              destruct mr_reduce0.
              SSSSCase "RedId"%string. {
                simpl in *.
                destruct (lookup equiv_dec env mr_output0);
                  simpl in *; try congruence.
                destruct (match d return (option nrcmr_env) with
                                | Dlocal _ => @None nrcmr_env
                                | Ddistr coll' =>
                                    @Some (list (prod var ddata))
                                      (@cons (prod var ddata)
                                         (@pair var ddata mr_output0
                                            (Ddistr coll')) env)
                  simpl in *; try congruence.
              SSSSCase "RedCollect"%string. {
                simpl in *.
                destruct p0; simpl in *.
                assert (nnrc_core_eval h nil (nnrc_subst n v (NNRCConst (dcoll nil))) =
                        nnrc_core_eval h ((v, dcoll nil) :: nil) n) as Heq;
                  [ | rewrite Heq in *; clear Heq ]. {
                  rewrite (nnrc_core_eval_cons_subst_disjoint _ (NNRCConst (dcoll nil)));
                  try reflexivity.
                  unfold disjoint.
                  simpl; intros;
                destruct (nnrc_core_eval h ((v, dcoll nil) :: nil) n);
                  simpl in *; try congruence.
                destruct (match
                                @lookup var
                                     (@foreign_runtime_data fruntime))
                                  (@equiv_dec var (@eq var)
                                     (@eq_equivalence var) string_eqdec) env
                                  mr_output0 return (option nrcmr_env)
                              | Some _ => @None nrcmr_env
                              | None =>
                                       (prod var
                                             (@foreign_runtime_data fruntime))))
                                       (prod var
                                             (@foreign_runtime_data fruntime)))
                                       (@pair var
                                             (@foreign_runtime_data fruntime))
                                             (@foreign_runtime_data fruntime)
                                             d)) env)
                  simpl in *; try congruence.
              SSSSCase "RedOp"%string. {
                simpl in *.
                destruct (reduce_op_eval h r nil);
                  simpl in *; try congruence.
                destruct (match
                                @lookup var
                                     (@foreign_runtime_data fruntime))
                                  (@equiv_dec var (@eq var)
                                     (@eq_equivalence var) string_eqdec) env
                                  mr_output0 return (option nrcmr_env)
                              | Some _ => @None nrcmr_env
                              | None =>
                                       (prod var
                                             (@foreign_runtime_data fruntime))))
                                       (prod var
                                             (@foreign_runtime_data fruntime)))
                                       (@pair var
                                             (@foreign_runtime_data fruntime))
                                             (@foreign_runtime_data fruntime)
                                             d)) env)
                  simpl in *; try congruence.
                inversion Heval_mr_get_res.
                rewrite normalize_idem.
              SSSSCase "RedSingleton"%string. {
                simpl in *.
            SSSCase "MapScalar"%string.
            simpl in *.
          * SSCase "coll = nil"%string.
            destruct (mr_map_eval h mr_map0 (Ddistr (d :: l)));
              simpl in *; try congruence.
            destruct l0;
            [ | destruct (mr_reduce_eval h mr_reduce0 (d0 :: l0));
                  simpl in *; try congruence;
                  destruct (merge_env mr_output0 d1 env);
                  simpl in *; try congruence ].
            destruct mr_reduce0.
            SSSSCase "RedId"%string. {
              simpl in *.
              destruct (lookup equiv_dec env mr_output0);
                simpl in *; try congruence.
              destruct (match d0 return (option nrcmr_env) with
                              | Dlocal _ => @None nrcmr_env
                              | Ddistr coll' =>
                                  @Some (list (prod var ddata))
                                    (@cons (prod var ddata)
                                       (@pair var ddata mr_output0
                                          (Ddistr coll')) env)
                simpl in *; try congruence.
            SSSSCase "RedCollect"%string. {
              simpl in *.
              destruct p; simpl in *.
              assert (nnrc_core_eval h nil (nnrc_subst n v (NNRCConst (dcoll nil))) =
                      nnrc_core_eval h ((v, dcoll nil) :: nil) n) as Heq;
                [ | rewrite Heq in *; clear Heq ]. {
                rewrite (nnrc_core_eval_cons_subst_disjoint _ (NNRCConst (dcoll nil)));
                try reflexivity.
                unfold disjoint.
                simpl; intros;
              destruct (nnrc_core_eval h ((v, dcoll nil) :: nil) n);
                simpl in *; try congruence.
              destruct (match
                              @lookup var
                                   (@foreign_runtime_data fruntime))
                                (@equiv_dec var (@eq var)
                                   (@eq_equivalence var) string_eqdec) env
                                mr_output0 return (option nrcmr_env)
                            | Some _ => @None nrcmr_env
                            | None =>
                                     (prod var
                                           (@foreign_runtime_data fruntime))))
                                     (prod var
                                           (@foreign_runtime_data fruntime)))
                                     (@pair var
                                           (@foreign_runtime_data fruntime))
                                           (@foreign_runtime_data fruntime)
                                           d0)) env)
                simpl in *; try congruence.

            SSSSCase "RedOp"%string. {
              simpl in *.
              destruct (reduce_op_eval h r nil);
                simpl in *; try congruence.
              destruct (match
                              @lookup var
                                   (@foreign_runtime_data fruntime))
                                (@equiv_dec var (@eq var)
                                   (@eq_equivalence var) string_eqdec) env
                                mr_output0 return (option nrcmr_env)
                            | Some _ => @None nrcmr_env
                            | None =>
                                     (prod var
                                           (@foreign_runtime_data fruntime))))
                                     (prod var
                                           (@foreign_runtime_data fruntime)))
                                     (@pair var
                                           (@foreign_runtime_data fruntime))
                                           (@foreign_runtime_data fruntime)
                                           d0)) env)
                simpl in *; try congruence.
              inversion Heval_mr_get_res.
              rewrite normalize_idem.
            SSSSCase "RedSingleton"%string. {
              simpl in *.
      - Case "l <> nil"%string.
        intros mr env res.
        intros Heval_a_l_mr_get_result Hlazy_a_l_snd_exists Hlazy_a_l_mr_snd.
        inversion Hlazy_a_l_snd_exists as [pre_res Hlazy_a_l_snd].
        specialize (IHl mr).
        assert (a :: l = (a :: nil) ++ l) as Heq;
          [ reflexivity | rewrite Heq in *; clear Heq].
        assert (((a :: nil) ++ l) ++ mr :: nil = (a :: nil) ++ (l ++ mr :: nil)) as Heq;
          [ reflexivity | rewrite Heq in *; clear Heq].
        case_eq (mr_chain_eval h env ((a :: nil) ++ l ++ mr :: nil)).
        intros env_eval_a_l_mr o Heval_a_l_mr.
        destruct o;
        [ | rewrite Heval_a_l_mr in Heval_a_l_mr_get_result; simpl in *; congruence ].
        rename d into loc_res.
        assert (unlocalize_data loc_res = normalize_data h res) as Hres_loc_res;
          [ rewrite Heval_a_l_mr in Heval_a_l_mr_get_result; simpl in *; congruence
          | rewrite <- Hres_loc_res in * ].
        clear Heval_a_l_mr_get_result.
        case_eq (mr_chain_lazy_eval h env ((a :: nil) ++ l)).
        intros env_lazy_a_l o Hlazy_a_l.
        destruct o;
        [ | rewrite Hlazy_a_l in Hlazy_a_l_snd; simpl in *; congruence ].
        assert (d = pre_res) as Heq;
          [ rewrite Hlazy_a_l in Hlazy_a_l_snd; simpl in *; congruence
          | rewrite Heq in *; clear Heq d ].
        clear Hlazy_a_l_snd.
        case_eq (mr_chain_lazy_eval h env ((a :: nil) ++ l ++ mr :: nil)).
        intros env_lazy_a_l_mr o Hlazy_a_l_mr.
        destruct o;
        [ rewrite Hlazy_a_l_mr in Hlazy_a_l_mr_snd; simpl in *; congruence | ].
        clear Hlazy_a_l_mr_snd.
        generalize (mr_chain_lazy_eval_progress h env (a::nil) l _ _ Hlazy_a_l).
        intros Hlazy_a.
        inversion Hlazy_a as [env_a Htmp]; clear Hlazy_a.
        inversion Htmp as [loc_d_a Hlazy_a]; clear Htmp.
        specialize (IHl env_a res).
        generalize (mr_chain_lazy_eval_correct _ _ _ _ _ Hlazy_a).
        intros Heval_a.
        assert (get_mr_chain_result (mr_chain_eval h env_a (l ++ mr :: nil)) = Some (normalize_data h res)) as Heval_l_mr_snd. {
          destruct l.
          + assert (mr_chain_eval h env ((a :: nil) ++ nil ++ mr :: nil) =
                    mr_chain_eval h env ((a :: nil) ++ mr :: nil)) as Heq;
            [ reflexivity | rewrite Heq in *; clear Heq ].
            rewrite (mr_chain_eval_split h env _ _ _ _ _ Heval_a) in Heval_a_l_mr.
            simpl in *.
            rewrite Heval_a_l_mr.
            rewrite Hres_loc_res.
          + assert (((a :: nil) ++ (m :: l) ++ mr :: nil) =
                    ((a :: nil) ++ (m :: l ++ mr :: nil))) as Heq;
            [ reflexivity | rewrite Heq in *; clear Heq ].
            rewrite (mr_chain_eval_split h env _ _ _ _ _ Heval_a) in Heval_a_l_mr.
            simpl in *.
            rewrite Heval_a_l_mr.
            rewrite <- Hres_loc_res.
        specialize (IHl Heval_l_mr_snd).
        assert (exists pre_res, snd (mr_chain_lazy_eval h env_a l) = Some pre_res) as Hlazy_l_snd. {
          destruct l.
          + simpl.
            exists (Ddistr nil).
          + exists pre_res.
            assert ((a :: nil) ++ (m :: l) ++ mr :: nil =
                    (a :: nil) ++ m :: (l ++ mr :: nil)) as Heq;
            [ reflexivity | rewrite Heq in *; clear Heq ].
            rewrite (mr_chain_lazy_eval_split h env _ _ _ _ _ Hlazy_a) in Hlazy_a_l.
            simpl in *.
            rewrite Hlazy_a_l.
        specialize (IHl Hlazy_l_snd).
        assert (snd (mr_chain_lazy_eval h env_a (l ++ mr :: nil)) = None) as Hlazy_l_mr_snd. {
          destruct l.
          + assert ((a :: nil) ++ nil ++ mr :: nil =
                    (a :: nil) ++ mr :: nil) as Heq;
            [ reflexivity | rewrite Heq in *; clear Heq ].
            rewrite (mr_chain_lazy_eval_split h env _ _ _ _ _ Hlazy_a) in Hlazy_a_l_mr.
            simpl in *.
            rewrite Hlazy_a_l_mr.
          + assert (((a :: nil) ++ (m :: l) ++ mr :: nil) =
                    ((a :: nil) ++ (m :: l ++ mr :: nil))) as Heq;
            [ reflexivity | rewrite Heq in *; clear Heq ].
            rewrite (mr_chain_lazy_eval_split h env _ _ _ _ _ Hlazy_a) in Hlazy_a_l_mr.
            simpl in *.
            rewrite Hlazy_a_l_mr.
        specialize (IHl Hlazy_l_mr_snd).
        simpl in *.
        rewrite IHl.
        rewrite Hres_loc_res.

  End ReduceEmpty.

NNRCMR Library

The following are built-in Map/Reduce jobs that can be useful for translation, optimization, etc.
  Section mr_library.

The function (fun d => d)
    Definition id_function :=
      let d := "x"%string in
      (d, NNRCVar d).

    Lemma id_function_no_free_vars: function_with_no_free_vars id_function.
      intros x.
      unfold id_function.
      intros H; destruct H.
      - congruence.
      - contradiction.
The function (fun d => coll d)
    Definition coll_function :=
      let d := "x"%string in
      (d, NNRCUnop AColl (NNRCVar d)).

    Lemma coll_function_no_free_vars: function_with_no_free_vars coll_function.
      intros x.
      unfold coll_function.
      intros H; destruct H.
      - congruence.
      - contradiction.

The function (fun v => d)
    Definition constant_function (d:data) :=
      let v := "x"%string in
      (v, NNRCConst d).

    Lemma constant_function_no_free_vars (d:data): function_with_no_free_vars (constant_function d).
      intros x.
      unfold constant_function.

Map-Reduce job that transforms a scalar collection into a distributed one
    Definition mr_scalar_to_distributed (input: var) (output: var) :=
        (MapScalar id_function)

Map-Reduce job that flattens its input into its output
    Definition mr_flatten_scalar (input: var) (output: var) :=
        (MapDistFlatten id_function)

    Lemma mr_flatten_scalar_wf (init: var) (output: var):
      mr_well_formed (mr_flatten_scalar init output).
      unfold mr_flatten_scalar.
      unfold mr_well_formed.
      repeat split; try congruence.
      apply id_function_no_free_vars.

    Lemma mr_scalar_to_distributed_wf (init: var) (output: var):
      mr_well_formed (mr_scalar_to_distributed init output).
      unfold mr_scalar_to_distributed.
      unfold mr_well_formed.
      repeat split; try congruence.
      apply id_function_no_free_vars.

  End mr_library.