fix: disk counters
This commit is contained in:
parent
be8a318ca3
commit
092a4d59f1
|
@ -30,8 +30,10 @@ type diskStore struct {
|
|||
path string
|
||||
openlogs *locker.Locked[openlogs]
|
||||
|
||||
Mdisk_open syncint64.Counter
|
||||
Mdisk_evict syncint64.Counter
|
||||
m_disk_open syncint64.Counter
|
||||
m_disk_evict syncint64.Counter
|
||||
m_disk_read syncint64.Counter
|
||||
m_disk_write syncint64.Counter
|
||||
}
|
||||
|
||||
const AppendOnly = es.AppendOnly
|
||||
|
@ -41,19 +43,24 @@ func Init(ctx context.Context) error {
|
|||
_, span := lg.Span(ctx)
|
||||
defer span.End()
|
||||
|
||||
d := &diskStore{}
|
||||
|
||||
m := lg.Meter(ctx)
|
||||
var err, errs error
|
||||
|
||||
Mdisk_open, err := m.SyncInt64().Counter("disk_open")
|
||||
d.m_disk_open, err = m.SyncInt64().Counter("disk_open")
|
||||
errs = multierr.Append(errs, err)
|
||||
|
||||
Mdisk_evict, err := m.SyncInt64().Counter("disk_evict")
|
||||
d.m_disk_evict, err = m.SyncInt64().Counter("disk_evict")
|
||||
errs = multierr.Append(errs, err)
|
||||
|
||||
es.Register(ctx, "file", &diskStore{
|
||||
Mdisk_open: Mdisk_open,
|
||||
Mdisk_evict: Mdisk_evict,
|
||||
})
|
||||
d.m_disk_read, err = m.SyncInt64().Counter("disk_read")
|
||||
errs = multierr.Append(errs, err)
|
||||
|
||||
d.m_disk_write, err = m.SyncInt64().Counter("disk_write")
|
||||
errs = multierr.Append(errs, err)
|
||||
|
||||
es.Register(ctx, "file", d)
|
||||
|
||||
return errs
|
||||
}
|
||||
|
@ -61,11 +68,9 @@ func Init(ctx context.Context) error {
|
|||
var _ driver.Driver = (*diskStore)(nil)
|
||||
|
||||
func (d *diskStore) Open(ctx context.Context, dsn string) (driver.Driver, error) {
|
||||
ctx, span := lg.Span(ctx)
|
||||
_, span := lg.Span(ctx)
|
||||
defer span.End()
|
||||
|
||||
d.Mdisk_open.Add(ctx, 1)
|
||||
|
||||
scheme, path, ok := strings.Cut(dsn, ":")
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("expected scheme")
|
||||
|
@ -78,6 +83,7 @@ func (d *diskStore) Open(ctx context.Context, dsn string) (driver.Driver, error)
|
|||
if _, err := os.Stat(path); os.IsNotExist(err) {
|
||||
err = os.MkdirAll(path, 0700)
|
||||
if err != nil {
|
||||
span.RecordError(err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
@ -89,7 +95,7 @@ func (d *diskStore) Open(ctx context.Context, dsn string) (driver.Driver, error)
|
|||
_, span := lg.Span(ctx)
|
||||
defer span.End()
|
||||
|
||||
d.Mdisk_evict.Add(ctx, 1)
|
||||
d.m_disk_evict.Add(ctx, 1)
|
||||
|
||||
err := w.Close()
|
||||
if err != nil {
|
||||
|
@ -107,15 +113,15 @@ func (d *diskStore) Open(ctx context.Context, dsn string) (driver.Driver, error)
|
|||
return &diskStore{
|
||||
path: path,
|
||||
openlogs: locker.New(logs),
|
||||
Mdisk_open: d.Mdisk_open,
|
||||
Mdisk_evict: d.Mdisk_evict,
|
||||
m_disk_open: d.m_disk_open,
|
||||
m_disk_evict: d.m_disk_evict,
|
||||
}, nil
|
||||
}
|
||||
func (d *diskStore) EventLog(ctx context.Context, streamID string) (driver.EventLog, error) {
|
||||
_, span := lg.Span(ctx)
|
||||
defer span.End()
|
||||
|
||||
el := &eventLog{streamID: streamID}
|
||||
el := &eventLog{streamID: streamID, diskStore: d}
|
||||
|
||||
return el, d.openlogs.Modify(ctx, func(openlogs *openlogs) error {
|
||||
_, span := lg.Span(ctx)
|
||||
|
@ -126,11 +132,14 @@ func (d *diskStore) EventLog(ctx context.Context, streamID string) (driver.Event
|
|||
return nil
|
||||
}
|
||||
|
||||
d.m_disk_open.Add(ctx, 1)
|
||||
|
||||
l, err := wal.Open(filepath.Join(d.path, streamID), wal.DefaultOptions)
|
||||
if err != nil {
|
||||
span.RecordError(err)
|
||||
return err
|
||||
}
|
||||
|
||||
el.events = locker.New(l)
|
||||
openlogs.logs.Add(ctx, streamID, el.events)
|
||||
return nil
|
||||
|
@ -140,6 +149,7 @@ func (d *diskStore) EventLog(ctx context.Context, streamID string) (driver.Event
|
|||
type eventLog struct {
|
||||
streamID string
|
||||
events *locker.Locked[wal.Log]
|
||||
diskStore *diskStore
|
||||
}
|
||||
|
||||
var _ driver.EventLog = (*eventLog)(nil)
|
||||
|
@ -184,6 +194,8 @@ func (e *eventLog) Append(ctx context.Context, events event.Events, version uint
|
|||
}
|
||||
|
||||
count = uint64(len(events))
|
||||
e.diskStore.m_disk_write.Add(ctx, int64(len(events)))
|
||||
|
||||
return l.WriteBatch(batch)
|
||||
})
|
||||
|
||||
|
@ -255,6 +267,7 @@ func (e *eventLog) Read(ctx context.Context, pos, count int64) (event.Events, er
|
|||
}
|
||||
|
||||
event.SetStreamID(e.streamID, events...)
|
||||
e.diskStore.m_disk_read.Add(ctx, int64(len(events)))
|
||||
|
||||
return events, nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user