chore(lsm): add initial range search
parent ddd21b39a6
commit 59eaef2ae3
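The new `first` flag on Find picks which end of a run of matching keys a lookup lands on, which is what makes a start/stop range query possible. A rough sketch of how the two calls combine is below; the findRange helper is illustrative only and not part of this change (TestFindRange further down in the diff is the real usage).

    // findRange is an illustrative helper, not part of this commit: it strings
    // together the two lookups TestFindRange performs by hand. Find(key, true)
    // lands on the start of the run of entries matching the lower key and
    // Find(key, false) on the end of the run matching the upper key, so the
    // pair brackets the whole range.
    func findRange(sf *logFile, lo, hi []byte) (start, stop *entryBytes, err error) {
        start, _, err = sf.Find(lo, true)
        if err != nil {
            return nil, nil, err
        }
        stop, _, err = sf.Find(hi, false)
        if err != nil {
            return nil, nil, err
        }
        return start, stop, nil
    }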
@@ -44,7 +44,14 @@ func TestReverse(t *testing.T) {
 func TestFile(t *testing.T) {
     is := is.New(t)
 
-    f := basicFile(t)
+    entries := entries{
+        {"key-1", 1},
+        {"key-2", 2},
+        {"key-3", 3},
+        {"longerkey-4", 65535},
+    }
+
+    f := basicFile(t, entries, entries, entries)
 
     sf, err := ReadFile(f)
     is.NoErr(err)
@@ -52,20 +59,18 @@ func TestFile(t *testing.T) {
     is.Equal(len(sf.segments), 3)
 }
 
-func basicFile(t *testing.T) fs.File {
+func basicFile(t *testing.T, lis ...entries) fs.File {
     t.Helper()
 
-    data := segment{entries: entries{
-        {"key-1", 1},
-        {"key-2", 2},
-        {"key-3", 3},
-        {"longerkey-4", 65535},
-    }}
-
-    b, err := data.MarshalBinary()
-    if err != nil {
-        t.Error(err)
+    segments := make([][]byte, len(lis))
+    var err error
+    for i, entries := range lis {
+        data := segment{entries: entries}
+        segments[i], err = data.MarshalBinary()
+        if err != nil {
+            t.Error(err)
+        }
     }
 
-    return NewFile(b, b, b)
+    return NewFile(segments...)
 }
lsm/sst.go (69 changed lines)
@@ -1,5 +1,12 @@
 // SPDX-FileCopyrightText: 2023 Jon Lundy <jon@xuu.cc>
 // SPDX-License-Identifier: BSD-3-Clause
 
+// lsm -- Log Structured Merge-Tree
+//
+// This is a basic LSM tree using a SSTable optimized for append only writing. On disk data is organized into time ordered
+// files of segments, containing reverse sorted keys. Each segment ends with a magic value `Souris\x01`, a 4byte hash, count of
+// segment entries, and data length.
+
 package lsm
 
 import (
@@ -23,11 +30,11 @@ var (
 )
 
 type header struct {
-    sig     []byte
-    entries uint64
-    datalen uint64
-    headlen uint64
-    end     int64
+    sig     []byte // 4Byte signature
+    entries uint64 // count of entries in segment
+    datalen uint64 // length of data
+    headlen uint64 // length of header
+    end     int64  // location of end of data/start of header (start of data is `end - datalen`)
 }
 
 // ReadHead parse header from a segment. reads from the end of slice of length segmentFooterLength
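A quick sketch of the offsets those field comments imply, for readers following the hash check and the footer scan below; the helper name is illustrative and not part of the package.

    // segmentDataRange spells out the layout implied by the comments above:
    // a segment's entry data occupies [end-datalen, end) and the header that
    // describes it starts at end. The previous (older) segment therefore ends
    // where this one's data begins.
    func segmentDataRange(h header) (start, end int64) {
        start = h.end - int64(h.datalen) // first byte of the segment's entries
        end = h.end                      // end of data / start of the header
        return start, end
    }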
@@ -173,8 +180,21 @@ func (s *segmentReader) FirstEntry() (*entryBytes, error) {
     return e, err
 }
 
+func (s *segmentReader) VerifyHash() (bool, error) {
+    h := hash()
+    data := make([]byte, s.head.datalen)
+    _, err := s.rd.ReadAt(data, s.head.end-int64(s.head.datalen))
+    if err != nil {
+        return false, err
+    }
+    _, err = h.Write(data)
+    ok := bytes.Equal(h.Sum(nil), s.head.sig)
+
+    return ok, err
+}
+
 // Find locates needle within a segment. if it cant find it will return the nearest key before needle.
-func (s *segmentReader) Find(needle []byte) (*entryBytes, bool, error) {
+func (s *segmentReader) Find(needle []byte, first bool) (*entryBytes, bool, error) {
     if s == nil {
         return nil, false, nil
     }
@@ -184,23 +204,27 @@ func (s *segmentReader) Find(needle []byte) (*entryBytes, bool, error) {
     }
 
     last := e
+    found := false
     for pos > 0 {
         key, _ := e.KeyValue()
         switch bytes.Compare(key, needle) {
+        case 1: // key=ccc, needle=bbb
+            return last, found, nil
         case 0: // equal
-            return e, true, nil
+            if first {
+                return e, true, nil
+            }
+            found = true
+            fallthrough
         case -1: // key=aaa, needle=bbb
             last = e
             e, pos, err = s.readEntryAt(pos)
             if err != nil {
-                return nil, false, err
+                return nil, found, err
             }
-
-        case 1: // key=ccc, needle=bbb
-            return last, false, nil
         }
     }
-    return last, false, nil
+    return last, found, nil
 }
 func (s *segmentReader) readEntryAt(pos int64) (*entryBytes, int64, error) {
     if pos < 0 {
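The behaviour change here is easiest to see in isolation: the scan walks keys in ascending order, and on an exact match it either stops immediately (first=true) or records the hit and falls through so that the far end of a run of duplicate keys wins (first=false). The toy model below is illustrative only (it is not the package's code) and reproduces just that control flow over a plain slice:

    package main

    import (
        "bytes"
        "fmt"
    )

    // scan mimics the switch above: an ascending walk, a three-way compare,
    // and a found flag plus fallthrough when first is false.
    func scan(keys [][]byte, needle []byte, first bool) (int, bool) {
        last, found := -1, false
        for i, k := range keys {
            switch bytes.Compare(k, needle) {
            case 1: // walked past the needle: report the best so far
                return last, found
            case 0: // equal
                if first {
                    return i, true
                }
                found = true
                fallthrough
            case -1: // still before the needle
                last = i
            }
        }
        return last, found
    }

    func main() {
        keys := [][]byte{[]byte("AA"), []byte("AB"), []byte("AB"), []byte("AB"), []byte("AC")}
        fmt.Println(scan(keys, []byte("AB"), true))  // 1 true  (start of the AB run)
        fmt.Println(scan(keys, []byte("AB"), false)) // 3 true  (end of the AB run)
    }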
@@ -217,7 +241,10 @@ func (s *segmentReader) readEntryAt(pos int64) (*entryBytes, int64, error) {
 }
 
 type logFile struct {
-    rd interface{io.ReaderAt; io.WriterTo}
+    rd interface {
+        io.ReaderAt
+        io.WriterTo
+    }
     segments []segmentReader
 
     fs.File
@@ -232,7 +259,10 @@ func ReadFile(fd fs.File) (*logFile, error) {
     }
 
     eof := stat.Size()
-    if rd, ok := fd.(interface{io.ReaderAt; io.WriterTo}); ok {
+    if rd, ok := fd.(interface {
+        io.ReaderAt
+        io.WriterTo
+    }); ok {
         l.rd = rd
 
     } else {
@@ -243,8 +273,8 @@ func ReadFile(fd fs.File) (*logFile, error) {
         l.rd = bytes.NewReader(rd)
     }
 
-    head := make([]byte, segmentFooterLength)
     for eof > 0 {
+        head := make([]byte, segmentFooterLength)
         _, err = l.rd.ReadAt(head, eof-int64(segmentFooterLength))
         if err != nil {
             return nil, err
@@ -285,7 +315,7 @@ func (l *logFile) LoadSegment(pos int64) (*segmentBytes, error) {
 
     return &segmentBytes{b, -1}, nil
 }
-func (l *logFile) Find(needle []byte) (*entryBytes, bool, error) {
+func (l *logFile) Find(needle []byte, first bool) (*entryBytes, bool, error) {
     var last segmentReader
 
     for _, s := range l.segments {
@@ -294,13 +324,16 @@ func (l *logFile) Find(needle []byte) (*entryBytes, bool, error) {
             return nil, false, err
         }
         k, _ := e.KeyValue()
-        if bytes.Compare(k, needle) > 0 {
+        if first && bytes.Compare(k, needle) >= 0 {
             break
         }
+        if !first && bytes.Compare(k, needle) > 0 {
+            break
+        }
         last = s
     }
 
-    return last.Find(needle)
+    return last.Find(needle, first)
 }
 func (l *logFile) WriteTo(w io.Writer) (int64, error) {
     return l.rd.WriteTo(w)
@@ -1,5 +1,6 @@
 // SPDX-FileCopyrightText: 2023 Jon Lundy <jon@xuu.cc>
 // SPDX-License-Identifier: BSD-3-Clause
+
 package lsm
 
 import (
@@ -39,19 +40,19 @@ func TestLargeFile(t *testing.T) {
     }
     t.Log(f.Stat())
 
-    tt, ok, err := sf.Find(needle)
+    tt, ok, err := sf.Find(needle, false)
     is.NoErr(err)
     is.True(ok)
     key, val := tt.KeyValue()
     t.Log(string(key), val)
 
-    tt, ok, err = sf.Find([]byte("needle"))
+    tt, ok, err = sf.Find([]byte("needle"), false)
     is.NoErr(err)
     is.True(!ok)
     key, val = tt.KeyValue()
     t.Log(string(key), val)
 
-    tt, ok, err = sf.Find([]byte{'\xff'})
+    tt, ok, err = sf.Find([]byte{'\xff'}, false)
     is.NoErr(err)
     is.True(!ok)
     key, val = tt.KeyValue()
@@ -85,23 +86,28 @@ func TestLargeFileDisk(t *testing.T) {
         is.NoErr(err)
         k, v := e.KeyValue()
         needle = k
-        t.Logf("Segment-%d: %s = %d", i, k, v)
+
+        ok, err := s.VerifyHash()
+        is.NoErr(err)
+
+        t.Logf("Segment-%d: %s = %d %t", i, k, v, ok)
+        is.True(ok)
     }
     t.Log(f.Stat())
 
-    tt, ok, err := sf.Find(needle)
+    tt, ok, err := sf.Find(needle, false)
     is.NoErr(err)
     is.True(ok)
     key, val := tt.KeyValue()
     t.Log(string(key), val)
 
-    tt, ok, err = sf.Find([]byte("needle"))
+    tt, ok, err = sf.Find([]byte("needle"), false)
     is.NoErr(err)
     is.True(!ok)
     key, val = tt.KeyValue()
     t.Log(string(key), val)
 
-    tt, ok, err = sf.Find([]byte{'\xff'})
+    tt, ok, err = sf.Find([]byte{'\xff'}, false)
     is.NoErr(err)
     is.True(!ok)
     key, val = tt.KeyValue()
@@ -133,7 +139,7 @@ func BenchmarkLargeFile(b *testing.B) {
         if each > 0 && n%each == 0 {
             b.Log(n)
         }
-        _, ok, err := sf.Find(keys[n])
+        _, ok, err := sf.Find(keys[n], false)
         if err != nil {
             b.Error(err)
         }
@@ -144,40 +150,48 @@ func BenchmarkLargeFile(b *testing.B) {
     b.Log("okays=", b.N, okays)
 }
 
-func BenchmarkLargeFileB(b *testing.B) {
-    segCount := 4098 / 16
-    f := randFile(b, 2_000_000, segCount)
+// TestFindRange is an initial range find for start and stop of a range of needles.
+// TODO: start the second query from where the first left off. Use an iterator?
+func TestFindRange(t *testing.T) {
+    is := is.New(t)
+
+    f := basicFile(t,
+        entries{
+            {"AD", 5},
+            {"AC", 5},
+            {"AB", 4},
+            {"AB", 3},
+        },
+        entries{
+            {"AB", 2},
+            {"AA", 1},
+        },
+    )
     sf, err := ReadFile(f)
-    if err != nil {
-        b.Error(err)
-    }
-    key := make([]byte, 5)
-    keys := make([][]byte, b.N)
-    for i := range keys {
-        _, err = crand.Read(key)
-        if err != nil {
-            b.Error(err)
-        }
-        keys[i] = []byte(base64.RawURLEncoding.EncodeToString(key))
-    }
-    b.Log("ready", b.N)
-    b.ResetTimer()
-    okays := 0
-    each := b.N / 10
-    for n := 0; n < b.N; n++ {
-        if each > 0 && n%each == 0 {
-            b.Log(n)
-        }
-        _, ok, err := sf.Find(keys[n])
-        if err != nil {
-            b.Error(err)
-        }
-        if ok {
-            okays++
-        }
-    }
-    b.Log("okays=", b.N, okays)
+    is.NoErr(err)
 
+    var ok bool
+    var first, last *entryBytes
+
+    first, ok, err = sf.Find([]byte("AB"), true)
+    is.NoErr(err)
+
+    key, val := first.KeyValue()
+    t.Log(string(key), val)
+
+    is.True(ok)
+    is.Equal(key, []byte("AB"))
+    is.Equal(val, uint64(2))
+
+    last, ok, err = sf.Find([]byte("AC"), false)
+    is.NoErr(err)
+
+    key, val = last.KeyValue()
+    t.Log(string(key), val)
+
+    is.True(ok)
+    is.Equal(key, []byte("AC"))
+    is.Equal(val, uint64(5))
 }
 
 func randFile(t interface {