|
1 | 1 | (ns tech.v3.datatype.mmap
|
2 | 2 | (:require [clojure.java.io :as io]
|
3 | 3 | [tech.resource :as resource]
|
4 |
| - [tech.v3.datatype.protocols :as dtype-proto] |
5 |
| - [tech.v3.datatype.casting :as casting] |
6 |
| - [tech.v3.datatype.typecast :as typecast] |
7 |
| - [tech.v3.parallel.for :as parallel-for] |
8 |
| - [primitive-math :as pmath] |
9 |
| - [clojure.tools.logging :as log]) |
| 4 | + [clojure.tools.logging :as log] |
| 5 | + [tech.v3.datatype.native-buffer :as native-buffer]) |
10 | 6 | (:import [xerial.larray.mmap MMapBuffer MMapMode]
|
11 | 7 | [xerial.larray.buffer UnsafeUtil]
|
12 | 8 | [sun.misc Unsafe]))
|
|
20 | 16 | UnsafeUtil/unsafe)
|
21 | 17 |
|
22 | 18 |
|
23 |
(defmacro native-buffer->reader
  "Emit a `reify` form that reads elements straight out of native memory.

  * `datatype` - compile-time keyword; picks the `Unsafe` getter used by `read`
    and, after `casting/safe-flatten`, is handed to
    `typecast/datatype->reader-type` to choose the reified reader type.
  * `advertised-datatype` - form returned from `elemwiseDatatype`.
  * `buffer` - form returned from `->native-buffer`; `sub-buffer` is forwarded
    to it and the result converted back into a reader.
  * `address` - form yielding the address read when `idx` is zero.
  * `n-elems` - form returned from `lsize`."
  [datatype advertised-datatype buffer address n-elems]
  (let [elem-width (casting/numeric-byte-width datatype)
        ;; Single byte datatypes index the address directly ...
        byte-addr `(pmath/+ ~address ~'idx)
        ;; ... everything else scales the index by the element width.
        wide-addr `(pmath/+ ~address (pmath/* ~'idx ~elem-width))
        read-form (case datatype
                    :int8 `(.getByte (unsafe) ~byte-addr)
                    :uint8 `(-> (.getByte (unsafe) ~byte-addr)
                                (pmath/byte->ubyte))
                    :int16 `(.getShort (unsafe) ~wide-addr)
                    :uint16 `(-> (.getShort (unsafe) ~wide-addr)
                                 (pmath/short->ushort))
                    :int32 `(.getInt (unsafe) ~wide-addr)
                    :uint32 `(-> (.getInt (unsafe) ~wide-addr)
                                 (pmath/int->uint))
                    :int64 `(.getLong (unsafe) ~wide-addr)
                    ;; No widening step is applied to the raw long.
                    :uint64 `(-> (.getLong (unsafe) ~wide-addr))
                    :float32 `(.getFloat (unsafe) ~wide-addr)
                    :float64 `(.getDouble (unsafe) ~wide-addr))]
    `(reify
       dtype-proto/PToNativeBuffer
       (convertible-to-native-buffer? [this#] true)
       (->native-buffer [this#] ~buffer)
       ;;Forward protocol methods that are efficiently implemented by the buffer
       dtype-proto/PBuffer
       (sub-buffer [this# offset# length#]
         (-> (dtype-proto/sub-buffer ~buffer offset# length#)
             (dtype-proto/->reader {})))
       ~(typecast/datatype->reader-type (casting/safe-flatten datatype))
       (elemwiseDatatype [rdr#] ~advertised-datatype)
       (lsize [rdr#] ~n-elems)
       (read [rdr# ~'idx] ~read-form))))
60 |
| - |
61 |
| - |
62 |
;;Size is in elements, not in bytes.  `address` is a raw byte address, so any
;;element index or offset must be scaled by the datatype's byte width before it
;;is added to `address`.
(defrecord NativeBuffer [^long address ^long n-elems datatype]
  dtype-proto/PToNativeBuffer
  (convertible-to-native-buffer? [this] true)
  (->native-buffer [this] this)
  dtype-proto/PElemwiseDatatype
  (elemwise-datatype [this] datatype)
  dtype-proto/PECount
  (ecount [this] n-elems)
  dtype-proto/PBuffer
  ;; Returns a new NativeBuffer of `length` elements starting `offset` elements
  ;; into this one.  Throws when the requested window is negative or extends
  ;; past `n-elems`.
  (sub-buffer [this offset length]
    (let [offset (long offset)
          length (long length)
          byte-width (long (casting/numeric-byte-width
                            (casting/un-alias-datatype datatype)))]
      (when (or (< offset 0) (< length 0))
        (throw (Exception.
                (format "Offset (%s) and length (%s) must be non-negative"
                        offset length))))
      (when-not (<= (+ offset length) n-elems)
        (throw (Exception.
                (format "Offset+length (%s) > n-elems (%s)"
                        (+ offset length) n-elems))))
      ;; offset is in elements while address is in bytes; the readers index
      ;; memory as address + idx * byte-width so the sub-buffer must agree.
      (NativeBuffer. (+ address (* offset byte-width)) length datatype)))
  dtype-proto/PToReader
  (convertible-to-reader? [this] true)
  ;; The macro needs a literal datatype keyword at compile time, hence one
  ;; expansion per supported un-aliased datatype.  The record's own (possibly
  ;; aliased) datatype is what the resulting reader advertises.
  (->reader [this options]
    (-> (case (casting/un-alias-datatype datatype)
          :int8 (native-buffer->reader :int8 datatype this address n-elems)
          :uint8 (native-buffer->reader :uint8 datatype this address n-elems)
          :int16 (native-buffer->reader :int16 datatype this address n-elems)
          :uint16 (native-buffer->reader :uint16 datatype this address n-elems)
          :int32 (native-buffer->reader :int32 datatype this address n-elems)
          :uint32 (native-buffer->reader :uint32 datatype this address n-elems)
          :int64 (native-buffer->reader :int64 datatype this address n-elems)
          :uint64 (native-buffer->reader :uint64 datatype this address n-elems)
          :float32 (native-buffer->reader :float32 datatype this address n-elems)
          :float64 (native-buffer->reader :float64 datatype this address n-elems))
        (dtype-proto/->reader options))))
95 |
| - |
96 |
| - |
97 |
(defn as-native-buffer
  "Convert `item` to its native buffer.  Returns nil when `item` reports that it
  is not convertible to a native buffer."
  ^NativeBuffer [item]
  (if (dtype-proto/convertible-to-native-buffer? item)
    (dtype-proto/->native-buffer item)
    nil))
101 |
| - |
102 |
| - |
103 |
(defn set-native-datatype
  "Reinterpret the native memory behind `item` as `datatype`.

  The returned NativeBuffer shares the original address; its element count is
  the original size in bytes divided (truncating) by the byte width of the new
  datatype.  Returns nil when `item` is not convertible to a native buffer."
  ^NativeBuffer [item datatype]
  (when-let [nb (as-native-buffer item)]
    (let [src-width (casting/numeric-byte-width
                     (dtype-proto/elemwise-datatype item))
          dst-width (casting/numeric-byte-width
                     (casting/un-alias-datatype datatype))
          total-bytes (* (.n-elems nb) src-width)]
      (NativeBuffer. (.address nb)
                     (quot total-bytes dst-width)
                     datatype))))
112 |
| - |
113 |
| - |
114 |
| -;;One off data reading |
115 |
(defn read-double
  "Read one double from `native-buffer`.

  `offset` is added directly to the buffer's address and defaults to 0.  The
  bounds are asserted against `n-elems`, which equals the size in bytes only for
  single-byte datatypes.  Throws AssertionError when the 8 byte read would fall
  outside `[0, n-elems)`."
  (^double [^NativeBuffer native-buffer ^long offset]
   ;; A negative offset would read memory located before the buffer.
   (assert (>= offset 0))
   (assert (>= (- (.n-elems native-buffer) offset 8) 0))
   (.getDouble (unsafe) (+ (.address native-buffer) offset)))
  (^double [^NativeBuffer native-buffer]
   (read-double native-buffer 0)))
122 |
| - |
123 |
| - |
124 |
(defn read-float
  "Read one float from `native-buffer`, returned as a double.

  `offset` is added directly to the buffer's address and defaults to 0.  The
  bounds are asserted against `n-elems`, which equals the size in bytes only for
  single-byte datatypes.  Throws AssertionError when the 4 byte read would fall
  outside `[0, n-elems)`."
  (^double [^NativeBuffer native-buffer ^long offset]
   ;; A negative offset would read memory located before the buffer.
   (assert (>= offset 0))
   (assert (>= (- (.n-elems native-buffer) offset 4) 0))
   (.getFloat (unsafe) (+ (.address native-buffer) offset)))
  (^double [^NativeBuffer native-buffer]
   (read-float native-buffer 0)))
131 |
| - |
132 |
| - |
133 |
(defn read-long
  "Read one 64 bit integer from `native-buffer`.

  `offset` is added directly to the buffer's address and defaults to 0.  The
  bounds are asserted against `n-elems`, which equals the size in bytes only for
  single-byte datatypes.  Throws AssertionError when the 8 byte read would fall
  outside `[0, n-elems)`."
  (^long [^NativeBuffer native-buffer ^long offset]
   ;; A negative offset would read memory located before the buffer.
   (assert (>= offset 0))
   (assert (>= (- (.n-elems native-buffer) offset 8) 0))
   (.getLong (unsafe) (+ (.address native-buffer) offset)))
  (^long [^NativeBuffer native-buffer]
   (read-long native-buffer 0)))
140 |
| - |
141 |
| - |
142 |
(defn read-int
  "Read one 32 bit integer from `native-buffer`, returned as a long.

  `offset` is added directly to the buffer's address and defaults to 0.  The
  bounds are asserted against `n-elems`, which equals the size in bytes only for
  single-byte datatypes.  Throws AssertionError when the 4 byte read would fall
  outside `[0, n-elems)`."
  (^long [^NativeBuffer native-buffer ^long offset]
   ;; A negative offset would read memory located before the buffer.
   (assert (>= offset 0))
   (assert (>= (- (.n-elems native-buffer) offset 4) 0))
   (.getInt (unsafe) (+ (.address native-buffer) offset)))
  (^long [^NativeBuffer native-buffer]
   (read-int native-buffer 0)))
149 |
| - |
150 |
| - |
151 |
(defn read-short
  "Read one 16 bit integer from `native-buffer`, returned as a long.

  `offset` is added directly to the buffer's address and defaults to 0.  The
  bounds are asserted against `n-elems`, which equals the size in bytes only for
  single-byte datatypes.  Throws AssertionError when the 2 byte read would fall
  outside `[0, n-elems)`."
  (^long [^NativeBuffer native-buffer ^long offset]
   ;; A negative offset would read memory located before the buffer.
   (assert (>= offset 0))
   (assert (>= (- (.n-elems native-buffer) offset 2) 0))
   (unchecked-long
    (.getShort (unsafe) (+ (.address native-buffer) offset))))
  (^long [^NativeBuffer native-buffer]
   (read-short native-buffer 0)))
160 |
| - |
161 |
| - |
162 |
(defn read-byte
  "Read one byte from `native-buffer`, returned as a long.

  `offset` is added directly to the buffer's address and defaults to 0.  The
  bounds are asserted against `n-elems`, which equals the size in bytes only for
  single-byte datatypes.  Throws AssertionError when the read would fall outside
  `[0, n-elems)`."
  (^long [^NativeBuffer native-buffer ^long offset]
   ;; A negative offset would read memory located before the buffer.
   (assert (>= offset 0))
   (assert (>= (- (.n-elems native-buffer) offset 1) 0))
   (unchecked-long
    (.getByte (unsafe) (+ (.address native-buffer) offset))))
  (^long [^NativeBuffer native-buffer]
   (read-byte native-buffer 0)))
171 |
| - |
172 |
| - |
173 |
(defn- unpack-copy-item
  "Resolve `item` into the `[base-object byte-offset]` pair handed to
  `Unsafe/copyMemory`.

  `item-off` is an offset in elements and `byte-width` the size of one element
  in bytes.  Native buffers yield a nil base object and an absolute address;
  anything else is read through its `:java-array` and `:offset` keys and yields
  the array plus an offset relative to the array's base offset.  The two
  argument arity keeps the historical behavior of treating offsets as bytes."
  ([item ^long item-off]
   (unpack-copy-item item item-off 1))
  ([item ^long item-off ^long byte-width]
   (if (instance? NativeBuffer item)
     ;;no further offsetting required for native buffers
     [nil (+ (.address ^NativeBuffer item) (* item-off byte-width))]
     (let [ary (:java-array item)
           ary-off (long (:offset item))
           base-off (long (case (dtype-proto/elemwise-datatype ary)
                            :boolean Unsafe/ARRAY_BOOLEAN_BASE_OFFSET
                            :int8 Unsafe/ARRAY_BYTE_BASE_OFFSET
                            :int16 Unsafe/ARRAY_SHORT_BASE_OFFSET
                            :int32 Unsafe/ARRAY_INT_BASE_OFFSET
                            :int64 Unsafe/ARRAY_LONG_BASE_OFFSET
                            :float32 Unsafe/ARRAY_FLOAT_BASE_OFFSET
                            :float64 Unsafe/ARRAY_DOUBLE_BASE_OFFSET))]
       ;; Both the caller's offset and the array buffer's own offset count
       ;; elements, so scale their sum before adding the array base offset.
       [ary (+ base-off (* (+ item-off ary-off) byte-width))]))))


(defn copy!
  "Src, dst *must* be same unaliased datatype and that datatype must be a primitive
  datatype.
  src must either be convertible to an array or to a native buffer.
  dst must either be convertible to an array or to a native buffer.
  Offsets and n-elems are counted in elements and must be non-negative.
  Uses Unsafe/copyMemory under the covers *without* safePointPolling.
  Throws when the requested range does not fit in src or dst, when the
  datatypes differ or when either side is not convertible.
  Returns dst"
  ([src src-off dst dst-off n-elems]
   (let [src-dt (casting/host-flatten (dtype-proto/elemwise-datatype src))
         dst-dt (casting/host-flatten (dtype-proto/elemwise-datatype dst))
         src-ec (dtype-proto/ecount src)
         dst-ec (dtype-proto/ecount dst)
         src-off (long src-off)
         dst-off (long dst-off)
         n-elems (long n-elems)
         _ (when (or (< src-off 0) (< dst-off 0) (< n-elems 0))
             (throw (Exception.
                     (format "Offsets and elem count must be non-negative: src offset (%s), dst offset (%s), elem count (%s)"
                             src-off dst-off n-elems))))
         _ (when-not (>= (- src-ec src-off) n-elems)
             (throw (Exception. (format "Src ecount (%s) - src offset (^%s) is less than op elem count (%s)"
                                        src-ec src-off n-elems))))
         _ (when-not (>= (- dst-ec dst-off) n-elems)
             (throw (Exception. (format "Dst ecount (%s) - dst offset (^%s) is less than op elem count (%s)"
                                        dst-ec dst-off n-elems))))
         _ (when-not (= src-dt dst-dt)
             (throw (Exception. (format "src datatype (%s) != dst datatype (%s)"
                                        src-dt dst-dt))))]
     ;;Check if managed heap or native heap
     (let [src-buf (or (dtype-proto/->array-buffer src)
                       (dtype-proto/->native-buffer src))
           dst-buf (or (dtype-proto/->array-buffer dst)
                       (dtype-proto/->native-buffer dst))
           _ (when-not (and src-buf dst-buf)
               (throw (Exception.
                       "Src or dst are not convertible to arrays or native buffers")))
           byte-width (long (casting/numeric-byte-width
                             (casting/un-alias-datatype src-dt)))
           [src-base src-byte-off] (unpack-copy-item src-buf src-off byte-width)
           [dst-base dst-byte-off] (unpack-copy-item dst-buf dst-off byte-width)
           src-byte-off (long src-byte-off)
           dst-byte-off (long dst-byte-off)]
       (if (< n-elems 1024)
         (.copyMemory (unsafe) src-base src-byte-off dst-base dst-byte-off
                      (* n-elems byte-width))
         (parallel-for/indexed-map-reduce
          n-elems
          (fn [^long start-idx ^long group-len]
            ;; start-idx and group-len count elements while copyMemory works
            ;; in bytes, so every chunk offset and length has to be scaled.
            (.copyMemory (unsafe)
                         src-base (+ src-byte-off (* start-idx byte-width))
                         dst-base (+ dst-byte-off (* start-idx byte-width))
                         (* group-len byte-width)))))
       ;; Hand back the caller's dst, not the unpacked base object (which is nil
       ;; for native buffers).
       dst)))
  ([src dst n-elems]
   (copy! src 0 dst 0 n-elems))
  ([src dst]
   (let [src-ec (dtype-proto/ecount src)
         dst-ec (dtype-proto/ecount dst)]
     (when-not (== src-ec dst-ec)
       (throw (Exception. (format "src ecount (%s) != dst ecount (%s)"
                                  src-ec dst-ec))))
     (copy! src 0 dst 0 src-ec))))
247 |
| - |
248 |
| - |
249 |
(defn free
  "Release native memory through `Unsafe/freeMemory`.

  `data` is either a NativeBuffer, whose address is used, or a value coercible
  to a long address.  An address of 0 is ignored."
  [data]
  (let [addr (if (instance? NativeBuffer data)
               (long (.address ^NativeBuffer data))
               (long data))]
    (when-not (zero? addr)
      (.freeMemory (unsafe) addr))))
256 |
| - |
257 |
| - |
258 |
(defn malloc
  "Allocate `n-bytes` of native memory and wrap it in an :int8 NativeBuffer.

  Options:
  * `:resource-type` - passed to `resource/track` together with a function that
    frees the allocation.  Defaults to `:stack`; a falsey value disables
    tracking, leaving the caller responsible for calling `free`."
  (^NativeBuffer [^long n-bytes {:keys [resource-type]
                                 :or {resource-type :stack}}]
   (let [buf (NativeBuffer. (.allocateMemory (unsafe) n-bytes)
                            n-bytes
                            :int8)
         ;; The cleanup fn closes over the raw address only, not the buffer.
         addr (.address buf)
         release! #(free addr)]
     (when resource-type
       (resource/track buf release! resource-type))
     buf))
  (^NativeBuffer [^long n-bytes]
   (malloc n-bytes {})))
270 |
| - |
271 |
| - |
272 | 19 | (defn mmap-file
|
273 | 20 | "Memory map a file returning a native buffer. fpath must resolve to a valid
|
274 | 21 | java.io.File.
|
|
302 | 49 | #(do (log/debugf "closing %s" fpath) (.close map-buf))
|
303 | 50 | resource-type)
|
304 | 51 | (log/debugf "No resource type specified for mmaped file %s" fpath))
|
305 |
| - (->NativeBuffer (.address map-buf) (.size map-buf) :int8))) |
| 52 | + (native-buffer/->NativeBuffer (.address map-buf) (.size map-buf) :int8))) |
306 | 53 | ([fpath]
|
307 | 54 | (mmap-file fpath {})))
|
0 commit comments