add libsql support

mercury
xuu 2024-04-19 10:56:27 -06:00
parent 1f8b4ab24f
commit b1bff4cbf0
Signed by: xuu
GPG Key ID: 8B3B0604F164E04F
17 changed files with 609 additions and 248 deletions

7
go.mod
View File

@ -61,7 +61,6 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 // indirect
github.com/oklog/ulid/v2 v2.1.0
github.com/prometheus/client_golang v1.18.0
github.com/tursodatabase/go-libsql v0.0.0-20240322134723-08771dcdd2f1
go.nhat.io/otelsql v0.12.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.48.0
go.opentelemetry.io/contrib/instrumentation/runtime v0.48.0
@ -72,10 +71,10 @@ require (
go.opentelemetry.io/otel/trace v1.23.1
go.opentelemetry.io/proto/otlp v1.1.0 // indirect
golang.org/x/exp v0.0.0-20240213143201-ec583247a57a
golang.org/x/net v0.21.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/grpc v1.61.1 // indirect
google.golang.org/protobuf v1.32.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
modernc.org/sqlite v1.29.1
)

5
go.sum
View File

@ -183,6 +183,8 @@ golang.org/x/mod v0.15.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/oauth2 v0.0.0-20170912212905-13449ad91cb2/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20170517211232-f52d1811a629/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
@ -191,6 +193,7 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
@ -216,6 +219,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

View File

@ -160,7 +160,6 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/flatbuffers v23.5.26+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
@ -242,6 +241,8 @@ go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+
go.opentelemetry.io/otel/trace v1.22.0/go.mod h1:RbbHXVqKES9QhzZq/fE5UnOSILqRt40a21sPw2He1xo=
go.opentelemetry.io/otel/trace v1.23.0/go.mod h1:GSGTbIClEsuZrGIzoEHqsVfxgn5UkggkflQwDScNUsk=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
@ -275,10 +276,10 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/telemetry v0.0.0-20240208230135-b75ee8823808/go.mod h1:KG1lNk5ZFNssSZLrpVb4sMXKMpGwGXOxSG3rnu2gZQQ=
golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
@ -291,7 +292,6 @@ golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6f
google.golang.org/api v0.162.0/go.mod h1:6SulDkfoBIg4NFmCuZ39XeeAgSHCPecfSUuDyYlAHs0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
@ -325,7 +325,6 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
gopkg.in/ini.v1 v1.66.4/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
lukechampine.com/uint128 v1.2.0/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk=

View File

@ -7,17 +7,17 @@ import (
"strings"
"time"
"go.sour.is/pkg/ident"
"go.sour.is/pkg/lg"
"go.sour.is/pkg/mercury"
"go.sour.is/pkg/ident"
)
const identNS = "ident."
const identSFX = ".credentials"
type registry interface {
GetIndex(ctx context.Context, match, search string) (c mercury.Config, err error)
GetConfig(ctx context.Context, match, search, fields string) (mercury.Config, error)
GetIndex(ctx context.Context, search mercury.Search) (c mercury.Config, err error)
GetConfig(ctx context.Context, search mercury.Search) (mercury.Config, error)
WriteConfig(ctx context.Context, spaces mercury.Config) error
}
@ -50,6 +50,7 @@ func (id *mercuryIdent) FromConfig(cfg mercury.Config) error {
switch {
case strings.HasSuffix(s.Space, ".credentials"):
id.passwd = []byte(s.FirstValue("passwd").First())
id.ed25519 = []byte(s.FirstValue("ed25519").First())
default:
id.display = s.FirstValue("displayName").First()
}
@ -59,34 +60,29 @@ func (id *mercuryIdent) FromConfig(cfg mercury.Config) error {
func (id *mercuryIdent) ToConfig() mercury.Config {
space := id.Space()
list := func(values ...mercury.Value) []mercury.Value { return values }
value := func(space string, seq uint64, name string, values ...string) mercury.Value {
return mercury.Value{
Space: space,
Seq: seq,
Name: name,
Values: values,
}
}
return mercury.Config{
&mercury.Space{
Space: space,
List: []mercury.Value{
{
Space: space,
Seq: 1,
Name: "displayName",
Values: []string{id.display},
},
{
Space: space,
Seq: 2,
Name: "lastLogin",
Values: []string{time.UnixMilli(int64(id.Session().SessionID.Time())).Format(time.RFC3339)},
},
},
List: list(
value(space, 1, "displayName", id.display),
value(space, 2, "lastLogin", time.UnixMilli(int64(id.Session().SessionID.Time())).Format(time.RFC3339)),
),
},
&mercury.Space{
Space: space + identSFX,
List: []mercury.Value{
{
Space: space + identSFX,
Seq: 1,
Name: "passwd",
Values: []string{string(id.passwd)},
},
},
List: list(
value(space+identSFX, 1, "passwd", string(id.passwd)),
value(space+identSFX, 1, "ed25519", string(id.ed25519)),
),
},
}
}
@ -140,7 +136,7 @@ func (s *mercurySource) readIdentURL(r *http.Request) (ident.Ident, error) {
}
space := id.Space()
c, err := s.r.GetConfig(ctx, "trace:"+space+identSFX, "", "")
c, err := s.r.GetConfig(ctx, mercury.ParseSearch("trace:"+space+identSFX))
if err != nil {
span.RecordError(err)
return id, err
@ -183,7 +179,7 @@ func (s *mercurySource) readIdentBasic(r *http.Request) (ident.Ident, error) {
}
space := id.Space()
c, err := s.r.GetConfig(ctx, "trace:"+space+identSFX, "", "")
c, err := s.r.GetConfig(ctx, mercury.ParseSearch("trace:"+space+identSFX))
if err != nil {
span.RecordError(err)
return id, err
@ -228,7 +224,7 @@ func (s *mercurySource) readIdentHTTP(r *http.Request) (ident.Ident, error) {
}
space := id.Space()
c, err := s.r.GetConfig(ctx, "trace:"+space+identSFX, "", "")
c, err := s.r.GetConfig(ctx, mercury.ParseSearch("trace:"+space+identSFX))
if err != nil {
span.RecordError(err)
return id, err
@ -260,9 +256,8 @@ func (s *mercurySource) RegisterIdent(ctx context.Context, identity, display str
defer span.End()
id := &mercuryIdent{identity: identity, display: display, passwd: passwd}
space := id.Space()
_, err := s.r.GetIndex(ctx, space, "")
_, err := s.r.GetIndex(ctx, mercury.ParseSearch( id.Space()))
if err != nil {
return nil, err
}

View File

@ -4,7 +4,10 @@ import (
"context"
"database/sql"
"database/sql/driver"
"errors"
"fmt"
"io"
"log"
"net/url"
"os"
"path/filepath"
@ -17,28 +20,37 @@ import (
)
func init() {
sql.Register("libsql+embed", &db{})
sql.Register("libsql+embed", &db{conns: make(map[string]*connector)})
}
type db struct {
conns map[string]connector
conns map[string]*connector
mu sync.RWMutex
}
type connector struct {
*libsql.Connector
dsn string
dir string
driver *db
dsn string
dir string
driver *db
removeDir bool
}
var _ io.Closer = (*connector)(nil)
func (c *connector) Close() error {
log.Println("closing db connection", c.dir)
defer log.Println("closed db connection", c.dir)
c.driver.mu.Lock()
delete(c.driver.conns, c.dsn)
c.driver.mu.Unlock()
defer os.RemoveAll(c.dir)
if c.removeDir {
defer os.RemoveAll(c.dir)
}
log.Println("sync db")
if err := c.Connector.Sync(); err != nil {
return fmt.Errorf("syncing database: %w", err)
}
@ -47,7 +59,12 @@ func (c *connector) Close() error {
}
func (db *db) OpenConnector(dsn string) (driver.Connector, error) {
if c, ok := func() (connector, bool) {
// log.Println("connector", dsn)
if dsn == "" {
return nil, fmt.Errorf("no dsn")
}
if c, ok := func() (*connector, bool) {
db.mu.RLock()
defer db.mu.RUnlock()
c, ok := db.conns[dsn]
@ -79,20 +96,39 @@ func (db *db) OpenConnector(dsn string) (driver.Connector, error) {
libsql.WithAuthToken(authToken),
}
if refresh, err := strconv.ParseInt(u.Query().Get("refresh"),10,64); err == nil {
if refresh, err := strconv.ParseInt(u.Query().Get("refresh"), 10, 64); err == nil {
log.Println("refresh: ", refresh)
opts = append(opts, libsql.WithSyncInterval(time.Duration(refresh)*time.Minute))
}
if readWrite, err := strconv.ParseBool(u.Query().Get("readYourWrites")); err == nil {
log.Println("read your writes: ", readWrite)
opts = append(opts, libsql.WithReadYourWrites(readWrite))
}
if key := u.Query().Get("key"); key != "" {
opts = append(opts, libsql.WithEncryption(key))
}
dir, err := os.MkdirTemp("", "libsql-*")
if err != nil {
return nil, fmt.Errorf("creating temporary directory: %w", err)
var dir string
var removeDir bool
if dir = u.Query().Get("store"); dir == "" {
removeDir = true
dir, err = os.MkdirTemp("", "libsql-*")
log.Println("creating temporary directory:", dir)
if err != nil {
return nil, fmt.Errorf("creating temporary directory: %w", err)
}
} else {
stat, err := os.Stat(dir)
if errors.Is(err, os.ErrNotExist) {
if err = os.MkdirAll(dir, 0700); err != nil {
return nil, err
}
} else {
if !stat.IsDir() {
return nil, fmt.Errorf("store not directory")
}
}
}
dbPath := filepath.Join(dir, dbname)
@ -105,13 +141,19 @@ func (db *db) OpenConnector(dsn string) (driver.Connector, error) {
return nil, fmt.Errorf("creating connector: %w", err)
}
connector := connector{c, dsn, dir, db}
log.Println("sync db")
if err := c.Sync(); err != nil {
return nil, fmt.Errorf("syncing database: %w", err)
}
connector := &connector{c, dsn, dir, db, removeDir}
db.conns[dsn] = connector
return connector, nil
}
func (db *db) Open(dsn string) (driver.Conn, error) {
log.Println("open", dsn)
c, err := db.OpenConnector(dsn)
if err != nil {
return nil, err

View File

@ -8,9 +8,8 @@ import (
"sort"
"strings"
"go.sour.is/pkg/mercury"
"go.sour.is/pkg/ident"
"go.sour.is/pkg/rsql"
"go.sour.is/pkg/mercury"
"go.sour.is/pkg/set"
)
@ -20,8 +19,9 @@ const (
mercuryHost = "mercury.host"
appDotEnviron = "mercury.environ"
)
var (
mercuryPolicy = func(id string) string { return "mercury.@" + id + ".policy" }
mercuryPolicy = func(id string) string { return "mercury.@" + id + ".policy" }
)
func Register(name string, cfg mercury.SpaceMap) {
@ -41,8 +41,13 @@ type mercuryEnviron struct {
lookup func(context.Context, ident.Ident) (mercury.Rules, error)
}
func getSearch(spec mercury.Search) mercury.NamespaceSearch {
return spec.NamespaceSearch
}
// Index returns nil
func (app *mercuryEnviron) GetIndex(ctx context.Context, search mercury.NamespaceSearch, _ *rsql.Program) (lis mercury.Config, err error) {
func (app *mercuryEnviron) GetIndex(ctx context.Context, spec mercury.Search) (lis mercury.Config, err error) {
search := getSearch(spec)
if search.Match(mercurySource) {
for _, s := range app.cfg.ToArray() {
@ -74,7 +79,9 @@ func (app *mercuryEnviron) GetIndex(ctx context.Context, search mercury.Namespac
}
// Objects returns nil
func (app *mercuryEnviron) GetConfig(ctx context.Context, search mercury.NamespaceSearch, _ *rsql.Program, _ []string) (lis mercury.Config, err error) {
func (app *mercuryEnviron) GetConfig(ctx context.Context, spec mercury.Search) (lis mercury.Config, err error) {
search := getSearch(spec)
if search.Match(mercurySource) {
for _, s := range app.cfg.ToArray() {
if search.Match(s.Space) {

View File

@ -1,59 +0,0 @@
package mercury_test
import (
"fmt"
"testing"
"github.com/matryer/is"
"go.sour.is/pkg/mercury"
sq "github.com/Masterminds/squirrel"
)
func TestNamespaceParse(t *testing.T) {
var tests = []struct {
in string
out string
args []any
}{
{
in: "d42.bgp.kapha.*;trace:d42.bgp.kapha",
out: "(column LIKE ? OR ? LIKE column || '%')",
args: []any{"d42.bgp.kapha.%", "d42.bgp.kapha"},
},
{
in: "d42.bgp.kapha.*,d42.bgp.kapha",
out: "(column LIKE ? OR column = ?)",
args: []any{"d42.bgp.kapha.%", "d42.bgp.kapha"},
},
}
for i, tt := range tests {
t.Run(fmt.Sprintf("test %d", i), func(t *testing.T) {
is := is.New(t)
out := mercury.ParseNamespace(tt.in)
sql, args, err := getWhere(out).ToSql()
is.NoErr(err)
is.Equal(sql, tt.out)
is.Equal(args, tt.args)
})
}
}
func getWhere(search mercury.NamespaceSearch) sq.Sqlizer {
var where sq.Or
space := "column"
for _, m := range search {
switch m.(type) {
case mercury.NamespaceNode:
where = append(where, sq.Eq{space: m.Value()})
case mercury.NamespaceStar:
where = append(where, sq.Like{space: m.Value()})
case mercury.NamespaceTrace:
e := sq.Expr(`? LIKE `+space+` || '%'`, m.Value())
where = append(where, e)
}
}
return where
}

View File

@ -3,6 +3,7 @@ package mercury
import (
"context"
"fmt"
"log"
"path/filepath"
"sort"
"strconv"
@ -10,16 +11,15 @@ import (
"go.sour.is/pkg/ident"
"go.sour.is/pkg/lg"
"go.sour.is/pkg/rsql"
"go.sour.is/pkg/set"
"golang.org/x/sync/errgroup"
)
type GetIndex interface {
GetIndex(context.Context, NamespaceSearch, *rsql.Program) (Config, error)
GetIndex(context.Context, Search) (Config, error)
}
type GetConfig interface {
GetConfig(context.Context, NamespaceSearch, *rsql.Program, []string) (Config, error)
GetConfig(context.Context, Search) (Config, error)
}
type WriteConfig interface {
WriteConfig(context.Context, Config) error
@ -60,7 +60,7 @@ func (reg *registry) accessFilter(rules Rules, lis Config) (out Config, err erro
// HandlerItem a single handler matching
type matcher[T any] struct {
Name string
Match NamespaceSearch
Match Search
Priority int
Handler T
}
@ -122,17 +122,23 @@ func (r *registry) Register(name string, h func(*Space) any) {
func (r *registry) Configure(m SpaceMap) error {
r.resetMatchers()
for space, c := range m {
log.Println("configure: ", space)
if strings.HasPrefix(space, "mercury.source.") {
space = strings.TrimPrefix(space, "mercury.source.")
handler, name, _ := strings.Cut(space, ".")
matches := c.FirstValue("match")
readonly := c.HasTag("readonly")
for _, match := range matches.Values {
ps := strings.Fields(match)
priority, err := strconv.Atoi(ps[0])
if err != nil {
return err
}
r.add(name, handler, ps[1], priority, c)
err = r.add(name, handler, strings.Join(ps[1:],"|"), priority, c, readonly)
if err != nil {
return err
}
}
}
@ -146,7 +152,10 @@ func (r *registry) Configure(m SpaceMap) error {
if err != nil {
return err
}
r.add(name, handler, ps[1], priority, c)
err = r.add(name, handler, strings.Join(ps[1:],"|"), priority, c, false)
if err != nil {
return err
}
}
}
}
@ -156,8 +165,8 @@ func (r *registry) Configure(m SpaceMap) error {
}
// Register add a handler to registry
func (r *registry) add(name, handler, match string, priority int, cfg *Space) error {
// log.Infos("mercury regster", "match", match, "pri", priority)
func (r *registry) add(name, handler, match string, priority int, cfg *Space, readonly bool) error {
log.Println("mercury regster", "match", match, "pri", priority)
mkHandler, ok := r.handlers[handler]
if !ok {
return fmt.Errorf("handler not registered: %s", handler)
@ -173,61 +182,68 @@ func (r *registry) add(name, handler, match string, priority int, cfg *Space) er
if hdlr, ok := hdlr.(GetIndex); ok {
r.matchers.getIndex = append(
r.matchers.getIndex,
matcher[GetIndex]{Name: name, Match: ParseNamespace(match), Priority: priority, Handler: hdlr},
matcher[GetIndex]{Name: name, Match: ParseSearch(match), Priority: priority, Handler: hdlr},
)
}
if hdlr, ok := hdlr.(GetConfig); ok {
r.matchers.getConfig = append(
r.matchers.getConfig,
matcher[GetConfig]{Name: name, Match: ParseNamespace(match), Priority: priority, Handler: hdlr},
matcher[GetConfig]{Name: name, Match: ParseSearch(match), Priority: priority, Handler: hdlr},
)
}
if hdlr, ok := hdlr.(WriteConfig); ok {
if hdlr, ok := hdlr.(WriteConfig); !readonly && ok {
r.matchers.writeConfig = append(
r.matchers.writeConfig,
matcher[WriteConfig]{Name: name, Match: ParseNamespace(match), Priority: priority, Handler: hdlr},
matcher[WriteConfig]{Name: name, Match: ParseSearch(match), Priority: priority, Handler: hdlr},
)
}
if hdlr, ok := hdlr.(GetRules); ok {
r.matchers.getRules = append(
r.matchers.getRules,
matcher[GetRules]{Name: name, Match: ParseNamespace(match), Priority: priority, Handler: hdlr},
matcher[GetRules]{Name: name, Match: ParseSearch(match), Priority: priority, Handler: hdlr},
)
}
if hdlr, ok := hdlr.(GetNotify); ok {
r.matchers.getNotify = append(
r.matchers.getNotify,
matcher[GetNotify]{Name: name, Match: ParseNamespace(match), Priority: priority, Handler: hdlr},
matcher[GetNotify]{Name: name, Match: ParseSearch(match), Priority: priority, Handler: hdlr},
)
}
if hdlr, ok := hdlr.(SendNotify); ok {
r.matchers.sendNotify = append(
r.matchers.sendNotify,
matcher[SendNotify]{Name: name, Match: ParseNamespace(match), Priority: priority, Handler: hdlr},
matcher[SendNotify]{Name: name, Match: ParseSearch(match), Priority: priority, Handler: hdlr},
)
}
return nil
}
// GetIndex query each handler that match namespace.
func (r *registry) GetIndex(ctx context.Context, match, search string) (c Config, err error) {
ctx, span := lg.Span(ctx)
defer span.End()
func getMatches(search Search, matchers matchers) []Search {
matches := make([]Search, len(matchers.getIndex))
spec := ParseNamespace(match)
pgm := rsql.DefaultParse(search)
matches := make([]NamespaceSearch, len(r.matchers.getIndex))
for _, n := range spec {
for i, hdlr := range r.matchers.getIndex {
for _, n := range search.NamespaceSearch {
for i, hdlr := range matchers.getIndex {
if hdlr.Match.Match(n.Raw()) {
matches[i] = append(matches[i], n)
matches[i].NamespaceSearch = append(matches[i].NamespaceSearch, n)
matches[i].Count = search.Count
matches[i].Cursor = search.Cursor // need to decode cursor for the match
matches[i].Fields = search.Fields
matches[i].Find = search.Find
}
}
}
return matches
}
// GetIndex query each handler that match namespace.
func (r *registry) GetIndex(ctx context.Context, search Search) (c Config, err error) {
ctx, span := lg.Span(ctx)
defer span.End()
matches := getMatches(search, r.matchers)
wg, ctx := errgroup.WithContext(ctx)
slots := make(chan Config, len(r.matchers.getConfig))
@ -248,7 +264,7 @@ func (r *registry) GetIndex(ctx context.Context, match, search string) (c Config
wg.Go(func() error {
span.AddEvent(fmt.Sprintf("INDEX %s %s", hdlr.Name, hdlr.Match))
lis, err := hdlr.Handler.GetIndex(ctx, matches[i], pgm)
lis, err := hdlr.Handler.GetIndex(ctx, matches[i])
slots <- lis
return err
})
@ -265,31 +281,19 @@ func (r *registry) GetIndex(ctx context.Context, match, search string) (c Config
// Search query each handler with a key=value search
// GetConfig query each handler that match for fully qualified namespaces.
func (r *registry) GetConfig(ctx context.Context, match, search, fields string) (Config, error) {
func (r *registry) GetConfig(ctx context.Context, search Search) (Config, error) {
ctx, span := lg.Span(ctx)
defer span.End()
spec := ParseNamespace(match)
pgm := rsql.DefaultParse(search)
flds := strings.Split(fields, ",")
matches := make([]NamespaceSearch, len(r.matchers.getConfig))
for _, n := range spec {
for i, hdlr := range r.matchers.getConfig {
if hdlr.Match.Match(n.Raw()) {
matches[i] = append(matches[i], n)
}
}
}
matches := getMatches(search, r.matchers)
m := make(SpaceMap)
for i, hdlr := range r.matchers.getConfig {
if len(matches[i]) == 0 {
if len(matches[i].NamespaceSearch) == 0 {
continue
}
span.AddEvent(fmt.Sprintf("QUERY %s %s", hdlr.Name, hdlr.Match))
lis, err := hdlr.Handler.GetConfig(ctx, matches[i], pgm, flds)
lis, err := hdlr.Handler.GetConfig(ctx, matches[i])
if err != nil {
return nil, err
}

View File

@ -70,12 +70,12 @@ func (s *root) configV1(w http.ResponseWriter, r *http.Request) {
}
log.Print("SPC: ", space)
ns := ParseNamespace(space)
ns := ParseSearch(space)
log.Print("PRE: ", ns)
//ns = rules.ReduceSearch(ns)
log.Print("POST: ", ns)
lis, err := Registry.GetConfig(ctx, ns.String(), "", "")
lis, err := Registry.GetConfig(ctx, ns)
if err != nil {
http.Error(w, "ERR: "+err.Error(), http.StatusInternalServerError)
return
@ -250,11 +250,11 @@ func (s *root) indexV1(w http.ResponseWriter, r *http.Request) {
space = "*"
}
ns := ParseNamespace(space)
ns = rules.ReduceSearch(ns)
ns := ParseSearch(space)
ns.NamespaceSearch = rules.ReduceSearch(ns.NamespaceSearch)
span.AddEvent(ns.String())
lis, err := Registry.GetIndex(ctx, ns.String(), "")
lis, err := Registry.GetIndex(ctx, ns)
if err != nil {
span.RecordError(err)
http.Error(w, "ERR: "+err.Error(), http.StatusInternalServerError)

View File

@ -1,11 +1,33 @@
package mercury
import (
"log"
"path/filepath"
"strconv"
"strings"
)
// NamespaceSpec implements a parsed namespace search
// Search implements a parsed namespace search
// It parses the input and generates an AST to inform the driver how to select values.
// * => all spaces
// mercury.* => all prefixed with `mercury.`
// mercury.config => only space `mercury.config`
// mercury.source.*#readonly => all prefixed with `mercury.source.` AND has tag `readonly`
// test.*|mercury.* => all prefixed with `test.` AND `mercury.`
// test.* find bin=eq=bar => all prefixed with `test.` AND has an attribute bin that equals bar
// test.* fields foo,bin => all prefixed with `test.` only show fields foo and bin
// - count 20 => start a cursor with 20 results
// - count 20 after <cursor> => continue after cursor for 20 results
// cursor encodes start points for each of the matched sources
type Search struct {
NamespaceSearch
Find []ops
Fields []string
Count uint64
Offset uint64
Cursor string
}
type NamespaceSpec interface {
Value() string
String() string
@ -17,8 +39,10 @@ type NamespaceSpec interface {
type NamespaceSearch []NamespaceSpec
// ParseNamespace returns a list of parsed values
func ParseNamespace(ns string) (lis NamespaceSearch) {
for _, part := range strings.Split(ns, ";") {
func ParseSearch(text string) (search Search) {
ns, text, _ := strings.Cut(text, " ")
var lis NamespaceSearch
for _, part := range strings.Split(ns, "|") {
if strings.HasPrefix(part, "trace:") {
lis = append(lis, NamespaceTrace(part[6:]))
} else if strings.Contains(part, "*") {
@ -27,6 +51,40 @@ func ParseNamespace(ns string) (lis NamespaceSearch) {
lis = append(lis, NamespaceNode(part))
}
}
search.NamespaceSearch = lis
field, text, next := strings.Cut(text, " ")
text = strings.TrimSpace(text)
for next {
switch strings.ToLower(field) {
case "find":
field, text, _ = strings.Cut(text, " ")
text = strings.TrimSpace(text)
search.Find = simpleParse(field)
case "fields":
field, text, _ = strings.Cut(text, " ")
text = strings.TrimSpace(text)
search.Fields = strings.Split(field, ",")
case "count":
field, text, _ = strings.Cut(text, " ")
text = strings.TrimSpace(text)
search.Count, _ = strconv.ParseUint(field, 10, 64)
case "offset":
field, text, _ = strings.Cut(text, " ")
text = strings.TrimSpace(text)
search.Offset, _ = strconv.ParseUint(field, 10, 64)
case "after":
field, text, _ = strings.Cut(text, " ")
text = strings.TrimSpace(text)
search.Cursor = field
}
field, text, next = strings.Cut(text, " ")
text = strings.TrimSpace(text)
}
return
}
@ -117,3 +175,28 @@ func match(n NamespaceSpec, s string) bool {
}
return ok
}
type ops struct {
Left string
Op string
Right string
}
func simpleParse(in string) (out []ops) {
items := strings.Split(in, ",")
for _, i := range items {
log.Println(i)
eq := strings.Split(i, "=")
switch len(eq) {
case 2:
out = append(out, ops{eq[0], "eq", eq[1]})
case 3:
if eq[1] == "" {
eq[1] = "eq"
}
out = append(out, ops{eq[0], eq[1], eq[2]})
}
}
return
}

109
mercury/spec_test.go Normal file
View File

@ -0,0 +1,109 @@
package mercury_test
import (
"fmt"
"testing"
"github.com/matryer/is"
"go.sour.is/pkg/mercury"
"go.sour.is/pkg/mercury/sql"
sq "github.com/Masterminds/squirrel"
)
var MAX_FILTER int = 40
func TestNamespaceParse(t *testing.T) {
var tests = []struct {
getWhere func(mercury.Search) sq.Sqlizer
in string
out string
args []any
}{
{
getWhere: getWhere,
in: "d42.bgp.kapha.*|trace:d42.bgp.kapha",
out: "(column LIKE ? OR ? LIKE column || '%')",
args: []any{"d42.bgp.kapha.%", "d42.bgp.kapha"},
},
{
getWhere: getWhere,
in: "d42.bgp.kapha.*|d42.bgp.kapha",
out: "(column LIKE ? OR column = ?)",
args: []any{"d42.bgp.kapha.%", "d42.bgp.kapha"},
},
{
getWhere: mkWhere(t, sql.GetWhereSQ),
in: "d42.bgp.kapha.* find active=eq=true",
out: `SELECT * FROM spaces JOIN ( SELECT DISTINCT id FROM mercury_values mv, json_each(mv."values") vs WHERE (json_valid("values") AND name = ? AND vs.value = ?) ) r000 USING (id) WHERE (space LIKE ?)`,
args: []any{"active", "true", "d42.bgp.kapha.%"},
},
{
getWhere: mkWhere(t, sql.GetWhereSQ),
in: "d42.bgp.kapha.* count 10 offset 5",
out: `SELECT * FROM spaces WHERE (space LIKE ?) LIMIT 10 OFFSET 5`,
args: []any{"d42.bgp.kapha.%"},
},
{
getWhere: mkWhere(t, sql.GetWhereSQ),
in: "d42.bgp.kapha.* fields a,b,c",
out: `SELECT * FROM spaces WHERE (space LIKE ?)`,
args: []any{"d42.bgp.kapha.%"},
},
{
getWhere: mkWhere(t, sql.GetWhereSQ),
in: "dn42.* find @type=in=[person,net]",
out: `SELECT `,
args: []any{"d42.bgp.kapha.%"},
},
}
//SELECT * FROM spaces JOIN ( SELECT DISTINCT id FROM mercury_values mv, json_valid("values") vs WHERE (json_valid("values") AND name = ? AND vs.value = ?) ) r000 USING (id) WHERE (space LIKE ?) !=
//SELECT * FROM spaces JOIN ( SELECT DISTINCT mv.id FROM mercury_values mv, json_each(mv."values") vs WHERE (json_valid("values") AND name = ? AND vs.value = ?) ) r000 USING (id) WHERE (space LIKE ?)
for i, tt := range tests {
t.Run(fmt.Sprintf("test %d", i), func(t *testing.T) {
is := is.New(t)
out := mercury.ParseSearch(tt.in)
sql, args, err := tt.getWhere(out).ToSql()
is.NoErr(err)
is.Equal(sql, tt.out)
is.Equal(args, tt.args)
})
}
}
func getWhere(search mercury.Search) sq.Sqlizer {
var where sq.Or
space := "column"
for _, m := range search.NamespaceSearch {
switch m.(type) {
case mercury.NamespaceNode:
where = append(where, sq.Eq{space: m.Value()})
case mercury.NamespaceStar:
where = append(where, sq.Like{space: m.Value()})
case mercury.NamespaceTrace:
e := sq.Expr(`? LIKE `+space+` || '%'`, m.Value())
where = append(where, e)
}
}
return where
}
func mkWhere(t *testing.T, where func(search mercury.Search) (func(sq.SelectBuilder) sq.SelectBuilder, error)) func(search mercury.Search) sq.Sqlizer {
t.Helper()
return func(search mercury.Search) sq.Sqlizer {
w, err := where(search)
if err != nil {
t.Log(err)
t.Fail()
}
return w(sq.Select("*").From("spaces"))
}
}

View File

@ -43,9 +43,7 @@ func listScan(e *[]string, ends [2]rune) scanFn {
return nil
}
for _, s := range splitComma(string(str)) {
*e = append(*e, s)
}
*e = append(*e, splitComma(string(str))...)
return nil
}

View File

@ -29,7 +29,7 @@ func (pgm *sqlHandler) GetNotify(ctx context.Context, event string) (lis mercury
Where(squirrel.Eq{"event": event}).
PlaceholderFormat(squirrel.Dollar).
RunWith(pgm.db).
QueryContext(context.TODO())
QueryContext(ctx)
if err != nil {
return nil, err

View File

@ -2,6 +2,7 @@ package sql
import (
"database/sql"
"strings"
"go.nhat.io/otelsql"
semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
@ -9,25 +10,28 @@ import (
func openDB(driver, dsn string) (*sql.DB, error) {
system := semconv.DBSystemPostgreSQL
if driver == "sqlite" {
if driver == "sqlite" || strings.HasPrefix(driver, "libsql") {
system = semconv.DBSystemSqlite
}
// Register the otelsql wrapper for the provided postgres driver.
driverName, err := otelsql.Register(driver,
otelsql.AllowRoot(),
otelsql.TraceQueryWithoutArgs(),
otelsql.TraceRowsClose(),
otelsql.TraceRowsAffected(),
// otelsql.WithDatabaseName("my_database"), // Optional.
otelsql.WithSystem(system), // Optional.
)
if err != nil {
return nil, err
if driver == "postgres" {
var err error
// Register the otelsql wrapper for the provided postgres driver.
driver, err = otelsql.Register(driver,
otelsql.AllowRoot(),
otelsql.TraceQueryWithoutArgs(),
otelsql.TraceRowsClose(),
otelsql.TraceRowsAffected(),
// otelsql.WithDatabaseName("my_database"), // Optional.
otelsql.WithSystem(system), // Optional.
)
if err != nil {
return nil, err
}
}
// Connect to a Postgres database using the postgres driver wrapper.
db, err := sql.Open(driverName, dsn)
db, err := sql.Open(driver, dsn)
if err != nil {
return nil, err
}

View File

@ -3,21 +3,27 @@ package sql
import (
"context"
"database/sql"
"errors"
"fmt"
"log"
"slices"
"strings"
sq "github.com/Masterminds/squirrel"
"go.sour.is/pkg/lg"
"go.sour.is/pkg/mercury"
"go.sour.is/pkg/rsql"
"golang.org/x/exp/maps"
)
var MAX_FILTER int = 40
type sqlHandler struct {
name string
db *sql.DB
paceholderFormat sq.PlaceholderFormat
listFormat [2]rune
readonly bool
getWhere func(search mercury.Search) (func(sq.SelectBuilder) sq.SelectBuilder, error)
}
var (
@ -27,11 +33,13 @@ var (
_ mercury.WriteConfig = (*sqlHandler)(nil)
)
func Register() {
func Register() func(context.Context) error {
var hdlrs []*sqlHandler
mercury.Registry.Register("sql", func(s *mercury.Space) any {
var dsn string
var opts strings.Builder
var dbtype string
var readonly bool = slices.Contains(s.Tags, "readonly")
for _, c := range s.List {
if c.Name == "match" {
continue
@ -49,7 +57,6 @@ func Register() {
if dsn == "" {
dsn = opts.String()
}
db, err := openDB(dbtype, dsn)
if err != nil {
return err
@ -58,31 +65,47 @@ func Register() {
return err
}
switch dbtype {
case "sqlite":
return &sqlHandler{db, sq.Dollar, [2]rune{'[', ']'}}
case "sqlite", "libsql", "libsql+embed":
h := &sqlHandler{s.Space, db, sq.Question, [2]rune{'[', ']'}, readonly, GetWhereSQ}
hdlrs = append(hdlrs, h)
return h
case "postgres":
return &sqlHandler{db, sq.Dollar, [2]rune{'{', '}'}}
h := &sqlHandler{s.Space, db, sq.Dollar, [2]rune{'{', '}'}, readonly, GetWherePG}
hdlrs = append(hdlrs, h)
return h
default:
return fmt.Errorf("unsupported dbtype: %s", dbtype)
}
})
return func(ctx context.Context) error {
var errs error
for _, h := range hdlrs {
// if err = ctx.Err(); err != nil {
// return errors.Join(errs, err)
// }
errs = errors.Join(errs, h.db.Close())
}
return errs
}
}
type Space struct {
mercury.Space
ID uint64
id uint64
}
type Value struct {
mercury.Value
ID uint64
id uint64
}
func (p *sqlHandler) GetIndex(ctx context.Context, search mercury.NamespaceSearch, pgm *rsql.Program) (mercury.Config, error) {
func (p *sqlHandler) GetIndex(ctx context.Context, search mercury.Search) (mercury.Config, error) {
ctx, span := lg.Span(ctx)
defer span.End()
cols := rsql.GetDbColumns(mercury.Space{})
where, err := getWhere(search, cols)
where, err := p.getWhere(search)
if err != nil {
return nil, err
}
@ -100,28 +123,40 @@ func (p *sqlHandler) GetIndex(ctx context.Context, search mercury.NamespaceSearc
return config, nil
}
func (p *sqlHandler) GetConfig(ctx context.Context, search mercury.NamespaceSearch, pgm *rsql.Program, fields []string) (mercury.Config, error) {
func (p *sqlHandler) GetConfig(ctx context.Context, search mercury.Search) (config mercury.Config, err error) {
ctx, span := lg.Span(ctx)
defer span.End()
idx, err := p.GetIndex(ctx, search, pgm)
where, err := p.getWhere(search)
if err != nil {
return nil, err
}
spaceMap := make(map[string]int, len(idx))
for u, s := range idx {
spaceMap[s.Space] = u
lis, err := p.listSpace(ctx, nil, where)
if err != nil {
log.Println(err)
return nil, err
}
where, err := getWhere(search, rsql.GetDbColumns(mercury.Value{}))
if err != nil {
return nil, err
if len(lis) == 0 {
return nil, nil
}
query := sq.Select(`"space"`, `"name"`, `"seq"`, `"notes"`, `"tags"`, `"values"`).
From("mercury_registry_vw").
Where(where).
OrderBy("space asc", "name asc").
spaceIDX := make([]uint64, len(lis))
spaceMap := make(map[uint64]int, len(lis))
config = make(mercury.Config, len(lis))
for i, s := range lis {
spaceIDX[i] = s.id
config[i] = &s.Space
spaceMap[s.id] = i
}
query := sq.Select(`"id"`, `"name"`, `"seq"`, `"notes"`, `"tags"`, `"values"`).
From("mercury_values").
Where(sq.Eq{"id": spaceIDX}).
OrderBy("id asc", "seq asc").
PlaceholderFormat(p.paceholderFormat)
span.AddEvent(p.name)
span.AddEvent(lg.LogQuery(query.ToSql()))
rows, err := query.RunWith(p.db).
QueryContext(ctx)
@ -133,10 +168,10 @@ func (p *sqlHandler) GetConfig(ctx context.Context, search mercury.NamespaceSear
defer rows.Close()
for rows.Next() {
var s mercury.Value
var s Value
err = rows.Scan(
&s.Space,
&s.id,
&s.Name,
&s.Seq,
listScan(&s.Notes, p.listFormat),
@ -146,19 +181,20 @@ func (p *sqlHandler) GetConfig(ctx context.Context, search mercury.NamespaceSear
if err != nil {
return nil, err
}
if u, ok := spaceMap[s.Space]; ok {
idx[u].List = append(idx[u].List, s)
if u, ok := spaceMap[s.id]; ok {
lis[u].List = append(lis[u].List, s.Value)
}
}
err = rows.Err()
span.RecordError(err)
span.AddEvent(fmt.Sprint("read index ", len(idx)))
return idx, err
span.AddEvent(fmt.Sprint("read index ", len(lis)))
// log.Println(config.String())
return config, err
}
func (p *sqlHandler) listSpace(ctx context.Context, tx sq.BaseRunner, where sq.Sqlizer) ([]*Space, error) {
func (p *sqlHandler) listSpace(ctx context.Context, tx sq.BaseRunner, where func(sq.SelectBuilder) sq.SelectBuilder) ([]*Space, error) {
ctx, span := lg.Span(ctx)
defer span.End()
@ -168,9 +204,11 @@ func (p *sqlHandler) listSpace(ctx context.Context, tx sq.BaseRunner, where sq.S
query := sq.Select(`"id"`, `"space"`, `"notes"`, `"tags"`, `"trailer"`).
From("mercury_spaces").
Where(where).
OrderBy("space asc").
PlaceholderFormat(sq.Dollar)
PlaceholderFormat(p.paceholderFormat)
query = where(query)
span.AddEvent(p.name)
span.AddEvent(lg.LogQuery(query.ToSql()))
rows, err := query.RunWith(tx).
QueryContext(ctx)
@ -185,7 +223,7 @@ func (p *sqlHandler) listSpace(ctx context.Context, tx sq.BaseRunner, where sq.S
for rows.Next() {
var s Space
err = rows.Scan(
&s.ID,
&s.id,
&s.Space.Space,
listScan(&s.Space.Notes, p.listFormat),
listScan(&s.Space.Tags, p.listFormat),
@ -209,6 +247,10 @@ func (p *sqlHandler) WriteConfig(ctx context.Context, config mercury.Config) (er
ctx, span := lg.Span(ctx)
defer span.End()
if p.readonly {
return fmt.Errorf("readonly database")
}
// Delete spaces that are present in input but are empty.
deleteSpaces := make(map[string]struct{})
@ -233,7 +275,8 @@ func (p *sqlHandler) WriteConfig(ctx context.Context, config mercury.Config) (er
}()
// get current spaces
lis, err := p.listSpace(ctx, tx, sq.Eq{"space": maps.Keys(names)})
where := func(qry sq.SelectBuilder) sq.SelectBuilder { return qry.Where(sq.Eq{"space": maps.Keys(names)}) }
lis, err := p.listSpace(ctx, tx, where)
if err != nil {
return
}
@ -250,12 +293,12 @@ func (p *sqlHandler) WriteConfig(ctx context.Context, config mercury.Config) (er
currentNames[spaceName] = struct{}{}
if _, ok := deleteSpaces[spaceName]; ok {
deleteIDs = append(deleteIDs, s.ID)
deleteIDs = append(deleteIDs, s.id)
continue
}
updateSpaces = append(updateSpaces, config[names[spaceName]])
updateIDs = append(updateIDs, s.ID)
updateIDs = append(updateIDs, s.id)
}
for _, s := range config {
spaceName := s.Space
@ -266,7 +309,7 @@ func (p *sqlHandler) WriteConfig(ctx context.Context, config mercury.Config) (er
// delete spaces
if ids := deleteIDs; len(ids) > 0 {
_, err = sq.Delete("mercury_spaces").Where(sq.Eq{"id": ids}).RunWith(tx).PlaceholderFormat(sq.Dollar).ExecContext(ctx)
_, err = sq.Delete("mercury_spaces").Where(sq.Eq{"id": ids}).RunWith(tx).PlaceholderFormat(p.paceholderFormat).ExecContext(ctx)
if err != nil {
return err
}
@ -274,7 +317,7 @@ func (p *sqlHandler) WriteConfig(ctx context.Context, config mercury.Config) (er
// delete values
if ids := append(updateIDs, deleteIDs...); len(ids) > 0 {
_, err = sq.Delete("mercury_values").Where(sq.Eq{"id": ids}).RunWith(tx).PlaceholderFormat(sq.Dollar).ExecContext(ctx)
_, err = sq.Delete("mercury_values").Where(sq.Eq{"id": ids}).RunWith(tx).PlaceholderFormat(p.paceholderFormat).ExecContext(ctx)
if err != nil {
return err
}
@ -289,7 +332,8 @@ func (p *sqlHandler) WriteConfig(ctx context.Context, config mercury.Config) (er
Set("tags", listValue(u.Tags, p.listFormat)).
Set("notes", listValue(u.Notes, p.listFormat)).
Set("trailer", listValue(u.Trailer, p.listFormat)).
PlaceholderFormat(sq.Dollar)
PlaceholderFormat(p.paceholderFormat)
span.AddEvent(p.name)
span.AddEvent(lg.LogQuery(query.ToSql()))
_, err := query.RunWith(tx).ExecContext(ctx)
@ -298,7 +342,7 @@ func (p *sqlHandler) WriteConfig(ctx context.Context, config mercury.Config) (er
}
// log.Debugf("UPDATED %d SPACES", len(updateSpaces))
for _, v := range u.List {
newValues = append(newValues, &Value{Value: v, ID: updateIDs[i]})
newValues = append(newValues, &Value{Value: v, id: updateIDs[i]})
}
}
@ -306,15 +350,16 @@ func (p *sqlHandler) WriteConfig(ctx context.Context, config mercury.Config) (er
for _, s := range insertSpaces {
var id uint64
query := sq.Insert("mercury_spaces").
PlaceholderFormat(sq.Dollar).
PlaceholderFormat(p.paceholderFormat).
Columns("space", "tags", "notes", "trailer").
Values(
s.Space,
listValue(s.Tags, p.listFormat),
s.Space,
listValue(s.Tags, p.listFormat),
listValue(s.Notes, p.listFormat),
listValue(s.Trailer, p.listFormat),
).
).
Suffix("RETURNING \"id\"")
span.AddEvent(p.name)
span.AddEvent(lg.LogQuery(query.ToSql()))
err := query.
@ -322,12 +367,13 @@ func (p *sqlHandler) WriteConfig(ctx context.Context, config mercury.Config) (er
QueryRowContext(ctx).
Scan(&id)
if err != nil {
span.AddEvent(p.name)
s, v, _ := query.ToSql()
log.Println(s, v, err)
return err
}
for _, v := range s.List {
newValues = append(newValues, &Value{Value: v, ID: id})
newValues = append(newValues, &Value{Value: v, id: id})
}
}
@ -353,7 +399,7 @@ func (p *sqlHandler) writeValues(ctx context.Context, tx sq.BaseRunner, lis []*V
newInsert := func() sq.InsertBuilder {
return sq.Insert("mercury_values").
RunWith(tx).
PlaceholderFormat(sq.Dollar).
PlaceholderFormat(p.paceholderFormat).
Columns(
`"id"`,
`"seq"`,
@ -367,7 +413,7 @@ func (p *sqlHandler) writeValues(ctx context.Context, tx sq.BaseRunner, lis []*V
insert := newInsert()
for i, s := range lis {
insert = insert.Values(
s.ID,
s.id,
s.Seq,
s.Name,
listValue(s.Values, p.listFormat),
@ -378,7 +424,7 @@ func (p *sqlHandler) writeValues(ctx context.Context, tx sq.BaseRunner, lis []*V
if i > 0 && i%chunk == 0 {
// log.Debugf("inserting %v rows into %v", i%chunk, d.Table)
// log.Debug(insert.ToSql())
span.AddEvent(p.name)
span.AddEvent(lg.LogQuery(insert.ToSql()))
_, err = insert.ExecContext(ctx)
@ -392,7 +438,7 @@ func (p *sqlHandler) writeValues(ctx context.Context, tx sq.BaseRunner, lis []*V
}
if len(lis)%chunk > 0 {
// log.Debugf("inserting %v rows into %v", len(lis)%chunk, d.Table)
// log.Debug(insert.ToSql())
span.AddEvent(p.name)
span.AddEvent(lg.LogQuery(insert.ToSql()))
_, err = insert.ExecContext(ctx)
@ -405,13 +451,11 @@ func (p *sqlHandler) writeValues(ctx context.Context, tx sq.BaseRunner, lis []*V
return
}
func getWhere(search mercury.NamespaceSearch, d *rsql.DbColumns) (sq.Sqlizer, error) {
func GetWherePG(search mercury.Search) (func(sq.SelectBuilder) sq.SelectBuilder, error) {
var where sq.Or
space, err := d.Col("space")
if err != nil {
return nil, err
}
for _, m := range search {
space := "space"
for _, m := range search.NamespaceSearch {
switch m.(type) {
case mercury.NamespaceNode:
where = append(where, sq.Eq{space: m.Value()})
@ -422,5 +466,129 @@ func getWhere(search mercury.NamespaceSearch, d *rsql.DbColumns) (sq.Sqlizer, er
where = append(where, e)
}
}
return where, nil
var joins []sq.SelectBuilder
for i, o := range search.Find {
log.Println(o)
if i > MAX_FILTER {
err := fmt.Errorf("too many filters [%d]", MAX_FILTER)
return nil, err
}
q := sq.Select("DISTINCT id").From("mercury_values")
switch o.Op {
case "key":
q = q.Where(sq.Eq{"name": o.Left})
case "nkey":
q = q.Where(sq.NotEq{"name": o.Left})
case "eq":
q = q.Where("name = ? AND ? = any (values)", o.Left, o.Right)
case "neq":
q = q.Where("name = ? AND ? != any (values)", o.Left, o.Right)
case "gt":
q = q.Where("name = ? AND ? > any (values)", o.Left, o.Right)
case "lt":
q = q.Where("name = ? AND ? < any (values)", o.Left, o.Right)
case "ge":
q = q.Where("name = ? AND ? >= any (values)", o.Left, o.Right)
case "le":
q = q.Where("name = ? AND ? <= any (values)", o.Left, o.Right)
// case "like":
// q = q.Where("name = ? AND value LIKE ?", o.Left, o.Right)
// case "in":
// q = q.Where(sq.Eq{"name": o.Left, "value": strings.Split(o.Right, " ")})
}
joins = append(joins, q)
}
return func(s sq.SelectBuilder) sq.SelectBuilder {
for i, q := range joins {
s = s.JoinClause(q.Prefix("JOIN (").Suffix(fmt.Sprintf(`) r%03d USING (id)`, i)))
}
if search.Count > 0 {
s = s.Limit(search.Count)
}
return s.Where(where)
}, nil
}
func GetWhereSQ(search mercury.Search) (func(sq.SelectBuilder) sq.SelectBuilder, error) {
var where sq.Or
var errs error
id := "id"
space := "space"
name := "name"
values_each := `json_valid("values")`
values_valid := `json_valid("values")`
if errs != nil {
return nil, errs
}
for _, m := range search.NamespaceSearch {
switch m.(type) {
case mercury.NamespaceNode:
where = append(where, sq.Eq{space: m.Value()})
case mercury.NamespaceStar:
where = append(where, sq.Like{space: m.Value()})
case mercury.NamespaceTrace:
e := sq.Expr(`? LIKE `+space+` || '%'`, m.Value())
where = append(where, e)
}
}
var joins []sq.SelectBuilder
for i, o := range search.Find {
log.Println(o)
if i > MAX_FILTER {
err := fmt.Errorf("too many filters [%d]", MAX_FILTER)
return nil, err
}
q := sq.Select("DISTINCT " + id).From(`mercury_values mv, ` + values_each + ` vs`)
switch o.Op {
case "key":
q = q.Where(sq.Eq{name: o.Left})
case "nkey":
q = q.Where(sq.NotEq{name: o.Left})
case "eq":
q = q.Where(sq.And{sq.Expr(values_valid), sq.Eq{name: o.Left, `vs.value`: o.Right}})
case "neq":
q = q.Where(sq.And{sq.Expr(values_valid), sq.Eq{name: o.Left}, sq.NotEq{`vs.value`: o.Right}})
case "gt":
q = q.Where(sq.And{sq.Expr(values_valid), sq.Eq{name: o.Left}, sq.Gt{`vs.value`: o.Right}})
case "lt":
q = q.Where(sq.And{sq.Expr(values_valid), sq.Eq{name: o.Left}, sq.Lt{`vs.value`: o.Right}})
case "ge":
q = q.Where(sq.And{sq.Expr(values_valid), sq.Eq{name: o.Left}, sq.GtOrEq{`vs.value`: o.Right}})
case "le":
q = q.Where(sq.And{sq.Expr(values_valid), sq.Eq{name: o.Left}, sq.LtOrEq{`vs.value`: o.Right}})
case "like":
q = q.Where(sq.And{sq.Expr(values_valid), sq.Eq{name: o.Left}, sq.Like{`vs.value`: o.Right}})
case "in":
q = q.Where(sq.Eq{name: o.Left, "vs.value": strings.Split(o.Right, " ")})
}
joins = append(joins, q)
}
return func(s sq.SelectBuilder) sq.SelectBuilder {
for i, q := range joins {
s = s.JoinClause(q.Prefix("JOIN (").Suffix(fmt.Sprintf(`) r%03d USING (id)`, i)))
}
if search.Count > 0 {
s = s.Limit(search.Count)
}
if search.Offset > 0 {
s = s.Offset(search.Offset)
}
return s.Where(where)
}, nil
}

View File

@ -11,8 +11,8 @@ import (
type DbColumns struct {
Cols []string
index map[string]int
Table string
View string
Table string
View string
}
// Col returns the mapped column names
@ -40,16 +40,16 @@ func GetDbColumns(o interface{}) *DbColumns {
for i := 0; i < t.NumField(); i++ {
field := t.Field(i)
sp := append(strings.Split(field.Tag.Get("db"), ","), "")
tag := sp[0]
tag, _, _ := strings.Cut(field.Tag.Get("db"), ",")
json := field.Tag.Get("json")
json, _, _ = strings.Cut(json, ",")
if tag == "" {
tag = json
}
graphql := field.Tag.Get("graphql")
graphql, _, _ = strings.Cut(graphql, ",")
if tag == "" {
tag = graphql
}
@ -88,3 +88,11 @@ func GetDbColumns(o interface{}) *DbColumns {
}
return &d
}
func QuoteCols(cols []string) []string {
lis := make([]string, len(cols))
for i := range cols {
lis[i] = `"` + cols[i] + `"`
}
return lis
}

View File

@ -114,7 +114,6 @@ func (s *Harness) Run(ctx context.Context, appName, version string) error {
err := g.Wait()
if err != nil {
log.Printf("Shutdown due to error: %s", err)
}
return err
}