Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 0 additions & 77 deletions Learn Clojask.md

This file was deleted.

16 changes: 10 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

### Features

- **Unlimited size**
- **Unlimited Size**

It supports datasets larger than memory.

Expand All @@ -15,36 +15,40 @@

Faster than Dask in most operations, and the larger the dataframe is, the bigger the advantage. Please find the benchmarks [here](https://clojure-finance.github.io/clojask-website/pages-output/about/#benchmarks).

- **All native types**
- **All Native Types**

All the datatypes used to store data are native Clojure (or Java) types.

- **From file to file**
- **From File to File**

Integrate IO inside the dataframe. No need to write your own read-in and output functions.

- **Parallel**

Most operations could be executed into multiple threads or even machines. See the principle in [Onyx](http://www.onyxplatform.org/).

- **Lazy operations**
- **Lazy Operations**

Most operations will not be executed immediately. Dataframe will intelligently pipeline the operations altogether in computation.

- **Little Constraints on programming**

Except for some aggregations where you need to write customized functions subject to simple templates, operations in Clojask support arbitrary Clojure functions as input

### Installation

Available on [Clojars](https://clojars.org/com.github.clojure-finance/clojask) ![Clojars Project](https://img.shields.io/clojars/v/com.github.clojure-finance/clojask.svg).

Insert this line into your `project.clj` if using Leiningen.

```
[com.github.clojure-finance/clojask "1.2.5"]
[com.github.clojure-finance/clojask "2.0.0"]
```

Insert this line into your `deps.edn` if using CLI.

```clojure
com.github.clojure-finance/clojask {:mvn/version "1.2.5"}
com.github.clojure-finance/clojask {:mvn/version "2.0.0"}
```

**Requirements:**
Expand Down
60 changes: 40 additions & 20 deletions docs/documentation.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,24 @@
### API DOCUMENTATION
## API DOCUMENTATION

##### Basic Information
### Basic Information

- Most operations to the dataframe is performed lazily and all at once with `compute` except `sort ` and `join`.
- The dataframe process the data in rows, ie one row in one vector.
- The input dataframe can be larger than memory in size.
- By default, all columns have the same type: string. You are allowed to set its type, with our predefined type keywords.
- The APIs below are defined in namespace `clojask.dataframe`.

- Most dataframe manipulation operations are performed lazily (except for `sort` and `join`). They will be executed all at once when `compute` is called.

- By default (except for Excel input), all columns are assigned with the data type `string` when the dataframe is initialized.

- **[ ]** surrounding the argument indicates an optional operation.

- Without further specification, the return of all these functions is the resultant Clojask dataframe with type `clojask.dataframe.classes.DataFrame` . Therefore, you can pipeline these functions with `->` macros.

```clojure
(-> (dataframe "xxx.csv")
(set-type "Colx" "int")
(operate inc "Colx")
...
(compute 8 "xxx-modified.csv"))
```

### API

Expand Down Expand Up @@ -431,22 +444,23 @@ This means you cannot further apply complicated operations to a joined dataframe

Compute the result. The pre-defined lazy operations will be executed in pipeline, ie the result of the previous operation becomes the argument of the next operation.

| Argument | Type | Function | Remarks |
| ------------------- | ------------------------------------------- | ------------------------------------------------------------ | ------------------------------------------------------------ |
| Argument | Type | Function | Remarks |
| ------------------- | ------------------------------------------------------------ | ------------------------------------------------------------ | ------------------------------------------------------------ |
| `dataframe` | clojask.classes.DataFrame.DataFrame / Clojask.JoinedDataFrame | The operated object | |
| `num of workers` | int (max 8) | The number of worker instances (except the input and output nodes) | Uses [onyx](http://www.onyxplatform.org/) as the distributed platform |
| `output path` | String | The path of the output csv file | If the path already exists, will overwrite the file. |
| [`exception`] | Boolean | Whether an exception during calculation will cause termination | By default `false`. Is useful for debugging or detecting empty fields |
| [`order`] | Boolean | If enforce the order of rows in the output to be the same as input | By default `false`. If set to `true`, will sacrifice the performance. |
| [`output-function`] | Function | Specify how to output a row vector to the output file | Takes two arguments.<br />`writer` java.io.BufferedWriter<br />`rows` clojure.lang.PersistentVector (rows) of clojure.lang.PersistentVector (each row) |
| [`select`] | String / Collection of strings | Chooses columns to select for the operation | Can only specify either of select and exclude |
| [`exclude`] | String / Collection of strings | Chooses columns to be excluded for the operation | Can only specify either of select and exclude |
| [`header`] | Collection of strings | The column names in the output file that appears in the first row | Will replace the default column names. Should be equal to the number of columns. |
| [`melt`] | Function (one argument) | Reorganize each resultant row | Should take each row as a collection and return a collection of collections (This API is used in the `extensions.reshpae.melt`) |
| `num of workers` | int (max 8) | The number of worker instances (except the input and output nodes) | Uses [Onyx](http://www.onyxplatform.org/) as the distributed platform |
| `output path` | String / `nil` | The path of the output csv file | If the path already exists, will overwrite the file.<br>If `nil`, will store the output in memory as a vector of vectors, which represent each row. See [example](https://github.com/clojure-finance/clojask-examples/blob/main/src/clojask_examples/in_memory.clj). |
| [`exception`] | Boolean | Whether an exception during calculation will cause termination | By default `false`. Is useful for debugging or detecting empty fields |
| [`order`] | Boolean | If enforce the order of rows in the output to be the same as input | By default `false`. If set to `true`, will sacrifice the performance. |
| [`output-function`] | Function | Specify how to output a row vector to the output file | Takes two arguments.<br />`writer` java.io.BufferedWriter<br />`rows` clojure.lang.PersistentVector (rows) of clojure.lang.PersistentVector (each row) |
| [`select`] | String / Collection of strings | Chooses columns to select for the operation | Can only specify either of select and exclude |
| [`exclude`] | String / Collection of strings | Chooses columns to be excluded for the operation | Can only specify either of select and exclude |
| [`header`] | Collection of strings | The column names in the output file that appears in the first row | Will replace the default column names. Should be equal to the number of columns. |
| [`melt`] | Function (one argument) | Reorganize each resultant row | Should take each row as a collection and return a collection of collections (This API is used in the `extensions.reshpae.melt`) |
| [`in-memory`] | Boolean | Whether the computation should all be completed in memory | If set to `true`, this affects the computation procedure of groupby-aggregation and joins. These operations originally will write to and read from intermediate group files in disk. Now it will stores these groups in memory only, **which will speed up the computation process**. **However, when the dataframe is larger than memory, this option should not be set to `false`.** Other operations are not affected because they natively do not require out-of-memory steps. |

**Return**

A `clojask.classes.DataFrame.DataFrame`, which is the resultant dataframe.
A `clojask.classes.DataFrame.DataFrame`, which is the resultant dataframe. / A vector of vectors, which represent each row, if `output path` = `nil`.

**Example**

Expand All @@ -457,6 +471,12 @@ A `clojask.classes.DataFrame.DataFrame`, which is the resultant dataframe.
(compute x 8 "output.csv" :select "col a")
;; only select column a

(compute x 8 "output.csv" :order true)
;; make sure the order of the output is the same of the input

(compute x 8 nil :in-memory true)
;; compute the dataframe in memory and store the dataframe also in memory

(compute x 8 "output.csv" :select ["col b" "col a"])
;; select two columns, column b and column a in order

Expand All @@ -468,6 +488,6 @@ A `clojask.classes.DataFrame.DataFrame`, which is the resultant dataframe.

(compute x 8 "output.csv" :melt (fn [row] (map concat (repeat (take 2 x)) (take-last 2 x))))
;; each result row becomes two rows
;; [a b c d] => [[a b c]
;; [a b d]]
;; [a b c d] => [[a b c] [a b d]]
```

2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject com.github.clojure-finance/clojask "2.0.0"
(defproject com.github.clojure-finance/clojask "2.0.1"
:description "Data analysis and manipulation library with parallel computing for larger-than-memory datasets"
:url "https://github.com/clojure-finance/clojask"
:license {:name "MIT"
Expand Down
37 changes: 24 additions & 13 deletions src/main/clojure/clojask/aggregate/aggre_input.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
[clojure.data.csv :as csv]
[clojask.utils :refer [filter-check]]
[taoensso.timbre :refer [fatal info debug] :as timbre]
[clojure.java.io :as java.io])
[clojure.java.io :as java.io]
[clojask.utils :as u])
(:import (java.io BufferedReader)))

(defrecord AbsSeqReader [event path rst completed? checkpoint? offset]
(defrecord AbsSeqReader [event path rst completed? checkpoint? offset source]
p/Plugin

(start [this event]
Expand All @@ -25,15 +26,23 @@
[this _ checkpoint]
(vreset! completed? false)

(let [directory (java.io/file path)
files (rest (file-seq directory))
(let [
;; directory (java.io/file path)
;; files (rest (file-seq directory))
;; data (map zipmap (repeat [:id :file :d]) (map vector (iterate inc 0) [files (mapv (fn [_] (read-string (str _))) files)]))
data (do
(def tmp (volatile! -1))
(map (fn [file]
(vswap! tmp inc)
{:id @tmp :file file :d (read-string (subs (str file) (inc (count (str directory)))))})
files))
data (if (= path nil)
(do
(def tmp (volatile! -1))
(map (fn [file]
(vswap! tmp inc)
{:id @tmp :file file :d (read-string file)})
(.getKeys source)))
(do
(def tmp (volatile! -1))
(map (fn [file]
(vswap! tmp inc)
{:id @tmp :file file :d (read-string (u/decode-str (.getName file)))})
(rest (file-seq (java.io/file path))))))
]
(if (nil? checkpoint)
(do
Expand Down Expand Up @@ -75,8 +84,9 @@
))

(defn inject-dataframe
[dataframe]
(def df dataframe))
[dataframe _source]
(def df dataframe)
(def source _source))

(defn input [{:keys [onyx.core/task-map] :as event}]
;; (println (:seq/rdr event))
Expand All @@ -89,7 +99,8 @@
:rst (volatile! nil)
:completed? (volatile! false)
:checkpoint? (not (false? (:seq/checkpoint? task-map)))
:offset (volatile! nil)}))
:offset (volatile! nil)
:source source}))

(def reader-calls
{})
Expand Down
17 changes: 9 additions & 8 deletions src/main/clojure/clojask/aggregate/aggre_onyx_comps.clj
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
[clojure.data.csv :as csv]
[clojask.utils :as u]
[clojure.set :as set]
[clojask.groupby :refer [read-csv-seq]])
[clojask.groupby :refer [read-csv-seq insert-mgroup]])
(:import (java.io BufferedReader FileReader BufferedWriter FileWriter)
[com.clojask.exception ExecutionException]))

Expand Down Expand Up @@ -39,7 +39,7 @@


(defn worker-func-gen
[df exception aggre-funcs index formatter]
[df exception aggre-funcs index formatter source]
(reset! dataframe df)
(let [
;; aggre-funcs (.getAggreFunc (.row-info (deref dataframe)))
Expand All @@ -59,7 +59,7 @@
"refered in preview"
[seq]
;; (println formatters)
(let [data (read-csv-seq (:file seq))
(let [data (if (= source nil) (read-csv-seq (:file seq)) (.getKey source (:file seq)))
pre (:d seq)
pre (u/gets-format pre pre-index org-format)
data-map (-> (iterate inc 0)
Expand All @@ -69,6 +69,7 @@
;; index (nth _ 1)]
;; (func (get data-map index))))
;; aggre-funcs)
;; (println data)
(loop [aggre-funcs aggre-funcs
res []]
(if (= aggre-funcs [])
Expand Down Expand Up @@ -269,18 +270,18 @@

(defn start-onyx-aggre
"start the onyx cluster with the specification inside dataframe"
[num-work batch-size dataframe dist exception aggre-func index formatter out]
[num-work batch-size dataframe source dist exception aggre-func index formatter out]
(try
(workflow-gen num-work)
(config-env)
(worker-func-gen dataframe exception aggre-func index formatter) ;;need some work
(worker-func-gen dataframe exception aggre-func index formatter source) ;;need some work
(catalog-gen num-work batch-size)
(lifecycle-gen "./.clojask/grouped" dist)
(lifecycle-gen (if (nil? source) "./.clojask/grouped" nil) dist)
(flow-cond-gen num-work)
(input/inject-dataframe dataframe)
(input/inject-dataframe dataframe source)
(output/inject-dataframe dataframe out)
;; (insert-mgroup source)
(catch Exception e (do
(shutdown)
(throw (ExecutionException. (format "[preparing stage (groupby aggregate)] Refer to .clojask/clojask.log for detailed information. (original error: %s)" (.getMessage e)))))))
(try
(let [submission (onyx.api/submit-job peer-config
Expand Down
1 change: 1 addition & 0 deletions src/main/clojure/clojask/aggregate/aggre_output.clj
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
(output-func wtr (:d msg))
;; !! define argument (debug)
))))
(.flush wtr)
true))

;; Builder function for your output plugin.
Expand Down
Loading