made (loosely) testable against server

This commit is contained in:
AJ ONeal 2020-05-20 23:30:24 -06:00
parent e2c8a92089
commit 5f804f3424
6 changed files with 308 additions and 123 deletions

View File

@ -1,22 +1,23 @@
package packer
import (
"errors"
"net"
"time"
)
// Conn TODO rename to Pipe, perhaps?
type Conn struct {
relaySourceAddr Addr
relayRemoteAddr Addr
relay net.Conn
local net.Conn
updated time.Time
//updated time.Time
}
// TODO conn.go -> conn/conn.go
// TODO NewConn -> New
// NewConn TODO rename to NewPipe, perhaps?
func NewConn() *Conn {
return nil
}
@ -41,11 +42,13 @@ func (c *Conn) Close() error {
return c.relay.Close()
}
/*
// Error signals an error back to the relay
func (c *Conn) Error(err error) error {
panic(errors.New("not implemented"))
return nil
}
*/
/*
// LocalAddr returns the local network address.
@ -62,7 +65,7 @@ func (c *Conn) LocalAddr() *Addr {
}
// RemoteAddr returns the remote network address.
func (c *Conn) RemoteAddr() net.Addr {
func (c *Conn) RemoteAddr() *Addr {
// TODO is this the right one?
return &c.relayRemoteAddr
}

View File

@ -1,42 +1,36 @@
package packer
import (
"context"
"errors"
"io"
)
// Decoder handles a ReadCloser stream containing mplexy-encoded clients
// Decoder handles a Reader stream containing mplexy-encoded clients
type Decoder struct {
ctx context.Context
r io.ReadCloser
in io.Reader
bufferSize int
}
// NewDecoder returns an initialized Decoder
func NewDecoder(ctx context.Context, r io.ReadCloser) *Decoder {
func NewDecoder(rin io.Reader) *Decoder {
return &Decoder{
ctx: ctx,
r: r,
in: rin,
bufferSize: defaultBufferSize,
}
}
// StreamDecode will call WriteMessage as often as addressable data exists,
// Decode will call WriteMessage as often as addressable data exists,
// reading up to bufferSize (default 8192) at a time
// (header + data, though headers are often sent separately from data).
func (d *Decoder) StreamDecode(handler Handler, bufferSize int) error {
p := NewParser(handler)
func (d *Decoder) Decode(out Router) error {
p := NewParser(out)
rx := make(chan []byte)
rxErr := make(chan error)
if 0 == bufferSize {
bufferSize = 8192
}
go func() {
for {
b := make([]byte, bufferSize)
b := make([]byte, d.bufferSize)
//fmt.Println("loopers gonna loop")
n, err := d.r.Read(b)
n, err := d.in.Read(b)
if n > 0 {
rx <- b[:n]
}
@ -52,20 +46,16 @@ func (d *Decoder) StreamDecode(handler Handler, bufferSize int) error {
select {
// TODO, do we actually need ctx here?
// would it be sufficient to expect the reader to be closed by the caller instead?
case <-d.ctx.Done():
// TODO: verify that closing the reader will cause the goroutine to be released
d.r.Close()
return errors.New("cancelled by context")
case b := <-rx:
_, err := p.Write(b)
if nil != err {
// an error to write represents an unrecoverable error,
// not just a downstream client error
d.r.Close()
//d.in.Close()
return err
}
case err := <-rxErr:
d.r.Close()
//d.in.Close()
if io.EOF == err {
// it can be assumed that err will close though, right
return nil

View File

@ -8,142 +8,203 @@ import (
"net"
"net/http"
"os"
"strings"
"testing"
"time"
jwt "github.com/dgrijalva/jwt-go"
"github.com/gorilla/websocket"
)
func TestDialServer(t *testing.T) {
// TODO replace the websocket connection with a mock server
relay := os.Getenv("RELAY")
authz := os.Getenv("SECRET")
relay := "wss://roottest.duckdns.org:8443"
authz, err := getToken("xxxxyyyyssss8347")
if nil != err {
panic(err)
}
ctx := context.Background()
wsd := websocket.Dialer{}
headers := http.Header{}
headers.Set("Authorization", fmt.Sprintf("Bearer %s", authz))
// *http.Response
wsconn, _, err := wsd.DialContext(ctx, relay, headers)
sep := "?"
if strings.Contains(relay, sep) {
sep = "&"
}
wsconn, _, err := wsd.DialContext(ctx, relay+sep+"access_token="+authz, headers)
if nil != err {
fmt.Println("relay:", relay)
t.Fatal(err)
return
}
mux := &MyMux{}
err = ListenAndServe(wsconn, mux)
t.Fatal(err)
/*
t := telebit.New(token)
mux := telebit.RouteMux{}
mux.HandleTLS("*", mux) // go back to itself
mux.HandleProxy("example.com", "localhost:3000")
mux.HandleTCP("example.com", func (c *telebit.Conn) {
return httpmux.Serve()
})
l := t.Listen("wss://example.com")
conn := l.Accept()
telebit.Serve(listener, mux)
t.ListenAndServe("wss://example.com", mux)
*/
mux := NewRouteMux()
// TODO set failure
t.Fatal(ListenAndServe(wsconn, mux))
}
func getToken(secret string) (token string, err error) {
domains := []string{"dandel.duckdns.org"}
tokenData := jwt.MapClaims{"domains": domains}
jwtToken := jwt.NewWithClaims(jwt.SigningMethodHS256, tokenData)
if token, err = jwtToken.SignedString([]byte(secret)); err != nil {
return "", err
}
return token, nil
}
type Listener struct {
ws *websocket.Conn
incoming chan *Conn
close chan struct{}
encoder *Encoder
conns map[string]*Conn
chunksParsed int
bytesRead int
}
func ListenAndServe(ws *websocket.Conn, mux Mux) error {
func ListenAndServe(ws *websocket.Conn, mux Handler) error {
listener := Listen(ws)
return Serve(listener, mux)
}
func Listen(ws *websocket.Conn) *Listener {
ctx := context.TODO()
// Wrap the websocket and feed it into the Encoder and Decoder
rw := &WSConn{c: ws, nr: nil}
listener := &Listener{
ws: ws,
incoming: make(chan *Conn, 1),
close: make(chan struct{}),
}
ctx := context.TODO()
r := &WSConn{
c: ws,
r: nil,
w: nil,
}
decoder := NewDecoder(ctx, r)
// Feed websocket into Decoder
th := &testHandler2{
conns: map[string]*Conn{},
connCh: listener.incoming,
incoming: make(chan *Conn, 1), // buffer ever so slightly
close: make(chan struct{}),
encoder: NewEncoder(ctx, rw),
}
// TODO perhaps the wrapper should have a mutex
// rather than having a goroutine in the encoder
go func() {
err := listener.encoder.Run()
fmt.Printf("encoder stopped entirely: %q", err)
rw.c.Close()
}()
// Decode the stream as it comes in
decoder := NewDecoder(rw)
go func() {
// TODO pass error to Accept()
err := decoder.StreamDecode(th, 0)
fmt.Printf("the main stream is done: %q", err)
err := decoder.Decode(listener)
rw.Close()
fmt.Printf("the main stream is done: %q\n", err)
}()
return listener
}
type testHandler2 struct {
conns map[string]*Conn
connCh chan *Conn
chunksParsed int
bytesRead int
}
func (l *Listener) RouteBytes(a Addr, b []byte) {
// TODO use context to be able to cancel many at once?
l.chunksParsed++
func (th *testHandler2) WriteMessage(a Addr, b []byte) {
th.chunksParsed++
addr := &a
_, ok := th.conns[addr.Network()]
if !ok {
rconn, wconn := net.Pipe()
conn := &Conn{
updated: time.Now(),
pipe := l.getPipe(addr)
// handle errors before data writes because I don't
// remember where the error message goes
if "error" == string(addr.scheme) {
pipe.Close()
delete(l.conns, addr.Network())
fmt.Printf("a stream errored remotely: %v\n", addr)
}
// write data, if any
if len(b) > 0 {
l.bytesRead += len(b)
pipe.Write(b)
}
// EOF, if needed
if "end" == string(addr.scheme) {
pipe.Close()
delete(l.conns, addr.Network())
}
}
func (l *Listener) getPipe(addr *Addr) *Conn {
connID := addr.Network()
pipe, ok := l.conns[connID]
// Pipe exists
if ok {
return pipe
}
// Create pipe
rawPipe, encodable := net.Pipe()
pipe = &Conn{
//updated: time.Now(),
relayRemoteAddr: *addr,
relay: rconn,
local: wconn,
relay: rawPipe,
}
th.conns[addr.Network()] = conn
th.connCh <- conn
}
th.bytesRead += len(b)
l.conns[connID] = pipe
l.incoming <- pipe
// Handle encoding
go func() {
// TODO handle err
err := l.encoder.Encode(encodable, *pipe.LocalAddr())
// the error may be EOF or ErrServerClosed or ErrGoingAwawy or some such
// or it might be an actual error
// In any case, we'll just close it all
encodable.Close()
pipe.Close()
fmt.Printf("a stream is done: %q\n", err)
}()
return pipe
}
func Serve(listener *Listener, mux Mux) error {
w := &WSConn{
c: listener.ws,
r: nil,
w: nil,
}
ctx := context.TODO()
encoder := NewEncoder(ctx, w)
encoder.Start()
func Serve(listener *Listener, mux Handler) error {
for {
client, err := listener.Accept()
if nil != err {
return err
}
lconn, err := mux.LookupTarget(client.LocalAddr())
if nil != err {
conn.Close()
continue
}
go func() {
// TODO handle err
err := encoder.StreamEncode(*conn.LocalAddr(), lconn, 0)
fmt.Printf("a stream is done: %q", err)
err = mux.Serve(client)
if nil != err {
if io.EOF != err {
fmt.Printf("client could not be served: %q\n", err.Error())
}
}
client.Close()
}()
}
}
func Blah() {
go func() {
pipe
}
}
func (l *Listener) Accept() (*Conn, error) {
select {
case conn, ok := <-l.incoming:
case rconn, ok := <-l.incoming:
if ok {
return conn, nil
return rconn, nil
}
return nil, io.EOF
@ -154,17 +215,127 @@ func (l *Listener) Accept() (*Conn, error) {
}
}
type Mux interface {
LookupTarget(*Addr) (net.Conn, error)
type Handler interface {
Serve(*Conn) error
GetTargetConn(*Addr) (net.Conn, error)
}
type MyMux struct {
type RouteMux struct {
defaultTimeout time.Duration
}
func NewRouteMux() *RouteMux {
mux := &RouteMux{
defaultTimeout: 45 * time.Second,
}
return mux
}
func (m *RouteMux) Serve(client *Conn) error {
// TODO could proxy or handle directly, etc
target, err := m.GetTargetConn(client.RemoteAddr())
if nil != err {
return err
}
return Forward(client, target, m.defaultTimeout)
}
// Forward port-forwards a relay (websocket) client to a target (local) server
func Forward(client *Conn, target net.Conn, timeout time.Duration) error {
// Something like ReadAhead(size) should signal
// to read and send up to `size` bytes without waiting
// for a response - since we can't signal 'non-read' as
// is the normal operation of tcp... or can we?
// And how do we distinguish idle from dropped?
// Maybe this should have been a udp protocol???
defer client.Close()
defer target.Close()
srcCh := make(chan []byte)
dstCh := make(chan []byte)
srcErrCh := make(chan error)
dstErrCh := make(chan error)
// Source (Relay) Read Channel
go func() {
for {
b := make([]byte, defaultBufferSize)
n, err := client.Read(b)
if n > 0 {
srcCh <- b
}
if nil != err {
// TODO let client log this server-side error (unless EOF)
// (nil here because we probably can't send the error to the relay)
srcErrCh <- err
break
}
}
}()
// Target (Local) Read Channel
go func() {
for {
b := make([]byte, defaultBufferSize)
n, err := target.Read(b)
if n > 0 {
dstCh <- b
}
if nil != err {
if io.EOF == err {
err = nil
}
dstErrCh <- err
break
}
}
}()
var err error = nil
for {
select {
// TODO do we need a context here?
//case <-ctx.Done():
// break
case b := <-srcCh:
client.SetDeadline(time.Now().Add(timeout))
_, err = target.Write(b)
if nil != err {
fmt.Printf("write to target failed: %q", err.Error())
break
}
case b := <-dstCh:
target.SetDeadline(time.Now().Add(timeout))
_, err = client.Write(b)
if nil != err {
fmt.Printf("write to remote failed: %q", err.Error())
break
}
case err = <-srcErrCh:
if nil != err {
fmt.Printf("read from remote failed: %q", err.Error())
}
break
case err = <-dstErrCh:
if nil != err {
fmt.Printf("read from target failed: %q", err.Error())
}
break
}
}
client.Close()
return err
}
// this function is very client-specific logic
func (m *MyMux) LookupTarget(paddr *Addr) (net.Conn, error) {
//if target := LookupPort(paddr.Port()); nil != target { }
if target := m.LookupServername(paddr.Hostname()); nil != target {
func (m *RouteMux) GetTargetConn(paddr *Addr) (net.Conn, error) {
//if target := GetTargetByPort(paddr.Port()); nil != target { }
if target := m.GetTargetByServername(paddr.Hostname()); nil != target {
tconn, err := net.Dial(target.Network(), target.Hostname())
if nil != err {
return nil, err
@ -185,7 +356,7 @@ func (m *MyMux) LookupTarget(paddr *Addr) (net.Conn, error) {
return nil, errors.New("Bad Gateway")
}
func (m *MyMux) LookupServername(servername string) *Addr {
func (m *RouteMux) GetTargetByServername(servername string) *Addr {
return NewAddr(
HTTPS,
TCP, // TCP -> termination.None? / Plain?
@ -196,20 +367,20 @@ func (m *MyMux) LookupServername(servername string) *Addr {
type WSConn struct {
c *websocket.Conn
r io.Reader
w io.WriteCloser
pingCh chan struct{}
nr io.Reader
//w io.WriteCloser
//pingCh chan struct{}
}
func (ws *WSConn) Read(b []byte) (int, error) {
if nil == ws.r {
if nil == ws.nr {
_, r, err := ws.c.NextReader()
if nil != err {
return 0, err
}
ws.r = r
ws.nr = r
}
n, err := ws.r.Read(b)
n, err := ws.nr.Read(b)
if io.EOF == err {
err = nil
}
@ -218,17 +389,17 @@ func (ws *WSConn) Read(b []byte) (int, error) {
func (ws *WSConn) Write(b []byte) (int, error) {
// TODO create or reset ping deadline
// TODO document that more complete writes are preferred?
w, err := ws.c.NextWriter(websocket.BinaryMessage)
if nil != err {
return 0, err
}
ws.w = w
n, err := ws.w.Write(b)
n, err := w.Write(b)
if nil != err {
return n, err
}
err = ws.w.Close()
err = w.Close()
return n, err
}

View File

@ -5,7 +5,7 @@ import (
)
type Parser struct {
handler Handler
handler Router
newConns chan *Conn
conns map[string]*Conn
state ParserState
@ -36,7 +36,7 @@ const (
VersionState State = 0
)
func NewParser(handler Handler) *Parser {
func NewParser(handler Router) *Parser {
return &Parser{
conns: make(map[string]*Conn),
newConns: make(chan *Conn, 2), // Buffered to make testing easier
@ -46,8 +46,8 @@ func NewParser(handler Handler) *Parser {
}
}
type Handler interface {
WriteMessage(Addr, []byte)
type Router interface {
RouteBytes(Addr, []byte)
}
// Write receives tunnel data and creates or writes to connections

12
mplexer/packer/telebit.go Normal file
View File

@ -0,0 +1,12 @@
package packer
import "errors"
// Note: 64k is the TCP max, but 1460b is the 100mbit Ethernet max (1500 MTU - overhead),
// but 1Gbit Ethernet (Jumbo frame) has an 9000b MTU
// Nerds posting benchmarks on SO show that 8k seems about right,
// but even 1024b could work well.
var defaultBufferSize = 8192
// ErrBadGateway means that the target did not accept the connection
var ErrBadGateway = errors.New("EBADGATEWAY")

View File

@ -8,19 +8,28 @@ import (
)
const (
// HeaderLengthState is the 2nd (1) state
HeaderLengthState State = 1 + iota
// HeaderState is the 3rd (2) state
HeaderState
// PayloadState is the 4th (3) state
PayloadState
)
const (
// FamilyIndex is the 1st (0) address element, either IPv4 or IPv6
FamilyIndex int = iota
// AddressIndex is the 2nd (1) address element, the IP or Hostname
AddressIndex
// PortIndex is the 3rd (2) address element, the Port
PortIndex
// LengthIndex is the 4th (3) address element, the Payload size
LengthIndex
// ServiceIndex is the 5th (4) address element, the Scheme or Control message type
ServiceIndex
)
// Header is the MPLEXY address/control meta data that comes before a packet
type Header struct {
Family string
Address string
@ -178,7 +187,7 @@ func (p *Parser) unpackV1Payload(b []byte, n int) ([]byte, error) {
*/
//fmt.Printf("[debug] [2] payload written: %d | payload length: %d\n", p.state.payloadWritten, p.state.payloadLen)
p.handler.WriteMessage(p.state.addr, []byte{})
p.handler.RouteBytes(p.state.addr, []byte{})
return b, nil
}
@ -198,7 +207,7 @@ func (p *Parser) unpackV1Payload(b []byte, n int) ([]byte, error) {
return b, nil
}
*/
p.handler.WriteMessage(p.state.addr, c)
p.handler.RouteBytes(p.state.addr, c)
p.consumed += k
p.state.payloadWritten += k