diff --git a/pkg/es/driver/disk-store/disk-store.go b/pkg/es/driver/disk-store/disk-store.go index 874fd53..36472ef 100644 --- a/pkg/es/driver/disk-store/disk-store.go +++ b/pkg/es/driver/disk-store/disk-store.go @@ -30,8 +30,10 @@ type diskStore struct { path string openlogs *locker.Locked[openlogs] - Mdisk_open syncint64.Counter - Mdisk_evict syncint64.Counter + m_disk_open syncint64.Counter + m_disk_evict syncint64.Counter + m_disk_read syncint64.Counter + m_disk_write syncint64.Counter } const AppendOnly = es.AppendOnly @@ -41,19 +43,24 @@ func Init(ctx context.Context) error { _, span := lg.Span(ctx) defer span.End() + d := &diskStore{} + m := lg.Meter(ctx) var err, errs error - Mdisk_open, err := m.SyncInt64().Counter("disk_open") + d.m_disk_open, err = m.SyncInt64().Counter("disk_open") errs = multierr.Append(errs, err) - Mdisk_evict, err := m.SyncInt64().Counter("disk_evict") + d.m_disk_evict, err = m.SyncInt64().Counter("disk_evict") errs = multierr.Append(errs, err) - es.Register(ctx, "file", &diskStore{ - Mdisk_open: Mdisk_open, - Mdisk_evict: Mdisk_evict, - }) + d.m_disk_read, err = m.SyncInt64().Counter("disk_read") + errs = multierr.Append(errs, err) + + d.m_disk_write, err = m.SyncInt64().Counter("disk_write") + errs = multierr.Append(errs, err) + + es.Register(ctx, "file", d) return errs } @@ -61,11 +68,9 @@ func Init(ctx context.Context) error { var _ driver.Driver = (*diskStore)(nil) func (d *diskStore) Open(ctx context.Context, dsn string) (driver.Driver, error) { - ctx, span := lg.Span(ctx) + _, span := lg.Span(ctx) defer span.End() - d.Mdisk_open.Add(ctx, 1) - scheme, path, ok := strings.Cut(dsn, ":") if !ok { return nil, fmt.Errorf("expected scheme") @@ -78,6 +83,7 @@ func (d *diskStore) Open(ctx context.Context, dsn string) (driver.Driver, error) if _, err := os.Stat(path); os.IsNotExist(err) { err = os.MkdirAll(path, 0700) if err != nil { + span.RecordError(err) return nil, err } } @@ -89,7 +95,7 @@ func (d *diskStore) Open(ctx context.Context, dsn string) (driver.Driver, error) _, span := lg.Span(ctx) defer span.End() - d.Mdisk_evict.Add(ctx, 1) + d.m_disk_evict.Add(ctx, 1) err := w.Close() if err != nil { @@ -107,15 +113,15 @@ func (d *diskStore) Open(ctx context.Context, dsn string) (driver.Driver, error) return &diskStore{ path: path, openlogs: locker.New(logs), - Mdisk_open: d.Mdisk_open, - Mdisk_evict: d.Mdisk_evict, + m_disk_open: d.m_disk_open, + m_disk_evict: d.m_disk_evict, }, nil } func (d *diskStore) EventLog(ctx context.Context, streamID string) (driver.EventLog, error) { _, span := lg.Span(ctx) defer span.End() - el := &eventLog{streamID: streamID} + el := &eventLog{streamID: streamID, diskStore: d} return el, d.openlogs.Modify(ctx, func(openlogs *openlogs) error { _, span := lg.Span(ctx) @@ -126,11 +132,14 @@ func (d *diskStore) EventLog(ctx context.Context, streamID string) (driver.Event return nil } + d.m_disk_open.Add(ctx, 1) + l, err := wal.Open(filepath.Join(d.path, streamID), wal.DefaultOptions) if err != nil { span.RecordError(err) return err } + el.events = locker.New(l) openlogs.logs.Add(ctx, streamID, el.events) return nil @@ -140,6 +149,7 @@ func (d *diskStore) EventLog(ctx context.Context, streamID string) (driver.Event type eventLog struct { streamID string events *locker.Locked[wal.Log] + diskStore *diskStore } var _ driver.EventLog = (*eventLog)(nil) @@ -184,6 +194,8 @@ func (e *eventLog) Append(ctx context.Context, events event.Events, version uint } count = uint64(len(events)) + e.diskStore.m_disk_write.Add(ctx, int64(len(events))) + return l.WriteBatch(batch) }) @@ -255,6 +267,7 @@ func (e *eventLog) Read(ctx context.Context, pos, count int64) (event.Events, er } event.SetStreamID(e.streamID, events...) + e.diskStore.m_disk_read.Add(ctx, int64(len(events))) return events, nil }