fix: file sharing of wal
This commit is contained in:
parent
50b44a7bb7
commit
23639bc306
|
@ -3,6 +3,7 @@ package diskstore
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -115,6 +116,7 @@ func (es *eventLog) Read(ctx context.Context, pos, count int64) (event.Events, e
|
||||||
}
|
}
|
||||||
|
|
||||||
start, count := math.PagerBox(first, last, pos, count)
|
start, count := math.PagerBox(first, last, pos, count)
|
||||||
|
log.Println("reading", first, last, pos, count, start)
|
||||||
if count == 0 {
|
if count == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@ package streamer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"log"
|
||||||
|
|
||||||
"github.com/sour-is/ev/pkg/es"
|
"github.com/sour-is/ev/pkg/es"
|
||||||
"github.com/sour-is/ev/pkg/es/driver"
|
"github.com/sour-is/ev/pkg/es/driver"
|
||||||
|
@ -55,16 +56,20 @@ func (s *streamer) Subscribe(ctx context.Context, streamID string, start int64)
|
||||||
size: es.AllEvents,
|
size: es.AllEvents,
|
||||||
})
|
})
|
||||||
sub.unsub = s.delete(streamID, sub)
|
sub.unsub = s.delete(streamID, sub)
|
||||||
|
log.Println("start ", sub)
|
||||||
|
|
||||||
return sub, s.state.Modify(ctx, func(state *state) error {
|
return sub, s.state.Modify(ctx, func(state *state) error {
|
||||||
state.subscribers[streamID] = append(state.subscribers[streamID], sub)
|
state.subscribers[streamID] = append(state.subscribers[streamID], sub)
|
||||||
|
log.Println("add ", len(state.subscribers[streamID]))
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
func (s *streamer) Send(ctx context.Context, streamID string, events event.Events) error {
|
func (s *streamer) Send(ctx context.Context, streamID string, events event.Events) error {
|
||||||
return s.state.Modify(ctx, func(state *state) error {
|
return s.state.Modify(ctx, func(state *state) error {
|
||||||
|
log.Println("trigger ", len(state.subscribers[streamID]))
|
||||||
for _, sub := range state.subscribers[streamID] {
|
for _, sub := range state.subscribers[streamID] {
|
||||||
return sub.position.Modify(ctx, func(position *position) error {
|
log.Println("trigg ", sub)
|
||||||
|
err := sub.position.Modify(ctx, func(position *position) error {
|
||||||
position.size = int64(events.Last().EventMeta().Position - uint64(position.idx))
|
position.size = int64(events.Last().EventMeta().Position - uint64(position.idx))
|
||||||
|
|
||||||
if position.wait != nil {
|
if position.wait != nil {
|
||||||
|
@ -73,6 +78,7 @@ func (s *streamer) Send(ctx context.Context, streamID string, events event.Event
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
if err != nil { return err }
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
@ -83,6 +89,7 @@ func (s *streamer) delete(streamID string, sub *subscription) func(context.Conte
|
||||||
if err := ctx.Err(); err != nil {
|
if err := ctx.Err(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
log.Println("unsub ", s)
|
||||||
return s.state.Modify(ctx, func(state *state) error {
|
return s.state.Modify(ctx, func(state *state) error {
|
||||||
lis := state.subscribers[streamID]
|
lis := state.subscribers[streamID]
|
||||||
for i := range lis {
|
for i := range lis {
|
||||||
|
@ -143,6 +150,8 @@ type subscription struct {
|
||||||
|
|
||||||
func (s *subscription) Recv(ctx context.Context) bool {
|
func (s *subscription) Recv(ctx context.Context) bool {
|
||||||
var wait func(context.Context) bool
|
var wait func(context.Context) bool
|
||||||
|
log.Println("wait ", s)
|
||||||
|
defer log.Println("recv ", s)
|
||||||
err := s.position.Modify(ctx, func(position *position) error {
|
err := s.position.Modify(ctx, func(position *position) error {
|
||||||
if position.size == es.AllEvents {
|
if position.size == es.AllEvents {
|
||||||
return nil
|
return nil
|
||||||
|
@ -178,7 +187,13 @@ func (s *subscription) Events(ctx context.Context) (event.Events, error) {
|
||||||
var events event.Events
|
var events event.Events
|
||||||
return events, s.position.Modify(ctx, func(position *position) error {
|
return events, s.position.Modify(ctx, func(position *position) error {
|
||||||
var err error
|
var err error
|
||||||
events, err = s.events.Read(ctx, int64(position.idx), position.size)
|
log.Println("pos=", position, s)
|
||||||
|
events, err = s.events.Read(ctx, position.idx, position.size)
|
||||||
|
if err != nil {
|
||||||
|
log.Println(err, s)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
log.Println("events=", len(events), s)
|
||||||
position.size = int64(len(events))
|
position.size = int64(len(events))
|
||||||
if len(events) > 0 {
|
if len(events) > 0 {
|
||||||
position.idx = int64(events.First().EventMeta().Position - 1)
|
position.idx = int64(events.First().EventMeta().Position - 1)
|
||||||
|
|
|
@ -85,7 +85,7 @@ func (s *service) get(w http.ResponseWriter, r *http.Request) {
|
||||||
if strings.Contains(r.Header.Get("Accept"), "application/json") {
|
if strings.Contains(r.Header.Get("Accept"), "application/json") {
|
||||||
w.Header().Add("Content-Type", "application/json")
|
w.Header().Add("Content-Type", "application/json")
|
||||||
|
|
||||||
if err = encodeJSON(w, first, events); err != nil {
|
if err = encodeJSON(w, first, events...); err != nil {
|
||||||
log.Print(err)
|
log.Print(err)
|
||||||
|
|
||||||
w.WriteHeader(http.StatusInternalServerError)
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
@ -147,7 +147,7 @@ func (s *service) post(w http.ResponseWriter, r *http.Request) {
|
||||||
w.WriteHeader(http.StatusAccepted)
|
w.WriteHeader(http.StatusAccepted)
|
||||||
if strings.Contains(r.Header.Get("Accept"), "application/json") {
|
if strings.Contains(r.Header.Get("Accept"), "application/json") {
|
||||||
w.Header().Add("Content-Type", "application/json")
|
w.Header().Add("Content-Type", "application/json")
|
||||||
if err = encodeJSON(w, first, events); err != nil {
|
if err = encodeJSON(w, first, events...); err != nil {
|
||||||
log.Print(err)
|
log.Print(err)
|
||||||
|
|
||||||
w.WriteHeader(http.StatusInternalServerError)
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
@ -178,7 +178,7 @@ func (s *service) websocket(w http.ResponseWriter, r *http.Request) {
|
||||||
qry := r.URL.Query()
|
qry := r.URL.Query()
|
||||||
|
|
||||||
if i, err := strconv.ParseInt(qry.Get("index"), 10, 64); err == nil {
|
if i, err := strconv.ParseInt(qry.Get("index"), 10, 64); err == nil {
|
||||||
pos = i
|
pos = i - 1
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Print("WS topic=", name, " idx=", pos)
|
log.Print("WS topic=", name, " idx=", pos)
|
||||||
|
@ -235,15 +235,15 @@ func (s *service) websocket(w http.ResponseWriter, r *http.Request) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
log.Println("got events ", len(events))
|
log.Println("got events ", len(events))
|
||||||
for _, e := range events {
|
for i := range events {
|
||||||
e, ok := e.(*PostEvent)
|
e, ok := events[i].(*PostEvent)
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
log.Println("send", e.String())
|
log.Println("send", e.String())
|
||||||
|
|
||||||
var b bytes.Buffer
|
var b bytes.Buffer
|
||||||
if err = encodeJSON(&b, first, events); err != nil {
|
if err = encodeJSON(&b, first, e); err != nil {
|
||||||
log.Print(err)
|
log.Print(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -300,7 +300,7 @@ func fields(s string) []string {
|
||||||
return strings.Split(s, "/")
|
return strings.Split(s, "/")
|
||||||
}
|
}
|
||||||
|
|
||||||
func encodeJSON(w io.Writer, first event.Event, events event.Events) error {
|
func encodeJSON(w io.Writer, first event.Event, events ...event.Event) error {
|
||||||
out := make([]struct {
|
out := make([]struct {
|
||||||
ID uint64 `json:"id"`
|
ID uint64 `json:"id"`
|
||||||
Payload []byte `json:"payload"`
|
Payload []byte `json:"payload"`
|
||||||
|
|
Loading…
Reference in New Issue
Block a user