Qcert.NNRCMR.Lang.NNRCMR


Section NNRCMR.




  Context {fruntime:foreign_runtime}.
  Context {fredop:foreign_reduce_op}.

  (** Variable names are plain strings. *)
  Definition var := string.
  (** A map-reduce environment: an association list from variable names
      to [ddata] values. *)
  Definition nrcmr_env := list (var × ddata).

  (** Named Nested Relational Calculus + Map Reduce *)

  Inductive map_fun :=
  | MapDist : var × nnrc map_fun
  | MapDistFlatten : var × nnrc map_fun
  | MapScalar : var × nnrc map_fun.

  Inductive reduce_op :=
  | RedOpForeign : foreign_reduce_op_type reduce_op.

  Inductive reduce_fun :=
  | RedId : reduce_fun
  | RedCollect : (var × nnrc) reduce_fun
  | RedOp : reduce_op reduce_fun
  | RedSingleton : reduce_fun.

  (** A single map-reduce job. *)
  Record mr := mkMR
    { mr_input: var;          (* name looked up in the environment to get the input *)
      mr_output: var;         (* name under which the result is merged in the environment *)
      mr_map: map_fun;        (* map phase *)
      mr_reduce: reduce_fun }. (* reduce phase *)

  (** A chain of map-reduce jobs followed by a final expression. *)
  Record nnrcmr := mkMRChain
    { mr_inputs_loc: vdbindings;  (* its first components are the names left untouched by [nnrcmr_rename_graph] *)
      mr_chain: list mr;          (* jobs, evaluated left to right by [mr_chain_eval] *)
      (* ((formal parameters, body), actual arguments); see [mr_last_eval] *)
      mr_last: (list var × nnrc) × list (var × dlocalization); }.

  Section sanitize_local.
    Context (sep:string).
    Context (renamer:stringstring).
    Context (avoid:list var).

    (** Rename one variable of [e]: the new name is chosen by
        [nnrc_pick_name], and the result pairs that name with [e]
        rewritten by [nnrc_rename_lazy]. *)
    Definition nnrc_rename_in1
               (oldvar:var) (e:nnrc) : (var×nnrc)
      := let newvar := nnrc_pick_name sep renamer avoid oldvar e in
         (newvar, nnrc_rename_lazy e oldvar newvar).

    (** Rename two variables of [e] in sequence.  The first new name is
        picked with [oldvar2] added to the avoid list, the second with
        [newvar1] added to it.  Both names are picked against the
        original [e]; the renamings are then applied one after the
        other. *)
    Definition nnrc_rename_in2
               (oldvar1 oldvar2:var) (e:nnrc) : (var×var×nnrc)
      := let newvar1 := nnrc_pick_name sep renamer (oldvar2::avoid) oldvar1 e in
         let newvar2 := nnrc_pick_name sep renamer (newvar1::avoid) oldvar2 e in
         let e'1 := nnrc_rename_lazy e oldvar1 newvar1 in
         let e'2 := nnrc_rename_lazy e'1 oldvar2 newvar2 in
         (newvar1, newvar2, e'2).

    Definition rename_map_fun (m:map_fun) : map_fun
      := match m with
         | MapDist (v, e)MapDist (nnrc_rename_in1 v e)
         | MapDistFlatten (v,e)MapDistFlatten (nnrc_rename_in1 v e)
         | MapScalar (v,e)MapScalar (nnrc_rename_in1 v e)
         end.

    Definition rename_reduce_fun (r:reduce_fun) : reduce_fun
      := match r with
         | RedIdRedId
         | RedCollect (v,e)RedCollect (nnrc_rename_in1 v e)
         | RedOp opRedOp op
         | RedSingletonRedSingleton
         end.

    (** Rename the variables bound inside the map and reduce functions
        of a job; its input and output names are kept as they are. *)
    Definition mr_rename_local (mr0:mr) : mr
      := mkMR mr0.(mr_input) mr0.(mr_output)
                                   (rename_map_fun mr0.(mr_map))
                                   (rename_reduce_fun mr0.(mr_reduce)).

    (** Apply [mr_rename_local] to every job of the chain.  The input
        localizations and the final expression are left unchanged. *)
    Definition nnrcmr_rename_local (mrl:nnrcmr)
      := mkMRChain
           mrl.(mr_inputs_loc)
           (map mr_rename_local mrl.(mr_chain))
           mrl.(mr_last).

  End sanitize_local.

  Section sanitize_dbs.
    Definition nnrcmr_inoutlist (mrl:nnrcmr)
      := insertion_sort StringOrder.lt_dec
                        (flat_map
                           (fun xx.(mr_input)::x.(mr_output)::nil)
                           mrl.(mr_chain)).

    (** Apply [substlist_subst substlist] to the input and output names
        of a job; the map and reduce functions are left unchanged. *)
    Definition mr_subst_io substlist (mr0:mr) : mr
      := mkMR (substlist_subst substlist mr0.(mr_input))
              (substlist_subst substlist mr0.(mr_output))
              mr0.(mr_map) mr0.(mr_reduce).

    Definition nnrcmr_subst_io substlist (mrl:nnrcmr) : nnrcmr
      := let chain := (map (mr_subst_io substlist) mrl.(mr_chain)) in
         let last :=
             let (params, n) := fst mrl.(mr_last) in
             let args := snd mrl.(mr_last) in
             let params := List.map (substlist_subst substlist) params in
             let n := nnrc_substlist_subst substlist n in
             let args :=
                 List.map
                   (fun x_loc(substlist_subst substlist (fst x_loc), snd x_loc))
                   args
             in
             ((params, n), args)
         in
         mkMRChain
           mrl.(mr_inputs_loc)
           chain
           last.

    Context (sep:string).
    Context (renamer:stringstring).
    Context (avoid:list var).

    Fixpoint nnrcmr_mk_rename_list (leave_alone:list var) (names:list var) (acc:list (var×var))
      := match names with
         | nilacc
         | x::lif (in_dec string_dec x leave_alone)
                   then nnrcmr_mk_rename_list leave_alone l ((x,x)::acc)
                   else
                     let proposedname := renamer x in
                     let newname :=
                         fresh_var_from sep proposedname
                                        (l++(map snd acc)++avoid) in
                     nnrcmr_mk_rename_list leave_alone l ((x,newname)::acc)
         end.

    (** Rename the input/output names of the jobs of a program.  The
        names bound in [mr_inputs_loc] are left alone.  The rename list
        is built in reverse by [nnrcmr_mk_rename_list], hence the [rev]
        before it is applied. *)
    Definition nnrcmr_rename_graph (mrl:nnrcmr)
      := let leave_alone := List.map fst (mrl.(mr_inputs_loc)) in
         let names := nnrcmr_inoutlist mrl in
         let substlist_rev := nnrcmr_mk_rename_list leave_alone names nil in
         let substlist := rev substlist_rev in
         nnrcmr_subst_io substlist mrl.

  End sanitize_dbs.


  Definition mr_causally_consistent (mr1 mr2:mr) : bool
    := mr1.(mr_input) b mr2.(mr_output).

  (** Check [mr_causally_consistent] over a chain of jobs, using
      [forallb_ordpairs_refl]. *)
  Definition mr_chain_causally_consistent (mr_chain:list mr) : bool
    := forallb_ordpairs_refl mr_causally_consistent mr_chain.

  (** A program is causally consistent when its chain of jobs is. *)
  Definition nnrcmr_causally_consistent (mrl:nnrcmr) : bool
    := mr_chain_causally_consistent mrl.(mr_chain).

  Definition function_with_no_free_vars (f: var × nnrc) :=
    ( (x: var), In x (nnrc_free_vars (snd f)) x = fst f).

  Definition function2_with_no_free_vars (f: (var × var) × nnrc) :=
    (fst (fst f)) (snd (fst f))
    ( x, In x (nnrc_free_vars (snd f)) x = (fst (fst f)) x = (snd (fst f))).

  Definition map_well_formed map :=
    match map with
    | MapDist ffunction_with_no_free_vars f
    | MapDistFlatten ffunction_with_no_free_vars f
    | MapScalar ffunction_with_no_free_vars f
    end.

  Definition reduce_well_formed red :=
    match red with
    | RedIdTrue
    | RedCollect ffunction_with_no_free_vars f
    | RedOp opTrue
    | RedSingletonTrue
    end.

  Definition mr_well_formed mr :=
    map_well_formed mr.(mr_map) reduce_well_formed mr.(mr_reduce).

  Definition mr_chain_well_formed (l: list mr) :=
     mr, In mr l mr_well_formed mr.

  (** A program is well formed when its chain of jobs is. *)
  Definition nnrcmr_well_formed (mrl: nnrcmr) :=
    mr_chain_well_formed mrl.(mr_chain).


  Definition mr_input_localized mr :=
    match mr.(mr_map) with
    | MapDist _(mr.(mr_input), Vdistr)
    | MapDistFlatten _(mr.(mr_input), Vdistr)
    | MapScalar _(mr.(mr_input), Vlocal)
    end.

  Definition mr_output_localized mr :=
    match mr.(mr_reduce) with
    | RedId(mr.(mr_output), Vdistr)
    | RedCollect _(mr.(mr_output), Vlocal)
    | RedOp op(mr.(mr_output), Vlocal)
    | RedSingleton(mr.(mr_output), Vlocal)
    end.

  Definition map_well_localized map d :=
    match map, d with
    | MapDist _, Ddistr _True
    | MapDistFlatten _, Ddistr _True
    | MapScalar _, Dlocal _True
    | _, _False
    end.


  Section Semantics.

  Context (h:brand_relation_t).


  Definition mr_map_eval (map: map_fun) (input_d: ddata) : option (list data) :=
    match map with
    | MapDist f
      let f_map (d:data) : option data :=
          let (doc, body) := f in
          nnrc_core_eval h ((doc,d)::nil) body
      in
      match input_d with
      | Ddistr coll
        rmap f_map coll
      | Dlocal dNone
      end
    | MapDistFlatten f
      let f_map (d:data) : option data :=
          let (doc, body) := f in
          nnrc_core_eval h ((doc,d)::nil) body
      in
      match input_d with
      | Ddistr coll
        let nested_coll := rmap f_map coll in
        olift rflatten nested_coll
      | Dlocal dNone
      end
    | MapScalar f
      let f_map (d:data) : option data :=
          let (doc, body) := f in
          nnrc_core_eval h ((doc,d)::nil) body
      in
      match input_d with
      | Ddistr collNone
      | Dlocal d
        match f_map d with
        | Some (dcoll coll) ⇒ Some coll
        | _None
        end
      end
    end.


  (** Evaluate a reduce operator on a list of values by delegating to
      [foreign_reduce_op_interp]. *)
  Definition reduce_op_eval (rop:reduce_op) (dl:list data) : option data
    := match rop with
       | RedOpForeign op
         ⇒ foreign_reduce_op_interp h op dl
       end.

  Definition mr_reduce_eval (reduce: reduce_fun) (values_v: list data) : option (ddata) :=
    match reduce with
    | RedIdSome (Ddistr values_v)
    | RedCollect f
      let (values_arg, body) := f in
      let v := nnrc_core_eval h ((values_arg, dcoll values_v) :: nil) body in
      lift (fun resDlocal res) v
    | RedOp op
      lift (fun resDlocal res) (reduce_op_eval op values_v)
    | RedSingleton
      match values_v with
      | d :: nilSome (Dlocal d)
      | _None
      end
    end.


  (** Evaluate a job on [d]: run the map phase, then feed its result to
      the reduce phase.  The input and output names of the job are not
      used here. *)
  Definition mr_eval (mr:mr) (d: ddata) : option (ddata) :=
    let map_result := mr_map_eval mr.(mr_map) d in
    olift (mr_reduce_eval mr.(mr_reduce)) map_result.

  Lemma mr_eval_ignores_env map_fun red :
     (mr_input1 mr_input2 mr_output1 mr_output2:var),
     (d:ddata),
      mr_eval (mkMR mr_input1 mr_output1 map_fun red) d =
      mr_eval (mkMR mr_input2 mr_output2 map_fun red) d.


  Definition merge_env (x: var) (d: ddata) (env: nrcmr_env) : option nrcmr_env :=
    match d with
    | Ddistr coll
      match lookup equiv_dec env x with
      | NoneSome ((x, Ddistr coll)::env)
      | Some (Ddistr coll') ⇒ Some ((x, Ddistr (coll ++ coll') )::env)
      | Some _None
      end
    | Dlocal v
      match lookup equiv_dec env x with
      | NoneSome ((x, Dlocal v)::env)
      | Some d'None
      end
    end.

  Definition mr_chain_eval (env:nrcmr_env) (l:list mr) : nrcmr_env × option ddata :=
    List.fold_left
      (fun acc mr
         match acc with
         | (env', None)(env', None)
         | (env', Some _)
           let oinput_d := lookup equiv_dec env' mr.(mr_input) in
           let ores := olift (mr_eval mr) oinput_d in
           match ores, olift (fun resmerge_env mr.(mr_output) res env') ores with
           | Some res, Some env''(env'', Some res)
           | _, _(env', None)
           end
         end)
      l (env, Some (Ddistr nil)).

  Fixpoint build_nnrc_env' (mr_env:nrcmr_env) (params: list var) (args: list (var × dlocalization)) :=
    match (params, args) with
    | (nil, nil)Some nil
    | (x1::params, (x2, loc)::args)
      match lookup equiv_dec mr_env x2 with
      | Some loc_d
        lift (fun nnrc_env(x1, unlocalize_data loc_d) :: nnrc_env) (build_nnrc_env' mr_env params args)
      | NoneNone
      end
    | _None
    end.

  (** Combine the formal parameters with the optional list of effective
      values using [zip]; [None] is propagated by [olift]. *)
  Definition nnrc_env_build (form:list var) (eff: option (list data)): option bindings :=
    olift (zip form) eff.

  (** Look up each argument name in [mr_env] and unlocalize the value
      found, collecting the results with [lift_map].  Only the names of
      [args] are used; the localizations are dropped by [map fst]. *)
  Definition effective_params_from_bindings (args:list (var × dlocalization)) (mr_env:nrcmr_env) : option (list data) :=
    lift_map
      (fun (v : var) ⇒
         lift unlocalize_data (lookup equiv_dec mr_env v))
      (map fst args).

  (** Build the [nnrc] environment for the final expression: compute
      the effective values of the arguments, then bind them to the
      formal parameters. *)
  Definition build_nnrc_env (mr_env:nrcmr_env) (params: list var) (args: list (var × dlocalization)) :=
    let eff_params := effective_params_from_bindings args mr_env in
    nnrc_env_build params eff_params.

  Definition mr_last_eval (mr_env:nrcmr_env) (mr_last: (list var × nnrc) × list (var × dlocalization)) :=
    let (params, n) := fst mr_last in
    let args := snd mr_last in
    let nnrc_env := build_nnrc_env mr_env params args in
    olift (fun envnnrc_core_eval h env n) nnrc_env.

  Definition nnrcmr_eval (env:nrcmr_env) (mrl:nnrcmr) : option data :=
    match mr_chain_eval env mrl.(mr_chain) with
    | (mr_env, Some _)
      mr_last_eval mr_env mrl.(mr_last)
    | (_, None)None
    end.

  Lemma mr_chain_eval_split (env: nrcmr_env) (l1:list mr) (r: mr) (l2: list mr):
     env1 loc_d1,
      mr_chain_eval env l1 = (env1, Some loc_d1)
      mr_chain_eval env (l1 ++ (r :: l2)) = mr_chain_eval env1 (r :: l2).

  Lemma mr_chain_eval_progress (env:nrcmr_env) (l1:list mr) (l2:list mr):
     env' loc_d,
      mr_chain_eval env (l1 ++ l2) = (env', Some loc_d)
       env1 loc_d1,
        mr_chain_eval env l1 = (env1, Some loc_d1).

  End Semantics.

  Section SemanticsLazy.

  Context (h:brand_relation_t).

  Definition mr_lazy_eval (mr:mr) (d: ddata) : option (ddata) :=
    match d with
    | Ddistr nilNone
    | _
      match mr_map_eval h mr.(mr_map) d with
      | Some nilNone
      | map_result
        olift (mr_reduce_eval h mr.(mr_reduce)) map_result
      end
    end.

  Definition mr_chain_lazy_eval (env:nrcmr_env) (l:list mr) : nrcmr_env × option ddata :=
    List.fold_left
      (fun (acc: nrcmr_env × option ddata) mr
         match acc with
         | (env', None)(env', None)
         | (env', Some _)
           let oinput_d := lookup equiv_dec env' mr.(mr_input) in
           let ores := olift (mr_lazy_eval mr) oinput_d in
           match ores, olift (fun resmerge_env mr.(mr_output) res env') ores with
           | Some res, Some env''(env'', Some res)
           | _, _(env', None)
           end
       end)
      l (env, Some (Ddistr nil)).

  Definition nnrcmr_lazy_eval (env:nrcmr_env) (mrl:nnrcmr) : option data :=
    match mr_chain_lazy_eval env mrl.(mr_chain) with
    | (mr_env, Some _)
      mr_last_eval h mr_env mrl.(mr_last)
    | (_, None)None
    end.

  Lemma mr_chain_lazy_eval_split (env: nrcmr_env) (l1:list mr) (r: mr) (l2: list mr):
     env1 loc_d1,
      mr_chain_lazy_eval env l1 = (env1, Some loc_d1)
      mr_chain_lazy_eval env (l1 ++ (r :: l2)) = mr_chain_lazy_eval env1 (r :: l2).

  Lemma mr_chain_lazy_eval_progress (env:nrcmr_env) (l1:list mr) (l2:list mr):
     env' loc_d,
      mr_chain_lazy_eval env (l1 ++ l2) = (env', Some loc_d)
       env1 loc_d1,
        mr_chain_lazy_eval env l1 = (env1, Some loc_d1).

  Lemma mr_chain_lazy_eval_correct (env:nrcmr_env) (l:list mr):
     env' res,
      mr_chain_lazy_eval env l = (env', Some res)
      mr_chain_eval h env l = (env', Some res).

  End SemanticsLazy.

  Section ReduceEmpty.

    Context (h:brand_relation_t).

    Definition mr_reduce_empty (mr: mr): option nnrc :=
      match mr.(mr_reduce) with
      | RedId
        Some (NNRCConst (dcoll nil))
      | RedCollect (x, n)
        Some (nnrc_subst n x (NNRCConst (dcoll nil)))
      | RedOp op
        lift (fun dNNRCConst d) (reduce_op_eval h op nil)
      | RedSingleton
        match mr.(mr_map) with
        | MapScalar (x, n)
          Some (nnrc_subst n x (NNRCConst (dcoll nil)))
        | _None
        end
      end.

    Definition get_mr_chain_result (res: nrcmr_env × option ddata) : option data :=
      match res with
      | (_, Some ld)Some (unlocalize_data ld)
      | (_, None)None
      end.

    Lemma mr_reduce_empty_correct:
       (l:list mr) (mr:mr) (env:nrcmr_env),
       res,
        get_mr_chain_result (mr_chain_eval h env (l ++ mr::nil)) = Some (normalize_data h res)
        ( pre_res, snd (mr_chain_lazy_eval h env l) = Some pre_res)
        snd (mr_chain_lazy_eval h env (l ++ mr::nil)) = None
        olift (fun nnnrc_core_eval h nil n) (mr_reduce_empty mr) = Some (normalize_data h res).



  End ReduceEmpty.

  Section mr_library.


    (** The function (fun d => d) *)
    (** Identity function as a (parameter, body) pair; the parameter is
        named "x". *)
    Definition id_function :=
      let d := "x"%string in
      (d, NNRCVar d).

    (** [id_function] has no free variable other than its parameter. *)
    Lemma id_function_no_free_vars: function_with_no_free_vars id_function.

    (** The function (fun d => coll d) *)
    (** Function applying the unary operator [AColl] to its parameter,
        as a (parameter, body) pair; the parameter is named "x". *)
    Definition coll_function :=
      let d := "x"%string in
      (d, NNRCUnop AColl (NNRCVar d)).

    (** [coll_function] has no free variable other than its parameter. *)
    Lemma coll_function_no_free_vars: function_with_no_free_vars coll_function.

    (** The function (fun v => d) *)
    (** Constant function returning [d], as a (parameter, body) pair;
        the parameter is named "x" and does not occur in the body. *)
    Definition constant_function (d:data) :=
      let v := "x"%string in
      (v, NNRCConst d).

    (** [constant_function d] has no free variable other than its
        parameter, for any [d]. *)
    Lemma constant_function_no_free_vars (d:data): function_with_no_free_vars (constant_function d).

    (** Map-Reduce job that transforms a scalar collection into a distributed one *)
    (** Job from [input] to [output] with a [MapScalar] of the identity
        function and [RedId] as reduce. *)
    Definition mr_scalar_to_distributed (input: var) (output: var) :=
      mkMR
        input
        output
        (MapScalar id_function)
        RedId.

    (** Map-Reduce job that flattens its input into its output *)
    (** Job from [input] to [output] with a [MapDistFlatten] of the
        identity function and [RedId] as reduce.
        NOTE(review): despite the "scalar" in the name, the map used
        here is the distributed [MapDistFlatten] — confirm the intended
        naming. *)
    Definition mr_flatten_scalar (input: var) (output: var) :=
      mkMR
        input
        output
        (MapDistFlatten id_function)
        RedId.

    (** [mr_flatten_scalar] is well formed for any input and output
        names. *)
    Lemma mr_flatten_scalar_wf (init: var) (output: var):
      mr_well_formed (mr_flatten_scalar init output).

    (** [mr_scalar_to_distributed] is well formed for any input and
        output names. *)
    Lemma mr_scalar_to_distributed_wf (init: var) (output: var):
      mr_well_formed (mr_scalar_to_distributed init output).

  End mr_library.

End NNRCMR.