From 5f804f3424b8dc8d538f4498bac7f2de5ed130b4 Mon Sep 17 00:00:00 2001 From: AJ ONeal Date: Wed, 20 May 2020 23:30:24 -0600 Subject: [PATCH] made (loosely) testable against server --- mplexer/packer/conn.go | 9 +- mplexer/packer/decoder.go | 36 ++-- mplexer/packer/listener_test.go | 353 ++++++++++++++++++++++++-------- mplexer/packer/parser.go | 8 +- mplexer/packer/telebit.go | 12 ++ mplexer/packer/v1.go | 13 +- 6 files changed, 308 insertions(+), 123 deletions(-) create mode 100644 mplexer/packer/telebit.go diff --git a/mplexer/packer/conn.go b/mplexer/packer/conn.go index 35a7559..7fb49b3 100644 --- a/mplexer/packer/conn.go +++ b/mplexer/packer/conn.go @@ -1,22 +1,23 @@ package packer import ( - "errors" "net" "time" ) +// Conn TODO rename to Pipe, perhaps? type Conn struct { relaySourceAddr Addr relayRemoteAddr Addr relay net.Conn local net.Conn - updated time.Time + //updated time.Time } // TODO conn.go -> conn/conn.go // TODO NewConn -> New +// NewConn TODO rename to NewPipe, perhaps? func NewConn() *Conn { return nil } @@ -41,11 +42,13 @@ func (c *Conn) Close() error { return c.relay.Close() } +/* // Error signals an error back to the relay func (c *Conn) Error(err error) error { panic(errors.New("not implemented")) return nil } +*/ /* // LocalAddr returns the local network address. @@ -62,7 +65,7 @@ func (c *Conn) LocalAddr() *Addr { } // RemoteAddr returns the remote network address. -func (c *Conn) RemoteAddr() net.Addr { +func (c *Conn) RemoteAddr() *Addr { // TODO is this the right one? return &c.relayRemoteAddr } diff --git a/mplexer/packer/decoder.go b/mplexer/packer/decoder.go index 8665338..adfd0c0 100644 --- a/mplexer/packer/decoder.go +++ b/mplexer/packer/decoder.go @@ -1,42 +1,36 @@ package packer import ( - "context" - "errors" "io" ) -// Decoder handles a ReadCloser stream containing mplexy-encoded clients +// Decoder handles a Reader stream containing mplexy-encoded clients type Decoder struct { - ctx context.Context - r io.ReadCloser + in io.Reader + bufferSize int } // NewDecoder returns an initialized Decoder -func NewDecoder(ctx context.Context, r io.ReadCloser) *Decoder { +func NewDecoder(rin io.Reader) *Decoder { return &Decoder{ - ctx: ctx, - r: r, + in: rin, + bufferSize: defaultBufferSize, } } -// StreamDecode will call WriteMessage as often as addressable data exists, +// Decode will call WriteMessage as often as addressable data exists, // reading up to bufferSize (default 8192) at a time // (header + data, though headers are often sent separately from data). -func (d *Decoder) StreamDecode(handler Handler, bufferSize int) error { - p := NewParser(handler) +func (d *Decoder) Decode(out Router) error { + p := NewParser(out) rx := make(chan []byte) rxErr := make(chan error) - if 0 == bufferSize { - bufferSize = 8192 - } - go func() { for { - b := make([]byte, bufferSize) + b := make([]byte, d.bufferSize) //fmt.Println("loopers gonna loop") - n, err := d.r.Read(b) + n, err := d.in.Read(b) if n > 0 { rx <- b[:n] } @@ -52,20 +46,16 @@ func (d *Decoder) StreamDecode(handler Handler, bufferSize int) error { select { // TODO, do we actually need ctx here? // would it be sufficient to expect the reader to be closed by the caller instead? - case <-d.ctx.Done(): - // TODO: verify that closing the reader will cause the goroutine to be released - d.r.Close() - return errors.New("cancelled by context") case b := <-rx: _, err := p.Write(b) if nil != err { // an error to write represents an unrecoverable error, // not just a downstream client error - d.r.Close() + //d.in.Close() return err } case err := <-rxErr: - d.r.Close() + //d.in.Close() if io.EOF == err { // it can be assumed that err will close though, right return nil diff --git a/mplexer/packer/listener_test.go b/mplexer/packer/listener_test.go index 03f9739..31d4a52 100644 --- a/mplexer/packer/listener_test.go +++ b/mplexer/packer/listener_test.go @@ -8,142 +8,203 @@ import ( "net" "net/http" "os" + "strings" "testing" "time" + jwt "github.com/dgrijalva/jwt-go" + "github.com/gorilla/websocket" ) func TestDialServer(t *testing.T) { - // TODO replace the websocket connection with a mock server - relay := os.Getenv("RELAY") - authz := os.Getenv("SECRET") + relay := "wss://roottest.duckdns.org:8443" + authz, err := getToken("xxxxyyyyssss8347") + if nil != err { + panic(err) + } ctx := context.Background() wsd := websocket.Dialer{} headers := http.Header{} headers.Set("Authorization", fmt.Sprintf("Bearer %s", authz)) // *http.Response - wsconn, _, err := wsd.DialContext(ctx, relay, headers) + sep := "?" + if strings.Contains(relay, sep) { + sep = "&" + } + wsconn, _, err := wsd.DialContext(ctx, relay+sep+"access_token="+authz, headers) if nil != err { + fmt.Println("relay:", relay) t.Fatal(err) return } - mux := &MyMux{} - err = ListenAndServe(wsconn, mux) - t.Fatal(err) + /* + t := telebit.New(token) + mux := telebit.RouteMux{} + mux.HandleTLS("*", mux) // go back to itself + mux.HandleProxy("example.com", "localhost:3000") + mux.HandleTCP("example.com", func (c *telebit.Conn) { + return httpmux.Serve() + }) + + l := t.Listen("wss://example.com") + conn := l.Accept() + telebit.Serve(listener, mux) + t.ListenAndServe("wss://example.com", mux) + */ + + mux := NewRouteMux() + // TODO set failure + t.Fatal(ListenAndServe(wsconn, mux)) +} + +func getToken(secret string) (token string, err error) { + domains := []string{"dandel.duckdns.org"} + tokenData := jwt.MapClaims{"domains": domains} + + jwtToken := jwt.NewWithClaims(jwt.SigningMethodHS256, tokenData) + if token, err = jwtToken.SignedString([]byte(secret)); err != nil { + return "", err + } + return token, nil } type Listener struct { - ws *websocket.Conn - incoming chan *Conn - close chan struct{} + ws *websocket.Conn + incoming chan *Conn + close chan struct{} + encoder *Encoder + conns map[string]*Conn + chunksParsed int + bytesRead int } -func ListenAndServe(ws *websocket.Conn, mux Mux) error { +func ListenAndServe(ws *websocket.Conn, mux Handler) error { listener := Listen(ws) return Serve(listener, mux) } func Listen(ws *websocket.Conn) *Listener { + ctx := context.TODO() + + // Wrap the websocket and feed it into the Encoder and Decoder + rw := &WSConn{c: ws, nr: nil} listener := &Listener{ ws: ws, - incoming: make(chan *Conn, 1), + conns: map[string]*Conn{}, + incoming: make(chan *Conn, 1), // buffer ever so slightly close: make(chan struct{}), + encoder: NewEncoder(ctx, rw), } + // TODO perhaps the wrapper should have a mutex + // rather than having a goroutine in the encoder + go func() { + err := listener.encoder.Run() + fmt.Printf("encoder stopped entirely: %q", err) + rw.c.Close() + }() - ctx := context.TODO() - r := &WSConn{ - c: ws, - r: nil, - w: nil, - } - decoder := NewDecoder(ctx, r) - - // Feed websocket into Decoder - th := &testHandler2{ - conns: map[string]*Conn{}, - connCh: listener.incoming, - } + // Decode the stream as it comes in + decoder := NewDecoder(rw) go func() { // TODO pass error to Accept() - err := decoder.StreamDecode(th, 0) - fmt.Printf("the main stream is done: %q", err) + err := decoder.Decode(listener) + rw.Close() + fmt.Printf("the main stream is done: %q\n", err) }() return listener } -type testHandler2 struct { - conns map[string]*Conn - connCh chan *Conn - chunksParsed int - bytesRead int -} +func (l *Listener) RouteBytes(a Addr, b []byte) { + // TODO use context to be able to cancel many at once? + l.chunksParsed++ -func (th *testHandler2) WriteMessage(a Addr, b []byte) { - th.chunksParsed++ addr := &a - _, ok := th.conns[addr.Network()] - if !ok { - rconn, wconn := net.Pipe() - conn := &Conn{ - updated: time.Now(), - relayRemoteAddr: *addr, - relay: rconn, - local: wconn, - } - th.conns[addr.Network()] = conn - th.connCh <- conn + pipe := l.getPipe(addr) + + // handle errors before data writes because I don't + // remember where the error message goes + if "error" == string(addr.scheme) { + pipe.Close() + delete(l.conns, addr.Network()) + fmt.Printf("a stream errored remotely: %v\n", addr) + } + + // write data, if any + if len(b) > 0 { + l.bytesRead += len(b) + pipe.Write(b) + } + // EOF, if needed + if "end" == string(addr.scheme) { + pipe.Close() + delete(l.conns, addr.Network()) } - th.bytesRead += len(b) } -func Serve(listener *Listener, mux Mux) error { - w := &WSConn{ - c: listener.ws, - r: nil, - w: nil, - } - ctx := context.TODO() - encoder := NewEncoder(ctx, w) - encoder.Start() +func (l *Listener) getPipe(addr *Addr) *Conn { + connID := addr.Network() + pipe, ok := l.conns[connID] + // Pipe exists + if ok { + return pipe + } + + // Create pipe + rawPipe, encodable := net.Pipe() + pipe = &Conn{ + //updated: time.Now(), + relayRemoteAddr: *addr, + relay: rawPipe, + } + l.conns[connID] = pipe + l.incoming <- pipe + + // Handle encoding + go func() { + // TODO handle err + err := l.encoder.Encode(encodable, *pipe.LocalAddr()) + // the error may be EOF or ErrServerClosed or ErrGoingAwawy or some such + // or it might be an actual error + // In any case, we'll just close it all + encodable.Close() + pipe.Close() + fmt.Printf("a stream is done: %q\n", err) + }() + + return pipe +} + +func Serve(listener *Listener, mux Handler) error { for { client, err := listener.Accept() if nil != err { return err } - lconn, err := mux.LookupTarget(client.LocalAddr()) - if nil != err { - conn.Close() - continue - } go func() { - // TODO handle err - err := encoder.StreamEncode(*conn.LocalAddr(), lconn, 0) - fmt.Printf("a stream is done: %q", err) + err = mux.Serve(client) + if nil != err { + if io.EOF != err { + fmt.Printf("client could not be served: %q\n", err.Error()) + } + } + client.Close() }() } } -func Blah() { - go func() { - pipe - } - - -} - func (l *Listener) Accept() (*Conn, error) { select { - case conn, ok := <-l.incoming: + case rconn, ok := <-l.incoming: if ok { - return conn, nil + return rconn, nil } return nil, io.EOF @@ -154,17 +215,127 @@ func (l *Listener) Accept() (*Conn, error) { } } -type Mux interface { - LookupTarget(*Addr) (net.Conn, error) +type Handler interface { + Serve(*Conn) error + GetTargetConn(*Addr) (net.Conn, error) } -type MyMux struct { +type RouteMux struct { + defaultTimeout time.Duration +} + +func NewRouteMux() *RouteMux { + mux := &RouteMux{ + defaultTimeout: 45 * time.Second, + } + return mux +} + +func (m *RouteMux) Serve(client *Conn) error { + // TODO could proxy or handle directly, etc + target, err := m.GetTargetConn(client.RemoteAddr()) + if nil != err { + return err + } + + return Forward(client, target, m.defaultTimeout) +} + +// Forward port-forwards a relay (websocket) client to a target (local) server +func Forward(client *Conn, target net.Conn, timeout time.Duration) error { + + // Something like ReadAhead(size) should signal + // to read and send up to `size` bytes without waiting + // for a response - since we can't signal 'non-read' as + // is the normal operation of tcp... or can we? + // And how do we distinguish idle from dropped? + // Maybe this should have been a udp protocol??? + + defer client.Close() + defer target.Close() + + srcCh := make(chan []byte) + dstCh := make(chan []byte) + srcErrCh := make(chan error) + dstErrCh := make(chan error) + + // Source (Relay) Read Channel + go func() { + for { + b := make([]byte, defaultBufferSize) + n, err := client.Read(b) + if n > 0 { + srcCh <- b + } + if nil != err { + // TODO let client log this server-side error (unless EOF) + // (nil here because we probably can't send the error to the relay) + srcErrCh <- err + break + } + } + }() + + // Target (Local) Read Channel + go func() { + for { + b := make([]byte, defaultBufferSize) + n, err := target.Read(b) + if n > 0 { + dstCh <- b + } + if nil != err { + if io.EOF == err { + err = nil + } + dstErrCh <- err + break + } + } + }() + + var err error = nil + for { + select { + // TODO do we need a context here? + //case <-ctx.Done(): + // break + case b := <-srcCh: + client.SetDeadline(time.Now().Add(timeout)) + _, err = target.Write(b) + if nil != err { + fmt.Printf("write to target failed: %q", err.Error()) + break + } + case b := <-dstCh: + target.SetDeadline(time.Now().Add(timeout)) + _, err = client.Write(b) + if nil != err { + fmt.Printf("write to remote failed: %q", err.Error()) + break + } + case err = <-srcErrCh: + if nil != err { + fmt.Printf("read from remote failed: %q", err.Error()) + } + break + case err = <-dstErrCh: + if nil != err { + fmt.Printf("read from target failed: %q", err.Error()) + } + break + + } + } + + client.Close() + return err } // this function is very client-specific logic -func (m *MyMux) LookupTarget(paddr *Addr) (net.Conn, error) { - //if target := LookupPort(paddr.Port()); nil != target { } - if target := m.LookupServername(paddr.Hostname()); nil != target { +func (m *RouteMux) GetTargetConn(paddr *Addr) (net.Conn, error) { + //if target := GetTargetByPort(paddr.Port()); nil != target { } + if target := m.GetTargetByServername(paddr.Hostname()); nil != target { tconn, err := net.Dial(target.Network(), target.Hostname()) if nil != err { return nil, err @@ -185,7 +356,7 @@ func (m *MyMux) LookupTarget(paddr *Addr) (net.Conn, error) { return nil, errors.New("Bad Gateway") } -func (m *MyMux) LookupServername(servername string) *Addr { +func (m *RouteMux) GetTargetByServername(servername string) *Addr { return NewAddr( HTTPS, TCP, // TCP -> termination.None? / Plain? @@ -195,21 +366,21 @@ func (m *MyMux) LookupServername(servername string) *Addr { } type WSConn struct { - c *websocket.Conn - r io.Reader - w io.WriteCloser - pingCh chan struct{} + c *websocket.Conn + nr io.Reader + //w io.WriteCloser + //pingCh chan struct{} } func (ws *WSConn) Read(b []byte) (int, error) { - if nil == ws.r { + if nil == ws.nr { _, r, err := ws.c.NextReader() if nil != err { return 0, err } - ws.r = r + ws.nr = r } - n, err := ws.r.Read(b) + n, err := ws.nr.Read(b) if io.EOF == err { err = nil } @@ -218,17 +389,17 @@ func (ws *WSConn) Read(b []byte) (int, error) { func (ws *WSConn) Write(b []byte) (int, error) { // TODO create or reset ping deadline + // TODO document that more complete writes are preferred? w, err := ws.c.NextWriter(websocket.BinaryMessage) if nil != err { return 0, err } - ws.w = w - n, err := ws.w.Write(b) + n, err := w.Write(b) if nil != err { return n, err } - err = ws.w.Close() + err = w.Close() return n, err } diff --git a/mplexer/packer/parser.go b/mplexer/packer/parser.go index 780e39a..3a515ce 100644 --- a/mplexer/packer/parser.go +++ b/mplexer/packer/parser.go @@ -5,7 +5,7 @@ import ( ) type Parser struct { - handler Handler + handler Router newConns chan *Conn conns map[string]*Conn state ParserState @@ -36,7 +36,7 @@ const ( VersionState State = 0 ) -func NewParser(handler Handler) *Parser { +func NewParser(handler Router) *Parser { return &Parser{ conns: make(map[string]*Conn), newConns: make(chan *Conn, 2), // Buffered to make testing easier @@ -46,8 +46,8 @@ func NewParser(handler Handler) *Parser { } } -type Handler interface { - WriteMessage(Addr, []byte) +type Router interface { + RouteBytes(Addr, []byte) } // Write receives tunnel data and creates or writes to connections diff --git a/mplexer/packer/telebit.go b/mplexer/packer/telebit.go new file mode 100644 index 0000000..9295a99 --- /dev/null +++ b/mplexer/packer/telebit.go @@ -0,0 +1,12 @@ +package packer + +import "errors" + +// Note: 64k is the TCP max, but 1460b is the 100mbit Ethernet max (1500 MTU - overhead), +// but 1Gbit Ethernet (Jumbo frame) has an 9000b MTU +// Nerds posting benchmarks on SO show that 8k seems about right, +// but even 1024b could work well. +var defaultBufferSize = 8192 + +// ErrBadGateway means that the target did not accept the connection +var ErrBadGateway = errors.New("EBADGATEWAY") diff --git a/mplexer/packer/v1.go b/mplexer/packer/v1.go index 6949f9e..5a36858 100644 --- a/mplexer/packer/v1.go +++ b/mplexer/packer/v1.go @@ -8,19 +8,28 @@ import ( ) const ( + // HeaderLengthState is the 2nd (1) state HeaderLengthState State = 1 + iota + // HeaderState is the 3rd (2) state HeaderState + // PayloadState is the 4th (3) state PayloadState ) const ( + // FamilyIndex is the 1st (0) address element, either IPv4 or IPv6 FamilyIndex int = iota + // AddressIndex is the 2nd (1) address element, the IP or Hostname AddressIndex + // PortIndex is the 3rd (2) address element, the Port PortIndex + // LengthIndex is the 4th (3) address element, the Payload size LengthIndex + // ServiceIndex is the 5th (4) address element, the Scheme or Control message type ServiceIndex ) +// Header is the MPLEXY address/control meta data that comes before a packet type Header struct { Family string Address string @@ -178,7 +187,7 @@ func (p *Parser) unpackV1Payload(b []byte, n int) ([]byte, error) { */ //fmt.Printf("[debug] [2] payload written: %d | payload length: %d\n", p.state.payloadWritten, p.state.payloadLen) - p.handler.WriteMessage(p.state.addr, []byte{}) + p.handler.RouteBytes(p.state.addr, []byte{}) return b, nil } @@ -198,7 +207,7 @@ func (p *Parser) unpackV1Payload(b []byte, n int) ([]byte, error) { return b, nil } */ - p.handler.WriteMessage(p.state.addr, c) + p.handler.RouteBytes(p.state.addr, c) p.consumed += k p.state.payloadWritten += k