Skip to content

Commit

Permalink
checkpoint before redoing DurableInput
Browse files Browse the repository at this point in the history
  • Loading branch information
ztellman committed Nov 4, 2019
1 parent 499b6f4 commit a86adf8
Show file tree
Hide file tree
Showing 25 changed files with 874 additions and 246 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ benchmarks/data/
.DS_Store
gen-javadoc.sh
javadoc
*.iml
*.iml
.java-version
77 changes: 55 additions & 22 deletions src/io/lacuna/bifurcan/DurableEncoding.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,56 @@
package io.lacuna.bifurcan;

import io.lacuna.bifurcan.allocator.SlabAllocator;
import io.lacuna.bifurcan.durable.BlockPrefix;
import io.lacuna.bifurcan.durable.BlockPrefix.BlockType;
import io.lacuna.bifurcan.durable.DurableAccumulator;

import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Objects;
import java.util.function.Supplier;
import java.util.function.ToIntFunction;
import java.util.function.*;

public interface DurableEncoding {

class Descriptor {
public final String id;

public Descriptor(String id) {
this.id = id.intern();
}

@Override
public int hashCode() {
return id.hashCode();
}

@Override
public boolean equals(Object obj) {
if (obj instanceof Descriptor) {
return id == ((Descriptor) obj).id;
} else {
return false;
}
}
}

interface SkippableIterator extends Iterator<Object> {
default SkippableIterator skip(int n) {
for (int i = 0; i < n; i++) {
skip();
}
return this;
}

void skip();
}

/**
* A plain-text description of the encoding, which is also used as its identity. All encodings sharing a name should
* be equivalent.
*/
String descriptor();
Descriptor descriptor();

/**
* Describes whether this encoding can be used to encode maps (and implicitly sets, which are treated as maps without
Expand All @@ -25,19 +63,26 @@ default boolean encodesMaps() {
/**
* The hash function used within maps and sets.
*/
default ToIntFunction<Object> hashFunction() {
default ToIntFunction<Object> keyHash() {
return Objects::hashCode;
}

/**
* The encoding for `key` within a map or set.
* The key equality used within maps and sets.
*/
default BiPredicate<Object, Object> keyEquality() {
return Objects::equals;
}

/**
* The encoding for any key in a map or set.
*/
default DurableEncoding keyEncoding(Object key) {
default DurableEncoding keyEncoding() {
throw new UnsupportedOperationException("Encoding '" + descriptor() + "' does not support maps");
}

/**
* The encoding for the value corresponding to `key` within a map.
* The encoding for any value corresponding to `key` within a map.
*/
default DurableEncoding valueEncoding(Object key) {
throw new UnsupportedOperationException("Encoding '" + descriptor() + "' does not support maps");
Expand All @@ -51,7 +96,7 @@ default boolean encodesLists() {
}

/**
* The encoding for the value stored at `index` within a list.
* The encoding for an element at `index` within a list.
*/
default DurableEncoding elementEncoding(long index) {
throw new UnsupportedOperationException("Encoding '" + descriptor() + "' does not support lists");
Expand Down Expand Up @@ -81,7 +126,7 @@ default void encode(IList<Object> primitives, DurableOutput out) {
/**
* Decodes a block of primitive values, returning an iterator of thunks representing each individual value.
*/
default Iterator<Supplier<Object>> decode(DurableInput in) {
default SkippableIterator decode(DurableInput in) {
throw new UnsupportedOperationException("Encoding '" + descriptor() + "' does not support primitives");
}

Expand All @@ -92,22 +137,10 @@ default boolean hasOrdering() {
return true;
}

default Comparator<Object> comparator() {
default Comparator<Object> keyComparator() {
return DurableEncoding::defaultComparator;
}

static Object defaultCoercion(Object o) {
if (o instanceof java.util.Map) {
return Maps.from((java.util.Map) o);
} else if (o instanceof java.util.Set) {
return Sets.from((java.util.Set) o);
} else if (o instanceof java.util.List) {
return Lists.from((java.util.List) o);
} else {
return o;
}
}

static int defaultComparator(Object a, Object b) {
if (a instanceof Comparable) {
return ((Comparable) a).compareTo(b);
Expand Down
94 changes: 94 additions & 0 deletions src/io/lacuna/bifurcan/DurableHashMap.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package io.lacuna.bifurcan;

import io.lacuna.bifurcan.durable.HashMap;
import io.lacuna.bifurcan.durable.HashTable;
import io.lacuna.bifurcan.durable.SkipTable;

import java.util.function.BiPredicate;
import java.util.function.ToIntFunction;

public class DurableHashMap implements IDurableCollection, IMap<Object, Object> {

private final DurableInput in;
private final long size, hashTablePosition, skipTablePosition, entriesPosition;
private final DurableEncoding encoding;

private DurableHashMap(
DurableInput in,
DurableEncoding encoding,
long size,
long hashTablePosition,
long skipTablePosition,
long entriesPosition) {
this.in = in;
this.encoding = encoding;

this.size = size;

this.hashTablePosition = hashTablePosition;
this.skipTablePosition = skipTablePosition;
this.entriesPosition = entriesPosition;
}

static DurableHashMap from(DurableInput in, DurableEncoding encoding) {
long size = in.readVLQ();
long hashTableSize = in.readVLQ();
long skipTableSize = in.readVLQ();
long hashTablePosition = in.position();

return new DurableHashMap(
in,
encoding, size,
hashTablePosition,
hashTablePosition + hashTableSize,
hashTablePosition + hashTableSize + skipTableSize);
}

@Override
public ToIntFunction<Object> keyHash() {
return encoding.keyHash();
}

@Override
public BiPredicate<Object, Object> keyEquality() {
return encoding.keyEquality();
}

@Override
public Object get(Object key, Object defaultValue) {
int hash = keyHash().applyAsInt(key);

in.seek(hashTablePosition);
HashTable.Entry blockEntry = HashTable.get(in, hash);
if (blockEntry == null) {
return defaultValue;
} else {
in.seek(blockEntry.offset);
return HashMap.keyWithinBlock(in, encoding, hash, key, defaultValue);
}
}

@Override
public long indexOf(Object key) {
return 0;
}

@Override
public long size() {
return size;
}

@Override
public IEntry<Object, Object> nth(long index) {
in.seek(skipTablePosition);
SkipTable.Entry blockEntry = SkipTable.lookup(in, index);

in.seek(blockEntry.offset);
return HashMap.indexWithinBlock(in, encoding, (int) (index - blockEntry.index));
}

@Override
public DurableHashMap clone() {
return this;
}
}
33 changes: 19 additions & 14 deletions src/io/lacuna/bifurcan/DurableInput.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.function.Function;

public interface DurableInput extends DataInput, Closeable, AutoCloseable {

Expand All @@ -33,6 +34,10 @@ static DurableInput open(Path path, int bufferSize) throws IOException {
return new ByteChannelDurableInput(file, 0, file.size(), bufferSize);
}

static DurableInput from(Iterable<ByteBuffer> buffers) {
return from(buffers, DEFAULT_BUFFER_SIZE);
}

static DurableInput from(Iterable<ByteBuffer> buffers, int bufferSize) {
long size = 0;
for (ByteBuffer b : buffers) {
Expand All @@ -41,16 +46,18 @@ static DurableInput from(Iterable<ByteBuffer> buffers, int bufferSize) {
return new ByteChannelDurableInput(new ByteBufferReadableChannel(buffers), 0, size, bufferSize);
}

// DurableInput slice(long offset, long length);
//
// default DurableInput slice(long length) {
// return slice(position(), length);
// }

default void readFully(byte[] b) {
readFully(b, 0, b.length);
}

void seek(long position);

default void skip(long bytes) {
seek(position() + bytes);
}

long remaining();

long position();
Expand Down Expand Up @@ -86,11 +93,7 @@ default int skipBytes(int n) {
double readDouble();

default long readVLQ() {
try {
return Util.readVLQ(this);
} catch (IOException e) {
throw new RuntimeException(e);
}
return Util.readVLQ(this);
}

default boolean readBoolean() {
Expand All @@ -116,11 +119,13 @@ default String readUTF() {
}

default BlockPrefix readPrefix() {
try {
return BlockPrefix.read(this);
} catch (IOException e) {
throw new RuntimeException(e);
}
return BlockPrefix.read(this);
}

default long skipBlock() {
long pos = position();
skipBytes(readPrefix().length);
return position() - pos;
}

default InputStream asInputStream() {
Expand Down
24 changes: 14 additions & 10 deletions src/io/lacuna/bifurcan/DurableOutput.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
package io.lacuna.bifurcan;

import io.lacuna.bifurcan.durable.*;
import io.lacuna.bifurcan.durable.BlockPrefix.BlockType;

import java.io.*;
import io.lacuna.bifurcan.durable.ByteChannelDurableOutput;
import io.lacuna.bifurcan.durable.DurableOutputStream;
import io.lacuna.bifurcan.durable.Util;

import java.io.Closeable;
import java.io.DataOutput;
import java.io.Flushable;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.function.Consumer;
import java.nio.channels.Channels;

public interface DurableOutput extends DataOutput, Flushable, Closeable, AutoCloseable {

int DEFAULT_BUFFER_SIZE = 1 << 16;

static DurableOutput from(OutputStream os) {
return new ByteChannelDurableOutput(Channels.newChannel(os), DEFAULT_BUFFER_SIZE);
}

default void write(byte[] b) {
write(b, 0, b.length);
}
Expand Down Expand Up @@ -42,11 +50,7 @@ default void write(byte[] b, int off, int len) {
int write(ByteBuffer src);

default void writeVLQ(long n) {
try {
Util.writeVLQ(n, this);
} catch (IOException e) {
throw new RuntimeException(e);
}
Util.writeVLQ(n, this);
}

default void write(Iterable<ByteBuffer> buffers) {
Expand Down
5 changes: 0 additions & 5 deletions src/io/lacuna/bifurcan/FloatMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,6 @@ public boolean contains(double key) {
return map.contains(doubleToLong(key));
}

@Override
public boolean contains(Double key) {
return contains((double) key);
}

@Override
public IList<IEntry<Double, V>> entries() {
return Lists.lazyMap(map.entries(), FloatMap::convertEntry);
Expand Down
6 changes: 3 additions & 3 deletions src/io/lacuna/bifurcan/ICollection.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ public interface ICollection<C, V> extends Iterable<V> {
long size();

/**
* @return the element at {@code idx}
* @throws IndexOutOfBoundsException when {@code idx} is not within {@code [0, size-1]}
* @return the element at {@code index}
* @throws IndexOutOfBoundsException when {@code index} is not within {@code [0, size-1]}
*/
V nth(long idx);
V nth(long index);

/**
* @return the element at {@code idx}, or {@code defaultValue} if it is not within {@code [0, size-1]}
Expand Down
4 changes: 4 additions & 0 deletions src/io/lacuna/bifurcan/IDurableCollection.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package io.lacuna.bifurcan;

public interface IDurableCollection {
}
4 changes: 3 additions & 1 deletion src/io/lacuna/bifurcan/IMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ default V getOrCreate(K key, Supplier<V> f) {
/**
* @return true if {@code key} is in the map, false otherwise
*/
boolean contains(K key);
default boolean contains(K key) {
return indexOf(key) != -1;
}

/**
* @return a list containing all the entries within the map
Expand Down
Loading

0 comments on commit a86adf8

Please sign in to comment.