Skip to content

Commit

Permalink
feat: add standalone shuffle for transformed ivf-pq vectors file (lan…
Browse files Browse the repository at this point in the history
…cedb#2670)

Makes shuffle a standalone operation (shuffles the transformed ivf-pq
indices by partition id and saves the files on disk). Uses the v2 reader
internally during the shuffling process.
  • Loading branch information
raunaks13 authored Aug 2, 2024
1 parent 2f5a625 commit 89ad237
Show file tree
Hide file tree
Showing 4 changed files with 270 additions and 64 deletions.
31 changes: 31 additions & 0 deletions python/python/lance/indices.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,37 @@ def transform_vectors(
fragments,
)

def shuffle_transformed_vectors(
self,
filenames: list[str],
dir_path: str,
ivf: IvfModel,
) -> list[str]:
"""
Take the transformed, unsorted vector files as input, and create sorted
storage files. Sorting is done based on the partition id. This function
only makes sense if the transformed vector file contains a partition_id column.
Parameters
----------
filenames: list[str]
The filenames of the unsorted files.
dir_path: str
Directory where all the files are located, and where output files
will be placed.
Returns
-------
list[str]
The file paths of the sorted transformed vector files.
"""
if isinstance(filenames, list):
return indices.shuffle_transformed_vectors(
filenames, dir_path, ivf.centroids
)
else:
raise ValueError("filenames must be a list of strings")

def _determine_num_partitions(self, num_partitions: Optional[int], num_rows: int):
if num_partitions is None:
return round(math.sqrt(num_rows))
Expand Down
19 changes: 17 additions & 2 deletions python/python/tests/test_indices.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors
import os

import lance
import numpy as np
import pyarrow as pa
Expand All @@ -24,7 +26,6 @@ def rand_dataset(tmpdir, request):
uri = str(tmpdir / "dataset")

ds = lance.write_dataset(table, uri, max_rows_per_file=NUM_ROWS_PER_FRAGMENT)

return ds


Expand Down Expand Up @@ -126,7 +127,6 @@ def test_vector_transform(tmpdir, rand_dataset, rand_ivf, rand_pq):
builder.transform_vectors(rand_ivf, rand_pq, uri, fragments=fragments)

reader = LanceFileReader(uri)

assert reader.metadata().num_rows == (NUM_ROWS_PER_FRAGMENT * len(fragments))
data = next(reader.read_all(batch_size=10000).to_batches())

Expand All @@ -147,3 +147,18 @@ def test_vector_transform(tmpdir, rand_dataset, rand_ivf, rand_pq):
reader = LanceFileReader(uri)

assert reader.metadata().num_rows == (NUM_ROWS_PER_FRAGMENT * NUM_FRAGMENTS)


def test_shuffle_vectors(tmpdir, rand_dataset, rand_ivf, rand_pq):
builder = IndicesBuilder(rand_dataset, "vectors")
uri = str(tmpdir / "transformed_shuffle")
builder.transform_vectors(rand_ivf, rand_pq, uri, fragments=None)

# test shuffle for transformed vectors
filenames = builder.shuffle_transformed_vectors(
["transformed_shuffle"], str(tmpdir), rand_ivf
)

for fname in filenames:
full_path = str(tmpdir / fname)
assert os.path.getsize(full_path) > 0
44 changes: 43 additions & 1 deletion python/src/indices.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@ use arrow::pyarrow::{PyArrowType, ToPyArrow};
use arrow_array::{Array, FixedSizeListArray};
use arrow_data::ArrayData;
use lance::index::vector::ivf::builder::write_vector_storage;
use lance_index::vector::ivf::shuffler::shuffle_vectors;
use lance_index::vector::{
ivf::{storage::IvfModel, IvfBuildParams},
pq::{PQBuildParams, ProductQuantizer},
};
use lance_linalg::distance::DistanceType;
use pyo3::{pyfunction, types::PyModule, wrap_pyfunction, PyObject, PyResult, Python};
use pyo3::{
pyfunction,
types::{PyList, PyModule},
wrap_pyfunction, PyObject, PyResult, Python,
};

use crate::fragment::FileFragment;
use crate::{dataset::Dataset, error::PythonErrorExt, file::object_store_from_uri_or_path, RT};
Expand Down Expand Up @@ -220,11 +225,48 @@ pub fn transform_vectors(
)?
}

async fn do_shuffle_transformed_vectors(
filenames: Vec<String>,
dir_path: &str,
ivf_centroids: FixedSizeListArray,
) -> PyResult<Vec<String>> {
let partition_files = shuffle_vectors(filenames, dir_path, ivf_centroids)
.await
.infer_error()?;
Ok(partition_files)
}

#[pyfunction]
#[allow(clippy::too_many_arguments)]
pub fn shuffle_transformed_vectors(
py: Python<'_>,
filenames: Vec<String>,
dir_path: &str,
ivf_centroids: PyArrowType<ArrayData>,
) -> PyResult<PyObject> {
let ivf_centroids = ivf_centroids.0;
let ivf_centroids = FixedSizeListArray::from(ivf_centroids);

let result = RT.block_on(
None,
do_shuffle_transformed_vectors(filenames, dir_path, ivf_centroids),
)?;

match result {
Ok(partition_files) => {
let py_list = PyList::new(py, partition_files);
Ok(py_list.into())
}
Err(e) => Err(pyo3::exceptions::PyRuntimeError::new_err(e.to_string())),
}
}

pub fn register_indices(py: Python, m: &PyModule) -> PyResult<()> {
let indices = PyModule::new(py, "indices")?;
indices.add_wrapped(wrap_pyfunction!(train_ivf_model))?;
indices.add_wrapped(wrap_pyfunction!(train_pq_model))?;
indices.add_wrapped(wrap_pyfunction!(transform_vectors))?;
indices.add_wrapped(wrap_pyfunction!(shuffle_transformed_vectors))?;
m.add_submodule(indices)?;
Ok(())
}
Loading

0 comments on commit 89ad237

Please sign in to comment.