Skip to content

Commit

Permalink
Use a Separate Thread Pool for Reading Resources
Browse files Browse the repository at this point in the history
Closes: #354
  • Loading branch information
alexanderkiel committed Mar 30, 2021
1 parent 01d052d commit e1ad7e2
Show file tree
Hide file tree
Showing 13 changed files with 176 additions and 42 deletions.
48 changes: 38 additions & 10 deletions modules/db-resource-store/src/blaze/db/resource_store/kv.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@
[blaze.db.kv :as kv]
[blaze.db.kv.spec]
[blaze.db.resource-store :as rs]
[blaze.executors :as ex]
[blaze.fhir.spec :as fhir-spec]
[clojure.spec.alpha :as s]
[cognitect.anomalies :as anom]
[integrant.core :as ig]
[taoensso.timbre :as log]))
[taoensso.timbre :as log])
(:import
[java.util.concurrent ExecutorService TimeUnit]))


(defn- parse-msg [hash e]
Expand Down Expand Up @@ -50,34 +53,59 @@
(kv/multi-get kv-store (mapv bs/to-byte-array hashes)))


(deftype KvResourceStore [kv-store]
(deftype KvResourceStore [kv-store executor]
rs/ResourceLookup
(-get [_ hash]
(ac/supply
(some-> (get-content kv-store hash)
(conform-cbor hash))))
(ac/supply-async
#(some-> (get-content kv-store hash)
(conform-cbor hash))
executor))

(-multi-get [_ hashes]
(log/trace "multi-get" (count hashes) "hash(es)")
(ac/supply (into {} entry-thawer (multi-get-content kv-store hashes))))
(ac/supply-async
#(into {} entry-thawer (multi-get-content kv-store hashes))
executor))

rs/ResourceStore
(-put [_ entries]
(ac/supply (kv/put! kv-store (into [] entry-freezer entries)))))


(defn new-kv-resource-store [kv-store]
(->KvResourceStore kv-store))
(defn new-kv-resource-store [kv-store executor]
(->KvResourceStore kv-store executor))


(defmethod ig/pre-init-spec ::rs/kv [_]
(s/keys :req-un [:blaze.db/kv-store]))


(defmethod ig/init-key ::rs/kv
[_ {:keys [kv-store]}]
[_ {:keys [kv-store executor]}]
(log/info "Open key-value store backed resource store.")
(new-kv-resource-store kv-store))
(new-kv-resource-store kv-store executor))


(derive ::rs/kv :blaze.db/resource-store)


(defn- executor-init-msg [num-threads]
(format "Init resource store key-value executor with %d threads" num-threads))


(defmethod ig/init-key ::executor
[_ {:keys [num-threads] :or {num-threads 4}}]
(log/info (executor-init-msg num-threads))
(ex/io-pool num-threads "resource-store-kv-%d"))


(defmethod ig/halt-key! ::executor
[_ ^ExecutorService executor]
(log/info "Stopping resource store key-value executor...")
(.shutdown executor)
(if (.awaitTermination executor 10 TimeUnit/SECONDS)
(log/info "Resource store key-value executor was stopped successfully")
(log/warn "Got timeout while stopping the resource store key-value executor")))


(derive ::executor :blaze.metrics/thread-pool-executor)
34 changes: 24 additions & 10 deletions modules/db-resource-store/test/blaze/db/resource_store/kv_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@
[blaze.db.resource-store-spec]
[blaze.db.resource-store.kv :refer [new-kv-resource-store]]
[blaze.db.resource-store.kv-spec]
[blaze.executors :as ex]
[blaze.fhir.hash :as hash]
[blaze.fhir.hash-spec]
[blaze.fhir.spec :as fhir-spec]
[clojure.spec.test.alpha :as st]
[clojure.test :as test :refer [deftest is testing]]
[cuerdas.core :as str]
[integrant.core :as ig]
[taoensso.timbre :as log])
(:import
[java.util.concurrent ExecutorService])
(:refer-clojure :exclude [hash]))


Expand All @@ -30,6 +34,10 @@
(test/use-fixtures :each fixture)


(def executor
(ex/single-thread-executor))


(defn hash [s]
(assert (= 1 (count s)))
(bs/from-hex (str/repeat s 64)))
Expand All @@ -50,15 +58,15 @@
(let [content {:fhir/type :fhir/Patient :id "0"}
hash (hash/generate content)
kv-store (new-mem-kv-store)
store (new-kv-resource-store kv-store)]
store (new-kv-resource-store kv-store executor)]
(kv/put! kv-store (bs/to-byte-array hash) (fhir-spec/unform-cbor content))

(is (= content @(rs/get store hash)))))

(testing "parsing error"
(let [hash (hash "0")
kv-store (new-mem-kv-store)
store (new-kv-resource-store kv-store)]
store (new-kv-resource-store kv-store executor)]
(kv/put! kv-store (bs/to-byte-array hash) (invalid-content))

(try
Expand All @@ -70,7 +78,7 @@
(testing "not-found"
(let [hash (hash "0")
kv-store (new-mem-kv-store)
store (new-kv-resource-store kv-store)]
store (new-kv-resource-store kv-store executor)]

(is (nil? @(rs/get store hash)))))

Expand All @@ -80,7 +88,7 @@
(reify kv/KvStore
(-get [_ _]
(throw (Exception. "msg-154312"))))
store (new-kv-resource-store kv-store)]
store (new-kv-resource-store kv-store executor)]

(try
@(rs/get store hash)
Expand All @@ -93,7 +101,7 @@
(let [content {:fhir/type :fhir/Patient :id "0"}
hash (hash/generate content)
kv-store (new-mem-kv-store)
store (new-kv-resource-store kv-store)]
store (new-kv-resource-store kv-store executor)]
(kv/put! kv-store (bs/to-byte-array hash) (fhir-spec/unform-cbor content))

(is (= {hash content} @(rs/multi-get store [hash])))))
Expand All @@ -104,7 +112,7 @@
content-1 {:fhir/type :fhir/Patient :id "1"}
hash-1 (hash/generate content-1)
kv-store (new-mem-kv-store)
store (new-kv-resource-store kv-store)]
store (new-kv-resource-store kv-store executor)]
(kv/put! kv-store (bs/to-byte-array hash-0) (fhir-spec/unform-cbor content-0))
(kv/put! kv-store (bs/to-byte-array hash-1) (fhir-spec/unform-cbor content-1))

Expand All @@ -114,7 +122,7 @@
(testing "parsing error"
(let [hash (hash "0")
kv-store (new-mem-kv-store)
store (new-kv-resource-store kv-store)]
store (new-kv-resource-store kv-store executor)]
(kv/put! kv-store (bs/to-byte-array hash) (invalid-content))

(try
Expand All @@ -126,7 +134,7 @@
(testing "not-found"
(let [hash (hash "0")
kv-store (new-mem-kv-store)
store (new-kv-resource-store kv-store)]
store (new-kv-resource-store kv-store executor)]

(is (= {} @(rs/multi-get store [hash])))))

Expand All @@ -136,7 +144,7 @@
(reify kv/KvStore
(-multi-get [_ _]
(throw (Exception. "msg-154826"))))
store (new-kv-resource-store kv-store)]
store (new-kv-resource-store kv-store executor)]

(try
@(rs/multi-get store [hash])
Expand All @@ -148,7 +156,13 @@
(let [content {:fhir/type :fhir/Patient :id "0"}
hash (hash/generate content)
kv-store (new-mem-kv-store)
store (new-kv-resource-store kv-store)]
store (new-kv-resource-store kv-store executor)]
(rs/put store {hash content})

(is (= content @(rs/get store hash)))))


(deftest executor-test
(let [system (ig/init {:blaze.db.resource-store.kv/executor {}})]
(is (instance? ExecutorService (:blaze.db.resource-store.kv/executor system)))
(ig/halt! system)))
7 changes: 6 additions & 1 deletion modules/db-stub/src/blaze/db/api_stub.clj
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@
(ex/single-thread-executor "indexer"))


(def ^:private resource-store-executor
(ex/single-thread-executor "resource-store"))


(def ^:private clock (Clock/fixed Instant/EPOCH (ZoneId/of "UTC")))


Expand All @@ -60,7 +64,8 @@
:system-as-of-index nil
:type-stats-index nil
:system-stats-index nil})
resource-store (new-kv-resource-store (new-mem-kv-store))
resource-store (new-kv-resource-store (new-mem-kv-store)
resource-store-executor)
tx-log (new-local-tx-log (new-mem-kv-store) clock local-tx-log-executor)
resource-handle-cache (.build (Caffeine/newBuilder))]
(new-node tx-log resource-handle-cache (tx-cache index-kv-store)
Expand Down
7 changes: 6 additions & 1 deletion modules/db/test-perf/blaze/db/api_test_perf.clj
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
(ex/single-thread-executor "indexer"))


(def ^:private resource-store-executor
(ex/single-thread-executor "resource-store"))


(defn new-index-kv-store []
(new-mem-kv-store
{:search-param-value-index nil
Expand Down Expand Up @@ -68,7 +72,8 @@
index-kv-store (new-index-kv-store)]
(node/new-node tx-log resource-handle-cache (tx-cache index-kv-store)
resource-indexer-executor 1 indexer-executor index-kv-store
(new-kv-resource-store (new-mem-kv-store))
(new-kv-resource-store (new-mem-kv-store)
resource-store-executor)
search-param-registry (jt/millis 10))))


Expand Down
9 changes: 7 additions & 2 deletions modules/db/test/blaze/db/api_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@
(ex/single-thread-executor "indexer"))


(def ^:private resource-store-executor
(ex/single-thread-executor "resource-store"))


(defn new-index-kv-store []
(new-mem-kv-store
{:search-param-value-index nil
Expand Down Expand Up @@ -105,7 +109,8 @@

(defn new-node []
(new-node-with
{:resource-store (new-kv-resource-store (new-mem-kv-store))}))
{:resource-store
(new-kv-resource-store (new-mem-kv-store) resource-store-executor)}))


(defn new-resource-store-failing-on-get []
Expand Down Expand Up @@ -406,7 +411,7 @@
(with-open [node (new-node-with
{:resource-store
(-> (new-mem-kv-store)
(new-kv-resource-store)
(new-kv-resource-store resource-store-executor)
(new-random-slow-resource-store))})]
(let [db-futures
(mapv
Expand Down
7 changes: 6 additions & 1 deletion modules/db/test/blaze/db/node/tx_indexer/verify_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@
(ex/single-thread-executor "indexer"))


(def ^:private resource-store-executor
(ex/single-thread-executor "resource-store"))


(defn new-index-kv-store []
(new-mem-kv-store
{:search-param-value-index nil
Expand Down Expand Up @@ -90,7 +94,8 @@
index-kv-store (new-index-kv-store)]
(node/new-node tx-log resource-handle-cache (tx-cache index-kv-store)
resource-indexer-executor 1 indexer-executor index-kv-store
(new-kv-resource-store (new-mem-kv-store))
(new-kv-resource-store (new-mem-kv-store)
resource-store-executor)
search-param-registry (jt/millis 10))))


Expand Down
28 changes: 24 additions & 4 deletions modules/db/test/blaze/db/node_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,25 @@
[blaze.db.tx-log.local-spec]
[blaze.executors :as ex]
[clojure.spec.test.alpha :as st]
[clojure.test :as test :refer [deftest testing]]
[clojure.test :as test :refer [deftest is testing]]
[cognitect.anomalies :as anom]
[integrant.core :as ig]
[java-time :as jt]
[juxt.iota :refer [given]]
[taoensso.timbre :as log])
(:import
[com.github.benmanes.caffeine.cache Caffeine]
[java.time Clock Instant ZoneId]))
[java.time Clock Instant ZoneId]
[java.util.concurrent ExecutorService]))


(st/instrument)
(log/set-level! :trace)


(defn fixture [f]
(st/instrument)
(log/with-level :trace (f))
(f)
(st/unstrument))


Expand All @@ -59,6 +62,10 @@
(ex/single-thread-executor "indexer"))


(def ^:private resource-store-executor
(ex/single-thread-executor "resource-store"))


(defn new-index-kv-store []
(new-mem-kv-store
{:search-param-value-index nil
Expand Down Expand Up @@ -94,7 +101,8 @@

(defn new-node []
(new-node-with
{:resource-store (new-kv-resource-store (new-mem-kv-store))}))
{:resource-store
(new-kv-resource-store (new-mem-kv-store) resource-store-executor)}))


(defn new-resource-store-failing-on-get []
Expand Down Expand Up @@ -140,3 +148,15 @@
(catch Exception e
(given (ex-data (ex-cause e))
::anom/category := ::anom/fault))))))))


(deftest resource-indexer-executor-test
(let [system (ig/init {::node/resource-indexer-executor {}})]
(is (instance? ExecutorService (::node/resource-indexer-executor system)))
(ig/halt! system)))


(deftest indexer-executor-test
(let [system (ig/init {::node/indexer-executor {}})]
(is (instance? ExecutorService (::node/indexer-executor system)))
(ig/halt! system)))
14 changes: 9 additions & 5 deletions modules/interaction/src/blaze/interaction/search_type.clj
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,17 @@
(ac/failed-future (ex-anom res))
(let [{:keys [matches includes next-match]}
(build-page db include-defs page-size handles)
matches (d/pull-many db matches)
includes (d/pull-many db (into [] includes))]
(-> (ac/all-of [matches includes])
match-futures (mapv #(d/pull-many db %) (partition-all 100 matches))
include-futures (mapv #(d/pull-many db %) (partition-all 100 includes))]
(-> (ac/all-of (into match-futures include-futures))
(ac/then-apply
(fn [_]
{:entries (entries router @matches @includes)
:num-matches (count @matches)
{:entries
(entries
router
(mapcat deref match-futures)
(mapcat deref include-futures))
:num-matches (count matches)
:next-handle next-match
:clauses clauses})))))))

Expand Down
Loading

0 comments on commit e1ad7e2

Please sign in to comment.