more logging, fixed write loop

This commit is contained in:
AJ ONeal 2020-07-15 10:46:02 +00:00
parent 9f1985373e
commit ff231391d3
5 changed files with 56 additions and 33 deletions

View File

@ -48,8 +48,9 @@ func (d *Decoder) Decode(out Router) error {
for { for {
select { select {
case b := <-rx: case b := <-rx:
fmt.Println("[debug] [decoder] [srv] Tunnel write", len(b), hex.EncodeToString(b)) n, err := p.Write(b)
_, err := p.Write(b) fmt.Println("[debug] [decoder] [srv] Tunnel write", n, len(b), hex.EncodeToString(b))
// TODO BUG: handle when 'n' bytes written is less than len(b)
if nil != err { if nil != err {
fmt.Println("[debug] [decoder] [srv] Tunnel write error") fmt.Println("[debug] [decoder] [srv] Tunnel write error")
// an error to write represents an unrecoverable error, // an error to write represents an unrecoverable error,

View File

@ -13,9 +13,9 @@ import (
// Encoder converts TCP to MPLEXY-TCP // Encoder converts TCP to MPLEXY-TCP
type Encoder struct { type Encoder struct {
ctx context.Context ctx context.Context
subctx context.Context //subctx context.Context
mux sync.Mutex mux sync.Mutex
//out io.WriteCloser //out io.WriteCloser
out io.Writer out io.Writer
outErr chan error outErr chan error
@ -37,10 +37,10 @@ func NewEncoder(ctx context.Context, wout io.Writer) *Encoder {
// to cancel and close south-side connections, if needed. // to cancel and close south-side connections, if needed.
// TODO should this be pushed elsewhere to handled? // TODO should this be pushed elsewhere to handled?
func (enc *Encoder) Run() error { func (enc *Encoder) Run() error {
ctx, cancel := context.WithCancel(enc.ctx) //ctx, cancel := context.WithCancel(enc.ctx)
defer cancel() //defer cancel()
enc.subctx = ctx //enc.subctx = ctx
for { for {
select { select {
@ -73,7 +73,7 @@ func (enc *Encoder) Encode(rin io.Reader, src, dst Addr) error {
b := make([]byte, enc.bufferSize) b := make([]byte, enc.bufferSize)
//fmt.Println("loopers gonna loop") //fmt.Println("loopers gonna loop")
n, err := rin.Read(b) n, err := rin.Read(b)
fmt.Println("[debug] [encoder] [srv] Browser read", n) fmt.Println("[debug] [encoder] [srv] Browser read", n, hex.EncodeToString(b[:n]))
if n > 0 { if n > 0 {
rx <- b[:n] rx <- b[:n]
} }
@ -96,10 +96,12 @@ func (enc *Encoder) Encode(rin io.Reader, src, dst Addr) error {
//rin.Close() //rin.Close()
fmt.Println("[debug] [encoder] [srv] Browser ctx.Done()") fmt.Println("[debug] [encoder] [srv] Browser ctx.Done()")
return errors.New("cancelled by encoder read or parent context") return errors.New("cancelled by encoder read or parent context")
case <-enc.subctx.Done(): /*
//rin.Close() case <-enc.subctx.Done():
fmt.Println("[debug] [encoder] [srv] Browser subctx.Done()") //rin.Close()
return errors.New("cancelled by encoder write context") fmt.Println("[debug] [encoder] [srv] Browser subctx.Done()")
return errors.New("cancelled by encoder write context")
*/
case b := <-rx: case b := <-rx:
header, _, err := Encode(b, src, Addr{scheme: src.scheme, addr: dst.Hostname(), port: dst.Port()}) header, _, err := Encode(b, src, Addr{scheme: src.scheme, addr: dst.Hostname(), port: dst.Port()})
if nil != err { if nil != err {

View File

@ -2,6 +2,7 @@ package telebit
import ( import (
"errors" "errors"
"fmt"
) )
type Parser struct { type Parser struct {
@ -68,7 +69,7 @@ func (p *Parser) Write(b []byte) (int, error) {
switch p.parseState { switch p.parseState {
case VersionState: case VersionState:
//fmt.Println("[debug] version state", b[0]) fmt.Println("[debug] version state", b[0])
p.state.version = b[0] p.state.version = b[0]
b = b[1:] b = b[1:]
p.consumed++ p.consumed++
@ -79,7 +80,7 @@ func (p *Parser) Write(b []byte) (int, error) {
switch p.state.version { switch p.state.version {
case V1: case V1:
//fmt.Println("[debug] v1 unmarshal") fmt.Println("[debug] v1 unmarshal")
return p.unpackV1(b) return p.unpackV1(b)
default: default:
return 0, errors.New("incorrect version or version not implemented") return 0, errors.New("incorrect version or version not implemented")

View File

@ -182,16 +182,17 @@ func (p *Parser) unpackV1Header(b []byte, n int) ([]byte, error) {
} }
*/ */
p.parseState++ p.parseState++
fmt.Printf("[debug] parse state: %v\n", p.parseState) fmt.Printf("[debug] unpackV1 parse state: %v\n", p.parseState)
if "end" == service { if "end" == service {
fmt.Println("[debug] unpackV1 end")
p.handler.RouteBytes(p.state.srcAddr, p.state.dstAddr, []byte{}) p.handler.RouteBytes(p.state.srcAddr, p.state.dstAddr, []byte{})
} }
return b, nil return b, nil
} }
func (p *Parser) unpackV1Payload(b []byte, n int) ([]byte, error) { func (p *Parser) unpackV1Payload(b []byte, n int) ([]byte, error) {
fmt.Printf("[debug] state: %+v\n", p.state) fmt.Printf("[debug] unpackV1 payload state: %+v\n", p.state)
// Handle "connect" and "end" // Handle "connect" and "end"
if 0 == p.state.payloadLen { if 0 == p.state.payloadLen {
/* /*
@ -210,7 +211,7 @@ func (p *Parser) unpackV1Payload(b []byte, n int) ([]byte, error) {
*/ */
//fmt.Printf("[debug] [2] payload written: %d | payload length: %d\n", p.state.payloadWritten, p.state.payloadLen) //fmt.Printf("[debug] [2] payload written: %d | payload length: %d\n", p.state.payloadWritten, p.state.payloadLen)
fmt.Printf("[debug] RouteBytes: %#v %#v %s\n", p.state.srcAddr, p.state.dstAddr, p.state.dstAddr.scheme) fmt.Printf("[debug] unpackV1 RouteBytes: %#v %#v %s\n", p.state.srcAddr, p.state.dstAddr, p.state.dstAddr.scheme)
p.handler.RouteBytes(p.state.srcAddr, p.state.dstAddr, []byte{}) p.handler.RouteBytes(p.state.srcAddr, p.state.dstAddr, []byte{})
return b, nil return b, nil
} }

View File

@ -1,6 +1,7 @@
package table package table
import ( import (
"encoding/hex"
"fmt" "fmt"
"net" "net"
"sync" "sync"
@ -112,27 +113,37 @@ type SubscriberConn struct {
} }
func (s *SubscriberConn) RouteBytes(src, dst telebit.Addr, payload []byte) { func (s *SubscriberConn) RouteBytes(src, dst telebit.Addr, payload []byte) {
id := src.String() id := fmt.Sprintf("%s:%d", src.Hostname(), src.Port())
fmt.Println("Routing some more bytes:") fmt.Println("[debug] Routing some more bytes:", len(payload))
fmt.Println("src", id, src) fmt.Printf("id %s\nsrc %+v\n", id, src)
fmt.Println("dst", dst) fmt.Printf("dst %s %+v\n", dst.Scheme(), dst)
clientX, ok := s.Clients.Load(id) clientX, ok := s.Clients.Load(id)
if !ok { if !ok {
// TODO send back closed client error // TODO send back closed client error
fmt.Println("[debug] no client found for", id)
return return
} }
client, _ := clientX.(net.Conn) client, _ := clientX.(net.Conn)
if "end" == dst.Scheme() {
fmt.Println("[debug] closing client", id)
_ = client.Close()
return
}
for { for {
n, err := client.Write(payload) n, err := client.Write(payload)
if nil != err { fmt.Println("[debug] table Write", len(payload), hex.EncodeToString(payload))
if n > 0 && io.ErrShortWrite == err { if nil == err || io.EOF == err {
payload = payload[n:]
continue
}
// TODO send back closed client error
break break
} }
if n > 0 && io.ErrShortWrite == err {
payload = payload[n:]
continue
}
break
// TODO send back closed client error
//return err
} }
} }
@ -147,6 +158,7 @@ func (s *SubscriberConn) Serve(client net.Conn) error {
} }
id := client.RemoteAddr().String() id := client.RemoteAddr().String()
fmt.Printf("[DEBUG] NEW ID (ip:port) %s\n", id)
s.Clients.Store(id, client) s.Clients.Store(id, client)
//fmt.Println("[debug] immediately cancel client to simplify testing / debugging") //fmt.Println("[debug] immediately cancel client to simplify testing / debugging")
@ -166,15 +178,20 @@ func (s *SubscriberConn) Serve(client net.Conn) error {
dstAddr := dstParts[0] dstAddr := dstParts[0]
dstPort, _ := strconv.Atoi(dstParts[1]) dstPort, _ := strconv.Atoi(dstParts[1])
fmt.Println("[debug] dstParts", dstParts) fmt.Println("[debug] dstParts", dstParts)
servername := wconn.Servername()
termination := telebit.Unknown termination := telebit.Unknown
scheme := telebit.None scheme := telebit.None
if "" != servername {
dstAddr = servername
//scheme = telebit.TLS
scheme = telebit.HTTPS
}
if 80 == dstPort { if 80 == dstPort {
scheme = telebit.HTTPS
} else if 443 == dstPort {
// TODO dstAddr = wconn.Servername() // TODO dstAddr = wconn.Servername()
scheme = telebit.HTTP scheme = telebit.HTTP
} else if 443 == dstPort {
dstAddr = wconn.Servername()
scheme = telebit.HTTPS
} }
src := telebit.NewAddr( src := telebit.NewAddr(
@ -189,11 +206,12 @@ func (s *SubscriberConn) Serve(client net.Conn) error {
dstAddr, dstAddr,
dstPort, dstPort,
) )
fmt.Println("[debug] NewAddr src", src) fmt.Printf("[debug] NewAddr src %+v\n", src)
fmt.Println("[debug] NewAddr dst", dst) fmt.Printf("[debug] NewAddr dst %+v\n", dst)
err := s.MultiEncoder.Encode(wconn, *src, *dst) err := s.MultiEncoder.Encode(wconn, *src, *dst)
_ = wconn.Close() _ = wconn.Close()
fmt.Printf("[debug] Encoder Complete %+v %+v\n", id, err)
s.Clients.Delete(id) s.Clients.Delete(id)
return err return err
} }