Skip to content

Commit

Permalink
Implement a Transaction Cache
Browse files Browse the repository at this point in the history
The transaction cache holds transaction data that currently only
consists of the instant at which the transaction happened. Transaction
data is keyed by t.
  • Loading branch information
alexanderkiel committed Mar 4, 2021
1 parent e60b84e commit 1bbef6f
Show file tree
Hide file tree
Showing 22 changed files with 285 additions and 65 deletions.
8 changes: 7 additions & 1 deletion dev/blaze/dev.clj
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@
(.invalidateAll ^Cache (:blaze.db/resource-handle-cache system))
)

;; Transaction Cache
(comment
(str (cc/-stats (:blaze.db/tx-cache system)))
(resource-cache/invalidate-all! (:blaze.db/tx-cache system))
)

;; Resource Cache
(comment
(str (cc/-stats (:blaze.db/resource-cache system)))
Expand All @@ -83,7 +89,7 @@

(.hash (d/resource-handle db "Patient" "01f5d727-e75c-4662-aecd-df2ffccd2e27"))

@(blaze.db.node/load-tx-result node (:kv-store node) 21228)
@(blaze.db.node/load-tx-result node 21228)

)

Expand Down
12 changes: 9 additions & 3 deletions modules/db-stub/src/blaze/db/api_stub.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
(:require
[blaze.db.api :as d]
[blaze.db.api-spec]
[blaze.db.impl.index.tx-success :as tsi]
[blaze.db.kv.mem :refer [new-mem-kv-store]]
[blaze.db.kv.mem-spec]
[blaze.db.node :refer [new-node]]
Expand Down Expand Up @@ -38,6 +39,11 @@

(def ^:private clock (Clock/fixed Instant/EPOCH (ZoneId/of "UTC")))


(defn- tx-cache [index-kv-store]
(.build (Caffeine/newBuilder) (tsi/cache-loader index-kv-store)))


(defn mem-node ^Closeable []
(let [index-kv-store
(new-mem-kv-store
Expand All @@ -57,9 +63,9 @@
resource-store (new-kv-resource-store (new-mem-kv-store))
tx-log (new-local-tx-log (new-mem-kv-store) clock local-tx-log-executor)
resource-handle-cache (.build (Caffeine/newBuilder))]
(new-node tx-log resource-handle-cache resource-indexer-executor 1
indexer-executor index-kv-store resource-store
search-param-registry (jt/millis 10))))
(new-node tx-log resource-handle-cache (tx-cache index-kv-store)
resource-indexer-executor 1 indexer-executor index-kv-store
resource-store search-param-registry (jt/millis 10))))


(defn- submit-txs [node txs]
Expand Down
15 changes: 11 additions & 4 deletions modules/db/src/blaze/db/impl/index/tx_success.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
[blaze.db.impl.index.cbor :as cbor]
[blaze.db.kv :as kv])
(:import
[com.github.benmanes.caffeine.cache CacheLoader LoadingCache]
[java.time Instant]))


Expand All @@ -18,19 +19,25 @@
:blaze.db.tx/instant (Instant/ofEpochMilli inst)}))


(defn- encode-key [t]
(defn encode-key [t]
(-> (bb/allocate Long/BYTES)
(bb/put-long! t)
(bb/array)))


(defn cache-loader [kv-store]
(reify CacheLoader
(load [_ t]
(some-> (kv/get kv-store :tx-success-index (encode-key t))
(decode-tx t)))))


(defn tx
"Returns the transaction with `t` using `kv-store` or nil of none was found.
Errored transactions are returned by `blaze.db.impl.index.tx-error/tx-error`."
[kv-store t]
(some-> (kv/get kv-store :tx-success-index (encode-key t))
(decode-tx t)))
[^LoadingCache tx-cache t]
(.get tx-cache t))


(defn last-t
Expand Down
34 changes: 18 additions & 16 deletions modules/db/src/blaze/db/node.clj
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@
future))


(defn load-tx-result [node kv-store t]
(if (tsi/tx kv-store t)
(defn load-tx-result [{:keys [tx-cache kv-store] :as node} t]
(if (tsi/tx tx-cache t)
(ac/completed-future (db/db node t))
(if-let [anomaly (te/tx-error kv-store t)]
(ac/failed-future
Expand Down Expand Up @@ -194,14 +194,15 @@
(assoc :blaze.db/tx tx)))


(defn- enhance-resource [kv-store {:keys [t] :as handle} resource]
(let [tx (tsi/tx kv-store t)]
(defn- enhance-resource [tx-cache {:keys [t] :as handle} resource]
(let [tx (tsi/tx tx-cache t)]
(-> (update resource :meta enhance-resource-meta t tx)
(with-meta (mk-meta handle tx)))))


(defrecord Node [tx-log rh-cache kv-store resource-store search-param-registry
resource-indexer state run? poll-timeout finished]
(defrecord Node [tx-log rh-cache tx-cache kv-store resource-store
search-param-registry resource-indexer state run? poll-timeout
finished]
p/Node
(-db [node]
(db/db node (:t @state)))
Expand Down Expand Up @@ -229,18 +230,18 @@
(cond
(<= t current-t)
(do (remove-watch state watcher)
(load-tx-result node kv-store t))
(load-tx-result node t))

(:e current-state)
(do (remove-watch state watcher)
(ac/failed-future (:e current-state)))

:else
(ac/then-compose watcher (fn [_] (load-tx-result node kv-store t))))))
(ac/then-compose watcher (fn [_] (load-tx-result node t))))))

p/Tx
(-tx [_ t]
(tsi/tx kv-store t))
(tsi/tx tx-cache t))

rs/ResourceLookup
(-get [_ hash]
Expand Down Expand Up @@ -279,7 +280,7 @@
p/Pull
(-pull [_ resource-handle]
(-> (rs/get resource-store (:hash resource-handle))
(ac/then-apply #(enhance-resource kv-store resource-handle %))))
(ac/then-apply #(enhance-resource tx-cache resource-handle %))))

(-pull-content [_ resource-handle]
(-> (rs/get resource-store (:hash resource-handle))
Expand All @@ -291,7 +292,7 @@
(fn [resources]
(mapv
(fn [{:keys [hash] :as resource-handle}]
(enhance-resource kv-store resource-handle (get resources hash)))
(enhance-resource tx-cache resource-handle (get resources hash)))
resource-handles)))))

Runnable
Expand Down Expand Up @@ -328,7 +329,7 @@

(defn new-node
"Creates a new local database node."
[tx-log resource-handle-cache resource-indexer-executor
[tx-log resource-handle-cache tx-cache resource-indexer-executor
resource-indexer-batch-size indexer-executor kv-store
resource-store search-param-registry poll-timeout]
(let [resource-indexer (new-resource-indexer resource-store
Expand All @@ -337,8 +338,8 @@
resource-indexer-executor
resource-indexer-batch-size)
indexer-abort-reason (atom nil)
node (->Node tx-log resource-handle-cache kv-store resource-store
search-param-registry resource-indexer
node (->Node tx-log resource-handle-cache tx-cache kv-store
resource-store search-param-registry resource-indexer
(atom {:t (or (tsi/last-t kv-store) 0)
:error-t 0})
(volatile! true)
Expand All @@ -357,6 +358,7 @@
:req-un
[:blaze.db/tx-log
:blaze.db/resource-handle-cache
:blaze.db/tx-cache
::resource-indexer-executor
::resource-indexer-batch-size
::indexer-executor
Expand All @@ -371,11 +373,11 @@


(defmethod ig/init-key :blaze.db/node
[_ {:keys [tx-log resource-handle-cache resource-indexer-executor
[_ {:keys [tx-log resource-handle-cache tx-cache resource-indexer-executor
resource-indexer-batch-size indexer-executor kv-store
resource-store search-param-registry]}]
(log/info (init-msg resource-indexer-batch-size))
(new-node tx-log resource-handle-cache resource-indexer-executor
(new-node tx-log resource-handle-cache tx-cache resource-indexer-executor
resource-indexer-batch-size indexer-executor kv-store resource-store
search-param-registry (jt/seconds 1)))

Expand Down
3 changes: 1 addition & 2 deletions modules/db/src/blaze/db/resource_handle_cache.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
Caffeine is used because it have better performance characteristics as a
ConcurrentHashMap."
(:require
[blaze.db.resource-cache.spec]
[blaze.db.resource-store.spec]
[blaze.db.resource-handle-cache.spec]
[clojure.spec.alpha :as s]
[integrant.core :as ig]
[taoensso.timbre :as log])
Expand Down
7 changes: 7 additions & 0 deletions modules/db/src/blaze/db/resource_handle_cache/spec.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
(ns blaze.db.resource-handle-cache.spec
(:require
[clojure.spec.alpha :as s]))


(s/def :blaze.db.resource-handle-cache/max-size
nat-int?)
14 changes: 9 additions & 5 deletions modules/db/src/blaze/db/spec.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,25 @@
[blaze.db.tx-log.spec]
[clojure.spec.alpha :as s])
(:import
[com.github.benmanes.caffeine.cache Cache]))
[com.github.benmanes.caffeine.cache Cache LoadingCache]))


(s/def :blaze.db/node
#(satisfies? p/Node %))


(s/def :blaze.db/resource-cache
:blaze.db/resource-store)


(s/def :blaze.db/resource-handle-cache
#(instance? Cache %))


(s/def :blaze.db/tx-cache
#(instance? LoadingCache %))


(s/def :blaze.db/resource-cache
:blaze.db/resource-store)


(s/def :blaze.db/op
#{:create :put :delete})

Expand Down
36 changes: 36 additions & 0 deletions modules/db/src/blaze/db/tx_cache.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
(ns blaze.db.tx-cache
"A cache for transactions.
Caffeine is used because it have better performance characteristics as a
ConcurrentHashMap."
(:require
[blaze.db.impl.index.tx-success :as tsi]
[blaze.db.kv.spec]
[blaze.db.tx-cache.spec]
[clojure.spec.alpha :as s]
[integrant.core :as ig]
[taoensso.timbre :as log])
(:import
[com.github.benmanes.caffeine.cache Caffeine]))


(set! *warn-on-reflection* true)


(defn- new-tx-cache
"Creates a new transaction cache."
[kv-store max-size]
(-> (Caffeine/newBuilder)
(.maximumSize max-size)
(.recordStats)
(.build (tsi/cache-loader kv-store))))


(defmethod ig/pre-init-spec :blaze.db/tx-cache [_]
(s/keys :req-un [:blaze.db/kv-store] :opt-un [::max-size]))


(defmethod ig/init-key :blaze.db/tx-cache
[_ {:keys [kv-store max-size] :or {max-size 0}}]
(log/info "Create transaction cache with a size of" max-size "transactions")
(new-tx-cache kv-store max-size))
7 changes: 7 additions & 0 deletions modules/db/src/blaze/db/tx_cache/spec.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
(ns blaze.db.tx-cache.spec
(:require
[clojure.spec.alpha :as s]))


(s/def :blaze.db.tx-cache/max-size
nat-int?)
12 changes: 9 additions & 3 deletions modules/db/test-perf/blaze/db/api_test_perf.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
(:require
[blaze.anomaly :refer [ex-anom]]
[blaze.db.api :as d]
[blaze.db.impl.index.tx-success :as tsi]
[blaze.db.kv.mem :refer [new-mem-kv-store]]
[blaze.db.node :as node]
[blaze.db.resource-store.kv :refer [new-kv-resource-store]]
Expand Down Expand Up @@ -57,11 +58,16 @@
(def clock (Clock/fixed Instant/EPOCH (ZoneId/of "UTC")))


(defn- tx-cache [index-kv-store]
(.build (Caffeine/newBuilder) (tsi/cache-loader index-kv-store)))


(defn new-node []
(let [tx-log (new-local-tx-log (new-mem-kv-store) clock local-tx-log-executor)
resource-handle-cache (.build (Caffeine/newBuilder))]
(node/new-node tx-log resource-handle-cache resource-indexer-executor 1
indexer-executor (new-index-kv-store)
resource-handle-cache (.build (Caffeine/newBuilder))
index-kv-store (new-index-kv-store)]
(node/new-node tx-log resource-handle-cache (tx-cache index-kv-store)
resource-indexer-executor 1 indexer-executor index-kv-store
(new-kv-resource-store (new-mem-kv-store))
search-param-registry (jt/millis 10))))

Expand Down
14 changes: 10 additions & 4 deletions modules/db/test/blaze/db/api_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
[blaze.db.api-spec]
[blaze.db.impl.db-spec]
[blaze.db.impl.index.resource-search-param-value-test-util :as r-sp-v-tu]
[blaze.db.impl.index.tx-success :as tsi]
[blaze.db.kv.mem :refer [new-mem-kv-store]]
[blaze.db.kv.mem-spec]
[blaze.db.node :as node]
Expand Down Expand Up @@ -89,12 +90,17 @@
(def clock (Clock/fixed Instant/EPOCH (ZoneId/of "UTC")))


(defn- tx-cache [index-kv-store]
(.build (Caffeine/newBuilder) (tsi/cache-loader index-kv-store)))


(defn new-node-with [{:keys [resource-store]}]
(let [tx-log (new-local-tx-log (new-mem-kv-store) clock local-tx-log-executor)
resource-handle-cache (.build (Caffeine/newBuilder))]
(node/new-node tx-log resource-handle-cache resource-indexer-executor 1
indexer-executor (new-index-kv-store) resource-store
search-param-registry (jt/millis 10))))
resource-handle-cache (.build (Caffeine/newBuilder))
index-kv-store (new-index-kv-store)]
(node/new-node tx-log resource-handle-cache (tx-cache index-kv-store)
resource-indexer-executor 1 indexer-executor index-kv-store
resource-store search-param-registry (jt/millis 10))))


(defn new-node []
Expand Down
2 changes: 1 addition & 1 deletion modules/db/test/blaze/db/impl/index/tx_success_spec.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@


(s/fdef tsi/tx
:args (s/cat :kv-store :blaze.db/kv-store :t :blaze.db/t)
:args (s/cat :tx-cache :blaze.db/tx-cache :t :blaze.db/t)
:ret (s/nilable :blaze.db/tx))


Expand Down
Loading

0 comments on commit 1bbef6f

Please sign in to comment.