diff --git a/.air.toml b/.air.toml index b69c73a..994d59b 100644 --- a/.air.toml +++ b/.air.toml @@ -1,11 +1,11 @@ -root = "." +root = "./cmd/ev" testdata_dir = "data" tmp_dir = "tmp" [build] args_bin = [] bin = "./tmp/main" - cmd = "go build -o ./tmp/main ." + cmd = "go build -o ./tmp/main ./cmd/ev" delay = 1000 exclude_dir = ["assets", "tmp", "vendor", "testdata"] exclude_file = [] diff --git a/.gitignore b/.gitignore index cbc939c..d1bd010 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,4 @@ data/ local.mk logzio.yml tmp/ -ev +/ev diff --git a/.gitsecret/keys/pubring.kbx b/.gitsecret/keys/pubring.kbx deleted file mode 100644 index 93045fc..0000000 Binary files a/.gitsecret/keys/pubring.kbx and /dev/null differ diff --git a/.gitsecret/keys/pubring.kbx~ b/.gitsecret/keys/pubring.kbx~ deleted file mode 100644 index fd3b61f..0000000 Binary files a/.gitsecret/keys/pubring.kbx~ and /dev/null differ diff --git a/.gitsecret/keys/trustdb.gpg b/.gitsecret/keys/trustdb.gpg deleted file mode 100644 index a575dfa..0000000 Binary files a/.gitsecret/keys/trustdb.gpg and /dev/null differ diff --git a/.gitsecret/paths/mapping.cfg b/.gitsecret/paths/mapping.cfg deleted file mode 100644 index 9acc1e3..0000000 --- a/.gitsecret/paths/mapping.cfg +++ /dev/null @@ -1 +0,0 @@ -local.mk:d632b22a2291637331e5613d35536c69e696447ce407d7320b4c5ab0922b47a9 diff --git a/Makefile b/Makefile index a11a1b8..ce50a8c 100644 --- a/Makefile +++ b/Makefile @@ -9,10 +9,10 @@ air: gen ifeq (, $(shell which air)) go install github.com/cosmtrek/air@latest endif - air + air ./cmd/ev run: - go run . + go run ./cmd/ev test: go test -cover -race ./... @@ -25,6 +25,8 @@ GQLS:=$(GQLS) $(wildcard app/*/*.graphqls) GQLS:=$(GQLS) $(wildcard app/*/*.go) GQLSRC=internal/graph/generated/generated.go +clean: + rm -f "$(GQLSRC)" gen: gql gql: $(GQLSRC) $(GQLSRC): $(GQLS) diff --git a/app/README.md b/app/README.md new file mode 100644 index 0000000..1913322 --- /dev/null +++ b/app/README.md @@ -0,0 +1,3 @@ +# App examples + +These applications are to demonstrate how the EV library can be used. \ No newline at end of file diff --git a/app/msgbus/service.go b/app/msgbus/service.go index 58e9217..95580dc 100644 --- a/app/msgbus/service.go +++ b/app/msgbus/service.go @@ -17,14 +17,14 @@ import ( "go.opentelemetry.io/otel/metric/unit" "go.uber.org/multierr" + "github.com/sour-is/ev" "github.com/sour-is/ev/internal/lg" - "github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es/event" "github.com/sour-is/ev/pkg/gql" ) type service struct { - es *es.EventStore + es *ev.EventStore m_gql_posts syncint64.Counter m_gql_post_added syncint64.Counter @@ -38,7 +38,7 @@ type MsgbusResolver interface { IsResolver() } -func New(ctx context.Context, es *es.EventStore) (*service, error) { +func New(ctx context.Context, es *ev.EventStore) (*service, error) { ctx, span := lg.Span(ctx) defer span.End() @@ -243,7 +243,7 @@ func (s *service) get(w http.ResponseWriter, r *http.Request) { first = lis[0] } - var pos, count int64 = 0, es.AllEvents + var pos, count int64 = 0, ev.AllEvents qry := r.URL.Query() if i, err := strconv.ParseInt(qry.Get("index"), 10, 64); err == nil && i > 1 { diff --git a/app/peerfinder/http.go b/app/peerfinder/http.go index 6868be8..ce0046d 100644 --- a/app/peerfinder/http.go +++ b/app/peerfinder/http.go @@ -19,8 +19,8 @@ import ( contentnegotiation "gitlab.com/jamietanna/content-negotiation-go" "go.opentelemetry.io/otel/attribute" + "github.com/sour-is/ev" "github.com/sour-is/ev/internal/lg" - "github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es/event" ) @@ -150,7 +150,7 @@ func (s *service) getPending(w http.ResponseWriter, r *http.Request, peerID stri return } - info, err := es.Upsert(ctx, s.es, aggInfo, func(ctx context.Context, agg *Info) error { + info, err := ev.Upsert(ctx, s.es, aggInfo, func(ctx context.Context, agg *Info) error { return agg.OnUpsert() // initialize if not exists }) if err != nil { diff --git a/app/peerfinder/jobs.go b/app/peerfinder/jobs.go index a010a3d..a8019a2 100644 --- a/app/peerfinder/jobs.go +++ b/app/peerfinder/jobs.go @@ -9,8 +9,8 @@ import ( "net/http" "time" + "github.com/sour-is/ev" "github.com/sour-is/ev/internal/lg" - "github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es/event" "github.com/sour-is/ev/pkg/set" ) @@ -151,7 +151,7 @@ func (s *service) cleanRequests(ctx context.Context, now time.Time) error { for { events, err := s.es.Read(ctx, queueRequests, startPosition, 1000) // read 1000 from the top each loop. - if err != nil && !errors.Is(err, es.ErrNotFound) { + if err != nil && !errors.Is(err, ev.ErrNotFound) { span.RecordError(err) return err } diff --git a/app/peerfinder/service.go b/app/peerfinder/service.go index 2c06c97..1f72912 100644 --- a/app/peerfinder/service.go +++ b/app/peerfinder/service.go @@ -6,8 +6,8 @@ import ( "sync/atomic" "time" + "github.com/sour-is/ev" "github.com/sour-is/ev/internal/lg" - "github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es/event" "github.com/sour-is/ev/pkg/locker" "go.uber.org/multierr" @@ -24,7 +24,7 @@ func aggRequest(id string) string { return "pf-request-" + id } func aggPeer(id string) string { return "pf-peer-" + id } type service struct { - es *es.EventStore + es *ev.EventStore statusURL string state *locker.Locked[state] @@ -37,7 +37,7 @@ type state struct { requests map[string]*Request } -func New(ctx context.Context, es *es.EventStore, statusURL string) (*service, error) { +func New(ctx context.Context, es *ev.EventStore, statusURL string) (*service, error) { ctx, span := lg.Span(ctx) defer span.End() diff --git a/app/salty/service.go b/app/salty/service.go index 28d9e7f..da35979 100644 --- a/app/salty/service.go +++ b/app/salty/service.go @@ -19,8 +19,8 @@ import ( "go.opentelemetry.io/otel/metric/unit" "go.uber.org/multierr" + "github.com/sour-is/ev" "github.com/sour-is/ev/internal/lg" - "github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es/event" "github.com/sour-is/ev/pkg/gql" ) @@ -31,7 +31,7 @@ type DNSResolver interface { type service struct { baseURL string - es *es.EventStore + es *ev.EventStore dns DNSResolver m_create_user syncint64.Counter @@ -54,7 +54,7 @@ type SaltyResolver interface { IsResolver() } -func New(ctx context.Context, es *es.EventStore, baseURL string) (*service, error) { +func New(ctx context.Context, es *ev.EventStore, baseURL string) (*service, error) { ctx, span := lg.Span(ctx) defer span.End() @@ -111,23 +111,23 @@ func New(ctx context.Context, es *es.EventStore, baseURL string) (*service, erro return svc, errs } -func (s *service) IsResolver() {} - func (s *service) BaseURL() string { if s == nil { return "http://missing.context/" } return s.baseURL } -func (s *service) RegisterHTTP(mux *http.ServeMux) { - mux.Handle("/.well-known/salty/", lg.Htrace(s, "lookup")) -} + +func (s *service) RegisterHTTP(mux *http.ServeMux) {} func (s *service) RegisterAPIv1(mux *http.ServeMux) { mux.HandleFunc("/ping", s.apiv1) mux.HandleFunc("/register", s.apiv1) mux.HandleFunc("/lookup/", s.apiv1) mux.HandleFunc("/send", s.apiv1) } +func (s *service) RegisterWellKnown(mux *http.ServeMux) { + mux.Handle("/salty/", lg.Htrace(s, "lookup")) +} func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx := r.Context() ctx, span := lg.Span(ctx) @@ -140,7 +140,7 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { addr = strings.TrimSuffix(addr, ".json") span.AddEvent(fmt.Sprint("find ", addr)) - a, err := es.Update(ctx, s.es, addr, func(ctx context.Context, agg *SaltyUser) error { return nil }) + a, err := ev.Update(ctx, s.es, addr, func(ctx context.Context, agg *SaltyUser) error { return nil }) switch { case errors.Is(err, event.ErrShouldExist): span.RecordError(err) @@ -168,6 +168,16 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { span.RecordError(err) } } + +func (s *service) IsResolver() {} +func (s *service) GetMiddleware() func(http.Handler) http.Handler { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + r = r.WithContext(gql.ToContext(r.Context(), saltyKey, s)) + next.ServeHTTP(w, r) + }) + } +} func (s *service) CreateSaltyUser(ctx context.Context, nick string, pub string) (*SaltyUser, error) { ctx, span := lg.Span(ctx) defer span.End() @@ -191,11 +201,11 @@ func (s *service) createSaltyUser(ctx context.Context, streamID, pub string) (*S return nil, err } - a, err := es.Create(ctx, s.es, streamID, func(ctx context.Context, agg *SaltyUser) error { + a, err := ev.Create(ctx, s.es, streamID, func(ctx context.Context, agg *SaltyUser) error { return agg.OnUserRegister(key) }) switch { - case errors.Is(err, es.ErrShouldNotExist): + case errors.Is(err, ev.ErrShouldNotExist): span.RecordError(err) return nil, fmt.Errorf("user exists: %w", err) @@ -217,9 +227,9 @@ func (s *service) SaltyUser(ctx context.Context, nick string) (*SaltyUser, error streamID := fmt.Sprintf("saltyuser-%x", sha256.Sum256([]byte(strings.ToLower(nick)))) span.AddEvent(streamID) - a, err := es.Update(ctx, s.es, streamID, func(ctx context.Context, agg *SaltyUser) error { return nil }) + a, err := ev.Update(ctx, s.es, streamID, func(ctx context.Context, agg *SaltyUser) error { return nil }) switch { - case errors.Is(err, es.ErrShouldExist): + case errors.Is(err, ev.ErrShouldExist): span.RecordError(err) return nil, fmt.Errorf("user not found") @@ -230,14 +240,6 @@ func (s *service) SaltyUser(ctx context.Context, nick string) (*SaltyUser, error return a, err } -func (s *service) GetMiddleware() func(http.Handler) http.Handler { - return func(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - r = r.WithContext(gql.ToContext(r.Context(), saltyKey, s)) - next.ServeHTTP(w, r) - }) - } -} func (s *service) apiv1(w http.ResponseWriter, r *http.Request) { ctx := r.Context() diff --git a/app/webfinger/webfinger.go b/app/webfinger/webfinger.go new file mode 100644 index 0000000..011bc99 --- /dev/null +++ b/app/webfinger/webfinger.go @@ -0,0 +1 @@ +package webfinger \ No newline at end of file diff --git a/cmd/README.md b/cmd/README.md new file mode 100644 index 0000000..d63bad5 --- /dev/null +++ b/cmd/README.md @@ -0,0 +1,3 @@ +# Cmd + +These are examples that can be built using EV. Because they are modular the apps can be mixed an matched by including the different source files linked from `cmd/ev`. \ No newline at end of file diff --git a/cmd/ev/app.msgbus.go b/cmd/ev/app.msgbus.go new file mode 100644 index 0000000..4522d25 --- /dev/null +++ b/cmd/ev/app.msgbus.go @@ -0,0 +1,32 @@ +package main + +import ( + "context" + "fmt" + + "github.com/sour-is/ev" + "github.com/sour-is/ev/app/msgbus" + "github.com/sour-is/ev/internal/lg" + "github.com/sour-is/ev/pkg/service" + "github.com/sour-is/ev/pkg/slice" +) + +var _ = apps.Register(50, func(ctx context.Context, svc *service.Harness) error { + ctx, span := lg.Span(ctx) + defer span.End() + + span.AddEvent("Enable Msgbus") + eventstore, ok := slice.Find[*ev.EventStore](svc.Services...) + if !ok { + return fmt.Errorf("*es.EventStore not found in services") + } + + msgbus, err := msgbus.New(ctx, eventstore) + if err != nil { + span.RecordError(err) + return err + } + svc.Add(msgbus) + + return nil +}) diff --git a/cmd/ev/app.peerfinder.go b/cmd/ev/app.peerfinder.go new file mode 100644 index 0000000..fcef159 --- /dev/null +++ b/cmd/ev/app.peerfinder.go @@ -0,0 +1,43 @@ +package main + +import ( + "context" + "fmt" + + "github.com/sour-is/ev" + "github.com/sour-is/ev/app/peerfinder" + "github.com/sour-is/ev/internal/lg" + "github.com/sour-is/ev/pkg/env" + "github.com/sour-is/ev/pkg/es/driver/projecter" + "github.com/sour-is/ev/pkg/service" + "github.com/sour-is/ev/pkg/slice" +) + +var _ = apps.Register(50, func(ctx context.Context, svc *service.Harness) error { + ctx, span := lg.Span(ctx) + defer span.End() + + span.AddEvent("Enable Peers") + + eventstore, ok := slice.Find[*ev.EventStore](svc.Services...) + if !ok { + return fmt.Errorf("*es.EventStore not found in services") + } + eventstore.Option(projecter.New(ctx, peerfinder.Projector)) + + peers, err := peerfinder.New(ctx, eventstore, env.Secret("PEER_STATUS", "").Secret()) + if err != nil { + span.RecordError(err) + return err + } + svc.RunOnce(ctx, peers.RefreshJob) + svc.NewCron("0,15,30,45", peers.RefreshJob) + svc.RunOnce(ctx, peers.CleanJob) + svc.NewCron("0 1", peers.CleanJob) + svc.OnStart(peers.Run) + svc.OnStop(peers.Stop) + + svc.Add(peers) + + return nil +}) diff --git a/cmd/ev/app.salty.go b/cmd/ev/app.salty.go new file mode 100644 index 0000000..915104f --- /dev/null +++ b/cmd/ev/app.salty.go @@ -0,0 +1,46 @@ +package main + +import ( + "context" + "fmt" + "net/http" + "net/url" + + "github.com/sour-is/ev" + "github.com/sour-is/ev/app/salty" + "github.com/sour-is/ev/internal/lg" + "github.com/sour-is/ev/pkg/env" + "github.com/sour-is/ev/pkg/service" + "github.com/sour-is/ev/pkg/slice" +) + +var _ = apps.Register(50, func(ctx context.Context, svc *service.Harness) error { + ctx, span := lg.Span(ctx) + defer span.End() + + span.AddEvent("Enable Salty") + eventstore, ok := slice.Find[*ev.EventStore](svc.Services...) + if !ok { + return fmt.Errorf("*es.EventStore not found in services") + } + + addr := "localhost" + if ht, ok := slice.Find[*http.Server](svc.Services...); ok { + addr = ht.Addr + } + + base, err := url.JoinPath(env.Default("SALTY_BASE_URL", "http://"+addr), "inbox") + if err != nil { + span.RecordError(err) + return err + } + + salty, err := salty.New(ctx, eventstore, base) + if err != nil { + span.RecordError(err) + return err + } + svc.Add(salty) + + return nil +}) diff --git a/cmd/ev/main.go b/cmd/ev/main.go new file mode 100644 index 0000000..5df79d0 --- /dev/null +++ b/cmd/ev/main.go @@ -0,0 +1,37 @@ +package main + +import ( + "context" + "errors" + "log" + "net/http" + "os" + "os/signal" + + "github.com/sour-is/ev/internal/lg" + "github.com/sour-is/ev/pkg/service" +) + +var apps service.Apps +var appName, version = service.AppName() + +func main() { + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) + go func() { + <-ctx.Done() + defer cancel() // restore interrupt function + }() + + svc := &service.Harness{} + + ctx, stop := lg.Init(ctx, appName) + svc.OnStop(stop) + svc.Add(lg.NewHTTP(ctx)) + + svc.Setup(ctx, apps.Apps()...) + + // Run application + if err := svc.Run(ctx, appName, version); err != nil && !errors.Is(err, http.ErrServerClosed) { + log.Fatal(err) + } +} diff --git a/cmd/ev/svc.es.go b/cmd/ev/svc.es.go new file mode 100644 index 0000000..7afe868 --- /dev/null +++ b/cmd/ev/svc.es.go @@ -0,0 +1,54 @@ +package main + +import ( + "context" + + "github.com/sour-is/ev" + "github.com/sour-is/ev/internal/lg" + "github.com/sour-is/ev/pkg/env" + "github.com/sour-is/ev/pkg/es" + diskstore "github.com/sour-is/ev/pkg/es/driver/disk-store" + memstore "github.com/sour-is/ev/pkg/es/driver/mem-store" + "github.com/sour-is/ev/pkg/es/driver/projecter" + resolvelinks "github.com/sour-is/ev/pkg/es/driver/resolve-links" + "github.com/sour-is/ev/pkg/es/driver/streamer" + "github.com/sour-is/ev/pkg/es/event" + "github.com/sour-is/ev/pkg/service" + "go.uber.org/multierr" +) + +var _ = apps.Register(10, func(ctx context.Context, svc *service.Harness) error { + ctx, span := lg.Span(ctx) + defer span.End() + + // setup eventstore + err := multierr.Combine( + ev.Init(ctx), + event.Init(ctx), + diskstore.Init(ctx), + memstore.Init(ctx), + ) + if err != nil { + span.RecordError(err) + return err + } + + eventstore, err := ev.Open( + ctx, + env.Default("EV_DATA", "mem:"), + resolvelinks.New(), + streamer.New(ctx), + projecter.New( + ctx, + projecter.DefaultProjection, + ), + ) + if err != nil { + span.RecordError(err) + return err + } + svc.Add(eventstore) + svc.Add(&es.EventStore{EventStore: eventstore}) + + return nil +}) diff --git a/cmd/ev/svc.gql.go b/cmd/ev/svc.gql.go new file mode 100644 index 0000000..51f11b2 --- /dev/null +++ b/cmd/ev/svc.gql.go @@ -0,0 +1,30 @@ +package main + +import ( + "context" + "net/http" + + "github.com/sour-is/ev/app/gql" + "github.com/sour-is/ev/internal/lg" + "github.com/sour-is/ev/pkg/gql/resolver" + "github.com/sour-is/ev/pkg/mux" + "github.com/sour-is/ev/pkg/service" + "github.com/sour-is/ev/pkg/slice" +) + +var _ = apps.Register(90, func(ctx context.Context, svc *service.Harness) error { + ctx, span := lg.Span(ctx) + defer span.End() + + span.AddEvent("Enable GraphQL") + gql, err := resolver.New(ctx, &gql.Resolver{}, slice.FilterType[resolver.IsResolver](svc.Services...)...) + if err != nil { + span.RecordError(err) + return err + } + svc.Add(gql, mux.RegisterHTTP(func(mux *http.ServeMux) { + mux.Handle("/", http.RedirectHandler("/playground", http.StatusTemporaryRedirect)) + })) + + return nil +}) diff --git a/cmd/ev/svc.http.go b/cmd/ev/svc.http.go new file mode 100644 index 0000000..72af2c9 --- /dev/null +++ b/cmd/ev/svc.http.go @@ -0,0 +1,41 @@ +package main + +import ( + "context" + "log" + "net/http" + "strings" + + "github.com/rs/cors" + "github.com/sour-is/ev/internal/lg" + "github.com/sour-is/ev/pkg/env" + "github.com/sour-is/ev/pkg/mux" + "github.com/sour-is/ev/pkg/service" + "github.com/sour-is/ev/pkg/slice" +) + +var _ = apps.Register(20, func(ctx context.Context, svc *service.Harness) error { + s := &http.Server{} + svc.Add(s) + + mux := mux.New() + s.Handler = cors.AllowAll().Handler(mux) + + s.Addr = env.Default("EV_HTTP", ":8080") + if strings.HasPrefix(s.Addr, ":") { + s.Addr = "[::]" + s.Addr + } + svc.OnStart(func(ctx context.Context) error { + _, span := lg.Span(ctx) + defer span.End() + + log.Print("Listen on ", s.Addr) + span.AddEvent("begin listen and serve on " + s.Addr) + + mux.Add(slice.FilterType[interface{ RegisterHTTP(*http.ServeMux) }](svc.Services...)...) + return s.ListenAndServe() + }) + svc.OnStop(s.Shutdown) + + return nil +}) diff --git a/cmd/msgbus/app.msgbus.go b/cmd/msgbus/app.msgbus.go new file mode 120000 index 0000000..c6b168b --- /dev/null +++ b/cmd/msgbus/app.msgbus.go @@ -0,0 +1 @@ +../ev/app.msgbus.go \ No newline at end of file diff --git a/cmd/msgbus/main.go b/cmd/msgbus/main.go new file mode 120000 index 0000000..7e61bac --- /dev/null +++ b/cmd/msgbus/main.go @@ -0,0 +1 @@ +../ev/main.go \ No newline at end of file diff --git a/cmd/msgbus/svc.es.go b/cmd/msgbus/svc.es.go new file mode 120000 index 0000000..6280e25 --- /dev/null +++ b/cmd/msgbus/svc.es.go @@ -0,0 +1 @@ +../ev/svc.es.go \ No newline at end of file diff --git a/cmd/msgbus/svc.gql.go b/cmd/msgbus/svc.gql.go new file mode 120000 index 0000000..36864cf --- /dev/null +++ b/cmd/msgbus/svc.gql.go @@ -0,0 +1 @@ +../ev/svc.gql.go \ No newline at end of file diff --git a/cmd/msgbus/svc.http.go b/cmd/msgbus/svc.http.go new file mode 120000 index 0000000..3dc665f --- /dev/null +++ b/cmd/msgbus/svc.http.go @@ -0,0 +1 @@ +../ev/svc.http.go \ No newline at end of file diff --git a/cmd/peers/app.peerfinder.go b/cmd/peers/app.peerfinder.go new file mode 120000 index 0000000..d03b992 --- /dev/null +++ b/cmd/peers/app.peerfinder.go @@ -0,0 +1 @@ +../ev/app.peerfinder.go \ No newline at end of file diff --git a/cmd/peers/main.go b/cmd/peers/main.go new file mode 120000 index 0000000..7e61bac --- /dev/null +++ b/cmd/peers/main.go @@ -0,0 +1 @@ +../ev/main.go \ No newline at end of file diff --git a/cmd/peers/svc.es.go b/cmd/peers/svc.es.go new file mode 120000 index 0000000..6280e25 --- /dev/null +++ b/cmd/peers/svc.es.go @@ -0,0 +1 @@ +../ev/svc.es.go \ No newline at end of file diff --git a/cmd/peers/svc.http.go b/cmd/peers/svc.http.go new file mode 120000 index 0000000..3dc665f --- /dev/null +++ b/cmd/peers/svc.http.go @@ -0,0 +1 @@ +../ev/svc.http.go \ No newline at end of file diff --git a/cmd/salty/app.msgbus.go b/cmd/salty/app.msgbus.go new file mode 120000 index 0000000..c6b168b --- /dev/null +++ b/cmd/salty/app.msgbus.go @@ -0,0 +1 @@ +../ev/app.msgbus.go \ No newline at end of file diff --git a/cmd/salty/app.salty.go b/cmd/salty/app.salty.go new file mode 120000 index 0000000..2098028 --- /dev/null +++ b/cmd/salty/app.salty.go @@ -0,0 +1 @@ +../ev/app.salty.go \ No newline at end of file diff --git a/cmd/salty/main.go b/cmd/salty/main.go new file mode 120000 index 0000000..7e61bac --- /dev/null +++ b/cmd/salty/main.go @@ -0,0 +1 @@ +../ev/main.go \ No newline at end of file diff --git a/cmd/salty/svc.es.go b/cmd/salty/svc.es.go new file mode 120000 index 0000000..6280e25 --- /dev/null +++ b/cmd/salty/svc.es.go @@ -0,0 +1 @@ +../ev/svc.es.go \ No newline at end of file diff --git a/cmd/salty/svc.http.go b/cmd/salty/svc.http.go new file mode 120000 index 0000000..3dc665f --- /dev/null +++ b/cmd/salty/svc.http.go @@ -0,0 +1 @@ +../ev/svc.http.go \ No newline at end of file diff --git a/pkg/es/es.go b/ev.go similarity index 99% rename from pkg/es/es.go rename to ev.go index edf5d99..dbd1ce5 100644 --- a/pkg/es/es.go +++ b/ev.go @@ -1,5 +1,5 @@ // package es implements an event store and drivers for extending its functionality. -package es +package ev import ( "context" diff --git a/pkg/es/es_test.go b/ev_test.go similarity index 84% rename from pkg/es/es_test.go rename to ev_test.go index d5bd3ff..91a9030 100644 --- a/pkg/es/es_test.go +++ b/ev_test.go @@ -1,4 +1,4 @@ -package es_test +package ev_test import ( "context" @@ -11,8 +11,8 @@ import ( "github.com/matryer/is" "go.uber.org/multierr" + "github.com/sour-is/ev" "github.com/sour-is/ev/app/peerfinder" - "github.com/sour-is/ev/pkg/es" memstore "github.com/sour-is/ev/pkg/es/driver/mem-store" "github.com/sour-is/ev/pkg/es/driver/projecter" resolvelinks "github.com/sour-is/ev/pkg/es/driver/resolve-links" @@ -79,17 +79,17 @@ func TestES(t *testing.T) { is.NoErr(err) { - store, err := es.Open(ctx, "mem") - is.True(errors.Is(err, es.ErrNoDriver)) + store, err := ev.Open(ctx, "mem") + is.True(errors.Is(err, ev.ErrNoDriver)) is.True(store.EventStream() == nil) } { - _, err := es.Open(ctx, "bogo:") - is.True(errors.Is(err, es.ErrNoDriver)) + _, err := ev.Open(ctx, "bogo:") + is.True(errors.Is(err, ev.ErrNoDriver)) } - store, err := es.Open(ctx, "mem:", streamer.New(ctx), projecter.New(ctx)) + store, err := ev.Open(ctx, "mem:", streamer.New(ctx), projecter.New(ctx)) is.NoErr(err) thing := &Thing{Name: "time"} @@ -135,10 +135,10 @@ func TestESOperations(t *testing.T) { is := is.New(t) ctx := context.Background() - store, err := es.Open(ctx, "mem:", streamer.New(ctx), projecter.New(ctx)) + store, err := ev.Open(ctx, "mem:", streamer.New(ctx), projecter.New(ctx)) is.NoErr(err) - thing, err := es.Create(ctx, store, "thing-1", func(ctx context.Context, agg *Thing) error { + thing, err := ev.Create(ctx, store, "thing-1", func(ctx context.Context, agg *Thing) error { return agg.OnSetValue("foo") }) @@ -146,7 +146,7 @@ func TestESOperations(t *testing.T) { is.Equal(thing.Version(), uint64(1)) is.Equal(thing.Value, "foo") - thing, err = es.Update(ctx, store, "thing-1", func(ctx context.Context, agg *Thing) error { + thing, err = ev.Update(ctx, store, "thing-1", func(ctx context.Context, agg *Thing) error { return agg.OnSetValue("bar") }) @@ -154,7 +154,7 @@ func TestESOperations(t *testing.T) { is.Equal(thing.Version(), uint64(2)) is.Equal(thing.Value, "bar") - thing, err = es.Upsert(ctx, store, "thing-2", func(ctx context.Context, agg *Thing) error { + thing, err = ev.Upsert(ctx, store, "thing-2", func(ctx context.Context, agg *Thing) error { return agg.OnSetValue("bin") }) @@ -162,7 +162,7 @@ func TestESOperations(t *testing.T) { is.Equal(thing.Version(), uint64(1)) is.Equal(thing.Value, "bin") - thing, err = es.Upsert(ctx, store, "thing-2", func(ctx context.Context, agg *Thing) error { + thing, err = ev.Upsert(ctx, store, "thing-2", func(ctx context.Context, agg *Thing) error { return agg.OnSetValue("baz") }) @@ -178,8 +178,8 @@ func TestUnwrap(t *testing.T) { err := errors.New("foo") werr := fmt.Errorf("wrap: %w", err) - is.Equal(es.Unwrap(werr), err) - is.Equal(es.Unwrap("test"), "") + is.Equal(ev.Unwrap(werr), err) + is.Equal(ev.Unwrap("test"), "") } func TestUnwrapProjector(t *testing.T) { @@ -188,7 +188,7 @@ func TestUnwrapProjector(t *testing.T) { ctx, stop := context.WithCancel(context.Background()) defer stop() - es, err := es.Open( + es, err := ev.Open( ctx, "mem:", resolvelinks.New(), @@ -211,7 +211,7 @@ func TestMain(m *testing.M) { defer stop() err := multierr.Combine( - es.Init(ctx), + ev.Init(ctx), event.Init(ctx), memstore.Init(ctx), ) diff --git a/gqlgen.yml b/gqlgen.yml index 4b92908..b7d12de 100644 --- a/gqlgen.yml +++ b/gqlgen.yml @@ -14,6 +14,10 @@ model: filename: internal/graph/model/models_gen.go package: model +resolver: + filename: internal/graph/resolver/resolver.go + package: resolver + models: ID: model: diff --git a/httpmux.go b/httpmux.go deleted file mode 100644 index ed40f0f..0000000 --- a/httpmux.go +++ /dev/null @@ -1,40 +0,0 @@ -package main - -import ( - "net/http" - - "github.com/rs/cors" -) - -type mux struct { - *http.ServeMux - api *http.ServeMux -} - -func httpMux(fns ...interface{ RegisterHTTP(*http.ServeMux) }) http.Handler { - mux := newMux() - for _, fn := range fns { - fn.RegisterHTTP(mux.ServeMux) - - if fn, ok := fn.(interface{ RegisterAPIv1(*http.ServeMux) }); ok { - fn.RegisterAPIv1(mux.api) - } - } - - return cors.AllowAll().Handler(mux) -} -func newMux() *mux { - mux := &mux{ - api: http.NewServeMux(), - ServeMux: http.NewServeMux(), - } - mux.Handle("/api/v1/", http.StripPrefix("/api/v1", mux.api)) - - return mux -} - -type RegisterHTTP func(*http.ServeMux) - -func (fn RegisterHTTP) RegisterHTTP(mux *http.ServeMux) { - fn(mux) -} diff --git a/internal/graph/resolver/resolver.go b/internal/graph/resolver/resolver.go new file mode 100644 index 0000000..f66016d --- /dev/null +++ b/internal/graph/resolver/resolver.go @@ -0,0 +1,63 @@ +package resolver + +// THIS CODE IS A STARTING POINT ONLY. IT WILL NOT BE UPDATED WITH SCHEMA CHANGES. + +import ( + "context" + + "github.com/sour-is/ev/app/msgbus" + "github.com/sour-is/ev/app/salty" + "github.com/sour-is/ev/internal/graph/generated" + "github.com/sour-is/ev/pkg/es" + "github.com/sour-is/ev/pkg/gql" +) + +type Resolver struct{} + +// // foo +func (r *mutationResolver) TruncateStream(ctx context.Context, streamID string, index int64) (bool, error) { + panic("not implemented") +} + +// // foo +func (r *mutationResolver) CreateSaltyUser(ctx context.Context, nick string, pubkey string) (*salty.SaltyUser, error) { + panic("not implemented") +} + +// // foo +func (r *queryResolver) Events(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error) { + panic("not implemented") +} + +// // foo +func (r *queryResolver) Posts(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error) { + panic("not implemented") +} + +// // foo +func (r *queryResolver) SaltyUser(ctx context.Context, nick string) (*salty.SaltyUser, error) { + panic("not implemented") +} + +// // foo +func (r *subscriptionResolver) EventAdded(ctx context.Context, streamID string, after int64) (<-chan *es.GQLEvent, error) { + panic("not implemented") +} + +// // foo +func (r *subscriptionResolver) PostAdded(ctx context.Context, streamID string, after int64) (<-chan *msgbus.PostEvent, error) { + panic("not implemented") +} + +// Mutation returns generated.MutationResolver implementation. +func (r *Resolver) Mutation() generated.MutationResolver { return &mutationResolver{r} } + +// Query returns generated.QueryResolver implementation. +func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} } + +// Subscription returns generated.SubscriptionResolver implementation. +func (r *Resolver) Subscription() generated.SubscriptionResolver { return &subscriptionResolver{r} } + +type mutationResolver struct{ *Resolver } +type queryResolver struct{ *Resolver } +type subscriptionResolver struct{ *Resolver } diff --git a/internal/lg/init.go b/internal/lg/init.go index 9a2a572..3c7ec2d 100644 --- a/internal/lg/init.go +++ b/internal/lg/init.go @@ -9,7 +9,7 @@ import ( "go.uber.org/multierr" ) -func Init(ctx context.Context, name string) (context.Context, func() error) { +func Init(ctx context.Context, name string) (context.Context, func(context.Context) error) { ctx, span := Span(ctx) defer span.End() @@ -21,7 +21,7 @@ func Init(ctx context.Context, name string) (context.Context, func() error) { reverse(stop[:]) - return ctx, func() error { + return ctx, func(context.Context) error { log.Println("flushing logs...") errs := make([]error, len(stop)) for i, fn := range stop { diff --git a/main.go b/main.go deleted file mode 100644 index c749a42..0000000 --- a/main.go +++ /dev/null @@ -1,245 +0,0 @@ -package main - -import ( - "context" - "errors" - "log" - "net/http" - "net/url" - "os" - "os/signal" - "runtime/debug" - "strings" - "time" - - "go.opentelemetry.io/otel/attribute" - "go.uber.org/multierr" - "golang.org/x/sync/errgroup" - - "github.com/sour-is/ev/app/gql" - "github.com/sour-is/ev/app/msgbus" - "github.com/sour-is/ev/app/peerfinder" - "github.com/sour-is/ev/app/salty" - "github.com/sour-is/ev/internal/lg" - "github.com/sour-is/ev/pkg/cron" - "github.com/sour-is/ev/pkg/es" - diskstore "github.com/sour-is/ev/pkg/es/driver/disk-store" - memstore "github.com/sour-is/ev/pkg/es/driver/mem-store" - "github.com/sour-is/ev/pkg/es/driver/projecter" - resolvelinks "github.com/sour-is/ev/pkg/es/driver/resolve-links" - "github.com/sour-is/ev/pkg/es/driver/streamer" - "github.com/sour-is/ev/pkg/es/event" - "github.com/sour-is/ev/pkg/gql/resolver" - "github.com/sour-is/ev/pkg/set" -) - -func main() { - ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) - go func() { - <-ctx.Done() - defer cancel() // restore interrupt function - }() - - // Initialize logger - ctx, stop := lg.Init(ctx, appName) - defer stop() - - // Run application - if err := run(ctx); err != nil && !errors.Is(err, http.ErrServerClosed) { - log.Fatal(err) - } -} -func run(ctx context.Context) error { - g, ctx := errgroup.WithContext(ctx) - stop := &stopFns{} - - cron := cron.New(cron.DefaultGranularity) - - { - ctx, span := lg.Span(ctx) - - log.Println(appName, version) - span.SetAttributes( - attribute.String("app", appName), - attribute.String("version", version), - ) - - err := multierr.Combine( - es.Init(ctx), - event.Init(ctx), - diskstore.Init(ctx), - memstore.Init(ctx), - ) - if err != nil { - span.RecordError(err) - return err - } - - es, err := es.Open( - ctx, - env("EV_DATA", "mem:"), - resolvelinks.New(), - streamer.New(ctx), - projecter.New( - ctx, - projecter.DefaultProjection, - ), - ) - if err != nil { - span.RecordError(err) - return err - } - - s := http.Server{ - Addr: env("EV_HTTP", ":8080"), - } - - if strings.HasPrefix(s.Addr, ":") { - s.Addr = "[::]" + s.Addr - } - - enable := set.New(strings.Fields(env("EV_ENABLE", "salty msgbus gql peers"))...) - var svcs []interface{ RegisterHTTP(*http.ServeMux) } - var res []resolver.IsResolver - - res = append(res, es) - - if enable.Has("salty") { - span.AddEvent("Enable Salty") - base, err := url.JoinPath(env("EV_BASE_URL", "http://"+s.Addr), "inbox") - if err != nil { - span.RecordError(err) - return err - } - - salty, err := salty.New(ctx, es, base) - if err != nil { - span.RecordError(err) - return err - } - svcs = append(svcs, salty) - res = append(res, salty) - } - - if enable.Has("msgbus") { - span.AddEvent("Enable Msgbus") - msgbus, err := msgbus.New(ctx, es) - if err != nil { - span.RecordError(err) - return err - } - svcs = append(svcs, msgbus) - res = append(res, msgbus) - } - - if enable.Has("peers") { - span.AddEvent("Enable Peers") - es.Option(projecter.New(ctx, peerfinder.Projector)) - - peers, err := peerfinder.New(ctx, es, env("PEER_STATUS", "")) - if err != nil { - span.RecordError(err) - return err - } - svcs = append(svcs, peers) - cron.Once(ctx, peers.RefreshJob) - cron.NewJob("0,15,30,45", peers.RefreshJob) - cron.Once(ctx, peers.CleanJob) - cron.NewJob("0 1", peers.CleanJob) - g.Go(func() error { - return peers.Run(ctx) - }) - stop.add(peers.Stop) - } - - if enable.Has("gql") { - span.AddEvent("Enable GraphQL") - gql, err := resolver.New(ctx, &gql.Resolver{}, res...) - if err != nil { - span.RecordError(err) - return err - } - svcs = append(svcs, gql) - } - svcs = append(svcs, lg.NewHTTP(ctx), RegisterHTTP(func(mux *http.ServeMux) { - mux.Handle("/", http.RedirectHandler("/playground", http.StatusTemporaryRedirect)) - })) - - s.Handler = httpMux(svcs...) - - log.Print("Listen on ", s.Addr) - span.AddEvent("begin listen and serve on " + s.Addr) - - Mup, err := lg.Meter(ctx).SyncInt64().UpDownCounter("up") - if err != nil { - return err - } - Mup.Add(ctx, 1) - - g.Go(s.ListenAndServe) - stop.add(s.Shutdown) - - span.End() - } - - g.Go(func() error { - <-ctx.Done() - // shutdown jobs - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - return stop.stop(ctx) - }) - g.Go(func() error { - return cron.Run(ctx) - }) - - if err := g.Wait(); err != nil && err != http.ErrServerClosed { - return err - } - return nil -} -func env(name, defaultValue string) string { - name = strings.TrimSpace(name) - defaultValue = strings.TrimSpace(defaultValue) - if v := strings.TrimSpace(os.Getenv(name)); v != "" { - log.Println("#", name, "=", v) - return v - } - log.Println("#", name, "=", defaultValue, "(default)") - return defaultValue -} - -var appName, version = func() (string, string) { - if info, ok := debug.ReadBuildInfo(); ok { - _, name, _ := strings.Cut(info.Main.Path, "/") - name = strings.Replace(name, "-", ".", -1) - name = strings.Replace(name, "/", "-", -1) - return name, info.Main.Version - } - - return "sour.is-ev", "(devel)" -}() - -type application interface { - Setup(context.Context) error -} - -type stopFns struct { - fns []func(context.Context) error -} - -func (s *stopFns) add(fn func(context.Context) error) { - s.fns = append(s.fns, fn) -} -func (s *stopFns) stop(ctx context.Context) error { - g, _ := errgroup.WithContext(ctx) - for i := range s.fns { - fn := s.fns[i] - g.Go(func() error { - return fn(ctx) - }) - } - return g.Wait() -} diff --git a/pkg/README.md b/pkg/README.md new file mode 100644 index 0000000..994d1ce --- /dev/null +++ b/pkg/README.md @@ -0,0 +1,3 @@ +# Pkg Tools + +This is a collection of modules that provide simple reusable functions. \ No newline at end of file diff --git a/pkg/cron/cron.go b/pkg/cron/cron.go index b7e58ec..afe2f74 100644 --- a/pkg/cron/cron.go +++ b/pkg/cron/cron.go @@ -60,7 +60,7 @@ func parseInto(c string, s *set.BoundSet[int8]) *set.BoundSet[int8] { // 24hour time. Any of the values may be -1 as an "any" match, so passing in // a day of -1, the event occurs every day; passing in a second value of -1, the // event will fire every second that the other parameters match. -func (c *cron) NewJob(expr string, task task) { +func (c *cron) NewCron(expr string, task func(context.Context, time.Time) error) { sp := append(strings.Fields(expr), make([]string, 5)...)[:5] job := job{ @@ -73,7 +73,7 @@ func (c *cron) NewJob(expr string, task task) { } c.jobs = append(c.jobs, job) } -func (c *cron) Once(ctx context.Context, once task) { +func (c *cron) RunOnce(ctx context.Context, once func(context.Context, time.Time) error) { c.state.Modify(ctx, func(ctx context.Context, state *state) error { state.queue = append(state.queue, once) return nil diff --git a/pkg/env/env.go b/pkg/env/env.go new file mode 100644 index 0000000..305fb15 --- /dev/null +++ b/pkg/env/env.go @@ -0,0 +1,40 @@ +package env + +import ( + "log" + "os" + "strings" +) + +func Default(name, defaultValue string) string { + name = strings.TrimSpace(name) + defaultValue = strings.TrimSpace(defaultValue) + if v := strings.TrimSpace(os.Getenv(name)); v != "" { + log.Println("# ", name, "=", v) + return v + } + log.Println("# ", name, "=", defaultValue, "(default)") + return defaultValue +} + +type secret string + +func (s secret) String() string { + if s == "" { + return "(nil)" + } + return "***" +} +func (s secret) Secret() string { + return string(s) +} +func Secret(name, defaultValue string) secret { + name = strings.TrimSpace(name) + defaultValue = strings.TrimSpace(defaultValue) + if v := strings.TrimSpace(os.Getenv(name)); v != "" { + log.Println("# ", name, "=", secret(v)) + return secret(v) + } + log.Println("# ", name, "=", secret(defaultValue), "(default)") + return secret(defaultValue) +} diff --git a/pkg/es/driver/disk-store/disk-store.go b/pkg/es/driver/disk-store/disk-store.go index 3601ae8..396e12b 100644 --- a/pkg/es/driver/disk-store/disk-store.go +++ b/pkg/es/driver/disk-store/disk-store.go @@ -16,9 +16,9 @@ import ( "go.opentelemetry.io/otel/metric/instrument/syncint64" "go.uber.org/multierr" + "github.com/sour-is/ev" "github.com/sour-is/ev/internal/lg" "github.com/sour-is/ev/pkg/cache" - "github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es/driver" "github.com/sour-is/ev/pkg/es/event" "github.com/sour-is/ev/pkg/locker" @@ -41,8 +41,8 @@ type diskStore struct { m_disk_write syncint64.Counter } -const AppendOnly = es.AppendOnly -const AllEvents = es.AllEvents +const AppendOnly = ev.AppendOnly +const AllEvents = ev.AllEvents func Init(ctx context.Context) error { ctx, span := lg.Span(ctx) @@ -65,7 +65,7 @@ func Init(ctx context.Context) error { d.m_disk_write, err = m.SyncInt64().Counter("disk_write") errs = multierr.Append(errs, err) - es.Register(ctx, "file", d) + ev.Register(ctx, "file", d) return errs } @@ -204,7 +204,7 @@ func (e *eventLog) Append(ctx context.Context, events event.Events, version uint } if version != AppendOnly && version != last { - err = fmt.Errorf("%w: current version wrong %d != %d", es.ErrWrongVersion, version, last) + err = fmt.Errorf("%w: current version wrong %d != %d", ev.ErrWrongVersion, version, last) span.RecordError(err) return err } @@ -411,7 +411,7 @@ func readStream(ctx context.Context, stream *wal.Log, index uint64) (event.Event b, err = stream.Read(index) if err != nil { if errors.Is(err, wal.ErrNotFound) || errors.Is(err, wal.ErrOutOfRange) { - err = fmt.Errorf("%w: empty", es.ErrNotFound) + err = fmt.Errorf("%w: empty", ev.ErrNotFound) } span.RecordError(err) @@ -444,7 +444,7 @@ func readStreamN(ctx context.Context, stream *wal.Log, index ...uint64) (event.E b, err = stream.Read(idx) if err != nil { if errors.Is(err, wal.ErrNotFound) || errors.Is(err, wal.ErrOutOfRange) { - err = fmt.Errorf("%w: empty", es.ErrNotFound) + err = fmt.Errorf("%w: empty", ev.ErrNotFound) } span.RecordError(err) diff --git a/pkg/es/driver/mem-store/mem-store.go b/pkg/es/driver/mem-store/mem-store.go index 370f476..64478a2 100644 --- a/pkg/es/driver/mem-store/mem-store.go +++ b/pkg/es/driver/mem-store/mem-store.go @@ -5,8 +5,8 @@ import ( "context" "fmt" + "github.com/sour-is/ev" "github.com/sour-is/ev/internal/lg" - "github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es/driver" "github.com/sour-is/ev/pkg/es/event" "github.com/sour-is/ev/pkg/locker" @@ -24,14 +24,14 @@ type memstore struct { state *locker.Locked[state] } -const AppendOnly = es.AppendOnly -const AllEvents = es.AllEvents +const AppendOnly = ev.AppendOnly +const AllEvents = ev.AllEvents func Init(ctx context.Context) error { ctx, span := lg.Span(ctx) defer span.End() - return es.Register(ctx, "mem", &memstore{}) + return ev.Register(ctx, "mem", &memstore{}) } var _ driver.Driver = (*memstore)(nil) @@ -84,7 +84,7 @@ func (m *eventLog) Append(ctx context.Context, events event.Events, version uint last := uint64(len(*stream)) if version != AppendOnly && version != last { - return fmt.Errorf("%w: current version wrong %d != %d", es.ErrWrongVersion, version, last) + return fmt.Errorf("%w: current version wrong %d != %d", ev.ErrWrongVersion, version, last) } for i := range events { diff --git a/pkg/es/driver/projecter/projecter.go b/pkg/es/driver/projecter/projecter.go index 39f62fe..ea3e2bd 100644 --- a/pkg/es/driver/projecter/projecter.go +++ b/pkg/es/driver/projecter/projecter.go @@ -5,8 +5,8 @@ import ( "context" "strings" + "github.com/sour-is/ev" "github.com/sour-is/ev/internal/lg" - "github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es/driver" "github.com/sour-is/ev/pkg/es/event" ) @@ -19,7 +19,7 @@ type projector struct { func New(_ context.Context, fns ...func(event.Event) []event.Event) *projector { return &projector{fns: fns} } -func (p *projector) Apply(e *es.EventStore) { +func (p *projector) Apply(e *ev.EventStore) { up := e.Driver for up != nil { @@ -29,7 +29,7 @@ func (p *projector) Apply(e *es.EventStore) { return } - up = es.Unwrap(up) + up = ev.Unwrap(up) } p.up = e.Driver @@ -112,7 +112,7 @@ func (w *wrapper) Append(ctx context.Context, events event.Events, version uint6 span.RecordError(err) continue } - _, err = l.Append(ctx, event.NewEvents(e), es.AppendOnly) + _, err = l.Append(ctx, event.NewEvents(e), ev.AppendOnly) span.RecordError(err) } }() diff --git a/pkg/es/driver/projecter/projector_test.go b/pkg/es/driver/projecter/projector_test.go index 942b621..4e19878 100644 --- a/pkg/es/driver/projecter/projector_test.go +++ b/pkg/es/driver/projecter/projector_test.go @@ -5,7 +5,7 @@ import ( "testing" "github.com/matryer/is" - "github.com/sour-is/ev/pkg/es" + "github.com/sour-is/ev" "github.com/sour-is/ev/pkg/es/driver" "github.com/sour-is/ev/pkg/es/driver/projecter" "github.com/sour-is/ev/pkg/es/event" @@ -112,10 +112,10 @@ func TestProjecter(t *testing.T) { return mockEL, nil } - es.Init(ctx) - es.Register(ctx, "mock", mock) + ev.Init(ctx) + ev.Register(ctx, "mock", mock) - es, err := es.Open( + es, err := ev.Open( ctx, "mock:", projecter.New(ctx, projecter.DefaultProjection), diff --git a/pkg/es/driver/resolve-links/resolve-links.go b/pkg/es/driver/resolve-links/resolve-links.go index 7ec91cc..f079f88 100644 --- a/pkg/es/driver/resolve-links/resolve-links.go +++ b/pkg/es/driver/resolve-links/resolve-links.go @@ -4,8 +4,8 @@ import ( "context" "errors" + "github.com/sour-is/ev" "github.com/sour-is/ev/internal/lg" - "github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es/driver" "github.com/sour-is/ev/pkg/es/event" ) @@ -18,7 +18,7 @@ func New() *resolvelinks { return &resolvelinks{} } -func (r *resolvelinks) Apply(es *es.EventStore) { +func (r *resolvelinks) Apply(es *ev.EventStore) { r.up = es.Driver es.Driver = r } @@ -77,7 +77,7 @@ func (w *wrapper) Read(ctx context.Context, after int64, count int64) (event.Eve } ptr := ptrs[streamID] lis, err := d.ReadN(ctx, ids...) - if err != nil && !errors.Is(err, es.ErrNotFound) { + if err != nil && !errors.Is(err, ev.ErrNotFound) { return nil, err } diff --git a/pkg/es/driver/streamer/streamer.go b/pkg/es/driver/streamer/streamer.go index b8b1f7f..9a4f184 100644 --- a/pkg/es/driver/streamer/streamer.go +++ b/pkg/es/driver/streamer/streamer.go @@ -9,8 +9,8 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" + "github.com/sour-is/ev" "github.com/sour-is/ev/internal/lg" - "github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es/driver" "github.com/sour-is/ev/pkg/es/event" "github.com/sour-is/ev/pkg/locker" @@ -32,9 +32,9 @@ func New(ctx context.Context) *streamer { return &streamer{state: locker.New(&state{subscribers: map[string][]*subscription{}})} } -var _ es.Option = (*streamer)(nil) +var _ ev.Option = (*streamer)(nil) -func (s *streamer) Apply(e *es.EventStore) { +func (s *streamer) Apply(e *ev.EventStore) { s.up = e.Driver e.Driver = s } @@ -72,7 +72,7 @@ func (s *streamer) Subscribe(ctx context.Context, streamID string, start int64) sub := &subscription{topic: streamID, events: events} sub.position = locker.New(&position{ idx: start, - size: es.AllEvents, + size: ev.AllEvents, }) sub.unsub = s.delete(streamID, sub) @@ -232,7 +232,7 @@ func (s *subscription) Recv(ctx context.Context) <-chan bool { _, span := lg.Span(ctx) defer span.End() - if position.size == es.AllEvents { + if position.size == ev.AllEvents { return nil } if position.size == 0 { diff --git a/pkg/es/graph.go b/pkg/es/graph.go index e1329d7..4d54431 100644 --- a/pkg/es/graph.go +++ b/pkg/es/graph.go @@ -7,6 +7,7 @@ import ( "net/http" "time" + "github.com/sour-is/ev" "github.com/sour-is/ev/internal/lg" "github.com/sour-is/ev/pkg/es/event" "github.com/sour-is/ev/pkg/gql" @@ -23,13 +24,17 @@ type contextKey struct { var esKey = contextKey{"event-store"} +type EventStore struct { + *ev.EventStore +} + func (es *EventStore) IsResolver() {} func (es *EventStore) Events(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error) { ctx, span := lg.Span(ctx) defer span.End() lis, err := es.Read(ctx, streamID, paging.GetIdx(0), paging.GetCount(30)) - if err != nil && !errors.Is(err, ErrNotFound) { + if err != nil && !errors.Is(err, ev.ErrNotFound) { span.RecordError(err) return nil, err } diff --git a/pkg/gql/resolver/resolver.go b/pkg/gql/resolver/resolver.go index a7289fc..68b6e1b 100644 --- a/pkg/gql/resolver/resolver.go +++ b/pkg/gql/resolver/resolver.go @@ -52,12 +52,14 @@ outer: rs := reflect.ValueOf(resolvers[i]) if field.IsNil() && rs.Type().Implements(field.Type()) { + // log.Print("found ", field.Type().Name()) span.AddEvent(fmt.Sprint("found ", field.Type().Name())) field.Set(rs) continue outer } } + // log.Print(fmt.Sprint("default ", field.Type().Name())) span.AddEvent(fmt.Sprint("default ", field.Type().Name())) field.Set(noop) } diff --git a/pkg/mux/httpmux.go b/pkg/mux/httpmux.go new file mode 100644 index 0000000..c2a2dea --- /dev/null +++ b/pkg/mux/httpmux.go @@ -0,0 +1,45 @@ +package mux + +import ( + "net/http" +) + +type mux struct { + *http.ServeMux + api *http.ServeMux + wellknown *http.ServeMux +} + +func (mux *mux) Add(fns ...interface{ RegisterHTTP(*http.ServeMux) }) { + for _, fn := range fns { + // log.Printf("HTTP: %T", fn) + fn.RegisterHTTP(mux.ServeMux) + + if fn, ok := fn.(interface{ RegisterAPIv1(*http.ServeMux) }); ok { + // log.Printf("APIv1: %T", fn) + fn.RegisterAPIv1(mux.api) + } + + if fn, ok := fn.(interface{ RegisterWellKnown(*http.ServeMux) }); ok { + // log.Printf("APIv1: %T", fn) + fn.RegisterWellKnown(mux.wellknown) + } + } +} +func New() *mux { + mux := &mux{ + api: http.NewServeMux(), + wellknown: http.NewServeMux(), + ServeMux: http.NewServeMux(), + } + mux.Handle("/api/v1/", http.StripPrefix("/api/v1", mux.api)) + mux.Handle("/.well-known/", http.StripPrefix("/.well-known/", mux.api)) + + return mux +} + +type RegisterHTTP func(*http.ServeMux) + +func (fn RegisterHTTP) RegisterHTTP(mux *http.ServeMux) { + fn(mux) +} diff --git a/httpmux_test.go b/pkg/mux/httpmux_test.go similarity index 84% rename from httpmux_test.go rename to pkg/mux/httpmux_test.go index 53b580e..0ffa87b 100644 --- a/httpmux_test.go +++ b/pkg/mux/httpmux_test.go @@ -1,4 +1,4 @@ -package main +package mux_test import ( "net/http" @@ -6,6 +6,7 @@ import ( "testing" "github.com/matryer/is" + "github.com/sour-is/ev/pkg/mux" ) type mockHTTP struct { @@ -15,7 +16,6 @@ type mockHTTP struct { func (m *mockHTTP) ServeHTTP(w http.ResponseWriter, r *http.Request) { m.onServeHTTP() } - func (h *mockHTTP) RegisterHTTP(mux *http.ServeMux) { mux.Handle("/", h) } @@ -28,7 +28,8 @@ func TestHttpMux(t *testing.T) { called := false - mux := httpMux(&mockHTTP{func() { called = true }}) + mux := mux.New() + mux.Add(&mockHTTP{func() { called = true }}) is.True(mux != nil) diff --git a/pkg/service/service.go b/pkg/service/service.go new file mode 100644 index 0000000..745eb94 --- /dev/null +++ b/pkg/service/service.go @@ -0,0 +1,169 @@ +package service + +import ( + "context" + "log" + "net/http" + "runtime/debug" + "sort" + "strings" + "time" + + "github.com/sour-is/ev/internal/lg" + "github.com/sour-is/ev/pkg/cron" + "go.opentelemetry.io/otel/attribute" + "go.uber.org/multierr" + "golang.org/x/sync/errgroup" +) + +type crontab interface { + NewCron(expr string, task func(context.Context, time.Time) error) + RunOnce(ctx context.Context, once func(context.Context, time.Time) error) +} +type Harness struct { + crontab + + Services []any + + onStart []func(context.Context) error + onStop []func(context.Context) error +} + +func (s *Harness) Setup(ctx context.Context, apps ...application) error { + ctx, span := lg.Span(ctx) + defer span.End() + + // setup crontab + c := cron.New(cron.DefaultGranularity) + s.OnStart(c.Run) + s.crontab = c + + var err error + for _, app := range apps { + err = multierr.Append(err, app(ctx, s)) + } + + span.RecordError(err) + return err +} +func (s *Harness) OnStart(fn func(context.Context) error) { + s.onStart = append(s.onStart, fn) +} +func (s *Harness) OnStop(fn func(context.Context) error) { + s.onStop = append(s.onStop, fn) +} +func (s *Harness) Add(svcs ...any) { + s.Services = append(s.Services, svcs...) +} +func (s *Harness) stop(ctx context.Context) error { + g, _ := errgroup.WithContext(ctx) + for i := range s.onStop { + fn := s.onStop[i] + g.Go(func() error { + if err := fn(ctx); err != nil && err != http.ErrServerClosed { + return err + } + return nil + }) + } + return g.Wait() +} +func (s *Harness) Run(ctx context.Context, appName, version string) error { + { + ctx, span := lg.Span(ctx) + + log.Println(appName, version) + span.SetAttributes( + attribute.String("app", appName), + attribute.String("version", version), + ) + + Mup, err := lg.Meter(ctx).SyncInt64().UpDownCounter("up") + if err != nil { + return err + } + Mup.Add(ctx, 1) + + span.End() + } + + g, _ := errgroup.WithContext(ctx) + g.Go(func() error { + <-ctx.Done() + // shutdown jobs + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + return s.stop(ctx) + }) + + for i := range s.onStart { + fn := s.onStart[i] + g.Go(func() error { return fn(ctx) }) + } + + return g.Wait() +} + +type application func(context.Context, *Harness) error // Len is the number of elements in the collection. + +type appscore struct { + score int + application +} +type Apps []appscore + +func (a *Apps) Apps() []application { + sort.Sort(a) + lis := make([]application, len(*a)) + for i, app := range *a { + lis[i] = app.application + } + return lis +} + +// Len is the number of elements in the collection. +func (a *Apps) Len() int { + if a == nil { + return 0 + } + return len(*a) +} + +// Less reports whether the element with index i +func (a *Apps) Less(i int, j int) bool { + if a == nil { + return false + } + + return (*a)[i].score < (*a)[j].score +} + +// Swap swaps the elements with indexes i and j. +func (a *Apps) Swap(i int, j int) { + if a == nil { + return + } + + (*a)[i], (*a)[j] = (*a)[j], (*a)[i] +} + +func (a *Apps) Register(score int, app application) (none struct{}) { + if a == nil { + return + } + + *a = append(*a, appscore{score, app}) + return +} + +func AppName() (string, string) { + if info, ok := debug.ReadBuildInfo(); ok { + _, name, _ := strings.Cut(info.Main.Path, "/") + name = strings.Replace(name, "-", ".", -1) + name = strings.Replace(name, "/", "-", -1) + return name, info.Main.Version + } + + return "sour.is-app", "(devel)" +} diff --git a/pkg/slice/slice.go b/pkg/slice/slice.go new file mode 100644 index 0000000..538b028 --- /dev/null +++ b/pkg/slice/slice.go @@ -0,0 +1,35 @@ +package slice + +// FilterType returns a subset that matches the type. +func FilterType[T any](in ...any) []T { + lis := make([]T, 0, len(in)) + for _, u := range in { + if t, ok := u.(T); ok { + lis = append(lis, t) + } + } + return lis +} + +// Find returns the first of type found. or false if not found. +func Find[T any](in ...any) (T, bool) { + return First(FilterType[T](in...)...) +} + +// First returns the first element in a slice. +func First[T any](in ...T) (T, bool) { + if len(in) == 0 { + var zero T + return zero, false + } + return in[0], true +} + +// Map applys func to each element s and returns results as slice. +func Map[T, U any](s []T, f func(T) U) []U { + r := make([]U, len(s)) + for i, v := range s { + r[i] = f(v) + } + return r +}