From 36460a131ea3f79b3ee9f417bfb5dcfb9b757bae Mon Sep 17 00:00:00 2001 From: xuu Date: Sat, 2 Nov 2024 09:00:17 -0600 Subject: [PATCH] forward iteration --- go.mod | 5 +- go.sum | 4 + lsm2/cli/main.go | 85 ++++++ lsm2/cli/main_test.go | 104 +++++++ lsm2/sst.go | 641 ++++++++++++++++++++++++++++++++++++++++++ lsm2/sst_test.go | 168 +++++++++++ 6 files changed, 1005 insertions(+), 2 deletions(-) create mode 100644 lsm2/cli/main.go create mode 100644 lsm2/cli/main_test.go create mode 100644 lsm2/sst.go create mode 100644 lsm2/sst_test.go diff --git a/go.mod b/go.mod index 8e9e505..de99e0b 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module go.sour.is/pkg -go 1.22.0 +go 1.23.1 require ( github.com/99designs/gqlgen v0.17.44 @@ -9,7 +9,7 @@ require ( github.com/matryer/is v1.4.1 github.com/ravilushqa/otelgqlgen v0.15.0 github.com/tursodatabase/go-libsql v0.0.0-20240322134723-08771dcdd2f1 - github.com/vektah/gqlparser/v2 v2.5.11 + github.com/vektah/gqlparser/v2 v2.5.14 go.opentelemetry.io/otel v1.23.1 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.23.1 go.opentelemetry.io/otel/sdk/metric v1.23.1 @@ -54,6 +54,7 @@ require ( github.com/BurntSushi/toml v1.3.2 github.com/Masterminds/squirrel v1.5.4 github.com/cenkalti/backoff/v4 v4.2.1 // indirect + github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815 github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/golang/gddo v0.0.0-20210115222349-20d68f94ee1f diff --git a/go.sum b/go.sum index 50d7198..799c27e 100644 --- a/go.sum +++ b/go.sum @@ -30,6 +30,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/trifles v0.0.0-20200323201526-dd97f9abfb48 h1:fRzb/w+pyskVMQ+UbP35JkH8yB7MYb4q/qhBarqZE6g= github.com/dgryski/trifles v0.0.0-20200323201526-dd97f9abfb48/go.mod h1:if7Fbed8SFyPtHLHbg49SI7NAdJiC5WIA09pe59rfAA= +github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815 h1:bWDMxwH3px2JBh6AyO7hdCn/PkvCZXii8TGj7sbtEbQ= +github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= @@ -135,6 +137,8 @@ github.com/tursodatabase/go-libsql v0.0.0-20240322134723-08771dcdd2f1 h1:lQwP++j github.com/tursodatabase/go-libsql v0.0.0-20240322134723-08771dcdd2f1/go.mod h1:sb520Yr+GHBsfL43FQgQ+rLFfuJkItgRWlTgbIQHVxA= github.com/vektah/gqlparser/v2 v2.5.11 h1:JJxLtXIoN7+3x6MBdtIP59TP1RANnY7pXOaDnADQSf8= github.com/vektah/gqlparser/v2 v2.5.11/go.mod h1:1rCcfwB2ekJofmluGWXMSEnPMZgbxzwj6FaZ/4OT8Cc= +github.com/vektah/gqlparser/v2 v2.5.14 h1:dzLq75BJe03jjQm6n56PdH1oweB8ana42wj7E4jRy70= +github.com/vektah/gqlparser/v2 v2.5.14/go.mod h1:WQQjFc+I1YIzoPvZBhUQX7waZgg3pMLi0r8KymvAE2w= github.com/yudai/gojsondiff v1.0.0 h1:27cbfqXLVEJ1o8I6v3y9lg8Ydm53EKqHXAOMxEGlCOA= github.com/yudai/gojsondiff v1.0.0/go.mod h1:AY32+k2cwILAkW1fbgxQ5mUmMiZFgLIV+FBNExI05xg= github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 h1:BHyfKlQyqbsFN5p3IfnEUduWvb9is428/nNb5L3U01M= diff --git a/lsm2/cli/main.go b/lsm2/cli/main.go new file mode 100644 index 0000000..f75ad91 --- /dev/null +++ b/lsm2/cli/main.go @@ -0,0 +1,85 @@ +package main + +import ( + "errors" + "fmt" + "io" + "iter" + "os" + "path/filepath" + + "github.com/docopt/docopt-go" + "go.sour.is/pkg/lsm2" +) + +var usage = ` +Usage: lsm2 create ...` + +type args struct { + Create bool + Archive string `docopt:""` + Files []string `docopt:""` +} + +func main() { + opts, err := docopt.ParseDoc(usage) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + fmt.Println(opts, must(opts.Bool("create"))) + console := console{os.Stdin, os.Stdout, os.Stderr} + args := args{} + err = opts.Bind(&args) + fmt.Println(err) + run(console, args) +} + +type console struct { + Stdin io.Reader + Stdout io.Writer + Stderr io.Writer +} + +func (c console) Write(b []byte) (int, error) { + return c.Stdout.Write(b) +} + +func run(console console, args args) error { + switch { + case args.Create: + fmt.Fprintf(console, "creating %s from %v\n", filepath.Base(args.Archive), args.Files) + out, err := os.OpenFile(args.Archive, os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return err + } + defer out.Close() + + filesWritten := 0 + defer func() { fmt.Fprintln(console, "wrote", filesWritten, "files") }() + + return lsm2.WriteIter(out, iter.Seq[io.Reader](func(yield func(io.Reader) bool) { + for _, name := range args.Files { + f, err := os.Open(name) + if err != nil { + continue + } + filesWritten++ + if !yield(f) { + f.Close() + return + } + f.Close() + } + })) + default: + return errors.New("unknown command") + } +} + +func must[T any](v T, err error) T { + if err != nil { + panic(err) + } + return v +} diff --git a/lsm2/cli/main_test.go b/lsm2/cli/main_test.go new file mode 100644 index 0000000..a50dfd6 --- /dev/null +++ b/lsm2/cli/main_test.go @@ -0,0 +1,104 @@ +package main + +import ( + "bytes" + "os" + "testing" +) + +func TestCreate(t *testing.T) { + tests := []struct { + name string + args args + wantErr bool + wantOutput string + }{ + { + name: "no input files", + args: args{ + Create: true, + Archive: "test.txt", + Files: []string{}, + }, + wantErr: false, + wantOutput: "creating test.txt from []\nwrote 0 files\n", + }, + { + name: "one input file", + args: args{ + Create: true, + Archive: "test.txt", + Files: []string{"test_input.txt"}, + }, + wantErr: false, + wantOutput: "creating test.txt from [test_input.txt]\nwrote 1 files\n", + }, + { + name: "multiple input files", + args: args{ + Create: true, + Archive: "test.txt", + Files: []string{"test_input1.txt", "test_input2.txt"}, + }, + wantErr: false, + wantOutput: "creating test.txt from [test_input1.txt test_input2.txt]\nwrote 2 files\n", + }, + { + name: "non-existent input files", + args: args{ + Create: true, + Archive: "test.txt", + Files: []string{"non_existent_file.txt"}, + }, wantErr: false, + wantOutput: "creating test.txt from [non_existent_file.txt]\nwrote 0 files\n", + }, + { + name: "invalid command", + args: args{ + Create: false, + Archive: "test.txt", + Files: []string{}, + }, + wantErr: true, + wantOutput: "", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + // Create a temporary directory for the input files + tmpDir, err := os.MkdirTemp("", "lsm2-cli-test") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpDir) + os.Chdir(tmpDir) + + // Create the input files + for _, file := range tc.args.Files { + if file == "non_existent_file.txt" { + continue + } + if err := os.WriteFile(file, []byte(file), 0o644); err != nil { + t.Fatal(err) + } + } + + // Create a buffer to capture the output + var output bytes.Buffer + + // Call the create function + err = run(console{Stdout: &output}, tc.args) + + // Check the output + if output.String() != tc.wantOutput { + t.Errorf("run() output = %q, want %q", output.String(), tc.wantOutput) + } + + // Check for errors + if tc.wantErr && err == nil { + t.Errorf("run() did not return an error") + } + }) + } +} diff --git a/lsm2/sst.go b/lsm2/sst.go new file mode 100644 index 0000000..3c3aba8 --- /dev/null +++ b/lsm2/sst.go @@ -0,0 +1,641 @@ +package lsm2 + +// [Sour.is|size] [size|hash][data][hash|flag|size]... [prev|count|flag|size] + +import ( + "encoding/binary" + "errors" + "fmt" + "hash/fnv" + "io" + "iter" +) + +const ( + TypeUnknown uint64 = iota + TypeSegment + TypeCommit + TypePrevCommit + + headerSize = 10 + + maxCommitSize = 4 * binary.MaxVarintLen64 + minCommitSize = 3 + + maxBlockSize = 2 * binary.MaxVarintLen64 + minBlockSize = 2 +) + +var ( + Magic = [10]byte([]byte("Sour.is\x00\x00\x00")) + Version = uint8(1) + hash = fnv.New64a + + ErrDecode = errors.New("decode") +) + +type header struct { + end uint64 + extra []byte +} + +func (h *header) UnmarshalBinary(data []byte) error { + if len(data) != 10 { + return fmt.Errorf("%w: bad data", ErrDecode) + } + h.extra = append(h.extra, data...) + + var n int + h.end, n = binary.Uvarint(h.extra) + reverse(h.extra) + h.extra = h.extra[:len(h.extra)-n] + + return nil +} + +// Commit1: [magic>| end = seek to end of file +// <---|-------------+ size = seek to magic header +// <---|-------------+10 size + 10 = seek to start of file +// <-----------------------------T+10----------------> 10 + size + trailer = full file size + +// Commit2: [magic>| +// +--------|-------------------------------------------------------------------------> +// <-------------------------------------|----------------+ +// prev = seek to last commit <---|-+ +// prev + trailer = size of commit <----T+---------------------------------> + +// Block: [hash>| end = seek to end of block +// <---|-+ size = seek to end of header +// <-------------------|-+10 size + 10 = seek to start of block +// <---------------------T+10---------------> size + 10 + trailer = full block size + +type Commit struct { + flag uint64 // flag values + size uint64 // size of the trailer + count uint64 // number of entries + prev uint64 // previous commit + + tsize int +} + +// Append marshals the trailer into binary form and appends it to data. +// It returns the new slice. +func (h *Commit) AppendTrailer(data []byte) []byte { + h.flag |= TypePrevCommit + if h.prev == 0 { + h.flag &= TypeCommit + } + + size := len(data) + data = binary.AppendUvarint(data, h.size) + data = binary.AppendUvarint(data, h.flag) + data = binary.AppendUvarint(data, h.count) + if h.prev != 0 { + data = binary.AppendUvarint(data, h.prev) + } + reverse(data[size:]) + + return data +} + +// UnmarshalBinary implements encoding.BinaryUnmarshaler. +// It reads a trailer from binary data, and sets the fields +// of the receiver to the values found in the header. +func (h *Commit) UnmarshalBinary(data []byte) error { + if len(data) < minCommitSize { + return fmt.Errorf("%w: bad data", ErrDecode) + } + + var n int + h.size, n = binary.Uvarint(data) + data = data[n:] + h.tsize += n + + h.flag, n = binary.Uvarint(data) + data = data[n:] + h.tsize += n + + h.count, n = binary.Uvarint(data) + data = data[n:] + h.tsize += n + + h.prev = h.size + if h.flag&TypePrevCommit == TypePrevCommit { + h.prev, n = binary.Uvarint(data) + h.tsize += n + } + + return nil +} + +type Block struct { + header + + size uint64 + flag uint64 + + tsize int +} + +func (h *Block) AppendHeader(data []byte) []byte { + size := len(data) + data = append(data, make([]byte, 10)...) + copy(data, h.extra) + if h.size == 0 { + return data + } + hdata := binary.AppendUvarint(make([]byte, 0, 10), h.end) + reverse(hdata) + copy(data[size+10-len(hdata):], hdata) + + return data +} + +// AppendTrailer marshals the footer into binary form and appends it to data. +// It returns the new slice. +func (h *Block) AppendTrailer(data []byte) []byte { + size := len(data) + + h.flag |= TypeSegment + data = binary.AppendUvarint(data, h.size) + data = binary.AppendUvarint(data, h.flag) + reverse(data[size:]) + + return data +} + +// UnmarshalBinary implements encoding.BinaryUnmarshaler. +// It reads a footer from binary data, and sets the fields +// of the receiver to the values found in the footer. +func (h *Block) UnmarshalBinary(data []byte) error { + if len(data) < minBlockSize { + return fmt.Errorf("%w: bad data", ErrDecode) + } + + var n int + h.size, n = binary.Uvarint(data) + data = data[n:] + h.tsize += n + + h.flag, n = binary.Uvarint(data) + data = data[n:] + h.tsize += n + + copy(h.extra, data[:8]) + + return nil +} + +type logFile struct { + header + Commit +} + +func (h *logFile) AppendMagic(data []byte) []byte { + size := len(data) + data = append(data, Magic[:]...) + if h.end == 0 { + return data + } + hdata := binary.AppendUvarint(make([]byte, 0, 10), h.end) + reverse(hdata) + copy(data[size+10-len(hdata):], hdata) + + return data +} +func (h *logFile) UnmarshalBinary(data []byte) error { + return h.header.UnmarshalBinary(data) +} + +// WriteLogFile writes a log file to w, given a list of segments. +// The caller is responsible for calling WriteAt on the correct offset. +// The function will return an error if any of the segments fail to write. +// The offset is the initial offset of the first segment, and will be +// incremented by the length of the segment on each write. +// +// The log file is written with the following format: +// - A header with the magic, version, and flag (Dirty) +// - A series of segments, each with: +// - A footer with the length and hash of the segment +// - The contents of the segment +// - A header with the magic, version, flag (Clean), and end offset +func WriteLogFile(w io.WriterAt, segments ...io.Reader) error { + _, err := w.WriteAt(Magic[:], 0) + if err != nil { + return err + } + + lf := &LogWriter{ + WriterAt: w, + } + + return lf.writeIter(w, iter.Seq[io.Reader](func(yield func(io.Reader) bool) { + for _, s := range segments { + if !yield(s) { + return + } + } + })) +} + +type rw interface { + io.ReaderAt + io.WriterAt +} + +func AppendLogFile(rw rw, segments ...io.Reader) error { + logFile, err := ReadLogFile(rw) + if err != nil { + return err + } + lf := &LogWriter{ + WriterAt: rw, + logFile: logFile.logFile, + } + return lf.writeIter(rw, iter.Seq[io.Reader](func(yield func(io.Reader) bool) { + for _, s := range segments { + if !yield(s) { + return + } + } + })) + +} + +func WriteIter(w io.WriterAt, segments iter.Seq[io.Reader]) error { + _, err := w.WriteAt(Magic[:], 0) + if err != nil { + return err + } + + lf := &LogWriter{ + WriterAt: w, + } + + return lf.writeIter(w, segments) +} + +func (lf *LogWriter) writeIter(w io.WriterAt, segments iter.Seq[io.Reader]) error { + for s := range segments { + err := lf.writeSegment(s) + if err != nil { + return err + } + lf.count++ + } + + // Write the footer to the log file. + // The footer is written at the current end of file position. + n, err := lf.WriteAt(lf.AppendTrailer(make([]byte, 0, maxCommitSize)), int64(lf.end)+10) + if err != nil { + // If there is an error, return it. + return err + } + lf.end += uint64(n) + + _, err = w.WriteAt(lf.AppendMagic(make([]byte, 0, 10)), 0) + + return err +} + +type LogWriter struct { + logFile + io.WriterAt +} + +// writeSegment writes a segment to the log file at the current end of file position. +// The segment is written in chunks of 1024 bytes, and the hash of the segment +func (lf *LogWriter) writeSegment(segment io.Reader) error { + h := hash() + head := Block{} + start := int64(lf.end) + 10 + end := int64(lf.end) + 10 + + // Write the header to the log file. + // The footer is written at the current end of file position. + n, err := lf.WriteAt(make([]byte, headerSize), start) + if err != nil { + // If there is an error, return it. + return err + } + end += int64(n) + lf.size += uint64(n) + lf.end += uint64(n) + + // Write the segment to the log file. + // The segment is written in chunks of 1024 bytes. + for { + // Read a chunk of the segment. + buf := make([]byte, 1024) + n, err := segment.Read(buf) + if err != nil { + // If the segment is empty, break the loop. + if err == io.EOF { + break + } + // If there is an error, return it. + return err + } + + // Compute the hash of the chunk. + h.Write(buf[:n]) + + // Write the chunk to the log file. + // The chunk is written at the current end of file position. + _, err = lf.WriteAt(buf[:n], end) + if err != nil { + // If there is an error, return it. + return err + } + + // Update the length of the segment. + end += int64(n) + head.size += uint64(n) + } + + head.extra = h.Sum(nil) + head.end += head.size + + // Write the footer to the log file. + // The footer is written at the current end of file position. + n, err = lf.WriteAt(head.AppendTrailer(make([]byte, 0, maxBlockSize)), end) + if err != nil { + // If there is an error, return it. + return err + } + end += int64(n) + head.end += uint64(n) + + // Update header to the log file. + // The footer is written at the current end of file position. + _, err = lf.WriteAt(head.AppendHeader(make([]byte, 0, headerSize)), start) + if err != nil { + // If there is an error, return it. + return err + } + + // Update the end of file position. + lf.size += head.end + lf.end += head.end + return nil +} + +// reverse reverses a slice in-place. +func reverse[T any](b []T) { + l := len(b) + for i := 0; i < l/2; i++ { + b[i], b[l-i-1] = b[l-i-1], b[i] + } +} + +type LogReader struct { + logFile + io.ReaderAt + Err error +} + +// ReadLogFile reads a log file from the given io.ReaderAt. It returns a pointer to a LogFile, or an error if the file +// could not be read. +func ReadLogFile(reader io.ReaderAt) (*LogReader, error) { + header := make([]byte, headerSize) + n, err := rsr(reader, 0, 10).ReadAt(header, 0) + if err != nil { + return nil, err + } + header = header[:n] + + logFile := &LogReader{ReaderAt: reader} + err = logFile.header.UnmarshalBinary(header) + if err != nil { + return nil, err + } + + if logFile.end == 0 { + return logFile, nil + } + + commit := make([]byte, maxCommitSize) + n, err = rsr(reader, 10, int64(logFile.end)).ReadAt(commit, 0) + if n == 0 && err != nil { + return nil, err + } + commit = commit[:n] + + err = logFile.Commit.UnmarshalBinary(commit) + + return logFile, err +} + +// Iterate reads the log file and calls the given function for each segment. +// It passes an io.Reader that reads from the current segment. It will stop +// calling the function if the function returns false. +func (lf *LogReader) Iter() iter.Seq2[uint64, io.Reader] { + var commits []*Commit + for commit := range lf.iterCommits() { + commits = append(commits, &commit) + } + if lf.Err != nil { + return func(yield func(uint64, io.Reader) bool) {} + } + + reverse(commits) + + return func(yield func(uint64, io.Reader) bool) { + start := int64(10) + for _, commit := range commits { + size := int64(commit.prev) + it := iterBlocks(io.NewSectionReader(lf, start, size), size) + for i, block := range it { + if !yield(i, block) { + return + } + } + + start += size + int64(commit.tsize) + } + } +} + +func iterBlocks(r io.ReaderAt, end int64) iter.Seq2[uint64, io.Reader] { + var start int64 + var i uint64 + return func(yield func(uint64, io.Reader) bool) { + for start < end { + block := &Block{} + buf := make([]byte, 10) + n, err := rsr(r, int64(start), 10).ReadAt(buf, 0) + if n == 0 && err != nil { + return + } + start += int64(n) + + if err := block.header.UnmarshalBinary(buf); err != nil { + return + } + + buf = make([]byte, maxBlockSize) + n, err = rsr(r, int64(start), int64(block.end)).ReadAt(buf, 0) + if n == 0 && err != nil { + return + } + buf = buf[:n] + err = block.UnmarshalBinary(buf) + if err != nil { + return + } + + if !yield(i, io.NewSectionReader(r, int64(start), int64(block.size))) { + return + } + + i++ + start += int64(block.end) + } + } +} + + +func (lf *LogReader) iterCommits() iter.Seq[Commit] { + eof := lf.end + 10 + + if eof <= 10 { + return func(yield func(Commit) bool) {} + } + + offset := eof - 10 - lf.prev - uint64(lf.tsize) + return func(yield func(Commit) bool) { + if !yield(lf.Commit) { + return + } + + for offset > 10 { + commit := Commit{} + buf := make([]byte, maxCommitSize) + n, err := rsr(lf, 10, int64(offset)).ReadAt(buf, 0) + if n == 0 && err != nil { + lf.Err = err + return + } + buf = buf[:n] + err = commit.UnmarshalBinary(buf) + if err != nil { + lf.Err = err + return + } + if !yield(commit) { + return + } + offset -= commit.prev + uint64(commit.tsize) + } + } +} + +// func (lf *LogReader) Rev() iter.Seq2[uint64, io.Reader] { +// end := lf.end + 10 +// i := lf.count +// return func(yield func(uint64, io.Reader) bool) { + +// for commit := range lf.iterCommits() { +// end -= uint64(commit.tsize) +// start := end - commit.prev - uint64(commit.tsize) +// for start > end{ +// block := &Block{} +// buf := make([]byte, min(maxBlockSize, commit.size)) +// n, err := lf.ReaderAt.ReadAt(buf, max(0, int64(end)-int64(len(buf)))) +// if n == 0 && err != nil { +// lf.Err = err +// return +// } +// buf = buf[:n] +// err = block.UnmarshalBinary(buf) +// if err != nil { +// lf.Err = err +// return +// } +// if !yield(i, io.NewSectionReader(lf, int64(end-block.size)-int64(block.tsize), int64(block.size))) { +// return +// } +// end -= block.size + 10 + uint64(block.tsize) +// i-- +// } + +// } +// } +// } + +func iterOne[I, T any](it iter.Seq2[I, T]) iter.Seq[T] { + return func(yield func(T) bool) { + for _, v := range it { + if !yield(v) { + return + } + } + } +} + +func rsr(r io.ReaderAt, offset, size int64) *revSegmentReader { + r = io.NewSectionReader(r, offset, size) + return &revSegmentReader{r, size} +} + +type revSegmentReader struct { + io.ReaderAt + size int64 +} + +func (r *revSegmentReader) ReadAt(data []byte, offset int64) (int, error) { + if offset < 0 { + return 0, errors.New("negative offset") + } + + if offset > int64(r.size) { + return 0, io.EOF + } + + o := r.size - int64(len(data)) - offset + d := int64(len(data)) + if o < 0 { + d = max(0, d+o) + } + + i, err := r.ReaderAt.ReadAt(data[:d], max(0, o)) + reverse(data[:i]) + return i, err +} + +// rdr : 0 1 2|3 4 5 6|7 8 9 10 +// rsr 6,4: 3 2 1 0 +// 6 +// ------- -4 +// 0 1 2 3 +// offset - size +// rdr : 0 1 2|3 4 5 6|7 8 9 10 +// 0 1 2 3 +// offset=0 |-------| d[:4], o=3 3-0=3 +// offset=1 _|----- | d[:3], o=3 3-1=2 +// offset=2 ___|--- | d[:2], o=3 3-2=1 +// offset=3 _____|- | d[:1], o=3 3-3=0 +// offset=4+_____| | d[:0], o=3 3-4=0 + +// rdr : 0 1 2|3 4 5 6|7 8 9 10 +// offset=0 |-------| d[:4], o=0 -> 3 +// offset=0 | -----| d[:3], o=1 -> 4 +// offset=0 | ---| d[:2], o=2 -> 5 +// offset=0 | -| d[:1], o=3 -> 6 +// offset=0 | | d[:0], o=4+-> 7 + +// rdr : 0 1 2|3 4 5 6|7 8 9 10 +// offset=4 ___| | d[:0], o=0 +// offset=3 _|- | d[:1], o=0 +// offset=2 |--- | d[:2], o=0 +// offset=1 | --- | d[:2], o=1 +// offset=0 | ---| d[:2], o=2 +// offset=-1 | -|_ d[:2], o=3 +// offset=-2 | |___ d[:2], o=4+ + +// o = max(0, offset - len) +// d = diff --git a/lsm2/sst_test.go b/lsm2/sst_test.go new file mode 100644 index 0000000..a794783 --- /dev/null +++ b/lsm2/sst_test.go @@ -0,0 +1,168 @@ +package lsm2 + +import ( + "bytes" + "encoding/base64" + "errors" + "fmt" + "io" + "testing" + + "github.com/docopt/docopt-go" + "github.com/matryer/is" +) + +func TestAppend(t *testing.T) { + type test struct { + name string + in [][]io.Reader + enc string + out [][]byte + rev [][]byte + } + tests := []test{ + { + "nil reader", + nil, + "U291ci5pcwAAAwACAA", + [][]byte{}, + [][]byte{}, + }, + { + "single reader", + [][]io.Reader{ + { + bytes.NewBuffer([]byte{1, 2, 3, 4})}}, + "U291ci5pcwAAE756XndRZXhdAAYBAgMEAQQBAhA", + [][]byte{{1, 2, 3, 4}}, + [][]byte{{1, 2, 3, 4}}}, + { + "multiple readers", + [][]io.Reader{ + { + bytes.NewBuffer([]byte{1, 2, 3, 4}), + bytes.NewBuffer([]byte{5, 6, 7, 8})}}, + "U291ci5pcwAAI756XndRZXhdAAYBAgMEAQRhQyZWDDn5BQAGBQYHCAEEAgIg", + [][]byte{{1, 2, 3, 4}, {5, 6, 7, 8}}, + [][]byte{{5, 6, 7, 8}, {1, 2, 3, 4}}}, + { + "multiple commit", + [][]io.Reader{ + { + bytes.NewBuffer([]byte{1, 2, 3, 4})}, + { + bytes.NewBuffer([]byte{5, 6, 7, 8})}}, + "U291ci5pcwAAJ756XndRZXhdAAYBAgMEAQQBAhBhQyZWDDn5BQAGBQYHCAEEEAIDIA", + [][]byte{{1, 2, 3, 4}, {5, 6, 7, 8}}, + [][]byte{{5, 6, 7, 8}, {1, 2, 3, 4}}}, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + is := is.New(t) + buf := &buffer{} + + buffers := 0 + + if len(test.in) == 0 { + err := WriteLogFile(buf) + is.NoErr(err) + } + for i, in := range test.in { + buffers += len(in) + + if i == 0 { + err := WriteLogFile(buf, in...) + is.NoErr(err) + } else { + err := AppendLogFile(buf, in...) + is.NoErr(err) + } + } + + is.Equal(base64.RawStdEncoding.EncodeToString(buf.Bytes()), test.enc) + + files, err := ReadLogFile(bytes.NewReader(buf.Bytes())) + is.NoErr(err) + + i := 0 + for fp := range iterOne(files.Iter()) { + buf, err := io.ReadAll(fp) + is.NoErr(err) + + is.Equal(buf, test.out[i]) + i++ + } + is.NoErr(files.Err) + is.Equal(i, buffers) + + // i = 0 + // for fp := range iterOne(files.Rev()) { + // buf, err := io.ReadAll(fp) + // is.NoErr(err) + + // is.Equal(buf, test.rev[i]) + // i++ + // } + // is.NoErr(files.Err) + // is.Equal(i, buffers) + + }) + } +} + +func TestArgs(t *testing.T) { + is := is.New(t) + usage := `Usage: lsm2 create ... + ` + opts, err := docopt.ParseArgs(usage, []string{"create", "archive", "file1", "file2"}, "1.0") + is.NoErr(err) + + args := struct { + Create bool `docopt:"create"` + Archive string `docopt:""` + Files []string `docopt:""` + }{} + err = opts.Bind(&args) + is.NoErr(err) + fmt.Println(args) + +} + +type buffer struct { + buf []byte +} + +// Bytes returns the underlying byte slice of the bufferWriterAt. +func (b *buffer) Bytes() []byte { + return b.buf +} + +// WriteAt implements io.WriterAt. It appends data to the internal buffer +// if the offset is beyond the current length of the buffer. It will +// return an error if the offset is negative. +func (b *buffer) WriteAt(data []byte, offset int64) (written int, err error) { + if offset < 0 { + return 0, errors.New("negative offset") + } + + currentLength := int64(len(b.buf)) + if currentLength < offset+int64(len(data)) { + b.buf = append(b.buf, make([]byte, offset+int64(len(data))-currentLength)...) + } + + written = copy(b.buf[offset:], data) + return +} + +func (b *buffer) ReadAt(data []byte, offset int64) (int, error) { + if offset < 0 { + return 0, errors.New("negative offset") + } + + if offset > int64(len(b.buf)) || len(b.buf[offset:]) < len(data) { + return copy(data, b.buf[offset:]), io.EOF + } + + return copy(data, b.buf[offset:]), nil +}