feat: add FSST string compression (lancedb#2470)
lancedb#2415

This is a naive implementation of [FSST](https://www.vldb.org/pvldb/vol13/p2649-boncz.pdf). It is fairly well tested, so it can serve as a correctness baseline for future optimization and as a starting point for future performance engineering. It is not complete, but it is enough for one git commit. Also, I am going through something and am a bit debilitated; I hope that pushing a PR and making the progress visible will motivate me.

Question 1: Why not use the original C++ implementation from the paper through a Rust/C++ FFI?

- The original C++ implementation is not tuned for our workload.
- The original C++ implementation does not work directly with the Arrow `StringArray` interface.

Question 2: How is this FSST implementation used? This is the designed interface:

```rust
pub fn fsst_compress(
    _input_buf: &[u8],
    _input_offsets_buf: &[i32],
    _output_buf: &mut Vec<u8>,
    _output_offsets_buf: &mut Vec<i32>,
) -> Result<()> {
    Ok(())
}

pub fn fsst_decompress(
    _input_buf: &[u8],
    _input_offsets_buf: &[i32],
    _output_buf: &mut Vec<u8>,
    _output_offsets_buf: &mut Vec<i32>,
) -> Result<()> {
    Ok(())
}
```

TODO:

1. More comments and documentation.
2. Benchmark this implementation (how can I get test data for our intended workload?).
3. Push string encoding from the Lance logical encoding down to the Lance physical encoding.
4. Add a string compression trait to the Lance physical encoding.
5. Implement FSST for that string compression trait.
6. Implement the SIMD optimizations from the paper.
7. Maybe split FSST out as a separate Rust crate.
8. Push predicates down to FSST decompression.

A few questions:

1. Does Lance intend to support big-endian machines?
2. What kind of string dataset is preferable for our use case?

---------

Co-authored-by: Weston Pace <[email protected]>
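As a rough sketch of the intended call pattern, here is a hypothetical round trip over an Arrow `StringArray`. This is illustration only: it assumes `fsst_compress`, `fsst_decompress`, and the crate's `Result` alias are in scope, and the real integration may differ.

```rust
use arrow::array::StringArray;

// Hypothetical round trip; `Result` is assumed to be the crate's error alias,
// as in the interface above.
fn round_trip(input: &StringArray) -> Result<()> {
    let mut compressed = Vec::new();
    let mut compressed_offsets = Vec::new();
    // Pass the flat UTF-8 bytes plus the i32 offsets delimiting each string.
    fsst_compress(
        input.value_data(),
        input.value_offsets(),
        &mut compressed,
        &mut compressed_offsets,
    )?;

    let mut restored = Vec::new();
    let mut restored_offsets = Vec::new();
    fsst_decompress(
        &compressed,
        &compressed_offsets,
        &mut restored,
        &mut restored_offsets,
    )?;

    // A lossless codec must restore both the bytes and the offsets.
    assert_eq!(input.value_data(), restored.as_slice());
    assert_eq!(input.value_offsets(), restored_offsets.as_slice());
    Ok(())
}
```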
1 parent 04d9cc5 · commit e5e6fd5 — 12 changed files with 1,731 additions and 12 deletions.
rust/lance-encoding/compression-algo/fsst/Cargo.toml — 29 additions, 0 deletions

```toml
[package]
name = "fsst"
version = "0.1.0"
edition.workspace = true
authors.workspace = true
license.workspace = true
repository.workspace = true
readme = "README.md"
description = "FSST string compression"
keywords.workspace = true
categories.workspace = true
rust-version.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
rand = "0.8.4"
# this should be put in dev-dependencies, TODO: remove it from here
arrow = { version = "51.0.0", optional = false, features = ["prettyprint"] }

[dev-dependencies]
test-log.workspace = true
tokio.workspace = true
rand_xoshiro = "0.6.0"
lance-datagen = { workspace = true }

[[example]]
name = "benchmark"
path = "examples/benchmark.rs"
```
rust/lance-encoding/compression-algo/fsst/README.md — 1 addition, 0 deletions

```md
## FSST String Compression
```
rust/lance-encoding/compression-algo/fsst/examples/benchmark.rs — 153 additions, 0 deletions
```rust
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use arrow::array::StringArray;
use fsst::fsst::{compress, decompress, FSST_SYMBOL_TABLE_SIZE};
use rand::Rng;
use std::fs::File;
use std::io::{BufRead, BufReader};

const TEST_NUM: usize = 10;
const BUFFER_SIZE: usize = 8 * 1024 * 1024;

// Starting at a random line, collect consecutive lines (wrapping around at the
// end of the file) until just under BUFFER_SIZE bytes have been gathered.
fn read_random_8_m_chunk(file_path: &str) -> Result<StringArray, std::io::Error> {
    let file = File::open(file_path)?;
    let reader = BufReader::new(file);

    let lines: Vec<String> = reader.lines().collect::<std::result::Result<_, _>>()?;
    let num_lines = lines.len();

    let mut rng = rand::thread_rng();
    let mut curr_line = rng.gen_range(0..num_lines);

    let chunk_size = BUFFER_SIZE;
    let mut size = 0;
    let mut result_lines = vec![];
    while size + lines[curr_line].len() < chunk_size {
        result_lines.push(lines[curr_line].clone());
        size += lines[curr_line].len();
        curr_line += 1;
        curr_line %= num_lines;
    }

    Ok(StringArray::from(result_lines))
}

fn benchmark(file_path: &str) {
    // Step 1: load data in memory
    let mut inputs: Vec<StringArray> = vec![];
    let mut symbol_tables: Vec<[u8; FSST_SYMBOL_TABLE_SIZE]> = vec![];
    for _ in 0..TEST_NUM {
        let this_input = read_random_8_m_chunk(file_path).unwrap();
        inputs.push(this_input);
        symbol_tables.push([0u8; FSST_SYMBOL_TABLE_SIZE]);
    }

    // Step 2: allocate memory for compression and decompression outputs
    let mut compression_out_bufs = vec![];
    let mut compression_out_offsets_bufs = vec![];
    for _ in 0..TEST_NUM {
        let this_com_out_buf = vec![0u8; BUFFER_SIZE];
        let this_com_out_offsets_buf = vec![0i32; BUFFER_SIZE];
        compression_out_bufs.push(this_com_out_buf);
        compression_out_offsets_bufs.push(this_com_out_offsets_buf);
    }
    let mut decompression_out_bufs = vec![];
    let mut decompression_out_offsets_bufs = vec![];
    for _ in 0..TEST_NUM {
        let this_decom_out_buf = vec![0u8; BUFFER_SIZE * 3];
        let this_decom_out_offsets_buf = vec![0i32; BUFFER_SIZE * 3];
        decompression_out_bufs.push(this_decom_out_buf);
        decompression_out_offsets_bufs.push(this_decom_out_offsets_buf);
    }

    let original_total_size: usize = inputs.iter().map(|input| input.values().len()).sum();

    // Step 3: compress data
    let start = std::time::Instant::now();
    for i in 0..TEST_NUM {
        compress(
            symbol_tables[i].as_mut(),
            &inputs[i].values(),
            inputs[i].value_offsets(),
            &mut compression_out_bufs[i],
            &mut compression_out_offsets_bufs[i],
        )
        .unwrap();
    }
    let compression_finish_time = std::time::Instant::now();

    // Step 4: decompress data
    for i in 0..TEST_NUM {
        decompress(
            &symbol_tables[i],
            &compression_out_bufs[i],
            &compression_out_offsets_bufs[i],
            &mut decompression_out_bufs[i],
            &mut decompression_out_offsets_bufs[i],
        )
        .unwrap();
    }
    let decompression_finish_time = std::time::Instant::now();

    // Step 5: compute compression ratio and throughput
    let compression_total_size: usize = compression_out_bufs.iter().map(|buf| buf.len()).sum();
    let compression_ratio = original_total_size as f64 / compression_total_size as f64;
    let compress_time = compression_finish_time - start;
    let decompress_time = decompression_finish_time - compression_finish_time;

    let compress_seconds =
        compress_time.as_secs() as f64 + compress_time.subsec_nanos() as f64 * 1e-9;

    let decompress_seconds =
        decompress_time.as_secs() as f64 + decompress_time.subsec_nanos() as f64 * 1e-9;

    let com_speed = (original_total_size as f64 / compress_seconds) / 1024f64 / 1024f64;

    let d_speed = (original_total_size as f64 / decompress_seconds) / 1024f64 / 1024f64;

    for i in 0..TEST_NUM {
        assert_eq!(
            inputs[i].value_offsets().len(),
            decompression_out_offsets_bufs[i].len()
        );
    }

    // Print TSV headers and results
    println!("for file: {}", file_path);
    println!(
        "{}\t{}\t{}",
        "Compression ratio", "Compression speed", "Decompression speed"
    );
    println!(
        "{:.3}\t\t\t\t{:.2}MB/s\t\t\t{:.2}MB/s",
        compression_ratio, com_speed, d_speed
    );

    // Step 6: verify the round trip restored the original bytes and offsets
    for i in 0..TEST_NUM {
        assert_eq!(inputs[i].value_data(), decompression_out_bufs[i]);
        assert_eq!(inputs[i].value_offsets(), decompression_out_offsets_bufs[i]);
    }
}

// To run this benchmark, download the MS MARCO dataset from
// https://msmarco.z22.web.core.windows.net/msmarcoranking/fulldocs.tsv.gz
// and use a script like this to extract each column:
/*
import csv
import sys

def write_second_column(input_path, output_path):
    csv.field_size_limit(sys.maxsize)
    with open(input_path, 'r') as input_file, open(output_path, 'w') as output_file:
        tsv_reader = csv.reader(input_file, delimiter='\t')
        tsv_writer = csv.writer(output_file, delimiter='\t')
        for row in tsv_reader:
            tsv_writer.writerow([row[2]])

#write_second_column('/Users/x/fulldocs.tsv', '/Users/x/first_column_fulldocs.tsv')
#write_second_column('/Users/x/fulldocs.tsv', '/Users/x/second_column_fulldocs.tsv')
write_second_column('/Users/x/fulldocs.tsv', '/Users/x/third_column_fulldocs.tsv')
*/
fn main() {
    let file_paths = [
        "/home/x/first_column_fulldocs.tsv",
        "/home/x/second_column_fulldocs.tsv",
        "/home/x/third_column_fulldocs_chunk_0.tsv",
    ];
    for file_path in file_paths {
        benchmark(file_path);
    }
}
```
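For context on the algorithm being benchmarked: per the paper, FSST encodes strings against a table of up to 255 symbols of one to eight bytes each, with code 255 escaping a literal byte. A minimal sketch of that decode idea follows; it shows the paper's scheme, not this crate's actual symbol-table layout, and `symbols` is a hypothetical table type chosen for illustration.

```rust
// Minimal FSST decode sketch following the paper: `symbols` maps codes
// 0..=254 to symbols of 1-8 bytes; code 255 is the escape marker.
// Assumes well-formed input (an escape is always followed by one byte).
fn decode_sketch(input: &[u8], symbols: &[Vec<u8>; 255], out: &mut Vec<u8>) {
    let mut i = 0;
    while i < input.len() {
        let code = input[i] as usize;
        i += 1;
        if code == 255 {
            // Escape: the next input byte is emitted verbatim.
            out.push(input[i]);
            i += 1;
        } else {
            // Regular code: expand to its symbol.
            out.extend_from_slice(&symbols[code]);
        }
    }
}
```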