239 lines
4.7 KiB
Go
239 lines
4.7 KiB
Go
|
package table
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"database/sql"
|
||
|
"fmt"
|
||
|
"log"
|
||
|
"os"
|
||
|
"path/filepath"
|
||
|
"time"
|
||
|
|
||
|
"github.com/foxcpp/maddy/framework/config"
|
||
|
"github.com/foxcpp/maddy/framework/module"
|
||
|
|
||
|
"github.com/tursodatabase/go-libsql"
|
||
|
)
|
||
|
|
||
|
type Table struct {
|
||
|
modName string
|
||
|
instName string
|
||
|
|
||
|
lookup string
|
||
|
add string
|
||
|
list string
|
||
|
set string
|
||
|
del string
|
||
|
|
||
|
dir string
|
||
|
sql *sql.DB
|
||
|
connector *libsql.Connector
|
||
|
}
|
||
|
|
||
|
func NewTable(modName, instName string, _, _ []string) (module.Module, error) {
|
||
|
return &Table{
|
||
|
modName: modName,
|
||
|
instName: instName,
|
||
|
}, nil
|
||
|
}
|
||
|
|
||
|
func (s *Table) Name() string {
|
||
|
return s.modName
|
||
|
}
|
||
|
|
||
|
func (s *Table) InstanceName() string {
|
||
|
return s.instName
|
||
|
}
|
||
|
|
||
|
func (s *Table) Init(cfg *config.Map) error {
|
||
|
var (
|
||
|
initQueries []string
|
||
|
primaryUrl string
|
||
|
authToken string
|
||
|
dbName = "local.db"
|
||
|
)
|
||
|
cfg.StringList("init", false, false, nil, &initQueries)
|
||
|
|
||
|
cfg.String("url", false, true, "", &primaryUrl)
|
||
|
cfg.String("token", false, true, "", &authToken)
|
||
|
|
||
|
cfg.String("lookup", false, true, "", &s.lookup)
|
||
|
cfg.String("add", false, false, "", &s.add)
|
||
|
cfg.String("list", false, false, "", &s.list)
|
||
|
cfg.String("del", false, false, "", &s.del)
|
||
|
cfg.String("set", false, false, "", &s.set)
|
||
|
|
||
|
if _, err := cfg.Process(); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
var err error
|
||
|
s.dir, err = os.MkdirTemp("", "libsql-*")
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("error creating temporary directory: %w", err)
|
||
|
}
|
||
|
|
||
|
dbPath := filepath.Join(s.dir, dbName)
|
||
|
|
||
|
syncInterval := time.Minute
|
||
|
|
||
|
s.connector, err = libsql.NewEmbeddedReplicaConnector(dbPath, primaryUrl,
|
||
|
libsql.WithAuthToken(authToken),
|
||
|
libsql.WithSyncInterval(syncInterval),
|
||
|
)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("Error creating connector: %w", err)
|
||
|
}
|
||
|
|
||
|
s.sql = sql.OpenDB(s.connector)
|
||
|
|
||
|
if len(initQueries) > 0 {
|
||
|
// bldr := ws4.NewRequestBuilder()
|
||
|
// for _, init := range initQueries {
|
||
|
// bldr.AddStatement(init)
|
||
|
// }
|
||
|
// req, err := bldr.Build()
|
||
|
// if err != nil {
|
||
|
// return config.NodeErr(cfg.Block, "failed to init db: %v", err)
|
||
|
// }
|
||
|
// _, _, err = db.Send(req)
|
||
|
// if err != nil {
|
||
|
// return config.NodeErr(cfg.Block, "failed to init db: %v", err)
|
||
|
// }
|
||
|
tx, err := s.sql.Begin()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
for _, qry := range initQueries {
|
||
|
res, err := tx.Exec(qry)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
rows, err := res.RowsAffected()
|
||
|
log.Println("db rows effected: ", rows, err)
|
||
|
}
|
||
|
if err = tx.Commit(); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (s *Table) Close() error {
|
||
|
defer os.RemoveAll(s.dir)
|
||
|
defer s.connector.Close()
|
||
|
|
||
|
return s.sql.Close()
|
||
|
}
|
||
|
|
||
|
func (s *Table) Lookup(ctx context.Context, val string) (value string, ok bool, err error) {
|
||
|
defer func() {
|
||
|
if err != nil {
|
||
|
err = fmt.Errorf("%s: lookup: %w", s.modName, err)
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
row := s.sql.QueryRowContext(ctx, s.lookup, val)
|
||
|
err = row.Scan(&value)
|
||
|
|
||
|
return value, err == nil, err
|
||
|
}
|
||
|
|
||
|
func (s *Table) LookupMulti(ctx context.Context, val string) (lis []string, err error) {
|
||
|
defer func() {
|
||
|
if err != nil {
|
||
|
err = fmt.Errorf("%s: lookupMulti: %w", s.modName, err)
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
var rows *sql.Rows
|
||
|
rows, err = s.sql.QueryContext(ctx, s.lookup, val)
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
defer rows.Close()
|
||
|
|
||
|
for rows.Next() {
|
||
|
var value string
|
||
|
err = rows.Scan(&value)
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
lis = append(lis, value)
|
||
|
}
|
||
|
err = rows.Err()
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
func (s *Table) Keys() (lis []string, err error) {
|
||
|
if s.list == "" {
|
||
|
return nil, fmt.Errorf("%s: table is not mutable (no 'list' query)", s.modName)
|
||
|
}
|
||
|
defer func() {
|
||
|
if err != nil {
|
||
|
err = fmt.Errorf("%s: list: %w", s.modName, err)
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
var rows *sql.Rows
|
||
|
rows, err = s.sql.Query(s.list)
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
defer rows.Close()
|
||
|
|
||
|
for rows.Next() {
|
||
|
var value string
|
||
|
err = rows.Scan(&value)
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
lis = append(lis, value)
|
||
|
}
|
||
|
err = rows.Err()
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
func (s *Table) RemoveKey(k string) (err error) {
|
||
|
if s.del == "" {
|
||
|
return fmt.Errorf("%s: table is not mutable (no 'del' query)", s.modName)
|
||
|
}
|
||
|
|
||
|
_, err = s.sql.ExecContext(context.TODO(), s.del, k)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("%s: del %s: %w", s.modName, k, err)
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (s *Table) SetKey(k, v string) (err error) {
|
||
|
if s.set == "" {
|
||
|
return fmt.Errorf("%s: table is not mutable (no 'set' query)", s.modName)
|
||
|
}
|
||
|
if s.add == "" {
|
||
|
return fmt.Errorf("%s: table is not mutable (no 'add' query)", s.modName)
|
||
|
}
|
||
|
|
||
|
res, err := s.sql.ExecContext(context.TODO(), s.set, k, v)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("%s: set %s: %w", s.modName, k, err)
|
||
|
}
|
||
|
var n int64
|
||
|
if n, err = res.RowsAffected(); err != nil && n == 0 {
|
||
|
_, err = s.sql.ExecContext(context.TODO(), s.add, k, v)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("%s: add %s: %w", s.modName, k, err)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
func init() {
|
||
|
module.Register("table.turso_query", NewTable)
|
||
|
}
|