From b1891a2c456484e232d5968b33375afcd58805dd Mon Sep 17 00:00:00 2001 From: Alexander Kiel Date: Wed, 24 Jul 2024 13:02:51 +0200 Subject: [PATCH] Reintroduce Configurable Thread Pool for Measure Evaluation Because accessing the contents of a resource isn't async in measure evaluation, the thread pool can't be CPU bound only. In case a resource has to be loaded from disk the measure evaluation thread will be blocked and so can't be used for computation anymore. Also modern SSDs need a certain amount of I/O requests in flight in order to saturate the drive. If the number of available processors is lower than the number of parallel I/O requests needed, the storage can't be even used fully with a CPU bound pool. So the old env var FHIR_OPERATION_EVALUATE_MEASURE_THREADS is available again. --- docs/deployment/environment-variables.md | 58 ++++++++++--------- modules/module-base/src/blaze/util.clj | 3 + modules/module-base/test/blaze/util_test.clj | 3 + .../blaze/fhir/operation/evaluate_measure.clj | 15 +++-- .../fhir/operation/evaluate_measure/spec.clj | 3 + .../fhir/operation/evaluate_measure_test.clj | 24 ++++++++ resources/blaze.edn | 3 +- src/blaze/core.clj | 5 +- src/blaze/system.clj | 8 ++- 9 files changed, 83 insertions(+), 39 deletions(-) diff --git a/docs/deployment/environment-variables.md b/docs/deployment/environment-variables.md index 638cf7fe8..a935c9ebb 100644 --- a/docs/deployment/environment-variables.md +++ b/docs/deployment/environment-variables.md @@ -86,33 +86,33 @@ More information about distributed deployment are available [here](distributed-b ### Other Environment Variables -| Name | Default | Since | Depr ¹ | Description | -|:----------------------------------------|:---------------------------|:-------|---------|:------------------------------------------------------------------------------------------------------------| -| PROXY_HOST | — | v0.6 | — | REMOVED: use -Dhttp.proxyHost | -| PROXY_PORT | — | v0.6 | — | REMOVED: use -Dhttp.proxyPort | -| PROXY_USER | — | v0.6.1 | — | REMOVED: try [SOCKS Options][1] | -| PROXY_PASSWORD | — | v0.6.1 | — | REMOVED: try [SOCKS Options][1] | -| CONNECTION_TIMEOUT | 5 s | v0.6.3 | — | connection timeout for outbound HTTP requests | -| REQUEST_TIMEOUT | 30 s | v0.6.3 | — | REMOVED | -| TERM_SERVICE_URI | [http://tx.fhir.org/r4][6] | v0.6 | v0.11 | Base URI of the terminology service | -| BASE_URL | `http://localhost:8080` | — | — | The URL under which Blaze is accessible by clients. | -| CONTEXT_PATH | /fhir | v0.11 | — | Context path under which the FHIR RESTful API will be accessible. | -| SERVER_PORT | 8080 | — | — | The port of the main HTTP server | -| METRICS_SERVER_PORT | 8081 | v0.6 | — | The port of the Prometheus metrics server | -| LOG_LEVEL | info | v0.6 | — | one of trace, debug, info, warn or error | -| JAVA_TOOL_OPTIONS | — | — | — | JVM options \(Docker only\) | -| FHIR_OPERATION_EVALUATE_MEASURE_THREADS | — | v0.8 | v0.23.3 | The number threads used for $evaluate-measure executions. | -| FHIR_OPERATION_EVALUATE_MEASURE_TIMEOUT | 3600000 (1h) | v0.19 | — | Timeout in milliseconds for synchronous $evaluate-measure executions. | -| OPENID_PROVIDER_URL | — | v0.11 | — | [OpenID Connect][4] provider URL to enable [authentication][5] | -| OPENID_CLIENT_TRUST_STORE | — | v0.26 | — | A PKCS #12 trust store containing CA certificates needed for the [OpenID Connect][4] provider. | -| OPENID_CLIENT_TRUST_STORE_PASS | — | v0.26 | — | The password for the PKCS #12 trust store. | -| ENFORCE_REFERENTIAL_INTEGRITY | true | v0.14 | — | Enforce referential integrity on resource create, update and delete. | -| DB_SYNC_TIMEOUT | 10000 | v0.15 | — | Timeout in milliseconds for all reading FHIR interactions acquiring the newest database state. | -| DB_SEARCH_PARAM_BUNDLE | — | v0.21 | — | Name of a custom search parameter bundle file. | -| ENABLE_ADMIN_API | — | v0.26 | — | Set to `true` if the optional Admin API should be enabled. Needed by the frontend. | -| CQL_EXPR_CACHE_SIZE | — | v0.28 | — | Size of the CQL expression cache in MiB. This cache is part of the JVM heap. Will be disabled if not given. | -| CQL_EXPR_CACHE_REFRESH | PT24H | v0.28 | — | The duration after which a Bloom filter of the CQL expression cache will be refreshed. | -| CQL_EXPR_CACHE_THREADS | 4 | v0.28 | — | The maximum number of parallel Bloom filter calculations for the CQL expression cache. | +| Name | Default | Since | Depr ¹ | Description | +|:----------------------------------------|:---------------------------|:-------|--------|:------------------------------------------------------------------------------------------------------------| +| PROXY_HOST | — | v0.6 | — | REMOVED: use -Dhttp.proxyHost | +| PROXY_PORT | — | v0.6 | — | REMOVED: use -Dhttp.proxyPort | +| PROXY_USER | — | v0.6.1 | — | REMOVED: try [SOCKS Options][1] | +| PROXY_PASSWORD | — | v0.6.1 | — | REMOVED: try [SOCKS Options][1] | +| CONNECTION_TIMEOUT | 5 s | v0.6.3 | — | connection timeout for outbound HTTP requests | +| REQUEST_TIMEOUT | 30 s | v0.6.3 | — | REMOVED | +| TERM_SERVICE_URI | [http://tx.fhir.org/r4][6] | v0.6 | v0.11 | Base URI of the terminology service | +| BASE_URL | `http://localhost:8080` | — | — | The URL under which Blaze is accessible by clients. | +| CONTEXT_PATH | /fhir | v0.11 | — | Context path under which the FHIR RESTful API will be accessible. | +| SERVER_PORT | 8080 | — | — | The port of the main HTTP server | +| METRICS_SERVER_PORT | 8081 | v0.6 | — | The port of the Prometheus metrics server | +| LOG_LEVEL | info | v0.6 | — | one of trace, debug, info, warn or error | +| JAVA_TOOL_OPTIONS | — | — | — | JVM options \(Docker only\) | +| FHIR_OPERATION_EVALUATE_MEASURE_THREADS | number of CPUs | v0.8 | — | The number threads used for $evaluate-measure executions. | +| FHIR_OPERATION_EVALUATE_MEASURE_TIMEOUT | 3600000 (1h) | v0.19 | — | Timeout in milliseconds for synchronous $evaluate-measure executions. | +| OPENID_PROVIDER_URL | — | v0.11 | — | [OpenID Connect][4] provider URL to enable [authentication][5] | +| OPENID_CLIENT_TRUST_STORE | — | v0.26 | — | A PKCS #12 trust store containing CA certificates needed for the [OpenID Connect][4] provider. | +| OPENID_CLIENT_TRUST_STORE_PASS | — | v0.26 | — | The password for the PKCS #12 trust store. | +| ENFORCE_REFERENTIAL_INTEGRITY | true | v0.14 | — | Enforce referential integrity on resource create, update and delete. | +| DB_SYNC_TIMEOUT | 10000 | v0.15 | — | Timeout in milliseconds for all reading FHIR interactions acquiring the newest database state. | +| DB_SEARCH_PARAM_BUNDLE | — | v0.21 | — | Name of a custom search parameter bundle file. | +| ENABLE_ADMIN_API | — | v0.26 | — | Set to `true` if the optional Admin API should be enabled. Needed by the frontend. | +| CQL_EXPR_CACHE_SIZE | — | v0.28 | — | Size of the CQL expression cache in MiB. This cache is part of the JVM heap. Will be disabled if not given. | +| CQL_EXPR_CACHE_REFRESH | PT24H | v0.28 | — | The duration after which a Bloom filter of the CQL expression cache will be refreshed. | +| CQL_EXPR_CACHE_THREADS | 4 | v0.28 | — | The maximum number of parallel Bloom filter calculations for the CQL expression cache. | ¹ Deprecated @@ -120,6 +120,10 @@ More information about distributed deployment are available [here](distributed-b The [FHIR RESTful API](https://www.hl7.org/fhir/http.html) will be accessible under `BASE_URL/CONTEXT_PATH`. Possible `X-Forwarded-Host`, `X-Forwarded-Proto` and `Forwarded` request headers will override this URL. +#### FHIR_OPERATION_EVALUATE_MEASURE_THREADS + +The number threads used for $evaluate-measure executions. The default is the number of available processors (CPUs). For measures that do not load lots of resources from disk the default is the right choice. However, if some of the measures load lots of resources directly from disk, it can be beneficial to set the number of threads to 2x or 4x the number of available processors. Be sure to increase `DB_RESOURCE_STORE_KV_THREADS` accordingly to be able to use the increased I/O capabilities. + #### OPENID_CLIENT_TRUST_STORE The PKCS #12 trust store has to be generated by the Java keytool. OpenSSL will not work. diff --git a/modules/module-base/src/blaze/util.clj b/modules/module-base/src/blaze/util.clj index adda60379..d8c0ff859 100644 --- a/modules/module-base/src/blaze/util.clj +++ b/modules/module-base/src/blaze/util.clj @@ -20,3 +20,6 @@ "Strips all possible leading slashes from `s`." [s] (if (str/starts-with? s "/") (recur (subs s 1)) s)) + +(defn available-processors [] + (.availableProcessors (Runtime/getRuntime))) diff --git a/modules/module-base/test/blaze/util_test.clj b/modules/module-base/test/blaze/util_test.clj index 0bf68b41b..223928892 100644 --- a/modules/module-base/test/blaze/util_test.clj +++ b/modules/module-base/test/blaze/util_test.clj @@ -30,3 +30,6 @@ (satisfies-prop 10000 (prop/for-all [s gen/string] (not (str/starts-with? (u/strip-leading-slashes s) "/"))))) + +(deftest available-processors-test + (is (pos-int? (u/available-processors)))) diff --git a/modules/operation-measure-evaluate-measure/src/blaze/fhir/operation/evaluate_measure.clj b/modules/operation-measure-evaluate-measure/src/blaze/fhir/operation/evaluate_measure.clj index 4bfe2b297..ef9de38c6 100644 --- a/modules/operation-measure-evaluate-measure/src/blaze/fhir/operation/evaluate_measure.clj +++ b/modules/operation-measure-evaluate-measure/src/blaze/fhir/operation/evaluate_measure.clj @@ -19,6 +19,7 @@ [blaze.luid :as luid] [blaze.module :as m :refer [reg-collector]] [blaze.spec] + [blaze.util :as u] [clojure.spec.alpha :as s] [cognitect.anomalies :as anom] [integrant.core :as ig] @@ -139,14 +140,16 @@ (defmethod ig/init-key ::timeout [_ {:keys [millis]}] (time/millis millis)) -(defn- executor-init-msg [] - (format "Init $evaluate-measure operation executor with %d threads" - (.availableProcessors (Runtime/getRuntime)))) +(defmethod m/pre-init-spec ::executor [_] + (s/keys :opt-un [:blaze.fhir.operation.evaluate-measure.executor/num-threads])) + +(defn- executor-init-msg [num-threads] + (format "Init $evaluate-measure operation executor with %d threads" num-threads)) (defmethod ig/init-key ::executor - [_ _] - (log/info (executor-init-msg)) - (ex/cpu-bound-pool "operation-evaluate-measure-%d")) + [_ {:keys [num-threads] :or {num-threads (u/available-processors)}}] + (log/info (executor-init-msg num-threads)) + (ex/io-pool num-threads "operation-evaluate-measure-%d")) (defmethod ig/halt-key! ::executor [_ executor] diff --git a/modules/operation-measure-evaluate-measure/src/blaze/fhir/operation/evaluate_measure/spec.clj b/modules/operation-measure-evaluate-measure/src/blaze/fhir/operation/evaluate_measure/spec.clj index 2ed720dc3..141db2d31 100644 --- a/modules/operation-measure-evaluate-measure/src/blaze/fhir/operation/evaluate_measure/spec.clj +++ b/modules/operation-measure-evaluate-measure/src/blaze/fhir/operation/evaluate_measure/spec.clj @@ -16,3 +16,6 @@ (s/def :blaze.fhir.operation.evaluate-measure.timeout/millis nat-int?) + +(s/def :blaze.fhir.operation.evaluate-measure.executor/num-threads + pos-int?) diff --git a/modules/operation-measure-evaluate-measure/test/blaze/fhir/operation/evaluate_measure_test.clj b/modules/operation-measure-evaluate-measure/test/blaze/fhir/operation/evaluate_measure_test.clj index 037b81b45..e7b127f1b 100644 --- a/modules/operation-measure-evaluate-measure/test/blaze/fhir/operation/evaluate_measure_test.clj +++ b/modules/operation-measure-evaluate-measure/test/blaze/fhir/operation/evaluate_measure_test.clj @@ -154,6 +154,30 @@ {::evaluate-measure/timeout {:millis 154912}}] (is (= (time/millis 154912) timeout))))) +(deftest executor-init-test + (testing "nil config" + (given-thrown (ig/init {::evaluate-measure/executor nil}) + :key := ::evaluate-measure/executor + :reason := ::ig/build-failed-spec + [:cause-data ::s/problems 0 :pred] := `map?)) + + (testing "invalid num-threads" + (given-thrown (ig/init {::evaluate-measure/executor {:num-threads ::invalid}}) + :key := ::evaluate-measure/executor + :reason := ::ig/build-failed-spec + [:cause-data ::s/problems 0 :pred] := `pos-int? + [:cause-data ::s/problems 0 :val] := ::invalid)) + + (testing "init with default number of threads" + (with-system [{::evaluate-measure/keys [executor]} + {::evaluate-measure/executor {}}] + (is (ex/executor? executor)))) + + (testing "init with given number of threads" + (with-system [{::evaluate-measure/keys [executor]} + {::evaluate-measure/executor {:num-threads 4}}] + (is (ex/executor? executor))))) + (deftest compile-duration-seconds-collector-init-test (with-system [{collector ::evaluate-measure/compile-duration-seconds} {::evaluate-measure/compile-duration-seconds nil}] diff --git a/resources/blaze.edn b/resources/blaze.edn index c6382ebb0..1746dd9bb 100644 --- a/resources/blaze.edn +++ b/resources/blaze.edn @@ -211,7 +211,8 @@ :blaze.fhir.operation.evaluate-measure/timeout {:millis #blaze/cfg ["FHIR_OPERATION_EVALUATE_MEASURE_TIMEOUT" nat-int? 3600000]} - :blaze.fhir.operation.evaluate-measure/executor {} + :blaze.fhir.operation.evaluate-measure/executor + {:num-threads #blaze/cfg ["FHIR_OPERATION_EVALUATE_MEASURE_THREADS" nat-int? :available-processors]} :blaze.fhir.operation.evaluate-measure/compile-duration-seconds {} :blaze.fhir.operation.evaluate-measure/evaluate-duration-seconds {} diff --git a/src/blaze/core.clj b/src/blaze/core.clj index 0aa5b5cfa..01d746a79 100644 --- a/src/blaze/core.clj +++ b/src/blaze/core.clj @@ -9,9 +9,6 @@ (defn- max-memory [] (quot (.maxMemory (Runtime/getRuntime)) (* 1024 1024))) -(defn- available-processors [] - (.availableProcessors (Runtime/getRuntime))) - (defn- config-msg [config] (->> (sort-by key config) (map (fn [[k v]] (str k " = " v))) @@ -54,6 +51,6 @@ {:blaze/keys [version]} (init-system! (System/getenv))] (log/info "JVM version:" (System/getProperty "java.version")) (log/info "Maximum available memory:" (max-memory) "MiB") - (log/info "Number of available processors:" (available-processors)) + (log/info "Number of available processors:" (u/available-processors)) (log/info "Successfully started \uD83D\uDD25 Blaze version" version "in" (format "%.1f" (u/duration-s start)) "seconds"))) diff --git a/src/blaze/system.clj b/src/blaze/system.clj index d1da5f6b4..8b5bc75d5 100644 --- a/src/blaze/system.clj +++ b/src/blaze/system.clj @@ -6,6 +6,7 @@ be given to `init!``. The server port has a default of `8080`." (:require [blaze.log] + [blaze.util :as u] [clojure.java.io :as io] [clojure.spec.alpha :as s] [clojure.string :as str] @@ -36,6 +37,11 @@ (defrecord Cfg [env-var spec default]) +(defn- resolve-special-default-values [default] + (condp identical? default + :available-processors (u/available-processors) + default)) + (defn- cfg "Creates a config entry which consists of the name of an environment variable, a spec and a default value. @@ -46,7 +52,7 @@ (if (symbol? spec-form) (var-get (resolve spec-form)) spec-form)] - (->Cfg env-var spec default))) + (->Cfg env-var spec (resolve-special-default-values default)))) (defrecord RefMap [key] ig/RefLike