diff --git a/mplexer/listener.go b/mplexer/listener.go index 34a1bf2..c3b3b09 100644 --- a/mplexer/listener.go +++ b/mplexer/listener.go @@ -2,10 +2,15 @@ package mplexer import ( "context" + "errors" "fmt" "io" "net" "net/http" + "os" + "time" + + "git.coolaj86.com/coolaj86/go-telebitd/mplexer/packer" "github.com/gorilla/websocket" ) @@ -13,11 +18,13 @@ import ( // Listener defines a listener for use with http servers type Listener struct { //ParentAddr net.Addr - Conns chan *Conn - ws *websocket.Conn + //Conns chan *Conn + ws *websocket.Conn + ctx context.Context + parser *packer.Parser } -// NewListener creates a channel for connections and returns the listener +// Listen creates a channel for connections and returns the listener func (m *MultiplexLocal) Listen(ctx context.Context) (*Listener, error) { authz, err := m.SortingHat.Authz() if nil != err { @@ -32,32 +39,145 @@ func (m *MultiplexLocal) Listen(ctx context.Context) (*Listener, error) { if nil != err { return nil, err } + + //conns := make(chan *packer.Conn) + //parser := &packer.NewParser(ctx, conns) + + /* + go func() { + conn, err := packer.Accept() + if err != nil { + fmt.Fprintf(os.Stderr, "failed to accept new relayed connection: %s\n", err) + return + } + conns <- conn + }() + */ + + handler := &Handler{} listener := &Listener{ - Conns: make(chan *Conn), + //Conns: conns, + parser: packer.NewParser(ctx, handler), } + go m.listen(ctx, wsconn, listener) return listener, nil } +type Handler struct { +} + +func (h *Handler) WriteMessage(packer.Addr, []byte) { + panic(errors.New("not implemented")) +} + +func (m *MultiplexLocal) listen(ctx context.Context, wsconn *websocket.Conn, listener *Listener) { + // will cancel if ws errors out or closes + // (TODO: this may also be redundant) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // Ping every 15 seconds, or stop listening + go func() { + for { + time.Sleep(15 * time.Second) + deadline := time.Now().Add(45 * time.Second) + if err := wsconn.WriteControl(websocket.PingMessage, "", deadline); nil != err { + fmt.Fprintf(os.Stderr, "failed to write ping message to websocket: %s\n", err) + cancel() + break + } + } + }() + + // The write loop (which fails if ping fails) + go func() { + // TODO optimal buffer size + b := make([]byte, 128*1024) + for { + n, err := listener.packer.Read(b) + if n > 0 { + if err := wsconn.WriteMessage(websocket.BinaryMessage, b); nil != err { + fmt.Fprintf(os.Stderr, "failed to write packer message to websocket: %s\n", err) + break + } + } + if nil != err { + if io.EOF != err { + fmt.Fprintf(os.Stderr, "failed to read message from packer: %s\n", err) + break + } + fmt.Fprintf(os.Stderr, "[TODO debug] closed packer: %s\n", err) + break + } + } + // TODO handle EOF as websocket.CloseNormal? + message := websocket.FormatCloseMessage(websocket.CloseGoingAway, "closing connection") + deadline := time.Now().Add(10 * time.Second) + if err := wsconn.WriteControl(websocket.CloseMessage, message, deadline); nil != err { + fmt.Fprintf(os.Stderr, "failed to write close message to websocket: %s\n", err) + } + _ = wsconn.Close() + }() + + // The read loop (also fails if ping fails) + for { + _, message, err := wsconn.ReadMessage() + if err != nil { + fmt.Fprintf(os.Stderr, "failed to read message from websocket: %s\n", err) + break + } + + // + _, err = listener.packer.Write(message) + if err != nil { + fmt.Fprintf(os.Stderr, "failed to process message from websocket: %s\n", err) + break + } + } + + // just to be sure + listener.packer.Close() + wsconn.Close() + + return +} + +/* // Feed will block while pushing a net.Conn onto Conns func (l *Listener) Feed(conn *Conn) { l.Conns <- conn } +*/ // net.Listener interface +/* // Accept will block and wait for a new net.Conn func (l *Listener) Accept() (*Conn, error) { - conn, ok := <-l.Conns - if ok { - return conn, nil + select { + case conn, ok := <-l.Conns: + if ok { + return conn, nil + } + return nil, io.EOF + + case <-l.ctx.Done(): + // TODO is another error more suitable? + // TODO is this redundant? + return nil, io.EOF } - return nil, io.EOF +} +*/ + +func (l *Listener) Accept() (*packer.Conn, error) { + return l.Accept() } // Close will close the Conns channel func (l *Listener) Close() error { - close(l.Conns) - return nil + //close(l.Conns) + //return nil + return l.packer.Close() } // Addr returns nil to fulfill the net.Listener interface diff --git a/mplexer/mplexer.go b/mplexer/mplexer.go index b0141c5..ac9c802 100644 --- a/mplexer/mplexer.go +++ b/mplexer/mplexer.go @@ -7,12 +7,15 @@ import ( "net" "os" "time" + + "git.coolaj86.com/coolaj86/go-telebitd/mplexer/packer" ) type MultiplexLocal struct { Relay string SortingHat SortingHat Timeout time.Duration + listener *Listener } func New(relay string, hat SortingHat) *MultiplexLocal { @@ -24,6 +27,11 @@ func New(relay string, hat SortingHat) *MultiplexLocal { } func (m *MultiplexLocal) ListenAndServe(ctx context.Context) error { + // Cancels if Accept() returns an error (i.e. because it was closed) + // (TODO: this may be redundant) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + listener, err := m.Listen(ctx) if nil != err { return err @@ -39,7 +47,11 @@ func (m *MultiplexLocal) ListenAndServe(ctx context.Context) error { } } -func (m *MultiplexLocal) serve(ctx context.Context, pconn *Conn) { +func (m *MultiplexLocal) Close() error { + return m.listener.Close() +} + +func (m *MultiplexLocal) serve(ctx context.Context, pconn *packer.Conn) { //paddr := pconn.LocalAddr().(*Addr) // packer.Addr paddr := pconn.LocalAddr() //addr.Network() @@ -54,10 +66,10 @@ func (m *MultiplexLocal) serve(ctx context.Context, pconn *Conn) { if target, err := m.SortingHat.LookupTarget(paddr); nil != target { if nil != err { // TODO get a log channel or some such - fmt.Fprintf(os.Stderr, "lookup failed for tunneled client: %s", err) + fmt.Fprintf(os.Stderr, "lookup failed for tunneled client: %s\n", err) err := pconn.Error(err) if nil != err { - fmt.Fprintf(os.Stderr, "failed to signal error back to relay: %s", err) + fmt.Fprintf(os.Stderr, "failed to signal error back to relay: %s\n", err) } return } @@ -65,7 +77,7 @@ func (m *MultiplexLocal) serve(ctx context.Context, pconn *Conn) { } } -func pipePacker(ctx context.Context, pconn *Conn, target net.Conn, timeout time.Duration) { +func pipePacker(ctx context.Context, pconn *packer.Conn, target net.Conn, timeout time.Duration) { // how can this be done so that target errors are // sent back to the relay server? diff --git a/mplexer/packer/addr.go b/mplexer/packer/addr.go index e1d5f89..1841a78 100644 --- a/mplexer/packer/addr.go +++ b/mplexer/packer/addr.go @@ -1,4 +1,4 @@ -package mplexer +package packer import ( "fmt" @@ -24,6 +24,7 @@ const ( type Addr struct { scheme Scheme termination Termination + family string // TODO what should be the format? "tcpv6"? addr string port int } diff --git a/mplexer/packer/conn.go b/mplexer/packer/conn.go index f0e0764..35a7559 100644 --- a/mplexer/packer/conn.go +++ b/mplexer/packer/conn.go @@ -1,4 +1,4 @@ -package mplexer +package packer import ( "errors" @@ -7,12 +7,11 @@ import ( ) type Conn struct { - // TODO - relayRemoteAddr string - relayRemotePort int - relaySourceProto string - relaySourceAddr string - relaySourcePort int + relaySourceAddr Addr + relayRemoteAddr Addr + relay net.Conn + local net.Conn + updated time.Time } // TODO conn.go -> conn/conn.go @@ -26,23 +25,20 @@ func NewConn() *Conn { // Read can be made to time out and return an Error with Timeout() == true // after a fixed time limit; see SetDeadline and SetReadDeadline. func (c *Conn) Read(b []byte) (n int, err error) { - panic(errors.New("not implemented")) - return 0, nil + return c.relay.Read(b) } // Write writes data to the connection. // Write can be made to time out and return an Error with Timeout() == true // after a fixed time limit; see SetDeadline and SetWriteDeadline. func (c *Conn) Write(b []byte) (n int, err error) { - panic(errors.New("not implemented")) - return 0, nil + return c.relay.Write(b) } // Close closes the connection. // Any blocked Read or Write operations will be unblocked and return errors. func (c *Conn) Close() error { - panic(errors.New("not implemented")) - return nil + return c.relay.Close() } // Error signals an error back to the relay @@ -61,14 +57,14 @@ func (c *Conn) LocalAddr() net.Addr { // LocalAddr returns the local network address. func (c *Conn) LocalAddr() *Addr { - panic(errors.New("not implemented")) - return &Addr{} + // TODO is this the right one? + return &c.relaySourceAddr } // RemoteAddr returns the remote network address. func (c *Conn) RemoteAddr() net.Addr { - panic(errors.New("not implemented")) - return &net.IPAddr{} + // TODO is this the right one? + return &c.relayRemoteAddr } // SetDeadline sets the read and write deadlines associated @@ -94,16 +90,14 @@ func (c *Conn) RemoteAddr() net.Addr { // failure on I/O can be detected using // errors.Is(err, syscall.ETIMEDOUT). func (c *Conn) SetDeadline(t time.Time) error { - panic(errors.New("not implemented")) - return nil + return c.relay.SetDeadline(t) } // SetReadDeadline sets the deadline for future Read calls // and any currently-blocked Read call. // A zero value for t means Read will not time out. func (c *Conn) SetReadDeadline(t time.Time) error { - panic(errors.New("not implemented")) - return nil + return c.relay.SetReadDeadline(t) } // SetWriteDeadline sets the deadline for future Write calls @@ -112,6 +106,5 @@ func (c *Conn) SetReadDeadline(t time.Time) error { // some of the data was successfully written. // A zero value for t means Write will not time out. func (c *Conn) SetWriteDeadline(t time.Time) error { - panic(errors.New("not implemented")) - return nil + return c.relay.SetWriteDeadline(t) } diff --git a/mplexer/packer/packer.go b/mplexer/packer/packer.go new file mode 100644 index 0000000..afe1d18 --- /dev/null +++ b/mplexer/packer/packer.go @@ -0,0 +1,12 @@ +package packer + +import ( + "strconv" +) + +func Marshal(addr Addr, body []byte) ([]byte, []byte) { + header := []byte(`IPv4,192.168.1.101,6743,` + strconv.Itoa(len(body)) + `,http,80,ex1.telebit.io`) + raw := []byte{255 - 1, byte(len(header))} + header = append(raw, header...) + return header, body +} diff --git a/mplexer/packer/parser.go b/mplexer/packer/parser.go new file mode 100644 index 0000000..7e3b73f --- /dev/null +++ b/mplexer/packer/parser.go @@ -0,0 +1,88 @@ +package packer + +import ( + "context" + "errors" + "fmt" +) + +type Parser struct { + ctx context.Context + handler Handler + newConns chan *Conn + conns map[string]*Conn + state ParserState + parseState State + dataReady chan struct{} + data []byte + written int +} + +type ParserState struct { + written int + version byte + headerLen int + header []byte + payloadLen int + addr Addr + payloadWritten int +} + +type State int + +const ( + V1 byte = 255 - (1 + iota) + V2 +) + +const ( + VersionState State = 0 +) + +func NewParser(ctx context.Context, handler Handler) *Parser { + return &Parser{ + ctx: ctx, + conns: make(map[string]*Conn), + newConns: make(chan *Conn, 2), // Buffered to make testing easier + dataReady: make(chan struct{}, 2), + data: []byte{}, + handler: handler, + } +} + +type Handler interface { + WriteMessage(Addr, []byte) +} + +// Write receives tunnel data and creates or writes to connections +func (p *Parser) Write(b []byte) (int, error) { + if len(b) < 1 { + return 0, errors.New("developer error: wrote 0 bytes") + } + + // so that we can overwrite the main state + // as soon as a full message has completed + // but still keep the number of bytes written + if 0 == p.state.written { + p.written = 0 + } + + switch p.parseState { + case VersionState: + fmt.Println("version state", b[0]) + p.state.version = b[0] + b = b[1:] + p.state.written += 1 + p.parseState += 1 + default: + // do nothing + } + + switch p.state.version { + case V1: + fmt.Println("v1 unmarshal") + return p.written, p.unpackV1(b) + default: + return 0, errors.New("incorrect version or version not implemented") + } +} diff --git a/mplexer/packer/parser_test.go b/mplexer/packer/parser_test.go new file mode 100644 index 0000000..67d97e9 --- /dev/null +++ b/mplexer/packer/parser_test.go @@ -0,0 +1,69 @@ +package packer + +import ( + "context" + "fmt" + "net" + "strconv" + "testing" + "time" +) + +type testHandler struct { + conns map[string]*Conn + chunksParsed int + bytesRead int +} + +func (th *testHandler) WriteMessage(a Addr, b []byte) { + addr := &a + _, ok := th.conns[addr.Network()] + if !ok { + rconn, wconn := net.Pipe() + conn := &Conn{ + updated: time.Now(), + relayRemoteAddr: *addr, + relay: rconn, + local: wconn, + } + th.conns[addr.Network()] = conn + } + th.chunksParsed += 1 + th.bytesRead += len(b) +} + +func TestParseWholeBlock(t *testing.T) { + ctx := context.Background() + //ctx, cancel := context.WithCancel(ctx) + + th := &testHandler{ + conns: map[string]*Conn{}, + } + + p := NewParser(ctx, th) + body := []byte(`Hello, World!`) + fmt.Println("payload len", len(body)) + header := []byte("IPv4,192.168.1.101,6743," + strconv.Itoa(len(body)) + ",http,80,ex1.telebit.io,\n") + fmt.Println("header len", len(header)) + raw := []byte{255 - 1, byte(len(header))} + raw = append(raw, header...) + raw = append(raw, body...) + fmt.Println("total len", len(raw)) + n, err := p.Write(raw) + if nil != err { + t.Fatal(err) + } + + if 1 != len(th.conns) { + t.Fatal("should have parsed one connection") + } + if 1 != th.chunksParsed { + t.Fatal("should have parsed one chunck") + } + if len(body) != th.bytesRead { + t.Fatalf("should have parsed a body of %d bytes, but saw %d\n", len(body), th.bytesRead) + } + if n != len(raw) { + t.Fatalf("should have parsed all %d bytes, not just %d\n", n, len(raw)) + } +} diff --git a/mplexer/packer/server.go b/mplexer/packer/server.go new file mode 100644 index 0000000..743379d --- /dev/null +++ b/mplexer/packer/server.go @@ -0,0 +1,72 @@ +package packer + +import ( + "context" + "errors" +) + +type Server struct { + ctx context.Context + newConns chan *Conn + data []byte + dataReady chan struct{} +} + +func (s *Server) Accept() (*Conn, error) { + select { + case <-s.ctx.Done(): + return nil, errors.New("TODO: ErrClosed") + case conn := <-s.newConns: + return conn, nil + } +} + +// Read packs transforms local responses into wrapped data for the tunnel +func (s *Server) Read(b []byte) (int, error) { + select { + case <-s.ctx.Done(): + return 0, errors.New("TODO: EOF / ErrClosed") + case <-s.dataReady: + if 0 == len(s.data) { + return s.Read(b) + } + return s.read(b) + } +} + +func (s *Server) read(b []byte) (int, error) { + // TODO mutex data while reading, against writing? + + c := len(b) // capacity + a := len(s.data) // available + n := c + + // see if the available data is smaller than the receiving buffer + if a < c { + n = a + } + + // copy available data up to capacity + for i := 0; i < n; i++ { + b[i] = s.data[i] + } + // shrink the data slice by amount read + s.data = s.data[n:] + + // if there's data left over, flag as ready to read again + // otherwise... flag as ready to write? + if len(b) > 0 { + s.dataReady <- struct{}{} + } else { + //p.writeReady <- struct{}{} + } + + // Note a read error should not be possible here + // as all traffic (including errors) can be wrapped + return n, nil +} + +// Close (TODO) should politely close all connections, if possible (set Read() to io.EOF, or use ErrClosed?) +func (s *Server) Close() error { + return errors.New("not implemented") +} diff --git a/mplexer/packer/v1.go b/mplexer/packer/v1.go new file mode 100644 index 0000000..a5dc220 --- /dev/null +++ b/mplexer/packer/v1.go @@ -0,0 +1,196 @@ +package packer + +import ( + "errors" + "fmt" + "strconv" + "strings" +) + +const ( + HeaderLengthState State = 1 + iota + HeaderState + PayloadState +) + +const ( + FamilyIndex int = iota + AddressIndex + PortIndex + LengthIndex + ServiceIndex +) + +type Header struct { + Family string + Address string + Port string + Service string +} + +func (p *Parser) unpackV1(b []byte) error { + z := 0 + for { + if z > 10 { + panic("stuck in an infinite loop?") + } + z += 1 + n := len(b) + // at least one loop + if z > 1 && n < 1 { + fmt.Println("v1 end", z, n) + break + } + + var err error + switch p.parseState { + case HeaderLengthState: + fmt.Println("v1 h len") + b = p.unpackV1HeaderLength(b) + case HeaderState: + fmt.Println("v1 header") + b, err = p.unpackV1Header(b, n) + if nil != err { + fmt.Println("v1 header err", err) + return err + } + case PayloadState: + fmt.Println("v1 payload") + // if this payload is complete, reset all state + if p.state.payloadWritten == p.state.payloadLen { + p.state = ParserState{} + } + b, err = p.unpackV1Payload(b, n) + if nil != err { + return err + } + default: + // do nothing + return errors.New("error unpacking") + } + } + + return nil +} + +func (p *Parser) unpackV1HeaderLength(b []byte) []byte { + p.state.headerLen = int(b[0]) + fmt.Println("unpacked header len", p.state.headerLen) + b = b[1:] + p.state.written += 1 + p.parseState += 1 + return b +} + +func (p *Parser) unpackV1Header(b []byte, n int) ([]byte, error) { + fmt.Println("got", len(b), "bytes", string(b)) + m := len(p.state.header) + k := p.state.headerLen - m + if n < k { + k = n + } + p.state.written += k + c := b[0:k] + b = b[k:] + fmt.Println("has", m, "want", k, "more and have", len(b), "more") + p.state.header = append(p.state.header, c...) + if p.state.headerLen != len(p.state.header) { + return b, nil + } + parts := strings.Split(string(p.state.header), ",") + p.state.header = nil + if len(parts) < 5 { + return nil, errors.New("error unpacking header") + } + + payloadLenStr := parts[LengthIndex] + payloadLen, err := strconv.Atoi(payloadLenStr) + if nil != err { + return nil, errors.New("error unpacking header payload length") + } + p.state.payloadLen = payloadLen + port, _ := strconv.Atoi(parts[PortIndex]) + service := parts[ServiceIndex] + + if "control" == service { + return nil, errors.New("'control' messages not implemented") + } + + addr := Addr{ + family: parts[FamilyIndex], + addr: parts[AddressIndex], + port: port, + scheme: Scheme(service), + } + p.state.addr = addr + /* + p.state.conn = p.conns[addr.Network()] + if nil == p.state.conn { + rconn, wconn := net.Pipe() + conn := Conn{ + updated: time.Now(), + relayRemoteAddr: addr, + relay: rconn, + local: wconn, + } + copied := conn + p.state.conn = &copied + p.conns[addr.Network()] = p.state.conn + p.newConns <- p.state.conn + } + */ + p.parseState += 1 + + return b, nil +} + +func (p *Parser) unpackV1Payload(b []byte, n int) ([]byte, error) { + // Handle "connect" and "end" + if 0 == p.state.payloadLen { + /* + p.newMsg <- msg{ + addr: Addr, + bytes: []byte{}, + } + + addr := &p.state.conn.relayRemoteAddr + if "end" == string(addr.scheme) { + if err := p.state.conn.Close(); nil != err { + // TODO log potential error? + } + } + return b, nil + */ + + p.handler.WriteMessage(p.state.addr, []byte{}) + return b, nil + } + + k := p.state.payloadLen - p.state.payloadWritten + if k < n { + k = n + } + c := b[0:k] + b = b[k:] + // TODO don't let a write on one connection block others, + // and also put backpressure on just that connection + /* + m, err := p.state.conn.local.Write(c) + p.state.written += m + p.state.payloadWritten += m + if nil != err { + // TODO we want to surface this error somewhere, but not to the websocket + return b, nil + } + */ + p.handler.WriteMessage(p.state.addr, c) + p.state.written += k + p.state.payloadWritten += k + p.written = p.state.written + + // if this payload is complete, reset all state + if p.state.payloadWritten == p.state.payloadLen { + p.state = ParserState{} + } + return b, nil +} diff --git a/mplexer/sortinghat.go b/mplexer/sortinghat.go index e1cf327..6a00db5 100644 --- a/mplexer/sortinghat.go +++ b/mplexer/sortinghat.go @@ -1,10 +1,10 @@ package mplexer import ( - "net" + "git.coolaj86.com/coolaj86/go-telebitd/mplexer/packer" ) type SortingHat interface { - LookupTarget(*Addr) (net.Conn, error) + LookupTarget(*packer.Addr) (*packer.Conn, error) Authz() (string, error) }