From 53478d35fd416dcf237bce66c0710d81128cc63e Mon Sep 17 00:00:00 2001 From: AJ ONeal Date: Fri, 22 May 2020 04:41:24 -0600 Subject: [PATCH] rename package packer->telebit --- mplexer/{packer => }/addr.go | 2 +- mplexer/cmd/client/client.go | 62 ----- mplexer/cmd/telebit/telebit.go | 10 +- mplexer/{packer => }/conn.go | 2 +- mplexer/{packer => }/connwrap.go | 2 +- mplexer/{packer => }/decoder.go | 2 +- mplexer/{packer => }/decoder_test.go | 2 +- mplexer/{packer => }/encoder.go | 2 +- mplexer/{packer => }/encoder_test.go | 2 +- mplexer/listener.go | 288 ++++++++++++------------ mplexer/{packer => }/listener_test.go | 2 +- mplexer/mplexer.go | 160 ------------- mplexer/{packer => }/packer.go | 2 +- mplexer/packer/listener.go | 176 --------------- mplexer/{packer => }/packer_test.go | 2 +- mplexer/{packer => }/parser.go | 2 +- mplexer/{packer => }/parser_test.go | 2 +- mplexer/{packer => }/routemux.go | 2 +- mplexer/{packer => }/server.go | 2 +- mplexer/sortinghat.go | 10 - mplexer/{packer => }/telebit.go | 2 +- mplexer/{packer => }/v1.go | 2 +- mplexer/{packer => }/websockettunnel.go | 2 +- mplexer/{packer => }/wsjunk_test.go | 2 +- 24 files changed, 161 insertions(+), 581 deletions(-) rename mplexer/{packer => }/addr.go (98%) delete mode 100644 mplexer/cmd/client/client.go rename mplexer/{packer => }/conn.go (99%) rename mplexer/{packer => }/connwrap.go (99%) rename mplexer/{packer => }/decoder.go (98%) rename mplexer/{packer => }/decoder_test.go (98%) rename mplexer/{packer => }/encoder.go (99%) rename mplexer/{packer => }/encoder_test.go (99%) rename mplexer/{packer => }/listener_test.go (98%) delete mode 100644 mplexer/mplexer.go rename mplexer/{packer => }/packer.go (96%) delete mode 100644 mplexer/packer/listener.go rename mplexer/{packer => }/packer_test.go (98%) rename mplexer/{packer => }/parser.go (99%) rename mplexer/{packer => }/parser_test.go (99%) rename mplexer/{packer => }/routemux.go (99%) rename mplexer/{packer => }/server.go (98%) delete mode 100644 mplexer/sortinghat.go rename mplexer/{packer => }/telebit.go (99%) rename mplexer/{packer => }/v1.go (99%) rename mplexer/{packer => }/websockettunnel.go (99%) rename mplexer/{packer => }/wsjunk_test.go (98%) diff --git a/mplexer/packer/addr.go b/mplexer/addr.go similarity index 98% rename from mplexer/packer/addr.go rename to mplexer/addr.go index acde363..1ca380c 100644 --- a/mplexer/packer/addr.go +++ b/mplexer/addr.go @@ -1,4 +1,4 @@ -package packer +package telebit import ( "fmt" diff --git a/mplexer/cmd/client/client.go b/mplexer/cmd/client/client.go deleted file mode 100644 index a60b129..0000000 --- a/mplexer/cmd/client/client.go +++ /dev/null @@ -1,62 +0,0 @@ -package main - -import ( - "context" - "net" - - "git.coolaj86.com/coolaj86/go-telebitd/mplexer" -) - -func main() { - r := &Router{ - secret: os.Getenv("SECRET"), - } - m := &mplexer.MultiplexLocal{ - Relay: os.Getenv("RELAY"), - SortingHat: r, - } - - ctx := context.Background() - - // TODO more m.ListenAndServe(mux) style? - m.ListenAndServe(ctx) -} - -type Router struct { - secret string -} - -func (r *Router) Authz() (string, error) { - return r.secret, nil -} - -// this function is very client-specific logic -func (r *Router) LookupTarget(paddr packer.Addr) (net.Conn, error) { - //if target := LookupPort(paddr.Servername()); nil != target { } - if target := r.LookupServername(paddr.Port()); nil != target { - tconn, err := net.Dial(target.Network(), target.Hostname()) - if nil != err { - return nil, err - } - /* - // TODO for http proxy - return mplexer.TargetOptions { - Hostname // default localhost - Termination // default TLS - XFWD // default... no? - Port // default 0 - Conn // should be dialed beforehand - }, nil - */ - return tconn, nil - } -} - -func (r *Router) LookupServername(servername string) mplexer.Addr { - return &mplexer.NewAddr( - mplexer.HTTPS, - mplexer.TCP, // TCP -> termination.None? / Plain? - "localhost", - 3000, - ) -} diff --git a/mplexer/cmd/telebit/telebit.go b/mplexer/cmd/telebit/telebit.go index 101604a..0018795 100644 --- a/mplexer/cmd/telebit/telebit.go +++ b/mplexer/cmd/telebit/telebit.go @@ -9,7 +9,7 @@ import ( "strings" "time" - "git.coolaj86.com/coolaj86/go-telebitd/mplexer/packer" + telebit "git.coolaj86.com/coolaj86/go-telebitd/mplexer" "github.com/caddyserver/certmagic" jwt "github.com/dgrijalva/jwt-go" @@ -109,7 +109,7 @@ func main() { ctx := context.Background() - acme := &packer.ACME{ + acme := &telebit.ACME{ Email: *email, StoragePath: *certpath, Agree: *acmeAgree, @@ -119,14 +119,14 @@ func main() { EnableTLSALPNChallenge: enableTLSALPN01, } - mux := packer.NewRouteMux() + mux := telebit.NewRouteMux() mux.HandleTLS("*", acme, mux) for _, fwd := range forwards { mux.ForwardTCP("*", "localhost:"+fwd.port, 120*time.Second) //mux.ForwardTCP(fwd.pattern, "localhost:"+fwd.port, 120*time.Second) } - tun, err := packer.DialWebsocketTunnel(ctx, *relay, *token) + tun, err := telebit.DialWebsocketTunnel(ctx, *relay, *token) if nil != err { fmt.Println("relay:", relay) log.Fatal(err) @@ -134,7 +134,7 @@ func main() { } fmt.Printf("Listening at %s\n", *relay) - log.Fatal("Closed server: ", packer.ListenAndServe(tun, mux)) + log.Fatal("Closed server: ", telebit.ListenAndServe(tun, mux)) } type ACMEProvider struct { diff --git a/mplexer/packer/conn.go b/mplexer/conn.go similarity index 99% rename from mplexer/packer/conn.go rename to mplexer/conn.go index 5887395..c9beee1 100644 --- a/mplexer/packer/conn.go +++ b/mplexer/conn.go @@ -1,4 +1,4 @@ -package packer +package telebit import ( "net" diff --git a/mplexer/packer/connwrap.go b/mplexer/connwrap.go similarity index 99% rename from mplexer/packer/connwrap.go rename to mplexer/connwrap.go index ab731b8..ef0890b 100644 --- a/mplexer/packer/connwrap.go +++ b/mplexer/connwrap.go @@ -1,4 +1,4 @@ -package packer +package telebit import ( "net" diff --git a/mplexer/packer/decoder.go b/mplexer/decoder.go similarity index 98% rename from mplexer/packer/decoder.go rename to mplexer/decoder.go index adfd0c0..d0912b5 100644 --- a/mplexer/packer/decoder.go +++ b/mplexer/decoder.go @@ -1,4 +1,4 @@ -package packer +package telebit import ( "io" diff --git a/mplexer/packer/decoder_test.go b/mplexer/decoder_test.go similarity index 98% rename from mplexer/packer/decoder_test.go rename to mplexer/decoder_test.go index ded7c70..6d90b66 100644 --- a/mplexer/packer/decoder_test.go +++ b/mplexer/decoder_test.go @@ -1,4 +1,4 @@ -package packer +package telebit import ( "net" diff --git a/mplexer/packer/encoder.go b/mplexer/encoder.go similarity index 99% rename from mplexer/packer/encoder.go rename to mplexer/encoder.go index 81b4053..dd19304 100644 --- a/mplexer/packer/encoder.go +++ b/mplexer/encoder.go @@ -1,4 +1,4 @@ -package packer +package telebit import ( "context" diff --git a/mplexer/packer/encoder_test.go b/mplexer/encoder_test.go similarity index 99% rename from mplexer/packer/encoder_test.go rename to mplexer/encoder_test.go index 57a8569..27e4714 100644 --- a/mplexer/packer/encoder_test.go +++ b/mplexer/encoder_test.go @@ -1,4 +1,4 @@ -package packer +package telebit import ( "context" diff --git a/mplexer/listener.go b/mplexer/listener.go index cd77335..02e0d6f 100644 --- a/mplexer/listener.go +++ b/mplexer/listener.go @@ -1,188 +1,176 @@ -package mplexer +package telebit import ( "context" - "errors" "fmt" "io" "net" "net/http" - "os" - "time" - - "git.coolaj86.com/coolaj86/go-telebitd/mplexer/packer" - - "github.com/gorilla/websocket" ) -// Listener defines a listener for use with http servers +// A Listener transforms a multiplexed websocket connection into individual net.Conn-like connections. type Listener struct { - //ParentAddr net.Addr - //Conns chan *Conn - ws *websocket.Conn - ctx context.Context - parser *packer.Parser + //wsconn *websocket.Conn + tun net.Conn + incoming chan *Conn + close chan struct{} + encoder *Encoder + chunksParsed int + bytesRead int + conns map[string]net.Conn + //conns map[string]*Conn } -// Listen creates a channel for connections and returns the listener -func (m *MultiplexLocal) Listen(ctx context.Context) (*Listener, error) { - authz, err := m.SortingHat.Authz() - if nil != err { - return nil, err - } +// Listen creates a new Listener and sets it up to receive and distribute connections. +func Listen(tun net.Conn) *Listener { + ctx := context.TODO() - wsd := websocket.Dialer{} - headers := http.Header{} - headers.Set("Authorization", fmt.Sprintf("Bearer %s", authz)) - // *http.Response - wsconn, _, err := wsd.DialContext(ctx, m.Relay, headers) - if nil != err { - return nil, err - } - - //conns := make(chan *packer.Conn) - //parser := &packer.NewParser(ctx, conns) - - /* - go func() { - conn, err := packer.Accept() - if err != nil { - fmt.Fprintf(os.Stderr, "failed to accept new relayed connection: %s\n", err) - return - } - conns <- conn - }() - */ - - handler := &Handler{} + // Feed the socket into the Encoder and Decoder listener := &Listener{ - //Conns: conns, - parser: packer.NewParser(ctx, handler), + tun: tun, + incoming: make(chan *Conn, 1), // buffer ever so slightly + close: make(chan struct{}), + encoder: NewEncoder(ctx, tun), + conns: map[string]net.Conn{}, + //conns: map[string]*Conn{}, } - go m.listen(ctx, wsconn, listener) - return listener, nil -} -type Handler struct { -} - -func (h *Handler) WriteMessage(packer.Addr, []byte) { - panic(errors.New("not implemented")) -} - -func (m *MultiplexLocal) listen(ctx context.Context, wsconn *websocket.Conn, listener *Listener) { - // will cancel if ws errors out or closes - // (TODO: this may also be redundant) - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - // Ping every 15 seconds, or stop listening + // TODO perhaps the wrapper should have a mutex + // rather than having a goroutine in the encoder go func() { - for { - time.Sleep(15 * time.Second) - deadline := time.Now().Add(45 * time.Second) - if err := wsconn.WriteControl(websocket.PingMessage, []byte(""), deadline); nil != err { - fmt.Fprintf(os.Stderr, "failed to write ping message to websocket: %s\n", err) - cancel() - break - } - } + err := listener.encoder.Run() + fmt.Printf("encoder stopped entirely: %q", err) + tun.Close() }() - // The write loop (which fails if ping fails) + // Decode the stream as it comes in + decoder := NewDecoder(tun) go func() { - // TODO optimal buffer size - b := make([]byte, 128*1024) - for { - n, err := listener.parser.Read(b) - if n > 0 { - if err := wsconn.WriteMessage(websocket.BinaryMessage, b); nil != err { - fmt.Fprintf(os.Stderr, "failed to write packer message to websocket: %s\n", err) - break - } - } + // TODO pass error to Accept() + err := decoder.Decode(listener) + + // The listener itself must be closed explicitly because + // there's an encoder with a callback between the websocket + // and the multiplexer, so it doesn't know to stop listening otherwise + listener.Close() + fmt.Printf("the main stream is done: %q\n", err) + }() + + return listener +} + +// ListenAndServe listens on a websocket and handles the incomming net.Conn-like connections with a Handler +func ListenAndServe(tun net.Conn, mux Handler) error { + listener := Listen(tun) + return Serve(listener, mux) +} + +// Serve Accept()s connections which have already been unwrapped and serves them with the given Handler +func Serve(listener *Listener, mux Handler) error { + for { + client, err := listener.Accept() + if nil != err { + return err + } + + go func() { + err = mux.Serve(client) if nil != err { if io.EOF != err { - fmt.Fprintf(os.Stderr, "failed to read message from packer: %s\n", err) - break + fmt.Printf("client could not be served: %q\n", err.Error()) } - fmt.Fprintf(os.Stderr, "[TODO debug] closed packer: %s\n", err) - break } - } - // TODO handle EOF as websocket.CloseNormal? - message := websocket.FormatCloseMessage(websocket.CloseGoingAway, "closing connection") - deadline := time.Now().Add(10 * time.Second) - if err := wsconn.WriteControl(websocket.CloseMessage, message, deadline); nil != err { - fmt.Fprintf(os.Stderr, "failed to write close message to websocket: %s\n", err) - } - _ = wsconn.Close() - }() - - // The read loop (also fails if ping fails) - for { - _, message, err := wsconn.ReadMessage() - if err != nil { - fmt.Fprintf(os.Stderr, "failed to read message from websocket: %s\n", err) - break - } - - // - _, err = listener.packer.Write(message) - if err != nil { - fmt.Fprintf(os.Stderr, "failed to process message from websocket: %s\n", err) - break - } + client.Close() + }() } - - // just to be sure - listener.packer.Close() - wsconn.Close() - - return } -/* -// Feed will block while pushing a net.Conn onto Conns -func (l *Listener) Feed(conn *Conn) { - l.Conns <- conn -} -*/ - -// net.Listener interface - -/* -// Accept will block and wait for a new net.Conn -func (l *Listener) Accept() (*Conn, error) { +// Accept returns a tunneled network connection +func (l *Listener) Accept() (net.Conn, error) { select { - case conn, ok := <-l.Conns: + case rconn, ok := <-l.incoming: if ok { - return conn, nil + return rconn, nil } return nil, io.EOF - case <-l.ctx.Done(): - // TODO is another error more suitable? - // TODO is this redundant? - return nil, io.EOF + case <-l.close: + return nil, http.ErrServerClosed } } -*/ -func (l *Listener) Accept() (*packer.Conn, error) { - return l.Accept() -} - -// Close will close the Conns channel +// Close stops accepting new connections and closes the underlying websocket. +// TODO return errors. func (l *Listener) Close() error { - //close(l.Conns) - //return nil - return l.packer.Close() -} - -// Addr returns nil to fulfill the net.Listener interface -func (l *Listener) Addr() net.Addr { - // Addr may (or may not) return the original TCP or TLS listener's address - //return l.ParentAddr + l.tun.Close() + close(l.incoming) + l.close <- struct{}{} return nil } + +// RouteBytes receives address information and a buffer and creates or re-uses a pipe that can be Accept()ed. +func (l *Listener) RouteBytes(srcAddr, dstAddr Addr, b []byte) { + // TODO use context to be able to cancel many at once? + l.chunksParsed++ + + src := &srcAddr + dst := &dstAddr + pipe := l.getPipe(src, dst, len(b)) + //fmt.Printf("%s\n", b) + + // handle errors before data writes because I don't + // remember where the error message goes + if "error" == string(dst.scheme) { + pipe.Close() + delete(l.conns, src.Network()) + fmt.Printf("a stream errored remotely: %v\n", src) + } + + // write data, if any + if len(b) > 0 { + l.bytesRead += len(b) + pipe.Write(b) + } + // EOF, if needed + if "end" == string(dst.scheme) { + fmt.Println("[debug] end") + pipe.Close() + delete(l.conns, src.Network()) + } +} + +func (l *Listener) getPipe(src, dst *Addr, count int) net.Conn { + connID := src.Network() + pipe, ok := l.conns[connID] + + // Pipe exists + if ok { + return pipe + } + fmt.Printf("New client (%d byte hello)\n\tfrom %#v\n\tto %#v:\n", count, src, dst) + + // Create pipe + rawPipe, pipe := net.Pipe() + newconn := &Conn{ + //updated: time.Now(), + relaySourceAddr: *src, + relayTargetAddr: *dst, + relay: rawPipe, + } + l.conns[connID] = pipe + l.incoming <- newconn + + // Handle encoding + go func() { + // TODO handle err + err := l.encoder.Encode(pipe, *src, *dst) + // the error may be EOF or ErrServerClosed or ErrGoingAwawy or some such + // or it might be an actual error + // In any case, we'll just close it all + newconn.Close() + pipe.Close() + fmt.Printf("a stream is done: %q\n", err) + }() + + return pipe +} diff --git a/mplexer/packer/listener_test.go b/mplexer/listener_test.go similarity index 98% rename from mplexer/packer/listener_test.go rename to mplexer/listener_test.go index e170afa..49d1437 100644 --- a/mplexer/packer/listener_test.go +++ b/mplexer/listener_test.go @@ -1,4 +1,4 @@ -package packer +package telebit import ( "errors" diff --git a/mplexer/mplexer.go b/mplexer/mplexer.go deleted file mode 100644 index ac9c802..0000000 --- a/mplexer/mplexer.go +++ /dev/null @@ -1,160 +0,0 @@ -package mplexer - -import ( - "context" - "fmt" - "io" - "net" - "os" - "time" - - "git.coolaj86.com/coolaj86/go-telebitd/mplexer/packer" -) - -type MultiplexLocal struct { - Relay string - SortingHat SortingHat - Timeout time.Duration - listener *Listener -} - -func New(relay string, hat SortingHat) *MultiplexLocal { - return &MultiplexLocal{ - Relay: relay, - SortingHat: hat, - Timeout: 30 * time.Second, - } -} - -func (m *MultiplexLocal) ListenAndServe(ctx context.Context) error { - // Cancels if Accept() returns an error (i.e. because it was closed) - // (TODO: this may be redundant) - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - listener, err := m.Listen(ctx) - if nil != err { - return err - } - - for { - pconn, err := listener.Accept() // packer.Conn - if nil != err { - return err - } - - go m.serve(ctx, pconn) - } -} - -func (m *MultiplexLocal) Close() error { - return m.listener.Close() -} - -func (m *MultiplexLocal) serve(ctx context.Context, pconn *packer.Conn) { - //paddr := pconn.LocalAddr().(*Addr) // packer.Addr - paddr := pconn.LocalAddr() - //addr.Network() - //addr.String() - paddr.Scheme() - //paddr.Encrypted() - //paddr.Servername() - - // todo: some sort of logic to avoid infinite loop to self? - // (that's probably not possible since the connection could - // route several layers deep) - if target, err := m.SortingHat.LookupTarget(paddr); nil != target { - if nil != err { - // TODO get a log channel or some such - fmt.Fprintf(os.Stderr, "lookup failed for tunneled client: %s\n", err) - err := pconn.Error(err) - if nil != err { - fmt.Fprintf(os.Stderr, "failed to signal error back to relay: %s\n", err) - } - return - } - pipePacker(ctx, pconn, target, m.Timeout) - } -} - -func pipePacker(ctx context.Context, pconn *packer.Conn, target net.Conn, timeout time.Duration) { - // how can this be done so that target errors are - // sent back to the relay server? - - // Also something like ReadAhead(size) should signal - // to read and send up to `size` bytes without waiting - // for a response - since we can't signal 'non-read' as - // is the normal operation of tcp... or can we? - // And how do we distinguish idle from dropped? - // Maybe this should have been a udp protocol??? - - defer pconn.Close() - defer target.Close() - - srcCh := make(chan []byte) - dstCh := make(chan []byte) - errCh := make(chan error) - - // Source (Relay) Read Channel - go func() { - // TODO what's the optimal size to buffer? - // TODO user buffered reader - b := make([]byte, 128*1024) - for { - pconn.SetDeadline(time.Now().Add(timeout)) - n, err := pconn.Read(b) - if n > 0 { - srcCh <- b - } - if nil != err { - // TODO let client log this server-side error (unless EOF) - // (nil here because we probably can't send the error to the relay) - errCh <- nil - break - } - } - }() - - // Target (Local) Read Channel - go func() { - // TODO what's the optimal size to buffer? - // TODO user buffered reader - b := make([]byte, 128*1024) - for { - target.SetDeadline(time.Now().Add(timeout)) - n, err := target.Read(b) - if n > 0 { - dstCh <- b - } - if nil != err { - if io.EOF == err { - err = nil - } - errCh <- err - break - } - } - }() - - for { - select { - case <-ctx.Done(): - break - case b := <-srcCh: - target.SetDeadline(time.Now().Add(timeout)) - _, err := target.Write(b) - if nil != err { - // TODO log error locally - pconn.Error(err) - break - } - case b := <-dstCh: - pconn.SetDeadline(time.Now().Add(timeout)) - _, err := pconn.Write(b) - if nil != err { - // TODO log error locally - break - } - } - } -} diff --git a/mplexer/packer/packer.go b/mplexer/packer.go similarity index 96% rename from mplexer/packer/packer.go rename to mplexer/packer.go index 93ab897..c4bccbe 100644 --- a/mplexer/packer/packer.go +++ b/mplexer/packer.go @@ -1,4 +1,4 @@ -package packer +package telebit import ( "fmt" diff --git a/mplexer/packer/listener.go b/mplexer/packer/listener.go deleted file mode 100644 index b8d8aca..0000000 --- a/mplexer/packer/listener.go +++ /dev/null @@ -1,176 +0,0 @@ -package packer - -import ( - "context" - "fmt" - "io" - "net" - "net/http" -) - -// A Listener transforms a multiplexed websocket connection into individual net.Conn-like connections. -type Listener struct { - //wsconn *websocket.Conn - tun net.Conn - incoming chan *Conn - close chan struct{} - encoder *Encoder - chunksParsed int - bytesRead int - conns map[string]net.Conn - //conns map[string]*Conn -} - -// Listen creates a new Listener and sets it up to receive and distribute connections. -func Listen(tun net.Conn) *Listener { - ctx := context.TODO() - - // Feed the socket into the Encoder and Decoder - listener := &Listener{ - tun: tun, - incoming: make(chan *Conn, 1), // buffer ever so slightly - close: make(chan struct{}), - encoder: NewEncoder(ctx, tun), - conns: map[string]net.Conn{}, - //conns: map[string]*Conn{}, - } - - // TODO perhaps the wrapper should have a mutex - // rather than having a goroutine in the encoder - go func() { - err := listener.encoder.Run() - fmt.Printf("encoder stopped entirely: %q", err) - tun.Close() - }() - - // Decode the stream as it comes in - decoder := NewDecoder(tun) - go func() { - // TODO pass error to Accept() - err := decoder.Decode(listener) - - // The listener itself must be closed explicitly because - // there's an encoder with a callback between the websocket - // and the multiplexer, so it doesn't know to stop listening otherwise - listener.Close() - fmt.Printf("the main stream is done: %q\n", err) - }() - - return listener -} - -// ListenAndServe listens on a websocket and handles the incomming net.Conn-like connections with a Handler -func ListenAndServe(tun net.Conn, mux Handler) error { - listener := Listen(tun) - return Serve(listener, mux) -} - -// Serve Accept()s connections which have already been unwrapped and serves them with the given Handler -func Serve(listener *Listener, mux Handler) error { - for { - client, err := listener.Accept() - if nil != err { - return err - } - - go func() { - err = mux.Serve(client) - if nil != err { - if io.EOF != err { - fmt.Printf("client could not be served: %q\n", err.Error()) - } - } - client.Close() - }() - } -} - -// Accept returns a tunneled network connection -func (l *Listener) Accept() (net.Conn, error) { - select { - case rconn, ok := <-l.incoming: - if ok { - return rconn, nil - } - return nil, io.EOF - - case <-l.close: - return nil, http.ErrServerClosed - } -} - -// Close stops accepting new connections and closes the underlying websocket. -// TODO return errors. -func (l *Listener) Close() error { - l.tun.Close() - close(l.incoming) - l.close <- struct{}{} - return nil -} - -// RouteBytes receives address information and a buffer and creates or re-uses a pipe that can be Accept()ed. -func (l *Listener) RouteBytes(srcAddr, dstAddr Addr, b []byte) { - // TODO use context to be able to cancel many at once? - l.chunksParsed++ - - src := &srcAddr - dst := &dstAddr - pipe := l.getPipe(src, dst, len(b)) - //fmt.Printf("%s\n", b) - - // handle errors before data writes because I don't - // remember where the error message goes - if "error" == string(dst.scheme) { - pipe.Close() - delete(l.conns, src.Network()) - fmt.Printf("a stream errored remotely: %v\n", src) - } - - // write data, if any - if len(b) > 0 { - l.bytesRead += len(b) - pipe.Write(b) - } - // EOF, if needed - if "end" == string(dst.scheme) { - fmt.Println("[debug] end") - pipe.Close() - delete(l.conns, src.Network()) - } -} - -func (l *Listener) getPipe(src, dst *Addr, count int) net.Conn { - connID := src.Network() - pipe, ok := l.conns[connID] - - // Pipe exists - if ok { - return pipe - } - fmt.Printf("New client (%d byte hello)\n\tfrom %#v\n\tto %#v:\n", count, src, dst) - - // Create pipe - rawPipe, pipe := net.Pipe() - newconn := &Conn{ - //updated: time.Now(), - relaySourceAddr: *src, - relayTargetAddr: *dst, - relay: rawPipe, - } - l.conns[connID] = pipe - l.incoming <- newconn - - // Handle encoding - go func() { - // TODO handle err - err := l.encoder.Encode(pipe, *src, *dst) - // the error may be EOF or ErrServerClosed or ErrGoingAwawy or some such - // or it might be an actual error - // In any case, we'll just close it all - newconn.Close() - pipe.Close() - fmt.Printf("a stream is done: %q\n", err) - }() - - return pipe -} diff --git a/mplexer/packer/packer_test.go b/mplexer/packer_test.go similarity index 98% rename from mplexer/packer/packer_test.go rename to mplexer/packer_test.go index 82292f5..c09f596 100644 --- a/mplexer/packer/packer_test.go +++ b/mplexer/packer_test.go @@ -1,4 +1,4 @@ -package packer +package telebit import ( "strconv" diff --git a/mplexer/packer/parser.go b/mplexer/parser.go similarity index 99% rename from mplexer/packer/parser.go rename to mplexer/parser.go index ed2f329..b1fb6f6 100644 --- a/mplexer/packer/parser.go +++ b/mplexer/parser.go @@ -1,4 +1,4 @@ -package packer +package telebit import ( "errors" diff --git a/mplexer/packer/parser_test.go b/mplexer/parser_test.go similarity index 99% rename from mplexer/packer/parser_test.go rename to mplexer/parser_test.go index a856c21..f1d165b 100644 --- a/mplexer/packer/parser_test.go +++ b/mplexer/parser_test.go @@ -1,4 +1,4 @@ -package packer +package telebit import ( "math/rand" diff --git a/mplexer/packer/routemux.go b/mplexer/routemux.go similarity index 99% rename from mplexer/packer/routemux.go rename to mplexer/routemux.go index 9859459..930c8fe 100644 --- a/mplexer/packer/routemux.go +++ b/mplexer/routemux.go @@ -1,4 +1,4 @@ -package packer +package telebit import ( "fmt" diff --git a/mplexer/packer/server.go b/mplexer/server.go similarity index 98% rename from mplexer/packer/server.go rename to mplexer/server.go index 743379d..97fdd04 100644 --- a/mplexer/packer/server.go +++ b/mplexer/server.go @@ -1,4 +1,4 @@ -package packer +package telebit import ( "context" diff --git a/mplexer/sortinghat.go b/mplexer/sortinghat.go deleted file mode 100644 index 6a00db5..0000000 --- a/mplexer/sortinghat.go +++ /dev/null @@ -1,10 +0,0 @@ -package mplexer - -import ( - "git.coolaj86.com/coolaj86/go-telebitd/mplexer/packer" -) - -type SortingHat interface { - LookupTarget(*packer.Addr) (*packer.Conn, error) - Authz() (string, error) -} diff --git a/mplexer/packer/telebit.go b/mplexer/telebit.go similarity index 99% rename from mplexer/packer/telebit.go rename to mplexer/telebit.go index 3044f0f..369872d 100644 --- a/mplexer/packer/telebit.go +++ b/mplexer/telebit.go @@ -1,4 +1,4 @@ -package packer +package telebit import ( "crypto/tls" diff --git a/mplexer/packer/v1.go b/mplexer/v1.go similarity index 99% rename from mplexer/packer/v1.go rename to mplexer/v1.go index aaa9473..adbf1a2 100644 --- a/mplexer/packer/v1.go +++ b/mplexer/v1.go @@ -1,4 +1,4 @@ -package packer +package telebit import ( "errors" diff --git a/mplexer/packer/websockettunnel.go b/mplexer/websockettunnel.go similarity index 99% rename from mplexer/packer/websockettunnel.go rename to mplexer/websockettunnel.go index d0e283c..73ac330 100644 --- a/mplexer/packer/websockettunnel.go +++ b/mplexer/websockettunnel.go @@ -1,4 +1,4 @@ -package packer +package telebit import ( "context" diff --git a/mplexer/packer/wsjunk_test.go b/mplexer/wsjunk_test.go similarity index 98% rename from mplexer/packer/wsjunk_test.go rename to mplexer/wsjunk_test.go index 1a844e0..43f3068 100644 --- a/mplexer/packer/wsjunk_test.go +++ b/mplexer/wsjunk_test.go @@ -1,4 +1,4 @@ -package packer +package telebit import ( "io"