diff --git a/.gitignore b/.gitignore index 9784aeb..9b39e8d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ test.db *.mercury -sour.is-mercury \ No newline at end of file +sour.is-mercury +.vscode/ \ No newline at end of file diff --git a/lsm/cli/main.go b/lsm/cli/main.go new file mode 100644 index 0000000..0df7f87 --- /dev/null +++ b/lsm/cli/main.go @@ -0,0 +1,286 @@ +package main + +import ( + "bytes" + "context" + "encoding/base64" + "errors" + "fmt" + "io" + "iter" + "net/http" + "net/url" + "os" + "time" + + "github.com/docopt/docopt-go" + "go.sour.is/pkg/lsm" +) + +var usage = ` +Usage: + lsm create ... + lsm append ... + lsm read [ []] + lsm serve + lsm client [ []]` + +type args struct { + Create bool + Append bool + Read bool + Serve bool + Client bool + + Archive string `docopt:""` + Files []string `docopt:""` + Start int64 `docopt:""` + End int64 `docopt:""` +} + +func main() { + opts, err := docopt.ParseDoc(usage) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + args := args{} + err = opts.Bind(&args) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + err = run(Console, args) + if err != nil { + fmt.Println(err) + os.Exit(1) + } +} + +type console struct { + Stdin io.Reader + Stdout io.Writer + Stderr io.Writer +} +var Console = console{os.Stdin, os.Stdout, os.Stderr} + +func (c console) Write(b []byte) (int, error) { + return c.Stdout.Write(b) +} + +func run(console console, a args) error { + fmt.Fprintln(console, "lsm") + switch { + case a.Create: + f, err := os.OpenFile(a.Archive, os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return err + } + defer f.Close() + + return lsm.WriteLogFile(f, fileReaders(a.Files)) + case a.Append: + f, err := os.OpenFile(a.Archive, os.O_RDWR, 0644) + if err != nil { + return err + } + defer f.Close() + + return lsm.AppendLogFile(f, fileReaders(a.Files)) + case a.Read: + fmt.Fprintln(console, "reading", a.Archive) + + f, err := os.Open(a.Archive) + if err != nil { + return err + } + defer f.Close() + return readContent(f, console, a.Start, a.End) + case a.Serve: + fmt.Fprintln(console, "serving", a.Archive) + b, err := base64.RawStdEncoding.DecodeString(a.Archive) + now := time.Now() + if err != nil { + return err + } + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + http.ServeContent(w, r, "", now, bytes.NewReader(b)) + }) + return http.ListenAndServe(":8080", nil) + case a.Client: + r, err := OpenHttpReader(context.Background(), a.Archive, 0) + if err != nil { + return err + } + defer r.Close() + defer func() {fmt.Println("bytes read", r.bytesRead)}() + return readContent(r, console, a.Start, a.End) + } + return errors.New("unknown command") +} + +func readContent(r io.ReaderAt, console console, start, end int64) error { + lg, err := lsm.ReadLogFile(r) + if err != nil { + return err + } + for bi, rd := range lg.Iter(uint64(start)) { + if end > 0 && int64(bi.Index) >= end { + break + } + fmt.Fprintf(console, "=========================\n%+v:\n", bi) + wr := base64.NewEncoder(base64.RawStdEncoding, console) + io.Copy(wr, rd) + fmt.Fprintln(console, "\n=========================") + } + if lg.Err != nil { + return lg.Err + } + for bi, rd := range lg.Rev(lg.Count()) { + if end > 0 && int64(bi.Index) >= end { + break + } + fmt.Fprintf(console, "=========================\n%+v:\n", bi) + wr := base64.NewEncoder(base64.RawStdEncoding, console) + io.Copy(wr, rd) + fmt.Fprintln(console, "\n=========================") + } + + return lg.Err +} + + +func fileReaders(names []string) iter.Seq[io.Reader] { + return iter.Seq[io.Reader](func(yield func(io.Reader) bool) { + for _, name := range names { + f, err := os.Open(name) + if err != nil { + continue + } + if !yield(f) { + f.Close() + return + } + f.Close() + } + }) +} + +type HttpReader struct { + ctx context.Context + uri url.URL + tmpfile *os.File + pos int64 + end int64 + bytesRead int +} + +func OpenHttpReader(ctx context.Context, uri string, end int64) (*HttpReader, error) { + u, err := url.Parse(uri) + if err != nil { + return nil, err + } + return &HttpReader{ctx: ctx, uri: *u, end: end}, nil +} + +func (r *HttpReader) Read(p []byte) (int, error) { + n, err := r.ReadAt(p, r.pos) + if err != nil { + return n, err + } + r.pos += int64(n) + r.bytesRead += n + return n, nil +} + +func (r *HttpReader) Seek(offset int64, whence int) (int64, error) { + switch whence { + case io.SeekStart: + r.pos = offset + case io.SeekCurrent: + r.pos += offset + case io.SeekEnd: + r.pos = r.end + offset + } + return r.pos, nil +} + +func (r *HttpReader) Close() error { + r.ctx.Done() + return nil +} + +// ReadAt implements io.ReaderAt. It reads data from the internal buffer starting +// from the specified offset and writes it into the provided data slice. If the +// offset is negative, it returns an error. If the requested read extends beyond +// the buffer's length, it returns the data read so far along with an io.EOF error. +func (r *HttpReader) ReadAt(data []byte, offset int64) (int, error) { + if err := r.ctx.Err(); err != nil { + return 0, err + } + + if offset < 0 { + return 0, errors.New("negative offset") + } + + if r.end > 0 && offset > r.end { + return 0, io.EOF + } + + dlen := len(data) + int(offset) + + if r.end > 0 && r.end+int64(dlen) > r.end { + dlen = int(r.end) + } + + end := "" + if r.end > 0 { + end = fmt.Sprintf("/%d", r.end) + } + req, err := http.NewRequestWithContext(r.ctx, "GET", r.uri.String(), nil) + if err != nil { + return 0, err + } + + req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d%s", offset, dlen, end)) + + fmt.Fprintln(Console.Stderr, req) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return 0, err + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusRequestedRangeNotSatisfiable { + fmt.Fprintln(Console.Stderr, "requested range not satisfiable") + return 0, io.EOF + } + + if resp.StatusCode == http.StatusOK { + + r.tmpfile, err = os.CreateTemp("", "httpReader") + if err != nil { + return 0, err + } + defer os.Remove(r.tmpfile.Name()) + n, err := io.Copy(r.tmpfile, resp.Body) + if err != nil { + return 0, err + } + r.bytesRead += int(n) + + defer fmt.Fprintln(Console.Stderr, "wrote ", n, " bytes to ", r.tmpfile.Name()) + resp.Body.Close() + r.tmpfile.Seek(offset, 0) + return io.ReadFull(r.tmpfile, data) + } + + n, err := io.ReadFull(resp.Body, data) + if n == 0 && err != nil { + return n, err + } + r.bytesRead += n + defer fmt.Fprintln(Console.Stderr, "read ", n, " bytes") + return n, nil +} diff --git a/lsm2/cli/main_test.go b/lsm/cli/main_test.go similarity index 100% rename from lsm2/cli/main_test.go rename to lsm/cli/main_test.go diff --git a/lsm/marshal.go b/lsm/marshal.go deleted file mode 100644 index 00034d2..0000000 --- a/lsm/marshal.go +++ /dev/null @@ -1,138 +0,0 @@ -package lsm - -import ( - "bytes" - "encoding" - "encoding/binary" - "fmt" -) - -type entry struct { - key string - value uint64 -} - -// MarshalBinary implements encoding.BinaryMarshaler. -func (e *entry) MarshalBinary() (data []byte, err error) { - data = make([]byte, len(e.key), len(e.key)+binary.MaxVarintLen16) - copy(data, e.key) - - data = binary.AppendUvarint(data, e.value) - reverse(data[len(e.key):]) - return data, err -} - -// UnmarshalBinary implements encoding.BinaryUnmarshaler. -func (e *entry) UnmarshalBinary(data []byte) error { - // fmt.Println("unmarshal", data, string(data)) - - if len(data) < binary.MaxVarintLen16 { - return fmt.Errorf("%w: bad data", ErrDecode) - } - head := make([]byte, binary.MaxVarintLen16) - copy(head, data[max(0, len(data)-cap(head)):]) - reverse(head) - - size := 0 - e.value, size = binary.Uvarint(head) - if size == 0 { - return fmt.Errorf("%w: invalid data", ErrDecode) - } - e.key = string(data[:len(data)-size]) - - return nil -} - -var _ encoding.BinaryMarshaler = (*entry)(nil) -var _ encoding.BinaryUnmarshaler = (*entry)(nil) - -type entries []entry - -// MarshalBinary implements encoding.BinaryMarshaler. -func (lis *entries) MarshalBinary() (data []byte, err error) { - var buf bytes.Buffer - - for _, e := range *lis { - d, err := e.MarshalBinary() - if err != nil { - return nil, err - } - - _, err = buf.Write(d) - if err != nil { - return nil, err - } - - _, err = buf.Write(reverse(binary.AppendUvarint(make([]byte, 0, binary.MaxVarintLen32), uint64(len(d))))) - if err != nil { - return nil, err - } - } - - return buf.Bytes(), err -} - -// UnmarshalBinary implements encoding.BinaryUnmarshaler. -func (lis *entries) UnmarshalBinary(data []byte) error { - head := make([]byte, binary.MaxVarintLen16) - pos := uint64(len(data)) - - for pos > 0 { - copy(head, data[max(0, pos-uint64(cap(head))):]) - length, size := binary.Uvarint(reverse(head)) - - e := entry{} - if err := e.UnmarshalBinary(data[max(0, pos-(length+uint64(size))) : pos-uint64(size)]); err != nil { - return err - } - *lis = append(*lis, e) - - pos -= length + uint64(size) - } - reverse(*lis) - return nil -} - -var _ encoding.BinaryMarshaler = (*entries)(nil) -var _ encoding.BinaryUnmarshaler = (*entries)(nil) - -type segment struct { - entries entries -} - -// MarshalBinary implements encoding.BinaryMarshaler. -func (s *segment) MarshalBinary() (data []byte, err error) { - head := header{ - entries: uint64(len(s.entries)), - } - - data, err = s.entries.MarshalBinary() - if err != nil { - return nil, err - } - - head.datalen = uint64(len(data)) - - h := hash() - h.Write(data) - head.sig = h.Sum(nil) - - return head.Append(data), err -} - -// UnmarshalBinary implements encoding.BinaryUnmarshaler. -func (s *segment) UnmarshalBinary(data []byte) error { - head, err := ReadHead(data) - if err != nil { - return err - } - - h := hash() - h.Write(data[:head.datalen]) - if !bytes.Equal(head.sig, h.Sum(nil)) { - return fmt.Errorf("%w: invalid checksum", ErrDecode) - } - - s.entries = make(entries, 0, head.entries) - return s.entries.UnmarshalBinary(data[:head.datalen]) -} diff --git a/lsm/marshal_test.go b/lsm/marshal_test.go deleted file mode 100644 index 8bdcffa..0000000 --- a/lsm/marshal_test.go +++ /dev/null @@ -1,76 +0,0 @@ -package lsm - -import ( - "io/fs" - "testing" - - "github.com/matryer/is" -) - -func TestEncoding(t *testing.T) { - is := is.New(t) - - data := segment{entries: entries{ - {"key-1", 1}, - {"key-2", 2}, - {"key-3", 3}, - {"longerkey-4", 65535}, - }} - - b, err := data.MarshalBinary() - is.NoErr(err) - - var got segment - err = got.UnmarshalBinary(b) - is.NoErr(err) - - is.Equal(data, got) -} - -func TestReverse(t *testing.T) { - is := is.New(t) - - got := []byte("gnirts a si siht") - reverse(got) - - is.Equal(got, []byte("this is a string")) - - got = []byte("!gnirts a si siht") - reverse(got) - - is.Equal(got, []byte("this is a string!")) -} - -func TestFile(t *testing.T) { - is := is.New(t) - - entries := entries { - {"key-1", 1}, - {"key-2", 2}, - {"key-3", 3}, - {"longerkey-4", 65535}, - } - - f := basicFile(t, entries, entries, entries) - - sf, err := ReadFile(f) - is.NoErr(err) - - is.Equal(len(sf.segments), 3) -} - -func basicFile(t *testing.T, lis ...entries) fs.File { - t.Helper() - - segments := make([][]byte, len(lis)) - var err error - for i, entries := range lis { - data := segment{entries: entries} - segments[i], err = data.MarshalBinary() - if err != nil { - t.Error(err) - } - } - - return NewFile(segments...) -} diff --git a/lsm/sst.go b/lsm/sst.go index e95731c..393663a 100644 --- a/lsm/sst.go +++ b/lsm/sst.go @@ -1,370 +1,634 @@ -// SPDX-FileCopyrightText: 2023 Jon Lundy -// SPDX-License-Identifier: BSD-3-Clause - -// lsm -- Log Structured Merge-Tree -// -// This is a basic LSM tree using a SSTable optimized for append only writing. On disk data is organized into time ordered -// files of segments, containing reverse sorted keys. Each segment ends with a magic value `Souris\x01`, a 4byte hash, count of -// segment entries, and data length. - package lsm import ( - "bytes" - "encoding" "encoding/binary" "errors" "fmt" "hash/fnv" "io" - "io/fs" - "sort" + "iter" + "slices" +) + +// [Sour.is|size] [size|hash][data][hash|flag|size]... [prev|count|flag|size] + +// Commit1: [magic>| end = seek to end of file +// <---|-------------+ size = seek to magic header +// <---|-------------+10 size + 10 = seek to start of file +// <-----------------------------T+10----------------> 10 + size + trailer = full file size + +// Commit2: [magic>| +// +--------|-------------------------------------------------------------------------> +// <-------------------------------------|----------------+ +// prev = seek to last commit <---|-+ +// prev + trailer = size of commit <----T+---------------------------------> + +// Block: [hash>| end = seek to end of block +// <---|-+ size = seek to end of header +// <-------------------|-+10 size + 10 = seek to start of block +// <---------------------T+10---------------> size + 10 + trailer = full block size + +const ( + TypeUnknown uint64 = iota + TypeSegment + TypeCommit + TypePrevCommit + + headerSize = 10 + + maxCommitSize = 4 * binary.MaxVarintLen64 + minCommitSize = 3 + + maxBlockSize = 2 * binary.MaxVarintLen64 + minBlockSize = 2 ) var ( - magic = reverse(append([]byte("Souris"), '\x01')) - hash = fnv.New32a - hashLength = hash().Size() - // segmentSize = 2 ^ 16 // min 2^9 = 512b, max? 2^20 = 1M - segmentFooterLength = len(magic) + hashLength + binary.MaxVarintLen32 + binary.MaxVarintLen32 + Magic = [10]byte([]byte("Sour.is\x00\x00\x00")) + Version = uint8(1) + hash = fnv.New64a + + ErrDecode = errors.New("decode") ) type header struct { - sig []byte // 4Byte signature - entries uint64 // count of entries in segment - datalen uint64 // length of data - headlen uint64 // length of header - end int64 // location of end of data/start of header (start of data is `end - datalen`) + end uint64 + extra []byte } -// ReadHead parse header from a segment. reads from the end of slice of length segmentFooterLength -func ReadHead(data []byte) (*header, error) { - if len(data) < len(magic)+6 { - return nil, fmt.Errorf("%w: invalid size", ErrDecode) +// UnmarshalBinary implements encoding.BinaryUnmarshaler. +// It decodes the input binary data into the header struct. +// The function expects the input data to be of a specific size (headerSize), +// otherwise it returns an error indicating bad data. +// It reads the 'end' field from the binary data, updates the 'extra' field, +// and reverses the byte order of 'extra' in place. +func (h *header) UnmarshalBinary(data []byte) error { + if len(data) != headerSize { + return fmt.Errorf("%w: bad data", ErrDecode) } - if !bytes.Equal(data[len(data)-len(magic):], magic) { - return nil, fmt.Errorf("%w: invalid header", ErrDecode) - } + h.extra = make([]byte, headerSize) + copy(h.extra, data) - head := make([]byte, 0, segmentFooterLength) - head = reverse(append(head, data[max(0, len(data)-cap(head)-1):]...)) - size, s := binary.Uvarint(head[len(magic)+4:]) - length, i := binary.Uvarint(head[len(magic)+4+s:]) + var bytesRead int + h.end, bytesRead = binary.Uvarint(h.extra) + reverse(h.extra) + h.extra = h.extra[:min(8,headerSize-bytesRead)] - return &header{ - sig: head[len(magic) : len(magic)+4], - entries: size, - datalen: length, - headlen: uint64(len(magic) + hashLength + s + i), - end: int64(len(data)), - }, nil -} -func (h *header) Append(data []byte) []byte { - - length := len(data) - data = append(data, h.sig...) - data = binary.AppendUvarint(data, h.entries) - data = binary.AppendUvarint(data, h.datalen) - reverse(data[length:]) - - return append(data, magic...) + return nil } -var _ encoding.BinaryMarshaler = (*segment)(nil) -var _ encoding.BinaryUnmarshaler = (*segment)(nil) +type Commit struct { + flag uint64 // flag values + size uint64 // size of the trailer + count uint64 // number of entries + prev uint64 // previous commit -var ErrDecode = errors.New("decode") - -func reverse[T any](b []T) []T { - l := len(b) - for i := 0; i < l/2; i++ { - b[i], b[l-i-1] = b[l-i-1], b[i] - } - return b + tsize int } -// func clone[T ~[]E, E any](e []E) []E { -// return append(e[0:0:0], e...) -// } +// Append marshals the trailer into binary form and appends it to data. +// It returns the new slice. +func (h *Commit) AppendTrailer(data []byte) []byte { + h.flag |= TypeCommit + // if h.prev > 0 { + // h.flag |= TypePrevCommit + // } -type entryBytes []byte - -// KeyValue returns the parsed key and value from an entry -func (e entryBytes) KeyValue() ([]byte, uint64) { - if len(e) < 2 { - return nil, 0 - } - head := reverse(append(e[0:0:0], e[max(0, len(e)-binary.MaxVarintLen64):]...)) - value, i := binary.Uvarint(head) - return append(e[0:0:0], e[:len(e)-i]...), value -} - -// NewKeyValue packed into an entry -func NewKeyValue(key []byte, val uint64) entryBytes { - length := len(key) - data := append(key[0:0:0], key...) - data = binary.AppendUvarint(data, val) - reverse(data[length:]) + size := len(data) + data = binary.AppendUvarint(data, h.size) + data = binary.AppendUvarint(data, h.flag) + data = binary.AppendUvarint(data, h.count) + // if h.prev > 0 { + // data = binary.AppendUvarint(data, h.prev) + // } + reverse(data[size:]) return data } -type listEntries []entryBytes - -// WriteTo implements io.WriterTo. -func (lis *listEntries) WriteTo(wr io.Writer) (int64, error) { - if lis == nil { - return 0, nil +// UnmarshalBinary implements encoding.BinaryUnmarshaler. +// It reads a trailer from binary data, and sets the fields +// of the receiver to the values found in the header. +func (h *Commit) UnmarshalBinary(data []byte) error { + if len(data) < minCommitSize { + return fmt.Errorf("%w: bad data", ErrDecode) } - head := header{ - entries: uint64(len(*lis)), - } - h := hash() + var n int + h.size, n = binary.Uvarint(data) + data = data[n:] + h.tsize += n - wr = io.MultiWriter(wr, h) + h.flag, n = binary.Uvarint(data) + data = data[n:] + h.tsize += n - var i int64 - for _, b := range *lis { - j, err := wr.Write(b) - i += int64(j) - if err != nil { - return i, err - } + h.count, n = binary.Uvarint(data) + data = data[n:] + h.tsize += n - j, err = wr.Write(reverse(binary.AppendUvarint(make([]byte, 0, binary.MaxVarintLen32), uint64(len(b))))) - i += int64(j) - if err != nil { - return i, err - } - } - head.datalen = uint64(i) - head.sig = h.Sum(nil) - - b := head.Append([]byte{}) - j, err := wr.Write(b) - i += int64(j) - - return i, err -} - -var _ sort.Interface = listEntries{} - -// Len implements sort.Interface. -func (lis listEntries) Len() int { - return len(lis) -} - -// Less implements sort.Interface. -func (lis listEntries) Less(i int, j int) bool { - iname, _ := lis[i].KeyValue() - jname, _ := lis[j].KeyValue() - - return bytes.Compare(iname, jname) < 0 -} - -// Swap implements sort.Interface. -func (lis listEntries) Swap(i int, j int) { - lis[i], lis[j] = lis[j], lis[i] -} - -type segmentReader struct { - head *header - rd io.ReaderAt -} - -// FirstEntry parses the first segment entry from the end of the segment -func (s *segmentReader) FirstEntry() (*entryBytes, error) { - e, _, err := s.readEntryAt(-1) - return e, err -} - -func (s *segmentReader) VerifyHash() (bool, error) { - h := hash() - data := make([]byte, s.head.datalen) - _, err := s.rd.ReadAt(data, s.head.end-int64(s.head.datalen)) - if err != nil { - return false, err - } - _, err = h.Write(data) - ok := bytes.Equal(h.Sum(nil), s.head.sig) - - return ok, err -} - -// Find locates needle within a segment. if it cant find it will return the nearest key before needle. -func (s *segmentReader) Find(needle []byte, first bool) (*entryBytes, bool, error) { - if s == nil { - return nil, false, nil - } - e, pos, err := s.readEntryAt(-1) - if err != nil { - return nil, false, err + // h.prev = h.size + if h.flag&TypePrevCommit == TypePrevCommit { + h.prev, n = binary.Uvarint(data) + h.tsize += n } - last := e - found := false - for pos > 0 { - key, _ := e.KeyValue() - switch bytes.Compare(key, needle) { - case 1: // key=ccc, needle=bbb - return last, found, nil - case 0: // equal - if first { - return e, true, nil - } - found = true - fallthrough - case -1: // key=aaa, needle=bbb - last = e - e, pos, err = s.readEntryAt(pos) - if err != nil { - return nil, found, err - } - } - } - return last, found, nil + return nil } -func (s *segmentReader) readEntryAt(pos int64) (*entryBytes, int64, error) { - if pos < 0 { - pos = s.head.end + +type Block struct { + header + + size uint64 + flag uint64 + + tsize int +} + +func (h *Block) AppendHeader(data []byte) []byte { + size := len(data) + data = append(data, make([]byte, 10)...) + copy(data, h.extra) + if h.size == 0 { + return data } - head := make([]byte, binary.MaxVarintLen16) - s.rd.ReadAt(head, pos-binary.MaxVarintLen16) - length, hsize := binary.Uvarint(reverse(head)) + hdata := binary.AppendUvarint(make([]byte, 0, 10), h.end) + reverse(hdata) + copy(data[size+10-len(hdata):], hdata) - e := make(entryBytes, length) - _, err := s.rd.ReadAt(e, pos-int64(length)-int64(hsize)) + return data +} - return &e, pos - int64(length) - int64(hsize), err +// AppendTrailer marshals the footer into binary form and appends it to data. +// It returns the new slice. +func (h *Block) AppendTrailer(data []byte) []byte { + size := len(data) + + h.flag |= TypeSegment + data = binary.AppendUvarint(data, h.size) + data = binary.AppendUvarint(data, h.flag) + reverse(data[size:]) + + return data +} + +// UnmarshalBinary implements encoding.BinaryUnmarshaler. +// It reads a footer from binary data, and sets the fields +// of the receiver to the values found in the footer. +func (h *Block) UnmarshalBinary(data []byte) error { + if len(data) < minBlockSize { + return fmt.Errorf("%w: bad data", ErrDecode) + } + + var n int + h.size, n = binary.Uvarint(data) + data = data[n:] + h.tsize += n + + h.flag, n = binary.Uvarint(data) + h.tsize += n + + return nil } type logFile struct { - rd interface { - io.ReaderAt - io.WriterTo - } - segments []segmentReader - - fs.File + header + Commit } -func ReadFile(fd fs.File) (*logFile, error) { - l := &logFile{File: fd} +func (h *logFile) AppendMagic(data []byte) []byte { + size := len(data) + data = append(data, Magic[:]...) + if h.end == 0 { + return data + } + hdata := binary.AppendUvarint(make([]byte, 0, 10), h.end) + reverse(hdata) + copy(data[size+10-len(hdata):], hdata) - stat, err := fd.Stat() + return data +} + +// WriteLogFile writes a log file to w, given a list of segments. +// The caller is responsible for calling WriteAt on the correct offset. +// The function will return an error if any of the segments fail to write. +// The offset is the initial offset of the first segment, and will be +// incremented by the length of the segment on each write. +// +// The log file is written with the following format: +// - A header with the magic, version, and flag (Dirty) +// - A series of segments, each with: +// - A footer with the length and hash of the segment +// - The contents of the segment +// - A header with the magic, version, flag (Clean), and end offset +func WriteLogFile(w io.WriterAt, segments iter.Seq[io.Reader]) error { + _, err := w.WriteAt(Magic[:], 0) if err != nil { - return nil, err + return err } - eof := stat.Size() - if rd, ok := fd.(interface { - io.ReaderAt - io.WriterTo - }); ok { - l.rd = rd - - } else { - rd, err := io.ReadAll(fd) - if err != nil { - return nil, err - } - l.rd = bytes.NewReader(rd) + lf := &LogWriter{ + WriterAt: w, } - head := make([]byte, segmentFooterLength) - for eof > 0 { - _, err = l.rd.ReadAt(head, eof-int64(segmentFooterLength)) - if err != nil { - return nil, err - } - - s := segmentReader{ - rd: l.rd, - } - s.head, err = ReadHead(head) - s.head.end = eof - int64(s.head.headlen) - if err != nil { - return nil, err - } - eof -= int64(s.head.datalen) + int64(s.head.headlen) - l.segments = append(l.segments, s) - } - - return l, nil + return lf.writeIter(segments) } -func (l *logFile) Count() int64 { - return int64(len(l.segments)) +type rw interface { + io.ReaderAt + io.WriterAt } -func (l *logFile) LoadSegment(pos int64) (*segmentBytes, error) { - if pos < 0 { - pos = int64(len(l.segments) - 1) - } - if pos > int64(len(l.segments)-1) { - return nil, ErrDecode - } - s := l.segments[pos] - b := make([]byte, s.head.datalen+s.head.headlen) - _, err := l.rd.ReadAt(b, s.head.end-int64(len(b))) +func AppendLogFile(rw rw, segments iter.Seq[io.Reader]) error { + logFile, err := ReadLogFile(rw) if err != nil { - return nil, err + return err + } + lf := &LogWriter{ + WriterAt: rw, + logFile: logFile.logFile, + } + return lf.writeIter(segments) +} + +func (lf *LogWriter) writeIter(segments iter.Seq[io.Reader]) error { + lf.size = 0 + for s := range segments { + n, err := lf.writeBlock(s) + if err != nil { + return err + } + lf.end += n + lf.size += n + lf.count++ } - return &segmentBytes{b, -1}, nil + // Write the footer to the log file. + // The footer is written at the current end of file position. + n, err := lf.WriteAt(lf.AppendTrailer(make([]byte, 0, maxCommitSize)), int64(lf.end)+10) + if err != nil { + // If there is an error, return it. + return err + } + lf.end += uint64(n) + + _, err = lf.WriteAt(lf.AppendMagic(make([]byte, 0, 10)), 0) + + return err } -func (l *logFile) Find(needle []byte, first bool) (*entryBytes, bool, error) { - var cur, last segmentReader - for _, s := range l.segments { - cur = s - e, err := cur.FirstEntry() +type LogWriter struct { + logFile + io.WriterAt +} + +// writeBlock writes a segment to the log file at the current end of file position. +// The segment is written in chunks of 1024 bytes, and the hash of the segment +func (lf *LogWriter) writeBlock(segment io.Reader) (uint64, error) { + h := hash() + block := Block{} + + start := int64(lf.end) + 10 + end := start + + bytesWritten := 0 + + // Write the header to the log file. + // The footer is written at the current end of file position. + n, err := lf.WriteAt(make([]byte, headerSize), start) + bytesWritten += n + end += int64(n) + if err != nil { + // If there is an error, return it. + return uint64(bytesWritten), err + } + + // Write the segment to the log file. + // The segment is written in chunks of 1024 bytes. + for { + // Read a chunk of the segment. + buf := make([]byte, 1024) + n, err := segment.Read(buf) if err != nil { - return nil, false, err - } - k, _ := e.KeyValue() - - if first && bytes.Equal(k, needle) { - break - } - if first && bytes.Compare(k, needle) > 0 { - e, ok, err := cur.Find(needle, first) - if ok || err != nil{ - return e, ok, err + // If the segment is empty, break the loop. + if err == io.EOF { + break } - break + // If there is an error, return it. + return uint64(bytesWritten), err } - if !first && bytes.Compare(k, needle) > 0 { - break + + // Compute the hash of the chunk. + h.Write(buf[:n]) + + // Write the chunk to the log file. + // The chunk is written at the current end of file position. + n, err = lf.WriteAt(buf[:n], end) + bytesWritten += n + if err != nil { + // If there is an error, return it. + return uint64(bytesWritten), err } - last = s + + // Update the length of the segment. + end += int64(n) + block.size += uint64(n) } - e, ok, err := last.Find(needle, first) - if ok || err != nil{ - return e, ok, err + block.extra = h.Sum(nil) + block.end += block.size + + // Write the footer to the log file. + // The footer is written at the current end of file position. + n, err = lf.WriteAt(block.AppendTrailer(make([]byte, 0, maxBlockSize)), end) + bytesWritten += n + if err != nil { + // If there is an error, return it. + return uint64(bytesWritten), err } - // if by mistake it was not found in the last.. check the next segment. - return cur.Find(needle, first) -} -func (l *logFile) WriteTo(w io.Writer) (int64, error) { - return l.rd.WriteTo(w) + end += int64(n) + block.end += uint64(n) + + // Update header to the log file. + // The footer is written at the current end of file position. + _, err = lf.WriteAt(block.AppendHeader(make([]byte, 0, headerSize)), start) + if err != nil { + // If there is an error, return it. + return uint64(bytesWritten), err + } + + return uint64(bytesWritten), nil } -type segmentBytes struct { - b []byte - pos int +// reverse reverses a slice in-place. +func reverse[T any](b []T) { + l := len(b) + for i := 0; i < l/2; i++ { + b[i], b[l-i-1] = b[l-i-1], b[i] + } } -type dataset struct { - rd io.ReaderAt - files []logFile - - fs.FS +type LogReader struct { + logFile + io.ReaderAt + Err error } -func ReadDataset(fd fs.FS) (*dataset, error) { - panic("not implemented") +// ReadLogFile reads a log file from the given io.ReaderAt. It returns a pointer to a LogFile, or an error if the file +// could not be read. +func ReadLogFile(reader io.ReaderAt) (*LogReader, error) { + header := make([]byte, headerSize) + n, err := rsr(reader, 0, 10).ReadAt(header, 0) + if err != nil { + return nil, err + } + header = header[:n] + + logFile := &LogReader{ReaderAt: reader} + err = logFile.header.UnmarshalBinary(header) + if err != nil { + return nil, err + } + + if logFile.end == 0 { + return logFile, nil + } + + commit := make([]byte, maxCommitSize) + n, err = rsr(reader, 10, int64(logFile.end)).ReadAt(commit, 0) + if n == 0 && err != nil { + return nil, err + } + commit = commit[:n] + + err = logFile.Commit.UnmarshalBinary(commit) + + return logFile, err +} + +// Iterate reads the log file and calls the given function for each segment. +// It passes an io.Reader that reads from the current segment. It will stop +// calling the function if the function returns false. +func (lf *LogReader) Iter(begin uint64) iter.Seq2[blockInfo, io.Reader] { + var commits []*Commit + for commit := range lf.iterCommits() { + commits = append(commits, &commit) + } + if lf.Err != nil { + return func(yield func(blockInfo, io.Reader) bool) {} + } + + reverse(commits) + + return func(yield func(blockInfo, io.Reader) bool) { + start := int64(10) + var adj uint64 + for _, commit := range commits { + size := int64(commit.size) + it := iterBlocks(io.NewSectionReader(lf, start, size), size) + for bi, block := range it { + bi.Commit = *commit + bi.Index += adj + bi.Start += uint64(start) + if begin <= bi.Index { + if !yield(bi, block) { + return + } + } + } + + start += size + int64(commit.tsize) + adj = commit.count + } + } +} + +type blockInfo struct{ + Index uint64 + Commit Commit + Start uint64 + Size uint64 + Hash []byte +} + +func iterBlocks(r io.ReaderAt, end int64) iter.Seq2[blockInfo, io.Reader] { + var start int64 + var i uint64 + var bi blockInfo + + return func(yield func(blockInfo, io.Reader) bool) { + buf := make([]byte, maxBlockSize) + for start < end { + block := &Block{} + buf = buf[:10] + n, err := rsr(r, int64(start), 10).ReadAt(buf, 0) + if n == 0 && err != nil { + return + } + start += int64(n) + + if err := block.header.UnmarshalBinary(buf); err != nil { + return + } + buf = buf[:maxBlockSize] + n, err = rsr(r, int64(start), int64(block.end)).ReadAt(buf, 0) + if n == 0 && err != nil { + return + } + buf = buf[:n] + err = block.UnmarshalBinary(buf) + if err != nil { + return + } + + bi.Index = i + bi.Start = uint64(start) + bi.Size = block.size + bi.Hash = block.extra + + if !yield(bi, io.NewSectionReader(r, int64(start), int64(block.size))) { + return + } + + i++ + start += int64(block.end) + } + } +} + +func (lf *LogReader) iterCommits() iter.Seq[Commit] { + if lf.end == 0 { + return slices.Values([]Commit(nil)) + } + + offset := lf.end - lf.size - uint64(lf.tsize) + return func(yield func(Commit) bool) { + if !yield(lf.Commit) { + return + } + + buf := make([]byte, maxCommitSize) + + for offset > 10 { + commit := Commit{} + buf = buf[:10] + n, err := rsr(lf, 10, int64(offset)).ReadAt(buf, 0) + if n == 0 && err != nil { + lf.Err = err + return + } + buf = buf[:n] + err = commit.UnmarshalBinary(buf) + if err != nil { + lf.Err = err + return + } + if !yield(commit) { + return + } + offset -= commit.size + uint64(commit.tsize) + } + } +} + +func (lf *LogReader) Rev(begin uint64) iter.Seq2[blockInfo, io.Reader] { + end := lf.end + 10 + bi := blockInfo{} + bi.Index = lf.count-1 + + return func(yield func(blockInfo, io.Reader) bool) { + buf := make([]byte, maxBlockSize) + + for commit := range lf.iterCommits() { + end -= uint64(commit.tsize) + start := end - commit.size + + bi.Commit = commit + + for start < end { + block := &Block{} + buf = buf[:maxBlockSize] + n, err := rsr(lf, int64(start), int64(commit.size)).ReadAt(buf, 0) + if n == 0 && err != nil { + lf.Err = err + return + } + buf = buf[:n] + err = block.UnmarshalBinary(buf) + if err != nil { + lf.Err = err + return + } + if begin >= bi.Index { + + bi.Start = uint64(end-block.size)-uint64(block.tsize) + bi.Size = block.size + + buf = buf[:10] + _, err = rsr(lf, int64(bi.Start)-10, 10).ReadAt(buf, 0) + if err != nil { + lf.Err = err + return + } + err = block.header.UnmarshalBinary(buf) + if err != nil { + lf.Err = err + return + } + + bi.Hash = block.extra + + if !yield(bi, io.NewSectionReader(lf, int64(bi.Start), int64(bi.Size))) { + return + } + } + end -= block.size + 10 + uint64(block.tsize) + bi.Index-- + } + + } + } +} + +func (lf *LogReader) Count() uint64 { + return lf.count +} + +func (lf *LogReader) Size() uint64 { + return lf.end + 10 +} + +func rsr(r io.ReaderAt, offset, size int64) *revSegmentReader { + r = io.NewSectionReader(r, offset, size) + return &revSegmentReader{r, size} +} + +type revSegmentReader struct { + io.ReaderAt + size int64 +} + +func (r *revSegmentReader) ReadAt(data []byte, offset int64) (int, error) { + if offset < 0 { + return 0, errors.New("negative offset") + } + + if offset > int64(r.size) { + return 0, io.EOF + } + + o := r.size - int64(len(data)) - offset + d := int64(len(data)) + if o < 0 { + d = max(0, d+o) + } + + i, err := r.ReaderAt.ReadAt(data[:d], max(0, o)) + reverse(data[:i]) + return i, err } diff --git a/lsm/sst_test.go b/lsm/sst_test.go index 7e842ac..e8c3af8 100644 --- a/lsm/sst_test.go +++ b/lsm/sst_test.go @@ -1,327 +1,300 @@ -// SPDX-FileCopyrightText: 2023 Jon Lundy -// SPDX-License-Identifier: BSD-3-Clause - package lsm import ( "bytes" - crand "crypto/rand" "encoding/base64" + "errors" "io" - "io/fs" - "math/rand" - "os" - "sort" - "sync" + "iter" + "slices" "testing" - "time" + "github.com/docopt/docopt-go" "github.com/matryer/is" ) -func TestLargeFile(t *testing.T) { - is := is.New(t) - - segCount := 4098 - - f := randFile(t, 2_000_000, segCount) - - sf, err := ReadFile(f) - is.NoErr(err) - - is.True(len(sf.segments) <= segCount) - var needle []byte - for i, s := range sf.segments { - e, err := s.FirstEntry() - is.NoErr(err) - k, v := e.KeyValue() - needle = k - t.Logf("Segment-%d: %s = %d", i, k, v) +// TestWriteLogFile tests AppendLogFile and WriteLogFile against a set of test cases. +// +// Each test case contains a slice of slices of io.Readers, which are passed to +// AppendLogFile and WriteLogFile in order. The test case also contains the +// expected encoded output as a base64 string, as well as the expected output +// when the file is read back using ReadLogFile. +// +// The test case also contains the expected output when the file is read back in +// reverse order using ReadLogFile.Rev(). +// +// The test cases are as follows: +// +// - nil reader: Passes a nil slice of io.Readers to WriteLogFile. +// - err reader: Passes a slice of io.Readers to WriteLogFile which returns an +// error when read. +// - single reader: Passes a single io.Reader to WriteLogFile. +// - multiple readers: Passes a slice of multiple io.Readers to WriteLogFile. +// - multiple commit: Passes multiple slices of io.Readers to AppendLogFile. +// - multiple commit 3x: Passes multiple slices of io.Readers to AppendLogFile +// three times. +// +// The test uses the is package from github.com/matryer/is to check that the +// output matches the expected output. +func TestWriteLogFile(t *testing.T) { + type test struct { + name string + in [][]io.Reader + enc string + out [][]byte + rev [][]byte } - t.Log(f.Stat()) - - tt, ok, err := sf.Find(needle, true) - is.NoErr(err) - is.True(ok) - key, val := tt.KeyValue() - t.Log(string(key), val) - - tt, ok, err = sf.Find([]byte("needle"), false) - is.NoErr(err) - is.True(!ok) - key, val = tt.KeyValue() - t.Log(string(key), val) - - tt, ok, err = sf.Find([]byte{'\xff'}, false) - is.NoErr(err) - is.True(!ok) - key, val = tt.KeyValue() - t.Log(string(key), val) -} - -func TestLargeFileDisk(t *testing.T) { - is := is.New(t) - - segCount := 4098 - - t.Log("generate large file") - f := randFile(t, 2_000_000, segCount) - - fd, err := os.CreateTemp("", "sst*") - is.NoErr(err) - defer func() { t.Log("cleanup:", fd.Name()); fd.Close(); os.Remove(fd.Name()) }() - - t.Log("write file:", fd.Name()) - _, err = io.Copy(fd, f) - is.NoErr(err) - fd.Seek(0, 0) - - sf, err := ReadFile(fd) - is.NoErr(err) - - is.True(len(sf.segments) <= segCount) - var needle []byte - for i, s := range sf.segments { - e, err := s.FirstEntry() - is.NoErr(err) - k, v := e.KeyValue() - needle = k - - ok, err := s.VerifyHash() - is.NoErr(err) - - t.Logf("Segment-%d: %s = %d %t", i, k, v, ok) - is.True(ok) - } - t.Log(f.Stat()) - - tt, ok, err := sf.Find(needle, false) - is.NoErr(err) - is.True(ok) - key, val := tt.KeyValue() - t.Log(string(key), val) - - tt, ok, err = sf.Find([]byte("needle"), false) - is.NoErr(err) - is.True(!ok) - key, val = tt.KeyValue() - t.Log(string(key), val) - - tt, ok, err = sf.Find([]byte{'\xff'}, false) - is.NoErr(err) - is.True(!ok) - key, val = tt.KeyValue() - t.Log(string(key), val) -} - -func BenchmarkLargeFile(b *testing.B) { - segCount := 4098 / 4 - f := randFile(b, 2_000_000, segCount) - - sf, err := ReadFile(f) - if err != nil { - b.Error(err) - } - key := make([]byte, 5) - keys := make([][]byte, b.N) - for i := range keys { - _, err = crand.Read(key) - if err != nil { - b.Error(err) - } - keys[i] = []byte(base64.RawURLEncoding.EncodeToString(key)) - } - b.Log("ready", b.N) - b.ResetTimer() - okays := 0 - each := b.N / 10 - for n := 0; n < b.N; n++ { - if each > 0 && n%each == 0 { - b.Log(n) - } - _, ok, err := sf.Find(keys[n], false) - if err != nil { - b.Error(err) - } - if ok { - okays++ - } - } - b.Log("okays=", b.N, okays) -} - -// TestFindRange is an initial range find for start and stop of a range of needles. -// TODO: start the second query from where the first left off. Use an iterator? -func TestFindRange(t *testing.T) { - is := is.New(t) - - f := basicFile(t, - entries{ - {"AD", 5}, - {"AC", 5}, - {"AB", 4}, - {"AB", 3}, + tests := []test{ + { + name: "nil reader", + in: nil, + enc: "U291ci5pcwAAAwACAA", + out: [][]byte{}, + rev: [][]byte{}, }, - entries{ - {"AB", 2}, - {"AA", 1}, + { + name: "err reader", + in: nil, + enc: "U291ci5pcwAAAwACAA", + out: [][]byte{}, + rev: [][]byte{}, }, - ) - sf, err := ReadFile(f) - is.NoErr(err) + { + name: "single reader", + in: [][]io.Reader{ + { + bytes.NewBuffer([]byte{1, 2, 3, 4})}}, + enc: "U291ci5pcwAAE756XndRZXhdAAYBAgMEAQQBAhA", + out: [][]byte{{1, 2, 3, 4}}, + rev: [][]byte{{1, 2, 3, 4}}}, + { + name: "multiple readers", + in: [][]io.Reader{ + { + bytes.NewBuffer([]byte{1, 2, 3, 4}), + bytes.NewBuffer([]byte{5, 6, 7, 8})}}, + enc: "U291ci5pcwAAI756XndRZXhdAAYBAgMEAQRhQyZWDDn5BQAGBQYHCAEEAgIg", + out: [][]byte{{1, 2, 3, 4}, {5, 6, 7, 8}}, + rev: [][]byte{{5, 6, 7, 8}, {1, 2, 3, 4}}}, + { + name: "multiple commit", + in: [][]io.Reader{ + { + bytes.NewBuffer([]byte{1, 2, 3, 4})}, + { + bytes.NewBuffer([]byte{5, 6, 7, 8})}}, + enc: "U291ci5pcwAAJr56XndRZXhdAAYBAgMEAQQBAhBhQyZWDDn5BQAGBQYHCAEEAgIQ", + out: [][]byte{{1, 2, 3, 4}, {5, 6, 7, 8}}, + rev: [][]byte{{5, 6, 7, 8}, {1, 2, 3, 4}}}, + { + name: "multiple commit", + in: [][]io.Reader{ + { + bytes.NewBuffer([]byte{1, 2, 3, 4}), + bytes.NewBuffer([]byte{5, 6, 7, 8})}, + { + bytes.NewBuffer([]byte{9, 10, 11, 12})}, + }, + enc: "U291ci5pcwAANr56XndRZXhdAAYBAgMEAQRhQyZWDDn5BQAGBQYHCAEEAgIgA4Buuio8Ro0ABgkKCwwBBAMCEA", + out: [][]byte{{1, 2, 3, 4}, {5, 6, 7, 8}, {9, 10, 11, 12}}, + rev: [][]byte{{9, 10, 11, 12}, {5, 6, 7, 8}, {1, 2, 3, 4}}}, + { + name: "multiple commit 3x", + in: [][]io.Reader{ + { + bytes.NewBuffer([]byte{1, 2, 3}), + bytes.NewBuffer([]byte{4, 5, 6}), + }, + { + bytes.NewBuffer([]byte{7, 8, 9}), + }, + { + bytes.NewBuffer([]byte{10, 11, 12}), + bytes.NewBuffer([]byte{13, 14, 15}), + }, + }, + enc: "U291ci5pcwAAVNCqYhhnLPWrAAUBAgMBA7axWhhYd+HsAAUEBQYBAwICHr9ryhhdbkEZAAUHCAkBAwMCDy/UIhidCwCqAAUKCwwBA/NCwhh6wXgXAAUNDg8BAwUCHg", + out: [][]byte{{1, 2, 3}, {4, 5, 6}, {7, 8, 9}, {10, 11, 12}, {13, 14, 15}}, + rev: [][]byte{{13, 14, 15}, {10, 11, 12}, {7, 8, 9}, {4, 5, 6}, {1, 2, 3}}}, + } - var ok bool - var first, last *entryBytes + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + is := is.New(t) + buf := &buffer{} - first, ok, err = sf.Find([]byte("AB"), true) - is.NoErr(err) + buffers := 0 - key, val := first.KeyValue() - t.Log(string(key), val) + if len(test.in) == 0 { + err := WriteLogFile(buf, slices.Values([]io.Reader{})) + is.NoErr(err) + } + for i, in := range test.in { + buffers += len(in) - is.True(ok) - is.Equal(key, []byte("AB")) - is.Equal(val, uint64(2)) + if i == 0 { + err := WriteLogFile(buf, slices.Values(in)) + is.NoErr(err) + } else { + err := AppendLogFile(buf, slices.Values(in)) + is.NoErr(err) + } + } + + is.Equal(base64.RawStdEncoding.EncodeToString(buf.Bytes()), test.enc) - last, ok, err = sf.Find([]byte("AB"), false) - is.NoErr(err) + files, err := ReadLogFile(bytes.NewReader(buf.Bytes())) + is.NoErr(err) - key, val = last.KeyValue() - t.Log(string(key), val) + is.Equal(files.Size(), uint64(len(buf.Bytes()))) - is.True(ok) - is.Equal(key, []byte("AB")) - is.Equal(val, uint64(4)) + i := 0 + for bi, fp := range files.Iter(0) { + buf, err := io.ReadAll(fp) + is.NoErr(err) + hash := hash() + hash.Write(buf) + is.Equal(bi.Hash, hash.Sum(nil)[:len(bi.Hash)]) - last, ok, err = sf.Find([]byte("AC"), false) - is.NoErr(err) + is.True(len(test.out) > int(bi.Index)) + is.Equal(buf, test.out[bi.Index]) + i++ + } + is.NoErr(files.Err) + is.Equal(i, buffers) - key, val = last.KeyValue() - t.Log(string(key), val) + i = 0 + for bi, fp := range files.Rev(files.Count()) { + buf, err := io.ReadAll(fp) + is.NoErr(err) - is.True(ok) - is.Equal(key, []byte("AC")) - is.Equal(val, uint64(5)) + hash := hash() + hash.Write(buf) + is.Equal(bi.Hash, hash.Sum(nil)[:len(bi.Hash)]) + + is.Equal(buf, test.rev[i]) + is.Equal(buf, test.out[bi.Index]) + i++ + } + is.NoErr(files.Err) + is.Equal(i, buffers) + is.Equal(files.Count(), uint64(i)) + }) + } } -func randFile(t interface { - Helper() - Error(...any) -}, size int, segments int) fs.File { - t.Helper() +// TestArgs tests that the CLI arguments are correctly parsed. +func TestArgs(t *testing.T) { + is := is.New(t) + usage := `Usage: lsm2 create ...` - lis := make(listEntries, size) - for i := range lis { - key := make([]byte, 5) - _, err := crand.Read(key) - if err != nil { - t.Error(err) + arguments, err := docopt.ParseArgs(usage, []string{"create", "archive", "file1", "file2"}, "1.0") + is.NoErr(err) + + var params struct { + Create bool `docopt:"create"` + Archive string `docopt:""` + Files []string `docopt:""` + } + err = arguments.Bind(¶ms) + is.NoErr(err) + + is.Equal(params.Create, true) + is.Equal(params.Archive, "archive") + is.Equal(params.Files, []string{"file1", "file2"}) +} + +func BenchmarkIterate(b *testing.B) { + block := make([]byte, 1024) + buf := &buffer{} + + + b.Run("write", func(b *testing.B) { + WriteLogFile(buf, func(yield func(io.Reader) bool) { + for range (b.N) { + if !yield(bytes.NewBuffer(block)) { + break + } + } + }) + }) + + b.Run("read", func(b *testing.B) { + lf, _ := ReadLogFile(buf) + b.Log(lf.Count()) + for range (b.N) { + for _, fp := range lf.Iter(0) { + _, _ = io.Copy(io.Discard, fp) + break + } } - key = []byte(base64.RawURLEncoding.EncodeToString(key)) - // key := []byte(fmt.Sprintf("key-%05d", i)) + }) - lis[i] = NewKeyValue(key, rand.Uint64()%16_777_216) - } - - sort.Sort(sort.Reverse(&lis)) - each := size / segments - if size%segments != 0 { - each++ - } - split := make([]listEntries, segments) - - for i := range split { - if (i+1)*each > len(lis) { - split[i] = lis[i*each : i*each+len(lis[i*each:])] - split = split[:i+1] - break + b.Run("rev", func(b *testing.B) { + lf, _ := ReadLogFile(buf) + b.Log(lf.Count()) + for range (b.N) { + for _, fp := range lf.Rev(lf.Count()) { + _, _ = io.Copy(io.Discard, fp) + break + } } - split[i] = lis[i*each : (i+1)*each] + }) +} + +type buffer struct { + buf []byte +} + +// Bytes returns the underlying byte slice of the bufferWriterAt. +func (b *buffer) Bytes() []byte { + return b.buf +} + +// WriteAt implements io.WriterAt. It appends data to the internal buffer +// if the offset is beyond the current length of the buffer. It will +// return an error if the offset is negative. +func (b *buffer) WriteAt(data []byte, offset int64) (written int, err error) { + if offset < 0 { + return 0, errors.New("negative offset") } - var b bytes.Buffer - for _, s := range split { - s.WriteTo(&b) + currentLength := int64(len(b.buf)) + if currentLength < offset+int64(len(data)) { + b.buf = append(b.buf, make([]byte, offset+int64(len(data))-currentLength)...) } - return NewFile(b.Bytes()) + written = copy(b.buf[offset:], data) + return } -type fakeStat struct { - size int64 -} - -// IsDir implements fs.FileInfo. -func (*fakeStat) IsDir() bool { - panic("unimplemented") -} - -// ModTime implements fs.FileInfo. -func (*fakeStat) ModTime() time.Time { - panic("unimplemented") -} - -// Mode implements fs.FileInfo. -func (*fakeStat) Mode() fs.FileMode { - panic("unimplemented") -} - -// Name implements fs.FileInfo. -func (*fakeStat) Name() string { - panic("unimplemented") -} - -// Size implements fs.FileInfo. -func (s *fakeStat) Size() int64 { - return s.size -} - -// Sys implements fs.FileInfo. -func (*fakeStat) Sys() any { - panic("unimplemented") -} - -var _ fs.FileInfo = (*fakeStat)(nil) - -type rd interface { - io.ReaderAt - io.Reader -} -type fakeFile struct { - stat func() fs.FileInfo - - rd -} - -func (fakeFile) Close() error { return nil } -func (f fakeFile) Stat() (fs.FileInfo, error) { return f.stat(), nil } - -func NewFile(b ...[]byte) fs.File { - in := bytes.Join(b, nil) - rd := bytes.NewReader(in) - size := int64(len(in)) - return &fakeFile{stat: func() fs.FileInfo { return &fakeStat{size: size} }, rd: rd} -} -func NewFileFromReader(rd *bytes.Reader) fs.File { - return &fakeFile{stat: func() fs.FileInfo { return &fakeStat{size: int64(rd.Len())} }, rd: rd} -} - -type fakeFS struct { - files map[string]*fakeFile - mu sync.RWMutex -} - -// Open implements fs.FS. -func (f *fakeFS) Open(name string) (fs.File, error) { - f.mu.RLock() - defer f.mu.RUnlock() - - if file, ok := f.files[name]; ok { - return file, nil +// ReadAt implements io.ReaderAt. It reads data from the internal buffer starting +// from the specified offset and writes it into the provided data slice. If the +// offset is negative, it returns an error. If the requested read extends beyond +// the buffer's length, it returns the data read so far along with an io.EOF error. +func (b *buffer) ReadAt(data []byte, offset int64) (int, error) { + if offset < 0 { + return 0, errors.New("negative offset") } - return nil, fs.ErrNotExist + if offset > int64(len(b.buf)) || len(b.buf[offset:]) < len(data) { + return copy(data, b.buf[offset:]), io.EOF + } + + return copy(data, b.buf[offset:]), nil } -var _ fs.FS = (*fakeFS)(nil) +// IterOne takes an iterator that yields values of type T along with a value of +// type I, and returns an iterator that yields only the values of type T. It +// discards the values of type I. +func IterOne[I, T any](it iter.Seq2[I, T]) iter.Seq[T] { + return func(yield func(T) bool) { + for i, v := range it { + _ = i + if !yield(v) { + return + } + } + } +} diff --git a/lsm2/cli/main.go b/lsm2/cli/main.go deleted file mode 100644 index 2aa7362..0000000 --- a/lsm2/cli/main.go +++ /dev/null @@ -1,120 +0,0 @@ -package main - -import ( - "errors" - "fmt" - "io" - "iter" - "os" - - "github.com/docopt/docopt-go" - "go.sour.is/pkg/lsm2" -) - -var usage = ` -Usage: - lsm2 create ... - lsm2 append ... - lsm2 read ` - -type args struct { - Create bool - Append bool - Read bool - - Archive string `docopt:""` - Files []string `docopt:""` - Index int64 `docopt:""` -} - -func main() { - opts, err := docopt.ParseDoc(usage) - if err != nil { - fmt.Println(err) - os.Exit(1) - } - fmt.Println(opts, must(opts.Bool("create"))) - console := console{os.Stdin, os.Stdout, os.Stderr} - args := args{} - err = opts.Bind(&args) - fmt.Println(err) - run(console, args) -} - -type console struct { - Stdin io.Reader - Stdout io.Writer - Stderr io.Writer -} - -func (c console) Write(b []byte) (int, error) { - return c.Stdout.Write(b) -} - -func run(console console,a args) error { - fmt.Fprintln(console, "lsm") - switch { - case a.Create: - f, err := os.OpenFile(a.Archive, os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - return err - } - defer f.Close() - - return lsm2.WriteLogFile(f, fileReaders(a.Files)) - case a.Append: - f, err := os.OpenFile(a.Archive, os.O_RDWR, 0644) - if err != nil { - return err - } - defer f.Close() - - return lsm2.AppendLogFile(f, fileReaders(a.Files)) - case a.Read: - fmt.Fprintln(console, "reading", a.Archive) - - f, err := os.Open(a.Archive) - if err != nil { - return err - } - defer f.Close() - lg, err := lsm2.ReadLogFile(f) - if err != nil { - return err - } - for i, rd := range lg.Iter() { - fmt.Fprintf(console, "=========================\n%d:\n", i) - io.Copy(console, rd) - fmt.Fprintln(console, "=========================") - } - if lg.Err != nil { - return lg.Err - } - return nil - default: - return errors.New("unknown command") - } -} - -func fileReaders(names []string) iter.Seq[io.Reader] { - return iter.Seq[io.Reader](func(yield func(io.Reader) bool) { - for _, name := range names { - f, err := os.Open(name) - if err != nil { - continue - } - if !yield(f) { - f.Close() - return - } - f.Close() - } - }) -} - -func must[T any](v T, err error) T { - if err != nil { - panic(err) - } - return v -} diff --git a/lsm2/sst.go b/lsm2/sst.go deleted file mode 100644 index e3cf76d..0000000 --- a/lsm2/sst.go +++ /dev/null @@ -1,581 +0,0 @@ -package lsm2 - -import ( - "encoding/binary" - "errors" - "fmt" - "hash/fnv" - "io" - "iter" -) - -// [Sour.is|size] [size|hash][data][hash|flag|size]... [prev|count|flag|size] - -// Commit1: [magic>| end = seek to end of file -// <---|-------------+ size = seek to magic header -// <---|-------------+10 size + 10 = seek to start of file -// <-----------------------------T+10----------------> 10 + size + trailer = full file size - -// Commit2: [magic>| -// +--------|-------------------------------------------------------------------------> -// <-------------------------------------|----------------+ -// prev = seek to last commit <---|-+ -// prev + trailer = size of commit <----T+---------------------------------> - -// Block: [hash>| end = seek to end of block -// <---|-+ size = seek to end of header -// <-------------------|-+10 size + 10 = seek to start of block -// <---------------------T+10---------------> size + 10 + trailer = full block size - -const ( - TypeUnknown uint64 = iota - TypeSegment - TypeCommit - TypePrevCommit - - headerSize = 10 - - maxCommitSize = 4 * binary.MaxVarintLen64 - minCommitSize = 3 - - maxBlockSize = 2 * binary.MaxVarintLen64 - minBlockSize = 2 -) - -var ( - Magic = [10]byte([]byte("Sour.is\x00\x00\x00")) - Version = uint8(1) - hash = fnv.New64a - - ErrDecode = errors.New("decode") -) - -type header struct { - end uint64 - extra []byte -} - -// UnmarshalBinary implements encoding.BinaryUnmarshaler. -// It decodes the input binary data into the header struct. -// The function expects the input data to be of a specific size (headerSize), -// otherwise it returns an error indicating bad data. -// It reads the 'end' field from the binary data, updates the 'extra' field, -// and reverses the byte order of 'extra' in place. -func (h *header) UnmarshalBinary(data []byte) error { - if len(data) != headerSize { - return fmt.Errorf("%w: bad data", ErrDecode) - } - - h.extra = make([]byte, headerSize) - copy(h.extra, data) - - var bytesRead int - h.end, bytesRead = binary.Uvarint(h.extra) - reverse(h.extra) - h.extra = h.extra[:headerSize-bytesRead] - - return nil -} - -type Commit struct { - flag uint64 // flag values - size uint64 // size of the trailer - count uint64 // number of entries - prev uint64 // previous commit - - tsize int -} - -// Append marshals the trailer into binary form and appends it to data. -// It returns the new slice. -func (h *Commit) AppendTrailer(data []byte) []byte { - h.flag |= TypeCommit - // if h.prev > 0 { - // h.flag |= TypePrevCommit - // } - - size := len(data) - data = binary.AppendUvarint(data, h.size) - data = binary.AppendUvarint(data, h.flag) - data = binary.AppendUvarint(data, h.count) - // if h.prev > 0 { - // data = binary.AppendUvarint(data, h.prev) - // } - reverse(data[size:]) - - return data -} - -// UnmarshalBinary implements encoding.BinaryUnmarshaler. -// It reads a trailer from binary data, and sets the fields -// of the receiver to the values found in the header. -func (h *Commit) UnmarshalBinary(data []byte) error { - if len(data) < minCommitSize { - return fmt.Errorf("%w: bad data", ErrDecode) - } - - var n int - h.size, n = binary.Uvarint(data) - data = data[n:] - h.tsize += n - - h.flag, n = binary.Uvarint(data) - data = data[n:] - h.tsize += n - - h.count, n = binary.Uvarint(data) - data = data[n:] - h.tsize += n - - // h.prev = h.size - if h.flag&TypePrevCommit == TypePrevCommit { - h.prev, n = binary.Uvarint(data) - h.tsize += n - } - - return nil -} - -type Block struct { - header - - size uint64 - flag uint64 - - tsize int -} - -func (h *Block) AppendHeader(data []byte) []byte { - size := len(data) - data = append(data, make([]byte, 10)...) - copy(data, h.extra) - if h.size == 0 { - return data - } - hdata := binary.AppendUvarint(make([]byte, 0, 10), h.end) - reverse(hdata) - copy(data[size+10-len(hdata):], hdata) - - return data -} - -// AppendTrailer marshals the footer into binary form and appends it to data. -// It returns the new slice. -func (h *Block) AppendTrailer(data []byte) []byte { - size := len(data) - - h.flag |= TypeSegment - data = binary.AppendUvarint(data, h.size) - data = binary.AppendUvarint(data, h.flag) - reverse(data[size:]) - - return data -} - -// UnmarshalBinary implements encoding.BinaryUnmarshaler. -// It reads a footer from binary data, and sets the fields -// of the receiver to the values found in the footer. -func (h *Block) UnmarshalBinary(data []byte) error { - if len(data) < minBlockSize { - return fmt.Errorf("%w: bad data", ErrDecode) - } - - var n int - h.size, n = binary.Uvarint(data) - data = data[n:] - h.tsize += n - - h.flag, n = binary.Uvarint(data) - data = data[n:] - h.tsize += n - - copy(h.extra, data[:8]) - - return nil -} - -type logFile struct { - header - Commit -} - -func (h *logFile) AppendMagic(data []byte) []byte { - size := len(data) - data = append(data, Magic[:]...) - if h.end == 0 { - return data - } - hdata := binary.AppendUvarint(make([]byte, 0, 10), h.end) - reverse(hdata) - copy(data[size+10-len(hdata):], hdata) - - return data -} - -// WriteLogFile writes a log file to w, given a list of segments. -// The caller is responsible for calling WriteAt on the correct offset. -// The function will return an error if any of the segments fail to write. -// The offset is the initial offset of the first segment, and will be -// incremented by the length of the segment on each write. -// -// The log file is written with the following format: -// - A header with the magic, version, and flag (Dirty) -// - A series of segments, each with: -// - A footer with the length and hash of the segment -// - The contents of the segment -// - A header with the magic, version, flag (Clean), and end offset -func WriteLogFile(w io.WriterAt, segments iter.Seq[io.Reader]) error { - _, err := w.WriteAt(Magic[:], 0) - if err != nil { - return err - } - - lf := &LogWriter{ - WriterAt: w, - } - - return lf.writeIter(segments) -} - -type rw interface { - io.ReaderAt - io.WriterAt -} - -func AppendLogFile(rw rw, segments iter.Seq[io.Reader]) error { - logFile, err := ReadLogFile(rw) - if err != nil { - return err - } - lf := &LogWriter{ - WriterAt: rw, - logFile: logFile.logFile, - } - return lf.writeIter( segments) -} - - -func (lf *LogWriter) writeIter(segments iter.Seq[io.Reader]) error { - lf.size = 0 - for s := range segments { - n, err := lf.writeBlock(s) - if err != nil { - return err - } - lf.end += n - lf.size += n - lf.count++ - } - - // Write the footer to the log file. - // The footer is written at the current end of file position. - n, err := lf.WriteAt(lf.AppendTrailer(make([]byte, 0, maxCommitSize)), int64(lf.end)+10) - if err != nil { - // If there is an error, return it. - return err - } - lf.end += uint64(n) - - _, err = lf.WriteAt(lf.AppendMagic(make([]byte, 0, 10)), 0) - - return err -} - -type LogWriter struct { - logFile - io.WriterAt -} - -// writeBlock writes a segment to the log file at the current end of file position. -// The segment is written in chunks of 1024 bytes, and the hash of the segment -func (lf *LogWriter) writeBlock(segment io.Reader) (uint64, error) { - h := hash() - block := Block{} - - start := int64(lf.end) + 10 - end := start - - bytesWritten := 0 - - // Write the header to the log file. - // The footer is written at the current end of file position. - n, err := lf.WriteAt(make([]byte, headerSize), start) - bytesWritten += n - end += int64(n) - if err != nil { - // If there is an error, return it. - return uint64(bytesWritten), err - } - - // Write the segment to the log file. - // The segment is written in chunks of 1024 bytes. - for { - // Read a chunk of the segment. - buf := make([]byte, 1024) - n, err := segment.Read(buf) - if err != nil { - // If the segment is empty, break the loop. - if err == io.EOF { - break - } - // If there is an error, return it. - return uint64(bytesWritten), err - } - - // Compute the hash of the chunk. - h.Write(buf[:n]) - - // Write the chunk to the log file. - // The chunk is written at the current end of file position. - n, err = lf.WriteAt(buf[:n], end) - bytesWritten += n - if err != nil { - // If there is an error, return it. - return uint64(bytesWritten), err - } - - // Update the length of the segment. - end += int64(n) - block.size += uint64(n) - } - - block.extra = h.Sum(nil) - block.end += block.size - - // Write the footer to the log file. - // The footer is written at the current end of file position. - n, err = lf.WriteAt(block.AppendTrailer(make([]byte, 0, maxBlockSize)), end) - bytesWritten += n - if err != nil { - // If there is an error, return it. - return uint64(bytesWritten), err - } - end += int64(n) - block.end += uint64(n) - - // Update header to the log file. - // The footer is written at the current end of file position. - _, err = lf.WriteAt(block.AppendHeader(make([]byte, 0, headerSize)), start) - if err != nil { - // If there is an error, return it. - return uint64(bytesWritten), err - } - - return uint64(bytesWritten), nil -} - -// reverse reverses a slice in-place. -func reverse[T any](b []T) { - l := len(b) - for i := 0; i < l/2; i++ { - b[i], b[l-i-1] = b[l-i-1], b[i] - } -} - -type LogReader struct { - logFile - io.ReaderAt - Err error -} - -// ReadLogFile reads a log file from the given io.ReaderAt. It returns a pointer to a LogFile, or an error if the file -// could not be read. -func ReadLogFile(reader io.ReaderAt) (*LogReader, error) { - header := make([]byte, headerSize) - n, err := rsr(reader, 0, 10).ReadAt(header, 0) - if err != nil { - return nil, err - } - header = header[:n] - - logFile := &LogReader{ReaderAt: reader} - err = logFile.header.UnmarshalBinary(header) - if err != nil { - return nil, err - } - - if logFile.end == 0 { - return logFile, nil - } - - commit := make([]byte, maxCommitSize) - n, err = rsr(reader, 10, int64(logFile.end)).ReadAt(commit, 0) - if n == 0 && err != nil { - return nil, err - } - commit = commit[:n] - - err = logFile.Commit.UnmarshalBinary(commit) - - return logFile, err -} - -// Iterate reads the log file and calls the given function for each segment. -// It passes an io.Reader that reads from the current segment. It will stop -// calling the function if the function returns false. -func (lf *LogReader) Iter() iter.Seq2[uint64, io.Reader] { - var commits []*Commit - for commit := range lf.iterCommits() { - commits = append(commits, &commit) - } - if lf.Err != nil { - return func(yield func(uint64, io.Reader) bool) {} - } - - reverse(commits) - - return func(yield func(uint64, io.Reader) bool) { - start := int64(10) - var adj uint64 - for _, commit := range commits { - size := int64(commit.size) - it := iterBlocks(io.NewSectionReader(lf, start, size), size) - for i, block := range it { - if !yield(adj+i, block) { - return - } - } - - start += size + int64(commit.tsize) - adj = commit.count - } - } -} - -func iterBlocks(r io.ReaderAt, end int64) iter.Seq2[uint64, io.Reader] { - var start int64 - var i uint64 - return func(yield func(uint64, io.Reader) bool) { - buf := make([]byte, maxBlockSize) - for start < end { - block := &Block{} - buf = buf[:10] - n, err := rsr(r, int64(start), 10).ReadAt(buf, 0) - if n == 0 && err != nil { - return - } - start += int64(n) - - if err := block.header.UnmarshalBinary(buf); err != nil { - return - } - buf = buf[:maxBlockSize] - n, err = rsr(r, int64(start), int64(block.end)).ReadAt(buf, 0) - if n == 0 && err != nil { - return - } - buf = buf[:n] - err = block.UnmarshalBinary(buf) - if err != nil { - return - } - - if !yield(i, io.NewSectionReader(r, int64(start), int64(block.size))) { - return - } - - i++ - start += int64(block.end) - } - } -} - -func (lf *LogReader) iterCommits() iter.Seq[Commit] { - if lf.end == 0 { - return func(yield func(Commit) bool) {} - } - - offset := lf.end - lf.size - uint64(lf.tsize) - return func(yield func(Commit) bool) { - if !yield(lf.Commit) { - return - } - - for offset > 10 { - commit := Commit{} - buf := make([]byte, maxCommitSize) - n, err := rsr(lf, 10, int64(offset)).ReadAt(buf, 0) - if n == 0 && err != nil { - lf.Err = err - return - } - buf = buf[:n] - err = commit.UnmarshalBinary(buf) - if err != nil { - lf.Err = err - return - } - if !yield(commit) { - return - } - offset -= commit.size + uint64(commit.tsize) - } - } -} - -func (lf *LogReader) Rev() iter.Seq2[uint64, io.Reader] { - end := lf.end + 10 - i := lf.count - return func(yield func(uint64, io.Reader) bool) { - - for commit := range lf.iterCommits() { - end -= uint64(commit.tsize) - start := end - commit.size - for start < end { - block := &Block{} - buf := make([]byte, maxBlockSize) - n, err := rsr(lf, int64(start), int64(commit.size)).ReadAt(buf, 0) - if n == 0 && err != nil { - lf.Err = err - return - } - buf = buf[:n] - err = block.UnmarshalBinary(buf) - if err != nil { - lf.Err = err - return - } - if !yield(i-1, io.NewSectionReader(lf, int64(end-block.size)-int64(block.tsize), int64(block.size))) { - return - } - end -= block.size + 10 + uint64(block.tsize) - i-- - } - - } - } -} - -func rsr(r io.ReaderAt, offset, size int64) *revSegmentReader { - r = io.NewSectionReader(r, offset, size) - return &revSegmentReader{r, size} -} - -type revSegmentReader struct { - io.ReaderAt - size int64 -} - -func (r *revSegmentReader) ReadAt(data []byte, offset int64) (int, error) { - if offset < 0 { - return 0, errors.New("negative offset") - } - - if offset > int64(r.size) { - return 0, io.EOF - } - - o := r.size - int64(len(data)) - offset - d := int64(len(data)) - if o < 0 { - d = max(0, d+o) - } - - i, err := r.ReaderAt.ReadAt(data[:d], max(0, o)) - reverse(data[:i]) - return i, err -} diff --git a/lsm2/sst_test.go b/lsm2/sst_test.go deleted file mode 100644 index fc6a72e..0000000 --- a/lsm2/sst_test.go +++ /dev/null @@ -1,252 +0,0 @@ -package lsm2 - -import ( - "bytes" - "encoding/base64" - "errors" - "io" - "iter" - "slices" - "testing" - - "github.com/docopt/docopt-go" - "github.com/matryer/is" -) - -// TestAppend tests AppendLogFile and WriteLogFile against a set of test cases. -// -// Each test case contains a slice of slices of io.Readers, which are passed to -// AppendLogFile and WriteLogFile in order. The test case also contains the -// expected encoded output as a base64 string, as well as the expected output -// when the file is read back using ReadLogFile. -// -// The test case also contains the expected output when the file is read back in -// reverse order using ReadLogFile.Rev(). -// -// The test cases are as follows: -// -// - nil reader: Passes a nil slice of io.Readers to WriteLogFile. -// - err reader: Passes a slice of io.Readers to WriteLogFile which returns an -// error when read. -// - single reader: Passes a single io.Reader to WriteLogFile. -// - multiple readers: Passes a slice of multiple io.Readers to WriteLogFile. -// - multiple commit: Passes multiple slices of io.Readers to AppendLogFile. -// - multiple commit 3x: Passes multiple slices of io.Readers to AppendLogFile -// three times. -// -// The test uses the is package from github.com/matryer/is to check that the -// output matches the expected output. -func TestAppend(t *testing.T) { - type test struct { - name string - in [][]io.Reader - enc string - out [][]byte - rev [][]byte - } - tests := []test{ - { - name: "nil reader", - in: nil, - enc: "U291ci5pcwAAAwACAA", - out: [][]byte{}, - rev: [][]byte{}, - }, - { - name: "err reader", - in: nil, - enc: "U291ci5pcwAAAwACAA", - out: [][]byte{}, - rev: [][]byte{}, - }, - { - name: "single reader", - in: [][]io.Reader{ - { - bytes.NewBuffer([]byte{1, 2, 3, 4})}}, - enc: "U291ci5pcwAAE756XndRZXhdAAYBAgMEAQQBAhA", - out: [][]byte{{1, 2, 3, 4}}, - rev: [][]byte{{1, 2, 3, 4}}}, - { - name: "multiple readers", - in: [][]io.Reader{ - { - bytes.NewBuffer([]byte{1, 2, 3, 4}), - bytes.NewBuffer([]byte{5, 6, 7, 8})}}, - enc: "U291ci5pcwAAI756XndRZXhdAAYBAgMEAQRhQyZWDDn5BQAGBQYHCAEEAgIg", - out: [][]byte{{1, 2, 3, 4}, {5, 6, 7, 8}}, - rev: [][]byte{{5, 6, 7, 8}, {1, 2, 3, 4}}}, - { - name: "multiple commit", - in: [][]io.Reader{ - { - bytes.NewBuffer([]byte{1, 2, 3, 4})}, - { - bytes.NewBuffer([]byte{5, 6, 7, 8})}}, - enc: "U291ci5pcwAAJr56XndRZXhdAAYBAgMEAQQBAhBhQyZWDDn5BQAGBQYHCAEEAgIQ", - out: [][]byte{{1, 2, 3, 4}, {5, 6, 7, 8}}, - rev: [][]byte{{5, 6, 7, 8}, {1, 2, 3, 4}}}, - { - name: "multiple commit", - in: [][]io.Reader{ - { - bytes.NewBuffer([]byte{1, 2, 3, 4}), - bytes.NewBuffer([]byte{5, 6, 7, 8})}, - { - bytes.NewBuffer([]byte{9, 10, 11, 12})}, - }, - enc: "U291ci5pcwAANr56XndRZXhdAAYBAgMEAQRhQyZWDDn5BQAGBQYHCAEEAgIgA4Buuio8Ro0ABgkKCwwBBAMCEA", - out: [][]byte{{1, 2, 3, 4}, {5, 6, 7, 8}, {9, 10, 11, 12}}, - rev: [][]byte{{9, 10, 11, 12}, {5, 6, 7, 8}, {1, 2, 3, 4}}}, - { - name: "multiple commit 3x", - in: [][]io.Reader{ - { - bytes.NewBuffer([]byte{1, 2, 3}), - bytes.NewBuffer([]byte{4, 5, 6}), - }, - { - bytes.NewBuffer([]byte{7, 8, 9}), - }, - { - bytes.NewBuffer([]byte{10, 11, 12}), - bytes.NewBuffer([]byte{13, 14, 15}), - }, - }, - enc: "U291ci5pcwAAVNCqYhhnLPWrAAUBAgMBA7axWhhYd+HsAAUEBQYBAwICHr9ryhhdbkEZAAUHCAkBAwMCDy/UIhidCwCqAAUKCwwBA/NCwhh6wXgXAAUNDg8BAwUCHg", - out: [][]byte{{1, 2, 3}, {4, 5, 6}, {7, 8, 9}, {10, 11, 12}, {13, 14, 15}}, - rev: [][]byte{{13, 14, 15}, {10, 11, 12}, {7, 8, 9}, {4, 5, 6}, {1, 2, 3}}}, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - is := is.New(t) - buf := &buffer{} - - buffers := 0 - - if len(test.in) == 0 { - err := WriteLogFile(buf, slices.Values([]io.Reader{})) - is.NoErr(err) - } - for i, in := range test.in { - buffers += len(in) - - if i == 0 { - err := WriteLogFile(buf, slices.Values(in)) - is.NoErr(err) - } else { - err := AppendLogFile(buf, slices.Values(in)) - is.NoErr(err) - } - } - - is.Equal(base64.RawStdEncoding.EncodeToString(buf.Bytes()), test.enc) - - files, err := ReadLogFile(bytes.NewReader(buf.Bytes())) - is.NoErr(err) - - i := 0 - for j, fp := range files.Iter() { - buf, err := io.ReadAll(fp) - is.NoErr(err) - - is.True(len(test.out) > int(j)) - is.Equal(buf, test.out[j]) - i++ - } - is.NoErr(files.Err) - is.Equal(i, buffers) - - i = 0 - for j, fp := range files.Rev() { - buf, err := io.ReadAll(fp) - is.NoErr(err) - - is.Equal(buf, test.rev[i]) - is.Equal(buf, test.out[j]) - i++ - } - is.NoErr(files.Err) - is.Equal(i, buffers) - - }) - } -} - -// TestArgs tests that the CLI arguments are correctly parsed. -func TestArgs(t *testing.T) { - is := is.New(t) - usage := `Usage: lsm2 create ...` - - arguments, err := docopt.ParseArgs(usage, []string{"create", "archive", "file1", "file2"}, "1.0") - is.NoErr(err) - - var params struct { - Create bool `docopt:"create"` - Archive string `docopt:""` - Files []string `docopt:""` - } - err = arguments.Bind(¶ms) - is.NoErr(err) - - is.Equal(params.Create, true) - is.Equal(params.Archive, "archive") - is.Equal(params.Files, []string{"file1", "file2"}) -} - -type buffer struct { - buf []byte -} - -// Bytes returns the underlying byte slice of the bufferWriterAt. -func (b *buffer) Bytes() []byte { - return b.buf -} - -// WriteAt implements io.WriterAt. It appends data to the internal buffer -// if the offset is beyond the current length of the buffer. It will -// return an error if the offset is negative. -func (b *buffer) WriteAt(data []byte, offset int64) (written int, err error) { - if offset < 0 { - return 0, errors.New("negative offset") - } - - currentLength := int64(len(b.buf)) - if currentLength < offset+int64(len(data)) { - b.buf = append(b.buf, make([]byte, offset+int64(len(data))-currentLength)...) - } - - written = copy(b.buf[offset:], data) - return -} - -// ReadAt implements io.ReaderAt. It reads data from the internal buffer starting -// from the specified offset and writes it into the provided data slice. If the -// offset is negative, it returns an error. If the requested read extends beyond -// the buffer's length, it returns the data read so far along with an io.EOF error. -func (b *buffer) ReadAt(data []byte, offset int64) (int, error) { - if offset < 0 { - return 0, errors.New("negative offset") - } - - if offset > int64(len(b.buf)) || len(b.buf[offset:]) < len(data) { - return copy(data, b.buf[offset:]), io.EOF - } - - return copy(data, b.buf[offset:]), nil -} - -// IterOne takes an iterator that yields values of type T along with a value of -// type I, and returns an iterator that yields only the values of type T. It -// discards the values of type I. -func IterOne[I, T any](it iter.Seq2[I, T]) iter.Seq[T] { - return func(yield func(T) bool) { - for i, v := range it { - _ = i - if !yield(v) { - return - } - } - } -}