feat: add websocket handler

This commit is contained in:
Jon Lundy 2022-08-10 11:19:08 -06:00
parent 72e7d5f265
commit 5e87ccda79
Signed by untrusted user who does not match committer: xuu
GPG Key ID: C63E6D61F3035024

View File

@ -12,6 +12,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/gorilla/websocket"
"github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es"
"github.com/sour-is/ev/pkg/es/event" "github.com/sour-is/ev/pkg/es/event"
) )
@ -27,7 +28,77 @@ func New(ctx context.Context, es *es.EventStore) (*service, error) {
return &service{es}, nil return &service{es}, nil
} }
var upgrader = websocket.Upgrader{
WriteBufferSize: 4096,
CheckOrigin: func(r *http.Request) bool {
return true
},
}
func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodGet:
if r.Header.Get("Upgrade") == "websocket" {
s.websocket(w, r)
return
}
s.get(w, r)
case http.MethodPost, http.MethodPut:
s.post(w, r)
default:
w.WriteHeader(http.StatusMethodNotAllowed)
}
}
func (s *service) get(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
name, _, _ := strings.Cut(r.URL.Path, "/")
if name == "" {
w.WriteHeader(http.StatusNotFound)
return
}
var first event.Event = event.NilEvent
if lis, err := s.es.Read(ctx, "post-"+name, 0, 1); err == nil && len(lis) > 0 {
first = lis[0]
}
var pos, count int64 = -1, -99
qry := r.URL.Query()
if i, err := strconv.ParseInt(qry.Get("idx"), 10, 64); err == nil {
pos = i
}
if i, err := strconv.ParseInt(qry.Get("n"), 10, 64); err == nil {
count = i
}
log.Print("GET topic=", name, " idx=", pos, " n=", count)
events, err := s.es.Read(ctx, "post-"+name, pos, count)
if err != nil {
log.Print(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
if strings.Contains(r.Header.Get("Accept"), "application/json") {
w.Header().Add("Content-Type", "application/json")
if err = encodeJSON(w, first, events); err != nil {
log.Print(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
return
}
for i := range events {
fmt.Fprintln(w, events[i])
}
}
func (s *service) post(w http.ResponseWriter, r *http.Request) {
ctx := r.Context() ctx := r.Context()
name, tags, _ := strings.Cut(r.URL.Path, "/") name, tags, _ := strings.Cut(r.URL.Path, "/")
@ -41,94 +112,147 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
first = lis[0] first = lis[0]
} }
switch r.Method { b, err := io.ReadAll(io.LimitReader(r.Body, 64*1024))
case http.MethodGet: if err != nil {
var pos, count int64 = -1, -99 log.Print(err)
qry := r.URL.Query() w.WriteHeader(http.StatusBadRequest)
if i, err := strconv.ParseInt(qry.Get("idx"), 10, 64); err == nil {
pos = i
}
if i, err := strconv.ParseInt(qry.Get("n"), 10, 64); err == nil {
count = i
}
log.Print("GET topic=", name, " idx=", pos, " n=", count)
events, err := s.es.Read(ctx, "post-"+name, pos, count)
if err != nil {
log.Print(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
if strings.Contains(r.Header.Get("Accept"), "application/json") {
w.Header().Add("Content-Type", "application/json")
if err = encodeJSON(w, first, events); err != nil {
log.Print(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
return
}
for i := range events {
fmt.Fprintln(w, events[i])
}
return return
case http.MethodPost, http.MethodPut: }
b, err := io.ReadAll(io.LimitReader(r.Body, 64*1024)) r.Body.Close()
if err != nil {
log.Print(err)
w.WriteHeader(http.StatusBadRequest)
return
}
r.Body.Close()
if name == "" { if name == "" {
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
return return
} }
events := event.NewEvents(&PostEvent{ events := event.NewEvents(&PostEvent{
Payload: b, Payload: b,
Tags: fields(tags), Tags: fields(tags),
}) })
_, err = s.es.Append(r.Context(), "post-"+name, events) _, err = s.es.Append(r.Context(), "post-"+name, events)
if err != nil { if err != nil {
log.Print(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
if first == event.NilEvent {
first = events.First()
}
m := events.First().EventMeta()
log.Print("POST topic=", name, " tags=", tags, " idx=", m.Position, " id=", m.EventID)
w.WriteHeader(http.StatusAccepted)
if strings.Contains(r.Header.Get("Accept"), "application/json") {
w.Header().Add("Content-Type", "application/json")
if err = encodeJSON(w, first, events); err != nil {
log.Print(err) log.Print(err)
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
return return
} }
if first == event.NilEvent { return
first = events.First() }
}
m := events.First().EventMeta() w.Header().Add("Content-Type", "text/plain")
log.Print("POST topic=", name, " tags=", tags, " idx=", m.Position, " id=", m.EventID) fmt.Fprintf(w, "OK %d %s", m.Position, m.EventID)
}
func (s *service) websocket(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
w.WriteHeader(http.StatusAccepted) name, _, _ := strings.Cut(r.URL.Path, "/")
if strings.Contains(r.Header.Get("Accept"), "application/json") { if name == "" {
w.Header().Add("Content-Type", "application/json") w.WriteHeader(http.StatusNotFound)
if err = encodeJSON(w, first, events); err != nil { return
log.Print(err) }
w.WriteHeader(http.StatusInternalServerError) var first event.Event = event.NilEvent
if lis, err := s.es.Read(ctx, "post-"+name, 0, 1); err == nil && len(lis) > 0 {
first = lis[0]
}
var pos int64 = -1
qry := r.URL.Query()
if i, err := strconv.ParseInt(qry.Get("idx"), 10, 64); err == nil {
pos = i
}
log.Print("WS topic=", name, " idx=", pos)
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Print("upgrade:", err)
return
}
defer c.Close()
ctx, cancel := context.WithCancel(ctx)
c.SetCloseHandler(func(code int, text string) error {
cancel()
return nil
})
go func() {
for {
if err := ctx.Err(); err != nil {
return return
} }
mt, message, err := c.ReadMessage()
return if err != nil {
log.Println("read:", err)
return
}
log.Printf("recv: %d %s", mt, message)
} }
}()
w.Header().Add("Content-Type", "text/plain") es := s.es.EventStream()
fmt.Fprintf(w, "OK %d %s", m.Position, m.EventID) if es == nil {
log.Println("EventStore does not implement streaming")
w.WriteHeader(http.StatusInternalServerError)
return return
default: }
w.WriteHeader(http.StatusMethodNotAllowed)
sub, err := es.Subscribe(ctx, "post-"+name, pos)
if err != nil {
log.Println(err)
return
}
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
log.Println("stop ws")
sub.Close(ctx)
}()
log.Println("start ws")
for sub.Recv(ctx) {
events, err := sub.Events(ctx)
if err != nil {
break
}
log.Println("got events ", len(events))
for _, e := range events {
e, ok := e.(*PostEvent)
if !ok {
continue
}
log.Println("send", e.String())
var b bytes.Buffer
if err = encodeJSON(&b, first, events); err != nil {
log.Print(err)
}
err = c.WriteMessage(websocket.TextMessage, b.Bytes())
if err != nil {
log.Println("write:", err)
break
}
}
} }
} }
@ -199,7 +323,7 @@ func encodeJSON(w io.Writer, first event.Event, events event.Events) error {
out[i].Created = e.EventMeta().Created().Format(time.RFC3339Nano) out[i].Created = e.EventMeta().Created().Format(time.RFC3339Nano)
out[i].Payload = e.Payload out[i].Payload = e.Payload
out[i].Tags = e.Tags out[i].Tags = e.Tags
out[i].Topic.Name = e.EventMeta().StreamID out[i].Topic.Name = strings.TrimPrefix(e.EventMeta().StreamID, "post-")
out[i].Topic.Created = first.EventMeta().Created().Format(time.RFC3339Nano) out[i].Topic.Created = first.EventMeta().Created().Format(time.RFC3339Nano)
out[i].Topic.Seq = e.EventMeta().Position out[i].Topic.Seq = e.EventMeta().Position
} }