chore: fixes to lsm
This commit is contained in:
parent
36460a131e
commit
cf99e18a39
|
@ -6,19 +6,25 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"iter"
|
"iter"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
|
||||||
|
|
||||||
"github.com/docopt/docopt-go"
|
"github.com/docopt/docopt-go"
|
||||||
"go.sour.is/pkg/lsm2"
|
"go.sour.is/pkg/lsm2"
|
||||||
)
|
)
|
||||||
|
|
||||||
var usage = `
|
var usage = `
|
||||||
Usage: lsm2 create <archive> <files>...`
|
Usage:
|
||||||
|
lsm2 create <archive> <files>...
|
||||||
|
lsm2 append <archive> <files>...
|
||||||
|
lsm2 read <archive> <index>`
|
||||||
|
|
||||||
type args struct {
|
type args struct {
|
||||||
Create bool
|
Create bool
|
||||||
|
Append bool
|
||||||
|
Read bool
|
||||||
|
|
||||||
Archive string `docopt:"<archive>"`
|
Archive string `docopt:"<archive>"`
|
||||||
Files []string `docopt:"<files>"`
|
Files []string `docopt:"<files>"`
|
||||||
|
Index int64 `docopt:"<index>"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
@ -45,38 +51,67 @@ func (c console) Write(b []byte) (int, error) {
|
||||||
return c.Stdout.Write(b)
|
return c.Stdout.Write(b)
|
||||||
}
|
}
|
||||||
|
|
||||||
func run(console console, args args) error {
|
func run(console console,a args) error {
|
||||||
|
fmt.Fprintln(console, "lsm")
|
||||||
switch {
|
switch {
|
||||||
case args.Create:
|
case a.Create:
|
||||||
fmt.Fprintf(console, "creating %s from %v\n", filepath.Base(args.Archive), args.Files)
|
f, err := os.OpenFile(a.Archive, os.O_CREATE|os.O_WRONLY, 0644)
|
||||||
out, err := os.OpenFile(args.Archive, os.O_CREATE|os.O_WRONLY, 0644)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer out.Close()
|
defer f.Close()
|
||||||
|
|
||||||
filesWritten := 0
|
return lsm2.WriteLogFile(f, fileReaders(a.Files))
|
||||||
defer func() { fmt.Fprintln(console, "wrote", filesWritten, "files") }()
|
case a.Append:
|
||||||
|
f, err := os.OpenFile(a.Archive, os.O_RDWR, 0644)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
return lsm2.WriteIter(out, iter.Seq[io.Reader](func(yield func(io.Reader) bool) {
|
return lsm2.AppendLogFile(f, fileReaders(a.Files))
|
||||||
for _, name := range args.Files {
|
case a.Read:
|
||||||
f, err := os.Open(name)
|
fmt.Fprintln(console, "reading", a.Archive)
|
||||||
if err != nil {
|
|
||||||
continue
|
f, err := os.Open(a.Archive)
|
||||||
}
|
if err != nil {
|
||||||
filesWritten++
|
return err
|
||||||
if !yield(f) {
|
}
|
||||||
f.Close()
|
defer f.Close()
|
||||||
return
|
lg, err := lsm2.ReadLogFile(f)
|
||||||
}
|
if err != nil {
|
||||||
f.Close()
|
return err
|
||||||
}
|
}
|
||||||
}))
|
for i, rd := range lg.Iter() {
|
||||||
|
fmt.Fprintf(console, "=========================\n%d:\n", i)
|
||||||
|
io.Copy(console, rd)
|
||||||
|
fmt.Fprintln(console, "=========================")
|
||||||
|
}
|
||||||
|
if lg.Err != nil {
|
||||||
|
return lg.Err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
default:
|
default:
|
||||||
return errors.New("unknown command")
|
return errors.New("unknown command")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func fileReaders(names []string) iter.Seq[io.Reader] {
|
||||||
|
return iter.Seq[io.Reader](func(yield func(io.Reader) bool) {
|
||||||
|
for _, name := range names {
|
||||||
|
f, err := os.Open(name)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !yield(f) {
|
||||||
|
f.Close()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
f.Close()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func must[T any](v T, err error) T {
|
func must[T any](v T, err error) T {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
|
|
288
lsm2/sst.go
288
lsm2/sst.go
|
@ -1,7 +1,5 @@
|
||||||
package lsm2
|
package lsm2
|
||||||
|
|
||||||
// [Sour.is|size] [size|hash][data][hash|flag|size]... [prev|count|flag|size]
|
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"errors"
|
"errors"
|
||||||
|
@ -11,6 +9,28 @@ import (
|
||||||
"iter"
|
"iter"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// [Sour.is|size] [size|hash][data][hash|flag|size]... [prev|count|flag|size]
|
||||||
|
|
||||||
|
// Commit1: [magic>|<end]{10} ... [<count][<size][<flag]{3..30}
|
||||||
|
// +---------|--------------------------------> end = seek to end of file
|
||||||
|
// <---|-------------+ size = seek to magic header
|
||||||
|
// <---|-------------+10 size + 10 = seek to start of file
|
||||||
|
// <-----------------------------T+10----------------> 10 + size + trailer = full file size
|
||||||
|
|
||||||
|
// Commit2: [magic>|<end]{10} ... [<count][<size][<flag]{3..30} ... [<prev][<count][<size][<flag]{4..40}
|
||||||
|
// <---|---------+
|
||||||
|
// <-------------+T----------------->
|
||||||
|
// +--------|------------------------------------------------------------------------->
|
||||||
|
// <-------------------------------------|----------------+
|
||||||
|
// prev = seek to last commit <---|-+
|
||||||
|
// prev + trailer = size of commit <----T+--------------------------------->
|
||||||
|
|
||||||
|
// Block: [hash>|<end]{10} ... [<size][<flag]{2..20}
|
||||||
|
// +---------|------------------------> end = seek to end of block
|
||||||
|
// <---|-+ size = seek to end of header
|
||||||
|
// <-------------------|-+10 size + 10 = seek to start of block
|
||||||
|
// <---------------------T+10---------------> size + 10 + trailer = full block size
|
||||||
|
|
||||||
const (
|
const (
|
||||||
TypeUnknown uint64 = iota
|
TypeUnknown uint64 = iota
|
||||||
TypeSegment
|
TypeSegment
|
||||||
|
@ -39,40 +59,28 @@ type header struct {
|
||||||
extra []byte
|
extra []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UnmarshalBinary implements encoding.BinaryUnmarshaler.
|
||||||
|
// It decodes the input binary data into the header struct.
|
||||||
|
// The function expects the input data to be of a specific size (headerSize),
|
||||||
|
// otherwise it returns an error indicating bad data.
|
||||||
|
// It reads the 'end' field from the binary data, updates the 'extra' field,
|
||||||
|
// and reverses the byte order of 'extra' in place.
|
||||||
func (h *header) UnmarshalBinary(data []byte) error {
|
func (h *header) UnmarshalBinary(data []byte) error {
|
||||||
if len(data) != 10 {
|
if len(data) != headerSize {
|
||||||
return fmt.Errorf("%w: bad data", ErrDecode)
|
return fmt.Errorf("%w: bad data", ErrDecode)
|
||||||
}
|
}
|
||||||
h.extra = append(h.extra, data...)
|
|
||||||
|
|
||||||
var n int
|
h.extra = make([]byte, headerSize)
|
||||||
h.end, n = binary.Uvarint(h.extra)
|
copy(h.extra, data)
|
||||||
|
|
||||||
|
var bytesRead int
|
||||||
|
h.end, bytesRead = binary.Uvarint(h.extra)
|
||||||
reverse(h.extra)
|
reverse(h.extra)
|
||||||
h.extra = h.extra[:len(h.extra)-n]
|
h.extra = h.extra[:headerSize-bytesRead]
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Commit1: [magic>|<end]{10} ... [<count][<size][<flag]{3..30}
|
|
||||||
// +---------|--------------------------------> end = seek to end of file
|
|
||||||
// <---|-------------+ size = seek to magic header
|
|
||||||
// <---|-------------+10 size + 10 = seek to start of file
|
|
||||||
// <-----------------------------T+10----------------> 10 + size + trailer = full file size
|
|
||||||
|
|
||||||
// Commit2: [magic>|<end]{10} ... [<count][<size][<flag]{3..30} ... [<prev][<count][<size][<flag]{4..40}
|
|
||||||
// <---|---------+
|
|
||||||
// <-------------+T----------------->
|
|
||||||
// +--------|------------------------------------------------------------------------->
|
|
||||||
// <-------------------------------------|----------------+
|
|
||||||
// prev = seek to last commit <---|-+
|
|
||||||
// prev + trailer = size of commit <----T+--------------------------------->
|
|
||||||
|
|
||||||
// Block: [hash>|<end]{10} ... [<size][<flag]{2..20}
|
|
||||||
// +---------|------------------------> end = seek to end of block
|
|
||||||
// <---|-+ size = seek to end of header
|
|
||||||
// <-------------------|-+10 size + 10 = seek to start of block
|
|
||||||
// <---------------------T+10---------------> size + 10 + trailer = full block size
|
|
||||||
|
|
||||||
type Commit struct {
|
type Commit struct {
|
||||||
flag uint64 // flag values
|
flag uint64 // flag values
|
||||||
size uint64 // size of the trailer
|
size uint64 // size of the trailer
|
||||||
|
@ -85,18 +93,18 @@ type Commit struct {
|
||||||
// Append marshals the trailer into binary form and appends it to data.
|
// Append marshals the trailer into binary form and appends it to data.
|
||||||
// It returns the new slice.
|
// It returns the new slice.
|
||||||
func (h *Commit) AppendTrailer(data []byte) []byte {
|
func (h *Commit) AppendTrailer(data []byte) []byte {
|
||||||
h.flag |= TypePrevCommit
|
h.flag |= TypeCommit
|
||||||
if h.prev == 0 {
|
// if h.prev > 0 {
|
||||||
h.flag &= TypeCommit
|
// h.flag |= TypePrevCommit
|
||||||
}
|
// }
|
||||||
|
|
||||||
size := len(data)
|
size := len(data)
|
||||||
data = binary.AppendUvarint(data, h.size)
|
data = binary.AppendUvarint(data, h.size)
|
||||||
data = binary.AppendUvarint(data, h.flag)
|
data = binary.AppendUvarint(data, h.flag)
|
||||||
data = binary.AppendUvarint(data, h.count)
|
data = binary.AppendUvarint(data, h.count)
|
||||||
if h.prev != 0 {
|
// if h.prev > 0 {
|
||||||
data = binary.AppendUvarint(data, h.prev)
|
// data = binary.AppendUvarint(data, h.prev)
|
||||||
}
|
// }
|
||||||
reverse(data[size:])
|
reverse(data[size:])
|
||||||
|
|
||||||
return data
|
return data
|
||||||
|
@ -123,7 +131,7 @@ func (h *Commit) UnmarshalBinary(data []byte) error {
|
||||||
data = data[n:]
|
data = data[n:]
|
||||||
h.tsize += n
|
h.tsize += n
|
||||||
|
|
||||||
h.prev = h.size
|
// h.prev = h.size
|
||||||
if h.flag&TypePrevCommit == TypePrevCommit {
|
if h.flag&TypePrevCommit == TypePrevCommit {
|
||||||
h.prev, n = binary.Uvarint(data)
|
h.prev, n = binary.Uvarint(data)
|
||||||
h.tsize += n
|
h.tsize += n
|
||||||
|
@ -207,9 +215,6 @@ func (h *logFile) AppendMagic(data []byte) []byte {
|
||||||
|
|
||||||
return data
|
return data
|
||||||
}
|
}
|
||||||
func (h *logFile) UnmarshalBinary(data []byte) error {
|
|
||||||
return h.header.UnmarshalBinary(data)
|
|
||||||
}
|
|
||||||
|
|
||||||
// WriteLogFile writes a log file to w, given a list of segments.
|
// WriteLogFile writes a log file to w, given a list of segments.
|
||||||
// The caller is responsible for calling WriteAt on the correct offset.
|
// The caller is responsible for calling WriteAt on the correct offset.
|
||||||
|
@ -223,7 +228,7 @@ func (h *logFile) UnmarshalBinary(data []byte) error {
|
||||||
// - A footer with the length and hash of the segment
|
// - A footer with the length and hash of the segment
|
||||||
// - The contents of the segment
|
// - The contents of the segment
|
||||||
// - A header with the magic, version, flag (Clean), and end offset
|
// - A header with the magic, version, flag (Clean), and end offset
|
||||||
func WriteLogFile(w io.WriterAt, segments ...io.Reader) error {
|
func WriteLogFile(w io.WriterAt, segments iter.Seq[io.Reader]) error {
|
||||||
_, err := w.WriteAt(Magic[:], 0)
|
_, err := w.WriteAt(Magic[:], 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -233,13 +238,7 @@ func WriteLogFile(w io.WriterAt, segments ...io.Reader) error {
|
||||||
WriterAt: w,
|
WriterAt: w,
|
||||||
}
|
}
|
||||||
|
|
||||||
return lf.writeIter(w, iter.Seq[io.Reader](func(yield func(io.Reader) bool) {
|
return lf.writeIter(segments)
|
||||||
for _, s := range segments {
|
|
||||||
if !yield(s) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type rw interface {
|
type rw interface {
|
||||||
|
@ -247,7 +246,7 @@ type rw interface {
|
||||||
io.WriterAt
|
io.WriterAt
|
||||||
}
|
}
|
||||||
|
|
||||||
func AppendLogFile(rw rw, segments ...io.Reader) error {
|
func AppendLogFile(rw rw, segments iter.Seq[io.Reader]) error {
|
||||||
logFile, err := ReadLogFile(rw)
|
logFile, err := ReadLogFile(rw)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -256,35 +255,19 @@ func AppendLogFile(rw rw, segments ...io.Reader) error {
|
||||||
WriterAt: rw,
|
WriterAt: rw,
|
||||||
logFile: logFile.logFile,
|
logFile: logFile.logFile,
|
||||||
}
|
}
|
||||||
return lf.writeIter(rw, iter.Seq[io.Reader](func(yield func(io.Reader) bool) {
|
return lf.writeIter( segments)
|
||||||
for _, s := range segments {
|
|
||||||
if !yield(s) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}))
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func WriteIter(w io.WriterAt, segments iter.Seq[io.Reader]) error {
|
|
||||||
_, err := w.WriteAt(Magic[:], 0)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
lf := &LogWriter{
|
func (lf *LogWriter) writeIter(segments iter.Seq[io.Reader]) error {
|
||||||
WriterAt: w,
|
lf.size = 0
|
||||||
}
|
|
||||||
|
|
||||||
return lf.writeIter(w, segments)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (lf *LogWriter) writeIter(w io.WriterAt, segments iter.Seq[io.Reader]) error {
|
|
||||||
for s := range segments {
|
for s := range segments {
|
||||||
err := lf.writeSegment(s)
|
n, err := lf.writeBlock(s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
lf.end += n
|
||||||
|
lf.size += n
|
||||||
lf.count++
|
lf.count++
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -297,7 +280,7 @@ func (lf *LogWriter) writeIter(w io.WriterAt, segments iter.Seq[io.Reader]) erro
|
||||||
}
|
}
|
||||||
lf.end += uint64(n)
|
lf.end += uint64(n)
|
||||||
|
|
||||||
_, err = w.WriteAt(lf.AppendMagic(make([]byte, 0, 10)), 0)
|
_, err = lf.WriteAt(lf.AppendMagic(make([]byte, 0, 10)), 0)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -307,24 +290,26 @@ type LogWriter struct {
|
||||||
io.WriterAt
|
io.WriterAt
|
||||||
}
|
}
|
||||||
|
|
||||||
// writeSegment writes a segment to the log file at the current end of file position.
|
// writeBlock writes a segment to the log file at the current end of file position.
|
||||||
// The segment is written in chunks of 1024 bytes, and the hash of the segment
|
// The segment is written in chunks of 1024 bytes, and the hash of the segment
|
||||||
func (lf *LogWriter) writeSegment(segment io.Reader) error {
|
func (lf *LogWriter) writeBlock(segment io.Reader) (uint64, error) {
|
||||||
h := hash()
|
h := hash()
|
||||||
head := Block{}
|
block := Block{}
|
||||||
|
|
||||||
start := int64(lf.end) + 10
|
start := int64(lf.end) + 10
|
||||||
end := int64(lf.end) + 10
|
end := start
|
||||||
|
|
||||||
|
bytesWritten := 0
|
||||||
|
|
||||||
// Write the header to the log file.
|
// Write the header to the log file.
|
||||||
// The footer is written at the current end of file position.
|
// The footer is written at the current end of file position.
|
||||||
n, err := lf.WriteAt(make([]byte, headerSize), start)
|
n, err := lf.WriteAt(make([]byte, headerSize), start)
|
||||||
|
bytesWritten += n
|
||||||
|
end += int64(n)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// If there is an error, return it.
|
// If there is an error, return it.
|
||||||
return err
|
return uint64(bytesWritten), err
|
||||||
}
|
}
|
||||||
end += int64(n)
|
|
||||||
lf.size += uint64(n)
|
|
||||||
lf.end += uint64(n)
|
|
||||||
|
|
||||||
// Write the segment to the log file.
|
// Write the segment to the log file.
|
||||||
// The segment is written in chunks of 1024 bytes.
|
// The segment is written in chunks of 1024 bytes.
|
||||||
|
@ -338,7 +323,7 @@ func (lf *LogWriter) writeSegment(segment io.Reader) error {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
// If there is an error, return it.
|
// If there is an error, return it.
|
||||||
return err
|
return uint64(bytesWritten), err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Compute the hash of the chunk.
|
// Compute the hash of the chunk.
|
||||||
|
@ -346,42 +331,41 @@ func (lf *LogWriter) writeSegment(segment io.Reader) error {
|
||||||
|
|
||||||
// Write the chunk to the log file.
|
// Write the chunk to the log file.
|
||||||
// The chunk is written at the current end of file position.
|
// The chunk is written at the current end of file position.
|
||||||
_, err = lf.WriteAt(buf[:n], end)
|
n, err = lf.WriteAt(buf[:n], end)
|
||||||
|
bytesWritten += n
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// If there is an error, return it.
|
// If there is an error, return it.
|
||||||
return err
|
return uint64(bytesWritten), err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update the length of the segment.
|
// Update the length of the segment.
|
||||||
end += int64(n)
|
end += int64(n)
|
||||||
head.size += uint64(n)
|
block.size += uint64(n)
|
||||||
}
|
}
|
||||||
|
|
||||||
head.extra = h.Sum(nil)
|
block.extra = h.Sum(nil)
|
||||||
head.end += head.size
|
block.end += block.size
|
||||||
|
|
||||||
// Write the footer to the log file.
|
// Write the footer to the log file.
|
||||||
// The footer is written at the current end of file position.
|
// The footer is written at the current end of file position.
|
||||||
n, err = lf.WriteAt(head.AppendTrailer(make([]byte, 0, maxBlockSize)), end)
|
n, err = lf.WriteAt(block.AppendTrailer(make([]byte, 0, maxBlockSize)), end)
|
||||||
|
bytesWritten += n
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// If there is an error, return it.
|
// If there is an error, return it.
|
||||||
return err
|
return uint64(bytesWritten), err
|
||||||
}
|
}
|
||||||
end += int64(n)
|
end += int64(n)
|
||||||
head.end += uint64(n)
|
block.end += uint64(n)
|
||||||
|
|
||||||
// Update header to the log file.
|
// Update header to the log file.
|
||||||
// The footer is written at the current end of file position.
|
// The footer is written at the current end of file position.
|
||||||
_, err = lf.WriteAt(head.AppendHeader(make([]byte, 0, headerSize)), start)
|
_, err = lf.WriteAt(block.AppendHeader(make([]byte, 0, headerSize)), start)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// If there is an error, return it.
|
// If there is an error, return it.
|
||||||
return err
|
return uint64(bytesWritten), err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update the end of file position.
|
return uint64(bytesWritten), nil
|
||||||
lf.size += head.end
|
|
||||||
lf.end += head.end
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// reverse reverses a slice in-place.
|
// reverse reverses a slice in-place.
|
||||||
|
@ -446,16 +430,18 @@ func (lf *LogReader) Iter() iter.Seq2[uint64, io.Reader] {
|
||||||
|
|
||||||
return func(yield func(uint64, io.Reader) bool) {
|
return func(yield func(uint64, io.Reader) bool) {
|
||||||
start := int64(10)
|
start := int64(10)
|
||||||
|
var adj uint64
|
||||||
for _, commit := range commits {
|
for _, commit := range commits {
|
||||||
size := int64(commit.prev)
|
size := int64(commit.size)
|
||||||
it := iterBlocks(io.NewSectionReader(lf, start, size), size)
|
it := iterBlocks(io.NewSectionReader(lf, start, size), size)
|
||||||
for i, block := range it {
|
for i, block := range it {
|
||||||
if !yield(i, block) {
|
if !yield(adj+i, block) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
start += size + int64(commit.tsize)
|
start += size + int64(commit.tsize)
|
||||||
|
adj = commit.count
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -464,9 +450,10 @@ func iterBlocks(r io.ReaderAt, end int64) iter.Seq2[uint64, io.Reader] {
|
||||||
var start int64
|
var start int64
|
||||||
var i uint64
|
var i uint64
|
||||||
return func(yield func(uint64, io.Reader) bool) {
|
return func(yield func(uint64, io.Reader) bool) {
|
||||||
|
buf := make([]byte, maxBlockSize)
|
||||||
for start < end {
|
for start < end {
|
||||||
block := &Block{}
|
block := &Block{}
|
||||||
buf := make([]byte, 10)
|
buf = buf[:10]
|
||||||
n, err := rsr(r, int64(start), 10).ReadAt(buf, 0)
|
n, err := rsr(r, int64(start), 10).ReadAt(buf, 0)
|
||||||
if n == 0 && err != nil {
|
if n == 0 && err != nil {
|
||||||
return
|
return
|
||||||
|
@ -476,8 +463,7 @@ func iterBlocks(r io.ReaderAt, end int64) iter.Seq2[uint64, io.Reader] {
|
||||||
if err := block.header.UnmarshalBinary(buf); err != nil {
|
if err := block.header.UnmarshalBinary(buf); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
buf = buf[:maxBlockSize]
|
||||||
buf = make([]byte, maxBlockSize)
|
|
||||||
n, err = rsr(r, int64(start), int64(block.end)).ReadAt(buf, 0)
|
n, err = rsr(r, int64(start), int64(block.end)).ReadAt(buf, 0)
|
||||||
if n == 0 && err != nil {
|
if n == 0 && err != nil {
|
||||||
return
|
return
|
||||||
|
@ -498,15 +484,12 @@ func iterBlocks(r io.ReaderAt, end int64) iter.Seq2[uint64, io.Reader] {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
func (lf *LogReader) iterCommits() iter.Seq[Commit] {
|
func (lf *LogReader) iterCommits() iter.Seq[Commit] {
|
||||||
eof := lf.end + 10
|
if lf.end == 0 {
|
||||||
|
|
||||||
if eof <= 10 {
|
|
||||||
return func(yield func(Commit) bool) {}
|
return func(yield func(Commit) bool) {}
|
||||||
}
|
}
|
||||||
|
|
||||||
offset := eof - 10 - lf.prev - uint64(lf.tsize)
|
offset := lf.end - lf.size - uint64(lf.tsize)
|
||||||
return func(yield func(Commit) bool) {
|
return func(yield func(Commit) bool) {
|
||||||
if !yield(lf.Commit) {
|
if !yield(lf.Commit) {
|
||||||
return
|
return
|
||||||
|
@ -529,50 +512,40 @@ func (lf *LogReader) iterCommits() iter.Seq[Commit] {
|
||||||
if !yield(commit) {
|
if !yield(commit) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
offset -= commit.prev + uint64(commit.tsize)
|
offset -= commit.size + uint64(commit.tsize)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// func (lf *LogReader) Rev() iter.Seq2[uint64, io.Reader] {
|
func (lf *LogReader) Rev() iter.Seq2[uint64, io.Reader] {
|
||||||
// end := lf.end + 10
|
end := lf.end + 10
|
||||||
// i := lf.count
|
i := lf.count
|
||||||
// return func(yield func(uint64, io.Reader) bool) {
|
return func(yield func(uint64, io.Reader) bool) {
|
||||||
|
|
||||||
// for commit := range lf.iterCommits() {
|
for commit := range lf.iterCommits() {
|
||||||
// end -= uint64(commit.tsize)
|
end -= uint64(commit.tsize)
|
||||||
// start := end - commit.prev - uint64(commit.tsize)
|
start := end - commit.size
|
||||||
// for start > end{
|
for start < end {
|
||||||
// block := &Block{}
|
block := &Block{}
|
||||||
// buf := make([]byte, min(maxBlockSize, commit.size))
|
buf := make([]byte, maxBlockSize)
|
||||||
// n, err := lf.ReaderAt.ReadAt(buf, max(0, int64(end)-int64(len(buf))))
|
n, err := rsr(lf, int64(start), int64(commit.size)).ReadAt(buf, 0)
|
||||||
// if n == 0 && err != nil {
|
if n == 0 && err != nil {
|
||||||
// lf.Err = err
|
lf.Err = err
|
||||||
// return
|
return
|
||||||
// }
|
}
|
||||||
// buf = buf[:n]
|
buf = buf[:n]
|
||||||
// err = block.UnmarshalBinary(buf)
|
err = block.UnmarshalBinary(buf)
|
||||||
// if err != nil {
|
if err != nil {
|
||||||
// lf.Err = err
|
lf.Err = err
|
||||||
// return
|
return
|
||||||
// }
|
}
|
||||||
// if !yield(i, io.NewSectionReader(lf, int64(end-block.size)-int64(block.tsize), int64(block.size))) {
|
if !yield(i-1, io.NewSectionReader(lf, int64(end-block.size)-int64(block.tsize), int64(block.size))) {
|
||||||
// return
|
return
|
||||||
// }
|
}
|
||||||
// end -= block.size + 10 + uint64(block.tsize)
|
end -= block.size + 10 + uint64(block.tsize)
|
||||||
// i--
|
i--
|
||||||
// }
|
|
||||||
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
func iterOne[I, T any](it iter.Seq2[I, T]) iter.Seq[T] {
|
|
||||||
return func(yield func(T) bool) {
|
|
||||||
for _, v := range it {
|
|
||||||
if !yield(v) {
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -606,36 +579,3 @@ func (r *revSegmentReader) ReadAt(data []byte, offset int64) (int, error) {
|
||||||
reverse(data[:i])
|
reverse(data[:i])
|
||||||
return i, err
|
return i, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// rdr : 0 1 2|3 4 5 6|7 8 9 10
|
|
||||||
// rsr 6,4: 3 2 1 0
|
|
||||||
// 6
|
|
||||||
// ------- -4
|
|
||||||
// 0 1 2 3
|
|
||||||
// offset - size
|
|
||||||
// rdr : 0 1 2|3 4 5 6|7 8 9 10
|
|
||||||
// 0 1 2 3
|
|
||||||
// offset=0 |-------| d[:4], o=3 3-0=3
|
|
||||||
// offset=1 _|----- | d[:3], o=3 3-1=2
|
|
||||||
// offset=2 ___|--- | d[:2], o=3 3-2=1
|
|
||||||
// offset=3 _____|- | d[:1], o=3 3-3=0
|
|
||||||
// offset=4+_____| | d[:0], o=3 3-4=0
|
|
||||||
|
|
||||||
// rdr : 0 1 2|3 4 5 6|7 8 9 10
|
|
||||||
// offset=0 |-------| d[:4], o=0 -> 3
|
|
||||||
// offset=0 | -----| d[:3], o=1 -> 4
|
|
||||||
// offset=0 | ---| d[:2], o=2 -> 5
|
|
||||||
// offset=0 | -| d[:1], o=3 -> 6
|
|
||||||
// offset=0 | | d[:0], o=4+-> 7
|
|
||||||
|
|
||||||
// rdr : 0 1 2|3 4 5 6|7 8 9 10
|
|
||||||
// offset=4 ___| | d[:0], o=0
|
|
||||||
// offset=3 _|- | d[:1], o=0
|
|
||||||
// offset=2 |--- | d[:2], o=0
|
|
||||||
// offset=1 | --- | d[:2], o=1
|
|
||||||
// offset=0 | ---| d[:2], o=2
|
|
||||||
// offset=-1 | -|_ d[:2], o=3
|
|
||||||
// offset=-2 | |___ d[:2], o=4+
|
|
||||||
|
|
||||||
// o = max(0, offset - len)
|
|
||||||
// d =
|
|
||||||
|
|
168
lsm2/sst_test.go
168
lsm2/sst_test.go
|
@ -4,14 +4,38 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"io"
|
"io"
|
||||||
|
"iter"
|
||||||
|
"slices"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/docopt/docopt-go"
|
"github.com/docopt/docopt-go"
|
||||||
"github.com/matryer/is"
|
"github.com/matryer/is"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// TestAppend tests AppendLogFile and WriteLogFile against a set of test cases.
|
||||||
|
//
|
||||||
|
// Each test case contains a slice of slices of io.Readers, which are passed to
|
||||||
|
// AppendLogFile and WriteLogFile in order. The test case also contains the
|
||||||
|
// expected encoded output as a base64 string, as well as the expected output
|
||||||
|
// when the file is read back using ReadLogFile.
|
||||||
|
//
|
||||||
|
// The test case also contains the expected output when the file is read back in
|
||||||
|
// reverse order using ReadLogFile.Rev().
|
||||||
|
//
|
||||||
|
// The test cases are as follows:
|
||||||
|
//
|
||||||
|
// - nil reader: Passes a nil slice of io.Readers to WriteLogFile.
|
||||||
|
// - err reader: Passes a slice of io.Readers to WriteLogFile which returns an
|
||||||
|
// error when read.
|
||||||
|
// - single reader: Passes a single io.Reader to WriteLogFile.
|
||||||
|
// - multiple readers: Passes a slice of multiple io.Readers to WriteLogFile.
|
||||||
|
// - multiple commit: Passes multiple slices of io.Readers to AppendLogFile.
|
||||||
|
// - multiple commit 3x: Passes multiple slices of io.Readers to AppendLogFile
|
||||||
|
// three times.
|
||||||
|
//
|
||||||
|
// The test uses the is package from github.com/matryer/is to check that the
|
||||||
|
// output matches the expected output.
|
||||||
func TestAppend(t *testing.T) {
|
func TestAppend(t *testing.T) {
|
||||||
type test struct {
|
type test struct {
|
||||||
name string
|
name string
|
||||||
|
@ -22,39 +46,76 @@ func TestAppend(t *testing.T) {
|
||||||
}
|
}
|
||||||
tests := []test{
|
tests := []test{
|
||||||
{
|
{
|
||||||
"nil reader",
|
name: "nil reader",
|
||||||
nil,
|
in: nil,
|
||||||
"U291ci5pcwAAAwACAA",
|
enc: "U291ci5pcwAAAwACAA",
|
||||||
[][]byte{},
|
out: [][]byte{},
|
||||||
[][]byte{},
|
rev: [][]byte{},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"single reader",
|
name: "err reader",
|
||||||
[][]io.Reader{
|
in: nil,
|
||||||
|
enc: "U291ci5pcwAAAwACAA",
|
||||||
|
out: [][]byte{},
|
||||||
|
rev: [][]byte{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "single reader",
|
||||||
|
in: [][]io.Reader{
|
||||||
{
|
{
|
||||||
bytes.NewBuffer([]byte{1, 2, 3, 4})}},
|
bytes.NewBuffer([]byte{1, 2, 3, 4})}},
|
||||||
"U291ci5pcwAAE756XndRZXhdAAYBAgMEAQQBAhA",
|
enc: "U291ci5pcwAAE756XndRZXhdAAYBAgMEAQQBAhA",
|
||||||
[][]byte{{1, 2, 3, 4}},
|
out: [][]byte{{1, 2, 3, 4}},
|
||||||
[][]byte{{1, 2, 3, 4}}},
|
rev: [][]byte{{1, 2, 3, 4}}},
|
||||||
{
|
{
|
||||||
"multiple readers",
|
name: "multiple readers",
|
||||||
[][]io.Reader{
|
in: [][]io.Reader{
|
||||||
{
|
{
|
||||||
bytes.NewBuffer([]byte{1, 2, 3, 4}),
|
bytes.NewBuffer([]byte{1, 2, 3, 4}),
|
||||||
bytes.NewBuffer([]byte{5, 6, 7, 8})}},
|
bytes.NewBuffer([]byte{5, 6, 7, 8})}},
|
||||||
"U291ci5pcwAAI756XndRZXhdAAYBAgMEAQRhQyZWDDn5BQAGBQYHCAEEAgIg",
|
enc: "U291ci5pcwAAI756XndRZXhdAAYBAgMEAQRhQyZWDDn5BQAGBQYHCAEEAgIg",
|
||||||
[][]byte{{1, 2, 3, 4}, {5, 6, 7, 8}},
|
out: [][]byte{{1, 2, 3, 4}, {5, 6, 7, 8}},
|
||||||
[][]byte{{5, 6, 7, 8}, {1, 2, 3, 4}}},
|
rev: [][]byte{{5, 6, 7, 8}, {1, 2, 3, 4}}},
|
||||||
{
|
{
|
||||||
"multiple commit",
|
name: "multiple commit",
|
||||||
[][]io.Reader{
|
in: [][]io.Reader{
|
||||||
{
|
{
|
||||||
bytes.NewBuffer([]byte{1, 2, 3, 4})},
|
bytes.NewBuffer([]byte{1, 2, 3, 4})},
|
||||||
{
|
{
|
||||||
bytes.NewBuffer([]byte{5, 6, 7, 8})}},
|
bytes.NewBuffer([]byte{5, 6, 7, 8})}},
|
||||||
"U291ci5pcwAAJ756XndRZXhdAAYBAgMEAQQBAhBhQyZWDDn5BQAGBQYHCAEEEAIDIA",
|
enc: "U291ci5pcwAAJr56XndRZXhdAAYBAgMEAQQBAhBhQyZWDDn5BQAGBQYHCAEEAgIQ",
|
||||||
[][]byte{{1, 2, 3, 4}, {5, 6, 7, 8}},
|
out: [][]byte{{1, 2, 3, 4}, {5, 6, 7, 8}},
|
||||||
[][]byte{{5, 6, 7, 8}, {1, 2, 3, 4}}},
|
rev: [][]byte{{5, 6, 7, 8}, {1, 2, 3, 4}}},
|
||||||
|
{
|
||||||
|
name: "multiple commit",
|
||||||
|
in: [][]io.Reader{
|
||||||
|
{
|
||||||
|
bytes.NewBuffer([]byte{1, 2, 3, 4}),
|
||||||
|
bytes.NewBuffer([]byte{5, 6, 7, 8})},
|
||||||
|
{
|
||||||
|
bytes.NewBuffer([]byte{9, 10, 11, 12})},
|
||||||
|
},
|
||||||
|
enc: "U291ci5pcwAANr56XndRZXhdAAYBAgMEAQRhQyZWDDn5BQAGBQYHCAEEAgIgA4Buuio8Ro0ABgkKCwwBBAMCEA",
|
||||||
|
out: [][]byte{{1, 2, 3, 4}, {5, 6, 7, 8}, {9, 10, 11, 12}},
|
||||||
|
rev: [][]byte{{9, 10, 11, 12}, {5, 6, 7, 8}, {1, 2, 3, 4}}},
|
||||||
|
{
|
||||||
|
name: "multiple commit 3x",
|
||||||
|
in: [][]io.Reader{
|
||||||
|
{
|
||||||
|
bytes.NewBuffer([]byte{1, 2, 3}),
|
||||||
|
bytes.NewBuffer([]byte{4, 5, 6}),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
bytes.NewBuffer([]byte{7, 8, 9}),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
bytes.NewBuffer([]byte{10, 11, 12}),
|
||||||
|
bytes.NewBuffer([]byte{13, 14, 15}),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
enc: "U291ci5pcwAAVNCqYhhnLPWrAAUBAgMBA7axWhhYd+HsAAUEBQYBAwICHr9ryhhdbkEZAAUHCAkBAwMCDy/UIhidCwCqAAUKCwwBA/NCwhh6wXgXAAUNDg8BAwUCHg",
|
||||||
|
out: [][]byte{{1, 2, 3}, {4, 5, 6}, {7, 8, 9}, {10, 11, 12}, {13, 14, 15}},
|
||||||
|
rev: [][]byte{{13, 14, 15}, {10, 11, 12}, {7, 8, 9}, {4, 5, 6}, {1, 2, 3}}},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
|
@ -65,17 +126,17 @@ func TestAppend(t *testing.T) {
|
||||||
buffers := 0
|
buffers := 0
|
||||||
|
|
||||||
if len(test.in) == 0 {
|
if len(test.in) == 0 {
|
||||||
err := WriteLogFile(buf)
|
err := WriteLogFile(buf, slices.Values([]io.Reader{}))
|
||||||
is.NoErr(err)
|
is.NoErr(err)
|
||||||
}
|
}
|
||||||
for i, in := range test.in {
|
for i, in := range test.in {
|
||||||
buffers += len(in)
|
buffers += len(in)
|
||||||
|
|
||||||
if i == 0 {
|
if i == 0 {
|
||||||
err := WriteLogFile(buf, in...)
|
err := WriteLogFile(buf, slices.Values(in))
|
||||||
is.NoErr(err)
|
is.NoErr(err)
|
||||||
} else {
|
} else {
|
||||||
err := AppendLogFile(buf, in...)
|
err := AppendLogFile(buf, slices.Values(in))
|
||||||
is.NoErr(err)
|
is.NoErr(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -86,47 +147,52 @@ func TestAppend(t *testing.T) {
|
||||||
is.NoErr(err)
|
is.NoErr(err)
|
||||||
|
|
||||||
i := 0
|
i := 0
|
||||||
for fp := range iterOne(files.Iter()) {
|
for j, fp := range files.Iter() {
|
||||||
buf, err := io.ReadAll(fp)
|
buf, err := io.ReadAll(fp)
|
||||||
is.NoErr(err)
|
is.NoErr(err)
|
||||||
|
|
||||||
is.Equal(buf, test.out[i])
|
is.True(len(test.out) > int(j))
|
||||||
|
is.Equal(buf, test.out[j])
|
||||||
i++
|
i++
|
||||||
}
|
}
|
||||||
is.NoErr(files.Err)
|
is.NoErr(files.Err)
|
||||||
is.Equal(i, buffers)
|
is.Equal(i, buffers)
|
||||||
|
|
||||||
// i = 0
|
i = 0
|
||||||
// for fp := range iterOne(files.Rev()) {
|
for j, fp := range files.Rev() {
|
||||||
// buf, err := io.ReadAll(fp)
|
buf, err := io.ReadAll(fp)
|
||||||
// is.NoErr(err)
|
is.NoErr(err)
|
||||||
|
|
||||||
// is.Equal(buf, test.rev[i])
|
is.Equal(buf, test.rev[i])
|
||||||
// i++
|
is.Equal(buf, test.out[j])
|
||||||
// }
|
i++
|
||||||
// is.NoErr(files.Err)
|
}
|
||||||
// is.Equal(i, buffers)
|
is.NoErr(files.Err)
|
||||||
|
is.Equal(i, buffers)
|
||||||
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestArgs tests that the CLI arguments are correctly parsed.
|
||||||
func TestArgs(t *testing.T) {
|
func TestArgs(t *testing.T) {
|
||||||
is := is.New(t)
|
is := is.New(t)
|
||||||
usage := `Usage: lsm2 create <archive> <files>...
|
usage := `Usage: lsm2 create <archive> <files>...`
|
||||||
`
|
|
||||||
opts, err := docopt.ParseArgs(usage, []string{"create", "archive", "file1", "file2"}, "1.0")
|
arguments, err := docopt.ParseArgs(usage, []string{"create", "archive", "file1", "file2"}, "1.0")
|
||||||
is.NoErr(err)
|
is.NoErr(err)
|
||||||
|
|
||||||
args := struct {
|
var params struct {
|
||||||
Create bool `docopt:"create"`
|
Create bool `docopt:"create"`
|
||||||
Archive string `docopt:"<archive>"`
|
Archive string `docopt:"<archive>"`
|
||||||
Files []string `docopt:"<files>"`
|
Files []string `docopt:"<files>"`
|
||||||
}{}
|
}
|
||||||
err = opts.Bind(&args)
|
err = arguments.Bind(¶ms)
|
||||||
is.NoErr(err)
|
is.NoErr(err)
|
||||||
fmt.Println(args)
|
|
||||||
|
|
||||||
|
is.Equal(params.Create, true)
|
||||||
|
is.Equal(params.Archive, "archive")
|
||||||
|
is.Equal(params.Files, []string{"file1", "file2"})
|
||||||
}
|
}
|
||||||
|
|
||||||
type buffer struct {
|
type buffer struct {
|
||||||
|
@ -155,6 +221,10 @@ func (b *buffer) WriteAt(data []byte, offset int64) (written int, err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ReadAt implements io.ReaderAt. It reads data from the internal buffer starting
|
||||||
|
// from the specified offset and writes it into the provided data slice. If the
|
||||||
|
// offset is negative, it returns an error. If the requested read extends beyond
|
||||||
|
// the buffer's length, it returns the data read so far along with an io.EOF error.
|
||||||
func (b *buffer) ReadAt(data []byte, offset int64) (int, error) {
|
func (b *buffer) ReadAt(data []byte, offset int64) (int, error) {
|
||||||
if offset < 0 {
|
if offset < 0 {
|
||||||
return 0, errors.New("negative offset")
|
return 0, errors.New("negative offset")
|
||||||
|
@ -166,3 +236,17 @@ func (b *buffer) ReadAt(data []byte, offset int64) (int, error) {
|
||||||
|
|
||||||
return copy(data, b.buf[offset:]), nil
|
return copy(data, b.buf[offset:]), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IterOne takes an iterator that yields values of type T along with a value of
|
||||||
|
// type I, and returns an iterator that yields only the values of type T. It
|
||||||
|
// discards the values of type I.
|
||||||
|
func IterOne[I, T any](it iter.Seq2[I, T]) iter.Seq[T] {
|
||||||
|
return func(yield func(T) bool) {
|
||||||
|
for i, v := range it {
|
||||||
|
_ = i
|
||||||
|
if !yield(v) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user