Skip to content

Commit

Permalink
Thread Pool Resource Indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
alexanderkiel committed Apr 24, 2022
1 parent cf957ef commit 1645f14
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 119 deletions.
5 changes: 4 additions & 1 deletion modules/db-stub/src/blaze/db/api_stub.clj
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@
:blaze.db.node/resource-indexer
{:kv-store (ig/ref :blaze.db/index-kv-store)
:resource-store (ig/ref ::rs/kv)
:search-param-registry (ig/ref :blaze.db/search-param-registry)}
:search-param-registry (ig/ref :blaze.db/search-param-registry)
:executor (ig/ref :blaze.db.node.resource-indexer/executor)}

:blaze.db.node.resource-indexer/executor {}

:blaze.db/search-param-registry
{:structure-definition-repo (ig/ref :blaze.fhir/structure-definition-repo)}
Expand Down
175 changes: 99 additions & 76 deletions modules/db/src/blaze/db/node/resource_indexer.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
(ns blaze.db.node.resource-indexer
(:require
[blaze.anomaly :as ba :refer [if-ok]]
[blaze.anomaly :as ba]
[blaze.async.comp :as ac]
[blaze.byte-string :as bs]
[blaze.db.impl.codec :as codec]
Expand All @@ -11,14 +11,16 @@
[blaze.db.node.resource-indexer.spec]
[blaze.db.resource-store :as rs]
[blaze.db.search-param-registry :as sr]
[blaze.executors :as ex]
[blaze.fhir.spec :as fhir-spec]
[blaze.module :refer [reg-collector]]
[clojure.core.reducers :as r]
[clojure.spec.alpha :as s]
[cognitect.anomalies :as anom]
[integrant.core :as ig]
[prometheus.alpha :as prom :refer [defhistogram]]
[taoensso.timbre :as log]))
[taoensso.timbre :as log])
(:import
[java.util.concurrent TimeUnit]))


(set! *warn-on-reflection* true)
Expand All @@ -33,26 +35,13 @@
"op")


(def ^:private available-processors
(.availableProcessors (Runtime/getRuntime)))


(def ^:private num-work-units
"The target number of work units carried out in parallel while indexing a coll
of resources.
The idea is, that we want to have 4 work units per available processor.
The number of 4 was chosen for the following reason. Individual work units
will vary greatly in size because the number of index entries of a resource
will vary based on the number of search parameters activated for this resource
type and data available. Choosing one work unit per processor would mean that
each processor gets one work unit and the biggest work unit will dominate the
overall time needed."
(* 4 available-processors))


(defn- batch-size [num-resources]
(max (quot num-resources num-work-units) 1))
;; Histogram of how many index entries a single resource produces. It is
;; observed once per indexed resource with the resource type name as the label
;; value and the entry count as the observed value.
(defhistogram index-entries
"Number of index entries of a resource."
{:namespace "blaze"
:subsystem "db"
:name "resource_indexer_index_entries"}
;; 14 doubling values: 1, 2, 4, ..., 8192 (presumably the bucket upper bounds
;; -- confirm against the `defhistogram` macro's argument order).
(take 14 (iterate #(* 2 %) 1))
"type")


(defn- compartment-resource-type-entry
Expand All @@ -66,76 +55,78 @@
(codec/id-byte-string id)))


(defn- conj-compartment-resource-type-entries! [res resource compartments]
(transduce
(map #(compartment-resource-type-entry % resource))
conj!
res
compartments))
(defn- compartment-resource-type-entries
  "Returns a vector holding one compartment-resource-type index entry of
  `resource` for every compartment in `compartments`.

  An empty or nil `compartments` yields an empty vector."
  [resource compartments]
  (into []
        (map (fn [compartment]
               (compartment-resource-type-entry compartment resource)))
        compartments))


(defn- skip-indexing-msg [search-param resource cause-msg]
(format "Skip indexing for search parameter `%s` on resource `%s/%s`. Cause: %s"
(:url search-param) (name (fhir-spec/fhir-type resource))
(:id resource) cause-msg))
(:id resource) (or cause-msg "<unknown>")))


(defn- search-param-index-entries
  "Calculates the index entries of `resource` for a single `search-param`.

  The result of `search-param/index-entries` is handed to `ba/exceptionally`
  together with a handler that logs a warning naming the search parameter and
  the resource and then returns nil. Presumably the handler only runs when the
  result is an anomaly -- confirm against `blaze.anomaly/exceptionally`."
  [search-param linked-compartments hash resource]
  (ba/exceptionally
    (search-param/index-entries search-param linked-compartments hash resource)
    (fn [anomaly]
      (let [cause-msg (::anom/message anomaly)]
        (log/warn (skip-indexing-msg search-param resource cause-msg))
        nil))))

(defn- index-entries [search-param linked-compartments hash resource]
(let [res (search-param/index-entries search-param linked-compartments
hash resource)]
(if (ba/anomaly? res)
(log/warn (skip-indexing-msg search-param resource (::anom/message res)))
res)))

(defn- skip-indexing-compartments-msg
  "Builds the warning text used when compartment indexing is skipped for the
  resource identified by `hash`.

  The hash is rendered via `bs/hex`; a nil/false `message` is replaced by the
  placeholder `<unknown>`."
  [hash message]
  (let [hex-hash (bs/hex hash)
        cause (or message "<unknown>")]
    (format "Skip indexing compartments of resource with hash `%s` because of: %s"
            hex-hash cause)))


(defn- linked-compartments [search-param-registry hash resource]
(if-ok [compartments (sr/linked-compartments search-param-registry resource)]
compartments
(fn [e]
(log/warn "Skip indexing compartments of resource with hash "
(bs/hex hash) " because of:" (::anom/message e))
[])))
(-> (sr/linked-compartments search-param-registry resource)
(ba/exceptionally
(fn [{::anom/keys [message]}]
(log/warn (skip-indexing-compartments-msg hash message))
nil))))


(defn- search-params
  "Looks up the search parameters registered for the type of `resource`.

  The type is taken from the `:fhir/type` keyword of the resource and passed
  by name to `sr/list-by-type`."
  [search-param-registry resource]
  (->> resource
       :fhir/type
       name
       (sr/list-by-type search-param-registry)))


(defn- enhance-resource
  "Returns `resource` with `:lastUpdated` of its `:meta` set to `last-updated`.

  A missing (nil) `:meta` is replaced by an empty `#fhir/Meta{}` before the
  value is associated."
  [last-updated resource]
  (let [assoc-into-meta (fnil assoc #fhir/Meta{})]
    (update resource :meta assoc-into-meta :lastUpdated last-updated)))


(defn- index-resource*
  "Calculates all index entries of one resource.

  The resource is first enhanced with the `last-updated` instant from the
  context. The result starts with the compartment-resource-type entries of the
  linked compartments, followed by the entries of every search parameter
  registered for the resource type."
  [{:keys [search-param-registry last-updated]} hash resource]
  (let [enhanced (enhance-resource last-updated resource)
        linked (linked-compartments search-param-registry hash enhanced)
        ;; one search param may contribute any number of entries
        entries-xf (mapcat #(search-param-index-entries % linked hash enhanced))]
    (into (compartment-resource-type-entries enhanced linked)
          entries-xf
          (search-params search-param-registry enhanced))))


(defn- conj-index-entries!
[search-param-registry last-updated res [hash resource]]
(defn- index-resource [context [hash resource]]
(log/trace "Index resource with hash" (bs/hex hash))
(with-open [_ (prom/timer duration-seconds "calc-search-params")]
(let [resource (update resource :meta (fnil assoc #fhir/Meta{})
:lastUpdated last-updated)
compartments (linked-compartments search-param-registry hash resource)]
(transduce
(mapcat #(index-entries % compartments hash resource))
conj!
(conj-compartment-resource-type-entries! res resource compartments)
(sr/list-by-type search-param-registry (name (:fhir/type resource)))))))
(with-open [_ (prom/timer duration-seconds "index-resource")]
(let [entries (index-resource* context hash resource)]
(prom/observe! index-entries (name (:fhir/type resource)) (count entries))
entries)))


(defn- put!
  "Writes `entries` into `store` via `kv/put!` while a `duration-seconds`
  timer labelled \"put\" is open around the call."
  [store entries]
  (with-open [_timer (prom/timer duration-seconds "put")]
    (kv/put! store entries)))


(defn- batch-index-resources
[{:keys [kv-store search-param-registry]} last-updated entries]
(->> entries
(r/fold
(batch-size (count entries))
(fn combine
([] (transient []))
([index-entries-a index-entries-b]
(put! kv-store (persistent! index-entries-a))
(put! kv-store (persistent! index-entries-b))
(transient [])))
(partial conj-index-entries! search-param-registry last-updated))
(persistent!)
(put! kv-store)))
(defn- async-index-resource
  "Submits the indexing of a single `entry` to the `executor` of the context.

  The submitted task calculates the index entries of the entry and puts them
  into the `kv-store`. Returns whatever `ac/supply-async` returns (presumably a
  CompletableFuture -- confirm against `blaze.async.comp`)."
  [{:keys [kv-store executor] :as context} entry]
  (let [index-and-store (fn [] (put! kv-store (index-resource context entry)))]
    (ac/supply-async index-and-store executor)))


(defn- index-resources* [resource-indexer last-updated entries]
(defn- index-resources* [context entries]
(log/trace "index" (count entries) "resource(s)")
(ac/supply-async
#(batch-index-resources resource-indexer last-updated (vec entries))))
(ac/all-of (mapv (partial async-index-resource context) entries)))


(defn- hashes [tx-cmds]
Expand All @@ -151,17 +142,19 @@
{:arglists '([resource-indexer tx-data])}
[{:keys [resource-store] :as resource-indexer}
{:keys [tx-cmds] resources :local-payload last-updated :instant}]
(if resources
(index-resources* resource-indexer last-updated resources)
(-> (rs/multi-get resource-store (hashes tx-cmds))
(ac/then-compose
(partial index-resources* resource-indexer last-updated)))))
(let [context (assoc resource-indexer :last-updated last-updated)]
(if resources
(index-resources* context resources)
(-> (rs/multi-get resource-store (hashes tx-cmds))
(ac/then-compose
(partial index-resources* context))))))


(defmethod ig/pre-init-spec :blaze.db.node/resource-indexer [_]
(s/keys :req-un [:blaze.db/kv-store
:blaze.db/resource-store
:blaze.db/search-param-registry]))
:blaze.db/search-param-registry
::executor]))


(defmethod ig/init-key :blaze.db.node/resource-indexer
Expand All @@ -170,5 +163,35 @@
resource-indexer)


;; Spec of the executor config as checked before init: the unqualified key
;; `:num-threads` is optional and, if present, has to conform to the
;; `::num-threads` spec.
(defmethod ig/pre-init-spec ::executor [_]
(s/keys :opt-un [::num-threads]))


(defn- executor-init-msg
  "Builds the log message announcing the executor start with `num-threads`
  threads. `num-threads` has to be an integer because of the `%d` directive."
  [num-threads]
  (let [template "Init resource indexer executor with %d threads"]
    (format template num-threads)))


;; Creates the resource indexer executor. `:num-threads` is read from the
;; config and falls back to 4 if the key is absent. The start is logged at
;; info level before the pool is created via `ex/io-pool` (the string is
;; presumably the thread name pattern -- confirm against `blaze.executors`).
(defmethod ig/init-key ::executor
  [_ config]
  (let [num-threads (get config :num-threads 4)]
    (log/info (executor-init-msg num-threads))
    (ex/io-pool num-threads "resource-indexer-%d")))


;; Shuts the resource indexer executor down and waits up to 10 seconds for its
;; termination. The outcome is logged: info if the executor terminated in
;; time, warn otherwise.
(defmethod ig/halt-key! ::executor
  [_ executor]
  (log/info "Stopping resource indexer executor...")
  (ex/shutdown! executor)
  (let [terminated? (ex/await-termination executor 10 TimeUnit/SECONDS)]
    (if terminated?
      (log/info "Resource indexer executor was stopped successfully")
      (log/warn "Got timeout while stopping the resource indexer executor"))))


;; Makes `::executor` a child of `:blaze.metrics/thread-pool-executor` in the
;; global keyword hierarchy (presumably so that thread pool metrics are
;; collected for this executor -- confirm against the metrics module).
(derive ::executor :blaze.metrics/thread-pool-executor)


;; Registers the two histograms of this namespace under their own keys via
;; `reg-collector` (presumably exposing them as metrics collectors of the
;; system -- confirm against `blaze.module/reg-collector`).
(reg-collector ::duration-seconds
duration-seconds)


(reg-collector ::index-entries
index-entries)
1 change: 1 addition & 0 deletions modules/db/src/blaze/db/node/resource_indexer/spec.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
[blaze.db.kv.spec]
[blaze.db.resource-store.spec]
[blaze.db.search-param-registry.spec]
[blaze.executors :as ex]
[clojure.spec.alpha :as s]))


Expand Down
5 changes: 4 additions & 1 deletion modules/db/test-perf/blaze/db/api_test_perf.clj
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,10 @@
:blaze.db.node/resource-indexer
{:kv-store (ig/ref :blaze.db/index-kv-store)
:resource-store (ig/ref ::rs/kv)
:search-param-registry (ig/ref :blaze.db/search-param-registry)}
:search-param-registry (ig/ref :blaze.db/search-param-registry)
:executor (ig/ref :blaze.db.node.resource-indexer/executor)}

:blaze.db.node.resource-indexer/executor {}

:blaze.db/search-param-registry
{:structure-definition-repo (ig/ref :blaze.fhir/structure-definition-repo)}
Expand Down
7 changes: 4 additions & 3 deletions modules/db/test/blaze/db/node/resource_indexer_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,10 @@
:blaze.db.node/resource-indexer
{:kv-store (ig/ref :blaze.db/index-kv-store)
:resource-store (ig/ref ::rs/kv)
:search-param-registry (ig/ref :blaze.db/search-param-registry)}
:search-param-registry (ig/ref :blaze.db/search-param-registry)
:executor (ig/ref ::resource-indexer/executor)}

:blaze.test/executor {}})
::resource-indexer/executor {}})


(deftest fails-on-kv-put-test
Expand Down Expand Up @@ -188,7 +189,7 @@
(let [observation {:fhir/type :fhir/Observation :id "0"
:subject #fhir/Reference{:reference "foo"}}
hash (hash/generate observation)]
(with-redefs [fhir-path/eval (fn [_ _ _] {::anom/category ::anom/fault})]
(with-redefs [fhir-path/eval (fn [_ _ _] {::anom/category ::anom/fault ::x ::y})]
@(resource-indexer/index-resources
resource-indexer
{:t 0
Expand Down
5 changes: 4 additions & 1 deletion modules/db/test/blaze/db/test_util.clj
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@
:blaze.db.node/resource-indexer
{:kv-store (ig/ref :blaze.db/index-kv-store)
:resource-store (ig/ref ::rs/kv)
:search-param-registry (ig/ref :blaze.db/search-param-registry)}
:search-param-registry (ig/ref :blaze.db/search-param-registry)
:executor (ig/ref :blaze.db.node.resource-indexer/executor)}

:blaze.db.node.resource-indexer/executor {}

:blaze.db/search-param-registry
{:structure-definition-repo (ig/ref :blaze.fhir/structure-definition-repo)}
Expand Down
Loading

0 comments on commit 1645f14

Please sign in to comment.