From b1bff4cbf0a013c0afd5bb6eab3e5a0c3e240f02 Mon Sep 17 00:00:00 2001 From: xuu Date: Fri, 19 Apr 2024 10:56:27 -0600 Subject: [PATCH] add libsql support --- go.mod | 7 +- go.sum | 5 + go.work.sum | 9 +- ident/source/mercury.go | 55 +++--- libsql_embed/open.go | 68 ++++++-- mercury/app/environ.go | 17 +- mercury/namespace_test.go | 59 ------- mercury/registry.go | 90 +++++----- mercury/routes.go | 10 +- mercury/{namespace.go => spec.go} | 89 +++++++++- mercury/spec_test.go | 109 ++++++++++++ mercury/sql/list-string.go | 4 +- mercury/sql/notify.go | 2 +- mercury/sql/otel.go | 34 ++-- mercury/sql/sql.go | 280 ++++++++++++++++++++++++------ rsql/dbcolumns.go | 18 +- service/service.go | 1 - 17 files changed, 609 insertions(+), 248 deletions(-) delete mode 100644 mercury/namespace_test.go rename mercury/{namespace.go => spec.go} (53%) create mode 100644 mercury/spec_test.go diff --git a/go.mod b/go.mod index f8cf90d..8e9e505 100644 --- a/go.mod +++ b/go.mod @@ -61,7 +61,6 @@ require ( github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 // indirect github.com/oklog/ulid/v2 v2.1.0 github.com/prometheus/client_golang v1.18.0 - github.com/tursodatabase/go-libsql v0.0.0-20240322134723-08771dcdd2f1 go.nhat.io/otelsql v0.12.0 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.48.0 go.opentelemetry.io/contrib/instrumentation/runtime v0.48.0 @@ -72,10 +71,10 @@ require ( go.opentelemetry.io/otel/trace v1.23.1 go.opentelemetry.io/proto/otlp v1.1.0 // indirect golang.org/x/exp v0.0.0-20240213143201-ec583247a57a - golang.org/x/net v0.21.0 // indirect - golang.org/x/sys v0.17.0 // indirect + golang.org/x/net v0.23.0 // indirect + golang.org/x/sys v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect google.golang.org/grpc v1.61.1 // indirect - google.golang.org/protobuf v1.32.0 // indirect + google.golang.org/protobuf v1.33.0 // indirect modernc.org/sqlite v1.29.1 ) diff --git a/go.sum b/go.sum index dedc078..aa7574f 100644 --- a/go.sum +++ b/go.sum @@ -183,6 +183,8 @@ golang.org/x/mod v0.15.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= +golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= +golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/oauth2 v0.0.0-20170912212905-13449ad91cb2/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20170517211232-f52d1811a629/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= @@ -191,6 +193,7 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= @@ -216,6 +219,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/go.work.sum b/go.work.sum index 370b28e..a32bac6 100644 --- a/go.work.sum +++ b/go.work.sum @@ -160,7 +160,6 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= -github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/flatbuffers v23.5.26+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -242,6 +241,8 @@ go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+ go.opentelemetry.io/otel/trace v1.22.0/go.mod h1:RbbHXVqKES9QhzZq/fE5UnOSILqRt40a21sPw2He1xo= go.opentelemetry.io/otel/trace v1.23.0/go.mod h1:GSGTbIClEsuZrGIzoEHqsVfxgn5UkggkflQwDScNUsk= golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= +golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= +golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= @@ -275,10 +276,10 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= golang.org/x/telemetry v0.0.0-20240208230135-b75ee8823808/go.mod h1:KG1lNk5ZFNssSZLrpVb4sMXKMpGwGXOxSG3rnu2gZQQ= -golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= +golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -291,7 +292,6 @@ golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6f google.golang.org/api v0.162.0/go.mod h1:6SulDkfoBIg4NFmCuZ39XeeAgSHCPecfSUuDyYlAHs0= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= -google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= @@ -325,7 +325,6 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= gopkg.in/ini.v1 v1.66.4/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= -gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= lukechampine.com/uint128 v1.2.0/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk= diff --git a/ident/source/mercury.go b/ident/source/mercury.go index c290cb5..142aa8f 100644 --- a/ident/source/mercury.go +++ b/ident/source/mercury.go @@ -7,17 +7,17 @@ import ( "strings" "time" + "go.sour.is/pkg/ident" "go.sour.is/pkg/lg" "go.sour.is/pkg/mercury" - "go.sour.is/pkg/ident" ) const identNS = "ident." const identSFX = ".credentials" type registry interface { - GetIndex(ctx context.Context, match, search string) (c mercury.Config, err error) - GetConfig(ctx context.Context, match, search, fields string) (mercury.Config, error) + GetIndex(ctx context.Context, search mercury.Search) (c mercury.Config, err error) + GetConfig(ctx context.Context, search mercury.Search) (mercury.Config, error) WriteConfig(ctx context.Context, spaces mercury.Config) error } @@ -50,6 +50,7 @@ func (id *mercuryIdent) FromConfig(cfg mercury.Config) error { switch { case strings.HasSuffix(s.Space, ".credentials"): id.passwd = []byte(s.FirstValue("passwd").First()) + id.ed25519 = []byte(s.FirstValue("ed25519").First()) default: id.display = s.FirstValue("displayName").First() } @@ -59,34 +60,29 @@ func (id *mercuryIdent) FromConfig(cfg mercury.Config) error { func (id *mercuryIdent) ToConfig() mercury.Config { space := id.Space() + list := func(values ...mercury.Value) []mercury.Value { return values } + value := func(space string, seq uint64, name string, values ...string) mercury.Value { + return mercury.Value{ + Space: space, + Seq: seq, + Name: name, + Values: values, + } + } return mercury.Config{ &mercury.Space{ Space: space, - List: []mercury.Value{ - { - Space: space, - Seq: 1, - Name: "displayName", - Values: []string{id.display}, - }, - { - Space: space, - Seq: 2, - Name: "lastLogin", - Values: []string{time.UnixMilli(int64(id.Session().SessionID.Time())).Format(time.RFC3339)}, - }, - }, + List: list( + value(space, 1, "displayName", id.display), + value(space, 2, "lastLogin", time.UnixMilli(int64(id.Session().SessionID.Time())).Format(time.RFC3339)), + ), }, &mercury.Space{ Space: space + identSFX, - List: []mercury.Value{ - { - Space: space + identSFX, - Seq: 1, - Name: "passwd", - Values: []string{string(id.passwd)}, - }, - }, + List: list( + value(space+identSFX, 1, "passwd", string(id.passwd)), + value(space+identSFX, 1, "ed25519", string(id.ed25519)), + ), }, } } @@ -140,7 +136,7 @@ func (s *mercurySource) readIdentURL(r *http.Request) (ident.Ident, error) { } space := id.Space() - c, err := s.r.GetConfig(ctx, "trace:"+space+identSFX, "", "") + c, err := s.r.GetConfig(ctx, mercury.ParseSearch("trace:"+space+identSFX)) if err != nil { span.RecordError(err) return id, err @@ -183,7 +179,7 @@ func (s *mercurySource) readIdentBasic(r *http.Request) (ident.Ident, error) { } space := id.Space() - c, err := s.r.GetConfig(ctx, "trace:"+space+identSFX, "", "") + c, err := s.r.GetConfig(ctx, mercury.ParseSearch("trace:"+space+identSFX)) if err != nil { span.RecordError(err) return id, err @@ -228,7 +224,7 @@ func (s *mercurySource) readIdentHTTP(r *http.Request) (ident.Ident, error) { } space := id.Space() - c, err := s.r.GetConfig(ctx, "trace:"+space+identSFX, "", "") + c, err := s.r.GetConfig(ctx, mercury.ParseSearch("trace:"+space+identSFX)) if err != nil { span.RecordError(err) return id, err @@ -260,9 +256,8 @@ func (s *mercurySource) RegisterIdent(ctx context.Context, identity, display str defer span.End() id := &mercuryIdent{identity: identity, display: display, passwd: passwd} - space := id.Space() - _, err := s.r.GetIndex(ctx, space, "") + _, err := s.r.GetIndex(ctx, mercury.ParseSearch( id.Space())) if err != nil { return nil, err } diff --git a/libsql_embed/open.go b/libsql_embed/open.go index 2da1a52..c8ce30d 100644 --- a/libsql_embed/open.go +++ b/libsql_embed/open.go @@ -4,7 +4,10 @@ import ( "context" "database/sql" "database/sql/driver" + "errors" "fmt" + "io" + "log" "net/url" "os" "path/filepath" @@ -17,28 +20,37 @@ import ( ) func init() { - sql.Register("libsql+embed", &db{}) + sql.Register("libsql+embed", &db{conns: make(map[string]*connector)}) } type db struct { - conns map[string]connector + conns map[string]*connector mu sync.RWMutex } type connector struct { *libsql.Connector - dsn string - dir string - driver *db + dsn string + dir string + driver *db + removeDir bool } +var _ io.Closer = (*connector)(nil) + func (c *connector) Close() error { + log.Println("closing db connection", c.dir) + defer log.Println("closed db connection", c.dir) + c.driver.mu.Lock() delete(c.driver.conns, c.dsn) c.driver.mu.Unlock() - defer os.RemoveAll(c.dir) + if c.removeDir { + defer os.RemoveAll(c.dir) + } + log.Println("sync db") if err := c.Connector.Sync(); err != nil { return fmt.Errorf("syncing database: %w", err) } @@ -47,7 +59,12 @@ func (c *connector) Close() error { } func (db *db) OpenConnector(dsn string) (driver.Connector, error) { - if c, ok := func() (connector, bool) { + // log.Println("connector", dsn) + if dsn == "" { + return nil, fmt.Errorf("no dsn") + } + + if c, ok := func() (*connector, bool) { db.mu.RLock() defer db.mu.RUnlock() c, ok := db.conns[dsn] @@ -79,20 +96,39 @@ func (db *db) OpenConnector(dsn string) (driver.Connector, error) { libsql.WithAuthToken(authToken), } - if refresh, err := strconv.ParseInt(u.Query().Get("refresh"),10,64); err == nil { + if refresh, err := strconv.ParseInt(u.Query().Get("refresh"), 10, 64); err == nil { + log.Println("refresh: ", refresh) opts = append(opts, libsql.WithSyncInterval(time.Duration(refresh)*time.Minute)) } if readWrite, err := strconv.ParseBool(u.Query().Get("readYourWrites")); err == nil { + log.Println("read your writes: ", readWrite) opts = append(opts, libsql.WithReadYourWrites(readWrite)) } if key := u.Query().Get("key"); key != "" { opts = append(opts, libsql.WithEncryption(key)) } - - dir, err := os.MkdirTemp("", "libsql-*") - if err != nil { - return nil, fmt.Errorf("creating temporary directory: %w", err) + + var dir string + var removeDir bool + if dir = u.Query().Get("store"); dir == "" { + removeDir = true + dir, err = os.MkdirTemp("", "libsql-*") + log.Println("creating temporary directory:", dir) + if err != nil { + return nil, fmt.Errorf("creating temporary directory: %w", err) + } + } else { + stat, err := os.Stat(dir) + if errors.Is(err, os.ErrNotExist) { + if err = os.MkdirAll(dir, 0700); err != nil { + return nil, err + } + } else { + if !stat.IsDir() { + return nil, fmt.Errorf("store not directory") + } + } } dbPath := filepath.Join(dir, dbname) @@ -105,13 +141,19 @@ func (db *db) OpenConnector(dsn string) (driver.Connector, error) { return nil, fmt.Errorf("creating connector: %w", err) } - connector := connector{c, dsn, dir, db} + log.Println("sync db") + if err := c.Sync(); err != nil { + return nil, fmt.Errorf("syncing database: %w", err) + } + connector := &connector{c, dsn, dir, db, removeDir} db.conns[dsn] = connector return connector, nil } func (db *db) Open(dsn string) (driver.Conn, error) { + log.Println("open", dsn) + c, err := db.OpenConnector(dsn) if err != nil { return nil, err diff --git a/mercury/app/environ.go b/mercury/app/environ.go index 9a8aa76..dcc9ef3 100644 --- a/mercury/app/environ.go +++ b/mercury/app/environ.go @@ -8,9 +8,8 @@ import ( "sort" "strings" - "go.sour.is/pkg/mercury" "go.sour.is/pkg/ident" - "go.sour.is/pkg/rsql" + "go.sour.is/pkg/mercury" "go.sour.is/pkg/set" ) @@ -20,8 +19,9 @@ const ( mercuryHost = "mercury.host" appDotEnviron = "mercury.environ" ) + var ( - mercuryPolicy = func(id string) string { return "mercury.@" + id + ".policy" } + mercuryPolicy = func(id string) string { return "mercury.@" + id + ".policy" } ) func Register(name string, cfg mercury.SpaceMap) { @@ -41,8 +41,13 @@ type mercuryEnviron struct { lookup func(context.Context, ident.Ident) (mercury.Rules, error) } +func getSearch(spec mercury.Search) mercury.NamespaceSearch { + return spec.NamespaceSearch +} + // Index returns nil -func (app *mercuryEnviron) GetIndex(ctx context.Context, search mercury.NamespaceSearch, _ *rsql.Program) (lis mercury.Config, err error) { +func (app *mercuryEnviron) GetIndex(ctx context.Context, spec mercury.Search) (lis mercury.Config, err error) { + search := getSearch(spec) if search.Match(mercurySource) { for _, s := range app.cfg.ToArray() { @@ -74,7 +79,9 @@ func (app *mercuryEnviron) GetIndex(ctx context.Context, search mercury.Namespac } // Objects returns nil -func (app *mercuryEnviron) GetConfig(ctx context.Context, search mercury.NamespaceSearch, _ *rsql.Program, _ []string) (lis mercury.Config, err error) { +func (app *mercuryEnviron) GetConfig(ctx context.Context, spec mercury.Search) (lis mercury.Config, err error) { + search := getSearch(spec) + if search.Match(mercurySource) { for _, s := range app.cfg.ToArray() { if search.Match(s.Space) { diff --git a/mercury/namespace_test.go b/mercury/namespace_test.go deleted file mode 100644 index 82e7951..0000000 --- a/mercury/namespace_test.go +++ /dev/null @@ -1,59 +0,0 @@ -package mercury_test - -import ( - "fmt" - "testing" - - "github.com/matryer/is" - "go.sour.is/pkg/mercury" - - sq "github.com/Masterminds/squirrel" -) - -func TestNamespaceParse(t *testing.T) { - var tests = []struct { - in string - out string - args []any - }{ - { - in: "d42.bgp.kapha.*;trace:d42.bgp.kapha", - out: "(column LIKE ? OR ? LIKE column || '%')", - args: []any{"d42.bgp.kapha.%", "d42.bgp.kapha"}, - }, - - { - in: "d42.bgp.kapha.*,d42.bgp.kapha", - out: "(column LIKE ? OR column = ?)", - args: []any{"d42.bgp.kapha.%", "d42.bgp.kapha"}, - }, - } - - for i, tt := range tests { - t.Run(fmt.Sprintf("test %d", i), func(t *testing.T) { - is := is.New(t) - out := mercury.ParseNamespace(tt.in) - sql, args, err := getWhere(out).ToSql() - is.NoErr(err) - is.Equal(sql, tt.out) - is.Equal(args, tt.args) - }) - } -} - -func getWhere(search mercury.NamespaceSearch) sq.Sqlizer { - var where sq.Or - space := "column" - for _, m := range search { - switch m.(type) { - case mercury.NamespaceNode: - where = append(where, sq.Eq{space: m.Value()}) - case mercury.NamespaceStar: - where = append(where, sq.Like{space: m.Value()}) - case mercury.NamespaceTrace: - e := sq.Expr(`? LIKE `+space+` || '%'`, m.Value()) - where = append(where, e) - } - } - return where -} diff --git a/mercury/registry.go b/mercury/registry.go index a48d371..ee98d26 100644 --- a/mercury/registry.go +++ b/mercury/registry.go @@ -3,6 +3,7 @@ package mercury import ( "context" "fmt" + "log" "path/filepath" "sort" "strconv" @@ -10,16 +11,15 @@ import ( "go.sour.is/pkg/ident" "go.sour.is/pkg/lg" - "go.sour.is/pkg/rsql" "go.sour.is/pkg/set" "golang.org/x/sync/errgroup" ) type GetIndex interface { - GetIndex(context.Context, NamespaceSearch, *rsql.Program) (Config, error) + GetIndex(context.Context, Search) (Config, error) } type GetConfig interface { - GetConfig(context.Context, NamespaceSearch, *rsql.Program, []string) (Config, error) + GetConfig(context.Context, Search) (Config, error) } type WriteConfig interface { WriteConfig(context.Context, Config) error @@ -60,7 +60,7 @@ func (reg *registry) accessFilter(rules Rules, lis Config) (out Config, err erro // HandlerItem a single handler matching type matcher[T any] struct { Name string - Match NamespaceSearch + Match Search Priority int Handler T } @@ -122,17 +122,23 @@ func (r *registry) Register(name string, h func(*Space) any) { func (r *registry) Configure(m SpaceMap) error { r.resetMatchers() for space, c := range m { + log.Println("configure: ", space) + if strings.HasPrefix(space, "mercury.source.") { space = strings.TrimPrefix(space, "mercury.source.") handler, name, _ := strings.Cut(space, ".") matches := c.FirstValue("match") + readonly := c.HasTag("readonly") for _, match := range matches.Values { ps := strings.Fields(match) priority, err := strconv.Atoi(ps[0]) if err != nil { return err } - r.add(name, handler, ps[1], priority, c) + err = r.add(name, handler, strings.Join(ps[1:],"|"), priority, c, readonly) + if err != nil { + return err + } } } @@ -146,7 +152,10 @@ func (r *registry) Configure(m SpaceMap) error { if err != nil { return err } - r.add(name, handler, ps[1], priority, c) + err = r.add(name, handler, strings.Join(ps[1:],"|"), priority, c, false) + if err != nil { + return err + } } } } @@ -156,8 +165,8 @@ func (r *registry) Configure(m SpaceMap) error { } // Register add a handler to registry -func (r *registry) add(name, handler, match string, priority int, cfg *Space) error { - // log.Infos("mercury regster", "match", match, "pri", priority) +func (r *registry) add(name, handler, match string, priority int, cfg *Space, readonly bool) error { + log.Println("mercury regster", "match", match, "pri", priority) mkHandler, ok := r.handlers[handler] if !ok { return fmt.Errorf("handler not registered: %s", handler) @@ -173,61 +182,68 @@ func (r *registry) add(name, handler, match string, priority int, cfg *Space) er if hdlr, ok := hdlr.(GetIndex); ok { r.matchers.getIndex = append( r.matchers.getIndex, - matcher[GetIndex]{Name: name, Match: ParseNamespace(match), Priority: priority, Handler: hdlr}, + matcher[GetIndex]{Name: name, Match: ParseSearch(match), Priority: priority, Handler: hdlr}, ) } if hdlr, ok := hdlr.(GetConfig); ok { r.matchers.getConfig = append( r.matchers.getConfig, - matcher[GetConfig]{Name: name, Match: ParseNamespace(match), Priority: priority, Handler: hdlr}, + matcher[GetConfig]{Name: name, Match: ParseSearch(match), Priority: priority, Handler: hdlr}, ) } - if hdlr, ok := hdlr.(WriteConfig); ok { + if hdlr, ok := hdlr.(WriteConfig); !readonly && ok { r.matchers.writeConfig = append( r.matchers.writeConfig, - matcher[WriteConfig]{Name: name, Match: ParseNamespace(match), Priority: priority, Handler: hdlr}, + matcher[WriteConfig]{Name: name, Match: ParseSearch(match), Priority: priority, Handler: hdlr}, ) } if hdlr, ok := hdlr.(GetRules); ok { r.matchers.getRules = append( r.matchers.getRules, - matcher[GetRules]{Name: name, Match: ParseNamespace(match), Priority: priority, Handler: hdlr}, + matcher[GetRules]{Name: name, Match: ParseSearch(match), Priority: priority, Handler: hdlr}, ) } if hdlr, ok := hdlr.(GetNotify); ok { r.matchers.getNotify = append( r.matchers.getNotify, - matcher[GetNotify]{Name: name, Match: ParseNamespace(match), Priority: priority, Handler: hdlr}, + matcher[GetNotify]{Name: name, Match: ParseSearch(match), Priority: priority, Handler: hdlr}, ) } if hdlr, ok := hdlr.(SendNotify); ok { r.matchers.sendNotify = append( r.matchers.sendNotify, - matcher[SendNotify]{Name: name, Match: ParseNamespace(match), Priority: priority, Handler: hdlr}, + matcher[SendNotify]{Name: name, Match: ParseSearch(match), Priority: priority, Handler: hdlr}, ) } return nil } -// GetIndex query each handler that match namespace. -func (r *registry) GetIndex(ctx context.Context, match, search string) (c Config, err error) { - ctx, span := lg.Span(ctx) - defer span.End() +func getMatches(search Search, matchers matchers) []Search { + matches := make([]Search, len(matchers.getIndex)) - spec := ParseNamespace(match) - pgm := rsql.DefaultParse(search) - matches := make([]NamespaceSearch, len(r.matchers.getIndex)) - - for _, n := range spec { - for i, hdlr := range r.matchers.getIndex { + for _, n := range search.NamespaceSearch { + for i, hdlr := range matchers.getIndex { if hdlr.Match.Match(n.Raw()) { - matches[i] = append(matches[i], n) + matches[i].NamespaceSearch = append(matches[i].NamespaceSearch, n) + matches[i].Count = search.Count + matches[i].Cursor = search.Cursor // need to decode cursor for the match + matches[i].Fields = search.Fields + matches[i].Find = search.Find } } } + return matches +} + +// GetIndex query each handler that match namespace. +func (r *registry) GetIndex(ctx context.Context, search Search) (c Config, err error) { + ctx, span := lg.Span(ctx) + defer span.End() + + matches := getMatches(search, r.matchers) wg, ctx := errgroup.WithContext(ctx) slots := make(chan Config, len(r.matchers.getConfig)) @@ -248,7 +264,7 @@ func (r *registry) GetIndex(ctx context.Context, match, search string) (c Config wg.Go(func() error { span.AddEvent(fmt.Sprintf("INDEX %s %s", hdlr.Name, hdlr.Match)) - lis, err := hdlr.Handler.GetIndex(ctx, matches[i], pgm) + lis, err := hdlr.Handler.GetIndex(ctx, matches[i]) slots <- lis return err }) @@ -265,31 +281,19 @@ func (r *registry) GetIndex(ctx context.Context, match, search string) (c Config // Search query each handler with a key=value search // GetConfig query each handler that match for fully qualified namespaces. -func (r *registry) GetConfig(ctx context.Context, match, search, fields string) (Config, error) { +func (r *registry) GetConfig(ctx context.Context, search Search) (Config, error) { ctx, span := lg.Span(ctx) defer span.End() - spec := ParseNamespace(match) - pgm := rsql.DefaultParse(search) - flds := strings.Split(fields, ",") - - matches := make([]NamespaceSearch, len(r.matchers.getConfig)) - - for _, n := range spec { - for i, hdlr := range r.matchers.getConfig { - if hdlr.Match.Match(n.Raw()) { - matches[i] = append(matches[i], n) - } - } - } + matches := getMatches(search, r.matchers) m := make(SpaceMap) for i, hdlr := range r.matchers.getConfig { - if len(matches[i]) == 0 { + if len(matches[i].NamespaceSearch) == 0 { continue } span.AddEvent(fmt.Sprintf("QUERY %s %s", hdlr.Name, hdlr.Match)) - lis, err := hdlr.Handler.GetConfig(ctx, matches[i], pgm, flds) + lis, err := hdlr.Handler.GetConfig(ctx, matches[i]) if err != nil { return nil, err } diff --git a/mercury/routes.go b/mercury/routes.go index 62b63dc..279a649 100644 --- a/mercury/routes.go +++ b/mercury/routes.go @@ -70,12 +70,12 @@ func (s *root) configV1(w http.ResponseWriter, r *http.Request) { } log.Print("SPC: ", space) - ns := ParseNamespace(space) + ns := ParseSearch(space) log.Print("PRE: ", ns) //ns = rules.ReduceSearch(ns) log.Print("POST: ", ns) - lis, err := Registry.GetConfig(ctx, ns.String(), "", "") + lis, err := Registry.GetConfig(ctx, ns) if err != nil { http.Error(w, "ERR: "+err.Error(), http.StatusInternalServerError) return @@ -250,11 +250,11 @@ func (s *root) indexV1(w http.ResponseWriter, r *http.Request) { space = "*" } - ns := ParseNamespace(space) - ns = rules.ReduceSearch(ns) + ns := ParseSearch(space) + ns.NamespaceSearch = rules.ReduceSearch(ns.NamespaceSearch) span.AddEvent(ns.String()) - lis, err := Registry.GetIndex(ctx, ns.String(), "") + lis, err := Registry.GetIndex(ctx, ns) if err != nil { span.RecordError(err) http.Error(w, "ERR: "+err.Error(), http.StatusInternalServerError) diff --git a/mercury/namespace.go b/mercury/spec.go similarity index 53% rename from mercury/namespace.go rename to mercury/spec.go index 328bfbe..55c356f 100644 --- a/mercury/namespace.go +++ b/mercury/spec.go @@ -1,11 +1,33 @@ package mercury import ( + "log" "path/filepath" + "strconv" "strings" ) -// NamespaceSpec implements a parsed namespace search +// Search implements a parsed namespace search +// It parses the input and generates an AST to inform the driver how to select values. +// * => all spaces +// mercury.* => all prefixed with `mercury.` +// mercury.config => only space `mercury.config` +// mercury.source.*#readonly => all prefixed with `mercury.source.` AND has tag `readonly` +// test.*|mercury.* => all prefixed with `test.` AND `mercury.` +// test.* find bin=eq=bar => all prefixed with `test.` AND has an attribute bin that equals bar +// test.* fields foo,bin => all prefixed with `test.` only show fields foo and bin +// - count 20 => start a cursor with 20 results +// - count 20 after => continue after cursor for 20 results +// cursor encodes start points for each of the matched sources +type Search struct { + NamespaceSearch + Find []ops + Fields []string + Count uint64 + Offset uint64 + Cursor string +} + type NamespaceSpec interface { Value() string String() string @@ -17,8 +39,10 @@ type NamespaceSpec interface { type NamespaceSearch []NamespaceSpec // ParseNamespace returns a list of parsed values -func ParseNamespace(ns string) (lis NamespaceSearch) { - for _, part := range strings.Split(ns, ";") { +func ParseSearch(text string) (search Search) { + ns, text, _ := strings.Cut(text, " ") + var lis NamespaceSearch + for _, part := range strings.Split(ns, "|") { if strings.HasPrefix(part, "trace:") { lis = append(lis, NamespaceTrace(part[6:])) } else if strings.Contains(part, "*") { @@ -27,6 +51,40 @@ func ParseNamespace(ns string) (lis NamespaceSearch) { lis = append(lis, NamespaceNode(part)) } } + search.NamespaceSearch = lis + + field, text, next := strings.Cut(text, " ") + text = strings.TrimSpace(text) + for next { + switch strings.ToLower(field) { + case "find": + field, text, _ = strings.Cut(text, " ") + text = strings.TrimSpace(text) + search.Find = simpleParse(field) + + case "fields": + field, text, _ = strings.Cut(text, " ") + text = strings.TrimSpace(text) + search.Fields = strings.Split(field, ",") + + case "count": + field, text, _ = strings.Cut(text, " ") + text = strings.TrimSpace(text) + search.Count, _ = strconv.ParseUint(field, 10, 64) + + case "offset": + field, text, _ = strings.Cut(text, " ") + text = strings.TrimSpace(text) + search.Offset, _ = strconv.ParseUint(field, 10, 64) + + case "after": + field, text, _ = strings.Cut(text, " ") + text = strings.TrimSpace(text) + search.Cursor = field + } + field, text, next = strings.Cut(text, " ") + text = strings.TrimSpace(text) + } return } @@ -117,3 +175,28 @@ func match(n NamespaceSpec, s string) bool { } return ok } + +type ops struct { + Left string + Op string + Right string +} + +func simpleParse(in string) (out []ops) { + items := strings.Split(in, ",") + for _, i := range items { + log.Println(i) + eq := strings.Split(i, "=") + switch len(eq) { + case 2: + out = append(out, ops{eq[0], "eq", eq[1]}) + case 3: + if eq[1] == "" { + eq[1] = "eq" + } + out = append(out, ops{eq[0], eq[1], eq[2]}) + } + } + + return +} diff --git a/mercury/spec_test.go b/mercury/spec_test.go new file mode 100644 index 0000000..f8fbb36 --- /dev/null +++ b/mercury/spec_test.go @@ -0,0 +1,109 @@ +package mercury_test + +import ( + "fmt" + "testing" + + "github.com/matryer/is" + "go.sour.is/pkg/mercury" + "go.sour.is/pkg/mercury/sql" + + sq "github.com/Masterminds/squirrel" +) + +var MAX_FILTER int = 40 + +func TestNamespaceParse(t *testing.T) { + var tests = []struct { + getWhere func(mercury.Search) sq.Sqlizer + in string + out string + args []any + }{ + { + getWhere: getWhere, + in: "d42.bgp.kapha.*|trace:d42.bgp.kapha", + out: "(column LIKE ? OR ? LIKE column || '%')", + args: []any{"d42.bgp.kapha.%", "d42.bgp.kapha"}, + }, + + { + getWhere: getWhere, + in: "d42.bgp.kapha.*|d42.bgp.kapha", + out: "(column LIKE ? OR column = ?)", + args: []any{"d42.bgp.kapha.%", "d42.bgp.kapha"}, + }, + + { + getWhere: mkWhere(t, sql.GetWhereSQ), + in: "d42.bgp.kapha.* find active=eq=true", + out: `SELECT * FROM spaces JOIN ( SELECT DISTINCT id FROM mercury_values mv, json_each(mv."values") vs WHERE (json_valid("values") AND name = ? AND vs.value = ?) ) r000 USING (id) WHERE (space LIKE ?)`, + args: []any{"active", "true", "d42.bgp.kapha.%"}, + }, + + { + getWhere: mkWhere(t, sql.GetWhereSQ), + in: "d42.bgp.kapha.* count 10 offset 5", + out: `SELECT * FROM spaces WHERE (space LIKE ?) LIMIT 10 OFFSET 5`, + args: []any{"d42.bgp.kapha.%"}, + }, + + { + getWhere: mkWhere(t, sql.GetWhereSQ), + in: "d42.bgp.kapha.* fields a,b,c", + out: `SELECT * FROM spaces WHERE (space LIKE ?)`, + args: []any{"d42.bgp.kapha.%"}, + }, + + { + getWhere: mkWhere(t, sql.GetWhereSQ), + in: "dn42.* find @type=in=[person,net]", + out: `SELECT `, + args: []any{"d42.bgp.kapha.%"}, + }, + } + +//SELECT * FROM spaces JOIN ( SELECT DISTINCT id FROM mercury_values mv, json_valid("values") vs WHERE (json_valid("values") AND name = ? AND vs.value = ?) ) r000 USING (id) WHERE (space LIKE ?) != +//SELECT * FROM spaces JOIN ( SELECT DISTINCT mv.id FROM mercury_values mv, json_each(mv."values") vs WHERE (json_valid("values") AND name = ? AND vs.value = ?) ) r000 USING (id) WHERE (space LIKE ?) + + for i, tt := range tests { + t.Run(fmt.Sprintf("test %d", i), func(t *testing.T) { + is := is.New(t) + out := mercury.ParseSearch(tt.in) + sql, args, err := tt.getWhere(out).ToSql() + is.NoErr(err) + is.Equal(sql, tt.out) + is.Equal(args, tt.args) + }) + } +} + +func getWhere(search mercury.Search) sq.Sqlizer { + var where sq.Or + space := "column" + for _, m := range search.NamespaceSearch { + switch m.(type) { + case mercury.NamespaceNode: + where = append(where, sq.Eq{space: m.Value()}) + case mercury.NamespaceStar: + where = append(where, sq.Like{space: m.Value()}) + case mercury.NamespaceTrace: + e := sq.Expr(`? LIKE `+space+` || '%'`, m.Value()) + where = append(where, e) + } + } + return where +} + +func mkWhere(t *testing.T, where func(search mercury.Search) (func(sq.SelectBuilder) sq.SelectBuilder, error)) func(search mercury.Search) sq.Sqlizer { + t.Helper() + + return func(search mercury.Search) sq.Sqlizer { + w, err := where(search) + if err != nil { + t.Log(err) + t.Fail() + } + return w(sq.Select("*").From("spaces")) + } +} diff --git a/mercury/sql/list-string.go b/mercury/sql/list-string.go index 6b6fa2f..bbaff9b 100644 --- a/mercury/sql/list-string.go +++ b/mercury/sql/list-string.go @@ -43,9 +43,7 @@ func listScan(e *[]string, ends [2]rune) scanFn { return nil } - for _, s := range splitComma(string(str)) { - *e = append(*e, s) - } + *e = append(*e, splitComma(string(str))...) return nil } diff --git a/mercury/sql/notify.go b/mercury/sql/notify.go index 3f8711b..558c4b1 100644 --- a/mercury/sql/notify.go +++ b/mercury/sql/notify.go @@ -29,7 +29,7 @@ func (pgm *sqlHandler) GetNotify(ctx context.Context, event string) (lis mercury Where(squirrel.Eq{"event": event}). PlaceholderFormat(squirrel.Dollar). RunWith(pgm.db). - QueryContext(context.TODO()) + QueryContext(ctx) if err != nil { return nil, err diff --git a/mercury/sql/otel.go b/mercury/sql/otel.go index 354067a..b5bed4a 100644 --- a/mercury/sql/otel.go +++ b/mercury/sql/otel.go @@ -2,6 +2,7 @@ package sql import ( "database/sql" + "strings" "go.nhat.io/otelsql" semconv "go.opentelemetry.io/otel/semconv/v1.20.0" @@ -9,25 +10,28 @@ import ( func openDB(driver, dsn string) (*sql.DB, error) { system := semconv.DBSystemPostgreSQL - if driver == "sqlite" { + if driver == "sqlite" || strings.HasPrefix(driver, "libsql") { system = semconv.DBSystemSqlite } - - // Register the otelsql wrapper for the provided postgres driver. - driverName, err := otelsql.Register(driver, - otelsql.AllowRoot(), - otelsql.TraceQueryWithoutArgs(), - otelsql.TraceRowsClose(), - otelsql.TraceRowsAffected(), - // otelsql.WithDatabaseName("my_database"), // Optional. - otelsql.WithSystem(system), // Optional. - ) - if err != nil { - return nil, err + + if driver == "postgres" { + var err error + // Register the otelsql wrapper for the provided postgres driver. + driver, err = otelsql.Register(driver, + otelsql.AllowRoot(), + otelsql.TraceQueryWithoutArgs(), + otelsql.TraceRowsClose(), + otelsql.TraceRowsAffected(), + // otelsql.WithDatabaseName("my_database"), // Optional. + otelsql.WithSystem(system), // Optional. + ) + if err != nil { + return nil, err + } } - + // Connect to a Postgres database using the postgres driver wrapper. - db, err := sql.Open(driverName, dsn) + db, err := sql.Open(driver, dsn) if err != nil { return nil, err } diff --git a/mercury/sql/sql.go b/mercury/sql/sql.go index 9a606e9..a29cc24 100644 --- a/mercury/sql/sql.go +++ b/mercury/sql/sql.go @@ -3,21 +3,27 @@ package sql import ( "context" "database/sql" + "errors" "fmt" "log" + "slices" "strings" sq "github.com/Masterminds/squirrel" "go.sour.is/pkg/lg" "go.sour.is/pkg/mercury" - "go.sour.is/pkg/rsql" "golang.org/x/exp/maps" ) +var MAX_FILTER int = 40 + type sqlHandler struct { + name string db *sql.DB paceholderFormat sq.PlaceholderFormat listFormat [2]rune + readonly bool + getWhere func(search mercury.Search) (func(sq.SelectBuilder) sq.SelectBuilder, error) } var ( @@ -27,11 +33,13 @@ var ( _ mercury.WriteConfig = (*sqlHandler)(nil) ) -func Register() { +func Register() func(context.Context) error { + var hdlrs []*sqlHandler mercury.Registry.Register("sql", func(s *mercury.Space) any { var dsn string var opts strings.Builder var dbtype string + var readonly bool = slices.Contains(s.Tags, "readonly") for _, c := range s.List { if c.Name == "match" { continue @@ -49,7 +57,6 @@ func Register() { if dsn == "" { dsn = opts.String() } - db, err := openDB(dbtype, dsn) if err != nil { return err @@ -58,31 +65,47 @@ func Register() { return err } switch dbtype { - case "sqlite": - return &sqlHandler{db, sq.Dollar, [2]rune{'[', ']'}} + case "sqlite", "libsql", "libsql+embed": + h := &sqlHandler{s.Space, db, sq.Question, [2]rune{'[', ']'}, readonly, GetWhereSQ} + hdlrs = append(hdlrs, h) + return h case "postgres": - return &sqlHandler{db, sq.Dollar, [2]rune{'{', '}'}} + h := &sqlHandler{s.Space, db, sq.Dollar, [2]rune{'{', '}'}, readonly, GetWherePG} + hdlrs = append(hdlrs, h) + return h default: return fmt.Errorf("unsupported dbtype: %s", dbtype) } }) + + return func(ctx context.Context) error { + var errs error + + for _, h := range hdlrs { + // if err = ctx.Err(); err != nil { + // return errors.Join(errs, err) + // } + errs = errors.Join(errs, h.db.Close()) + } + + return errs + } } type Space struct { mercury.Space - ID uint64 + id uint64 } type Value struct { mercury.Value - ID uint64 + id uint64 } -func (p *sqlHandler) GetIndex(ctx context.Context, search mercury.NamespaceSearch, pgm *rsql.Program) (mercury.Config, error) { +func (p *sqlHandler) GetIndex(ctx context.Context, search mercury.Search) (mercury.Config, error) { ctx, span := lg.Span(ctx) defer span.End() - cols := rsql.GetDbColumns(mercury.Space{}) - where, err := getWhere(search, cols) + where, err := p.getWhere(search) if err != nil { return nil, err } @@ -100,28 +123,40 @@ func (p *sqlHandler) GetIndex(ctx context.Context, search mercury.NamespaceSearc return config, nil } -func (p *sqlHandler) GetConfig(ctx context.Context, search mercury.NamespaceSearch, pgm *rsql.Program, fields []string) (mercury.Config, error) { +func (p *sqlHandler) GetConfig(ctx context.Context, search mercury.Search) (config mercury.Config, err error) { ctx, span := lg.Span(ctx) defer span.End() - idx, err := p.GetIndex(ctx, search, pgm) + where, err := p.getWhere(search) if err != nil { return nil, err } - spaceMap := make(map[string]int, len(idx)) - for u, s := range idx { - spaceMap[s.Space] = u + lis, err := p.listSpace(ctx, nil, where) + if err != nil { + log.Println(err) + return nil, err } - where, err := getWhere(search, rsql.GetDbColumns(mercury.Value{})) - if err != nil { - return nil, err + if len(lis) == 0 { + return nil, nil } - query := sq.Select(`"space"`, `"name"`, `"seq"`, `"notes"`, `"tags"`, `"values"`). - From("mercury_registry_vw"). - Where(where). - OrderBy("space asc", "name asc"). + + spaceIDX := make([]uint64, len(lis)) + spaceMap := make(map[uint64]int, len(lis)) + config = make(mercury.Config, len(lis)) + for i, s := range lis { + spaceIDX[i] = s.id + config[i] = &s.Space + spaceMap[s.id] = i + } + + query := sq.Select(`"id"`, `"name"`, `"seq"`, `"notes"`, `"tags"`, `"values"`). + From("mercury_values"). + Where(sq.Eq{"id": spaceIDX}). + OrderBy("id asc", "seq asc"). PlaceholderFormat(p.paceholderFormat) + + span.AddEvent(p.name) span.AddEvent(lg.LogQuery(query.ToSql())) rows, err := query.RunWith(p.db). QueryContext(ctx) @@ -133,10 +168,10 @@ func (p *sqlHandler) GetConfig(ctx context.Context, search mercury.NamespaceSear defer rows.Close() for rows.Next() { - var s mercury.Value + var s Value err = rows.Scan( - &s.Space, + &s.id, &s.Name, &s.Seq, listScan(&s.Notes, p.listFormat), @@ -146,19 +181,20 @@ func (p *sqlHandler) GetConfig(ctx context.Context, search mercury.NamespaceSear if err != nil { return nil, err } - if u, ok := spaceMap[s.Space]; ok { - idx[u].List = append(idx[u].List, s) + if u, ok := spaceMap[s.id]; ok { + lis[u].List = append(lis[u].List, s.Value) } } err = rows.Err() span.RecordError(err) - span.AddEvent(fmt.Sprint("read index ", len(idx))) - return idx, err + span.AddEvent(fmt.Sprint("read index ", len(lis))) + // log.Println(config.String()) + return config, err } -func (p *sqlHandler) listSpace(ctx context.Context, tx sq.BaseRunner, where sq.Sqlizer) ([]*Space, error) { +func (p *sqlHandler) listSpace(ctx context.Context, tx sq.BaseRunner, where func(sq.SelectBuilder) sq.SelectBuilder) ([]*Space, error) { ctx, span := lg.Span(ctx) defer span.End() @@ -168,9 +204,11 @@ func (p *sqlHandler) listSpace(ctx context.Context, tx sq.BaseRunner, where sq.S query := sq.Select(`"id"`, `"space"`, `"notes"`, `"tags"`, `"trailer"`). From("mercury_spaces"). - Where(where). OrderBy("space asc"). - PlaceholderFormat(sq.Dollar) + PlaceholderFormat(p.paceholderFormat) + query = where(query) + + span.AddEvent(p.name) span.AddEvent(lg.LogQuery(query.ToSql())) rows, err := query.RunWith(tx). QueryContext(ctx) @@ -185,7 +223,7 @@ func (p *sqlHandler) listSpace(ctx context.Context, tx sq.BaseRunner, where sq.S for rows.Next() { var s Space err = rows.Scan( - &s.ID, + &s.id, &s.Space.Space, listScan(&s.Space.Notes, p.listFormat), listScan(&s.Space.Tags, p.listFormat), @@ -209,6 +247,10 @@ func (p *sqlHandler) WriteConfig(ctx context.Context, config mercury.Config) (er ctx, span := lg.Span(ctx) defer span.End() + if p.readonly { + return fmt.Errorf("readonly database") + } + // Delete spaces that are present in input but are empty. deleteSpaces := make(map[string]struct{}) @@ -233,7 +275,8 @@ func (p *sqlHandler) WriteConfig(ctx context.Context, config mercury.Config) (er }() // get current spaces - lis, err := p.listSpace(ctx, tx, sq.Eq{"space": maps.Keys(names)}) + where := func(qry sq.SelectBuilder) sq.SelectBuilder { return qry.Where(sq.Eq{"space": maps.Keys(names)}) } + lis, err := p.listSpace(ctx, tx, where) if err != nil { return } @@ -250,12 +293,12 @@ func (p *sqlHandler) WriteConfig(ctx context.Context, config mercury.Config) (er currentNames[spaceName] = struct{}{} if _, ok := deleteSpaces[spaceName]; ok { - deleteIDs = append(deleteIDs, s.ID) + deleteIDs = append(deleteIDs, s.id) continue } updateSpaces = append(updateSpaces, config[names[spaceName]]) - updateIDs = append(updateIDs, s.ID) + updateIDs = append(updateIDs, s.id) } for _, s := range config { spaceName := s.Space @@ -266,7 +309,7 @@ func (p *sqlHandler) WriteConfig(ctx context.Context, config mercury.Config) (er // delete spaces if ids := deleteIDs; len(ids) > 0 { - _, err = sq.Delete("mercury_spaces").Where(sq.Eq{"id": ids}).RunWith(tx).PlaceholderFormat(sq.Dollar).ExecContext(ctx) + _, err = sq.Delete("mercury_spaces").Where(sq.Eq{"id": ids}).RunWith(tx).PlaceholderFormat(p.paceholderFormat).ExecContext(ctx) if err != nil { return err } @@ -274,7 +317,7 @@ func (p *sqlHandler) WriteConfig(ctx context.Context, config mercury.Config) (er // delete values if ids := append(updateIDs, deleteIDs...); len(ids) > 0 { - _, err = sq.Delete("mercury_values").Where(sq.Eq{"id": ids}).RunWith(tx).PlaceholderFormat(sq.Dollar).ExecContext(ctx) + _, err = sq.Delete("mercury_values").Where(sq.Eq{"id": ids}).RunWith(tx).PlaceholderFormat(p.paceholderFormat).ExecContext(ctx) if err != nil { return err } @@ -289,7 +332,8 @@ func (p *sqlHandler) WriteConfig(ctx context.Context, config mercury.Config) (er Set("tags", listValue(u.Tags, p.listFormat)). Set("notes", listValue(u.Notes, p.listFormat)). Set("trailer", listValue(u.Trailer, p.listFormat)). - PlaceholderFormat(sq.Dollar) + PlaceholderFormat(p.paceholderFormat) + span.AddEvent(p.name) span.AddEvent(lg.LogQuery(query.ToSql())) _, err := query.RunWith(tx).ExecContext(ctx) @@ -298,7 +342,7 @@ func (p *sqlHandler) WriteConfig(ctx context.Context, config mercury.Config) (er } // log.Debugf("UPDATED %d SPACES", len(updateSpaces)) for _, v := range u.List { - newValues = append(newValues, &Value{Value: v, ID: updateIDs[i]}) + newValues = append(newValues, &Value{Value: v, id: updateIDs[i]}) } } @@ -306,15 +350,16 @@ func (p *sqlHandler) WriteConfig(ctx context.Context, config mercury.Config) (er for _, s := range insertSpaces { var id uint64 query := sq.Insert("mercury_spaces"). - PlaceholderFormat(sq.Dollar). + PlaceholderFormat(p.paceholderFormat). Columns("space", "tags", "notes", "trailer"). Values( - s.Space, - listValue(s.Tags, p.listFormat), + s.Space, + listValue(s.Tags, p.listFormat), listValue(s.Notes, p.listFormat), listValue(s.Trailer, p.listFormat), - ). + ). Suffix("RETURNING \"id\"") + span.AddEvent(p.name) span.AddEvent(lg.LogQuery(query.ToSql())) err := query. @@ -322,12 +367,13 @@ func (p *sqlHandler) WriteConfig(ctx context.Context, config mercury.Config) (er QueryRowContext(ctx). Scan(&id) if err != nil { + span.AddEvent(p.name) s, v, _ := query.ToSql() log.Println(s, v, err) return err } for _, v := range s.List { - newValues = append(newValues, &Value{Value: v, ID: id}) + newValues = append(newValues, &Value{Value: v, id: id}) } } @@ -353,7 +399,7 @@ func (p *sqlHandler) writeValues(ctx context.Context, tx sq.BaseRunner, lis []*V newInsert := func() sq.InsertBuilder { return sq.Insert("mercury_values"). RunWith(tx). - PlaceholderFormat(sq.Dollar). + PlaceholderFormat(p.paceholderFormat). Columns( `"id"`, `"seq"`, @@ -367,7 +413,7 @@ func (p *sqlHandler) writeValues(ctx context.Context, tx sq.BaseRunner, lis []*V insert := newInsert() for i, s := range lis { insert = insert.Values( - s.ID, + s.id, s.Seq, s.Name, listValue(s.Values, p.listFormat), @@ -378,7 +424,7 @@ func (p *sqlHandler) writeValues(ctx context.Context, tx sq.BaseRunner, lis []*V if i > 0 && i%chunk == 0 { // log.Debugf("inserting %v rows into %v", i%chunk, d.Table) - // log.Debug(insert.ToSql()) + span.AddEvent(p.name) span.AddEvent(lg.LogQuery(insert.ToSql())) _, err = insert.ExecContext(ctx) @@ -392,7 +438,7 @@ func (p *sqlHandler) writeValues(ctx context.Context, tx sq.BaseRunner, lis []*V } if len(lis)%chunk > 0 { // log.Debugf("inserting %v rows into %v", len(lis)%chunk, d.Table) - // log.Debug(insert.ToSql()) + span.AddEvent(p.name) span.AddEvent(lg.LogQuery(insert.ToSql())) _, err = insert.ExecContext(ctx) @@ -405,13 +451,11 @@ func (p *sqlHandler) writeValues(ctx context.Context, tx sq.BaseRunner, lis []*V return } -func getWhere(search mercury.NamespaceSearch, d *rsql.DbColumns) (sq.Sqlizer, error) { +func GetWherePG(search mercury.Search) (func(sq.SelectBuilder) sq.SelectBuilder, error) { var where sq.Or - space, err := d.Col("space") - if err != nil { - return nil, err - } - for _, m := range search { + space := "space" + + for _, m := range search.NamespaceSearch { switch m.(type) { case mercury.NamespaceNode: where = append(where, sq.Eq{space: m.Value()}) @@ -422,5 +466,129 @@ func getWhere(search mercury.NamespaceSearch, d *rsql.DbColumns) (sq.Sqlizer, er where = append(where, e) } } - return where, nil + + var joins []sq.SelectBuilder + for i, o := range search.Find { + log.Println(o) + if i > MAX_FILTER { + err := fmt.Errorf("too many filters [%d]", MAX_FILTER) + return nil, err + } + q := sq.Select("DISTINCT id").From("mercury_values") + + switch o.Op { + case "key": + q = q.Where(sq.Eq{"name": o.Left}) + case "nkey": + q = q.Where(sq.NotEq{"name": o.Left}) + case "eq": + q = q.Where("name = ? AND ? = any (values)", o.Left, o.Right) + case "neq": + q = q.Where("name = ? AND ? != any (values)", o.Left, o.Right) + + case "gt": + q = q.Where("name = ? AND ? > any (values)", o.Left, o.Right) + case "lt": + q = q.Where("name = ? AND ? < any (values)", o.Left, o.Right) + case "ge": + q = q.Where("name = ? AND ? >= any (values)", o.Left, o.Right) + case "le": + q = q.Where("name = ? AND ? <= any (values)", o.Left, o.Right) + + // case "like": + // q = q.Where("name = ? AND value LIKE ?", o.Left, o.Right) + // case "in": + // q = q.Where(sq.Eq{"name": o.Left, "value": strings.Split(o.Right, " ")}) + } + joins = append(joins, q) + } + + return func(s sq.SelectBuilder) sq.SelectBuilder { + for i, q := range joins { + s = s.JoinClause(q.Prefix("JOIN (").Suffix(fmt.Sprintf(`) r%03d USING (id)`, i))) + } + + if search.Count > 0 { + s = s.Limit(search.Count) + } + return s.Where(where) + }, nil +} + +func GetWhereSQ(search mercury.Search) (func(sq.SelectBuilder) sq.SelectBuilder, error) { + var where sq.Or + + var errs error + id := "id" + space := "space" + name := "name" + values_each := `json_valid("values")` + values_valid := `json_valid("values")` + + if errs != nil { + return nil, errs + } + + for _, m := range search.NamespaceSearch { + switch m.(type) { + case mercury.NamespaceNode: + where = append(where, sq.Eq{space: m.Value()}) + case mercury.NamespaceStar: + where = append(where, sq.Like{space: m.Value()}) + case mercury.NamespaceTrace: + e := sq.Expr(`? LIKE `+space+` || '%'`, m.Value()) + where = append(where, e) + } + } + + var joins []sq.SelectBuilder + for i, o := range search.Find { + log.Println(o) + if i > MAX_FILTER { + err := fmt.Errorf("too many filters [%d]", MAX_FILTER) + return nil, err + } + q := sq.Select("DISTINCT " + id).From(`mercury_values mv, ` + values_each + ` vs`) + + switch o.Op { + case "key": + q = q.Where(sq.Eq{name: o.Left}) + case "nkey": + q = q.Where(sq.NotEq{name: o.Left}) + case "eq": + q = q.Where(sq.And{sq.Expr(values_valid), sq.Eq{name: o.Left, `vs.value`: o.Right}}) + case "neq": + q = q.Where(sq.And{sq.Expr(values_valid), sq.Eq{name: o.Left}, sq.NotEq{`vs.value`: o.Right}}) + + case "gt": + q = q.Where(sq.And{sq.Expr(values_valid), sq.Eq{name: o.Left}, sq.Gt{`vs.value`: o.Right}}) + case "lt": + q = q.Where(sq.And{sq.Expr(values_valid), sq.Eq{name: o.Left}, sq.Lt{`vs.value`: o.Right}}) + case "ge": + q = q.Where(sq.And{sq.Expr(values_valid), sq.Eq{name: o.Left}, sq.GtOrEq{`vs.value`: o.Right}}) + case "le": + q = q.Where(sq.And{sq.Expr(values_valid), sq.Eq{name: o.Left}, sq.LtOrEq{`vs.value`: o.Right}}) + case "like": + q = q.Where(sq.And{sq.Expr(values_valid), sq.Eq{name: o.Left}, sq.Like{`vs.value`: o.Right}}) + case "in": + q = q.Where(sq.Eq{name: o.Left, "vs.value": strings.Split(o.Right, " ")}) + } + joins = append(joins, q) + } + + return func(s sq.SelectBuilder) sq.SelectBuilder { + for i, q := range joins { + s = s.JoinClause(q.Prefix("JOIN (").Suffix(fmt.Sprintf(`) r%03d USING (id)`, i))) + } + + if search.Count > 0 { + s = s.Limit(search.Count) + } + + if search.Offset > 0 { + s = s.Offset(search.Offset) + } + + return s.Where(where) + }, nil } diff --git a/rsql/dbcolumns.go b/rsql/dbcolumns.go index e5fc263..bbb505f 100644 --- a/rsql/dbcolumns.go +++ b/rsql/dbcolumns.go @@ -11,8 +11,8 @@ import ( type DbColumns struct { Cols []string index map[string]int - Table string - View string + Table string + View string } // Col returns the mapped column names @@ -40,16 +40,16 @@ func GetDbColumns(o interface{}) *DbColumns { for i := 0; i < t.NumField(); i++ { field := t.Field(i) - sp := append(strings.Split(field.Tag.Get("db"), ","), "") - - tag := sp[0] + tag, _, _ := strings.Cut(field.Tag.Get("db"), ",") json := field.Tag.Get("json") + json, _, _ = strings.Cut(json, ",") if tag == "" { tag = json } graphql := field.Tag.Get("graphql") + graphql, _, _ = strings.Cut(graphql, ",") if tag == "" { tag = graphql } @@ -88,3 +88,11 @@ func GetDbColumns(o interface{}) *DbColumns { } return &d } + +func QuoteCols(cols []string) []string { + lis := make([]string, len(cols)) + for i := range cols { + lis[i] = `"` + cols[i] + `"` + } + return lis +} diff --git a/service/service.go b/service/service.go index fd085d6..9da3dc0 100644 --- a/service/service.go +++ b/service/service.go @@ -114,7 +114,6 @@ func (s *Harness) Run(ctx context.Context, appName, version string) error { err := g.Wait() if err != nil { log.Printf("Shutdown due to error: %s", err) - } return err }