diff --git a/mplexer/decoder.go b/mplexer/decoder.go index b8a1b21..52a2ef0 100644 --- a/mplexer/decoder.go +++ b/mplexer/decoder.go @@ -48,8 +48,9 @@ func (d *Decoder) Decode(out Router) error { for { select { case b := <-rx: - fmt.Println("[debug] [decoder] [srv] Tunnel write", len(b), hex.EncodeToString(b)) - _, err := p.Write(b) + n, err := p.Write(b) + fmt.Println("[debug] [decoder] [srv] Tunnel write", n, len(b), hex.EncodeToString(b)) + // TODO BUG: handle when 'n' bytes written is less than len(b) if nil != err { fmt.Println("[debug] [decoder] [srv] Tunnel write error") // an error to write represents an unrecoverable error, diff --git a/mplexer/encoder.go b/mplexer/encoder.go index eac314a..0b3f387 100644 --- a/mplexer/encoder.go +++ b/mplexer/encoder.go @@ -13,9 +13,9 @@ import ( // Encoder converts TCP to MPLEXY-TCP type Encoder struct { - ctx context.Context - subctx context.Context - mux sync.Mutex + ctx context.Context + //subctx context.Context + mux sync.Mutex //out io.WriteCloser out io.Writer outErr chan error @@ -37,10 +37,10 @@ func NewEncoder(ctx context.Context, wout io.Writer) *Encoder { // to cancel and close south-side connections, if needed. // TODO should this be pushed elsewhere to handled? func (enc *Encoder) Run() error { - ctx, cancel := context.WithCancel(enc.ctx) - defer cancel() + //ctx, cancel := context.WithCancel(enc.ctx) + //defer cancel() - enc.subctx = ctx + //enc.subctx = ctx for { select { @@ -73,7 +73,7 @@ func (enc *Encoder) Encode(rin io.Reader, src, dst Addr) error { b := make([]byte, enc.bufferSize) //fmt.Println("loopers gonna loop") n, err := rin.Read(b) - fmt.Println("[debug] [encoder] [srv] Browser read", n) + fmt.Println("[debug] [encoder] [srv] Browser read", n, hex.EncodeToString(b[:n])) if n > 0 { rx <- b[:n] } @@ -96,10 +96,12 @@ func (enc *Encoder) Encode(rin io.Reader, src, dst Addr) error { //rin.Close() fmt.Println("[debug] [encoder] [srv] Browser ctx.Done()") return errors.New("cancelled by encoder read or parent context") - case <-enc.subctx.Done(): - //rin.Close() - fmt.Println("[debug] [encoder] [srv] Browser subctx.Done()") - return errors.New("cancelled by encoder write context") + /* + case <-enc.subctx.Done(): + //rin.Close() + fmt.Println("[debug] [encoder] [srv] Browser subctx.Done()") + return errors.New("cancelled by encoder write context") + */ case b := <-rx: header, _, err := Encode(b, src, Addr{scheme: src.scheme, addr: dst.Hostname(), port: dst.Port()}) if nil != err { diff --git a/mplexer/parser.go b/mplexer/parser.go index b1fb6f6..8ee1f49 100644 --- a/mplexer/parser.go +++ b/mplexer/parser.go @@ -2,6 +2,7 @@ package telebit import ( "errors" + "fmt" ) type Parser struct { @@ -68,7 +69,7 @@ func (p *Parser) Write(b []byte) (int, error) { switch p.parseState { case VersionState: - //fmt.Println("[debug] version state", b[0]) + fmt.Println("[debug] version state", b[0]) p.state.version = b[0] b = b[1:] p.consumed++ @@ -79,7 +80,7 @@ func (p *Parser) Write(b []byte) (int, error) { switch p.state.version { case V1: - //fmt.Println("[debug] v1 unmarshal") + fmt.Println("[debug] v1 unmarshal") return p.unpackV1(b) default: return 0, errors.New("incorrect version or version not implemented") diff --git a/mplexer/v1.go b/mplexer/v1.go index 604a535..97bd4cd 100644 --- a/mplexer/v1.go +++ b/mplexer/v1.go @@ -182,16 +182,17 @@ func (p *Parser) unpackV1Header(b []byte, n int) ([]byte, error) { } */ p.parseState++ - fmt.Printf("[debug] parse state: %v\n", p.parseState) + fmt.Printf("[debug] unpackV1 parse state: %v\n", p.parseState) if "end" == service { + fmt.Println("[debug] unpackV1 end") p.handler.RouteBytes(p.state.srcAddr, p.state.dstAddr, []byte{}) } return b, nil } func (p *Parser) unpackV1Payload(b []byte, n int) ([]byte, error) { - fmt.Printf("[debug] state: %+v\n", p.state) + fmt.Printf("[debug] unpackV1 payload state: %+v\n", p.state) // Handle "connect" and "end" if 0 == p.state.payloadLen { /* @@ -210,7 +211,7 @@ func (p *Parser) unpackV1Payload(b []byte, n int) ([]byte, error) { */ //fmt.Printf("[debug] [2] payload written: %d | payload length: %d\n", p.state.payloadWritten, p.state.payloadLen) - fmt.Printf("[debug] RouteBytes: %#v %#v %s\n", p.state.srcAddr, p.state.dstAddr, p.state.dstAddr.scheme) + fmt.Printf("[debug] unpackV1 RouteBytes: %#v %#v %s\n", p.state.srcAddr, p.state.dstAddr, p.state.dstAddr.scheme) p.handler.RouteBytes(p.state.srcAddr, p.state.dstAddr, []byte{}) return b, nil } diff --git a/table/table.go b/table/table.go index 29fc490..174360b 100644 --- a/table/table.go +++ b/table/table.go @@ -1,6 +1,7 @@ package table import ( + "encoding/hex" "fmt" "net" "sync" @@ -112,27 +113,37 @@ type SubscriberConn struct { } func (s *SubscriberConn) RouteBytes(src, dst telebit.Addr, payload []byte) { - id := src.String() - fmt.Println("Routing some more bytes:") - fmt.Println("src", id, src) - fmt.Println("dst", dst) + id := fmt.Sprintf("%s:%d", src.Hostname(), src.Port()) + fmt.Println("[debug] Routing some more bytes:", len(payload)) + fmt.Printf("id %s\nsrc %+v\n", id, src) + fmt.Printf("dst %s %+v\n", dst.Scheme(), dst) clientX, ok := s.Clients.Load(id) if !ok { // TODO send back closed client error + fmt.Println("[debug] no client found for", id) return } client, _ := clientX.(net.Conn) + if "end" == dst.Scheme() { + fmt.Println("[debug] closing client", id) + _ = client.Close() + return + } + for { n, err := client.Write(payload) - if nil != err { - if n > 0 && io.ErrShortWrite == err { - payload = payload[n:] - continue - } - // TODO send back closed client error + fmt.Println("[debug] table Write", len(payload), hex.EncodeToString(payload)) + if nil == err || io.EOF == err { break } + if n > 0 && io.ErrShortWrite == err { + payload = payload[n:] + continue + } + break + // TODO send back closed client error + //return err } } @@ -147,6 +158,7 @@ func (s *SubscriberConn) Serve(client net.Conn) error { } id := client.RemoteAddr().String() + fmt.Printf("[DEBUG] NEW ID (ip:port) %s\n", id) s.Clients.Store(id, client) //fmt.Println("[debug] immediately cancel client to simplify testing / debugging") @@ -166,15 +178,20 @@ func (s *SubscriberConn) Serve(client net.Conn) error { dstAddr := dstParts[0] dstPort, _ := strconv.Atoi(dstParts[1]) fmt.Println("[debug] dstParts", dstParts) + servername := wconn.Servername() termination := telebit.Unknown scheme := telebit.None + if "" != servername { + dstAddr = servername + //scheme = telebit.TLS + scheme = telebit.HTTPS + } if 80 == dstPort { + scheme = telebit.HTTPS + } else if 443 == dstPort { // TODO dstAddr = wconn.Servername() scheme = telebit.HTTP - } else if 443 == dstPort { - dstAddr = wconn.Servername() - scheme = telebit.HTTPS } src := telebit.NewAddr( @@ -189,11 +206,12 @@ func (s *SubscriberConn) Serve(client net.Conn) error { dstAddr, dstPort, ) - fmt.Println("[debug] NewAddr src", src) - fmt.Println("[debug] NewAddr dst", dst) + fmt.Printf("[debug] NewAddr src %+v\n", src) + fmt.Printf("[debug] NewAddr dst %+v\n", dst) err := s.MultiEncoder.Encode(wconn, *src, *dst) _ = wconn.Close() + fmt.Printf("[debug] Encoder Complete %+v %+v\n", id, err) s.Clients.Delete(id) return err }