Compare commits
7 Commits
main
...
inprogress
Author | SHA1 | Date | |
---|---|---|---|
|
2732178507 | ||
|
092a4d59f1 | ||
|
be8a318ca3 | ||
|
fb72d4bc8c | ||
|
7c48812057 | ||
|
47aff6b106 | ||
|
ba3c302dc4 |
|
@ -16,16 +16,19 @@ import (
|
||||||
"github.com/sour-is/ev/pkg/es"
|
"github.com/sour-is/ev/pkg/es"
|
||||||
"github.com/sour-is/ev/pkg/es/event"
|
"github.com/sour-is/ev/pkg/es/event"
|
||||||
"github.com/sour-is/ev/pkg/gql"
|
"github.com/sour-is/ev/pkg/gql"
|
||||||
|
"go.opentelemetry.io/otel/metric/instrument"
|
||||||
"go.opentelemetry.io/otel/metric/instrument/syncint64"
|
"go.opentelemetry.io/otel/metric/instrument/syncint64"
|
||||||
|
"go.opentelemetry.io/otel/metric/unit"
|
||||||
"go.uber.org/multierr"
|
"go.uber.org/multierr"
|
||||||
)
|
)
|
||||||
|
|
||||||
type service struct {
|
type service struct {
|
||||||
es *es.EventStore
|
es *es.EventStore
|
||||||
|
|
||||||
Mresolver_posts syncint64.Counter
|
m_gql_posts syncint64.Counter
|
||||||
Mresolver_post_added syncint64.Counter
|
m_gql_post_added syncint64.Counter
|
||||||
Mresolver_post_added_event syncint64.Counter
|
m_gql_post_added_event syncint64.Counter
|
||||||
|
m_req_time syncint64.Histogram
|
||||||
}
|
}
|
||||||
|
|
||||||
type MsgbusResolver interface {
|
type MsgbusResolver interface {
|
||||||
|
@ -50,13 +53,25 @@ func New(ctx context.Context, es *es.EventStore) (*service, error) {
|
||||||
svc := &service{es: es}
|
svc := &service{es: es}
|
||||||
|
|
||||||
var err, errs error
|
var err, errs error
|
||||||
svc.Mresolver_posts, err = m.SyncInt64().Counter("resolver_posts")
|
svc.m_gql_posts, err = m.SyncInt64().Counter("msgbus_posts",
|
||||||
|
instrument.WithDescription("msgbus graphql posts requests"),
|
||||||
|
)
|
||||||
errs = multierr.Append(errs, err)
|
errs = multierr.Append(errs, err)
|
||||||
|
|
||||||
svc.Mresolver_post_added, err = m.SyncInt64().Counter("resolver_post_added")
|
svc.m_gql_post_added, err = m.SyncInt64().Counter("msgbus_post_added",
|
||||||
|
instrument.WithDescription("msgbus graphql post added subcription requests"),
|
||||||
|
)
|
||||||
errs = multierr.Append(errs, err)
|
errs = multierr.Append(errs, err)
|
||||||
|
|
||||||
svc.Mresolver_post_added_event, err = m.SyncInt64().Counter("resolver_post_added")
|
svc.m_gql_post_added_event, err = m.SyncInt64().Counter("msgbus_post_event",
|
||||||
|
instrument.WithDescription("msgbus graphql post added subscription events"),
|
||||||
|
)
|
||||||
|
errs = multierr.Append(errs, err)
|
||||||
|
|
||||||
|
svc.m_req_time, err = m.SyncInt64().Histogram("msgbus_request_time",
|
||||||
|
instrument.WithDescription("msgbus graphql post added subscription events"),
|
||||||
|
instrument.WithUnit(unit.Unit("ns")),
|
||||||
|
)
|
||||||
errs = multierr.Append(errs, err)
|
errs = multierr.Append(errs, err)
|
||||||
|
|
||||||
span.RecordError(err)
|
span.RecordError(err)
|
||||||
|
@ -76,8 +91,10 @@ func (s *service) RegisterHTTP(mux *http.ServeMux) {
|
||||||
}
|
}
|
||||||
func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx := r.Context()
|
ctx := r.Context()
|
||||||
|
|
||||||
ctx, span := lg.Span(ctx)
|
ctx, span := lg.Span(ctx)
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
r = r.WithContext(ctx)
|
r = r.WithContext(ctx)
|
||||||
|
|
||||||
switch r.Method {
|
switch r.Method {
|
||||||
|
@ -96,13 +113,16 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Posts is the resolver for the events field.
|
// Posts is the resolver for the events field.
|
||||||
func (r *service) Posts(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error) {
|
func (s *service) Posts(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error) {
|
||||||
ctx, span := lg.Span(ctx)
|
ctx, span := lg.Span(ctx)
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
r.Mresolver_posts.Add(ctx, 1)
|
s.m_gql_posts.Add(ctx, 1)
|
||||||
|
|
||||||
lis, err := r.es.Read(ctx, streamID, paging.GetIdx(0), paging.GetCount(30))
|
start := time.Now()
|
||||||
|
defer s.m_req_time.Record(ctx, time.Since(start).Milliseconds())
|
||||||
|
|
||||||
|
lis, err := s.es.Read(ctx, streamID, paging.GetIdx(0), paging.GetCount(30))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
span.RecordError(err)
|
span.RecordError(err)
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -122,11 +142,11 @@ func (r *service) Posts(ctx context.Context, streamID string, paging *gql.PageIn
|
||||||
}
|
}
|
||||||
|
|
||||||
var first, last uint64
|
var first, last uint64
|
||||||
if first, err = r.es.FirstIndex(ctx, streamID); err != nil {
|
if first, err = s.es.FirstIndex(ctx, streamID); err != nil {
|
||||||
span.RecordError(err)
|
span.RecordError(err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if last, err = r.es.LastIndex(ctx, streamID); err != nil {
|
if last, err = s.es.LastIndex(ctx, streamID); err != nil {
|
||||||
span.RecordError(err)
|
span.RecordError(err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -146,7 +166,7 @@ func (r *service) PostAdded(ctx context.Context, streamID string, after int64) (
|
||||||
ctx, span := lg.Span(ctx)
|
ctx, span := lg.Span(ctx)
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
r.Mresolver_post_added.Add(ctx, 1)
|
r.m_gql_post_added.Add(ctx, 1)
|
||||||
|
|
||||||
es := r.es.EventStream()
|
es := r.es.EventStream()
|
||||||
if es == nil {
|
if es == nil {
|
||||||
|
@ -183,7 +203,7 @@ func (r *service) PostAdded(ctx context.Context, streamID string, after int64) (
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
span.AddEvent(fmt.Sprintf("received %d events", len(events)))
|
span.AddEvent(fmt.Sprintf("received %d events", len(events)))
|
||||||
r.Mresolver_post_added_event.Add(ctx, int64(len(events)))
|
r.m_gql_post_added_event.Add(ctx, int64(len(events)))
|
||||||
|
|
||||||
for _, e := range events {
|
for _, e := range events {
|
||||||
if p, ok := e.(*PostEvent); ok {
|
if p, ok := e.(*PostEvent); ok {
|
||||||
|
@ -203,9 +223,13 @@ func (r *service) PostAdded(ctx context.Context, streamID string, after int64) (
|
||||||
|
|
||||||
func (s *service) get(w http.ResponseWriter, r *http.Request) {
|
func (s *service) get(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx := r.Context()
|
ctx := r.Context()
|
||||||
|
|
||||||
ctx, span := lg.Span(ctx)
|
ctx, span := lg.Span(ctx)
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
defer s.m_req_time.Record(ctx, time.Since(start).Milliseconds())
|
||||||
|
|
||||||
name, _, _ := strings.Cut(r.URL.Path, "/")
|
name, _, _ := strings.Cut(r.URL.Path, "/")
|
||||||
if name == "" {
|
if name == "" {
|
||||||
w.WriteHeader(http.StatusNotFound)
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
@ -260,6 +284,9 @@ func (s *service) post(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx, span := lg.Span(ctx)
|
ctx, span := lg.Span(ctx)
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
defer s.m_req_time.Record(ctx, time.Since(start).Milliseconds())
|
||||||
|
|
||||||
name, tags, _ := strings.Cut(r.URL.Path, "/")
|
name, tags, _ := strings.Cut(r.URL.Path, "/")
|
||||||
if name == "" {
|
if name == "" {
|
||||||
w.WriteHeader(http.StatusNotFound)
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
|
|
@ -25,6 +25,9 @@ type Capabilities struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c Capabilities) String() string {
|
func (c Capabilities) String() string {
|
||||||
|
if c.AcceptEncoding == "" {
|
||||||
|
return "<nil>"
|
||||||
|
}
|
||||||
return fmt.Sprint("accept-encoding: ", c.AcceptEncoding)
|
return fmt.Sprint("accept-encoding: ", c.AcceptEncoding)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -83,30 +86,40 @@ func (a *Addr) Refresh(ctx context.Context) error {
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
span.AddEvent(fmt.Sprintf("Looking up SRV record for _salty._tcp.%s", a.Domain))
|
span.AddEvent(fmt.Sprintf("Looking up SRV record for _salty._tcp.%s", a.Domain))
|
||||||
if target, _, err := a.dns.LookupSRV(ctx, "salty", "tcp", a.Domain); err == nil {
|
if _, srv, err := a.dns.LookupSRV(ctx, "salty", "tcp", a.Domain); err == nil {
|
||||||
a.discoveredDomain = target
|
if len(srv) > 0 {
|
||||||
|
a.discoveredDomain = strings.TrimSuffix(srv[0].Target, ".")
|
||||||
|
}
|
||||||
span.AddEvent(fmt.Sprintf("Discovered salty services %s", a.discoveredDomain))
|
span.AddEvent(fmt.Sprintf("Discovered salty services %s", a.discoveredDomain))
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
span.AddEvent(fmt.Sprintf("error looking up SRV record for _salty._tcp.%s : %s", a.Domain, err))
|
span.RecordError(fmt.Errorf("error looking up SRV record for _salty._tcp.%s : %s", a.Domain, err))
|
||||||
}
|
}
|
||||||
|
|
||||||
config, cap, err := fetchConfig(ctx, a.HashURI())
|
config, cap, err := fetchConfig(ctx, a.HashURI())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Fallback to plain user nick
|
// Fallback to plain user nick
|
||||||
|
span.RecordError(err)
|
||||||
|
|
||||||
config, cap, err = fetchConfig(ctx, a.URI())
|
config, cap, err = fetchConfig(ctx, a.URI())
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error looking up user %s: %w", a, err)
|
err = fmt.Errorf("error looking up user %s: %w", a, err)
|
||||||
|
span.RecordError(err)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
key, err := keys.NewEdX25519PublicKeyFromID(keys.ID(config.Key))
|
key, err := keys.NewEdX25519PublicKeyFromID(keys.ID(config.Key))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error parsing public key %s: %w", config.Key, err)
|
err = fmt.Errorf("error parsing public key %s: %w", config.Key, err)
|
||||||
|
span.RecordError(err)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
a.key = key
|
a.key = key
|
||||||
|
|
||||||
u, err := url.Parse(config.Endpoint)
|
u, err := url.Parse(config.Endpoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error parsing endpoint %s: %w", config.Endpoint, err)
|
err = fmt.Errorf("error parsing endpoint %s: %w", config.Endpoint, err)
|
||||||
|
span.RecordError(err)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
a.endpoint = u
|
a.endpoint = u
|
||||||
a.capabilities = cap
|
a.capabilities = cap
|
||||||
|
|
|
@ -10,13 +10,16 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/keys-pub/keys"
|
"github.com/keys-pub/keys"
|
||||||
"github.com/sour-is/ev/internal/lg"
|
"github.com/sour-is/ev/internal/lg"
|
||||||
"github.com/sour-is/ev/pkg/es"
|
"github.com/sour-is/ev/pkg/es"
|
||||||
"github.com/sour-is/ev/pkg/es/event"
|
"github.com/sour-is/ev/pkg/es/event"
|
||||||
"github.com/sour-is/ev/pkg/gql"
|
"github.com/sour-is/ev/pkg/gql"
|
||||||
|
"go.opentelemetry.io/otel/metric/instrument"
|
||||||
"go.opentelemetry.io/otel/metric/instrument/syncint64"
|
"go.opentelemetry.io/otel/metric/instrument/syncint64"
|
||||||
|
"go.opentelemetry.io/otel/metric/unit"
|
||||||
"go.uber.org/multierr"
|
"go.uber.org/multierr"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -35,6 +38,7 @@ type service struct {
|
||||||
m_api_register syncint64.Counter
|
m_api_register syncint64.Counter
|
||||||
m_api_lookup syncint64.Counter
|
m_api_lookup syncint64.Counter
|
||||||
m_api_send syncint64.Counter
|
m_api_send syncint64.Counter
|
||||||
|
m_req_time syncint64.Histogram
|
||||||
}
|
}
|
||||||
type contextKey struct {
|
type contextKey struct {
|
||||||
name string
|
name string
|
||||||
|
@ -64,23 +68,42 @@ func New(ctx context.Context, es *es.EventStore, baseURL string) (*service, erro
|
||||||
svc := &service{baseURL: baseURL, es: es, dns: net.DefaultResolver}
|
svc := &service{baseURL: baseURL, es: es, dns: net.DefaultResolver}
|
||||||
|
|
||||||
var err, errs error
|
var err, errs error
|
||||||
svc.m_create_user, err = m.SyncInt64().Counter("salty_create_user")
|
svc.m_create_user, err = m.SyncInt64().Counter("salty_create_user",
|
||||||
|
instrument.WithDescription("salty create user graphql called"),
|
||||||
|
)
|
||||||
errs = multierr.Append(errs, err)
|
errs = multierr.Append(errs, err)
|
||||||
|
|
||||||
svc.m_get_user, err = m.SyncInt64().Counter("salty_get_user")
|
svc.m_get_user, err = m.SyncInt64().Counter("salty_get_user",
|
||||||
|
instrument.WithDescription("salty get user graphql called"),
|
||||||
|
)
|
||||||
errs = multierr.Append(errs, err)
|
errs = multierr.Append(errs, err)
|
||||||
|
|
||||||
svc.m_api_ping, err = m.SyncInt64().Counter("salty_api_ping")
|
svc.m_api_ping, err = m.SyncInt64().Counter("salty_api_ping",
|
||||||
|
instrument.WithDescription("salty api ping called"),
|
||||||
|
)
|
||||||
errs = multierr.Append(errs, err)
|
errs = multierr.Append(errs, err)
|
||||||
|
|
||||||
svc.m_api_register, err = m.SyncInt64().Counter("salty_api_register")
|
svc.m_api_register, err = m.SyncInt64().Counter("salty_api_register",
|
||||||
|
instrument.WithDescription("salty api register"),
|
||||||
|
)
|
||||||
errs = multierr.Append(errs, err)
|
errs = multierr.Append(errs, err)
|
||||||
|
|
||||||
svc.m_api_lookup, err = m.SyncInt64().Counter("salty_api_lookup")
|
svc.m_api_lookup, err = m.SyncInt64().Counter("salty_api_lookup",
|
||||||
|
instrument.WithDescription("salty api ping lookup"),
|
||||||
|
)
|
||||||
errs = multierr.Append(errs, err)
|
errs = multierr.Append(errs, err)
|
||||||
|
|
||||||
svc.m_api_send, err = m.SyncInt64().Counter("salty_api_send")
|
svc.m_api_send, err = m.SyncInt64().Counter("salty_api_send",
|
||||||
|
instrument.WithDescription("salty api ping send"),
|
||||||
|
)
|
||||||
errs = multierr.Append(errs, err)
|
errs = multierr.Append(errs, err)
|
||||||
|
|
||||||
|
svc.m_req_time, err = m.SyncInt64().Histogram("salty_request_time",
|
||||||
|
instrument.WithDescription("histogram of requests"),
|
||||||
|
instrument.WithUnit(unit.Unit("ns")),
|
||||||
|
)
|
||||||
|
errs = multierr.Append(errs, err)
|
||||||
|
|
||||||
span.RecordError(err)
|
span.RecordError(err)
|
||||||
|
|
||||||
return svc, errs
|
return svc, errs
|
||||||
|
@ -106,6 +129,9 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx, span := lg.Span(ctx)
|
ctx, span := lg.Span(ctx)
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
defer s.m_req_time.Record(ctx, time.Since(start).Milliseconds())
|
||||||
|
|
||||||
addr := "saltyuser-" + strings.TrimPrefix(r.URL.Path, "/.well-known/salty/")
|
addr := "saltyuser-" + strings.TrimPrefix(r.URL.Path, "/.well-known/salty/")
|
||||||
addr = strings.TrimSuffix(addr, ".json")
|
addr = strings.TrimSuffix(addr, ".json")
|
||||||
|
|
||||||
|
@ -143,6 +169,8 @@ func (s *service) CreateSaltyUser(ctx context.Context, nick string, pub string)
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
s.m_create_user.Add(ctx, 1)
|
s.m_create_user.Add(ctx, 1)
|
||||||
|
start := time.Now()
|
||||||
|
defer s.m_req_time.Record(ctx, time.Since(start).Milliseconds())
|
||||||
|
|
||||||
streamID := fmt.Sprintf("saltyuser-%x", sha256.Sum256([]byte(strings.ToLower(nick))))
|
streamID := fmt.Sprintf("saltyuser-%x", sha256.Sum256([]byte(strings.ToLower(nick))))
|
||||||
span.AddEvent(streamID)
|
span.AddEvent(streamID)
|
||||||
|
@ -173,6 +201,8 @@ func (s *service) SaltyUser(ctx context.Context, nick string) (*SaltyUser, error
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
s.m_get_user.Add(ctx, 1)
|
s.m_get_user.Add(ctx, 1)
|
||||||
|
start := time.Now()
|
||||||
|
defer s.m_req_time.Record(ctx, time.Since(start).Milliseconds())
|
||||||
|
|
||||||
streamID := fmt.Sprintf("saltyuser-%x", sha256.Sum256([]byte(strings.ToLower(nick))))
|
streamID := fmt.Sprintf("saltyuser-%x", sha256.Sum256([]byte(strings.ToLower(nick))))
|
||||||
span.AddEvent(streamID)
|
span.AddEvent(streamID)
|
||||||
|
@ -205,14 +235,21 @@ func (s *service) apiv1(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx, span := lg.Span(ctx)
|
ctx, span := lg.Span(ctx)
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
defer s.m_req_time.Record(ctx, time.Since(start).Nanoseconds())
|
||||||
|
|
||||||
switch r.Method {
|
switch r.Method {
|
||||||
case http.MethodGet:
|
case http.MethodGet:
|
||||||
switch {
|
switch {
|
||||||
case r.URL.Path == "/ping":
|
case r.URL.Path == "/ping":
|
||||||
|
s.m_api_ping.Add(ctx, 1)
|
||||||
|
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
_, _ = w.Write([]byte(`{}`))
|
_, _ = w.Write([]byte(`{}`))
|
||||||
|
|
||||||
case strings.HasPrefix(r.URL.Path, "/lookup/"):
|
case strings.HasPrefix(r.URL.Path, "/lookup/"):
|
||||||
|
s.m_api_lookup.Add(ctx, 1)
|
||||||
|
|
||||||
addr, err := s.ParseAddr(strings.TrimPrefix(r.URL.Path, "/lookup/"))
|
addr, err := s.ParseAddr(strings.TrimPrefix(r.URL.Path, "/lookup/"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
span.RecordError(err)
|
span.RecordError(err)
|
||||||
|
@ -226,7 +263,8 @@ func (s *service) apiv1(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
json.NewEncoder(w).Encode(addr)
|
err = json.NewEncoder(w).Encode(addr)
|
||||||
|
span.RecordError(err)
|
||||||
return
|
return
|
||||||
|
|
||||||
default:
|
default:
|
||||||
|
@ -237,8 +275,14 @@ func (s *service) apiv1(w http.ResponseWriter, r *http.Request) {
|
||||||
case http.MethodPost:
|
case http.MethodPost:
|
||||||
switch r.URL.Path {
|
switch r.URL.Path {
|
||||||
case "/register":
|
case "/register":
|
||||||
|
s.m_api_register.Add(ctx, 1)
|
||||||
|
notImplemented(w)
|
||||||
|
return
|
||||||
|
|
||||||
case "/send":
|
case "/send":
|
||||||
|
s.m_api_send.Add(ctx, 1)
|
||||||
|
notImplemented(w)
|
||||||
|
return
|
||||||
|
|
||||||
default:
|
default:
|
||||||
w.WriteHeader(http.StatusNotFound)
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
@ -249,3 +293,7 @@ func (s *service) apiv1(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func notImplemented(w http.ResponseWriter) {
|
||||||
|
w.WriteHeader(http.StatusNotImplemented)
|
||||||
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"github.com/rs/cors"
|
"github.com/rs/cors"
|
||||||
|
@ -17,6 +18,7 @@ func httpMux(fns ...interface{ RegisterHTTP(*http.ServeMux) }) http.Handler {
|
||||||
fn.RegisterHTTP(mux.ServeMux)
|
fn.RegisterHTTP(mux.ServeMux)
|
||||||
|
|
||||||
if fn, ok := fn.(interface{ RegisterAPIv1(*http.ServeMux) }); ok {
|
if fn, ok := fn.(interface{ RegisterAPIv1(*http.ServeMux) }); ok {
|
||||||
|
log.Printf("register api %T", fn)
|
||||||
fn.RegisterAPIv1(mux.api)
|
fn.RegisterAPIv1(mux.api)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -28,10 +30,7 @@ func newMux() *mux {
|
||||||
api: http.NewServeMux(),
|
api: http.NewServeMux(),
|
||||||
ServeMux: http.NewServeMux(),
|
ServeMux: http.NewServeMux(),
|
||||||
}
|
}
|
||||||
mux.Handle("/api/v1/", http.StripPrefix("/api/v1/", mux.api))
|
mux.Handle("/api/v1/", http.StripPrefix("/api/v1", mux.api))
|
||||||
|
|
||||||
return mux
|
return mux
|
||||||
}
|
}
|
||||||
func (m mux) HandleAPIv1(pattern string, handler http.Handler) {
|
|
||||||
m.api.Handle(pattern, handler)
|
|
||||||
}
|
|
||||||
|
|
40
httpmux_test.go
Normal file
40
httpmux_test.go
Normal file
|
@ -0,0 +1,40 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/matryer/is"
|
||||||
|
)
|
||||||
|
|
||||||
|
type mockHTTP struct {
|
||||||
|
onServeHTTP func()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockHTTP) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
|
m.onServeHTTP()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *mockHTTP) RegisterHTTP(mux *http.ServeMux) {
|
||||||
|
mux.Handle("/", h)
|
||||||
|
}
|
||||||
|
func (h *mockHTTP) RegisterAPIv1(mux *http.ServeMux) {
|
||||||
|
mux.Handle("/ping", h)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHttpMux(t *testing.T) {
|
||||||
|
is := is.New(t)
|
||||||
|
|
||||||
|
called := false
|
||||||
|
|
||||||
|
mux := httpMux(&mockHTTP{func() { called = true }})
|
||||||
|
|
||||||
|
is.True(mux != nil)
|
||||||
|
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
r := httptest.NewRequest(http.MethodGet, "/api/v1/ping", nil)
|
||||||
|
mux.ServeHTTP(w, r)
|
||||||
|
|
||||||
|
is.True(called)
|
||||||
|
}
|
|
@ -48,7 +48,11 @@ func initMetrics(ctx context.Context, name string) (context.Context, func() erro
|
||||||
host = h
|
host = h
|
||||||
}
|
}
|
||||||
|
|
||||||
config := prometheus.Config{}
|
config := prometheus.Config{
|
||||||
|
DefaultHistogramBoundaries: []float64{
|
||||||
|
2 << 6, 2 << 8, 2 << 10, 2 << 12, 2 << 14, 2 << 16, 2 << 18, 2 << 20, 2 << 22, 2 << 24, 2 << 26, 2 << 28,
|
||||||
|
},
|
||||||
|
}
|
||||||
cont := controller.New(
|
cont := controller.New(
|
||||||
processor.NewFactory(
|
processor.NewFactory(
|
||||||
selector.NewWithHistogramDistribution(
|
selector.NewWithHistogramDistribution(
|
||||||
|
|
7
main.go
7
main.go
|
@ -60,7 +60,12 @@ func run(ctx context.Context) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
es, err := es.Open(ctx, env("EV_DATA", "mem:"), streamer.New(ctx), projecter.New(ctx))
|
es, err := es.Open(
|
||||||
|
ctx,
|
||||||
|
env("EV_DATA", "mem:"),
|
||||||
|
streamer.New(ctx),
|
||||||
|
projecter.New(ctx, projecter.DefaultProjection),
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
span.RecordError(err)
|
span.RecordError(err)
|
||||||
return err
|
return err
|
||||||
|
|
2
pkg/cache/cache.go
vendored
2
pkg/cache/cache.go
vendored
|
@ -139,7 +139,7 @@ func (c *Cache[K, V]) ContainsOrAdd(ctx context.Context, key K, value V) (ok, ev
|
||||||
// PeekOrAdd checks if a key is in the cache without updating the
|
// PeekOrAdd checks if a key is in the cache without updating the
|
||||||
// recent-ness or deleting it for being stale, and if not, adds the value.
|
// recent-ness or deleting it for being stale, and if not, adds the value.
|
||||||
// Returns whether found and whether an eviction occurred.
|
// Returns whether found and whether an eviction occurred.
|
||||||
func (c *Cache[K, V]) PeekOrAdd(ctx context.Context, key K, value V) (previous interface{}, ok, evicted bool) {
|
func (c *Cache[K, V]) PeekOrAdd(ctx context.Context, key K, value V) (previous *V, ok, evicted bool) {
|
||||||
var k K
|
var k K
|
||||||
var v V
|
var v V
|
||||||
c.lock.Lock()
|
c.lock.Lock()
|
||||||
|
|
131
pkg/cache/cache_test.go
vendored
Normal file
131
pkg/cache/cache_test.go
vendored
Normal file
|
@ -0,0 +1,131 @@
|
||||||
|
package cache_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/matryer/is"
|
||||||
|
"github.com/sour-is/ev/pkg/cache"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestCache(t *testing.T) {
|
||||||
|
is := is.New(t)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
c, err := cache.NewCache[string, int](1)
|
||||||
|
is.NoErr(err)
|
||||||
|
|
||||||
|
evicted := c.Add(ctx, "one", 1)
|
||||||
|
is.True(!evicted)
|
||||||
|
|
||||||
|
is.True(c.Contains("one"))
|
||||||
|
_, ok := c.Peek("one")
|
||||||
|
is.True(ok)
|
||||||
|
|
||||||
|
ok, evicted = c.ContainsOrAdd(ctx, "two", 2)
|
||||||
|
is.True(!ok)
|
||||||
|
is.True(evicted)
|
||||||
|
|
||||||
|
is.True(!c.Contains("one"))
|
||||||
|
is.True(c.Contains("two"))
|
||||||
|
|
||||||
|
is.Equal(c.Len(), 1)
|
||||||
|
is.Equal(c.Keys(), []string{"two"})
|
||||||
|
|
||||||
|
v, ok := c.Get("two")
|
||||||
|
is.True(ok)
|
||||||
|
is.Equal(*v, 2)
|
||||||
|
|
||||||
|
evictCount := c.Resize(ctx, 100)
|
||||||
|
is.True(evictCount == 0)
|
||||||
|
|
||||||
|
c.Add(ctx, "one", 1)
|
||||||
|
|
||||||
|
prev, ok, evicted := c.PeekOrAdd(ctx, "three", 3)
|
||||||
|
is.True(!ok)
|
||||||
|
is.True(!evicted)
|
||||||
|
is.Equal(prev, nil)
|
||||||
|
|
||||||
|
key, value, ok := c.GetOldest()
|
||||||
|
is.True(ok)
|
||||||
|
is.Equal(*key, "two")
|
||||||
|
is.Equal(*value, 2)
|
||||||
|
|
||||||
|
key, value, ok = c.RemoveOldest(ctx)
|
||||||
|
is.True(ok)
|
||||||
|
is.Equal(*key, "two")
|
||||||
|
is.Equal(*value, 2)
|
||||||
|
|
||||||
|
c.Remove(ctx, "one")
|
||||||
|
|
||||||
|
c.Purge(ctx)
|
||||||
|
is.True(!c.Contains("three"))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCacheWithEvict(t *testing.T) {
|
||||||
|
is := is.New(t)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
evictions := 0
|
||||||
|
|
||||||
|
c, err := cache.NewWithEvict(1, func(ctx context.Context, s string, i int) { evictions++ })
|
||||||
|
is.NoErr(err)
|
||||||
|
|
||||||
|
key, value, ok := c.GetOldest()
|
||||||
|
is.True(!ok)
|
||||||
|
is.Equal(key, nil)
|
||||||
|
is.Equal(value, nil)
|
||||||
|
|
||||||
|
key, value, ok = c.RemoveOldest(ctx)
|
||||||
|
is.True(!ok)
|
||||||
|
is.Equal(key, nil)
|
||||||
|
is.Equal(value, nil)
|
||||||
|
|
||||||
|
evicted := c.Add(ctx, "one", 1)
|
||||||
|
is.True(!evicted)
|
||||||
|
|
||||||
|
is.True(c.Contains("one"))
|
||||||
|
_, ok = c.Peek("one")
|
||||||
|
is.True(ok)
|
||||||
|
|
||||||
|
ok, evicted = c.ContainsOrAdd(ctx, "two", 2)
|
||||||
|
is.True(!ok)
|
||||||
|
is.True(evicted)
|
||||||
|
|
||||||
|
is.True(!c.Contains("one"))
|
||||||
|
is.True(c.Contains("two"))
|
||||||
|
|
||||||
|
is.Equal(c.Len(), 1)
|
||||||
|
is.Equal(c.Keys(), []string{"two"})
|
||||||
|
|
||||||
|
v, ok := c.Get("two")
|
||||||
|
is.True(ok)
|
||||||
|
is.Equal(*v, 2)
|
||||||
|
|
||||||
|
evictCount := c.Resize(ctx, 100)
|
||||||
|
is.True(evictCount == 0)
|
||||||
|
|
||||||
|
c.Add(ctx, "one", 1)
|
||||||
|
|
||||||
|
prev, ok, evicted := c.PeekOrAdd(ctx, "three", 3)
|
||||||
|
is.True(!ok)
|
||||||
|
is.True(!evicted)
|
||||||
|
is.Equal(prev, nil)
|
||||||
|
|
||||||
|
key, value, ok = c.GetOldest()
|
||||||
|
is.True(ok)
|
||||||
|
is.Equal(*key, "two")
|
||||||
|
is.Equal(*value, 2)
|
||||||
|
|
||||||
|
key, value, ok = c.RemoveOldest(ctx)
|
||||||
|
is.True(ok)
|
||||||
|
is.Equal(*key, "two")
|
||||||
|
is.Equal(*value, 2)
|
||||||
|
|
||||||
|
c.Resize(ctx, 1)
|
||||||
|
|
||||||
|
c.Purge(ctx)
|
||||||
|
is.True(!c.Contains("three"))
|
||||||
|
|
||||||
|
is.Equal(evictions, 4)
|
||||||
|
}
|
|
@ -30,8 +30,10 @@ type diskStore struct {
|
||||||
path string
|
path string
|
||||||
openlogs *locker.Locked[openlogs]
|
openlogs *locker.Locked[openlogs]
|
||||||
|
|
||||||
Mdisk_open syncint64.Counter
|
m_disk_open syncint64.Counter
|
||||||
Mdisk_evict syncint64.Counter
|
m_disk_evict syncint64.Counter
|
||||||
|
m_disk_read syncint64.Counter
|
||||||
|
m_disk_write syncint64.Counter
|
||||||
}
|
}
|
||||||
|
|
||||||
const AppendOnly = es.AppendOnly
|
const AppendOnly = es.AppendOnly
|
||||||
|
@ -41,19 +43,24 @@ func Init(ctx context.Context) error {
|
||||||
_, span := lg.Span(ctx)
|
_, span := lg.Span(ctx)
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
|
d := &diskStore{}
|
||||||
|
|
||||||
m := lg.Meter(ctx)
|
m := lg.Meter(ctx)
|
||||||
var err, errs error
|
var err, errs error
|
||||||
|
|
||||||
Mdisk_open, err := m.SyncInt64().Counter("disk_open")
|
d.m_disk_open, err = m.SyncInt64().Counter("disk_open")
|
||||||
errs = multierr.Append(errs, err)
|
errs = multierr.Append(errs, err)
|
||||||
|
|
||||||
Mdisk_evict, err := m.SyncInt64().Counter("disk_evict")
|
d.m_disk_evict, err = m.SyncInt64().Counter("disk_evict")
|
||||||
errs = multierr.Append(errs, err)
|
errs = multierr.Append(errs, err)
|
||||||
|
|
||||||
es.Register(ctx, "file", &diskStore{
|
d.m_disk_read, err = m.SyncInt64().Counter("disk_read")
|
||||||
Mdisk_open: Mdisk_open,
|
errs = multierr.Append(errs, err)
|
||||||
Mdisk_evict: Mdisk_evict,
|
|
||||||
})
|
d.m_disk_write, err = m.SyncInt64().Counter("disk_write")
|
||||||
|
errs = multierr.Append(errs, err)
|
||||||
|
|
||||||
|
es.Register(ctx, "file", d)
|
||||||
|
|
||||||
return errs
|
return errs
|
||||||
}
|
}
|
||||||
|
@ -61,11 +68,9 @@ func Init(ctx context.Context) error {
|
||||||
var _ driver.Driver = (*diskStore)(nil)
|
var _ driver.Driver = (*diskStore)(nil)
|
||||||
|
|
||||||
func (d *diskStore) Open(ctx context.Context, dsn string) (driver.Driver, error) {
|
func (d *diskStore) Open(ctx context.Context, dsn string) (driver.Driver, error) {
|
||||||
ctx, span := lg.Span(ctx)
|
_, span := lg.Span(ctx)
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
d.Mdisk_open.Add(ctx, 1)
|
|
||||||
|
|
||||||
scheme, path, ok := strings.Cut(dsn, ":")
|
scheme, path, ok := strings.Cut(dsn, ":")
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("expected scheme")
|
return nil, fmt.Errorf("expected scheme")
|
||||||
|
@ -78,6 +83,7 @@ func (d *diskStore) Open(ctx context.Context, dsn string) (driver.Driver, error)
|
||||||
if _, err := os.Stat(path); os.IsNotExist(err) {
|
if _, err := os.Stat(path); os.IsNotExist(err) {
|
||||||
err = os.MkdirAll(path, 0700)
|
err = os.MkdirAll(path, 0700)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.RecordError(err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -89,7 +95,7 @@ func (d *diskStore) Open(ctx context.Context, dsn string) (driver.Driver, error)
|
||||||
_, span := lg.Span(ctx)
|
_, span := lg.Span(ctx)
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
d.Mdisk_evict.Add(ctx, 1)
|
d.m_disk_evict.Add(ctx, 1)
|
||||||
|
|
||||||
err := w.Close()
|
err := w.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -107,15 +113,17 @@ func (d *diskStore) Open(ctx context.Context, dsn string) (driver.Driver, error)
|
||||||
return &diskStore{
|
return &diskStore{
|
||||||
path: path,
|
path: path,
|
||||||
openlogs: locker.New(logs),
|
openlogs: locker.New(logs),
|
||||||
Mdisk_open: d.Mdisk_open,
|
m_disk_open: d.m_disk_open,
|
||||||
Mdisk_evict: d.Mdisk_evict,
|
m_disk_evict: d.m_disk_evict,
|
||||||
|
m_disk_read: d.m_disk_read,
|
||||||
|
m_disk_write: d.m_disk_write,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
func (d *diskStore) EventLog(ctx context.Context, streamID string) (driver.EventLog, error) {
|
func (d *diskStore) EventLog(ctx context.Context, streamID string) (driver.EventLog, error) {
|
||||||
_, span := lg.Span(ctx)
|
_, span := lg.Span(ctx)
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
el := &eventLog{streamID: streamID}
|
el := &eventLog{streamID: streamID, diskStore: d}
|
||||||
|
|
||||||
return el, d.openlogs.Modify(ctx, func(openlogs *openlogs) error {
|
return el, d.openlogs.Modify(ctx, func(openlogs *openlogs) error {
|
||||||
_, span := lg.Span(ctx)
|
_, span := lg.Span(ctx)
|
||||||
|
@ -126,11 +134,14 @@ func (d *diskStore) EventLog(ctx context.Context, streamID string) (driver.Event
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
d.m_disk_open.Add(ctx, 1)
|
||||||
|
|
||||||
l, err := wal.Open(filepath.Join(d.path, streamID), wal.DefaultOptions)
|
l, err := wal.Open(filepath.Join(d.path, streamID), wal.DefaultOptions)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
span.RecordError(err)
|
span.RecordError(err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
el.events = locker.New(l)
|
el.events = locker.New(l)
|
||||||
openlogs.logs.Add(ctx, streamID, el.events)
|
openlogs.logs.Add(ctx, streamID, el.events)
|
||||||
return nil
|
return nil
|
||||||
|
@ -140,6 +151,7 @@ func (d *diskStore) EventLog(ctx context.Context, streamID string) (driver.Event
|
||||||
type eventLog struct {
|
type eventLog struct {
|
||||||
streamID string
|
streamID string
|
||||||
events *locker.Locked[wal.Log]
|
events *locker.Locked[wal.Log]
|
||||||
|
diskStore *diskStore
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ driver.EventLog = (*eventLog)(nil)
|
var _ driver.EventLog = (*eventLog)(nil)
|
||||||
|
@ -184,6 +196,8 @@ func (e *eventLog) Append(ctx context.Context, events event.Events, version uint
|
||||||
}
|
}
|
||||||
|
|
||||||
count = uint64(len(events))
|
count = uint64(len(events))
|
||||||
|
e.diskStore.m_disk_write.Add(ctx, int64(len(events)))
|
||||||
|
|
||||||
return l.WriteBatch(batch)
|
return l.WriteBatch(batch)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -255,6 +269,7 @@ func (e *eventLog) Read(ctx context.Context, pos, count int64) (event.Events, er
|
||||||
}
|
}
|
||||||
|
|
||||||
event.SetStreamID(e.streamID, events...)
|
event.SetStreamID(e.streamID, events...)
|
||||||
|
e.diskStore.m_disk_read.Add(ctx, int64(len(events)))
|
||||||
|
|
||||||
return events, nil
|
return events, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,10 +12,11 @@ import (
|
||||||
|
|
||||||
type projector struct {
|
type projector struct {
|
||||||
up driver.Driver
|
up driver.Driver
|
||||||
|
fns []func(event.Event) []event.Event
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(ctx context.Context) *projector {
|
func New(ctx context.Context, fns ...func(event.Event) []event.Event) *projector {
|
||||||
return &projector{}
|
return &projector{fns: fns}
|
||||||
}
|
}
|
||||||
func (p *projector) Apply(e *es.EventStore) {
|
func (p *projector) Apply(e *es.EventStore) {
|
||||||
p.up = e.Driver
|
p.up = e.Driver
|
||||||
|
@ -70,28 +71,14 @@ func (w *wrapper) Append(ctx context.Context, events event.Events, version uint6
|
||||||
|
|
||||||
for i := range events {
|
for i := range events {
|
||||||
e := events[i]
|
e := events[i]
|
||||||
eventType := event.TypeOf(e)
|
|
||||||
m := e.EventMeta()
|
|
||||||
streamID := m.StreamID
|
|
||||||
streamPos := m.Position
|
|
||||||
|
|
||||||
e1 := event.NewPtr(streamID, streamPos)
|
|
||||||
event.SetStreamID("$all", e1)
|
|
||||||
|
|
||||||
e2 := event.NewPtr(streamID, streamPos)
|
|
||||||
event.SetStreamID("$type-"+eventType, e2)
|
|
||||||
|
|
||||||
e3 := event.NewPtr(streamID, streamPos)
|
|
||||||
pkg, _, _ := strings.Cut(eventType, ".")
|
|
||||||
event.SetStreamID("$pkg-"+pkg, e3)
|
|
||||||
|
|
||||||
|
for _, fn := range w.projector.fns {
|
||||||
pevents = append(
|
pevents = append(
|
||||||
pevents,
|
pevents,
|
||||||
e1,
|
fn(e)...,
|
||||||
e2,
|
|
||||||
e3,
|
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for i := range pevents {
|
for i := range pevents {
|
||||||
e := pevents[i]
|
e := pevents[i]
|
||||||
|
@ -126,3 +113,22 @@ func (w *wrapper) LoadForUpdate(ctx context.Context, a event.Aggregate, fn func(
|
||||||
|
|
||||||
return w.up.LoadForUpdate(ctx, a, fn)
|
return w.up.LoadForUpdate(ctx, a, fn)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func DefaultProjection(e event.Event) []event.Event {
|
||||||
|
eventType := event.TypeOf(e)
|
||||||
|
m := e.EventMeta()
|
||||||
|
streamID := m.StreamID
|
||||||
|
streamPos := m.Position
|
||||||
|
|
||||||
|
e1 := event.NewPtr(streamID, streamPos)
|
||||||
|
event.SetStreamID("$all", e1)
|
||||||
|
|
||||||
|
e2 := event.NewPtr(streamID, streamPos)
|
||||||
|
event.SetStreamID("$type-"+eventType, e2)
|
||||||
|
|
||||||
|
e3 := event.NewPtr(streamID, streamPos)
|
||||||
|
pkg, _, _ := strings.Cut(eventType, ".")
|
||||||
|
event.SetStreamID("$pkg-"+pkg, e3)
|
||||||
|
|
||||||
|
return []event.Event{e1, e2, e3}
|
||||||
|
}
|
||||||
|
|
29
pkg/es/driver/projecter/projector_test.go
Normal file
29
pkg/es/driver/projecter/projector_test.go
Normal file
|
@ -0,0 +1,29 @@
|
||||||
|
package projecter_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/sour-is/ev/pkg/es/driver"
|
||||||
|
)
|
||||||
|
|
||||||
|
type mockDriver struct {
|
||||||
|
onOpen func()
|
||||||
|
onEventLog func()
|
||||||
|
}
|
||||||
|
|
||||||
|
// EventLog implements driver.Driver
|
||||||
|
func (*mockDriver) EventLog(ctx context.Context, streamID string) (driver.EventLog, error) {
|
||||||
|
panic("unimplemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Open implements driver.Driver
|
||||||
|
func (*mockDriver) Open(ctx context.Context, dsn string) (driver.Driver, error) {
|
||||||
|
panic("unimplemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ driver.Driver = (*mockDriver)(nil)
|
||||||
|
|
||||||
|
func TestProjecter(t *testing.T) {
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user