Skip to content

Commit

Permalink
datetime serialization support.
Browse files Browse the repository at this point in the history
  • Loading branch information
cnuernber committed Jul 24, 2021
1 parent 24a7844 commit 159eb09
Show file tree
Hide file tree
Showing 8 changed files with 241 additions and 124 deletions.
99 changes: 68 additions & 31 deletions src/tech/v3/dataset.cljs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
[tech.v3.datatype.casting :as casting]
[tech.v3.datatype.argops :as argops]
[tech.v3.datatype.arrays :as arrays]
[tech.v3.datatype.datetime :as dtype-dt]
[java.time :refer [LocalDate Instant]]
[tech.v3.dataset.impl.dataset :as ds-impl]
[tech.v3.dataset.impl.column :as col-impl]
[tech.v3.dataset.protocols :as ds-proto]
Expand Down Expand Up @@ -631,15 +633,25 @@ user> (ds/missing (*1 :c))
(reduce dtype/set-and (map ds-proto/-missing col-seq)))
(defn- numeric-data->b64
  "Encode numeric data as a base64 string.  The input is cloned, coerced to a
  typed array, and the bytes of that array's backing buffer are encoded."
  [data]
  (let [typed-ary (dtype/ensure-typed-array (clone data))
        raw-bytes (js/Uint8Array. (aget typed-ary "buffer"))]
    (b64/fromByteArray raw-bytes)))
(defn- string-col->data
[col]
;;make a new string table.
(let [strmap (js/Map.)
strtable (js/Array.)
indexes (js/Array.)]
indexes (dtype/make-container :int32 (count col))
idx-aget (dtype/as-agetable indexes)]
(dtype/indexed-iterate!
(fn [idx strval]
(.push indexes
(aset idx-aget idx
(when strval
(if-let [cur-idx (.get strmap strval)]
cur-idx
Expand All @@ -648,16 +660,14 @@ user> (ds/missing (*1 :c))
(.set strmap strval cur-idx)
cur-idx))))) col)
{:strtable strtable
:indexes indexes}))
:indexes (numeric-data->b64 indexes)}))
(defn- numeric-data->b64
[data]
(-> (clone data)
(dtype/ensure-typed-array)
(aget "buffer")
(js/Uint8Array.)
(b64/fromByteArray)))
(defn- obj-col->numeric-b64
  "Serialize an object column as base64-encoded numeric data.

  col        - column of objects; falsey (missing) entries are written as 0.
  dtype      - numeric datatype of the encoded buffer (e.g. :int32, :int64).
  convert-fn - maps a non-missing element to its numeric representation.

  Returns the base64 string produced by numeric-data->b64."
  [col dtype convert-fn]
  ;; numeric-data->b64 clones its argument before encoding, so the mapped
  ;; reader is handed over directly rather than being cloned a second time.
  (-> (dtype/emap #(if % (convert-fn %) 0) dtype col)
      (numeric-data->b64)))
(defn- col->data
Expand All @@ -674,6 +684,10 @@ user> (ds/missing (*1 :c))
(numeric-data->b64 (dtype/make-container :uint8 (aget col "buf")))
(= :string col-dt)
(string-col->data col)
(= :local-date col-dt)
(obj-col->numeric-b64 col :int32 dtype-dt/local-date->epoch-days)
(= :instant col-dt)
(obj-col->numeric-b64 col :int64 dtype-dt/instant->epoch-milliseconds)
:else
(dtype/as-js-array (dtype/make-container :object (aget col "buf"))))}))
Expand All @@ -688,12 +702,30 @@ user> (ds/missing (*1 :c))
:columns (mapv col->data (columns ds))})
(defn- b64->numeric-data
  "Decode a base64 string into a typed array of the requested numeric dtype.

  b64data - base64 string holding the raw bytes of the array.
  dtype   - one of :int8 :uint8 :int16 :uint16 :int32 :uint32 :int64 :uint64
            :float32 :float64.  Any other value throws from `case` (no default).

  The multi-byte views are constructed directly over the decoded buffer, so the
  decoded byte length must be a multiple of the element size."
  [b64data dtype]
  (let [byte-data (b64/toByteArray b64data)
        bdata (aget byte-data "buffer")]
    (case dtype
      :int8 (js/Int8Array. bdata)
      ;; Return the decoded byte array itself; handing back the bare
      ;; ArrayBuffer would give callers an object that is not a typed array.
      :uint8 byte-data
      :int16 (js/Int16Array. bdata)
      :uint16 (js/Uint16Array. bdata)
      :int32 (js/Int32Array. bdata)
      :uint32 (js/Uint32Array. bdata)
      :int64 (js/BigInt64Array. bdata)
      :uint64 (js/BigUint64Array. bdata)
      :float32 (js/Float32Array. bdata)
      :float64 (js/Float64Array. bdata))))
(defn- str-data->coldata
[{:keys [strtable indexes]}]
(let [coldata (dtype/make-container :string (count indexes))
(let [indexes (b64->numeric-data indexes :int32)
coldata (dtype/make-container :string (count indexes))
agetable (dtype/as-agetable coldata)]
(dotimes [idx (count indexes)]
(aset agetable idx (nth strtable (nth indexes idx))))
(aset agetable idx (nth strtable (aget indexes idx))))
coldata))
Expand All @@ -713,21 +745,20 @@ user> (ds/missing (*1 :c))
:data
(cond
(dtype/numeric-type? dtype)
(let [bdata (-> (b64/toByteArray data)
(aget "buffer"))]
(case dtype
:int8 (js/Int8Array. bdata)
:uint8 bdata
:int16 (js/Int16Array. bdata)
:uint16 (js/Uint16Array. bdata)
:int32 (js/Int32Array. bdata)
:uint32 (js/Uint32Array. bdata)
:float32 (js/Float32Array. bdata)
:float64 (js/Float64Array. bdata)))
(b64->numeric-data data dtype)
(= :boolean dtype)
(arrays/make-boolean-array (b64/toByteArray data))
(= :string dtype)
(str-data->coldata data)
(= :local-date dtype)
(->> (b64->numeric-data data :int32)
(dtype/emap dtype-dt/epoch-days->local-date :local-date))
(= :instant dtype)
(->> (b64->numeric-data data :int64)
;;int64 data comes out as js/bigints
(dtype/emap #(-> (js/Number. %)
(dtype-dt/epoch-milliseconds->instant))
:instant))
:else
(if (and (dtype/counted? data)
(dtype/indexed? data))
Expand Down Expand Up @@ -765,24 +796,30 @@ user> (ds/missing (*1 :c))
(comment
(do
(def test-data (repeatedly 5000 #(hash-map
:time (rand)
:temp (rand)
:valid? (if (> (rand) 0.5) true false))))
(def test-data (repeatedly 50000 #(hash-map
:time (rand)
:temp (int (* 255 (rand)))
:valid? (if (> (rand) 0.5) true false))))
(def test-ds (->dataset test-data))
(def test-ds (->dataset test-data {:parser-fn {:temp :uint8}}))
(def min-ds (select-rows test-ds (range 20))))
(def ignored (time (->> (cljs.core/concat test-data test-data)
(sort-by :time)
(cljs.core/sort-by :time)
(dedupe)
(count))))
;;600ms
(def ignored (time (merge-by-column test-ds test-ds :time)))
;;10-20ms
(def writer (t/writer :json))
(def ignored (time (.write @writer* test-data)))
(def ignored-raw (time (.write writer test-data)))
(def ignored (time (dataset->json test-ds)))
(def ignored-ds (time (dataset->transit-str test-ds)))
(def ignored (let [data (.write @writer* test-data)]
(time (.read @reader* data))))
Expand Down
3 changes: 2 additions & 1 deletion src/tech/v3/dataset/impl/dataset.cljs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
[tech.v3.datatype.casting :as casting]
[tech.v3.datatype.arrays :as dt-arrays]
[tech.v3.datatype.protocols :as dt-proto]
[tech.v3.datatype.datetime :as dtype-dt]
[tech.v3.dataset.impl.column :as col-impl]
[tech.v3.dataset.io.column-parsers :as col-parsers]
[tech.v3.dataset.protocols :as ds-proto]
Expand Down Expand Up @@ -303,7 +304,7 @@
;;step 1 is to stringify the data
(let [reader-data (if (casting/numeric-type? (dtype/elemwise-datatype reader-data))
(fmt/format-sequence reader-data)
(map #(when (not (nil? %)) (pr-str %)) reader-data))]
(map #(when (not (nil? %)) (.toString %)) reader-data))]
;;step 2 is to format the data respecting multiple line and max-width params.
(->> reader-data
(mapv (fn [strval]
Expand Down
154 changes: 80 additions & 74 deletions src/tech/v3/datatype/arrays.cljs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
(ns tech.v3.datatype.arrays
(:require [tech.v3.datatype.protocols :as dt-proto]
[tech.v3.datatype.base :as dt-base]
[tech.v3.datatype.argtypes :as argtypes])
[tech.v3.datatype.argtypes :as argtypes]
[tech.v3.datatype.casting :as casting])
(:refer-clojure :exclude [make-array]))

(set! *unchecked-arrays* true)
Expand All @@ -14,9 +15,12 @@
js/Uint16Array :uint16
js/Int32Array :int32
js/Uint32Array :uint32
js/BigInt64Array :int64
js/BigUint64Array :uint64
js/Float32Array :float32
js/Float64Array :float64})


;; Set of the elementwise datatype keywords taken from the second item of each
;; ary-types entry.
(def typed-array-types (into #{} (map second) ary-types))


Expand Down Expand Up @@ -133,79 +137,79 @@


(doseq [ary-type (map first ary-types)]
(extend-type ary-type
dt-proto/PElemwiseDatatype
(-elemwise-datatype [item] (ary-types ary-type))
dt-proto/PDatatype
(-datatype [item] :typed-array)
dt-proto/PToTypedArray
(-convertible-to-typed-array? [item] true)
(->typed-array [item] item)
dt-proto/PSubBufferCopy
(-sub-buffer-copy [item off len]
(.slice item off (+ off len)))
dt-proto/PSubBuffer
(-sub-buffer [item off len]
(.subarray item off (+ off len)))
IHash
(-hash [o] (hash-agetable o))
IEquiv
(-equiv [this other]
(equiv-agetable this other))
ICloneable
(-clone [item]
(let [len (aget item "length")
retval (js* "new item.constructor(len)")]
(.set retval item)
retval))
ISequential
ISeqable
(-seq [array] (array-seq array))
ISeq
(-first [array] (aget array 0))
(-rest [array] (.subarray array 1))
IIndexed
(-nth
([array n]
(aget array n))
([array n not-found]
(if (< n (count array))
(aget array n)
not-found)))
ICounted
(-count [array] (.-length array))
IReduce
(-reduce
([array f] (array-reduce array f))
([array f start] (array-reduce array f start)))
IPrintWithWriter
(-pr-writer [rdr writer opts]
(-write writer (dt-base/reader->str rdr "typed-array")))
dt-proto/PSetValue
(-set-value! [item idx data]
(cond
(number? data)
(aset item idx data)
(dt-proto/-convertible-to-typed-array? data)
(.set item (dt-proto/->typed-array data) idx)
(dt-proto/-convertible-to-js-array? data)
(let [data (dt-proto/->js-array data)]
(dotimes [didx (count data)]
(aset item (+ idx didx) (aget data didx))))
;;common case for integer ranges
(dt-base/integer-range? data)
(if (and (= 1 (aget data "step"))
(= 0 (aget data "start")))
(dotimes [ridx (count data)]
(aset item (+ ridx idx) ridx))
(dt-base/indexed-iterate-range! #(aset item (+ idx %1) %2) data))
:else
(dotimes [didx (count data)]
(aset item (+ idx didx) (nth data didx))))
item)
dt-proto/PSetConstant
(-set-constant! [item offset elem-count data]
(.fill item data offset (+ offset elem-count)))))
(let [cast-fn (casting/cast-fn (ary-types ary-type))]
  ;; Extends one typed-array constructor with the datatype protocols and the
  ;; core cljs collection protocols.  cast-fn is the casting function looked up
  ;; for this array's element datatype; every scalar written through
  ;; PSetValue / PSetConstant passes through it first.
  ;; NOTE: the final paren of this form closes the enclosing doseq.
  (extend-type ary-type
    dt-proto/PElemwiseDatatype
    (-elemwise-datatype [item] (ary-types ary-type))
    dt-proto/PDatatype
    (-datatype [item] :typed-array)
    dt-proto/PToTypedArray
    (-convertible-to-typed-array? [item] true)
    (->typed-array [item] item)
    dt-proto/PSubBufferCopy
    ;; .slice copies the requested range into a new array.
    (-sub-buffer-copy [item off len]
      (.slice item off (+ off len)))
    dt-proto/PSubBuffer
    ;; .subarray returns a view sharing the same backing buffer.
    (-sub-buffer [item off len]
      (.subarray item off (+ off len)))
    IHash
    (-hash [o] (hash-agetable o))
    IEquiv
    (-equiv [this other]
      (equiv-agetable this other))
    ICloneable
    (-clone [item]
      ;; The js* snippet refers to the locals `item` and `len` by name, so
      ;; these two bindings must keep exactly these names.
      (let [len (aget item "length")
            retval (js* "new item.constructor(len)")]
        (.set retval item)
        retval))
    ISequential
    ISeqable
    (-seq [array] (array-seq array))
    ISeq
    (-first [array] (aget array 0))
    (-rest [array] (.subarray array 1))
    IIndexed
    (-nth
      ([array n]
       (aget array n))
      ([array n not-found]
       ;; Negative indexes are out of bounds as well; without the lower-bound
       ;; check they read undefined from the array instead of not-found.
       (if (and (>= n 0) (< n (count array)))
         (aget array n)
         not-found)))
    ICounted
    (-count [array] (.-length array))
    IReduce
    (-reduce
      ([array f] (array-reduce array f))
      ([array f start] (array-reduce array f start)))
    IPrintWithWriter
    (-pr-writer [rdr writer opts]
      (-write writer (dt-base/reader->str rdr "typed-array")))
    dt-proto/PSetValue
    (-set-value! [item idx data]
      (cond
        ;; Scalar write.  BigInt primitives report typeof "bigint" and are not
        ;; `instanceof BigInt`, so the typeof test is what recognises them;
        ;; the instance? check is kept for boxed BigInt objects.
        (or (number? data)
            (js* "(typeof ~{} === 'bigint')" data)
            (instance? js/BigInt data))
        (aset item idx (cast-fn data))
        (dt-proto/-convertible-to-typed-array? data)
        (.set item (dt-proto/->typed-array data) idx)
        (dt-proto/-convertible-to-js-array? data)
        (let [data (dt-proto/->js-array data)]
          (dotimes [didx (count data)]
            (aset item (+ idx didx) (cast-fn (aget data didx)))))
        ;;common case for integer ranges
        (dt-base/integer-range? data)
        (if (and (= 1 (aget data "step"))
                 (= 0 (aget data "start")))
          (dotimes [ridx (count data)]
            (aset item (+ ridx idx) (cast-fn ridx)))
          (dt-base/indexed-iterate-range! #(aset item (+ idx %1) (cast-fn %2)) data))
        :else
        (dt-base/indexed-iterate! #(aset item (+ idx %1) (cast-fn %2)) data))
      item)
    dt-proto/PSetConstant
    (-set-constant! [item offset elem-count data]
      (.fill item (cast-fn data) offset (+ offset elem-count))))))



Expand Down Expand Up @@ -460,6 +464,8 @@
:uint16 (js/Uint16Array. len)
:int32 (js/Int32Array. len)
:uint32 (js/Uint32Array. len)
:int64 (js/BigInt64Array. len)
:uint64 (js/BigUint64Array. len)
:float32 (js/Float32Array. len)
:float64 (js/Float64Array. len)
(js/Array. len))
Expand Down
Loading

0 comments on commit 159eb09

Please sign in to comment.