Compare commits


No commits in common. "dfae0ddbcc70f4d9e1cc632de5fd1f4a7c454178" and "1d987d238dd8287dddddb9594d3958148227080c" have entirely different histories.

9 changed files with 837 additions and 1256 deletions

.gitignore (vendored)

@@ -1,4 +1,3 @@
 test.db
 *.mercury
 sour.is-mercury
-.vscode/

go.mod

@@ -1,6 +1,6 @@
 module go.sour.is/pkg

-go 1.23.1
+go 1.22.0

 require (
 	github.com/99designs/gqlgen v0.17.44
@@ -9,7 +9,7 @@ require (
 	github.com/matryer/is v1.4.1
 	github.com/ravilushqa/otelgqlgen v0.15.0
 	github.com/tursodatabase/go-libsql v0.0.0-20240322134723-08771dcdd2f1
-	github.com/vektah/gqlparser/v2 v2.5.14
+	github.com/vektah/gqlparser/v2 v2.5.11
 	go.opentelemetry.io/otel v1.23.1
 	go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.23.1
 	go.opentelemetry.io/otel/sdk/metric v1.23.1
@@ -54,7 +54,6 @@ require (
 	github.com/BurntSushi/toml v1.3.2
 	github.com/Masterminds/squirrel v1.5.4
 	github.com/cenkalti/backoff/v4 v4.2.1 // indirect
-	github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815
 	github.com/go-logr/logr v1.4.1 // indirect
 	github.com/go-logr/stdr v1.2.2 // indirect
 	github.com/golang/gddo v0.0.0-20210115222349-20d68f94ee1f

go.sum

@@ -30,8 +30,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/dgryski/trifles v0.0.0-20200323201526-dd97f9abfb48 h1:fRzb/w+pyskVMQ+UbP35JkH8yB7MYb4q/qhBarqZE6g=
 github.com/dgryski/trifles v0.0.0-20200323201526-dd97f9abfb48/go.mod h1:if7Fbed8SFyPtHLHbg49SI7NAdJiC5WIA09pe59rfAA=
-github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815 h1:bWDMxwH3px2JBh6AyO7hdCn/PkvCZXii8TGj7sbtEbQ=
-github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
 github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
 github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
 github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
@@ -137,8 +135,6 @@ github.com/tursodatabase/go-libsql v0.0.0-20240322134723-08771dcdd2f1 h1:lQwP++j
 github.com/tursodatabase/go-libsql v0.0.0-20240322134723-08771dcdd2f1/go.mod h1:sb520Yr+GHBsfL43FQgQ+rLFfuJkItgRWlTgbIQHVxA=
 github.com/vektah/gqlparser/v2 v2.5.11 h1:JJxLtXIoN7+3x6MBdtIP59TP1RANnY7pXOaDnADQSf8=
 github.com/vektah/gqlparser/v2 v2.5.11/go.mod h1:1rCcfwB2ekJofmluGWXMSEnPMZgbxzwj6FaZ/4OT8Cc=
-github.com/vektah/gqlparser/v2 v2.5.14 h1:dzLq75BJe03jjQm6n56PdH1oweB8ana42wj7E4jRy70=
-github.com/vektah/gqlparser/v2 v2.5.14/go.mod h1:WQQjFc+I1YIzoPvZBhUQX7waZgg3pMLi0r8KymvAE2w=
 github.com/yudai/gojsondiff v1.0.0 h1:27cbfqXLVEJ1o8I6v3y9lg8Ydm53EKqHXAOMxEGlCOA=
 github.com/yudai/gojsondiff v1.0.0/go.mod h1:AY32+k2cwILAkW1fbgxQ5mUmMiZFgLIV+FBNExI05xg=
 github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 h1:BHyfKlQyqbsFN5p3IfnEUduWvb9is428/nNb5L3U01M=

@@ -1,286 +0,0 @@
package main

import (
	"bytes"
	"context"
	"encoding/base64"
	"errors"
	"fmt"
	"io"
	"iter"
	"net/http"
	"net/url"
	"os"
	"time"

	"github.com/docopt/docopt-go"

	"go.sour.is/pkg/lsm"
)

var usage = `
Usage:
  lsm create <archive> <files>...
  lsm append <archive> <files>...
  lsm read <archive> [<start> [<end>]]
  lsm serve <archive>
  lsm client <archive> [<start> [<end>]]`

type args struct {
	Create bool
	Append bool
	Read   bool
	Serve  bool
	Client bool

	Archive string   `docopt:"<archive>"`
	Files   []string `docopt:"<files>"`
	Start   int64    `docopt:"<start>"`
	End     int64    `docopt:"<end>"`
}

func main() {
	opts, err := docopt.ParseDoc(usage)
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}

	args := args{}
	err = opts.Bind(&args)
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}

	err = run(Console, args)
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
}

type console struct {
	Stdin  io.Reader
	Stdout io.Writer
	Stderr io.Writer
}

var Console = console{os.Stdin, os.Stdout, os.Stderr}

func (c console) Write(b []byte) (int, error) {
	return c.Stdout.Write(b)
}

func run(console console, a args) error {
	fmt.Fprintln(console, "lsm")

	switch {
	case a.Create:
		f, err := os.OpenFile(a.Archive, os.O_CREATE|os.O_WRONLY, 0644)
		if err != nil {
			return err
		}
		defer f.Close()

		return lsm.WriteLogFile(f, fileReaders(a.Files))

	case a.Append:
		f, err := os.OpenFile(a.Archive, os.O_RDWR, 0644)
		if err != nil {
			return err
		}
		defer f.Close()

		return lsm.AppendLogFile(f, fileReaders(a.Files))

	case a.Read:
		fmt.Fprintln(console, "reading", a.Archive)

		f, err := os.Open(a.Archive)
		if err != nil {
			return err
		}
		defer f.Close()

		return readContent(f, console, a.Start, a.End)

	case a.Serve:
		fmt.Fprintln(console, "serving", a.Archive)
		b, err := base64.RawStdEncoding.DecodeString(a.Archive)
		now := time.Now()
		if err != nil {
			return err
		}
		http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
			http.ServeContent(w, r, "", now, bytes.NewReader(b))
		})
		return http.ListenAndServe(":8080", nil)

	case a.Client:
		r, err := OpenHttpReader(context.Background(), a.Archive, 0)
		if err != nil {
			return err
		}
		defer r.Close()
		defer func() { fmt.Println("bytes read", r.bytesRead) }()

		return readContent(r, console, a.Start, a.End)
	}

	return errors.New("unknown command")
}

func readContent(r io.ReaderAt, console console, start, end int64) error {
	lg, err := lsm.ReadLogFile(r)
	if err != nil {
		return err
	}
	for bi, rd := range lg.Iter(uint64(start)) {
		if end > 0 && int64(bi.Index) >= end {
			break
		}
		fmt.Fprintf(console, "=========================\n%+v:\n", bi)
		wr := base64.NewEncoder(base64.RawStdEncoding, console)
		io.Copy(wr, rd)
		fmt.Fprintln(console, "\n=========================")
	}
	if lg.Err != nil {
		return lg.Err
	}
	for bi, rd := range lg.Rev(lg.Count()) {
		if end > 0 && int64(bi.Index) >= end {
			break
		}
		fmt.Fprintf(console, "=========================\n%+v:\n", bi)
		wr := base64.NewEncoder(base64.RawStdEncoding, console)
		io.Copy(wr, rd)
		fmt.Fprintln(console, "\n=========================")
	}
	return lg.Err
}

func fileReaders(names []string) iter.Seq[io.Reader] {
	return iter.Seq[io.Reader](func(yield func(io.Reader) bool) {
		for _, name := range names {
			f, err := os.Open(name)
			if err != nil {
				continue
			}
			if !yield(f) {
				f.Close()
				return
			}
			f.Close()
		}
	})
}

type HttpReader struct {
	ctx       context.Context
	uri       url.URL
	tmpfile   *os.File
	pos       int64
	end       int64
	bytesRead int
}

func OpenHttpReader(ctx context.Context, uri string, end int64) (*HttpReader, error) {
	u, err := url.Parse(uri)
	if err != nil {
		return nil, err
	}

	return &HttpReader{ctx: ctx, uri: *u, end: end}, nil
}

func (r *HttpReader) Read(p []byte) (int, error) {
	n, err := r.ReadAt(p, r.pos)
	if err != nil {
		return n, err
	}
	r.pos += int64(n)
	r.bytesRead += n
	return n, nil
}

func (r *HttpReader) Seek(offset int64, whence int) (int64, error) {
	switch whence {
	case io.SeekStart:
		r.pos = offset
	case io.SeekCurrent:
		r.pos += offset
	case io.SeekEnd:
		r.pos = r.end + offset
	}
	return r.pos, nil
}

func (r *HttpReader) Close() error {
	r.ctx.Done()
	return nil
}

// ReadAt implements io.ReaderAt. It reads data from the remote resource starting
// from the specified offset and writes it into the provided data slice. If the
// offset is negative, it returns an error. If the requested read extends beyond
// the resource's length, it returns the data read so far along with an io.EOF error.
func (r *HttpReader) ReadAt(data []byte, offset int64) (int, error) {
	if err := r.ctx.Err(); err != nil {
		return 0, err
	}
	if offset < 0 {
		return 0, errors.New("negative offset")
	}
	if r.end > 0 && offset > r.end {
		return 0, io.EOF
	}
	dlen := len(data) + int(offset)
	if r.end > 0 && r.end+int64(dlen) > r.end {
		dlen = int(r.end)
	}
	end := ""
	if r.end > 0 {
		end = fmt.Sprintf("/%d", r.end)
	}

	req, err := http.NewRequestWithContext(r.ctx, "GET", r.uri.String(), nil)
	if err != nil {
		return 0, err
	}
	req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d%s", offset, dlen, end))
	fmt.Fprintln(Console.Stderr, req)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusRequestedRangeNotSatisfiable {
		fmt.Fprintln(Console.Stderr, "requested range not satisfiable")
		return 0, io.EOF
	}

	if resp.StatusCode == http.StatusOK {
		// Server ignored the Range header and returned the whole body;
		// spool it to a temp file and satisfy the read from there.
		r.tmpfile, err = os.CreateTemp("", "httpReader")
		if err != nil {
			return 0, err
		}
		defer os.Remove(r.tmpfile.Name())
		n, err := io.Copy(r.tmpfile, resp.Body)
		if err != nil {
			return 0, err
		}
		r.bytesRead += int(n)
		defer fmt.Fprintln(Console.Stderr, "wrote ", n, " bytes to ", r.tmpfile.Name())
		resp.Body.Close()

		r.tmpfile.Seek(offset, 0)
		return io.ReadFull(r.tmpfile, data)
	}

	n, err := io.ReadFull(resp.Body, data)
	if n == 0 && err != nil {
		return n, err
	}
	r.bytesRead += n
	defer fmt.Fprintln(Console.Stderr, "read ", n, " bytes")
	return n, nil
}

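For reference, the `client` mode above relies on HTTP Range requests: HttpReader.ReadAt asks the server for only the byte window it needs, so ReadLogFile can seek around a remote archive without downloading it whole. Below is a minimal, self-contained sketch of that mechanism; readRange is a hypothetical helper, not part of the deleted file, and it simply errors when a server ignores Range (the deleted code instead spooled the full 200 response to a temp file).

package main

import (
	"fmt"
	"io"
	"net/http"
)

// readRange fetches bytes [off, off+len(buf)) of url with an HTTP Range
// request. A 206 Partial Content response carries just the requested slice.
func readRange(url string, buf []byte, off int64) (int, error) {
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return 0, err
	}
	// Range is inclusive on both ends, so the last byte is off+len(buf)-1.
	req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", off, off+int64(len(buf))-1))
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusPartialContent {
		return 0, fmt.Errorf("server did not honor range: %s", resp.Status)
	}
	return io.ReadFull(resp.Body, buf)
}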
@@ -1,104 +0,0 @@
package main

import (
	"bytes"
	"os"
	"testing"
)

func TestCreate(t *testing.T) {
	tests := []struct {
		name       string
		args       args
		wantErr    bool
		wantOutput string
	}{
		{
			name: "no input files",
			args: args{
				Create:  true,
				Archive: "test.txt",
				Files:   []string{},
			},
			wantErr:    false,
			wantOutput: "creating test.txt from []\nwrote 0 files\n",
		},
		{
			name: "one input file",
			args: args{
				Create:  true,
				Archive: "test.txt",
				Files:   []string{"test_input.txt"},
			},
			wantErr:    false,
			wantOutput: "creating test.txt from [test_input.txt]\nwrote 1 files\n",
		},
		{
			name: "multiple input files",
			args: args{
				Create:  true,
				Archive: "test.txt",
				Files:   []string{"test_input1.txt", "test_input2.txt"},
			},
			wantErr:    false,
			wantOutput: "creating test.txt from [test_input1.txt test_input2.txt]\nwrote 2 files\n",
		},
		{
			name: "non-existent input files",
			args: args{
				Create:  true,
				Archive: "test.txt",
				Files:   []string{"non_existent_file.txt"},
			},
			wantErr:    false,
			wantOutput: "creating test.txt from [non_existent_file.txt]\nwrote 0 files\n",
		},
		{
			name: "invalid command",
			args: args{
				Create:  false,
				Archive: "test.txt",
				Files:   []string{},
			},
			wantErr:    true,
			wantOutput: "",
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			// Create a temporary directory for the input files
			tmpDir, err := os.MkdirTemp("", "lsm2-cli-test")
			if err != nil {
				t.Fatal(err)
			}
			defer os.RemoveAll(tmpDir)
			os.Chdir(tmpDir)

			// Create the input files
			for _, file := range tc.args.Files {
				if file == "non_existent_file.txt" {
					continue
				}
				if err := os.WriteFile(file, []byte(file), 0o644); err != nil {
					t.Fatal(err)
				}
			}

			// Create a buffer to capture the output
			var output bytes.Buffer

			// Call the create function
			err = run(console{Stdout: &output}, tc.args)

			// Check the output
			if output.String() != tc.wantOutput {
				t.Errorf("run() output = %q, want %q", output.String(), tc.wantOutput)
			}

			// Check for errors
			if tc.wantErr && err == nil {
				t.Errorf("run() did not return an error")
			}
		})
	}
}

lsm/marshal.go (new file)

@@ -0,0 +1,138 @@
package lsm

import (
	"bytes"
	"encoding"
	"encoding/binary"
	"fmt"
)

type entry struct {
	key   string
	value uint64
}

// MarshalBinary implements encoding.BinaryMarshaler.
func (e *entry) MarshalBinary() (data []byte, err error) {
	data = make([]byte, len(e.key), len(e.key)+binary.MaxVarintLen16)
	copy(data, e.key)
	data = binary.AppendUvarint(data, e.value)
	reverse(data[len(e.key):])
	return data, err
}

// UnmarshalBinary implements encoding.BinaryUnmarshaler.
func (e *entry) UnmarshalBinary(data []byte) error {
	// fmt.Println("unmarshal", data, string(data))
	if len(data) < binary.MaxVarintLen16 {
		return fmt.Errorf("%w: bad data", ErrDecode)
	}
	head := make([]byte, binary.MaxVarintLen16)
	copy(head, data[max(0, len(data)-cap(head)):])
	reverse(head)

	size := 0
	e.value, size = binary.Uvarint(head)
	if size == 0 {
		return fmt.Errorf("%w: invalid data", ErrDecode)
	}
	e.key = string(data[:len(data)-size])
	return nil
}

var _ encoding.BinaryMarshaler = (*entry)(nil)
var _ encoding.BinaryUnmarshaler = (*entry)(nil)

type entries []entry

// MarshalBinary implements encoding.BinaryMarshaler.
func (lis *entries) MarshalBinary() (data []byte, err error) {
	var buf bytes.Buffer
	for _, e := range *lis {
		d, err := e.MarshalBinary()
		if err != nil {
			return nil, err
		}
		_, err = buf.Write(d)
		if err != nil {
			return nil, err
		}
		_, err = buf.Write(reverse(binary.AppendUvarint(make([]byte, 0, binary.MaxVarintLen32), uint64(len(d)))))
		if err != nil {
			return nil, err
		}
	}
	return buf.Bytes(), err
}

// UnmarshalBinary implements encoding.BinaryUnmarshaler.
func (lis *entries) UnmarshalBinary(data []byte) error {
	head := make([]byte, binary.MaxVarintLen16)
	pos := uint64(len(data))
	for pos > 0 {
		copy(head, data[max(0, pos-uint64(cap(head))):])
		length, size := binary.Uvarint(reverse(head))

		e := entry{}
		if err := e.UnmarshalBinary(data[max(0, pos-(length+uint64(size))) : pos-uint64(size)]); err != nil {
			return err
		}
		*lis = append(*lis, e)
		pos -= length + uint64(size)
	}
	reverse(*lis)
	return nil
}

var _ encoding.BinaryMarshaler = (*entries)(nil)
var _ encoding.BinaryUnmarshaler = (*entries)(nil)

type segment struct {
	entries entries
}

// MarshalBinary implements encoding.BinaryMarshaler.
func (s *segment) MarshalBinary() (data []byte, err error) {
	head := header{
		entries: uint64(len(s.entries)),
	}
	data, err = s.entries.MarshalBinary()
	if err != nil {
		return nil, err
	}
	head.datalen = uint64(len(data))

	h := hash()
	h.Write(data)
	head.sig = h.Sum(nil)

	return head.Append(data), err
}

// UnmarshalBinary implements encoding.BinaryUnmarshaler.
func (s *segment) UnmarshalBinary(data []byte) error {
	head, err := ReadHead(data)
	if err != nil {
		return err
	}
	h := hash()
	h.Write(data[:head.datalen])
	if !bytes.Equal(head.sig, h.Sum(nil)) {
		return fmt.Errorf("%w: invalid checksum", ErrDecode)
	}
	s.entries = make(entries, 0, head.entries)
	return s.entries.UnmarshalBinary(data[:head.datalen])
}

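The trick in this file is that each entry stores its uvarint value (and the entry-length marker written after it) byte-reversed, so a reader positioned at the end of a segment can parse records backwards. A self-contained sketch of that round trip follows; it is a hypothetical example that mirrors entry.MarshalBinary and the tail-first decode, not code from the diff.

package main

import (
	"encoding/binary"
	"fmt"
)

// reverse mirrors the package helper: flips a slice in place and returns it.
func reverse(b []byte) []byte {
	for i, j := 0, len(b)-1; i < j; i, j = i+1, j-1 {
		b[i], b[j] = b[j], b[i]
	}
	return b
}

func main() {
	// Encode: key bytes first, then the uvarint value reversed so its
	// final byte sits at the very end of the record.
	key, value := []byte("key-1"), uint64(42)
	data := append([]byte{}, key...)
	data = binary.AppendUvarint(data, value)
	reverse(data[len(key):])

	// Decode from the tail: reverse the last few bytes and read the
	// uvarint; whatever precedes it is the key.
	tail := len(data) - binary.MaxVarintLen64
	if tail < 0 {
		tail = 0
	}
	head := reverse(append([]byte{}, data[tail:]...))
	got, n := binary.Uvarint(head)
	fmt.Printf("key=%s value=%d\n", data[:len(data)-n], got) // key=key-1 value=42
}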
lsm/marshal_test.go (new file)

@@ -0,0 +1,76 @@
package lsm

import (
	"io/fs"
	"testing"

	"github.com/matryer/is"
)

func TestEncoding(t *testing.T) {
	is := is.New(t)

	data := segment{entries: entries{
		{"key-1", 1},
		{"key-2", 2},
		{"key-3", 3},
		{"longerkey-4", 65535},
	}}

	b, err := data.MarshalBinary()
	is.NoErr(err)

	var got segment
	err = got.UnmarshalBinary(b)
	is.NoErr(err)

	is.Equal(data, got)
}

func TestReverse(t *testing.T) {
	is := is.New(t)

	got := []byte("gnirts a si siht")
	reverse(got)
	is.Equal(got, []byte("this is a string"))

	got = []byte("!gnirts a si siht")
	reverse(got)
	is.Equal(got, []byte("this is a string!"))
}

func TestFile(t *testing.T) {
	is := is.New(t)

	entries := entries{
		{"key-1", 1},
		{"key-2", 2},
		{"key-3", 3},
		{"longerkey-4", 65535},
	}

	f := basicFile(t, entries, entries, entries)

	sf, err := ReadFile(f)
	is.NoErr(err)
	is.Equal(len(sf.segments), 3)
}

func basicFile(t *testing.T, lis ...entries) fs.File {
	t.Helper()

	segments := make([][]byte, len(lis))
	var err error
	for i, entries := range lis {
		data := segment{entries: entries}
		segments[i], err = data.MarshalBinary()
		if err != nil {
			t.Error(err)
		}
	}

	return NewFile(segments...)
}

@@ -1,634 +1,370 @@
-package lsm
-
-import (
-	"encoding/binary"
-	"errors"
-	"fmt"
-	"hash/fnv"
-	"io"
-	"iter"
-	"slices"
-)
-
-// [Sour.is|size] [size|hash][data][hash|flag|size]... [prev|count|flag|size]
-// Commit1: [magic>|<end]{10} ... [<count][<size][<flag]{3..30}
-// +---------|--------------------------------> end = seek to end of file
-// <---|-------------+ size = seek to magic header
-// <---|-------------+10 size + 10 = seek to start of file
-// <-----------------------------T+10----------------> 10 + size + trailer = full file size
-// Commit2: [magic>|<end]{10} ... [<count][<size][<flag]{3..30} ... [<prev][<count][<size][<flag]{4..40}
-// <---|---------+
-// <-------------+T----------------->
-// +--------|------------------------------------------------------------------------->
-// <-------------------------------------|----------------+
-// prev = seek to last commit <---|-+
-// prev + trailer = size of commit <----T+--------------------------------->
-// Block: [hash>|<end]{10} ... [<size][<flag]{2..20}
-// +---------|------------------------> end = seek to end of block
-// <---|-+ size = seek to end of header
-// <-------------------|-+10 size + 10 = seek to start of block
-// <---------------------T+10---------------> size + 10 + trailer = full block size
-
-const (
-	TypeUnknown uint64 = iota
-	TypeSegment
-	TypeCommit
-	TypePrevCommit
-
-	headerSize = 10
-
-	maxCommitSize = 4 * binary.MaxVarintLen64
-	minCommitSize = 3
-
-	maxBlockSize = 2 * binary.MaxVarintLen64
-	minBlockSize = 2
-)
-
-var (
-	Magic   = [10]byte([]byte("Sour.is\x00\x00\x00"))
-	Version = uint8(1)
-	hash    = fnv.New64a
-
-	ErrDecode = errors.New("decode")
-)
-
-type header struct {
-	end   uint64
-	extra []byte
-}
-
-// UnmarshalBinary implements encoding.BinaryUnmarshaler.
-// It decodes the input binary data into the header struct.
-// The function expects the input data to be of a specific size (headerSize),
-// otherwise it returns an error indicating bad data.
-// It reads the 'end' field from the binary data, updates the 'extra' field,
-// and reverses the byte order of 'extra' in place.
-func (h *header) UnmarshalBinary(data []byte) error {
-	if len(data) != headerSize {
-		return fmt.Errorf("%w: bad data", ErrDecode)
-	}
-
-	h.extra = make([]byte, headerSize)
-	copy(h.extra, data)
-
-	var bytesRead int
-	h.end, bytesRead = binary.Uvarint(h.extra)
-	reverse(h.extra)
-	h.extra = h.extra[:min(8, headerSize-bytesRead)]
-
-	return nil
-}
-
-type Commit struct {
-	flag  uint64 // flag values
-	size  uint64 // size of the trailer
-	count uint64 // number of entries
-	prev  uint64 // previous commit
-
-	tsize int
-}
-
-// Append marshals the trailer into binary form and appends it to data.
-// It returns the new slice.
-func (h *Commit) AppendTrailer(data []byte) []byte {
-	h.flag |= TypeCommit
-	// if h.prev > 0 {
-	// 	h.flag |= TypePrevCommit
-	// }
-
-	size := len(data)
-	data = binary.AppendUvarint(data, h.size)
-	data = binary.AppendUvarint(data, h.flag)
-	data = binary.AppendUvarint(data, h.count)
-	// if h.prev > 0 {
-	// 	data = binary.AppendUvarint(data, h.prev)
-	// }
-	reverse(data[size:])
-
-	return data
-}
-
-// UnmarshalBinary implements encoding.BinaryUnmarshaler.
-// It reads a trailer from binary data, and sets the fields
-// of the receiver to the values found in the header.
-func (h *Commit) UnmarshalBinary(data []byte) error {
-	if len(data) < minCommitSize {
-		return fmt.Errorf("%w: bad data", ErrDecode)
-	}
-
-	var n int
-	h.size, n = binary.Uvarint(data)
-	data = data[n:]
-	h.tsize += n
-
-	h.flag, n = binary.Uvarint(data)
-	data = data[n:]
-	h.tsize += n
-
-	h.count, n = binary.Uvarint(data)
-	data = data[n:]
-	h.tsize += n
-
-	// h.prev = h.size
-	if h.flag&TypePrevCommit == TypePrevCommit {
-		h.prev, n = binary.Uvarint(data)
-		h.tsize += n
-	}
-
-	return nil
-}
-
-type Block struct {
-	header
-
-	size uint64
-	flag uint64
-
-	tsize int
-}
-
-func (h *Block) AppendHeader(data []byte) []byte {
-	size := len(data)
-	data = append(data, make([]byte, 10)...)
-	copy(data, h.extra)
-	if h.size == 0 {
-		return data
-	}
-	hdata := binary.AppendUvarint(make([]byte, 0, 10), h.end)
-	reverse(hdata)
-	copy(data[size+10-len(hdata):], hdata)
-
-	return data
-}
-
-// AppendTrailer marshals the footer into binary form and appends it to data.
-// It returns the new slice.
-func (h *Block) AppendTrailer(data []byte) []byte {
-	size := len(data)
-
-	h.flag |= TypeSegment
-	data = binary.AppendUvarint(data, h.size)
-	data = binary.AppendUvarint(data, h.flag)
-	reverse(data[size:])
-
-	return data
-}
-
-// UnmarshalBinary implements encoding.BinaryUnmarshaler.
-// It reads a footer from binary data, and sets the fields
-// of the receiver to the values found in the footer.
-func (h *Block) UnmarshalBinary(data []byte) error {
-	if len(data) < minBlockSize {
-		return fmt.Errorf("%w: bad data", ErrDecode)
-	}
-
-	var n int
-	h.size, n = binary.Uvarint(data)
-	data = data[n:]
-	h.tsize += n
-
-	h.flag, n = binary.Uvarint(data)
-	h.tsize += n
-
-	return nil
-}
-
-type logFile struct {
-	header
-	Commit
-}
-
-func (h *logFile) AppendMagic(data []byte) []byte {
-	size := len(data)
-	data = append(data, Magic[:]...)
-	if h.end == 0 {
-		return data
-	}
-	hdata := binary.AppendUvarint(make([]byte, 0, 10), h.end)
-	reverse(hdata)
-	copy(data[size+10-len(hdata):], hdata)
-
-	return data
-}
-
-// WriteLogFile writes a log file to w, given a list of segments.
-// The caller is responsible for calling WriteAt on the correct offset.
-// The function will return an error if any of the segments fail to write.
-// The offset is the initial offset of the first segment, and will be
-// incremented by the length of the segment on each write.
-//
-// The log file is written with the following format:
-//   - A header with the magic, version, and flag (Dirty)
-//   - A series of segments, each with:
-//   - A footer with the length and hash of the segment
-//   - The contents of the segment
-//   - A header with the magic, version, flag (Clean), and end offset
-func WriteLogFile(w io.WriterAt, segments iter.Seq[io.Reader]) error {
-	_, err := w.WriteAt(Magic[:], 0)
-	if err != nil {
-		return err
-	}
-
-	lf := &LogWriter{
-		WriterAt: w,
-	}
-
-	return lf.writeIter(segments)
-}
-
-type rw interface {
-	io.ReaderAt
-	io.WriterAt
-}
-
-func AppendLogFile(rw rw, segments iter.Seq[io.Reader]) error {
-	logFile, err := ReadLogFile(rw)
-	if err != nil {
-		return err
-	}
-	lf := &LogWriter{
-		WriterAt: rw,
-		logFile:  logFile.logFile,
-	}
-	return lf.writeIter(segments)
-}
-
-func (lf *LogWriter) writeIter(segments iter.Seq[io.Reader]) error {
-	lf.size = 0
-	for s := range segments {
-		n, err := lf.writeBlock(s)
-		if err != nil {
-			return err
-		}
-		lf.end += n
-		lf.size += n
-		lf.count++
-	}
-
-	// Write the footer to the log file.
-	// The footer is written at the current end of file position.
-	n, err := lf.WriteAt(lf.AppendTrailer(make([]byte, 0, maxCommitSize)), int64(lf.end)+10)
-	if err != nil {
-		// If there is an error, return it.
-		return err
-	}
-	lf.end += uint64(n)
-
-	_, err = lf.WriteAt(lf.AppendMagic(make([]byte, 0, 10)), 0)
-	return err
-}
-
-type LogWriter struct {
-	logFile
-	io.WriterAt
-}
-
-// writeBlock writes a segment to the log file at the current end of file position.
-// The segment is written in chunks of 1024 bytes, and the hash of the segment
-func (lf *LogWriter) writeBlock(segment io.Reader) (uint64, error) {
-	h := hash()
-	block := Block{}
-
-	start := int64(lf.end) + 10
-	end := start
-	bytesWritten := 0
-
-	// Write the header to the log file.
-	// The footer is written at the current end of file position.
-	n, err := lf.WriteAt(make([]byte, headerSize), start)
-	bytesWritten += n
-	end += int64(n)
-	if err != nil {
-		// If there is an error, return it.
-		return uint64(bytesWritten), err
-	}
-
-	// Write the segment to the log file.
-	// The segment is written in chunks of 1024 bytes.
-	for {
-		// Read a chunk of the segment.
-		buf := make([]byte, 1024)
-		n, err := segment.Read(buf)
-		if err != nil {
-			// If the segment is empty, break the loop.
-			if err == io.EOF {
-				break
-			}
-			// If there is an error, return it.
-			return uint64(bytesWritten), err
-		}
-		// Compute the hash of the chunk.
-		h.Write(buf[:n])
-
-		// Write the chunk to the log file.
-		// The chunk is written at the current end of file position.
-		n, err = lf.WriteAt(buf[:n], end)
-		bytesWritten += n
-		if err != nil {
-			// If there is an error, return it.
-			return uint64(bytesWritten), err
-		}
-		// Update the length of the segment.
-		end += int64(n)
-		block.size += uint64(n)
-	}
-
-	block.extra = h.Sum(nil)
-	block.end += block.size
-
-	// Write the footer to the log file.
-	// The footer is written at the current end of file position.
-	n, err = lf.WriteAt(block.AppendTrailer(make([]byte, 0, maxBlockSize)), end)
-	bytesWritten += n
-	if err != nil {
-		// If there is an error, return it.
-		return uint64(bytesWritten), err
-	}
-	end += int64(n)
-	block.end += uint64(n)
-
-	// Update header to the log file.
-	// The footer is written at the current end of file position.
-	_, err = lf.WriteAt(block.AppendHeader(make([]byte, 0, headerSize)), start)
-	if err != nil {
-		// If there is an error, return it.
-		return uint64(bytesWritten), err
-	}
-
-	return uint64(bytesWritten), nil
-}
-
-// reverse reverses a slice in-place.
-func reverse[T any](b []T) {
-	l := len(b)
-	for i := 0; i < l/2; i++ {
-		b[i], b[l-i-1] = b[l-i-1], b[i]
-	}
-}
-
-type LogReader struct {
-	logFile
-	io.ReaderAt
-	Err error
-}
-
-// ReadLogFile reads a log file from the given io.ReaderAt. It returns a pointer to a LogFile, or an error if the file
-// could not be read.
-func ReadLogFile(reader io.ReaderAt) (*LogReader, error) {
-	header := make([]byte, headerSize)
-	n, err := rsr(reader, 0, 10).ReadAt(header, 0)
-	if err != nil {
-		return nil, err
-	}
-	header = header[:n]
-
-	logFile := &LogReader{ReaderAt: reader}
-	err = logFile.header.UnmarshalBinary(header)
-	if err != nil {
-		return nil, err
-	}
-
-	if logFile.end == 0 {
-		return logFile, nil
-	}
-
-	commit := make([]byte, maxCommitSize)
-	n, err = rsr(reader, 10, int64(logFile.end)).ReadAt(commit, 0)
-	if n == 0 && err != nil {
-		return nil, err
-	}
-	commit = commit[:n]
-
-	err = logFile.Commit.UnmarshalBinary(commit)
-
-	return logFile, err
-}
-
-// Iterate reads the log file and calls the given function for each segment.
-// It passes an io.Reader that reads from the current segment. It will stop
-// calling the function if the function returns false.
-func (lf *LogReader) Iter(begin uint64) iter.Seq2[blockInfo, io.Reader] {
-	var commits []*Commit
-	for commit := range lf.iterCommits() {
-		commits = append(commits, &commit)
-	}
-	if lf.Err != nil {
-		return func(yield func(blockInfo, io.Reader) bool) {}
-	}
-
-	reverse(commits)
-
-	return func(yield func(blockInfo, io.Reader) bool) {
-		start := int64(10)
-		var adj uint64
-		for _, commit := range commits {
-			size := int64(commit.size)
-			it := iterBlocks(io.NewSectionReader(lf, start, size), size)
-			for bi, block := range it {
-				bi.Commit = *commit
-				bi.Index += adj
-				bi.Start += uint64(start)
-				if begin <= bi.Index {
-					if !yield(bi, block) {
-						return
-					}
-				}
-			}
-			start += size + int64(commit.tsize)
-			adj = commit.count
-		}
-	}
-}
-
-type blockInfo struct {
-	Index  uint64
-	Commit Commit
-	Start  uint64
-	Size   uint64
-	Hash   []byte
-}
-
-func iterBlocks(r io.ReaderAt, end int64) iter.Seq2[blockInfo, io.Reader] {
-	var start int64
-	var i uint64
-	var bi blockInfo
-	return func(yield func(blockInfo, io.Reader) bool) {
-		buf := make([]byte, maxBlockSize)
-		for start < end {
-			block := &Block{}
-			buf = buf[:10]
-			n, err := rsr(r, int64(start), 10).ReadAt(buf, 0)
-			if n == 0 && err != nil {
-				return
-			}
-			start += int64(n)
-			if err := block.header.UnmarshalBinary(buf); err != nil {
-				return
-			}
-			buf = buf[:maxBlockSize]
-			n, err = rsr(r, int64(start), int64(block.end)).ReadAt(buf, 0)
-			if n == 0 && err != nil {
-				return
-			}
-			buf = buf[:n]
-			err = block.UnmarshalBinary(buf)
-			if err != nil {
-				return
-			}
-			bi.Index = i
-			bi.Start = uint64(start)
-			bi.Size = block.size
-			bi.Hash = block.extra
-
-			if !yield(bi, io.NewSectionReader(r, int64(start), int64(block.size))) {
-				return
-			}
-			i++
-			start += int64(block.end)
-		}
-	}
-}
-
-func (lf *LogReader) iterCommits() iter.Seq[Commit] {
-	if lf.end == 0 {
-		return slices.Values([]Commit(nil))
-	}
-
-	offset := lf.end - lf.size - uint64(lf.tsize)
-
-	return func(yield func(Commit) bool) {
-		if !yield(lf.Commit) {
-			return
-		}
-		buf := make([]byte, maxCommitSize)
-		for offset > 10 {
-			commit := Commit{}
-			buf = buf[:10]
-			n, err := rsr(lf, 10, int64(offset)).ReadAt(buf, 0)
-			if n == 0 && err != nil {
-				lf.Err = err
-				return
-			}
-			buf = buf[:n]
-			err = commit.UnmarshalBinary(buf)
-			if err != nil {
-				lf.Err = err
-				return
-			}
-			if !yield(commit) {
-				return
-			}
-			offset -= commit.size + uint64(commit.tsize)
-		}
-	}
-}
-
-func (lf *LogReader) Rev(begin uint64) iter.Seq2[blockInfo, io.Reader] {
-	end := lf.end + 10
-	bi := blockInfo{}
-	bi.Index = lf.count - 1
-
-	return func(yield func(blockInfo, io.Reader) bool) {
-		buf := make([]byte, maxBlockSize)
-		for commit := range lf.iterCommits() {
-			end -= uint64(commit.tsize)
-			start := end - commit.size
-			bi.Commit = commit
-			for start < end {
-				block := &Block{}
-				buf = buf[:maxBlockSize]
-				n, err := rsr(lf, int64(start), int64(commit.size)).ReadAt(buf, 0)
-				if n == 0 && err != nil {
-					lf.Err = err
-					return
-				}
-				buf = buf[:n]
-				err = block.UnmarshalBinary(buf)
-				if err != nil {
-					lf.Err = err
-					return
-				}
-				if begin >= bi.Index {
-					bi.Start = uint64(end-block.size) - uint64(block.tsize)
-					bi.Size = block.size
-
-					buf = buf[:10]
-					_, err = rsr(lf, int64(bi.Start)-10, 10).ReadAt(buf, 0)
-					if err != nil {
-						lf.Err = err
-						return
-					}
-					err = block.header.UnmarshalBinary(buf)
-					if err != nil {
-						lf.Err = err
-						return
-					}
-					bi.Hash = block.extra
-
-					if !yield(bi, io.NewSectionReader(lf, int64(bi.Start), int64(bi.Size))) {
-						return
-					}
-				}
-				end -= block.size + 10 + uint64(block.tsize)
-				bi.Index--
-			}
-		}
-	}
-}
-
-func (lf *LogReader) Count() uint64 {
-	return lf.count
-}
-
-func (lf *LogReader) Size() uint64 {
-	return lf.end + 10
-}
-
-func rsr(r io.ReaderAt, offset, size int64) *revSegmentReader {
-	r = io.NewSectionReader(r, offset, size)
-	return &revSegmentReader{r, size}
-}
-
-type revSegmentReader struct {
-	io.ReaderAt
-	size int64
-}
-
-func (r *revSegmentReader) ReadAt(data []byte, offset int64) (int, error) {
-	if offset < 0 {
-		return 0, errors.New("negative offset")
-	}
-	if offset > int64(r.size) {
-		return 0, io.EOF
-	}
-
-	o := r.size - int64(len(data)) - offset
-	d := int64(len(data))
-	if o < 0 {
-		d = max(0, d+o)
-	}
-	i, err := r.ReaderAt.ReadAt(data[:d], max(0, o))
-	reverse(data[:i])
-	return i, err
-}
+// SPDX-FileCopyrightText: 2023 Jon Lundy <jon@xuu.cc>
+// SPDX-License-Identifier: BSD-3-Clause
+
+// lsm -- Log Structured Merge-Tree
+//
+// This is a basic LSM tree using a SSTable optimized for append only writing. On disk data is organized into time ordered
+// files of segments, containing reverse sorted keys. Each segment ends with a magic value `Souris\x01`, a 4byte hash, count of
+// segment entries, and data length.
+package lsm
+
+import (
+	"bytes"
+	"encoding"
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"hash/fnv"
+	"io"
+	"io/fs"
+	"sort"
+)
+
+var (
+	magic      = reverse(append([]byte("Souris"), '\x01'))
+	hash       = fnv.New32a
+	hashLength = hash().Size()
+	// segmentSize = 2 ^ 16 // min 2^9 = 512b, max? 2^20 = 1M
+	segmentFooterLength = len(magic) + hashLength + binary.MaxVarintLen32 + binary.MaxVarintLen32
+)
+
+type header struct {
+	sig     []byte // 4Byte signature
+	entries uint64 // count of entries in segment
+	datalen uint64 // length of data
+	headlen uint64 // length of header
+	end     int64  // location of end of data/start of header (start of data is `end - datalen`)
+}
+
+// ReadHead parse header from a segment. reads from the end of slice of length segmentFooterLength
+func ReadHead(data []byte) (*header, error) {
+	if len(data) < len(magic)+6 {
+		return nil, fmt.Errorf("%w: invalid size", ErrDecode)
+	}
+	if !bytes.Equal(data[len(data)-len(magic):], magic) {
+		return nil, fmt.Errorf("%w: invalid header", ErrDecode)
+	}
+	head := make([]byte, 0, segmentFooterLength)
+	head = reverse(append(head, data[max(0, len(data)-cap(head)-1):]...))
+	size, s := binary.Uvarint(head[len(magic)+4:])
+	length, i := binary.Uvarint(head[len(magic)+4+s:])
+	return &header{
+		sig:     head[len(magic) : len(magic)+4],
+		entries: size,
+		datalen: length,
+		headlen: uint64(len(magic) + hashLength + s + i),
+		end:     int64(len(data)),
+	}, nil
+}
+
+func (h *header) Append(data []byte) []byte {
+	length := len(data)
+	data = append(data, h.sig...)
+	data = binary.AppendUvarint(data, h.entries)
+	data = binary.AppendUvarint(data, h.datalen)
+	reverse(data[length:])
+	return append(data, magic...)
+}
+
+var _ encoding.BinaryMarshaler = (*segment)(nil)
+var _ encoding.BinaryUnmarshaler = (*segment)(nil)
+
+var ErrDecode = errors.New("decode")
+
+func reverse[T any](b []T) []T {
+	l := len(b)
+	for i := 0; i < l/2; i++ {
+		b[i], b[l-i-1] = b[l-i-1], b[i]
+	}
+	return b
+}
+
+// func clone[T ~[]E, E any](e []E) []E {
+// 	return append(e[0:0:0], e...)
+// }
+
+type entryBytes []byte
+
+// KeyValue returns the parsed key and value from an entry
+func (e entryBytes) KeyValue() ([]byte, uint64) {
+	if len(e) < 2 {
+		return nil, 0
+	}
+	head := reverse(append(e[0:0:0], e[max(0, len(e)-binary.MaxVarintLen64):]...))
+	value, i := binary.Uvarint(head)
+	return append(e[0:0:0], e[:len(e)-i]...), value
+}
+
+// NewKeyValue packed into an entry
+func NewKeyValue(key []byte, val uint64) entryBytes {
+	length := len(key)
+	data := append(key[0:0:0], key...)
+	data = binary.AppendUvarint(data, val)
+	reverse(data[length:])
+	return data
+}
+
+type listEntries []entryBytes
+
+// WriteTo implements io.WriterTo.
+func (lis *listEntries) WriteTo(wr io.Writer) (int64, error) {
+	if lis == nil {
+		return 0, nil
+	}
+	head := header{
+		entries: uint64(len(*lis)),
+	}
+	h := hash()
+	wr = io.MultiWriter(wr, h)
+	var i int64
+	for _, b := range *lis {
+		j, err := wr.Write(b)
+		i += int64(j)
+		if err != nil {
+			return i, err
+		}
+		j, err = wr.Write(reverse(binary.AppendUvarint(make([]byte, 0, binary.MaxVarintLen32), uint64(len(b)))))
+		i += int64(j)
+		if err != nil {
+			return i, err
+		}
+	}
+	head.datalen = uint64(i)
+	head.sig = h.Sum(nil)
+	b := head.Append([]byte{})
+	j, err := wr.Write(b)
+	i += int64(j)
+	return i, err
+}
+
+var _ sort.Interface = listEntries{}
+
+// Len implements sort.Interface.
+func (lis listEntries) Len() int {
+	return len(lis)
+}
+
+// Less implements sort.Interface.
+func (lis listEntries) Less(i int, j int) bool {
+	iname, _ := lis[i].KeyValue()
+	jname, _ := lis[j].KeyValue()
+	return bytes.Compare(iname, jname) < 0
+}
+
+// Swap implements sort.Interface.
+func (lis listEntries) Swap(i int, j int) {
+	lis[i], lis[j] = lis[j], lis[i]
+}
+
+type segmentReader struct {
+	head *header
+	rd   io.ReaderAt
+}
+
+// FirstEntry parses the first segment entry from the end of the segment
+func (s *segmentReader) FirstEntry() (*entryBytes, error) {
+	e, _, err := s.readEntryAt(-1)
+	return e, err
+}
+
+func (s *segmentReader) VerifyHash() (bool, error) {
+	h := hash()
+	data := make([]byte, s.head.datalen)
+	_, err := s.rd.ReadAt(data, s.head.end-int64(s.head.datalen))
+	if err != nil {
+		return false, err
+	}
+	_, err = h.Write(data)
+	ok := bytes.Equal(h.Sum(nil), s.head.sig)
+	return ok, err
+}
+
+// Find locates needle within a segment. if it cant find it will return the nearest key before needle.
+func (s *segmentReader) Find(needle []byte, first bool) (*entryBytes, bool, error) {
+	if s == nil {
+		return nil, false, nil
+	}
+	e, pos, err := s.readEntryAt(-1)
+	if err != nil {
+		return nil, false, err
+	}
+	last := e
+	found := false
+	for pos > 0 {
+		key, _ := e.KeyValue()
+		switch bytes.Compare(key, needle) {
+		case 1: // key=ccc, needle=bbb
+			return last, found, nil
+		case 0: // equal
+			if first {
+				return e, true, nil
+			}
+			found = true
+			fallthrough
+		case -1: // key=aaa, needle=bbb
+			last = e
+			e, pos, err = s.readEntryAt(pos)
+			if err != nil {
+				return nil, found, err
+			}
+		}
+	}
+	return last, found, nil
+}
+
+func (s *segmentReader) readEntryAt(pos int64) (*entryBytes, int64, error) {
+	if pos < 0 {
+		pos = s.head.end
+	}
+	head := make([]byte, binary.MaxVarintLen16)
+	s.rd.ReadAt(head, pos-binary.MaxVarintLen16)
+	length, hsize := binary.Uvarint(reverse(head))
+
+	e := make(entryBytes, length)
+	_, err := s.rd.ReadAt(e, pos-int64(length)-int64(hsize))
+
+	return &e, pos - int64(length) - int64(hsize), err
+}
+
+type logFile struct {
+	rd interface {
+		io.ReaderAt
+		io.WriterTo
+	}
+	segments []segmentReader
+
+	fs.File
+}
+
+func ReadFile(fd fs.File) (*logFile, error) {
+	l := &logFile{File: fd}
+
+	stat, err := fd.Stat()
+	if err != nil {
+		return nil, err
+	}
+	eof := stat.Size()
+
+	if rd, ok := fd.(interface {
+		io.ReaderAt
+		io.WriterTo
+	}); ok {
+		l.rd = rd
+	} else {
+		rd, err := io.ReadAll(fd)
+		if err != nil {
+			return nil, err
+		}
+		l.rd = bytes.NewReader(rd)
+	}
+
+	head := make([]byte, segmentFooterLength)
+	for eof > 0 {
+		_, err = l.rd.ReadAt(head, eof-int64(segmentFooterLength))
+		if err != nil {
+			return nil, err
+		}
+
+		s := segmentReader{
+			rd: l.rd,
+		}
+		s.head, err = ReadHead(head)
+		s.head.end = eof - int64(s.head.headlen)
+		if err != nil {
+			return nil, err
+		}
+		eof -= int64(s.head.datalen) + int64(s.head.headlen)
+
+		l.segments = append(l.segments, s)
+	}
+
+	return l, nil
+}
+
+func (l *logFile) Count() int64 {
+	return int64(len(l.segments))
+}
+
+func (l *logFile) LoadSegment(pos int64) (*segmentBytes, error) {
+	if pos < 0 {
+		pos = int64(len(l.segments) - 1)
+	}
+	if pos > int64(len(l.segments)-1) {
+		return nil, ErrDecode
+	}
+	s := l.segments[pos]
+	b := make([]byte, s.head.datalen+s.head.headlen)
+	_, err := l.rd.ReadAt(b, s.head.end-int64(len(b)))
+	if err != nil {
+		return nil, err
+	}
+
+	return &segmentBytes{b, -1}, nil
+}
+
+func (l *logFile) Find(needle []byte, first bool) (*entryBytes, bool, error) {
+	var cur, last segmentReader
+	for _, s := range l.segments {
+		cur = s
+		e, err := cur.FirstEntry()
+		if err != nil {
+			return nil, false, err
+		}
+		k, _ := e.KeyValue()
+		if first && bytes.Equal(k, needle) {
+			break
+		}
+		if first && bytes.Compare(k, needle) > 0 {
+			e, ok, err := cur.Find(needle, first)
+			if ok || err != nil {
+				return e, ok, err
+			}
+			break
+		}
+		if !first && bytes.Compare(k, needle) > 0 {
+			break
+		}
+		last = s
+	}
+	e, ok, err := last.Find(needle, first)
+	if ok || err != nil {
+		return e, ok, err
+	}
+	// if by mistake it was not found in the last.. check the next segment.
+	return cur.Find(needle, first)
+}
+
+func (l *logFile) WriteTo(w io.Writer) (int64, error) {
+	return l.rd.WriteTo(w)
+}
+
+type segmentBytes struct {
+	b   []byte
+	pos int
+}
+
+type dataset struct {
+	rd io.ReaderAt
+
+	files []logFile
+
+	fs.FS
+}
+
+func ReadDataset(fd fs.FS) (*dataset, error) {
+	panic("not implemented")
+}

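The replacement format closes every segment with a reversed footer: a 4-byte FNV-32a signature, the entry count and data length as uvarints, then the reversed magic `Souris\x01`. That is what header.Append writes and ReadHead parses from the tail, and it is why ReadFile can walk segments backwards from EOF. A compressed sketch of that footer layout follows; it is hypothetical standalone code, not the package's API.

package main

import (
	"encoding/binary"
	"fmt"
	"hash/fnv"
)

var magic = []byte{0x01, 's', 'i', 'r', 'u', 'o', 'S'} // "Souris\x01" reversed

// appendFooter mimics header.Append: the signature, entry count and data
// length are appended after the data, reversed as a group, then the reversed
// magic closes the segment so it can be recognized from the end of the file.
func appendFooter(data []byte, entries uint64) []byte {
	h := fnv.New32a()
	h.Write(data)
	start := len(data)
	data = h.Sum(data)                               // 4-byte signature
	data = binary.AppendUvarint(data, entries)       // count of entries
	data = binary.AppendUvarint(data, uint64(start)) // length of data
	for i, j := start, len(data)-1; i < j; i, j = i+1, j-1 {
		data[i], data[j] = data[j], data[i]
	}
	return append(data, magic...)
}

func main() {
	seg := appendFooter([]byte("payload"), 3)
	fmt.Printf("% x\n", seg[len(seg)-len(magic):]) // ends with the magic bytes
}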
View File

@@ -1,300 +1,327 @@
-package lsm
-
-import (
-	"bytes"
-	"encoding/base64"
-	"errors"
-	"io"
-	"iter"
-	"slices"
-	"testing"
-
-	"github.com/docopt/docopt-go"
-	"github.com/matryer/is"
-)
-
-// TestWriteLogFile tests AppendLogFile and WriteLogFile against a set of test cases.
-//
-// Each test case contains a slice of slices of io.Readers, which are passed to
-// AppendLogFile and WriteLogFile in order. The test case also contains the
-// expected encoded output as a base64 string, as well as the expected output
-// when the file is read back using ReadLogFile.
-//
-// The test case also contains the expected output when the file is read back in
-// reverse order using ReadLogFile.Rev().
-//
-// The test cases are as follows:
-//
-//   - nil reader: Passes a nil slice of io.Readers to WriteLogFile.
-//   - err reader: Passes a slice of io.Readers to WriteLogFile which returns an
-//     error when read.
-//   - single reader: Passes a single io.Reader to WriteLogFile.
-//   - multiple readers: Passes a slice of multiple io.Readers to WriteLogFile.
-//   - multiple commit: Passes multiple slices of io.Readers to AppendLogFile.
-//   - multiple commit 3x: Passes multiple slices of io.Readers to AppendLogFile
-//     three times.
-//
-// The test uses the is package from github.com/matryer/is to check that the
-// output matches the expected output.
-func TestWriteLogFile(t *testing.T) {
-	type test struct {
-		name string
-		in   [][]io.Reader
-		enc  string
-		out  [][]byte
-		rev  [][]byte
-	}
-	tests := []test{
-		{
-			name: "nil reader",
-			in:   nil,
-			enc:  "U291ci5pcwAAAwACAA",
-			out:  [][]byte{},
-			rev:  [][]byte{},
-		},
-		{
-			name: "err reader",
-			in:   nil,
-			enc:  "U291ci5pcwAAAwACAA",
-			out:  [][]byte{},
-			rev:  [][]byte{},
-		},
-		{
-			name: "single reader",
-			in: [][]io.Reader{
-				{
-					bytes.NewBuffer([]byte{1, 2, 3, 4})}},
-			enc: "U291ci5pcwAAE756XndRZXhdAAYBAgMEAQQBAhA",
-			out: [][]byte{{1, 2, 3, 4}},
-			rev: [][]byte{{1, 2, 3, 4}}},
-		{
-			name: "multiple readers",
-			in: [][]io.Reader{
-				{
-					bytes.NewBuffer([]byte{1, 2, 3, 4}),
-					bytes.NewBuffer([]byte{5, 6, 7, 8})}},
-			enc: "U291ci5pcwAAI756XndRZXhdAAYBAgMEAQRhQyZWDDn5BQAGBQYHCAEEAgIg",
-			out: [][]byte{{1, 2, 3, 4}, {5, 6, 7, 8}},
-			rev: [][]byte{{5, 6, 7, 8}, {1, 2, 3, 4}}},
-		{
-			name: "multiple commit",
-			in: [][]io.Reader{
-				{
-					bytes.NewBuffer([]byte{1, 2, 3, 4})},
-				{
-					bytes.NewBuffer([]byte{5, 6, 7, 8})}},
-			enc: "U291ci5pcwAAJr56XndRZXhdAAYBAgMEAQQBAhBhQyZWDDn5BQAGBQYHCAEEAgIQ",
-			out: [][]byte{{1, 2, 3, 4}, {5, 6, 7, 8}},
-			rev: [][]byte{{5, 6, 7, 8}, {1, 2, 3, 4}}},
-		{
-			name: "multiple commit",
-			in: [][]io.Reader{
-				{
-					bytes.NewBuffer([]byte{1, 2, 3, 4}),
-					bytes.NewBuffer([]byte{5, 6, 7, 8})},
-				{
-					bytes.NewBuffer([]byte{9, 10, 11, 12})},
-			},
-			enc: "U291ci5pcwAANr56XndRZXhdAAYBAgMEAQRhQyZWDDn5BQAGBQYHCAEEAgIgA4Buuio8Ro0ABgkKCwwBBAMCEA",
-			out: [][]byte{{1, 2, 3, 4}, {5, 6, 7, 8}, {9, 10, 11, 12}},
-			rev: [][]byte{{9, 10, 11, 12}, {5, 6, 7, 8}, {1, 2, 3, 4}}},
-		{
-			name: "multiple commit 3x",
-			in: [][]io.Reader{
-				{
-					bytes.NewBuffer([]byte{1, 2, 3}),
-					bytes.NewBuffer([]byte{4, 5, 6}),
-				},
-				{
-					bytes.NewBuffer([]byte{7, 8, 9}),
-				},
-				{
-					bytes.NewBuffer([]byte{10, 11, 12}),
-					bytes.NewBuffer([]byte{13, 14, 15}),
-				},
-			},
-			enc: "U291ci5pcwAAVNCqYhhnLPWrAAUBAgMBA7axWhhYd+HsAAUEBQYBAwICHr9ryhhdbkEZAAUHCAkBAwMCDy/UIhidCwCqAAUKCwwBA/NCwhh6wXgXAAUNDg8BAwUCHg",
-			out: [][]byte{{1, 2, 3}, {4, 5, 6}, {7, 8, 9}, {10, 11, 12}, {13, 14, 15}},
-			rev: [][]byte{{13, 14, 15}, {10, 11, 12}, {7, 8, 9}, {4, 5, 6}, {1, 2, 3}}},
-	}
-
-	for _, test := range tests {
-		t.Run(test.name, func(t *testing.T) {
-			is := is.New(t)
-			buf := &buffer{}
-			buffers := 0
-			if len(test.in) == 0 {
-				err := WriteLogFile(buf, slices.Values([]io.Reader{}))
-				is.NoErr(err)
-			}
-			for i, in := range test.in {
-				buffers += len(in)
-				if i == 0 {
-					err := WriteLogFile(buf, slices.Values(in))
-					is.NoErr(err)
-				} else {
-					err := AppendLogFile(buf, slices.Values(in))
-					is.NoErr(err)
-				}
-			}
-			is.Equal(base64.RawStdEncoding.EncodeToString(buf.Bytes()), test.enc)
-
-			files, err := ReadLogFile(bytes.NewReader(buf.Bytes()))
-			is.NoErr(err)
-			is.Equal(files.Size(), uint64(len(buf.Bytes())))
-
-			i := 0
-			for bi, fp := range files.Iter(0) {
-				buf, err := io.ReadAll(fp)
-				is.NoErr(err)
-
-				hash := hash()
-				hash.Write(buf)
-				is.Equal(bi.Hash, hash.Sum(nil)[:len(bi.Hash)])
-
-				is.True(len(test.out) > int(bi.Index))
-				is.Equal(buf, test.out[bi.Index])
-				i++
-			}
-			is.NoErr(files.Err)
-			is.Equal(i, buffers)
-
-			i = 0
-			for bi, fp := range files.Rev(files.Count()) {
-				buf, err := io.ReadAll(fp)
-				is.NoErr(err)
-
-				hash := hash()
-				hash.Write(buf)
-				is.Equal(bi.Hash, hash.Sum(nil)[:len(bi.Hash)])
-
-				is.Equal(buf, test.rev[i])
-				is.Equal(buf, test.out[bi.Index])
-				i++
-			}
-			is.NoErr(files.Err)
-			is.Equal(i, buffers)
-			is.Equal(files.Count(), uint64(i))
-		})
-	}
-}
-
-// TestArgs tests that the CLI arguments are correctly parsed.
-func TestArgs(t *testing.T) {
-	is := is.New(t)
-	usage := `Usage: lsm2 create <archive> <files>...`
-
-	arguments, err := docopt.ParseArgs(usage, []string{"create", "archive", "file1", "file2"}, "1.0")
-	is.NoErr(err)
-
-	var params struct {
-		Create  bool     `docopt:"create"`
-		Archive string   `docopt:"<archive>"`
-		Files   []string `docopt:"<files>"`
-	}
-	err = arguments.Bind(&params)
-	is.NoErr(err)
-
-	is.Equal(params.Create, true)
-	is.Equal(params.Archive, "archive")
-	is.Equal(params.Files, []string{"file1", "file2"})
-}
-
-func BenchmarkIterate(b *testing.B) {
-	block := make([]byte, 1024)
-	buf := &buffer{}
-
-	b.Run("write", func(b *testing.B) {
-		WriteLogFile(buf, func(yield func(io.Reader) bool) {
-			for range (b.N) {
-				if !yield(bytes.NewBuffer(block)) {
-					break
-				}
-			}
-		})
-	})
-
-	b.Run("read", func(b *testing.B) {
-		lf, _ := ReadLogFile(buf)
-		b.Log(lf.Count())
-		for range (b.N) {
-			for _, fp := range lf.Iter(0) {
-				_, _ = io.Copy(io.Discard, fp)
-				break
-			}
-		}
-	})
-
-	b.Run("rev", func(b *testing.B) {
-		lf, _ := ReadLogFile(buf)
-		b.Log(lf.Count())
-		for range (b.N) {
-			for _, fp := range lf.Rev(lf.Count()) {
-				_, _ = io.Copy(io.Discard, fp)
-				break
-			}
-		}
-	})
-}
-
-type buffer struct {
-	buf []byte
-}
-
-// Bytes returns the underlying byte slice of the bufferWriterAt.
-func (b *buffer) Bytes() []byte {
-	return b.buf
-}
-
-// WriteAt implements io.WriterAt. It appends data to the internal buffer
-// if the offset is beyond the current length of the buffer. It will
-// return an error if the offset is negative.
-func (b *buffer) WriteAt(data []byte, offset int64) (written int, err error) {
-	if offset < 0 {
-		return 0, errors.New("negative offset")
-	}
-	currentLength := int64(len(b.buf))
-	if currentLength < offset+int64(len(data)) {
-		b.buf = append(b.buf, make([]byte, offset+int64(len(data))-currentLength)...)
-	}
-	written = copy(b.buf[offset:], data)
-	return
-}
-
-// ReadAt implements io.ReaderAt. It reads data from the internal buffer starting
-// from the specified offset and writes it into the provided data slice. If the
-// offset is negative, it returns an error. If the requested read extends beyond
-// the buffer's length, it returns the data read so far along with an io.EOF error.
-func (b *buffer) ReadAt(data []byte, offset int64) (int, error) {
-	if offset < 0 {
-		return 0, errors.New("negative offset")
-	}
-	if offset > int64(len(b.buf)) || len(b.buf[offset:]) < len(data) {
-		return copy(data, b.buf[offset:]), io.EOF
-	}
-	return copy(data, b.buf[offset:]), nil
-}
-
-// IterOne takes an iterator that yields values of type T along with a value of
-// type I, and returns an iterator that yields only the values of type T. It
-// discards the values of type I.
-func IterOne[I, T any](it iter.Seq2[I, T]) iter.Seq[T] {
-	return func(yield func(T) bool) {
-		for i, v := range it {
-			_ = i
-			if !yield(v) {
-				return
-			}
-		}
-	}
-}
+// SPDX-FileCopyrightText: 2023 Jon Lundy <jon@xuu.cc>
+// SPDX-License-Identifier: BSD-3-Clause
+package lsm
+
+import (
+	"bytes"
+	crand "crypto/rand"
+	"encoding/base64"
+	"io"
+	"io/fs"
+	"math/rand"
+	"os"
+	"sort"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/matryer/is"
+)
+
+func TestLargeFile(t *testing.T) {
+	is := is.New(t)
+
+	segCount := 4098
+	f := randFile(t, 2_000_000, segCount)
+
+	sf, err := ReadFile(f)
+	is.NoErr(err)
+	is.True(len(sf.segments) <= segCount)
+	var needle []byte
+	for i, s := range sf.segments {
+		e, err := s.FirstEntry()
+		is.NoErr(err)
+		k, v := e.KeyValue()
+		needle = k
+		t.Logf("Segment-%d: %s = %d", i, k, v)
+	}
+	t.Log(f.Stat())
+
+	tt, ok, err := sf.Find(needle, true)
+	is.NoErr(err)
+	is.True(ok)
+	key, val := tt.KeyValue()
+	t.Log(string(key), val)
+
+	tt, ok, err = sf.Find([]byte("needle"), false)
+	is.NoErr(err)
+	is.True(!ok)
+	key, val = tt.KeyValue()
+	t.Log(string(key), val)
+
+	tt, ok, err = sf.Find([]byte{'\xff'}, false)
+	is.NoErr(err)
+	is.True(!ok)
+	key, val = tt.KeyValue()
+	t.Log(string(key), val)
+}
+
+func TestLargeFileDisk(t *testing.T) {
+	is := is.New(t)
+
+	segCount := 4098
+	t.Log("generate large file")
+	f := randFile(t, 2_000_000, segCount)
+
+	fd, err := os.CreateTemp("", "sst*")
+	is.NoErr(err)
+	defer func() { t.Log("cleanup:", fd.Name()); fd.Close(); os.Remove(fd.Name()) }()
+
+	t.Log("write file:", fd.Name())
+	_, err = io.Copy(fd, f)
+	is.NoErr(err)
+	fd.Seek(0, 0)
+
+	sf, err := ReadFile(fd)
+	is.NoErr(err)
+	is.True(len(sf.segments) <= segCount)
+	var needle []byte
+	for i, s := range sf.segments {
+		e, err := s.FirstEntry()
+		is.NoErr(err)
+		k, v := e.KeyValue()
+		needle = k
+		ok, err := s.VerifyHash()
+		is.NoErr(err)
+		t.Logf("Segment-%d: %s = %d %t", i, k, v, ok)
+		is.True(ok)
+	}
+	t.Log(f.Stat())
+
+	tt, ok, err := sf.Find(needle, false)
+	is.NoErr(err)
+	is.True(ok)
+	key, val := tt.KeyValue()
+	t.Log(string(key), val)
+
+	tt, ok, err = sf.Find([]byte("needle"), false)
+	is.NoErr(err)
+	is.True(!ok)
+	key, val = tt.KeyValue()
+	t.Log(string(key), val)
+
+	tt, ok, err = sf.Find([]byte{'\xff'}, false)
+	is.NoErr(err)
+	is.True(!ok)
+	key, val = tt.KeyValue()
+	t.Log(string(key), val)
+}
+
+func BenchmarkLargeFile(b *testing.B) {
+	segCount := 4098 / 4
+	f := randFile(b, 2_000_000, segCount)
+	sf, err := ReadFile(f)
+	if err != nil {
+		b.Error(err)
+	}
+	key := make([]byte, 5)
+	keys := make([][]byte, b.N)
+	for i := range keys {
+		_, err = crand.Read(key)
+		if err != nil {
+			b.Error(err)
+		}
+		keys[i] = []byte(base64.RawURLEncoding.EncodeToString(key))
+	}
+	b.Log("ready", b.N)
+	b.ResetTimer()
+	okays := 0
+	each := b.N / 10
+	for n := 0; n < b.N; n++ {
+		if each > 0 && n%each == 0 {
+			b.Log(n)
+		}
+		_, ok, err := sf.Find(keys[n], false)
+		if err != nil {
+			b.Error(err)
+		}
+		if ok {
+			okays++
+		}
+	}
+	b.Log("okays=", b.N, okays)
+}
+
+// TestFindRange is an initial range find for start and stop of a range of needles.
+// TODO: start the second query from where the first left off. Use an iterator?
+func TestFindRange(t *testing.T) {
+	is := is.New(t)
+
+	f := basicFile(t,
+		entries{
+			{"AD", 5},
+			{"AC", 5},
+			{"AB", 4},
+			{"AB", 3},
+		},
+		entries{
+			{"AB", 2},
+			{"AA", 1},
+		},
+	)
+
+	sf, err := ReadFile(f)
+	is.NoErr(err)
+
+	var ok bool
+	var first, last *entryBytes
+
+	first, ok, err = sf.Find([]byte("AB"), true)
+	is.NoErr(err)
+	key, val := first.KeyValue()
+	t.Log(string(key), val)
+	is.True(ok)
+	is.Equal(key, []byte("AB"))
+	is.Equal(val, uint64(2))
+
+	last, ok, err = sf.Find([]byte("AB"), false)
+	is.NoErr(err)
+	key, val = last.KeyValue()
+	t.Log(string(key), val)
+	is.True(ok)
+	is.Equal(key, []byte("AB"))
+	is.Equal(val, uint64(4))
+
+	last, ok, err = sf.Find([]byte("AC"), false)
+	is.NoErr(err)
+	key, val = last.KeyValue()
+	t.Log(string(key), val)
+	is.True(ok)
+	is.Equal(key, []byte("AC"))
+	is.Equal(val, uint64(5))
+}
+
+func randFile(t interface {
+	Helper()
+	Error(...any)
+}, size int, segments int) fs.File {
+	t.Helper()
+
+	lis := make(listEntries, size)
+	for i := range lis {
+		key := make([]byte, 5)
+		_, err := crand.Read(key)
+		if err != nil {
+			t.Error(err)
+		}
+		key = []byte(base64.RawURLEncoding.EncodeToString(key))
+		// key := []byte(fmt.Sprintf("key-%05d", i))
+		lis[i] = NewKeyValue(key, rand.Uint64()%16_777_216)
+	}
+	sort.Sort(sort.Reverse(&lis))
+
+	each := size / segments
+	if size%segments != 0 {
+		each++
+	}
+	split := make([]listEntries, segments)
+	for i := range split {
+		if (i+1)*each > len(lis) {
+			split[i] = lis[i*each : i*each+len(lis[i*each:])]
+			split = split[:i+1]
+			break
+		}
+		split[i] = lis[i*each : (i+1)*each]
+	}
+
+	var b bytes.Buffer
+	for _, s := range split {
+		s.WriteTo(&b)
+	}
+
+	return NewFile(b.Bytes())
+}
+
+type fakeStat struct {
+	size int64
+}
+
+// IsDir implements fs.FileInfo.
+func (*fakeStat) IsDir() bool {
+	panic("unimplemented")
+}
+
+// ModTime implements fs.FileInfo.
+func (*fakeStat) ModTime() time.Time {
+	panic("unimplemented")
+}
+
+// Mode implements fs.FileInfo.
+func (*fakeStat) Mode() fs.FileMode {
+	panic("unimplemented")
+}
+
+// Name implements fs.FileInfo.
+func (*fakeStat) Name() string {
+	panic("unimplemented")
+}
+
+// Size implements fs.FileInfo.
+func (s *fakeStat) Size() int64 {
+	return s.size
+}
+
+// Sys implements fs.FileInfo.
+func (*fakeStat) Sys() any {
+	panic("unimplemented")
+}
+
+var _ fs.FileInfo = (*fakeStat)(nil)
+
+type rd interface {
+	io.ReaderAt
+	io.Reader
+}
+
+type fakeFile struct {
+	stat func() fs.FileInfo
+
+	rd
+}
+
+func (fakeFile) Close() error                 { return nil }
+func (f fakeFile) Stat() (fs.FileInfo, error) { return f.stat(), nil }
+
+func NewFile(b ...[]byte) fs.File {
+	in := bytes.Join(b, nil)
+	rd := bytes.NewReader(in)
+	size := int64(len(in))
+	return &fakeFile{stat: func() fs.FileInfo { return &fakeStat{size: size} }, rd: rd}
+}
+
+func NewFileFromReader(rd *bytes.Reader) fs.File {
+	return &fakeFile{stat: func() fs.FileInfo { return &fakeStat{size: int64(rd.Len())} }, rd: rd}
+}
+
+type fakeFS struct {
+	files map[string]*fakeFile
+	mu    sync.RWMutex
+}
+
+// Open implements fs.FS.
+func (f *fakeFS) Open(name string) (fs.File, error) {
+	f.mu.RLock()
+	defer f.mu.RUnlock()
+	if file, ok := f.files[name]; ok {
+		return file, nil
+	}
+	return nil, fs.ErrNotExist
+}
+
+var _ fs.FS = (*fakeFS)(nil)
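Taken together, the new pieces allow a fully in-memory round trip: reverse-sorted entries are serialized with listEntries.WriteTo, wrapped via the NewFile test helper, and located again with Find. The following is a hypothetical in-package smoke test sketching that flow; it is not part of this diff, and it uses keys long enough that the file exceeds one segmentFooterLength.

package lsm

import (
	"bytes"
	"testing"

	"github.com/matryer/is"
)

// TestFindSmoke is a hypothetical example (not in the diff) of the
// write/read round trip over a single segment.
func TestFindSmoke(t *testing.T) {
	is := is.New(t)

	// Entries are written in reverse sorted order, matching randFile,
	// since segments are parsed backwards from the end.
	lis := listEntries{
		NewKeyValue([]byte("bbb"), 2),
		NewKeyValue([]byte("aaa"), 1),
	}
	var buf bytes.Buffer
	_, err := lis.WriteTo(&buf)
	is.NoErr(err)

	sf, err := ReadFile(NewFile(buf.Bytes()))
	is.NoErr(err)

	e, ok, err := sf.Find([]byte("aaa"), false)
	is.NoErr(err)
	is.True(ok)
	key, val := e.KeyValue()
	is.Equal(key, []byte("aaa"))
	is.Equal(val, uint64(1))
}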