Section Dataframe.
Require Import Basics.
Require Import String.
Require Import List.
Require Import Arith.
Require Import ZArith.
Require Import EquivDec.
Require Import Morphisms.
Require Import BasicSystem.
Require Import DNNRCBase.
(* Section-wide parameters, all declared in braces (implicit):
   a foreign runtime, a foreign type and a brand model. *)
Context {
fruntime:
foreign_runtime}.
Context {
ftype:
ForeignType.foreign_type}.
Context {
m :
TBrandModel.brand_model}.
(** Variable names are plain strings. *)
Definition var : Set := string.
(* Column expressions, evaluated against a single row by fun_of_column.
   Constructor arguments are deliberately left anonymous: the proof
   scripts in this file rely on the names Coq generates for them. *)
Inductive column :=
(* Reference to a toplevel column of the row, by name. *)
|
CCol :
string ->
column
(* Field access: project the named field out of a column that
   evaluates to a record. *)
|
CDot :
string ->
column ->
column
(* Literal: a data value paired with an rtype₀. Evaluation normalizes
   the data and ignores the type component. *)
|
CLit :
data *
rtype₀ ->
column
(* Addition of two columns that evaluate to dnat values. *)
|
CPlus :
column ->
column ->
column
(* Equality test of two columns; evaluates to a dbool. *)
|
CEq :
column ->
column ->
column
(* Binary comparison. NOTE(review): fun_of_column currently evaluates
   this constructor to None in every case. *)
|
CLessThan :
column ->
column ->
column
(* Unary operation evaluated with unudbool negb. *)
|
CNeg :
column ->
column
(* Conversion of a column to a dstring through dataToString. *)
|
CToString :
column ->
column
(* Concatenation of two columns that evaluate to dstring values. *)
|
CSConcat :
column ->
column ->
column
(* Brand cast check: the list holds the target brands, the column is
   expected to evaluate to a collection of brand strings. *)
|
CUDFCast :
list string ->
column ->
column
(* Carries an rtype₀ and a column. NOTE(review): fun_of_column
   currently evaluates this constructor to None in every case. *)
|
CUDFUnbrand :
rtype₀ ->
column ->
column.
(* Dataframe expressions, evaluated by dataframe_eval to a list of rows.
   Constructor arguments are left anonymous: the proof scripts in this
   file rely on the names Coq generates for them. *)
Inductive dataframe :=
(* Named dataframe, looked up in the evaluation environment. *)
|
DSVar :
string ->
dataframe
(* Projection: each (name, column) pair defines one field of the
   output row, computed from the input row. *)
|
DSSelect :
list (
string *
column) ->
dataframe ->
dataframe
(* Selection: keeps the rows for which the column evaluates to
   dbool true. *)
|
DSFilter :
column ->
dataframe ->
dataframe
(* Cartesian product: every row of the first dataframe is combined
   with every row of the second one. *)
|
DSCartesian :
dataframe ->
dataframe ->
dataframe
(* Explode: the named field must hold a collection; one output row is
   produced per element of that collection. *)
|
DSExplode :
string ->
dataframe ->
dataframe.
Section eval.
(* Brand relation used during evaluation: it is passed to
   normalize_data and to sub_brands_dec in fun_of_column. *)
Context (
h:
brand_relation_t).
(** Evaluate a column expression in an environment of toplevel columns,
    i.e. a row in a dataframe. *)
(* fun_of_column c row evaluates the column expression c against row,
   an association list from column names to data values.
   Returns Some d on success and None when a lookup fails, when an
   operand does not have the shape the operator expects, or for the
   constructors that are not evaluated yet (CLessThan, CUDFUnbrand). *)
Fixpoint fun_of_column (
c:
column) (
row:
list (
string *
data)) :
option data :=
(* fc evaluates a sub-expression against the same row. *)
let fc :=
flip fun_of_column row in
match c with
(* Column reference: look the name up in the row. *)
|
CCol n =>
lookup string_eqdec row n
(* Apply unudbool negb to the result of the operand, if any. *)
|
CNeg c1 =>
olift (
unudbool negb) (
fc c1)
(* Field access: the operand must evaluate to a record. *)
|
CDot n c1 =>
match fc c1 with
|
Some (
drec fs) =>
edot fs n
|
_ =>
None
end
(* Literal: normalize the data, ignore the attached type. *)
|
CLit (
d,
_) =>
Some (
normalize_data h d)
(* Addition: both operands must evaluate to dnat. *)
|
CPlus c1 c2 =>
match fc c1,
fc c2 with
|
Some (
dnat l),
Some (
dnat r) =>
Some (
dnat (
Z.add l r))
|
_,
_ =>
None
end
(* Equality: decided with data_eq_dec and returned as a dbool. *)
|
CEq c1 c2 =>
lift2 (
fun x y =>
dbool (
if data_eq_dec x y then true else false)) (
fc c1) (
fc c2)
(* NOTE(review): not evaluated; always None. The normalization proof
   below discharges this case with discriminate. *)
|
CLessThan c1 c2 =>
None
(* String conversion through dataToString, wrapped in dstring. *)
|
CToString c1 =>
lift (
compose dstring dataToString) (
fc c1)
(* String concatenation: both operands must evaluate to dstring. *)
|
CSConcat c1 c2 =>
match fc c1,
fc c2 with
|
Some (
dstring l),
Some (
dstring r) =>
Some (
dstring (
l ++
r))
|
_,
_ =>
None
end
(* Brand cast check: the operand must evaluate to a collection; its
   elements are mapped to strings (non-dstring elements map to None)
   and the result is the dbool of sub_brands_dec against the target
   brands. *)
|
CUDFCast target_brands column_of_runtime_brands =>
match fc column_of_runtime_brands with
|
Some (
dcoll runtime_brand_strings) =>
lift (
fun runtime_brands =>
dbool (
if sub_brands_dec h runtime_brands target_brands then true else false))
(
listo_to_olist (
map (
fun s =>
match s with dstring s =>
Some s |
_ =>
None end)
runtime_brand_strings))
|
_ =>
None
end
(* NOTE(review): not evaluated; always None. The normalization proof
   below discharges this case with discriminate. *)
|
CUDFUnbrand _ _ =>
None
end.
(* dataframe_eval dsenv e evaluates the dataframe expression e in the
   environment dsenv and returns the resulting list of rows, or None
   when evaluation fails. Rows are expected to be drec values; the
   per-row results are collected with listo_to_olist (presumably None
   as soon as one element is None -- confirm against its definition). *)
Fixpoint dataframe_eval (
dsenv :
coll_bindings) (
e:
dataframe) :
option (
list data) :=
match e with
(* Variable: look the dataframe up in the environment. *)
|
DSVar s =>
lookup equiv_dec dsenv s
(* Select: evaluate the input, then build one output record per row. *)
|
DSSelect cs d =>
match dataframe_eval dsenv d with
|
Some rows =>
(* One evaluator per selected column, over the rec_sort-ed list of
   (name, column) pairs. *)
let cfuns:
list (
string * (
list (
string *
data) ->
option data)) :=
map (
fun p => (
fst p,
fun_of_column (
snd p))) (
rec_sort cs)
in
(* Per-row function: a row must be a record; every column evaluator
   is run on its fields and paired with the output name. *)
let rfun:
data ->
option (
list (
string *
data)) :=
fun row =>
match row with
|
drec fs =>
listo_to_olist (
map (
fun p =>
lift (
fun r => (
fst p,
r)) ((
snd p)
fs))
cfuns)
|
_ =>
None
end
in
let results :=
map (
compose (
lift drec)
rfun)
rows in
listo_to_olist results
|
_ =>
None
end
(* Filter: keep the record rows on which the predicate evaluates to
   dbool true. Non-record rows and rows on which the predicate does
   not evaluate to dbool true are dropped, not reported as failures. *)
|
DSFilter c d =>
let cfun :=
fun_of_column c in
lift
(
filter (
fun row =>
match row with
|
drec fs =>
match cfun fs with
|
Some (
dbool true) =>
true
|
_ =>
false
end
|
_ =>
false
end))
(
dataframe_eval dsenv d)
(* Cartesian product: both inputs must evaluate; each pair of record
   rows is merged with rec_concat_sort, any non-record row yields a
   None entry. *)
|
DSCartesian d1 d2 =>
match dataframe_eval dsenv d1,
dataframe_eval dsenv d2 with
|
Some rs1,
Some rs2 =>
let data :=
flat_map (
fun r1 =>
map (
fun r2 =>
match r1,
r2 with
|
drec a,
drec b =>
Some (
drec (
rec_concat_sort a b))
|
_,
_ =>
None
end)
rs2)
rs1 in
listo_to_olist data
|
_,
_ =>
None
end
(* Explode: for each record row, field s must hold a collection; each
   element is combined with the original record through orecconcat as
   a single-field record (s, element). A non-record row, a missing
   field or a non-collection field contributes a single None entry. *)
|
DSExplode s d1 =>
match dataframe_eval dsenv d1 with
|
Some l =>
let data :=
flat_map (
fun row =>
match row with
|
drec fields =>
match edot fields s with
|
Some (
dcoll inners) =>
map (
fun inner =>
orecconcat (
drec fields) (
drec ((
s,
inner)::
nil)))
inners
|
_ =>
None::
nil
end
|
_ =>
None::
nil
end)
l in
listo_to_olist data
|
_ =>
None
end
end.
End eval.
Section DataframePlug.
(** Evaluate the dataframe [q] in [dsenv] and, on success, package the
    resulting list of rows as a single collection value. *)
Definition wrap_dataframe_eval
           (h : brand_relation_t)
           (dsenv : coll_bindings)
           (q : dataframe) : option data :=
  lift dcoll (@dataframe_eval h dsenv q).
(* If every value stored in the row r is normalized with respect to h,
   then any value o produced by fun_of_column h c r is normalized too.
   The proof is by induction on c; the bullets follow the order of the
   constructors of column. *)
Lemma fun_of_column_normalized {
h c r o}:
Forall (
fun d :
string *
data =>
data_normalized h (
snd d))
r ->
fun_of_column h c r =
Some o ->
data_normalized h o.
Proof.
revert r o.
induction c;
simpl;
intros rl o Fd;
intros eqq.
(* CCol: the value comes from the row, which is normalized. *)
-
apply lookup_in in eqq.
rewrite Forall_forall in Fd.
apply (
Fd _ eqq).
(* CDot: a field of a normalized record is normalized. *)
-
unfold flip in eqq.
specialize (
IHc rl).
match_destr_in eqq.
match_destr_in eqq.
specialize (
IHc _ Fd (
eq_refl _)).
eapply data_normalized_edot;
eauto.
(* CLit: normalize_data produces normalized data. *)
-
destruct p.
invcs eqq.
apply normalize_normalizes.
(* CPlus: the result is a dnat. *)
-
unfold flip in eqq.
match_destr_in eqq.
match_destr_in eqq.
match_destr_in eqq.
match_destr_in eqq.
invcs eqq.
constructor.
(* CEq: the result is a dbool. *)
-
unfold flip in eqq.
destruct (
fun_of_column h c1 rl);
simpl in eqq;
try discriminate.
destruct (
fun_of_column h c2 rl);
simpl in eqq;
try discriminate.
invcs eqq.
constructor.
(* CLessThan: evaluation is None, so the hypothesis is absurd. *)
-
discriminate.
(* CNeg: the result is built by unudbool. *)
-
unfold flip,
olift in eqq.
match_destr_in eqq.
unfold unudbool in eqq.
match_destr_in eqq.
invcs eqq.
constructor.
(* CToString: the result is a dstring. *)
-
unfold flip,
lift in eqq.
match_destr_in eqq.
invcs eqq.
unfold compose.
constructor.
(* CSConcat: the result is a dstring. *)
-
unfold flip in eqq.
repeat match_destr_in eqq.
invcs eqq.
constructor.
(* CUDFCast: the result is a dbool. *)
-
unfold flip,
lift in eqq.
repeat match_destr_in eqq;
invcs eqq;
constructor.
(* CUDFUnbrand: evaluation is None, so the hypothesis is absurd. *)
-
discriminate.
Qed.
(** Evaluation preserves normalization: if every binding of
    [constant_env] (viewed through [bindings_of_coll_bindings]) is
    normalized with respect to [h], then any value [o] returned by
    [wrap_dataframe_eval h constant_env q] is normalized.

    The proof is by induction on the dataframe expression; the top-level
    bullets follow the order of the constructors of [dataframe]. *)
Lemma dataframe_eval_normalized h :
  forall q:dataframe,
  forall (constant_env:coll_bindings) (o:data),
    Forall (fun x => data_normalized h (snd x)) (bindings_of_coll_bindings constant_env) ->
    wrap_dataframe_eval h constant_env q = Some o ->
    data_normalized h o.
Proof.
  unfold wrap_dataframe_eval, bindings_of_coll_bindings.
  intros ds ce d Fb de.
  apply some_lift in de.
  destruct de as [dl de ?]; subst.
  rewrite Forall_map in Fb.
  simpl in Fb.
  rewrite Forall_forall in Fb.
  revert dl de.
  induction ds; simpl; intros dl de.
  - (* DSVar: the rows come straight from the normalized environment. *)
    apply lookup_in in de.
    specialize (Fb _ de); simpl in Fb; trivial.
  - (* DSSelect: inner induction on the list of input rows. *)
    destruct (dataframe_eval h ce ds); try discriminate.
    specialize (IHds _ (eq_refl _)).
    apply listo_to_olist_some in de.
    unfold compose in de.
    invcs IHds.
    rename H0 into IHds.
    constructor.
    revert dl de.
    induction l0; simpl; intros dl de.
    + symmetry in de; apply map_eq_nil in de.
      subst; trivial.
    + destruct dl; simpl in de; try discriminate.
      invcs de.
      invcs IHds.
      constructor; [| auto].
      apply some_lift in H0.
      destruct H0 as [? eqq ?]; subst.
      match_destr_in eqq.
      specialize (IHl0 H4 _ H1).
      apply listo_to_olist_some in eqq.
      rewrite map_map in eqq.
      simpl in eqq.
      constructor.
      * (* Every field of the produced record is normalized:
           induction on the sorted list of selected columns. *)
        { revert H1 eqq.
          generalize (rec_sort l).
          clear l.
          intros l H1 eqq.
          invcs H3.
          revert x eqq H0 dl H1 IHl0.
          induction l; simpl; intros ll eqql dnll dl eqq2 IHl0.
          - symmetry in eqql; apply map_eq_nil in eqql.
            subst; trivial.
          - destruct ll; invcs eqql.
            apply some_lift in H0.
            destruct H0 as [? fc ?]; subst.
            generalize (fun_of_column_normalized dnll fc); intros dnx.
            constructor; simpl; trivial.
            specialize (IHl _ H1 dnll).
            (* Dropping the head column from the selection still yields
               normalized rows for the remaining input rows. *)
            cut (exists dl,
                    map
                      (fun x : data =>
                         lift drec
                              match x with
                              | dunit => None
                              | dnat _ => None
                              | dbool _ => None
                              | dstring _ => None
                              | dcoll _ => None
                              | drec fs =>
                                listo_to_olist
                                  (map
                                     (fun p : string * (list (string * data) -> option data) =>
                                        lift (fun r : data => (fst p, r)) (snd p fs))
                                     (map (fun p : string * column => (fst p, fun_of_column h (snd p)))
                                          l))
                              | dleft _ => None
                              | dright _ => None
                              | dbrand _ _ => None
                              | dforeign _ => None
                              end)
                      l0 =
                    map Some dl /\
                    Forall (fun x : data => data_normalized h x) dl).
            { intros [? [??]]; eauto. }
            revert H4 dl eqq2 IHl0.
            clear.
            { induction l0; simpl; intros Fdn1 dl eqq Fdnl.
              - eauto.
              - destruct dl; try discriminate.
                simpl in eqq; invcs eqq.
                invcs Fdn1.
                invcs Fdnl.
                specialize (IHl0 H4 _ H1 H6).
                (* dl'eqq is ONE hypothesis name: the equation relating
                   the mapped tail to map Some dl'. It must not be split
                   into two identifiers. *)
                destruct IHl0 as [dl' [dl'eqq Fd']].
                apply some_lift in H0.
                destruct H0 as [dl'' de ?]; subst.
                destruct a0; try discriminate.
                case_eq (lift (fun r : data => (fst a, r)) (fun_of_column h (snd a) l1) )
                ; [intros ? eqq2 | intros eqq2]; rewrite eqq2 in de; try discriminate.
                match_destr_in de.
                simpl.
                exists (drec l2::dl'); simpl.
                split.
                + rewrite dl'eqq; trivial.
                + constructor; trivial.
                  invcs de.
                  invcs H5.
                  invcs H0.
                  constructor; trivial.
                  apply is_list_sorted_cons_inv in H2; trivial.
            }
        }
      * (* The produced record has the domain of the sorted selection. *)
        assert (dxdl:domain x = domain (rec_sort l)).
        { revert x eqq.
          clear.
          generalize (rec_sort l).
          induction l0; simpl; intros x eqq.
          - symmetry in eqq; apply map_eq_nil in eqq.
            subst; trivial.
          - destruct x; try discriminate.
            simpl in eqq.
            invcs eqq.
            apply some_lift in H0.
            destruct H0 as [? fc ?]; subst.
            simpl.
            rewrite IHl0 by trivial.
            trivial.
        }
        rewrite dxdl; clear dxdl.
        eauto.
  - (* DSFilter: filtering a list of normalized rows keeps them normalized. *)
    apply some_lift in de.
    destruct de as [dl' de ?]; subst.
    specialize (IHds _ de).
    constructor.
    apply Forall_filter.
    invcs IHds.
    trivial.
  - (* DSCartesian: nested induction on both lists of rows. *)
    destruct (dataframe_eval h ce ds1); try discriminate.
    specialize (IHds1 _ (eq_refl _)).
    destruct (dataframe_eval h ce ds2); try discriminate.
    specialize (IHds2 _ (eq_refl _)).
    constructor.
    invcs IHds1.
    invcs IHds2.
    apply listo_to_olist_some in de.
    rewrite flat_map_concat_map in de.
    revert dl H0 de.
    induction l; simpl; intros dl Fdl eqq.
    + symmetry in eqq; apply map_eq_nil in eqq; subst; trivial.
    + invcs Fdl.
      symmetry in eqq.
      apply map_app_break in eqq.
      destruct eqq as [b' [c' [eqq1 [eqq2 eqq3]]]].
      subst.
      apply Forall_app; auto.
      clear eqq3 IHl.
      revert b' H1 eqq2.
      induction l0; simpl; intros b' Fdn eqq2.
      * symmetry in eqq2; apply map_eq_nil in eqq2; subst; trivial.
      * invcs Fdn.
        destruct b'; try discriminate.
        simpl in eqq2.
        invcs eqq2.
        destruct a; try discriminate.
        destruct a0; try discriminate.
        invcs H0.
        constructor; auto.
        apply data_normalized_rec_concat_sort; trivial.
  - (* DSExplode: induction on the input rows; each exploded row is a
       sorted concatenation of normalized fields. *)
    destruct (dataframe_eval h ce ds); try discriminate.
    specialize (IHds _ (eq_refl _)).
    apply listo_to_olist_some in de.
    constructor.
    invcs IHds.
    revert dl H0 de.
    induction l; simpl; intros dl Fdn de.
    + symmetry in de; apply map_eq_nil in de; subst; trivial.
    + invcs Fdn.
      symmetry in de.
      apply map_app_break in de.
      destruct de as [b' [c' [eqq1 [eqq2 eqq3]]]].
      subst.
      apply Forall_app; auto.
      clear eqq3 IHl.
      destruct a; destruct b'; simpl in *; try discriminate; try solve[constructor].
      case_eq (edot l0 s); [intros ? eqq|intros eqq]
      ; rewrite eqq in eqq2; try discriminate.
      destruct d0; try discriminate.
      destruct l1; try discriminate.
      simpl in eqq2.
      invcs eqq2.
      assert (meq:map Some (map (fun inner : data => drec (rec_concat_sort l0 ((s, inner) :: nil))) l1) = map Some b')
        by (rewrite map_map; trivial).
      clear H3.
      apply map_inj in meq; [| inversion 1; congruence].
      subst.
      generalize (data_normalized_edot _ _ _ _ eqq H1); intros dn1.
      invcs dn1.
      invcs H0.
      invcs H1.
      constructor.
      * apply dnrec_sort.
        apply Forall_app; auto.
      * rewrite Forall_map.
        revert H5.
        apply Forall_impl; intros.
        apply dnrec_sort.
        apply Forall_app; auto.
Qed.
(* AlgPlug instance for dataframe: built with mkAlgPlug from the
   evaluator wrap_dataframe_eval and its normalization proof
   dataframe_eval_normalized. *)
Global Program Instance SparkIRPlug : (@
AlgPlug _ dataframe) :=
mkAlgPlug wrap_dataframe_eval dataframe_eval_normalized.
End DataframePlug.
End Dataframe.