add ping/pong and read/write deadlines websockets

This commit is contained in:
AJ ONeal 2020-07-20 16:07:14 -06:00
parent ce39066bc8
commit 399e8dd651
1 changed files with 76 additions and 16 deletions

View File

@ -15,11 +15,15 @@ import (
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
) )
var defaultReadWait = 20 * time.Second
var defaultWriteWait = 20 * time.Second
// WebsocketTunnel wraps a websocket.Conn instance to behave like net.Conn. // WebsocketTunnel wraps a websocket.Conn instance to behave like net.Conn.
// TODO make conform.
type WebsocketTunnel struct { type WebsocketTunnel struct {
wsconn WSConn wsconn WSConn
tmpr io.Reader readWait time.Duration
writeWait time.Duration
tmpr io.Reader
//w io.WriteCloser //w io.WriteCloser
//pingCh chan struct{} //pingCh chan struct{}
} }
@ -30,6 +34,7 @@ type WSConn interface {
NextWriter(messageType int) (io.WriteCloser, error) NextWriter(messageType int) (io.WriteCloser, error)
WriteControl(messageType int, data []byte, deadline time.Time) error WriteControl(messageType int, data []byte, deadline time.Time) error
WriteMessage(messageType int, data []byte) error WriteMessage(messageType int, data []byte) error
SetPongHandler(h func(appData string) error)
SetReadDeadline(t time.Time) error SetReadDeadline(t time.Time) error
SetWriteDeadline(t time.Time) error SetWriteDeadline(t time.Time) error
Close() error Close() error
@ -39,9 +44,44 @@ type WSConn interface {
// NewWebsocketTunnel allocates a new websocket connection wrapper // NewWebsocketTunnel allocates a new websocket connection wrapper
func NewWebsocketTunnel(wsconn WSConn) net.Conn { func NewWebsocketTunnel(wsconn WSConn) net.Conn {
// TODO only set ping when SetReadDeadline would otherwise fail
// See https://github.com/gorilla/websocket/blob/a6870891/examples/chat/conn.go#L86
writeWait := defaultWriteWait
readWait := defaultReadWait
go func() {
// Ping every 15 seconds, or stop listening
for {
time.Sleep(15 * time.Second)
deadline := time.Now().Add(writeWait)
// https://www.gorillatoolkit.org/pkg/websocket
// "The Close and WriteControl methods can be called concurrently with all other methods."
if dbg.Debug {
fmt.Fprintf(os.Stderr, "[debug] [wstun] sending ping (set write deadline %s)\n", writeWait)
}
if err := wsconn.WriteControl(websocket.PingMessage, []byte(""), deadline); nil != err {
wsconn.Close()
fmt.Fprintf(os.Stderr, "failed to write ping message to websocket: %s\n", err)
break
}
if dbg.Debug {
fmt.Fprintf(os.Stderr, "[debug] [wstun] sent ping (cleared write deadline)\n")
}
}
}()
wsconn.SetPongHandler(func(pong string) error {
if dbg.Debug {
fmt.Fprintf(os.Stderr, "[debug] [wstun] received pong (reset read deadline %s): %q\n", readWait, pong)
}
wsconn.SetReadDeadline(time.Now().Add(readWait))
return nil
})
return &WebsocketTunnel{ return &WebsocketTunnel{
wsconn: wsconn, wsconn: wsconn,
tmpr: nil, readWait: readWait,
writeWait: writeWait,
tmpr: nil,
} }
} }
@ -57,16 +97,21 @@ func DialWebsocketTunnel(ctx context.Context, relay, authz string) (net.Conn, er
} }
wsconn, _, err := wsd.DialContext(ctx, relay+sep+"access_token="+authz+"&versions=v1", headers) wsconn, _, err := wsd.DialContext(ctx, relay+sep+"access_token="+authz+"&versions=v1", headers)
if nil != err { if nil != err {
fmt.Println("[debug] [wstun] simple dial failed", err, wsconn, ctx) if dbg.Debug {
fmt.Fprintf(os.Stderr, "[debug] [wstun] simple dial failed %q %v %v\n", err, wsconn, ctx)
}
} }
return NewWebsocketTunnel(wsconn), err return NewWebsocketTunnel(wsconn), err
} }
func (wsw *WebsocketTunnel) Read(b []byte) (int, error) { func (wsw *WebsocketTunnel) Read(b []byte) (int, error) {
wsw.wsconn.SetReadDeadline(time.Now().Add(wsw.readWait))
if nil == wsw.tmpr { if nil == wsw.tmpr {
_, msgr, err := wsw.wsconn.NextReader() _, msgr, err := wsw.wsconn.NextReader()
if nil != err { if nil != err {
fmt.Println("[debug] [wstun] NextReader err:", err) if dbg.Debug {
fmt.Fprintf(os.Stderr, "[debug] [wstun] NextReader err: %q\n", err)
}
return 0, err return 0, err
} }
wsw.tmpr = msgr wsw.tmpr = msgr
@ -74,11 +119,11 @@ func (wsw *WebsocketTunnel) Read(b []byte) (int, error) {
n, err := wsw.tmpr.Read(b) n, err := wsw.tmpr.Read(b)
if dbg.Debug { if dbg.Debug {
fmt.Println("[debug] [wstun] Read", n, dbg.Trunc(b, n)) fmt.Fprintf(os.Stderr, "[debug] [wstun] Read %d %v\n", n, dbg.Trunc(b, n))
} }
if nil != err { if nil != err {
if dbg.Debug { if dbg.Debug {
fmt.Println("[debug] [wstun] Read (EOF=WS packet complete) err:", err) fmt.Fprintf(os.Stderr, "[debug] [wstun] Read (EOF=WS packet complete) err: %q\n", err)
} }
if io.EOF == err { if io.EOF == err {
wsw.tmpr = nil wsw.tmpr = nil
@ -90,21 +135,30 @@ func (wsw *WebsocketTunnel) Read(b []byte) (int, error) {
} }
func (wsw *WebsocketTunnel) Write(b []byte) (int, error) { func (wsw *WebsocketTunnel) Write(b []byte) (int, error) {
fmt.Println("[debug] [wstun] Write", len(b)) if dbg.Debug {
fmt.Fprintf(os.Stderr, "[debug] [wstun] Write %d\n", len(b))
}
// TODO create or reset ping deadline // TODO create or reset ping deadline
// TODO document that more complete writes are preferred? // TODO document that more complete writes are preferred?
wsw.wsconn.SetWriteDeadline(time.Now().Add(wsw.writeWait))
msgw, err := wsw.wsconn.NextWriter(websocket.BinaryMessage) msgw, err := wsw.wsconn.NextWriter(websocket.BinaryMessage)
if nil != err { if nil != err {
fmt.Println("[debug] [wstun] NextWriter err:", err) if dbg.Debug {
fmt.Fprintf(os.Stderr, "[debug] [wstun] NextWriter err: %q\n", err)
}
return 0, err return 0, err
} }
n, err := msgw.Write(b) n, err := msgw.Write(b)
if nil != err { if nil != err {
fmt.Println("[debug] [wstun] Write err:", err) if dbg.Debug {
fmt.Fprintf(os.Stderr, "[debug] [wstun] Write err: %q\n", err)
}
return n, err return n, err
} }
fmt.Println("[debug] [wstun] Write n", n, "=", len(b)) if dbg.Debug {
fmt.Fprintf(os.Stderr, "[debug] [wstun] Write n %d = %d\n", n, len(b))
}
// if the message error fails, we can assume the websocket is damaged // if the message error fails, we can assume the websocket is damaged
return n, msgw.Close() return n, msgw.Close()
@ -112,7 +166,9 @@ func (wsw *WebsocketTunnel) Write(b []byte) (int, error) {
// Close will close the websocket with a control message // Close will close the websocket with a control message
func (wsw *WebsocketTunnel) Close() error { func (wsw *WebsocketTunnel) Close() error {
fmt.Println("[debug] [wstun] closing the websocket.Conn") if dbg.Debug {
fmt.Fprintf(os.Stderr, "[debug] [wstun] closing the websocket.Conn\n")
}
// TODO handle EOF as websocket.CloseNormal? // TODO handle EOF as websocket.CloseNormal?
message := websocket.FormatCloseMessage(websocket.CloseGoingAway, "closing connection") message := websocket.FormatCloseMessage(websocket.CloseGoingAway, "closing connection")
@ -150,12 +206,16 @@ func (wsw *WebsocketTunnel) SetDeadline(t time.Time) error {
// SetReadDeadline sets the deadline for future Read calls // SetReadDeadline sets the deadline for future Read calls
func (wsw *WebsocketTunnel) SetReadDeadline(t time.Time) error { func (wsw *WebsocketTunnel) SetReadDeadline(t time.Time) error {
fmt.Println("[debug] [wstun] read deadline") if dbg.Debug {
fmt.Fprintf(os.Stderr, "[debug] [wstun] read deadline\n")
}
return wsw.wsconn.SetReadDeadline(t) return wsw.wsconn.SetReadDeadline(t)
} }
// SetWriteDeadline sets the deadline for future Write calls // SetWriteDeadline sets the deadline for future Write calls
func (wsw *WebsocketTunnel) SetWriteDeadline(t time.Time) error { func (wsw *WebsocketTunnel) SetWriteDeadline(t time.Time) error {
fmt.Println("[debug] [wstun] write deadline") if dbg.Debug {
fmt.Fprintf(os.Stderr, "[debug] [wstun] write deadline\n")
}
return wsw.wsconn.SetWriteDeadline(t) return wsw.wsconn.SetWriteDeadline(t)
} }