fix: disk counters

This commit is contained in:
Jon Lundy 2022-09-08 10:32:35 -06:00
parent 9103de501b
commit 9878ed4a79
Signed by untrusted user who does not match committer: xuu
GPG Key ID: C63E6D61F3035024

View File

@ -30,8 +30,10 @@ type diskStore struct {
path string path string
openlogs *locker.Locked[openlogs] openlogs *locker.Locked[openlogs]
Mdisk_open syncint64.Counter m_disk_open syncint64.Counter
Mdisk_evict syncint64.Counter m_disk_evict syncint64.Counter
m_disk_read syncint64.Counter
m_disk_write syncint64.Counter
} }
const AppendOnly = es.AppendOnly const AppendOnly = es.AppendOnly
@ -41,19 +43,24 @@ func Init(ctx context.Context) error {
_, span := lg.Span(ctx) _, span := lg.Span(ctx)
defer span.End() defer span.End()
d := &diskStore{}
m := lg.Meter(ctx) m := lg.Meter(ctx)
var err, errs error var err, errs error
Mdisk_open, err := m.SyncInt64().Counter("disk_open") d.m_disk_open, err = m.SyncInt64().Counter("disk_open")
errs = multierr.Append(errs, err) errs = multierr.Append(errs, err)
Mdisk_evict, err := m.SyncInt64().Counter("disk_evict") d.m_disk_evict, err = m.SyncInt64().Counter("disk_evict")
errs = multierr.Append(errs, err) errs = multierr.Append(errs, err)
es.Register(ctx, "file", &diskStore{ d.m_disk_read, err = m.SyncInt64().Counter("disk_read")
Mdisk_open: Mdisk_open, errs = multierr.Append(errs, err)
Mdisk_evict: Mdisk_evict,
}) d.m_disk_write, err = m.SyncInt64().Counter("disk_write")
errs = multierr.Append(errs, err)
es.Register(ctx, "file", d)
return errs return errs
} }
@ -61,11 +68,9 @@ func Init(ctx context.Context) error {
var _ driver.Driver = (*diskStore)(nil) var _ driver.Driver = (*diskStore)(nil)
func (d *diskStore) Open(ctx context.Context, dsn string) (driver.Driver, error) { func (d *diskStore) Open(ctx context.Context, dsn string) (driver.Driver, error) {
ctx, span := lg.Span(ctx) _, span := lg.Span(ctx)
defer span.End() defer span.End()
d.Mdisk_open.Add(ctx, 1)
scheme, path, ok := strings.Cut(dsn, ":") scheme, path, ok := strings.Cut(dsn, ":")
if !ok { if !ok {
return nil, fmt.Errorf("expected scheme") return nil, fmt.Errorf("expected scheme")
@ -78,6 +83,7 @@ func (d *diskStore) Open(ctx context.Context, dsn string) (driver.Driver, error)
if _, err := os.Stat(path); os.IsNotExist(err) { if _, err := os.Stat(path); os.IsNotExist(err) {
err = os.MkdirAll(path, 0700) err = os.MkdirAll(path, 0700)
if err != nil { if err != nil {
span.RecordError(err)
return nil, err return nil, err
} }
} }
@ -89,7 +95,7 @@ func (d *diskStore) Open(ctx context.Context, dsn string) (driver.Driver, error)
_, span := lg.Span(ctx) _, span := lg.Span(ctx)
defer span.End() defer span.End()
d.Mdisk_evict.Add(ctx, 1) d.m_disk_evict.Add(ctx, 1)
err := w.Close() err := w.Close()
if err != nil { if err != nil {
@ -107,15 +113,15 @@ func (d *diskStore) Open(ctx context.Context, dsn string) (driver.Driver, error)
return &diskStore{ return &diskStore{
path: path, path: path,
openlogs: locker.New(logs), openlogs: locker.New(logs),
Mdisk_open: d.Mdisk_open, m_disk_open: d.m_disk_open,
Mdisk_evict: d.Mdisk_evict, m_disk_evict: d.m_disk_evict,
}, nil }, nil
} }
func (d *diskStore) EventLog(ctx context.Context, streamID string) (driver.EventLog, error) { func (d *diskStore) EventLog(ctx context.Context, streamID string) (driver.EventLog, error) {
_, span := lg.Span(ctx) _, span := lg.Span(ctx)
defer span.End() defer span.End()
el := &eventLog{streamID: streamID} el := &eventLog{streamID: streamID, diskStore: d}
return el, d.openlogs.Modify(ctx, func(openlogs *openlogs) error { return el, d.openlogs.Modify(ctx, func(openlogs *openlogs) error {
_, span := lg.Span(ctx) _, span := lg.Span(ctx)
@ -126,11 +132,14 @@ func (d *diskStore) EventLog(ctx context.Context, streamID string) (driver.Event
return nil return nil
} }
d.m_disk_open.Add(ctx, 1)
l, err := wal.Open(filepath.Join(d.path, streamID), wal.DefaultOptions) l, err := wal.Open(filepath.Join(d.path, streamID), wal.DefaultOptions)
if err != nil { if err != nil {
span.RecordError(err) span.RecordError(err)
return err return err
} }
el.events = locker.New(l) el.events = locker.New(l)
openlogs.logs.Add(ctx, streamID, el.events) openlogs.logs.Add(ctx, streamID, el.events)
return nil return nil
@ -140,6 +149,7 @@ func (d *diskStore) EventLog(ctx context.Context, streamID string) (driver.Event
type eventLog struct { type eventLog struct {
streamID string streamID string
events *locker.Locked[wal.Log] events *locker.Locked[wal.Log]
diskStore *diskStore
} }
var _ driver.EventLog = (*eventLog)(nil) var _ driver.EventLog = (*eventLog)(nil)
@ -184,6 +194,8 @@ func (e *eventLog) Append(ctx context.Context, events event.Events, version uint
} }
count = uint64(len(events)) count = uint64(len(events))
e.diskStore.m_disk_write.Add(ctx, int64(len(events)))
return l.WriteBatch(batch) return l.WriteBatch(batch)
}) })
@ -255,6 +267,7 @@ func (e *eventLog) Read(ctx context.Context, pos, count int64) (event.Events, er
} }
event.SetStreamID(e.streamID, events...) event.SetStreamID(e.streamID, events...)
e.diskStore.m_disk_read.Add(ctx, int64(len(events)))
return events, nil return events, nil
} }