Qcert.CldMR.Lang.CldMR


Section CldMR.



  Context {fruntime:foreign_runtime}.
  Context {fredop:foreign_reduce_op}.
  Context {fcloudant:foreign_cloudant}.

  Context (h:list(string×string)).

This provides support for the specific kind of quote "map/reduce" unquote that Cloudant actually provides.
In many ways, Cloudant's notion of 'views', despite being called a map/reduce and providing some of the capabilities that are available in more common M/R framework such as Hadoop, is quite a different beast. Those differences are not entirely arbitrary, and the framework is at once less expressive than M/R and also more powerful and offers the following capabilities:
  • Cloudant M/R can be chained through a special directive called dbcopy which creates a new database that can be the input to a subsequent Cloudant M/R view.
  • Cloudant M/R can be executed in an incremental fashion, i.e., changes on the input are propagated with limited recomputation and exploiting results from previous executions which are cached.
To achieve that, Cloudant's M/R relies on a number of invariants, which we here try and expse and enforce. The most important such invariants are:
  • dbcopy can only be present if a reduce is present as well
  • the result of a dbcopy directive is implicitely cast into as collection of JSON documents in the form of key/value pairs, which are used to populate the newly created database/view.
  • A subsequent map over that dbcopy must access data accordingly (in such key/values JSON structure).
  • The "reduce" part of Cloudant M/R is heavily constrained and must provide two distinct functions: one called reduce, the other called rereduce. The rereduce function must be associative and commutative.
Named Nested Relational Calculus + Map Reduce FOR CLOUDANT


  Inductive cld_map_fun :=
  | CldMapId : var × nnrc cld_map_fun
  | CldMapFlatten : var × nnrc cld_map_fun.
  Inductive cld_map_emit :=
  | CldEmitDist : cld_map_emit
  | CldEmitCollect : nat cld_map_emit.

  Record cld_map :=
    mkMapCld
      { map_fun: cld_map_fun;
        map_emit: cld_map_emit }.

  Inductive cld_numeric_type :=
  | Cld_int
  | Cld_float.

  Global Instance cld_numeric_type_eqdec : EqDec cld_numeric_type eq.

  Inductive cld_reduce_op :=
  | CldRedOpCount : cld_reduce_op
  | CldRedOpSum (typ:cld_numeric_type): cld_reduce_op
  | CldRedOpStats (typ:cld_numeric_type): cld_reduce_op.

  Inductive cld_reduce_fun :=
  | CldRedId : cld_reduce_fun
  | CldRedAggregate : ((var × var) × nnrc) (var × nnrc) cld_reduce_fun
  | CldRedOp : cld_reduce_op cld_reduce_fun.

  Record cld_reduce :=
    mkReduceCld
      { reduce_fun: cld_reduce_fun;
        reduce_output : option var }.

  Record cld_mr :=
    mkMRCld
      { cld_mr_input: var;
        cld_mr_map: cld_map;
        cld_mr_reduce: option cld_reduce;
        cld_mr_reduce_default: option nnrc }.

  Record cld_mrl :=
    mkMRCldChain
      { cld_mr_chain: list cld_mr;
        cld_mr_last: ((list var) × nnrc) × (list var) }.


  Definition cld_mr_causally_consistent (mr1 mr2:cld_mr) : bool
    := match mr2.(cld_mr_reduce) with
       | Some r
         match r.(reduce_output) with
         | Some omr1.(cld_mr_input) b o
         | Nonetrue
         end
       | Nonetrue
       end.

  Definition cld_mr_chain_causally_consistent : list cld_mr bool
    := forallb_ordpairs_refl cld_mr_causally_consistent.

  Definition cld_mrl_causally_consistent (cld_mrl: cld_mrl) : bool
    := cld_mr_chain_causally_consistent cld_mrl.(cld_mr_chain).

  Definition cld_mr_io_vars mr0 : list var
    := mr0.(cld_mr_input)::
      match mr0.(cld_mr_reduce) with
       | Some r
         match r.(reduce_output) with
         | Some oo::nil
         | Nonenil
         end
       | Nonenil
      end.

  Definition mr_chain_io_vars (l : list mr) : list var
    := map mr_input l ++ map mr_output l.

  Definition nnrcmr_io_vars (mrl : nnrcmr) : list var
    := mr_chain_io_vars mrl.(mr_chain).

  Definition cld_mr_chain_io_vars : list cld_mr list var
    := flat_map cld_mr_io_vars.

  Definition cld_mrl_io_vars (cld_mrl:cld_mrl): list var
    := cld_mr_chain_io_vars cld_mrl.(cld_mr_chain).


  Definition function_with_no_free_vars (f: var × nnrc) :=
    ( (x: var), In x (nnrc_free_vars (snd f)) x = fst f).
  Definition function2_with_no_free_vars (f: (var × var) × nnrc) :=
    (fst (fst f)) (snd (fst f))
    ( x, In x (nnrc_free_vars (snd f)) x = (fst (fst f)) x = (snd (fst f))).

  Definition init_vkey := "vkey$"%string.
  Definition init_vval := "vval$"%string.


  Definition add_keys_to_binding (binding: string × (list data)) : string × data :=
    (fst binding, pack_kvl (init_keys (snd binding))).

  Definition lift_binding_to_coll (binding: string × data) : option (string × (list data)) :=
    match snd binding with
    | dcoll collSome (fst binding, coll)
    | _None
    end.

  Definition cld_load_init_env
             (initunit: var) (cenv: list (string × data)) : option bindings
    :=
      match lift_map lift_binding_to_coll cenv with
      | Some cenv
        let full_bindings := (initunit, (dunit::nil)) :: cenv in
        Some (map add_keys_to_binding full_bindings)
      | NoneNone
      end.


  Definition apply_map_fun_with_keys (doc:var) (body:nnrc) :
    list (data × data) option (list (data × data)) :=
    fun coll
      let f_map (d:data×data) :=
          let '(k, v) := d in
          match nnrc_core_eval h ((doc,v)::nil) body with
          | NoneNone
          | Some resSome (k, res)
          end
      in rmap f_map coll.

  Definition apply_map_fun_without_keys (doc:var) (body:nnrc) :
    list (data × data) option (list data) :=
    fun coll
      let f_map (d:data×data) :=
          let (_, v) := d in
          match nnrc_core_eval h ((doc,v)::nil) body with
          | NoneNone
          | Some resSome res
          end
      in rmap f_map coll.

  Definition cld_mr_map_eval (map: cld_map) (coll: list (data × data)) : option (list (data × data)) :=
    let map_f := map.(map_fun) in
    let emit_f := map.(map_emit) in
    match map_f with
    | CldMapId (doc, body)
      match emit_f with
      
      | CldEmitDist
        let nested_map := apply_map_fun_with_keys doc body coll in
        nested_map
      
      | CldEmitCollect n
        let nested_map := apply_map_fun_without_keys doc body coll in
        olift (map_without_key (map_const_key n)) nested_map
      end
    | CldMapFlatten (doc, body)
      match emit_f with
      
      | CldEmitDist
        let nested_map := apply_map_fun_with_keys doc body coll in
        olift (flat_map_with_key map_invent_key) nested_map
      
      | CldEmitCollect n
        let nested_map := apply_map_fun_without_keys doc body coll in
        let flattened_map := olift flat_map_without_key nested_map in
        olift (map_without_key (map_const_key n)) flattened_map
      end
    end.

  Lemma mapIdDist_is_map (map:var×nnrc) (coll:list data) :
    lift cld_get_values (cld_mr_map_eval (mkMapCld (CldMapId map) (CldEmitDist)) (init_keys coll)) = (mr_map_eval h (MapDist map) (Ddistr coll)).

  Lemma lift_map_boxed_cons n d coll:
    (lift (fun kvs : list (data × data) ⇒ map snd kvs)
          (lift (fun t' : list (data × data) ⇒ (box_key (n :: nil), d) :: t') coll))
    = lift (cons d) (lift (fun kvs : list (data × data) ⇒ map snd kvs) coll).

  Lemma mapIdCollect_is_map (map:var×nnrc) (n:nat) (coll:list data) :
    lift cld_get_values (cld_mr_map_eval (mkMapCld (CldMapId map) (CldEmitCollect n)) (init_keys coll)) = (mr_map_eval h (MapDist map) (Ddistr coll)).


  Definition cld_mr_group_by_eval (l: list (data × data)) : list (data × (list data)) :=
    group_by_iter_eval_alt l.


  Definition cld_mr_aggregate_eval (f_reduce: (var × var) × nnrc) (f_rereduce: (var × nnrc)) (groups: list (data × (list data))) : option (list (data × data)) :=
    let (key_values_args, body) := f_reduce in
    let (key_arg, values_arg) := key_values_args in
    let f_reduce (key_values_v: data × list data) : option (data × data) :=
        let (key_v, values_v) := key_values_v in
        match nnrc_core_eval h ((values_arg, dcoll values_v) :: (key_arg, key_v) :: nil) body with
          NoneNone
        | Some resSome (key_v, res)
        end
    in
    let reduced := rmap f_reduce groups in
    let f_rereduce (key_value_v: (data × data)) : option (data × data) :=
        let '(key_v, value_v) := key_value_v in
        let '(values_arg, rebody) := f_rereduce in
        match nnrc_core_eval h ((values_arg, dcoll (value_v::nil)) :: nil) rebody with
        | NoneNone
        | Some resSome (key_v, res)
        end
    in
    olift (rmap f_rereduce) reduced.

  Definition cloudant_sum_op (typ:cld_numeric_type)
    := match typ with
       | Cld_intASum
       | Cld_floatcloudant_float_sum_op
       end.

    Definition cloudant_min_op (typ:cld_numeric_type)
    := match typ with
       | Cld_intANumMin
       | Cld_floatcloudant_float_min_op
       end.

    Definition cloudant_max_op (typ:cld_numeric_type)
    := match typ with
       | Cld_intANumMax
       | Cld_floatcloudant_float_max_op
       end.

  Definition cld_mr_reduce_eval (red_fun: cld_reduce_fun) (coll: list (data × data)) : option (list (data × data)) :=
    match red_fun with
    | CldRedIdSome coll
    | CldRedAggregate f_reduce f_rereduce
      let groups := cld_mr_group_by_eval coll in
      cld_mr_aggregate_eval f_reduce f_rereduce groups
    | CldRedOp CldRedOpCount
      let uop := ACount in
      let v := fun_of_unaryop h uop (dcoll (List.map snd coll)) in
      let key := dcoll (dnat 0::nil) in
      lift (fun res(key, res)::nil) v
    | CldRedOp (CldRedOpSum typ) ⇒
      let uop := cloudant_sum_op typ in
      let v := fun_of_unaryop h uop (dcoll (List.map snd coll)) in
      let key := dcoll (dnat 0::nil) in
      lift (fun res(key, res)::nil) v
    | CldRedOp (CldRedOpStats typ) ⇒
      let coll := dcoll (List.map snd coll) in
      let count := fun_of_unaryop h ACount coll in
      let sum := fun_of_unaryop h (cloudant_sum_op typ) coll in
      let min := fun_of_unaryop h (cloudant_min_op typ) coll in
      let max := fun_of_unaryop h (cloudant_max_op typ) coll in
      let v :=
          match (count, sum, min, max) with
          | (Some count, Some sum, Some min, Some max)
            Some (drec (("count"%string, count)
                          ::("max"%string, max)
                          ::("min"%string, min)
                          ::("sum"%string, sum)
                          ::nil))
          | _None
          end
      in
      let key := dcoll (dnat 0::nil) in
      lift (fun res(key, res)::nil) v
    end.

  Lemma cld_mr_reduce_flatten_id (l:list (data × data)) :
    (cld_mr_reduce_eval CldRedId l) = Some l.


  Definition cld_mr_eval (mr:cld_mr) (coll: list (data × data)) : option ((list (data×data)) × option var) :=
    let map_result :=
        cld_mr_map_eval mr.(cld_mr_map) coll
    in
    match mr.(cld_mr_reduce) with
    | Nonelift (fun x(x,None)) map_result
    | Some reduce
      let reduce_result := olift (cld_mr_reduce_eval reduce.(reduce_fun)) map_result in
      lift (fun x(x, reduce.(reduce_output))) reduce_result
    end.




  Definition cld_merge_env (x: var) (coll: list data) (env: bindings) : option bindings :=
    match lookup equiv_dec env x with
    | NoneSome ((x, dcoll coll)::env)
    | Some d
      match d with
      | dcoll coll'Some ((x, dcoll (coll ++ coll') )::env)
      | _None
      end
    end.

  Definition nnrc_env_of_cld_env (form:list var) (eff: option (list data)): option bindings :=
    olift (zip form) eff.

  Definition effective_params_from_bindings (eff:list var) (cld_env:bindings) : option (list data) :=
    lift_map
      (fun (v : var) ⇒
         match lookup equiv_dec cld_env v with
         | NoneNone
         | Some db
           lift (fun ldcoll (List.map snd l)) (unpack_kvl db)
         end)
      eff.

  Definition cld_mr_eval_last (cld_env:bindings) (mr_last: ((list var) × nnrc) × (list var)) : option data :=
    let (formal_params, n) := fst mr_last in
    let effective_params := effective_params_from_bindings (snd mr_last) cld_env in
    let onrc_env := nnrc_env_of_cld_env formal_params effective_params in
    olift (fun nnrc_envnnrc_core_eval h nnrc_env n) onrc_env.

  Definition cld_mr_chain_eval_inner (env:bindings) (l:list cld_mr) : option (bindings × list (data × data)) :=
    List.fold_left
      (fun (acc: option (bindings × list (data × data))) mr
         match acc with
         | NoneNone
         | Some (env', _)
           let cld_input := mr.(cld_mr_input) in
           match lookup equiv_dec env' cld_input with
           | NoneNone
           | Some kvl
             let coll := unpack_kvl kvl in
             match olift (cld_mr_eval mr) coll with
             | NoneNone
             | Some (res,None)Some (env, res)
             | Some (res,Some x)
               let env'' := cld_merge_env x (pre_pack_kvl res) env' in
               olift (fun envSome (env, res)) env''
             end
           end
         end)
      l (Some (env,nil)).

  Definition cld_mrl_eval (env:bindings) (cld_mrl:cld_mrl) : option data :=
    match cld_mr_chain_eval_inner env cld_mrl.(cld_mr_chain) with
    | NoneNone
    | Some (env_res, coll)
      cld_mr_eval_last env_res cld_mrl.(cld_mr_last)
    end.


  Section cld_mr_library.

    Definition idReduce (v_out:option var) : cld_reduce :=
      mkReduceCld CldRedId v_out.

    Definition collectReduce (v_out:option var) : cld_reduce :=
      mkReduceCld (CldRedAggregate (init_vkey, init_vval, NNRCVar init_vval)
                                   (init_vval, NNRCUnop AFlatten (NNRCVar init_vval))) v_out.

    Definition opReduce (op: cld_reduce_op) (v_out:option var) : cld_reduce :=
      mkReduceCld (CldRedOp op) v_out.

    Definition defaultMR : cld_mr :=
      mkMRCld init_vval (mkMapCld (CldMapId (init_vval, NNRCConst dunit)) (CldEmitCollect (99%nat))) None None .

  End cld_mr_library.


  Section sanitize.
    Definition cldAllowedIdentifierInitialCharacters :=
      ["a";"b";"c";"d";"e";"f";"g";"h";"i";"j";"k";"l";"m";"n";"o";"p";"q";"r";"s";"t";"u";"v";"w";"x";"y";"z"]%char.

    Definition cldAllowedIdentifierCharacters_fromdocs := ["a";"b";"c";"d";"e";"f";"g";"h";"i";"j";"k";"l";"m";"n";"o";"p";"q";"r";"s";"t";"u";"v";"w";"x";"y";"z";"0";"1";"2";"3";"4";"5";"6";"7";"8";"9";"_";"$";",";"+";"-";"/"]%char.

    Definition cldAllowedIdentifierCharacters := ["a";"b";"c";"d";"e";"f";"g";"h";"i";"j";"k";"l";"m";"n";"o";"p";"q";"r";"s";"t";"u";"v";"w";"x";"y";"z";"0";"1";"2";"3";"4";"5";"6";"7";"8";"9";"_"]%char.

    Definition cldIdentifierInitialCharacterToAdd := "z"%char.
    Definition cldIdenitiferCharacterForReplacement := "z"%char.

    Definition cldIdentifierFixInitial (ident:string) : string
    := match ident with
       
       | EmptyString
         String cldIdentifierInitialCharacterToAdd EmptyString
       | String a _
         if in_dec ascii_dec a cldAllowedIdentifierInitialCharacters
         then ident
         else String cldIdentifierInitialCharacterToAdd ident
       end.

    Definition cldIdentifierSanitizeChar (a:ascii)
      := if a == "$"%char
         then "_"%char
         else if in_dec ascii_dec a cldAllowedIdentifierCharacters
              then a
              else cldIdenitiferCharacterForReplacement.

  Definition cldIdentifierSanitizeBody (ident:string)
    := map_string cldIdentifierSanitizeChar ident.

  Definition cldIdentifierSanitize (ident:string)
    := cldIdentifierFixInitial (cldIdentifierSanitizeBody (mk_lower ident)).

  Definition cldSafeSeparator := "_"%string.

  Definition cldAvoidList : list string := [].

  End sanitize.
End CldMR.