Skip to content

Commit

Permalink
Improve Database Sync Efficiency
Browse files Browse the repository at this point in the history
Also renamed most occurrences of system into config because the system
will be created by the with-system macro and is a config before that.
  • Loading branch information
alexanderkiel committed Jun 23, 2023
1 parent 78649dd commit 0431a68
Show file tree
Hide file tree
Showing 69 changed files with 679 additions and 634 deletions.
22 changes: 11 additions & 11 deletions modules/cql/test/blaze/elm/compiler/external_data_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
(:require
[blaze.anomaly :as ba]
[blaze.db.api :as d]
[blaze.db.api-stub :refer [mem-node-system with-system-data]]
[blaze.db.api-stub :refer [mem-node-config with-system-data]]
[blaze.elm.compiler :as c]
[blaze.elm.compiler.core :as core]
[blaze.elm.compiler.external-data]
Expand Down Expand Up @@ -64,7 +64,7 @@
(deftest compile-retrieve-test
(testing "Patient context"
(testing "Patient"
(with-system-data [{:blaze.db/keys [node]} mem-node-system]
(with-system-data [{:blaze.db/keys [node]} mem-node-config]
[[[:put {:fhir/type :fhir/Patient :id "0"}]]]

(let [context
Expand All @@ -84,7 +84,7 @@
(is (= '(retrieve-resource) (core/-form expr)))))))

(testing "Observation"
(with-system-data [{:blaze.db/keys [node]} mem-node-system]
(with-system-data [{:blaze.db/keys [node]} mem-node-config]
[[[:put {:fhir/type :fhir/Patient :id "0"}]
[:put {:fhir/type :fhir/Observation :id "1"
:subject
Expand All @@ -107,7 +107,7 @@
(is (= '(compartment-list-retrieve "Observation") (core/-form expr))))))

(testing "with one code"
(with-system-data [{:blaze.db/keys [node]} mem-node-system]
(with-system-data [{:blaze.db/keys [node]} mem-node-config]
[[[:put {:fhir/type :fhir/Patient :id "0"}]
[:put {:fhir/type :fhir/Observation :id "0"
:subject
Expand Down Expand Up @@ -149,7 +149,7 @@
(core/-form expr)))))))

(testing "with two codes"
(with-system-data [{:blaze.db/keys [node]} mem-node-system]
(with-system-data [{:blaze.db/keys [node]} mem-node-config]
[[[:put {:fhir/type :fhir/Patient :id "0"}]
[:put {:fhir/type :fhir/Observation :id "0"
:subject
Expand Down Expand Up @@ -198,7 +198,7 @@
[1 :id] := "2"))))

(testing "with one concept"
(with-system-data [{:blaze.db/keys [node]} mem-node-system]
(with-system-data [{:blaze.db/keys [node]} mem-node-config]
[[[:put {:fhir/type :fhir/Patient :id "0"}]
[:put {:fhir/type :fhir/Observation :id "0"
:subject
Expand Down Expand Up @@ -251,7 +251,7 @@

(testing "Specimen context"
(testing "Patient"
(with-system-data [{:blaze.db/keys [node]} mem-node-system]
(with-system-data [{:blaze.db/keys [node]} mem-node-config]
[[[:put {:fhir/type :fhir/Patient :id "0"}]
[:put {:fhir/type :fhir/Specimen :id "0"
:subject
Expand All @@ -271,7 +271,7 @@

(testing "Unfiltered context"
(testing "Medication"
(with-system-data [{:blaze.db/keys [node]} mem-node-system]
(with-system-data [{:blaze.db/keys [node]} mem-node-config]
[[[:put {:fhir/type :fhir/Medication :id "0"
:code
#fhir/CodeableConcept
Expand Down Expand Up @@ -301,7 +301,7 @@
[0 :id] := "0"))))

(testing "unknown code property"
(with-system [{:blaze.db/keys [node]} mem-node-system]
(with-system [{:blaze.db/keys [node]} mem-node-config]
(let [context
{:node node
:eval-context "Unfiltered"
Expand All @@ -322,7 +322,7 @@

(testing "with related context"
(testing "with pre-compiled database query"
(with-system [{:blaze.db/keys [node]} mem-node-system]
(with-system [{:blaze.db/keys [node]} mem-node-config]
(let [library {:codeSystems
{:def [{:name "sys-def-174848" :id "system-174915"}]}
:statements
Expand All @@ -340,7 +340,7 @@
type := WithRelatedContextQueryRetrieveExpression))))

(testing "unknown code property"
(with-system [{:blaze.db/keys [node]} mem-node-system]
(with-system [{:blaze.db/keys [node]} mem-node-config]
(let [library {:codeSystems
{:def [{:name "sys-def-174848" :id "system-174915"}]}
:statements
Expand Down
20 changes: 10 additions & 10 deletions modules/cql/test/blaze/elm/compiler/library_test.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(ns blaze.elm.compiler.library-test
(:require
[blaze.cql-translator :as t]
[blaze.db.api-stub :refer [mem-node-system]]
[blaze.db.api-stub :refer [mem-node-config]]
[blaze.elm.compiler :as compiler]
[blaze.elm.compiler.library :as library]
[blaze.elm.compiler.library-spec]
Expand Down Expand Up @@ -64,13 +64,13 @@
(deftest compile-library-test
(testing "empty library"
(let [library (t/translate "library Test")]
(with-system [{:blaze.db/keys [node]} mem-node-system]
(with-system [{:blaze.db/keys [node]} mem-node-config]
(given (library/compile-library node library {})
:expression-defs := {}))))

(testing "one static expression"
(let [library (t/translate "library Test define Foo: true")]
(with-system [{:blaze.db/keys [node]} mem-node-system]
(with-system [{:blaze.db/keys [node]} mem-node-config]
(given (library/compile-library node library {})
[:expression-defs "Foo" :context] := "Patient"
[:expression-defs "Foo" :expression] := true))))
Expand All @@ -80,7 +80,7 @@
using FHIR version '4.0.0'
context Patient
define Gender: Patient.gender")]
(with-system [{:blaze.db/keys [node]} mem-node-system]
(with-system [{:blaze.db/keys [node]} mem-node-config]
(given (library/compile-library node library {})
[:expression-defs "Gender" :context] := "Patient"
[:expression-defs "Gender" :expression compiler/form] := '(:gender (expr-ref "Patient"))))))
Expand All @@ -91,7 +91,7 @@
context Patient
define function Gender(P Patient): P.gender
define InInitialPopulation: Gender(Patient)")]
(with-system [{:blaze.db/keys [node]} mem-node-system]
(with-system [{:blaze.db/keys [node]} mem-node-config]
(given (library/compile-library node library {})
[:expression-defs "InInitialPopulation" :context] := "Patient"
[:expression-defs "InInitialPopulation" :resultTypeName] := "{http://hl7.org/fhir}AdministrativeGender"
Expand All @@ -107,7 +107,7 @@
define function Inc(i System.Integer): i + 1
define function Inc2(i System.Integer): Inc(i) + 1
define InInitialPopulation: Inc2(1)")]
(with-system [{:blaze.db/keys [node]} mem-node-system]
(with-system [{:blaze.db/keys [node]} mem-node-config]
(given (library/compile-library node library {})
[:expression-defs "InInitialPopulation" :context] := "Patient"
[:expression-defs "InInitialPopulation" :expression compiler/form] := '(call "Inc2" 1)))))
Expand All @@ -116,31 +116,31 @@
(testing "function"
(let [library (t/translate "library Test
define function Error(): singleton from {1, 2}")]
(with-system [{:blaze.db/keys [node]} mem-node-system]
(with-system [{:blaze.db/keys [node]} mem-node-config]
(given (library/compile-library node library {})
::anom/category := ::anom/conflict
::anom/message := "More than one element in `SingletonFrom` expression."))))

(testing "expression"
(let [library (t/translate "library Test
define Error: singleton from {1, 2}")]
(with-system [{:blaze.db/keys [node]} mem-node-system]
(with-system [{:blaze.db/keys [node]} mem-node-config]
(given (library/compile-library node library {})
::anom/category := ::anom/conflict
::anom/message := "More than one element in `SingletonFrom` expression.")))))

(testing "with parameter default"
(let [library (t/translate "library Test
parameter \"Measurement Period\" Interval<Date> default Interval[@2020-01-01, @2020-12-31]")]
(with-system [{:blaze.db/keys [node]} mem-node-system]
(with-system [{:blaze.db/keys [node]} mem-node-config]
(given (library/compile-library node library {})
[:parameter-default-values "Measurement Period" :start] := #system/date"2020-01-01"
[:parameter-default-values "Measurement Period" :end] := #system/date"2020-12-31"))))

(testing "with invalid parameter default"
(let [library (t/translate "library Test
parameter \"Measurement Start\" Integer default singleton from {1, 2}")]
(with-system [{:blaze.db/keys [node]} mem-node-system]
(with-system [{:blaze.db/keys [node]} mem-node-config]
(given (library/compile-library node library {})
::anom/category := ::anom/conflict
::anom/message "More than one element in `SingletonFrom` expression.")))))
8 changes: 4 additions & 4 deletions modules/cql/test/blaze/elm/compiler/queries_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
(:require
[blaze.anomaly :as ba]
[blaze.db.api :as d]
[blaze.db.api-stub :refer [mem-node-system with-system-data]]
[blaze.db.api-stub :refer [mem-node-config with-system-data]]
[blaze.elm.code :as code]
[blaze.elm.code-spec]
[blaze.elm.compiler :as c]
Expand Down Expand Up @@ -128,7 +128,7 @@
(is (= '(eduction-query distinct [1 1]) (core/-form expr)))))))

(testing "Retrieve queries"
(with-system-data [{:blaze.db/keys [node]} mem-node-system]
(with-system-data [{:blaze.db/keys [node]} mem-node-config]
[[[:put {:fhir/type :fhir/Patient :id "0"}]]]

(let [db (d/db node)
Expand Down Expand Up @@ -272,7 +272,7 @@
;; condition. This operation is known as a semi-join in database languages.
(deftest compile-with-clause-test
(testing "Equiv With with two Observations comparing there subjects."
(with-system-data [{:blaze.db/keys [node]} mem-node-system]
(with-system-data [{:blaze.db/keys [node]} mem-node-config]
[[[:put {:fhir/type :fhir/Patient :id "0"}]
[:put {:fhir/type :fhir/Observation :id "0"
:subject
Expand Down Expand Up @@ -312,7 +312,7 @@
(queries/-form xform-factory)))))))

(testing "Equiv With with one Patient and one Observation comparing the patient with the operation subject."
(with-system-data [{:blaze.db/keys [node]} mem-node-system]
(with-system-data [{:blaze.db/keys [node]} mem-node-config]
[[[:put {:fhir/type :fhir/Patient :id "0"}]
[:put {:fhir/type :fhir/Observation :id "0"
:subject
Expand Down
4 changes: 2 additions & 2 deletions modules/cql/test/blaze/elm/compiler/string_operators_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
https://cql.hl7.org/04-logicalspecification.html."
(:require
[blaze.db.api :as d]
[blaze.db.api-stub :refer [mem-node-system with-system-data]]
[blaze.db.api-stub :refer [mem-node-config with-system-data]]
[blaze.elm.compiler :as c]
[blaze.elm.compiler.core :as core]
[blaze.elm.compiler.test-util :as tu]
Expand Down Expand Up @@ -203,7 +203,7 @@

(testing "retrieve"
(are [count]
(with-system-data [{:blaze.db/keys [node]} mem-node-system]
(with-system-data [{:blaze.db/keys [node]} mem-node-config]
[(into [[:put {:fhir/type :fhir/Patient :id "0"}]]
(map (fn [id]
[:put {:fhir/type :fhir/Observation :id (str id)
Expand Down
15 changes: 10 additions & 5 deletions modules/db-stub/src/blaze/db/api_stub.clj
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
[java-time.api :as time]))


(defn create-mem-node-system [node-config]
(defn create-mem-node-config [node-config]
{:blaze.db/node
(merge
{:tx-log (ig/ref :blaze.db/tx-log)
Expand Down Expand Up @@ -78,11 +78,16 @@
{:structure-definition-repo structure-definition-repo}})


(def mem-node-system
(create-mem-node-system {}))
(def mem-node-config
(create-mem-node-config {}))


(defmacro with-system-data [[binding-form system] txs & body]
`(with-system [system# ~system]
(defmacro with-system-data
"Runs `body` inside a system that is initialized from `config`, bound to
`binding-form` and finally halted.
Additionally the database is initialized with `txs`."
[[binding-form config] txs & body]
`(with-system [system# ~config]
(run! #(deref (d/transact (:blaze.db/node system#) %)) ~txs)
(let [~binding-form system#] ~@body)))
18 changes: 9 additions & 9 deletions modules/db-tx-log-kafka/test/blaze/db/tx_log/kafka_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
(def tx-cmd {:op "create" :type "Patient" :id "0" :hash patient-hash-0})


(def system
(def config
{::tx-log/kafka
{:bootstrap-servers bootstrap-servers
:last-t-executor (ig/ref ::kafka/last-t-executor)}
Expand Down Expand Up @@ -105,7 +105,7 @@
AutoCloseable
(close [_])))
kafka/create-last-t-consumer no-op-consumer]
(with-system [{tx-log ::tx-log/kafka} system]
(with-system [{tx-log ::tx-log/kafka} config]
(is (= 1 @(tx-log/submit tx-log [tx-cmd] nil)))))

(testing "RecordTooLargeException"
Expand All @@ -121,7 +121,7 @@
AutoCloseable
(close [_])))
kafka/create-last-t-consumer no-op-consumer]
(with-system [{tx-log ::tx-log/kafka} system]
(with-system [{tx-log ::tx-log/kafka} config]
(given-failed-future (tx-log/submit tx-log [tx-cmd] nil)
::anom/category := ::anom/unsupported
::anom/message := "A transaction with 1 commands generated a Kafka message which is larger than the configured maximum of null bytes. In order to prevent this error, increase the maximum message size by setting DB_KAFKA_MAX_REQUEST_SIZE to a higher number. msg-173357"))))
Expand All @@ -139,7 +139,7 @@
AutoCloseable
(close [_])))
kafka/create-last-t-consumer no-op-consumer]
(with-system [{tx-log ::tx-log/kafka} system]
(with-system [{tx-log ::tx-log/kafka} config]
(given-failed-future @(tx-log/submit tx-log [tx-cmd] nil)
::anom/category := ::anom/fault
::anom/message := "msg-175337")))))
Expand All @@ -161,7 +161,7 @@
AutoCloseable
(close [_])))
kafka/create-last-t-consumer no-op-consumer]
(with-system [{tx-log ::tx-log/kafka} system]
(with-system [{tx-log ::tx-log/kafka} config]
(with-open [queue (tx-log/new-queue tx-log 1)]
(is (empty? (tx-log/poll! queue (time/seconds 1))))))))

Expand All @@ -178,19 +178,19 @@
(Map/of (first partitions) 104614))
AutoCloseable
(close [_])))]
(with-system [{tx-log ::tx-log/kafka} system]
(with-system [{tx-log ::tx-log/kafka} config]
(is (= 104614 @(tx-log/last-t tx-log)))))))


(def config {:bootstrap-servers "localhost:9092"})
(def producer-config {:bootstrap-servers "localhost:9092"})


(deftest create-producer-test
(is (instance? KafkaProducer (kafka/create-producer config))))
(is (instance? KafkaProducer (kafka/create-producer producer-config))))


(deftest create-consumer-test
(given (.assignment (kafka/create-consumer config))
(given (.assignment (kafka/create-consumer producer-config))
count := 1
[0] := (TopicPartition. "tx" 0)))

Expand Down
19 changes: 14 additions & 5 deletions modules/db/src/blaze/db/node.clj
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@
add-subsetted-xf)))


(defrecord Node [context tx-log tx-cache kv-store resource-store
(defrecord Node [context tx-log tx-cache kv-store resource-store sync-fn
search-param-registry resource-indexer state run? poll-timeout
finished]
np/Node
Expand All @@ -336,8 +336,7 @@

(-sync [node]
(log/trace "sync on last t")
(-> (tx-log/last-t tx-log)
(ac/then-compose #(np/-sync node %))))
(sync-fn node))

(-sync [node t]
(log/trace "sync on t =" t)
Expand Down Expand Up @@ -515,15 +514,25 @@
expected-kv-store-version))))))


(defn- sync-fn [storage]
(condp identical? storage
:distributed
(fn sync-distributed [^Node node]
(-> (tx-log/last-t (.-tx_log node))
(ac/then-compose #(np/-sync node %))))
(fn sync-standalone [^Node node]
(ac/completed-future (db/db node (:t @(.-state node)))))))


(defmethod ig/init-key :blaze.db/node
[_ {:keys [tx-log tx-cache indexer-executor kv-store resource-indexer
[_ {:keys [storage tx-log tx-cache indexer-executor kv-store resource-indexer
resource-store search-param-registry poll-timeout]
:or {poll-timeout (time/seconds 1)}
:as config}]
(init-msg config)
(check-version! kv-store)
(let [node (->Node (ctx config) tx-log tx-cache kv-store resource-store
search-param-registry resource-indexer
(sync-fn storage) search-param-registry resource-indexer
(atom (initial-state kv-store))
(volatile! true)
poll-timeout
Expand Down
Loading

0 comments on commit 0431a68

Please sign in to comment.