From ddd21b39a603361be50693ed7a66603e58cc5aa7 Mon Sep 17 00:00:00 2001 From: xuu Date: Sat, 28 Oct 2023 08:41:59 -0600 Subject: [PATCH 1/3] chore: add lsm/sstable --- lsm/marshal.go | 138 +++++++++++++++++++ lsm/marshal_test.go | 71 ++++++++++ lsm/sst.go | 323 ++++++++++++++++++++++++++++++++++++++++++++ lsm/sst_test.go | 302 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 834 insertions(+) create mode 100644 lsm/marshal.go create mode 100644 lsm/marshal_test.go create mode 100644 lsm/sst.go create mode 100644 lsm/sst_test.go diff --git a/lsm/marshal.go b/lsm/marshal.go new file mode 100644 index 0000000..00034d2 --- /dev/null +++ b/lsm/marshal.go @@ -0,0 +1,138 @@ +package lsm + +import ( + "bytes" + "encoding" + "encoding/binary" + "fmt" +) + +type entry struct { + key string + value uint64 +} + +// MarshalBinary implements encoding.BinaryMarshaler. +func (e *entry) MarshalBinary() (data []byte, err error) { + data = make([]byte, len(e.key), len(e.key)+binary.MaxVarintLen16) + copy(data, e.key) + + data = binary.AppendUvarint(data, e.value) + reverse(data[len(e.key):]) + return data, err +} + +// UnmarshalBinary implements encoding.BinaryUnmarshaler. +func (e *entry) UnmarshalBinary(data []byte) error { + // fmt.Println("unmarshal", data, string(data)) + + if len(data) < binary.MaxVarintLen16 { + return fmt.Errorf("%w: bad data", ErrDecode) + } + head := make([]byte, binary.MaxVarintLen16) + copy(head, data[max(0, len(data)-cap(head)):]) + reverse(head) + + size := 0 + e.value, size = binary.Uvarint(head) + if size == 0 { + return fmt.Errorf("%w: invalid data", ErrDecode) + } + e.key = string(data[:len(data)-size]) + + return nil +} + +var _ encoding.BinaryMarshaler = (*entry)(nil) +var _ encoding.BinaryUnmarshaler = (*entry)(nil) + +type entries []entry + +// MarshalBinary implements encoding.BinaryMarshaler. +func (lis *entries) MarshalBinary() (data []byte, err error) { + var buf bytes.Buffer + + for _, e := range *lis { + d, err := e.MarshalBinary() + if err != nil { + return nil, err + } + + _, err = buf.Write(d) + if err != nil { + return nil, err + } + + _, err = buf.Write(reverse(binary.AppendUvarint(make([]byte, 0, binary.MaxVarintLen32), uint64(len(d))))) + if err != nil { + return nil, err + } + } + + return buf.Bytes(), err +} + +// UnmarshalBinary implements encoding.BinaryUnmarshaler. +func (lis *entries) UnmarshalBinary(data []byte) error { + head := make([]byte, binary.MaxVarintLen16) + pos := uint64(len(data)) + + for pos > 0 { + copy(head, data[max(0, pos-uint64(cap(head))):]) + length, size := binary.Uvarint(reverse(head)) + + e := entry{} + if err := e.UnmarshalBinary(data[max(0, pos-(length+uint64(size))) : pos-uint64(size)]); err != nil { + return err + } + *lis = append(*lis, e) + + pos -= length + uint64(size) + } + reverse(*lis) + return nil +} + +var _ encoding.BinaryMarshaler = (*entries)(nil) +var _ encoding.BinaryUnmarshaler = (*entries)(nil) + +type segment struct { + entries entries +} + +// MarshalBinary implements encoding.BinaryMarshaler. +func (s *segment) MarshalBinary() (data []byte, err error) { + head := header{ + entries: uint64(len(s.entries)), + } + + data, err = s.entries.MarshalBinary() + if err != nil { + return nil, err + } + + head.datalen = uint64(len(data)) + + h := hash() + h.Write(data) + head.sig = h.Sum(nil) + + return head.Append(data), err +} + +// UnmarshalBinary implements encoding.BinaryUnmarshaler. +func (s *segment) UnmarshalBinary(data []byte) error { + head, err := ReadHead(data) + if err != nil { + return err + } + + h := hash() + h.Write(data[:head.datalen]) + if !bytes.Equal(head.sig, h.Sum(nil)) { + return fmt.Errorf("%w: invalid checksum", ErrDecode) + } + + s.entries = make(entries, 0, head.entries) + return s.entries.UnmarshalBinary(data[:head.datalen]) +} diff --git a/lsm/marshal_test.go b/lsm/marshal_test.go new file mode 100644 index 0000000..e67f0a0 --- /dev/null +++ b/lsm/marshal_test.go @@ -0,0 +1,71 @@ +package lsm + +import ( + "io/fs" + "testing" + + "github.com/matryer/is" +) + +func TestEncoding(t *testing.T) { + is := is.New(t) + + data := segment{entries: entries{ + {"key-1", 1}, + {"key-2", 2}, + {"key-3", 3}, + {"longerkey-4", 65535}, + }} + + b, err := data.MarshalBinary() + is.NoErr(err) + + var got segment + err = got.UnmarshalBinary(b) + is.NoErr(err) + + is.Equal(data, got) +} + +func TestReverse(t *testing.T) { + is := is.New(t) + + got := []byte("gnirts a si siht") + reverse(got) + + is.Equal(got, []byte("this is a string")) + + got = []byte("!gnirts a si siht") + reverse(got) + + is.Equal(got, []byte("this is a string!")) +} + +func TestFile(t *testing.T) { + is := is.New(t) + + f := basicFile(t) + + sf, err := ReadFile(f) + is.NoErr(err) + + is.Equal(len(sf.segments), 3) +} + +func basicFile(t *testing.T) fs.File { + t.Helper() + + data := segment{entries: entries{ + {"key-1", 1}, + {"key-2", 2}, + {"key-3", 3}, + {"longerkey-4", 65535}, + }} + + b, err := data.MarshalBinary() + if err != nil { + t.Error(err) + } + + return NewFile(b, b, b) +} diff --git a/lsm/sst.go b/lsm/sst.go new file mode 100644 index 0000000..5f6c833 --- /dev/null +++ b/lsm/sst.go @@ -0,0 +1,323 @@ +// SPDX-FileCopyrightText: 2023 Jon Lundy +// SPDX-License-Identifier: BSD-3-Clause +package lsm + +import ( + "bytes" + "encoding" + "encoding/binary" + "errors" + "fmt" + "hash/fnv" + "io" + "io/fs" + "sort" +) + +var ( + magic = reverse(append([]byte("Souris"), '\x01')) + hash = fnv.New32a + hashLength = hash().Size() + // segmentSize = 2 ^ 16 // min 2^9 = 512b, max? 2^20 = 1M + segmentFooterLength = len(magic) + hashLength + binary.MaxVarintLen32 + binary.MaxVarintLen32 +) + +type header struct { + sig []byte + entries uint64 + datalen uint64 + headlen uint64 + end int64 +} + +// ReadHead parse header from a segment. reads from the end of slice of length segmentFooterLength +func ReadHead(data []byte) (*header, error) { + if len(data) < len(magic)+6 { + return nil, fmt.Errorf("%w: invalid size", ErrDecode) + } + + if !bytes.Equal(data[len(data)-len(magic):], magic) { + return nil, fmt.Errorf("%w: invalid header", ErrDecode) + } + + head := make([]byte, 0, segmentFooterLength) + head = reverse(append(head, data[max(0, len(data)-cap(head)-1):]...)) + size, s := binary.Uvarint(head[len(magic)+4:]) + length, i := binary.Uvarint(head[len(magic)+4+s:]) + + return &header{ + sig: head[len(magic) : len(magic)+4], + entries: size, + datalen: length, + headlen: uint64(len(magic) + hashLength + s + i), + end: int64(len(data)), + }, nil +} +func (h *header) Append(data []byte) []byte { + + length := len(data) + data = append(data, h.sig...) + data = binary.AppendUvarint(data, h.entries) + data = binary.AppendUvarint(data, h.datalen) + reverse(data[length:]) + + return append(data, magic...) +} + +var _ encoding.BinaryMarshaler = (*segment)(nil) +var _ encoding.BinaryUnmarshaler = (*segment)(nil) + +var ErrDecode = errors.New("decode") + +func reverse[T any](b []T) []T { + l := len(b) + for i := 0; i < l/2; i++ { + b[i], b[l-i-1] = b[l-i-1], b[i] + } + return b +} + +// func clone[T ~[]E, E any](e []E) []E { +// return append(e[0:0:0], e...) +// } + +type entryBytes []byte + +// KeyValue returns the parsed key and value from an entry +func (e entryBytes) KeyValue() ([]byte, uint64) { + if len(e) < 2 { + return nil, 0 + } + head := reverse(append(e[0:0:0], e[max(0, len(e)-binary.MaxVarintLen64):]...)) + value, i := binary.Uvarint(head) + return append(e[0:0:0], e[:len(e)-i]...), value +} + +// NewKeyValue packed into an entry +func NewKeyValue(key []byte, val uint64) entryBytes { + length := len(key) + data := append(key[0:0:0], key...) + data = binary.AppendUvarint(data, val) + reverse(data[length:]) + + return data +} + +type listEntries []entryBytes + +// WriteTo implements io.WriterTo. +func (lis *listEntries) WriteTo(wr io.Writer) (int64, error) { + if lis == nil { + return 0, nil + } + + head := header{ + entries: uint64(len(*lis)), + } + h := hash() + + wr = io.MultiWriter(wr, h) + + var i int64 + for _, b := range *lis { + j, err := wr.Write(b) + i += int64(j) + if err != nil { + return i, err + } + + j, err = wr.Write(reverse(binary.AppendUvarint(make([]byte, 0, binary.MaxVarintLen32), uint64(len(b))))) + i += int64(j) + if err != nil { + return i, err + } + } + head.datalen = uint64(i) + head.sig = h.Sum(nil) + + b := head.Append([]byte{}) + j, err := wr.Write(b) + i += int64(j) + + return i, err +} + +var _ sort.Interface = listEntries{} + +// Len implements sort.Interface. +func (lis listEntries) Len() int { + return len(lis) +} + +// Less implements sort.Interface. +func (lis listEntries) Less(i int, j int) bool { + iname, _ := lis[i].KeyValue() + jname, _ := lis[j].KeyValue() + + return bytes.Compare(iname, jname) < 0 +} + +// Swap implements sort.Interface. +func (lis listEntries) Swap(i int, j int) { + lis[i], lis[j] = lis[j], lis[i] +} + +type segmentReader struct { + head *header + rd io.ReaderAt +} + +// FirstEntry parses the first segment entry from the end of the segment +func (s *segmentReader) FirstEntry() (*entryBytes, error) { + e, _, err := s.readEntryAt(-1) + return e, err +} + +// Find locates needle within a segment. if it cant find it will return the nearest key before needle. +func (s *segmentReader) Find(needle []byte) (*entryBytes, bool, error) { + if s == nil { + return nil, false, nil + } + e, pos, err := s.readEntryAt(-1) + if err != nil { + return nil, false, err + } + + last := e + for pos > 0 { + key, _ := e.KeyValue() + switch bytes.Compare(key, needle) { + case 0: // equal + return e, true, nil + case -1: // key=aaa, needle=bbb + last = e + e, pos, err = s.readEntryAt(pos) + if err != nil { + return nil, false, err + } + + case 1: // key=ccc, needle=bbb + return last, false, nil + } + } + return last, false, nil +} +func (s *segmentReader) readEntryAt(pos int64) (*entryBytes, int64, error) { + if pos < 0 { + pos = s.head.end + } + head := make([]byte, binary.MaxVarintLen16) + s.rd.ReadAt(head, pos-binary.MaxVarintLen16) + length, hsize := binary.Uvarint(reverse(head)) + + e := make(entryBytes, length) + _, err := s.rd.ReadAt(e, pos-int64(length)-int64(hsize)) + + return &e, pos - int64(length) - int64(hsize), err +} + +type logFile struct { + rd interface{io.ReaderAt; io.WriterTo} + segments []segmentReader + + fs.File +} + +func ReadFile(fd fs.File) (*logFile, error) { + l := &logFile{File: fd} + + stat, err := fd.Stat() + if err != nil { + return nil, err + } + + eof := stat.Size() + if rd, ok := fd.(interface{io.ReaderAt; io.WriterTo}); ok { + l.rd = rd + + } else { + rd, err := io.ReadAll(fd) + if err != nil { + return nil, err + } + l.rd = bytes.NewReader(rd) + } + + for eof > 0 { + head := make([]byte, segmentFooterLength) + _, err = l.rd.ReadAt(head, eof-int64(segmentFooterLength)) + if err != nil { + return nil, err + } + + s := segmentReader{ + rd: l.rd, + } + s.head, err = ReadHead(head) + s.head.end = eof - int64(s.head.headlen) + if err != nil { + return nil, err + } + eof -= int64(s.head.datalen) + int64(s.head.headlen) + l.segments = append(l.segments, s) + } + + return l, nil +} + +func (l *logFile) Count() int64 { + return int64(len(l.segments)) +} +func (l *logFile) LoadSegment(pos int64) (*segmentBytes, error) { + if pos < 0 { + pos = int64(len(l.segments) - 1) + } + if pos > int64(len(l.segments)-1) { + return nil, ErrDecode + } + s := l.segments[pos] + + b := make([]byte, s.head.datalen+s.head.headlen) + _, err := l.rd.ReadAt(b, s.head.end-int64(len(b))) + if err != nil { + return nil, err + } + + return &segmentBytes{b, -1}, nil +} +func (l *logFile) Find(needle []byte) (*entryBytes, bool, error) { + var last segmentReader + + for _, s := range l.segments { + e, err := s.FirstEntry() + if err != nil { + return nil, false, err + } + k, _ := e.KeyValue() + if bytes.Compare(k, needle) > 0 { + break + } + last = s + } + + return last.Find(needle) +} +func (l *logFile) WriteTo(w io.Writer) (int64, error) { + return l.rd.WriteTo(w) +} + +type segmentBytes struct { + b []byte + pos int +} + +type dataset struct { + rd io.ReaderAt + files []logFile + + fs.FS +} + +func ReadDataset(fd fs.FS) (*dataset, error) { + panic("not implemented") +} diff --git a/lsm/sst_test.go b/lsm/sst_test.go new file mode 100644 index 0000000..839a924 --- /dev/null +++ b/lsm/sst_test.go @@ -0,0 +1,302 @@ +// SPDX-FileCopyrightText: 2023 Jon Lundy +// SPDX-License-Identifier: BSD-3-Clause +package lsm + +import ( + "bytes" + crand "crypto/rand" + "encoding/base64" + "io" + "io/fs" + "math/rand" + "os" + "sort" + "sync" + "testing" + "time" + + "github.com/matryer/is" +) + +func TestLargeFile(t *testing.T) { + is := is.New(t) + + segCount := 4098 + + f := randFile(t, 2_000_000, segCount) + + sf, err := ReadFile(f) + is.NoErr(err) + + is.True(len(sf.segments) <= segCount) + var needle []byte + for i, s := range sf.segments { + e, err := s.FirstEntry() + is.NoErr(err) + k, v := e.KeyValue() + needle = k + t.Logf("Segment-%d: %s = %d", i, k, v) + } + t.Log(f.Stat()) + + tt, ok, err := sf.Find(needle) + is.NoErr(err) + is.True(ok) + key, val := tt.KeyValue() + t.Log(string(key), val) + + tt, ok, err = sf.Find([]byte("needle")) + is.NoErr(err) + is.True(!ok) + key, val = tt.KeyValue() + t.Log(string(key), val) + + tt, ok, err = sf.Find([]byte{'\xff'}) + is.NoErr(err) + is.True(!ok) + key, val = tt.KeyValue() + t.Log(string(key), val) +} + +func TestLargeFileDisk(t *testing.T) { + is := is.New(t) + + segCount := 4098 + + t.Log("generate large file") + f := randFile(t, 2_000_000, segCount) + + fd, err := os.CreateTemp("", "sst*") + is.NoErr(err) + defer func() { t.Log("cleanup:", fd.Name()); fd.Close(); os.Remove(fd.Name()) }() + + t.Log("write file:", fd.Name()) + _, err = io.Copy(fd, f) + is.NoErr(err) + fd.Seek(0, 0) + + sf, err := ReadFile(fd) + is.NoErr(err) + + is.True(len(sf.segments) <= segCount) + var needle []byte + for i, s := range sf.segments { + e, err := s.FirstEntry() + is.NoErr(err) + k, v := e.KeyValue() + needle = k + t.Logf("Segment-%d: %s = %d", i, k, v) + } + t.Log(f.Stat()) + + tt, ok, err := sf.Find(needle) + is.NoErr(err) + is.True(ok) + key, val := tt.KeyValue() + t.Log(string(key), val) + + tt, ok, err = sf.Find([]byte("needle")) + is.NoErr(err) + is.True(!ok) + key, val = tt.KeyValue() + t.Log(string(key), val) + + tt, ok, err = sf.Find([]byte{'\xff'}) + is.NoErr(err) + is.True(!ok) + key, val = tt.KeyValue() + t.Log(string(key), val) +} + +func BenchmarkLargeFile(b *testing.B) { + segCount := 4098 / 4 + f := randFile(b, 2_000_000, segCount) + + sf, err := ReadFile(f) + if err != nil { + b.Error(err) + } + key := make([]byte, 5) + keys := make([][]byte, b.N) + for i := range keys { + _, err = crand.Read(key) + if err != nil { + b.Error(err) + } + keys[i] = []byte(base64.RawURLEncoding.EncodeToString(key)) + } + b.Log("ready", b.N) + b.ResetTimer() + okays := 0 + each := b.N / 10 + for n := 0; n < b.N; n++ { + if each > 0 && n%each == 0 { + b.Log(n) + } + _, ok, err := sf.Find(keys[n]) + if err != nil { + b.Error(err) + } + if ok { + okays++ + } + } + b.Log("okays=", b.N, okays) +} + +func BenchmarkLargeFileB(b *testing.B) { + segCount := 4098 / 16 + f := randFile(b, 2_000_000, segCount) + + sf, err := ReadFile(f) + if err != nil { + b.Error(err) + } + key := make([]byte, 5) + keys := make([][]byte, b.N) + for i := range keys { + _, err = crand.Read(key) + if err != nil { + b.Error(err) + } + keys[i] = []byte(base64.RawURLEncoding.EncodeToString(key)) + } + b.Log("ready", b.N) + b.ResetTimer() + okays := 0 + each := b.N / 10 + for n := 0; n < b.N; n++ { + if each > 0 && n%each == 0 { + b.Log(n) + } + _, ok, err := sf.Find(keys[n]) + if err != nil { + b.Error(err) + } + if ok { + okays++ + } + } + b.Log("okays=", b.N, okays) +} + +func randFile(t interface { + Helper() + Error(...any) +}, size int, segments int) fs.File { + t.Helper() + + lis := make(listEntries, size) + for i := range lis { + key := make([]byte, 5) + _, err := crand.Read(key) + if err != nil { + t.Error(err) + } + key = []byte(base64.RawURLEncoding.EncodeToString(key)) + // key := []byte(fmt.Sprintf("key-%05d", i)) + + lis[i] = NewKeyValue(key, rand.Uint64()%16_777_216) + } + + sort.Sort(sort.Reverse(&lis)) + each := size / segments + if size%segments != 0 { + each++ + } + split := make([]listEntries, segments) + + for i := range split { + if (i+1)*each > len(lis) { + split[i] = lis[i*each : i*each+len(lis[i*each:])] + split = split[:i+1] + break + } + split[i] = lis[i*each : (i+1)*each] + } + + var b bytes.Buffer + for _, s := range split { + s.WriteTo(&b) + } + + return NewFile(b.Bytes()) +} + +type fakeStat struct { + size int64 +} + +// IsDir implements fs.FileInfo. +func (*fakeStat) IsDir() bool { + panic("unimplemented") +} + +// ModTime implements fs.FileInfo. +func (*fakeStat) ModTime() time.Time { + panic("unimplemented") +} + +// Mode implements fs.FileInfo. +func (*fakeStat) Mode() fs.FileMode { + panic("unimplemented") +} + +// Name implements fs.FileInfo. +func (*fakeStat) Name() string { + panic("unimplemented") +} + +// Size implements fs.FileInfo. +func (s *fakeStat) Size() int64 { + return s.size +} + +// Sys implements fs.FileInfo. +func (*fakeStat) Sys() any { + panic("unimplemented") +} + +var _ fs.FileInfo = (*fakeStat)(nil) + +type rd interface { + io.ReaderAt + io.Reader +} +type fakeFile struct { + stat func() fs.FileInfo + + rd +} + +func (fakeFile) Close() error { return nil } +func (f fakeFile) Stat() (fs.FileInfo, error) { return f.stat(), nil } + +func NewFile(b ...[]byte) fs.File { + in := bytes.Join(b, nil) + rd := bytes.NewReader(in) + size := int64(len(in)) + return &fakeFile{stat: func() fs.FileInfo { return &fakeStat{size: size} }, rd: rd} +} +func NewFileFromReader(rd *bytes.Reader) fs.File { + return &fakeFile{stat: func() fs.FileInfo { return &fakeStat{size: int64(rd.Len())} }, rd: rd} +} + +type fakeFS struct { + files map[string]*fakeFile + mu sync.RWMutex +} + +// Open implements fs.FS. +func (f *fakeFS) Open(name string) (fs.File, error) { + f.mu.RLock() + defer f.mu.RUnlock() + + if file, ok := f.files[name]; ok { + return file, nil + } + + return nil, fs.ErrNotExist +} + +var _ fs.FS = (*fakeFS)(nil) From 59eaef2ae3599db7276debc55348b8f400b9e5c6 Mon Sep 17 00:00:00 2001 From: xuu Date: Sat, 28 Oct 2023 19:40:29 -0600 Subject: [PATCH 2/3] chore(lsm): add initial range search --- lsm/marshal_test.go | 31 ++++++++------- lsm/sst.go | 69 ++++++++++++++++++++++++--------- lsm/sst_test.go | 94 ++++++++++++++++++++++++++------------------- 3 files changed, 123 insertions(+), 71 deletions(-) diff --git a/lsm/marshal_test.go b/lsm/marshal_test.go index e67f0a0..8bdcffa 100644 --- a/lsm/marshal_test.go +++ b/lsm/marshal_test.go @@ -44,7 +44,14 @@ func TestReverse(t *testing.T) { func TestFile(t *testing.T) { is := is.New(t) - f := basicFile(t) + entries := entries { + {"key-1", 1}, + {"key-2", 2}, + {"key-3", 3}, + {"longerkey-4", 65535}, + } + + f := basicFile(t, entries, entries, entries) sf, err := ReadFile(f) is.NoErr(err) @@ -52,20 +59,18 @@ func TestFile(t *testing.T) { is.Equal(len(sf.segments), 3) } -func basicFile(t *testing.T) fs.File { +func basicFile(t *testing.T, lis ...entries) fs.File { t.Helper() - data := segment{entries: entries{ - {"key-1", 1}, - {"key-2", 2}, - {"key-3", 3}, - {"longerkey-4", 65535}, - }} - - b, err := data.MarshalBinary() - if err != nil { - t.Error(err) + segments := make([][]byte, len(lis)) + var err error + for i, entries := range lis { + data := segment{entries: entries} + segments[i], err = data.MarshalBinary() + if err != nil { + t.Error(err) + } } - return NewFile(b, b, b) + return NewFile(segments...) } diff --git a/lsm/sst.go b/lsm/sst.go index 5f6c833..199ad50 100644 --- a/lsm/sst.go +++ b/lsm/sst.go @@ -1,5 +1,12 @@ // SPDX-FileCopyrightText: 2023 Jon Lundy // SPDX-License-Identifier: BSD-3-Clause + +// lsm -- Log Structured Merge-Tree +// +// This is a basic LSM tree using a SSTable optimized for append only writing. On disk data is organized into time ordered +// files of segments, containing reverse sorted keys. Each segment ends with a magic value `Souris\x01`, a 4byte hash, count of +// segment entries, and data length. + package lsm import ( @@ -23,11 +30,11 @@ var ( ) type header struct { - sig []byte - entries uint64 - datalen uint64 - headlen uint64 - end int64 + sig []byte // 4Byte signature + entries uint64 // count of entries in segment + datalen uint64 // length of data + headlen uint64 // length of header + end int64 // location of end of data/start of header (start of data is `end - datalen`) } // ReadHead parse header from a segment. reads from the end of slice of length segmentFooterLength @@ -173,8 +180,21 @@ func (s *segmentReader) FirstEntry() (*entryBytes, error) { return e, err } +func (s *segmentReader) VerifyHash() (bool, error) { + h := hash() + data := make([]byte, s.head.datalen) + _, err := s.rd.ReadAt(data, s.head.end-int64(s.head.datalen)) + if err != nil { + return false, err + } + _, err = h.Write(data) + ok := bytes.Equal(h.Sum(nil), s.head.sig) + + return ok, err +} + // Find locates needle within a segment. if it cant find it will return the nearest key before needle. -func (s *segmentReader) Find(needle []byte) (*entryBytes, bool, error) { +func (s *segmentReader) Find(needle []byte, first bool) (*entryBytes, bool, error) { if s == nil { return nil, false, nil } @@ -184,23 +204,27 @@ func (s *segmentReader) Find(needle []byte) (*entryBytes, bool, error) { } last := e + found := false for pos > 0 { key, _ := e.KeyValue() switch bytes.Compare(key, needle) { + case 1: // key=ccc, needle=bbb + return last, found, nil case 0: // equal - return e, true, nil + if first { + return e, true, nil + } + found = true + fallthrough case -1: // key=aaa, needle=bbb last = e e, pos, err = s.readEntryAt(pos) if err != nil { - return nil, false, err + return nil, found, err } - - case 1: // key=ccc, needle=bbb - return last, false, nil } } - return last, false, nil + return last, found, nil } func (s *segmentReader) readEntryAt(pos int64) (*entryBytes, int64, error) { if pos < 0 { @@ -217,7 +241,10 @@ func (s *segmentReader) readEntryAt(pos int64) (*entryBytes, int64, error) { } type logFile struct { - rd interface{io.ReaderAt; io.WriterTo} + rd interface { + io.ReaderAt + io.WriterTo + } segments []segmentReader fs.File @@ -232,7 +259,10 @@ func ReadFile(fd fs.File) (*logFile, error) { } eof := stat.Size() - if rd, ok := fd.(interface{io.ReaderAt; io.WriterTo}); ok { + if rd, ok := fd.(interface { + io.ReaderAt + io.WriterTo + }); ok { l.rd = rd } else { @@ -243,8 +273,8 @@ func ReadFile(fd fs.File) (*logFile, error) { l.rd = bytes.NewReader(rd) } + head := make([]byte, segmentFooterLength) for eof > 0 { - head := make([]byte, segmentFooterLength) _, err = l.rd.ReadAt(head, eof-int64(segmentFooterLength)) if err != nil { return nil, err @@ -285,7 +315,7 @@ func (l *logFile) LoadSegment(pos int64) (*segmentBytes, error) { return &segmentBytes{b, -1}, nil } -func (l *logFile) Find(needle []byte) (*entryBytes, bool, error) { +func (l *logFile) Find(needle []byte, first bool) (*entryBytes, bool, error) { var last segmentReader for _, s := range l.segments { @@ -294,13 +324,16 @@ func (l *logFile) Find(needle []byte) (*entryBytes, bool, error) { return nil, false, err } k, _ := e.KeyValue() - if bytes.Compare(k, needle) > 0 { + if first && bytes.Compare(k, needle) >= 0 { + break + } + if !first && bytes.Compare(k, needle) > 0 { break } last = s } - return last.Find(needle) + return last.Find(needle, first) } func (l *logFile) WriteTo(w io.Writer) (int64, error) { return l.rd.WriteTo(w) diff --git a/lsm/sst_test.go b/lsm/sst_test.go index 839a924..319a09a 100644 --- a/lsm/sst_test.go +++ b/lsm/sst_test.go @@ -1,5 +1,6 @@ // SPDX-FileCopyrightText: 2023 Jon Lundy // SPDX-License-Identifier: BSD-3-Clause + package lsm import ( @@ -39,19 +40,19 @@ func TestLargeFile(t *testing.T) { } t.Log(f.Stat()) - tt, ok, err := sf.Find(needle) + tt, ok, err := sf.Find(needle, false) is.NoErr(err) is.True(ok) key, val := tt.KeyValue() t.Log(string(key), val) - tt, ok, err = sf.Find([]byte("needle")) + tt, ok, err = sf.Find([]byte("needle"), false) is.NoErr(err) is.True(!ok) key, val = tt.KeyValue() t.Log(string(key), val) - tt, ok, err = sf.Find([]byte{'\xff'}) + tt, ok, err = sf.Find([]byte{'\xff'}, false) is.NoErr(err) is.True(!ok) key, val = tt.KeyValue() @@ -85,23 +86,28 @@ func TestLargeFileDisk(t *testing.T) { is.NoErr(err) k, v := e.KeyValue() needle = k - t.Logf("Segment-%d: %s = %d", i, k, v) + + ok, err := s.VerifyHash() + is.NoErr(err) + + t.Logf("Segment-%d: %s = %d %t", i, k, v, ok) + is.True(ok) } t.Log(f.Stat()) - tt, ok, err := sf.Find(needle) + tt, ok, err := sf.Find(needle, false) is.NoErr(err) is.True(ok) key, val := tt.KeyValue() t.Log(string(key), val) - tt, ok, err = sf.Find([]byte("needle")) + tt, ok, err = sf.Find([]byte("needle"), false) is.NoErr(err) is.True(!ok) key, val = tt.KeyValue() t.Log(string(key), val) - tt, ok, err = sf.Find([]byte{'\xff'}) + tt, ok, err = sf.Find([]byte{'\xff'}, false) is.NoErr(err) is.True(!ok) key, val = tt.KeyValue() @@ -133,7 +139,7 @@ func BenchmarkLargeFile(b *testing.B) { if each > 0 && n%each == 0 { b.Log(n) } - _, ok, err := sf.Find(keys[n]) + _, ok, err := sf.Find(keys[n], false) if err != nil { b.Error(err) } @@ -144,40 +150,48 @@ func BenchmarkLargeFile(b *testing.B) { b.Log("okays=", b.N, okays) } -func BenchmarkLargeFileB(b *testing.B) { - segCount := 4098 / 16 - f := randFile(b, 2_000_000, segCount) +// TestFindRange is an initial range find for start and stop of a range of needles. +// TODO: start the second query from where the first left off. Use an iterator? +func TestFindRange(t *testing.T) { + is := is.New(t) + f := basicFile(t, + entries{ + {"AD", 5}, + {"AC", 5}, + {"AB", 4}, + {"AB", 3}, + }, + entries{ + {"AB", 2}, + {"AA", 1}, + }, + ) sf, err := ReadFile(f) - if err != nil { - b.Error(err) - } - key := make([]byte, 5) - keys := make([][]byte, b.N) - for i := range keys { - _, err = crand.Read(key) - if err != nil { - b.Error(err) - } - keys[i] = []byte(base64.RawURLEncoding.EncodeToString(key)) - } - b.Log("ready", b.N) - b.ResetTimer() - okays := 0 - each := b.N / 10 - for n := 0; n < b.N; n++ { - if each > 0 && n%each == 0 { - b.Log(n) - } - _, ok, err := sf.Find(keys[n]) - if err != nil { - b.Error(err) - } - if ok { - okays++ - } - } - b.Log("okays=", b.N, okays) + is.NoErr(err) + + var ok bool + var first, last *entryBytes + + first, ok, err = sf.Find([]byte("AB"), true) + is.NoErr(err) + + key, val := first.KeyValue() + t.Log(string(key), val) + + is.True(ok) + is.Equal(key, []byte("AB")) + is.Equal(val, uint64(2)) + + last, ok, err = sf.Find([]byte("AC"), false) + is.NoErr(err) + + key, val = last.KeyValue() + t.Log(string(key), val) + + is.True(ok) + is.Equal(key, []byte("AC")) + is.Equal(val, uint64(5)) } func randFile(t interface { From 3be012e7807ffb71b9bd98b2c240e6d57492e35a Mon Sep 17 00:00:00 2001 From: xuu Date: Mon, 15 Jan 2024 11:26:54 -0700 Subject: [PATCH 3/3] chore: save sst code --- lg/metric.go | 4 +++- lg/tracer.go | 7 +++---- lsm/sst.go | 22 ++++++++++++++++++---- lsm/sst_test.go | 13 ++++++++++++- 4 files changed, 36 insertions(+), 10 deletions(-) diff --git a/lg/metric.go b/lg/metric.go index 9976fd4..8aa6478 100644 --- a/lg/metric.go +++ b/lg/metric.go @@ -38,9 +38,11 @@ func initMetrics(ctx context.Context, name string) (context.Context, func() erro goversion := "" pkg := "" host := "" + version := "0.0.1" if info, ok := debug.ReadBuildInfo(); ok { goversion = info.GoVersion pkg = info.Path + version = info.Main.Version } if h, err := os.Hostname(); err == nil { host = h @@ -69,7 +71,7 @@ func initMetrics(ctx context.Context, name string) (context.Context, func() erro ) meter := provider.Meter(name, - api.WithInstrumentationVersion("0.0.1"), + api.WithInstrumentationVersion(version), api.WithInstrumentationAttributes( attribute.String("app", name), attribute.String("host", host), diff --git a/lg/tracer.go b/lg/tracer.go index 99b0190..16305ae 100644 --- a/lg/tracer.go +++ b/lg/tracer.go @@ -64,12 +64,11 @@ func (w wrapSpan) AddEvent(name string, options ...trace.EventOption) { cfg := trace.NewEventConfig(options...) attrs := cfg.Attributes() - args := make([]any, len(attrs)*2) + args := make([]any, len(attrs)) for i, a := range attrs { - args[2*i] = a.Key - args[2*i+1] = a.Value - } + args[i] = slog.Attr{Key: string(a.Key), Value: slog.StringValue(a.Value.AsString())} + } slog.Debug(name, args...) } diff --git a/lsm/sst.go b/lsm/sst.go index 199ad50..e95731c 100644 --- a/lsm/sst.go +++ b/lsm/sst.go @@ -316,15 +316,24 @@ func (l *logFile) LoadSegment(pos int64) (*segmentBytes, error) { return &segmentBytes{b, -1}, nil } func (l *logFile) Find(needle []byte, first bool) (*entryBytes, bool, error) { - var last segmentReader + var cur, last segmentReader for _, s := range l.segments { - e, err := s.FirstEntry() + cur = s + e, err := cur.FirstEntry() if err != nil { return nil, false, err } k, _ := e.KeyValue() - if first && bytes.Compare(k, needle) >= 0 { + + if first && bytes.Equal(k, needle) { + break + } + if first && bytes.Compare(k, needle) > 0 { + e, ok, err := cur.Find(needle, first) + if ok || err != nil{ + return e, ok, err + } break } if !first && bytes.Compare(k, needle) > 0 { @@ -333,7 +342,12 @@ func (l *logFile) Find(needle []byte, first bool) (*entryBytes, bool, error) { last = s } - return last.Find(needle, first) + e, ok, err := last.Find(needle, first) + if ok || err != nil{ + return e, ok, err + } + // if by mistake it was not found in the last.. check the next segment. + return cur.Find(needle, first) } func (l *logFile) WriteTo(w io.Writer) (int64, error) { return l.rd.WriteTo(w) diff --git a/lsm/sst_test.go b/lsm/sst_test.go index 319a09a..7e842ac 100644 --- a/lsm/sst_test.go +++ b/lsm/sst_test.go @@ -40,7 +40,7 @@ func TestLargeFile(t *testing.T) { } t.Log(f.Stat()) - tt, ok, err := sf.Find(needle, false) + tt, ok, err := sf.Find(needle, true) is.NoErr(err) is.True(ok) key, val := tt.KeyValue() @@ -183,6 +183,17 @@ func TestFindRange(t *testing.T) { is.Equal(key, []byte("AB")) is.Equal(val, uint64(2)) + last, ok, err = sf.Find([]byte("AB"), false) + is.NoErr(err) + + key, val = last.KeyValue() + t.Log(string(key), val) + + is.True(ok) + is.Equal(key, []byte("AB")) + is.Equal(val, uint64(4)) + + last, ok, err = sf.Find([]byte("AC"), false) is.NoErr(err)