From 0eaa1aca2a1442e9952521d2a1aea49c9040c04b Mon Sep 17 00:00:00 2001 From: AJ ONeal Date: Thu, 21 May 2020 12:28:33 -0600 Subject: [PATCH] websocket tunnel interface cleanup --- mplexer/cmd/telebit/telebit.go | 80 ++++++++----------- mplexer/packer/listener.go | 18 ++--- mplexer/packer/listener_test.go | 4 +- .../packer/{wswrap.go => websockettunnel.go} | 66 ++++++++++----- 4 files changed, 91 insertions(+), 77 deletions(-) rename mplexer/packer/{wswrap.go => websockettunnel.go} (53%) diff --git a/mplexer/cmd/telebit/telebit.go b/mplexer/cmd/telebit/telebit.go index 22dcfb3..4800b2e 100644 --- a/mplexer/cmd/telebit/telebit.go +++ b/mplexer/cmd/telebit/telebit.go @@ -4,15 +4,12 @@ import ( "context" "fmt" "log" - "net/http" "os" - "strings" "time" "git.coolaj86.com/coolaj86/go-telebitd/mplexer/packer" jwt "github.com/dgrijalva/jwt-go" - "github.com/gorilla/websocket" _ "github.com/joho/godotenv/autoload" ) @@ -20,63 +17,54 @@ import ( func main() { // TODO replace the websocket connection with a mock server - relay := os.Getenv("RELAY") // "wss://roottest.duckdns.org:8443" + relay := os.Getenv("RELAY") // "wss://example.com:443" authz, err := getToken(os.Getenv("SECRET")) if nil != err { panic(err) } ctx := context.Background() - wsd := websocket.Dialer{} - headers := http.Header{} - headers.Set("Authorization", fmt.Sprintf("Bearer %s", authz)) - // *http.Response - sep := "?" - if strings.Contains(relay, sep) { - sep = "&" - } - wsconn, _, err := wsd.DialContext(ctx, relay+sep+"access_token="+authz, headers) - if nil != err { - fmt.Println("relay:", relay) - log.Fatal(err) - return - } - - /* - // TODO for http proxy - return mplexer.TargetOptions { - Hostname // default localhost - Termination // default TLS - XFWD // default... no? - Port // default 0 - Conn // should be dialed beforehand - }, nil - */ - - /* - t := telebit.New(token) - mux := telebit.RouteMux{} - mux.HandleTLS("*", mux) // go back to itself - mux.HandleProxy("example.com", "localhost:3000") - mux.HandleTCP("example.com", func (c *telebit.Conn) { - return httpmux.Serve() - }) - - l := t.Listen("wss://example.com") - conn := l.Accept() - telebit.Serve(listener, mux) - t.ListenAndServe("wss://example.com", mux) - */ mux := packer.NewRouteMux() //mux.HandleTLS("*", mux.TerminateTLS(mux)) mux.ForwardTCP("*", "localhost:3000", 120*time.Second) // TODO set failure - wsw := packer.NewWSWrap(wsconn) - log.Fatal("Closed server: ", packer.ListenAndServe(wsw, mux)) + tun, err := packer.DialWebsocketTunnel(ctx, relay, authz) + if nil != err { + fmt.Println("relay:", relay) + log.Fatal(err) + return + } + log.Fatal("Closed server: ", packer.ListenAndServe(tun, mux)) } +/* + // TODO for http proxy + return mplexer.TargetOptions { + Hostname // default localhost + Termination // default TLS + XFWD // default... no? + Port // default 0 + Conn // should be dialed beforehand + }, nil +*/ + +/* + t := telebit.New(token) + mux := telebit.RouteMux{} + mux.HandleTLS("*", mux) // go back to itself + mux.HandleProxy("example.com", "localhost:3000") + mux.HandleTCP("example.com", func (c *telebit.Conn) { + return httpmux.Serve() + }) + + l := t.Listen("wss://example.com") + conn := l.Accept() + telebit.Serve(listener, mux) + t.ListenAndServe("wss://example.com", mux) +*/ + func getToken(secret string) (token string, err error) { domains := []string{"dandel.duckdns.org"} tokenData := jwt.MapClaims{"domains": domains} diff --git a/mplexer/packer/listener.go b/mplexer/packer/listener.go index f2eca0e..c655a03 100644 --- a/mplexer/packer/listener.go +++ b/mplexer/packer/listener.go @@ -11,7 +11,7 @@ import ( // A Listener transforms a multiplexed websocket connection into individual net.Conn-like connections. type Listener struct { //wsconn *websocket.Conn - wsw *WSWrap + tun net.Conn incoming chan *Conn close chan struct{} encoder *Encoder @@ -22,15 +22,15 @@ type Listener struct { } // Listen creates a new Listener and sets it up to receive and distribute connections. -func Listen(wsw *WSWrap) *Listener { +func Listen(tun net.Conn) *Listener { ctx := context.TODO() // Feed the socket into the Encoder and Decoder listener := &Listener{ - wsw: wsw, + tun: tun, incoming: make(chan *Conn, 1), // buffer ever so slightly close: make(chan struct{}), - encoder: NewEncoder(ctx, wsw), + encoder: NewEncoder(ctx, tun), conns: map[string]net.Conn{}, //conns: map[string]*Conn{}, } @@ -40,11 +40,11 @@ func Listen(wsw *WSWrap) *Listener { go func() { err := listener.encoder.Run() fmt.Printf("encoder stopped entirely: %q", err) - wsw.Close() + tun.Close() }() // Decode the stream as it comes in - decoder := NewDecoder(wsw) + decoder := NewDecoder(tun) go func() { // TODO pass error to Accept() err := decoder.Decode(listener) @@ -60,8 +60,8 @@ func Listen(wsw *WSWrap) *Listener { } // ListenAndServe listens on a websocket and handles the incomming net.Conn-like connections with a Handler -func ListenAndServe(wsw *WSWrap, mux Handler) error { - listener := Listen(wsw) +func ListenAndServe(tun net.Conn, mux Handler) error { + listener := Listen(tun) return Serve(listener, mux) } @@ -102,7 +102,7 @@ func (l *Listener) Accept() (*Conn, error) { // Close stops accepting new connections and closes the underlying websocket. // TODO return errors. func (l *Listener) Close() error { - l.wsw.Close() + l.tun.Close() close(l.incoming) l.close <- struct{}{} return nil diff --git a/mplexer/packer/listener_test.go b/mplexer/packer/listener_test.go index 3d76437..e170afa 100644 --- a/mplexer/packer/listener_test.go +++ b/mplexer/packer/listener_test.go @@ -10,10 +10,10 @@ func TestDialServer(t *testing.T) { // TODO replace the websocket connection with a mock server //ctx := context.Background() - wsw := &WSWrap{} + tun := &WebsocketTunnel{} mux := NewRouteMux() - t.Fatal(ListenAndServe(wsw, mux)) + t.Fatal(ListenAndServe(tun, mux)) } var ErrNoImpl error = errors.New("not implemented") diff --git a/mplexer/packer/wswrap.go b/mplexer/packer/websockettunnel.go similarity index 53% rename from mplexer/packer/wswrap.go rename to mplexer/packer/websockettunnel.go index 92fc1b4..31d4d62 100644 --- a/mplexer/packer/wswrap.go +++ b/mplexer/packer/websockettunnel.go @@ -1,18 +1,21 @@ package packer import ( + "context" "fmt" "io" "net" + "net/http" "os" + "strings" "time" "github.com/gorilla/websocket" ) -// WSWrap wraps a websocket.Conn instance to behave like net.Conn. +// WebsocketTunnel wraps a websocket.Conn instance to behave like net.Conn. // TODO make conform. -type WSWrap struct { +type WebsocketTunnel struct { wsconn WSConn tmpr io.Reader //w io.WriteCloser @@ -26,20 +29,35 @@ type WSConn interface { WriteControl(messageType int, data []byte, deadline time.Time) error WriteMessage(messageType int, data []byte) error SetReadDeadline(t time.Time) error + SetWriteDeadline(t time.Time) error Close() error RemoteAddr() net.Addr // LocalAddr() net.Addr } -// NewWSWrap allocates a new websocket connection wrapper -func NewWSWrap(wsconn WSConn) *WSWrap { - return &WSWrap{ +// NewWebsocketTunnel allocates a new websocket connection wrapper +func NewWebsocketTunnel(wsconn WSConn) net.Conn { + return &WebsocketTunnel{ wsconn: wsconn, tmpr: nil, } } -func (wsw *WSWrap) Read(b []byte) (int, error) { +// DialWebsocketTunnel connects to the given websocket relay as wraps it as net.Conn +func DialWebsocketTunnel(ctx context.Context, relay, authz string) (net.Conn, error) { + wsd := websocket.Dialer{} + headers := http.Header{} + headers.Set("Authorization", fmt.Sprintf("Bearer %s", authz)) + // *http.Response + sep := "?" + if strings.Contains(relay, sep) { + sep = "&" + } + wsconn, _, err := wsd.DialContext(ctx, relay+sep+"access_token="+authz+"&versions=v1", headers) + return NewWebsocketTunnel(wsconn), err +} + +func (wsw *WebsocketTunnel) Read(b []byte) (int, error) { if nil == wsw.tmpr { _, msgr, err := wsw.wsconn.NextReader() if nil != err { @@ -61,7 +79,7 @@ func (wsw *WSWrap) Read(b []byte) (int, error) { return n, err } -func (wsw *WSWrap) Write(b []byte) (int, error) { +func (wsw *WebsocketTunnel) Write(b []byte) (int, error) { // TODO create or reset ping deadline // TODO document that more complete writes are preferred? @@ -81,7 +99,7 @@ func (wsw *WSWrap) Write(b []byte) (int, error) { } // Close will close the websocket with a control message -func (wsw *WSWrap) Close() error { +func (wsw *WebsocketTunnel) Close() error { fmt.Println("[debug] closing the websocket.Conn") // TODO handle EOF as websocket.CloseNormal? @@ -95,27 +113,35 @@ func (wsw *WSWrap) Close() error { return err } -// LocalAddr returns the local network address. -func (wsw *WSWrap) LocalAddr() *Addr { - panic("not implemented") +// LocalAddr is not implemented and will panic +func (wsw *WebsocketTunnel) LocalAddr() net.Addr { + // TODO do we reverse this since the "local" address is that of the relay? + // return wsw.wsconn.RemoteAddr() + panic("LocalAddr() not implemented") } -// RemoteAddr returns the remote network address. -func (wsw *WSWrap) RemoteAddr() *Addr { - panic("not implemented") +// RemoteAddr is not implemented and will panic. Additionally, it wouldn't mean anything useful anyway. +func (wsw *WebsocketTunnel) RemoteAddr() net.Addr { + // TODO do we reverse this since the "remote" address means nothing / is that of one of the clients? + // return wsw.wsconn.LocalAddr() + panic("RemoteAddr() not implemented") } // SetDeadline sets the read and write deadlines associated -func (wsw *WSWrap) SetDeadline(t time.Time) error { - panic("not implemented") +func (wsw *WebsocketTunnel) SetDeadline(t time.Time) error { + err := wsw.SetReadDeadline(t) + if nil == err { + err = wsw.SetWriteDeadline(t) + } + return err } // SetReadDeadline sets the deadline for future Read calls -func (wsw *WSWrap) SetReadDeadline(t time.Time) error { - panic("not implemented") +func (wsw *WebsocketTunnel) SetReadDeadline(t time.Time) error { + return wsw.wsconn.SetReadDeadline(t) } // SetWriteDeadline sets the deadline for future Write calls -func (wsw *WSWrap) SetWriteDeadline(t time.Time) error { - panic("not implemented") +func (wsw *WebsocketTunnel) SetWriteDeadline(t time.Time) error { + return wsw.wsconn.SetWriteDeadline(t) }