websocket tunnel interface cleanup

This commit is contained in:
AJ ONeal 2020-05-21 12:28:33 -06:00
parent 39ada8ec7a
commit 0eaa1aca2a
4 changed files with 91 additions and 77 deletions

View File

@ -4,15 +4,12 @@ import (
"context"
"fmt"
"log"
"net/http"
"os"
"strings"
"time"
"git.coolaj86.com/coolaj86/go-telebitd/mplexer/packer"
jwt "github.com/dgrijalva/jwt-go"
"github.com/gorilla/websocket"
_ "github.com/joho/godotenv/autoload"
)
@ -20,63 +17,54 @@ import (
func main() {
// TODO replace the websocket connection with a mock server
relay := os.Getenv("RELAY") // "wss://roottest.duckdns.org:8443"
relay := os.Getenv("RELAY") // "wss://example.com:443"
authz, err := getToken(os.Getenv("SECRET"))
if nil != err {
panic(err)
}
ctx := context.Background()
wsd := websocket.Dialer{}
headers := http.Header{}
headers.Set("Authorization", fmt.Sprintf("Bearer %s", authz))
// *http.Response
sep := "?"
if strings.Contains(relay, sep) {
sep = "&"
}
wsconn, _, err := wsd.DialContext(ctx, relay+sep+"access_token="+authz, headers)
if nil != err {
fmt.Println("relay:", relay)
log.Fatal(err)
return
}
/*
// TODO for http proxy
return mplexer.TargetOptions {
Hostname // default localhost
Termination // default TLS
XFWD // default... no?
Port // default 0
Conn // should be dialed beforehand
}, nil
*/
/*
t := telebit.New(token)
mux := telebit.RouteMux{}
mux.HandleTLS("*", mux) // go back to itself
mux.HandleProxy("example.com", "localhost:3000")
mux.HandleTCP("example.com", func (c *telebit.Conn) {
return httpmux.Serve()
})
l := t.Listen("wss://example.com")
conn := l.Accept()
telebit.Serve(listener, mux)
t.ListenAndServe("wss://example.com", mux)
*/
mux := packer.NewRouteMux()
//mux.HandleTLS("*", mux.TerminateTLS(mux))
mux.ForwardTCP("*", "localhost:3000", 120*time.Second)
// TODO set failure
wsw := packer.NewWSWrap(wsconn)
log.Fatal("Closed server: ", packer.ListenAndServe(wsw, mux))
tun, err := packer.DialWebsocketTunnel(ctx, relay, authz)
if nil != err {
fmt.Println("relay:", relay)
log.Fatal(err)
return
}
log.Fatal("Closed server: ", packer.ListenAndServe(tun, mux))
}
/*
// TODO for http proxy
return mplexer.TargetOptions {
Hostname // default localhost
Termination // default TLS
XFWD // default... no?
Port // default 0
Conn // should be dialed beforehand
}, nil
*/
/*
t := telebit.New(token)
mux := telebit.RouteMux{}
mux.HandleTLS("*", mux) // go back to itself
mux.HandleProxy("example.com", "localhost:3000")
mux.HandleTCP("example.com", func (c *telebit.Conn) {
return httpmux.Serve()
})
l := t.Listen("wss://example.com")
conn := l.Accept()
telebit.Serve(listener, mux)
t.ListenAndServe("wss://example.com", mux)
*/
func getToken(secret string) (token string, err error) {
domains := []string{"dandel.duckdns.org"}
tokenData := jwt.MapClaims{"domains": domains}

View File

@ -11,7 +11,7 @@ import (
// A Listener transforms a multiplexed websocket connection into individual net.Conn-like connections.
type Listener struct {
//wsconn *websocket.Conn
wsw *WSWrap
tun net.Conn
incoming chan *Conn
close chan struct{}
encoder *Encoder
@ -22,15 +22,15 @@ type Listener struct {
}
// Listen creates a new Listener and sets it up to receive and distribute connections.
func Listen(wsw *WSWrap) *Listener {
func Listen(tun net.Conn) *Listener {
ctx := context.TODO()
// Feed the socket into the Encoder and Decoder
listener := &Listener{
wsw: wsw,
tun: tun,
incoming: make(chan *Conn, 1), // buffer ever so slightly
close: make(chan struct{}),
encoder: NewEncoder(ctx, wsw),
encoder: NewEncoder(ctx, tun),
conns: map[string]net.Conn{},
//conns: map[string]*Conn{},
}
@ -40,11 +40,11 @@ func Listen(wsw *WSWrap) *Listener {
go func() {
err := listener.encoder.Run()
fmt.Printf("encoder stopped entirely: %q", err)
wsw.Close()
tun.Close()
}()
// Decode the stream as it comes in
decoder := NewDecoder(wsw)
decoder := NewDecoder(tun)
go func() {
// TODO pass error to Accept()
err := decoder.Decode(listener)
@ -60,8 +60,8 @@ func Listen(wsw *WSWrap) *Listener {
}
// ListenAndServe listens on a websocket and handles the incomming net.Conn-like connections with a Handler
func ListenAndServe(wsw *WSWrap, mux Handler) error {
listener := Listen(wsw)
func ListenAndServe(tun net.Conn, mux Handler) error {
listener := Listen(tun)
return Serve(listener, mux)
}
@ -102,7 +102,7 @@ func (l *Listener) Accept() (*Conn, error) {
// Close stops accepting new connections and closes the underlying websocket.
// TODO return errors.
func (l *Listener) Close() error {
l.wsw.Close()
l.tun.Close()
close(l.incoming)
l.close <- struct{}{}
return nil

View File

@ -10,10 +10,10 @@ func TestDialServer(t *testing.T) {
// TODO replace the websocket connection with a mock server
//ctx := context.Background()
wsw := &WSWrap{}
tun := &WebsocketTunnel{}
mux := NewRouteMux()
t.Fatal(ListenAndServe(wsw, mux))
t.Fatal(ListenAndServe(tun, mux))
}
var ErrNoImpl error = errors.New("not implemented")

View File

@ -1,18 +1,21 @@
package packer
import (
"context"
"fmt"
"io"
"net"
"net/http"
"os"
"strings"
"time"
"github.com/gorilla/websocket"
)
// WSWrap wraps a websocket.Conn instance to behave like net.Conn.
// WebsocketTunnel wraps a websocket.Conn instance to behave like net.Conn.
// TODO make conform.
type WSWrap struct {
type WebsocketTunnel struct {
wsconn WSConn
tmpr io.Reader
//w io.WriteCloser
@ -26,20 +29,35 @@ type WSConn interface {
WriteControl(messageType int, data []byte, deadline time.Time) error
WriteMessage(messageType int, data []byte) error
SetReadDeadline(t time.Time) error
SetWriteDeadline(t time.Time) error
Close() error
RemoteAddr() net.Addr
// LocalAddr() net.Addr
}
// NewWSWrap allocates a new websocket connection wrapper
func NewWSWrap(wsconn WSConn) *WSWrap {
return &WSWrap{
// NewWebsocketTunnel allocates a new websocket connection wrapper
func NewWebsocketTunnel(wsconn WSConn) net.Conn {
return &WebsocketTunnel{
wsconn: wsconn,
tmpr: nil,
}
}
func (wsw *WSWrap) Read(b []byte) (int, error) {
// DialWebsocketTunnel connects to the given websocket relay as wraps it as net.Conn
func DialWebsocketTunnel(ctx context.Context, relay, authz string) (net.Conn, error) {
wsd := websocket.Dialer{}
headers := http.Header{}
headers.Set("Authorization", fmt.Sprintf("Bearer %s", authz))
// *http.Response
sep := "?"
if strings.Contains(relay, sep) {
sep = "&"
}
wsconn, _, err := wsd.DialContext(ctx, relay+sep+"access_token="+authz+"&versions=v1", headers)
return NewWebsocketTunnel(wsconn), err
}
func (wsw *WebsocketTunnel) Read(b []byte) (int, error) {
if nil == wsw.tmpr {
_, msgr, err := wsw.wsconn.NextReader()
if nil != err {
@ -61,7 +79,7 @@ func (wsw *WSWrap) Read(b []byte) (int, error) {
return n, err
}
func (wsw *WSWrap) Write(b []byte) (int, error) {
func (wsw *WebsocketTunnel) Write(b []byte) (int, error) {
// TODO create or reset ping deadline
// TODO document that more complete writes are preferred?
@ -81,7 +99,7 @@ func (wsw *WSWrap) Write(b []byte) (int, error) {
}
// Close will close the websocket with a control message
func (wsw *WSWrap) Close() error {
func (wsw *WebsocketTunnel) Close() error {
fmt.Println("[debug] closing the websocket.Conn")
// TODO handle EOF as websocket.CloseNormal?
@ -95,27 +113,35 @@ func (wsw *WSWrap) Close() error {
return err
}
// LocalAddr returns the local network address.
func (wsw *WSWrap) LocalAddr() *Addr {
panic("not implemented")
// LocalAddr is not implemented and will panic
func (wsw *WebsocketTunnel) LocalAddr() net.Addr {
// TODO do we reverse this since the "local" address is that of the relay?
// return wsw.wsconn.RemoteAddr()
panic("LocalAddr() not implemented")
}
// RemoteAddr returns the remote network address.
func (wsw *WSWrap) RemoteAddr() *Addr {
panic("not implemented")
// RemoteAddr is not implemented and will panic. Additionally, it wouldn't mean anything useful anyway.
func (wsw *WebsocketTunnel) RemoteAddr() net.Addr {
// TODO do we reverse this since the "remote" address means nothing / is that of one of the clients?
// return wsw.wsconn.LocalAddr()
panic("RemoteAddr() not implemented")
}
// SetDeadline sets the read and write deadlines associated
func (wsw *WSWrap) SetDeadline(t time.Time) error {
panic("not implemented")
func (wsw *WebsocketTunnel) SetDeadline(t time.Time) error {
err := wsw.SetReadDeadline(t)
if nil == err {
err = wsw.SetWriteDeadline(t)
}
return err
}
// SetReadDeadline sets the deadline for future Read calls
func (wsw *WSWrap) SetReadDeadline(t time.Time) error {
panic("not implemented")
func (wsw *WebsocketTunnel) SetReadDeadline(t time.Time) error {
return wsw.wsconn.SetReadDeadline(t)
}
// SetWriteDeadline sets the deadline for future Write calls
func (wsw *WSWrap) SetWriteDeadline(t time.Time) error {
panic("not implemented")
func (wsw *WebsocketTunnel) SetWriteDeadline(t time.Time) error {
return wsw.wsconn.SetWriteDeadline(t)
}