chore: add mercury
This commit is contained in:
116
mercury/sql/init-pg.sql
Normal file
116
mercury/sql/init-pg.sql
Normal file
@@ -0,0 +1,116 @@
|
||||
CREATE SEQUENCE IF NOT EXISTS mercury_spaces_id_seq;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS mercury_spaces
|
||||
(
|
||||
space character varying NOT NULL,
|
||||
id integer NOT NULL DEFAULT nextval('mercury_spaces_id_seq'::regclass),
|
||||
notes character varying[] NOT NULL DEFAULT '{}'::character varying[],
|
||||
tags character varying[] NOT NULL DEFAULT '{}'::character varying[],
|
||||
CONSTRAINT mercury_namespace_pk PRIMARY KEY (id)
|
||||
);
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS mercury_namespace_space_uindex
|
||||
ON mercury_spaces USING btree
|
||||
(space ASC NULLS LAST);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS mercury_values
|
||||
(
|
||||
id integer NOT NULL,
|
||||
seq integer NOT NULL,
|
||||
name character varying NOT NULL,
|
||||
"values" character varying[] NOT NULL DEFAULT '{}'::character varying[],
|
||||
tags character varying[] NOT NULL DEFAULT '{}'::character varying[],
|
||||
notes character varying[] NOT NULL DEFAULT '{}'::character varying[],
|
||||
CONSTRAINT mercury_values_pk PRIMARY KEY (id, seq)
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS mercury_values_name_index
|
||||
ON mercury_values USING btree
|
||||
(name ASC NULLS LAST);
|
||||
|
||||
CREATE OR REPLACE VIEW mercury_registry_vw
|
||||
AS
|
||||
SELECT
|
||||
s.id,
|
||||
v.seq,
|
||||
s.space,
|
||||
v.name,
|
||||
v."values",
|
||||
v.notes,
|
||||
v.tags
|
||||
FROM mercury_spaces s
|
||||
JOIN mercury_values v ON s.id = v.id;
|
||||
|
||||
CREATE OR REPLACE VIEW mercury_groups_vw
|
||||
AS
|
||||
SELECT DISTINCT
|
||||
unnest(vw."values") AS user_id,
|
||||
vw.name AS group_id
|
||||
FROM mercury_registry_vw vw
|
||||
WHERE vw.space::text = 'mercury.groups'::text;
|
||||
|
||||
CREATE OR REPLACE VIEW mercury_group_rules_vw
|
||||
AS
|
||||
WITH
|
||||
tt as (
|
||||
SELECT DISTINCT
|
||||
vw.name AS group_id,
|
||||
unnest(vw."values") AS rules
|
||||
FROM mercury_registry_vw vw
|
||||
WHERE vw.space::text = 'mercury.policy'::text
|
||||
)
|
||||
SELECT tt.group_id,
|
||||
split_part(tt.rules::text, ' '::text, 1) AS role,
|
||||
split_part(tt.rules::text, ' '::text, 2) AS type,
|
||||
split_part(tt.rules::text, ' '::text, 3) AS match
|
||||
FROM tt;
|
||||
|
||||
CREATE OR REPLACE VIEW mercury_user_rules_vw
|
||||
AS
|
||||
WITH
|
||||
tt as (
|
||||
SELECT DISTINCT
|
||||
vw.name AS group_id,
|
||||
unnest(vw."values") AS rules
|
||||
FROM mercury_registry_vw vw
|
||||
WHERE vw.space::text = 'mercury.policy'::text
|
||||
)
|
||||
SELECT
|
||||
g.user_id,
|
||||
split_part(tt.rules::text, ' '::text, 1) AS role,
|
||||
split_part(tt.rules::text, ' '::text, 2) AS type,
|
||||
split_part(tt.rules::text, ' '::text, 3) AS match
|
||||
FROM mercury_groups_vw g
|
||||
JOIN tt ON g.group_id::text = tt.group_id::text;
|
||||
|
||||
CREATE OR REPLACE VIEW mercury_rules_vw
|
||||
AS
|
||||
SELECT
|
||||
'U-'::text || vw.user_id::text AS id,
|
||||
vw.role,
|
||||
vw.type,
|
||||
vw.match
|
||||
FROM mercury_user_rules_vw vw
|
||||
UNION
|
||||
SELECT
|
||||
'G-'::text || vw.group_id::text AS id,
|
||||
vw.role,
|
||||
vw.type,
|
||||
vw.match
|
||||
FROM mercury_group_rules_vw vw;
|
||||
|
||||
CREATE OR REPLACE VIEW mercury_notify_vw
|
||||
AS
|
||||
WITH
|
||||
tt as (
|
||||
SELECT DISTINCT
|
||||
vw.name,
|
||||
unnest(vw."values") AS rules
|
||||
FROM mercury_registry_vw vw
|
||||
WHERE vw.space::text = 'mercury.notify'::text
|
||||
)
|
||||
SELECT
|
||||
tt.name,
|
||||
split_part(tt.rules::text, ' '::text, 1) AS match,
|
||||
split_part(tt.rules::text, ' '::text, 2) AS event,
|
||||
split_part(tt.rules::text, ' '::text, 3) AS method,
|
||||
split_part(tt.rules::text, ' '::text, 4) AS url
|
||||
FROM tt;
|
||||
118
mercury/sql/init-sql3.sql
Normal file
118
mercury/sql/init-sql3.sql
Normal file
@@ -0,0 +1,118 @@
|
||||
CREATE TABLE IF NOT EXISTS mercury_spaces
|
||||
(
|
||||
space character varying NOT NULL unique,
|
||||
id integer NOT NULL CONSTRAINT mercury_namespace_pk PRIMARY KEY autoincrement,
|
||||
notes json NOT NULL DEFAULT '[]',
|
||||
tags json NOT NULL DEFAULT '[]'
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS mercury_values
|
||||
(
|
||||
id integer NOT NULL,
|
||||
seq integer NOT NULL,
|
||||
name character varying NOT NULL,
|
||||
"values" json NOT NULL DEFAULT '[]',
|
||||
tags json NOT NULL DEFAULT '[]',
|
||||
notes json NOT NULL DEFAULT '[]',
|
||||
CONSTRAINT mercury_values_pk PRIMARY KEY (id, seq)
|
||||
);
|
||||
|
||||
drop view if exists mercury_registry_vw;
|
||||
CREATE VIEW if not exists mercury_registry_vw
|
||||
AS
|
||||
SELECT
|
||||
s.id,
|
||||
v.seq,
|
||||
s.space,
|
||||
v.name,
|
||||
v."values",
|
||||
v.notes,
|
||||
v.tags
|
||||
FROM mercury_spaces s
|
||||
JOIN mercury_values v ON s.id = v.id;
|
||||
|
||||
drop view if exists mercury_groups_vw;
|
||||
CREATE VIEW if not exists mercury_groups_vw
|
||||
AS
|
||||
SELECT DISTINCT
|
||||
j.value AS user_id,
|
||||
vw.name AS group_id
|
||||
FROM mercury_registry_vw vw, json_each(vw."values") j
|
||||
WHERE vw.space = 'mercury.groups';
|
||||
|
||||
drop view if exists mercury_group_rules_vw;
|
||||
CREATE VIEW if not exists mercury_group_rules_vw
|
||||
AS
|
||||
WITH
|
||||
tt as (
|
||||
SELECT DISTINCT
|
||||
vw.name AS group_id,
|
||||
j.value AS rules
|
||||
FROM mercury_registry_vw vw, json_each(vw."values") j
|
||||
WHERE vw.space = 'mercury.policy'
|
||||
)
|
||||
SELECT tt.group_id,
|
||||
tt.rules rule,
|
||||
'' AS role,
|
||||
'' AS type,
|
||||
''AS match
|
||||
FROM tt;
|
||||
|
||||
drop view if exists mercury_user_rules_vw;
|
||||
CREATE VIEW if not exists mercury_user_rules_vw
|
||||
AS
|
||||
WITH
|
||||
tt as (
|
||||
SELECT DISTINCT
|
||||
vw.name AS group_id,
|
||||
j.value AS rules
|
||||
FROM mercury_registry_vw vw, json_each(vw."values") j
|
||||
WHERE vw.space = 'mercury.policy'
|
||||
)
|
||||
SELECT
|
||||
g.user_id,
|
||||
tt.rules rule,
|
||||
'' AS role,
|
||||
'' AS type,
|
||||
'' AS match
|
||||
FROM mercury_groups_vw g
|
||||
JOIN tt ON g.group_id = tt.group_id;
|
||||
|
||||
drop view if exists mercury_rules_vw;
|
||||
CREATE VIEW if not exists mercury_rules_vw
|
||||
AS
|
||||
SELECT
|
||||
'U-' || vw.user_id AS id,
|
||||
vw.rule,
|
||||
vw.role,
|
||||
vw.type,
|
||||
vw.match
|
||||
FROM mercury_user_rules_vw vw
|
||||
UNION
|
||||
SELECT
|
||||
'G-' || vw.group_id AS id,
|
||||
vw.rule,
|
||||
vw.role,
|
||||
vw.type,
|
||||
vw.match
|
||||
FROM mercury_group_rules_vw vw;
|
||||
|
||||
drop view if exists mercury_notify_vw;
|
||||
CREATE VIEW if not exists mercury_notify_vw
|
||||
AS
|
||||
WITH
|
||||
tt as (
|
||||
SELECT DISTINCT
|
||||
vw.name,
|
||||
j.value AS rules
|
||||
FROM mercury_registry_vw vw, json_each(vw."values") j
|
||||
WHERE vw.space = 'mercury.notify'
|
||||
)
|
||||
SELECT
|
||||
tt.name,
|
||||
tt.rules rule,
|
||||
substr(tt.rules, 1, instr(tt.rules, ' ')-1) AS match,
|
||||
substr(tt.rules, instr(tt.rules, ' ')+1, instr(substr(tt.rules, instr(tt.rules, ' ')+1), ' ')-1) AS event,
|
||||
'' AS method,
|
||||
'' as url
|
||||
FROM tt;
|
||||
130
mercury/sql/list-string.go
Normal file
130
mercury/sql/list-string.go
Normal file
@@ -0,0 +1,130 @@
|
||||
package sql
|
||||
|
||||
import (
|
||||
"database/sql/driver"
|
||||
"fmt"
|
||||
"strings"
|
||||
"unicode"
|
||||
"unicode/utf8"
|
||||
)
|
||||
|
||||
type valueFn func() (v driver.Value, err error)
|
||||
|
||||
func (fn valueFn) Value() (v driver.Value, err error) {
|
||||
return fn()
|
||||
}
|
||||
|
||||
type scanFn func(value any) error
|
||||
|
||||
func (fn scanFn) Scan(v any) error {
|
||||
return fn(v)
|
||||
}
|
||||
|
||||
func listScan(e *[]string, ends [2]rune) scanFn {
|
||||
return func(value any) error {
|
||||
var str string
|
||||
switch v := value.(type) {
|
||||
case string:
|
||||
str = v
|
||||
case []byte:
|
||||
str = string(v)
|
||||
case []rune:
|
||||
str = string(v)
|
||||
default:
|
||||
return fmt.Errorf("array must be uint64, got: %T", value)
|
||||
}
|
||||
|
||||
if e == nil {
|
||||
*e = []string{}
|
||||
}
|
||||
|
||||
str = trim(str, ends[0], ends[1])
|
||||
if len(str) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, s := range splitComma(string(str)) {
|
||||
*e = append(*e, s)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func listValue(e []string, ends [2]rune) valueFn {
|
||||
return func() (value driver.Value, err error) {
|
||||
var b strings.Builder
|
||||
|
||||
if len(e) == 0 {
|
||||
return string(ends[:]), nil
|
||||
}
|
||||
|
||||
_, err = b.WriteRune(ends[0])
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
var arr []string
|
||||
for _, s := range e {
|
||||
arr = append(arr, `"`+s+`"`)
|
||||
}
|
||||
_, err = b.WriteString(strings.Join(arr, ","))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
_, err = b.WriteRune(ends[1])
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return b.String(), nil
|
||||
}
|
||||
}
|
||||
|
||||
func splitComma(s string) []string {
|
||||
lastQuote := rune(0)
|
||||
f := func(c rune) bool {
|
||||
switch {
|
||||
case c == lastQuote:
|
||||
lastQuote = rune(0)
|
||||
return false
|
||||
case lastQuote != rune(0):
|
||||
return false
|
||||
case unicode.In(c, unicode.Quotation_Mark):
|
||||
lastQuote = c
|
||||
return false
|
||||
default:
|
||||
return c == ','
|
||||
}
|
||||
}
|
||||
lis := strings.FieldsFunc(s, f)
|
||||
|
||||
var out []string
|
||||
for _, s := range lis {
|
||||
s = trim(s, '"', '"')
|
||||
out = append(out, s)
|
||||
}
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
func trim(s string, start, end rune) string {
|
||||
r0, size0 := utf8.DecodeRuneInString(s)
|
||||
if size0 == 0 {
|
||||
return s
|
||||
}
|
||||
if r0 != start {
|
||||
return s
|
||||
}
|
||||
|
||||
r1, size1 := utf8.DecodeLastRuneInString(s)
|
||||
if size1 == 0 {
|
||||
return s
|
||||
}
|
||||
if r1 != end {
|
||||
return s
|
||||
}
|
||||
|
||||
return s[size0 : len(s)-size1]
|
||||
}
|
||||
55
mercury/sql/notify.go
Normal file
55
mercury/sql/notify.go
Normal file
@@ -0,0 +1,55 @@
|
||||
package sql
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
"github.com/Masterminds/squirrel"
|
||||
|
||||
"go.sour.is/pkg/lg"
|
||||
"go.sour.is/pkg/mercury"
|
||||
)
|
||||
|
||||
// Notify stores the attributes for a registry space
|
||||
type Notify struct {
|
||||
Name string `json:"name" view:"mercury_notify_vw"`
|
||||
Match string `json:"match"`
|
||||
Event string `json:"event"`
|
||||
Method string `json:"-" db:"method"`
|
||||
URL string `json:"-" db:"url"`
|
||||
}
|
||||
|
||||
// GetNotify get list of rules
|
||||
func (pgm *sqlHandler) GetNotify(ctx context.Context, event string) (lis mercury.ListNotify, err error) {
|
||||
ctx, span := lg.Span(ctx)
|
||||
defer span.End()
|
||||
|
||||
rows, err := squirrel.Select(`"name"`, `"match"`, `"event"`, `"method"`, `"url"`, `"rule"`).
|
||||
From("mercury_notify_vw").
|
||||
Where(squirrel.Eq{"event": event}).
|
||||
PlaceholderFormat(squirrel.Dollar).
|
||||
RunWith(pgm.db).
|
||||
QueryContext(context.TODO())
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer rows.Close()
|
||||
for rows.Next() {
|
||||
var s mercury.Notify
|
||||
var rule string
|
||||
err = rows.Scan(&s.Name, &s.Match, &s.Event, &s.Method, &s.URL, &rule)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if rule != "" {
|
||||
s.Match, rule, _ = strings.Cut(rule, " ")
|
||||
s.Event, rule, _ = strings.Cut(rule, " ")
|
||||
s.Method, s.URL, _ = strings.Cut(rule, " ")
|
||||
}
|
||||
lis = append(lis, s)
|
||||
}
|
||||
|
||||
return lis, rows.Err()
|
||||
}
|
||||
40
mercury/sql/otel.go
Normal file
40
mercury/sql/otel.go
Normal file
@@ -0,0 +1,40 @@
|
||||
package sql
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
|
||||
"go.nhat.io/otelsql"
|
||||
semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
|
||||
)
|
||||
|
||||
func openDB(driver, dsn string) (*sql.DB, error) {
|
||||
system := semconv.DBSystemPostgreSQL
|
||||
if driver == "sqlite" {
|
||||
system = semconv.DBSystemSqlite
|
||||
}
|
||||
|
||||
// Register the otelsql wrapper for the provided postgres driver.
|
||||
driverName, err := otelsql.Register(driver,
|
||||
otelsql.AllowRoot(),
|
||||
otelsql.TraceQueryWithoutArgs(),
|
||||
otelsql.TraceRowsClose(),
|
||||
otelsql.TraceRowsAffected(),
|
||||
// otelsql.WithDatabaseName("my_database"), // Optional.
|
||||
otelsql.WithSystem(system), // Optional.
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Connect to a Postgres database using the postgres driver wrapper.
|
||||
db, err := sql.Open(driverName, dsn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := otelsql.RecordStats(db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return db, nil
|
||||
}
|
||||
99
mercury/sql/rules.go
Normal file
99
mercury/sql/rules.go
Normal file
@@ -0,0 +1,99 @@
|
||||
package sql
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/Masterminds/squirrel"
|
||||
"go.sour.is/pkg/lg"
|
||||
"go.sour.is/pkg/mercury"
|
||||
"go.sour.is/pkg/ident"
|
||||
)
|
||||
|
||||
type grouper interface {
|
||||
GetGroups() []string
|
||||
}
|
||||
|
||||
// GetRules get list of rules
|
||||
func (p *sqlHandler) GetRules(ctx context.Context, user ident.Ident) (lis mercury.Rules, err error) {
|
||||
ctx, span := lg.Span(ctx)
|
||||
defer span.End()
|
||||
|
||||
var ids []string
|
||||
ids = append(ids, "U-"+user.Identity())
|
||||
switch u := user.(type) {
|
||||
case grouper:
|
||||
for _, g := range u.GetGroups() {
|
||||
ids = append(ids, "G-"+g)
|
||||
}
|
||||
}
|
||||
if groups, err := p.getGroups(ctx, user.Identity()); err != nil {
|
||||
for _, g := range groups {
|
||||
ids = append(ids, "G-"+g)
|
||||
}
|
||||
}
|
||||
|
||||
query := squirrel.Select(`"role"`, `"type"`, `"match"`, `"rule"`).
|
||||
From("mercury_rules_vw").
|
||||
Where(squirrel.Eq{"id": ids}).
|
||||
PlaceholderFormat(squirrel.Dollar)
|
||||
rows, err := query.
|
||||
RunWith(p.db).
|
||||
QueryContext(ctx)
|
||||
|
||||
if err != nil {
|
||||
span.RecordError(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer rows.Close()
|
||||
for rows.Next() {
|
||||
var s mercury.Rule
|
||||
var rule string
|
||||
err = rows.Scan(&s.Role, &s.Type, &s.Match, &rule)
|
||||
if err != nil {
|
||||
span.RecordError(err)
|
||||
return nil, err
|
||||
}
|
||||
if rule != "" {
|
||||
s.Role, rule, _ = strings.Cut(rule, " ")
|
||||
s.Type, s.Match, _ = strings.Cut(rule, " ")
|
||||
}
|
||||
lis = append(lis, s)
|
||||
}
|
||||
err = rows.Err()
|
||||
span.RecordError(err)
|
||||
|
||||
span.AddEvent(fmt.Sprint("read rules ", len(lis)))
|
||||
return lis, err
|
||||
}
|
||||
|
||||
// getGroups get list of groups
|
||||
func (pgm *sqlHandler) getGroups(ctx context.Context, user string) (lis []string, err error) {
|
||||
ctx, span := lg.Span(ctx)
|
||||
defer span.End()
|
||||
|
||||
rows, err := squirrel.Select("group_id").
|
||||
From("mercury_groups_vw").
|
||||
Where(squirrel.Eq{"user_id": user}).
|
||||
PlaceholderFormat(squirrel.Dollar).
|
||||
RunWith(pgm.db).
|
||||
QueryContext(ctx)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer rows.Close()
|
||||
for rows.Next() {
|
||||
var s string
|
||||
err = rows.Scan(&s)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lis = append(lis, s)
|
||||
}
|
||||
|
||||
return lis, rows.Err()
|
||||
}
|
||||
419
mercury/sql/sql.go
Normal file
419
mercury/sql/sql.go
Normal file
@@ -0,0 +1,419 @@
|
||||
package sql
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
|
||||
sq "github.com/Masterminds/squirrel"
|
||||
"go.sour.is/pkg/lg"
|
||||
"go.sour.is/pkg/mercury"
|
||||
"go.sour.is/pkg/rsql"
|
||||
"golang.org/x/exp/maps"
|
||||
)
|
||||
|
||||
type sqlHandler struct {
|
||||
db *sql.DB
|
||||
paceholderFormat sq.PlaceholderFormat
|
||||
listFormat [2]rune
|
||||
}
|
||||
|
||||
var (
|
||||
_ mercury.GetIndex = (*sqlHandler)(nil)
|
||||
_ mercury.GetConfig = (*sqlHandler)(nil)
|
||||
_ mercury.GetRules = (*sqlHandler)(nil)
|
||||
_ mercury.WriteConfig = (*sqlHandler)(nil)
|
||||
)
|
||||
|
||||
func Register() {
|
||||
mercury.Registry.Register("sql", func(s *mercury.Space) any {
|
||||
var dsn string
|
||||
var opts strings.Builder
|
||||
var dbtype string
|
||||
for _, c := range s.List {
|
||||
if c.Name == "match" {
|
||||
continue
|
||||
}
|
||||
if c.Name == "dbtype" {
|
||||
dbtype = c.First()
|
||||
continue
|
||||
}
|
||||
if c.Name == "dsn" {
|
||||
dsn = c.First()
|
||||
break
|
||||
}
|
||||
fmt.Fprintln(&opts, c.Name, "=", c.First())
|
||||
}
|
||||
if dsn == "" {
|
||||
dsn = opts.String()
|
||||
}
|
||||
|
||||
db, err := openDB(dbtype, dsn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err = db.Ping(); err != nil {
|
||||
return err
|
||||
}
|
||||
switch dbtype {
|
||||
case "sqlite":
|
||||
return &sqlHandler{db, sq.Dollar, [2]rune{'[', ']'}}
|
||||
case "postgres":
|
||||
return &sqlHandler{db, sq.Dollar, [2]rune{'{', '}'}}
|
||||
default:
|
||||
return fmt.Errorf("unsupported dbtype: %s", dbtype)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
type Space struct {
|
||||
mercury.Space
|
||||
ID uint64
|
||||
}
|
||||
type Value struct {
|
||||
mercury.Value
|
||||
ID uint64
|
||||
}
|
||||
|
||||
func (p *sqlHandler) GetIndex(ctx context.Context, search mercury.NamespaceSearch, pgm *rsql.Program) (mercury.Config, error) {
|
||||
ctx, span := lg.Span(ctx)
|
||||
defer span.End()
|
||||
|
||||
cols := rsql.GetDbColumns(mercury.Space{})
|
||||
where, err := getWhere(search, cols)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lis, err := p.listSpace(ctx, nil, where)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
config := make(mercury.Config, len(lis))
|
||||
for i, s := range lis {
|
||||
config[i] = &s.Space
|
||||
}
|
||||
|
||||
return config, nil
|
||||
}
|
||||
|
||||
func (p *sqlHandler) GetConfig(ctx context.Context, search mercury.NamespaceSearch, pgm *rsql.Program, fields []string) (mercury.Config, error) {
|
||||
ctx, span := lg.Span(ctx)
|
||||
defer span.End()
|
||||
|
||||
idx, err := p.GetIndex(ctx, search, pgm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
spaceMap := make(map[string]int, len(idx))
|
||||
for u, s := range idx {
|
||||
spaceMap[s.Space] = u
|
||||
}
|
||||
|
||||
where, err := getWhere(search, rsql.GetDbColumns(mercury.Value{}))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
query := sq.Select(`"space"`, `"name"`, `"seq"`, `"notes"`, `"tags"`, `"values"`).
|
||||
From("mercury_registry_vw").
|
||||
Where(where).
|
||||
OrderBy("space asc", "name asc").
|
||||
PlaceholderFormat(p.paceholderFormat)
|
||||
span.AddEvent(lg.LogQuery(query.ToSql()))
|
||||
rows, err := query.RunWith(p.db).
|
||||
QueryContext(ctx)
|
||||
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer rows.Close()
|
||||
for rows.Next() {
|
||||
var s mercury.Value
|
||||
|
||||
err = rows.Scan(
|
||||
&s.Space,
|
||||
&s.Name,
|
||||
&s.Seq,
|
||||
listScan(&s.Notes, p.listFormat),
|
||||
listScan(&s.Tags, p.listFormat),
|
||||
listScan(&s.Values, p.listFormat),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if u, ok := spaceMap[s.Space]; ok {
|
||||
idx[u].List = append(idx[u].List, s)
|
||||
}
|
||||
}
|
||||
|
||||
err = rows.Err()
|
||||
span.RecordError(err)
|
||||
|
||||
span.AddEvent(fmt.Sprint("read index ", len(idx)))
|
||||
return idx, err
|
||||
}
|
||||
|
||||
func (p *sqlHandler) listSpace(ctx context.Context, tx sq.BaseRunner, where sq.Sqlizer) ([]*Space, error) {
|
||||
ctx, span := lg.Span(ctx)
|
||||
defer span.End()
|
||||
|
||||
if tx == nil {
|
||||
tx = p.db
|
||||
}
|
||||
|
||||
query := sq.Select(`"id"`, `"space"`, `"tags"`, `"notes"`).
|
||||
From("mercury_spaces").
|
||||
Where(where).
|
||||
OrderBy("space asc").
|
||||
PlaceholderFormat(sq.Dollar)
|
||||
span.AddEvent(lg.LogQuery(query.ToSql()))
|
||||
rows, err := query.RunWith(tx).
|
||||
QueryContext(ctx)
|
||||
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var lis []*Space
|
||||
for rows.Next() {
|
||||
var s Space
|
||||
err = rows.Scan(
|
||||
&s.ID,
|
||||
&s.Space.Space,
|
||||
listScan(&s.Space.Notes, p.listFormat),
|
||||
listScan(&s.Space.Tags, p.listFormat),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lis = append(lis, &s)
|
||||
}
|
||||
|
||||
err = rows.Err()
|
||||
span.RecordError(err)
|
||||
|
||||
span.AddEvent(fmt.Sprint("read config ", len(lis)))
|
||||
return lis, err
|
||||
}
|
||||
|
||||
// WriteConfig writes a config map to database
|
||||
func (p *sqlHandler) WriteConfig(ctx context.Context, config mercury.Config) (err error) {
|
||||
ctx, span := lg.Span(ctx)
|
||||
defer span.End()
|
||||
|
||||
// Delete spaces that are present in input but are empty.
|
||||
deleteSpaces := make(map[string]struct{})
|
||||
|
||||
// get names of each space
|
||||
var names = make(map[string]int)
|
||||
for i, v := range config {
|
||||
names[v.Space] = i
|
||||
|
||||
if len(v.Tags) == 0 && len(v.Notes) == 0 && len(v.List) == 0 {
|
||||
deleteSpaces[v.Space] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
tx, err := p.db.Begin()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if err != nil && tx != nil {
|
||||
tx.Rollback()
|
||||
}
|
||||
}()
|
||||
|
||||
// get current spaces
|
||||
lis, err := p.listSpace(ctx, tx, sq.Eq{"space": maps.Keys(names)})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// determine which are being updated
|
||||
var deleteIDs []uint64
|
||||
var updateIDs []uint64
|
||||
var currentNames = make(map[string]struct{}, len(lis))
|
||||
var updateSpaces []*mercury.Space
|
||||
var insertSpaces []*mercury.Space
|
||||
|
||||
for _, s := range lis {
|
||||
spaceName := s.Space.Space
|
||||
currentNames[spaceName] = struct{}{}
|
||||
|
||||
if _, ok := deleteSpaces[spaceName]; ok {
|
||||
deleteIDs = append(deleteIDs, s.ID)
|
||||
continue
|
||||
}
|
||||
|
||||
updateSpaces = append(updateSpaces, config[names[spaceName]])
|
||||
updateIDs = append(updateIDs, s.ID)
|
||||
}
|
||||
for _, s := range config {
|
||||
spaceName := s.Space
|
||||
if _, ok := currentNames[spaceName]; !ok {
|
||||
insertSpaces = append(insertSpaces, s)
|
||||
}
|
||||
}
|
||||
|
||||
// delete spaces
|
||||
if ids := deleteIDs; len(ids) > 0 {
|
||||
_, err = sq.Delete("mercury_spaces").Where(sq.Eq{"id": ids}).RunWith(tx).PlaceholderFormat(sq.Dollar).ExecContext(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// delete values
|
||||
if ids := append(updateIDs, deleteIDs...); len(ids) > 0 {
|
||||
_, err = sq.Delete("mercury_values").Where(sq.Eq{"id": ids}).RunWith(tx).PlaceholderFormat(sq.Dollar).ExecContext(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
var newValues []*Value
|
||||
|
||||
// update spaces
|
||||
for i, u := range updateSpaces {
|
||||
query := sq.Update("mercury_spaces").
|
||||
Where(sq.Eq{"id": updateIDs[i]}).
|
||||
Set("tags", listValue(u.Tags, p.listFormat)).
|
||||
Set("notes", listValue(u.Notes, p.listFormat)).
|
||||
PlaceholderFormat(sq.Dollar)
|
||||
span.AddEvent(lg.LogQuery(query.ToSql()))
|
||||
_, err := query.RunWith(tx).ExecContext(ctx)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// log.Debugf("UPDATED %d SPACES", len(updateSpaces))
|
||||
for _, v := range u.List {
|
||||
newValues = append(newValues, &Value{Value: v, ID: updateIDs[i]})
|
||||
}
|
||||
}
|
||||
|
||||
// insert spaces
|
||||
for _, s := range insertSpaces {
|
||||
var id uint64
|
||||
query := sq.Insert("mercury_spaces").
|
||||
PlaceholderFormat(sq.Dollar).
|
||||
Columns("space", "tags", "notes").
|
||||
Values(s.Space, listValue(s.Tags, p.listFormat), listValue(s.Notes, p.listFormat)).
|
||||
Suffix("RETURNING \"id\"")
|
||||
span.AddEvent(lg.LogQuery(query.ToSql()))
|
||||
|
||||
err := query.
|
||||
RunWith(tx).
|
||||
QueryRowContext(ctx).
|
||||
Scan(&id)
|
||||
if err != nil {
|
||||
s, v, _ := query.ToSql()
|
||||
log.Println(s, v, err)
|
||||
return err
|
||||
}
|
||||
for _, v := range s.List {
|
||||
newValues = append(newValues, &Value{Value: v, ID: id})
|
||||
}
|
||||
}
|
||||
|
||||
// write all values to db.
|
||||
err = p.writeValues(ctx, tx, newValues)
|
||||
// log.Debugf("WROTE %d ATTRS", len(attrs))
|
||||
|
||||
tx.Commit()
|
||||
tx = nil
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// writeValues writes the values to db
|
||||
func (p *sqlHandler) writeValues(ctx context.Context, tx sq.BaseRunner, lis []*Value) (err error) {
|
||||
ctx, span := lg.Span(ctx)
|
||||
defer span.End()
|
||||
|
||||
if len(lis) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
newInsert := func() sq.InsertBuilder {
|
||||
return sq.Insert("mercury_values").
|
||||
RunWith(tx).
|
||||
PlaceholderFormat(sq.Dollar).
|
||||
Columns(
|
||||
`"id"`,
|
||||
`"seq"`,
|
||||
`"name"`,
|
||||
`"values"`,
|
||||
`"notes"`,
|
||||
`"tags"`,
|
||||
)
|
||||
}
|
||||
chunk := int(65000 / 3)
|
||||
insert := newInsert()
|
||||
for i, s := range lis {
|
||||
insert = insert.Values(
|
||||
s.ID,
|
||||
s.Seq,
|
||||
s.Name,
|
||||
listValue(s.Values, p.listFormat),
|
||||
listValue(s.Notes, p.listFormat),
|
||||
listValue(s.Tags, p.listFormat),
|
||||
)
|
||||
// log.Debug(s.Name)
|
||||
|
||||
if i > 0 && i%chunk == 0 {
|
||||
// log.Debugf("inserting %v rows into %v", i%chunk, d.Table)
|
||||
// log.Debug(insert.ToSql())
|
||||
span.AddEvent(lg.LogQuery(insert.ToSql()))
|
||||
|
||||
_, err = insert.ExecContext(ctx)
|
||||
if err != nil {
|
||||
// log.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
insert = newInsert()
|
||||
}
|
||||
}
|
||||
if len(lis)%chunk > 0 {
|
||||
// log.Debugf("inserting %v rows into %v", len(lis)%chunk, d.Table)
|
||||
// log.Debug(insert.ToSql())
|
||||
span.AddEvent(lg.LogQuery(insert.ToSql()))
|
||||
|
||||
_, err = insert.ExecContext(ctx)
|
||||
if err != nil {
|
||||
// log.Error(err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func getWhere(search mercury.NamespaceSearch, d *rsql.DbColumns) (sq.Sqlizer, error) {
|
||||
var where sq.Or
|
||||
space, err := d.Col("space")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, m := range search {
|
||||
switch m.(type) {
|
||||
case mercury.NamespaceNode:
|
||||
where = append(where, sq.Eq{space: m.Value()})
|
||||
case mercury.NamespaceStar:
|
||||
where = append(where, sq.Like{space: m.Value()})
|
||||
case mercury.NamespaceTrace:
|
||||
e := sq.Expr(`? LIKE `+space+` || '%'`, m.Value())
|
||||
where = append(where, e)
|
||||
}
|
||||
}
|
||||
return where, nil
|
||||
}
|
||||
Reference in New Issue
Block a user