Skip to content

Commit

Permalink
Optimize Thread Usage while Loading Resources
Browse files Browse the repository at this point in the history
Before this change, the KV Resource Store used it's thread pool for both
loading and parsing/conforming the resources. However loading is I/O but
parsing/conforming is CPU bound. So it is better to use the KV Resource
Store Executor only for the loading, the I/O part.

With this change the KV Resource Store uses do-async to change to a
common ForkJoinPool thread. This change has also the advantage that
functions that are applied after the returned future do not have to
change the thread pool because the futures are already on the common
ForkJoinPool.

The performance improves by a little bit because the threads are now
directly usable to lead the next resource instead having to parse/
conform the current one.

Removed all pool switching async functions. Tested the thread pool used.
  • Loading branch information
alexanderkiel committed Jul 23, 2024
1 parent e3eb6c7 commit 123d0a9
Show file tree
Hide file tree
Showing 27 changed files with 191 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
[blaze.fhir.spec :as fhir-spec]
[blaze.fhir.test-util :refer [given-failed-future]]
[blaze.log]
[blaze.module.test-util :refer [with-system]]
[blaze.module.test-util :as mtu :refer [with-system]]
[blaze.test-util :as tu :refer [given-thrown]]
[clojure.spec.alpha :as s]
[clojure.spec.test.alpha :as st]
Expand All @@ -22,6 +22,7 @@
[cognitect.anomalies :as anom]
[integrant.core :as ig]
[jsonista.core :as j]
[juxt.iota :refer [given]]
[taoensso.timbre :as log])
(:import
[com.datastax.oss.driver.api.core
Expand All @@ -33,12 +34,11 @@
[com.fasterxml.jackson.dataformat.cbor CBORFactory]
[java.net InetSocketAddress]
[java.nio ByteBuffer]
#_{:clj-kondo/ignore [:unused-import]}
[java.util.concurrent CompletionStage]))

(set! *warn-on-reflection* true)
(st/instrument)
(log/set-level! :trace)
(log/set-min-level! :trace)

(test/use-fixtures :each tu/fixture)

Expand Down Expand Up @@ -227,7 +227,9 @@

(with-redefs [cass/session (fn [_] session)]
(with-system [{store ::rs/cassandra} {::rs/cassandra {}}]
(is (= content @(rs/get store hash)))))))
(given @(mtu/assoc-thread-name (rs/get store hash))
identity := content
[meta :thread-name] :? mtu/common-pool-thread?)))))

(testing "success after one retry due to timeout"
(let [content {:fhir/type :fhir/Patient :id "0"}
Expand All @@ -253,7 +255,10 @@

(with-redefs [cass/session (fn [_] session)]
(with-system [{store ::rs/cassandra} {::rs/cassandra {}}]
(is (= content @(rs/get store hash))))))))
(testing "content matches"
(given @(mtu/assoc-thread-name (rs/get store hash))
identity := content
[meta :thread-name] :? mtu/common-pool-thread?)))))))

(deftest multi-get-test
(testing "not found"
Expand All @@ -275,7 +280,10 @@

(with-redefs [cass/session (fn [_] session)]
(with-system [{store ::rs/cassandra} {::rs/cassandra {}}]
(is (empty? @(rs/multi-get store [hash])))))))
(testing "result is empty"
(given @(mtu/assoc-thread-name (rs/multi-get store [hash]))
identity :? empty?
[meta :thread-name] :? mtu/common-pool-thread?))))))

(testing "success"
(let [content {:fhir/type :fhir/Patient :id "0"}
Expand All @@ -298,7 +306,9 @@

(with-redefs [cass/session (fn [_] session)]
(with-system [{store ::rs/cassandra} {::rs/cassandra {}}]
(is (= {hash content} @(rs/multi-get store [hash]))))))))
(given @(mtu/assoc-thread-name (rs/multi-get store [hash]))
identity := {hash content}
[meta :thread-name] :? mtu/common-pool-thread?))))))

(def bound-put-statement (reify BoundStatement))

Expand Down
2 changes: 1 addition & 1 deletion modules/db-resource-store/src/blaze/db/resource_store.clj
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

(defn get
"Returns a CompletableFuture that will complete with the resource content of
`hash` or nil if it was not found."
`hash` or will complete with nil if it was not found."
[store hash]
(-get store hash))

Expand Down
9 changes: 5 additions & 4 deletions modules/db-resource-store/src/blaze/db/resource_store/kv.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"A resource store implementation that uses a kev-value store as backend."
(:require
[blaze.anomaly :as ba :refer [when-ok]]
[blaze.async.comp :as ac :refer [do-sync]]
[blaze.async.comp :as ac :refer [do-async do-sync]]
[blaze.coll.core :as coll]
[blaze.db.kv :as kv]
[blaze.db.kv.spec]
Expand Down Expand Up @@ -76,11 +76,12 @@
(with-open [_ (prom/timer duration-seconds "get-resource")]
(kv/get kv-store :default (hash/to-byte-array hash))))

(defn- get-and-parse [kv-store hash]
(some-> (get-content kv-store hash) (parse-and-conform-cbor hash)))
(defn- get-content-async [kv-store executor hash]
(ac/supply-async #(get-content kv-store hash) executor))

(defn- get-and-parse-async [kv-store executor hash]
(ac/supply-async #(get-and-parse kv-store hash) executor))
(do-async [bytes (get-content-async kv-store executor hash)]
(some-> bytes (parse-and-conform-cbor hash))))

(defn- multi-get-and-parse-async [kv-store executor hashes]
(mapv (partial get-and-parse-async kv-store executor) hashes))
Expand Down
45 changes: 28 additions & 17 deletions modules/db-resource-store/test/blaze/db/resource_store/kv_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
[blaze.fhir.hash-spec]
[blaze.fhir.spec :as fhir-spec]
[blaze.fhir.test-util :refer [given-failed-future]]
[blaze.log]
[blaze.metrics.spec]
[blaze.module.test-util :refer [with-system]]
[blaze.module.test-util :as mtu :refer [with-system]]
[blaze.test-util :as tu :refer [given-thrown]]
[clojure.spec.alpha :as s]
[clojure.spec.test.alpha :as st]
Expand All @@ -25,13 +26,14 @@
[cognitect.anomalies :as anom]
[integrant.core :as ig]
[jsonista.core :as j]
[juxt.iota :refer [given]]
[taoensso.timbre :as log])
(:import
[com.fasterxml.jackson.dataformat.cbor CBORFactory]))

(set! *warn-on-reflection* true)
(st/instrument)
(log/set-level! :trace)
(log/set-min-level! :trace)

(test/use-fixtures :each tu/fixture)

Expand Down Expand Up @@ -99,7 +101,7 @@
(with-system [{collector ::rs-kv/duration-seconds} {::rs-kv/duration-seconds {}}]
(is (s/valid? :blaze.metrics/collector collector))))

(def ^:private system
(def ^:private config
{::rs/kv
{:kv-store (ig/ref ::kv/mem)
:executor (ig/ref ::rs-kv/executor)}
Expand Down Expand Up @@ -136,29 +138,31 @@
(deftest get-test
(testing "success"
(let [content {:fhir/type :fhir/Patient :id "0"}]
(with-system [{store ::rs/kv kv-store ::kv/mem} system]
(with-system [{store ::rs/kv kv-store ::kv/mem} config]
(put! kv-store (hash) content)

(is (= content @(rs/get store (hash)))))))
(given @(mtu/assoc-thread-name (rs/get store (hash)))
identity := content
[meta :thread-name] :? mtu/common-pool-thread?))))

(testing "parsing error"
(with-system [{store ::rs/kv kv-store ::kv/mem} system]
(with-system [{store ::rs/kv kv-store ::kv/mem} config]
(kv/put! kv-store [[:default (hash/to-byte-array (hash)) (invalid-content)]])

(given-failed-future (rs/get store (hash))
::anom/category := ::anom/incorrect
::anom/message :# "Error while parsing resource content(.|\\s)*")))

(testing "conforming error"
(with-system [{store ::rs/kv kv-store ::kv/mem} system]
(with-system [{store ::rs/kv kv-store ::kv/mem} config]
(kv/put! kv-store [[:default (hash/to-byte-array (hash)) (j/write-value-as-bytes {} cbor-object-mapper)]])

(given-failed-future (rs/get store (hash))
::anom/category := ::anom/fault
::anom/message := (format "Error while conforming resource content with hash `%s`." (hash)))))

(testing "not-found"
(with-system [{store ::rs/kv} system]
(with-system [{store ::rs/kv} config]
(is (nil? @(rs/get store (hash))))))

(testing "error"
Expand All @@ -171,33 +175,40 @@
(testing "success"
(testing "with one hash"
(let [content {:fhir/type :fhir/Patient :id "0"}]
(with-system [{store ::rs/kv kv-store ::kv/mem} system]
(with-system [{store ::rs/kv kv-store ::kv/mem} config]
(put! kv-store (hash) content)

(is (= {(hash) content} @(rs/multi-get store [(hash)]))))))
(given @(mtu/assoc-thread-name (rs/multi-get store [(hash)]))
identity := {(hash) content}))))

(testing "with two hashes"
(let [content-0 {:fhir/type :fhir/Patient :id "0"}
content-1 {:fhir/type :fhir/Patient :id "1"}]
(with-system [{store ::rs/kv kv-store ::kv/mem} system]
(with-system [{store ::rs/kv kv-store ::kv/mem} config]
(put! kv-store (hash "0") content-0)
(put! kv-store (hash "1") content-1)

(is (= {(hash "0") content-0 (hash "1") content-1}
@(rs/multi-get store [(hash "0") (hash "1")])))))))
(testing "content matches"
(given @(mtu/assoc-thread-name (rs/multi-get store [(hash "0") (hash "1")]))
identity := {(hash "0") content-0 (hash "1") content-1}
[meta :thread-name] :? mtu/common-pool-thread?))))))

(testing "parsing error"
(let [hash (hash)]
(with-system [{store ::rs/kv kv-store ::kv/mem} system]
(with-system [{store ::rs/kv kv-store ::kv/mem} config]
(kv/put! kv-store [[:default (hash/to-byte-array hash) (invalid-content)]])

(given-failed-future (rs/multi-get store [hash])
::anom/category := ::anom/incorrect
::anom/message :# "Error while parsing resource content(.|\\s)*"))))

(testing "not-found"
(with-system [{store ::rs/kv} system]
(is (= {} @(rs/multi-get store [(hash)])))))
(with-system [{store ::rs/kv} config]

(testing "result is empty"
(given @(mtu/assoc-thread-name (rs/multi-get store [(hash)]))
identity :? empty?
[meta :thread-name] :? mtu/common-pool-thread?))))

(testing "error"
(testing "with one hash"
Expand Down Expand Up @@ -227,7 +238,7 @@

(deftest put-test
(let [content {:fhir/type :fhir/Patient :id "0"}]
(with-system [{store ::rs/kv} system]
(with-system [{store ::rs/kv} config]
@(rs/put! store {(hash) content})

(is (= content @(rs/get store (hash)))))))
Expand Down
12 changes: 9 additions & 3 deletions modules/db/src/blaze/db/api.clj
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,13 @@
Returns a CompletableFuture that will complete with the database after the
transaction in case of success or will complete exceptionally with an anomaly
in case of a transaction error or other errors."
in case of a transaction error or other errors.
Functions applied after the returned future are executed on the common
ForkJoinPool."
[node tx-ops]
(-> (np/-submit-tx node tx-ops)
(ac/then-compose-async #(np/-tx-result node %))))
(ac/then-compose #(np/-tx-result node %))))

(defn changed-resources-publisher
"Returns a publisher that publishes all changed resources of `type`."
Expand Down Expand Up @@ -459,7 +462,10 @@

(defn pull
"Returns a CompletableFuture that will complete with the resource of
`resource-handle` or an anomaly in case of errors."
`resource-handle` or an anomaly in case of errors.
Functions applied after the returned future are executed on the common
ForkJoinPool."
[node-or-db resource-handle]
(p/-pull node-or-db resource-handle))

Expand Down
2 changes: 1 addition & 1 deletion modules/db/src/blaze/db/impl/db.clj
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
(set! *warn-on-reflection* true)
(set! *unchecked-math* :warn-on-boxed)

(deftype Db [node kv-store basis-t t]
(defrecord Db [node kv-store basis-t t]
p/Db
(-node [_]
node)
Expand Down
8 changes: 4 additions & 4 deletions modules/db/src/blaze/db/node.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"This namespace contains the local database node component."
(:require
[blaze.anomaly :as ba :refer [if-ok when-ok]]
[blaze.async.comp :as ac :refer [do-async do-sync]]
[blaze.async.comp :as ac :refer [do-sync]]
[blaze.async.flow :as flow]
[blaze.db.api :as d]
[blaze.db.impl.batch-db :as batch-db]
Expand Down Expand Up @@ -335,19 +335,19 @@

p/Pull
(-pull [_ resource-handle]
(do-async [resource (get-resource resource-store resource-handle)]
(do-sync [resource (get-resource resource-store resource-handle)]
(or (some->> resource (enhance-resource tx-cache resource-handle))
(resource-content-not-found-anom resource-handle))))

(-pull-content [_ resource-handle]
(do-async [resource (get-resource resource-store resource-handle)]
(do-sync [resource (get-resource resource-store resource-handle)]
(or (some-> resource (with-meta (meta resource-handle)))
(resource-content-not-found-anom resource-handle))))

(-pull-many [_ resource-handles]
(let [resource-handles (vec resource-handles) ; don't evaluate resource-handles twice
hashes (hashes-of-non-deleted resource-handles)]
(do-async [resources (rs/multi-get resource-store hashes)]
(do-sync [resources (rs/multi-get resource-store hashes)]
(into
[]
(comp (map (partial to-resource tx-cache resources))
Expand Down
7 changes: 3 additions & 4 deletions modules/db/src/blaze/db/node/resource_indexer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"This namespace contains the resource indexer component."
(:require
[blaze.anomaly :as ba]
[blaze.async.comp :as ac :refer [do-async do-sync]]
[blaze.async.comp :as ac :refer [do-sync]]
[blaze.coll.core :as coll]
[blaze.db.impl.codec :as codec]
[blaze.db.impl.index.compartment.resource :as cr]
Expand Down Expand Up @@ -131,8 +131,7 @@
(if resources
(index-resources* context resources)
(-> (rs/multi-get resource-store (hashes tx-cmds))
(ac/then-compose-async
(partial index-resources* context))))))
(ac/then-compose (partial index-resources* context))))))

(defn- re-index-resource [search-param [hash resource]]
(log/trace "Re-index resource with hash" (str hash))
Expand All @@ -150,7 +149,7 @@
(defn- re-index-resources*
[{:keys [resource-store kv-store executor]} search-param resource-handles]
(log/trace "Re-index" (count resource-handles) "resource(s)")
(do-async [resources (rs/multi-get resource-store (mapv rh/hash resource-handles))]
(do-sync [resources (rs/multi-get resource-store (mapv rh/hash resource-handles))]
(async-re-index-resources kv-store executor search-param resources)))

(defn re-index-resources
Expand Down
Loading

0 comments on commit 123d0a9

Please sign in to comment.