wip: new client parse test

This commit is contained in:
AJ ONeal 2020-05-18 02:43:06 -06:00
parent 581ed296ac
commit c569a965e2
10 changed files with 603 additions and 40 deletions

View File

@ -2,10 +2,15 @@ package mplexer
import (
"context"
"errors"
"fmt"
"io"
"net"
"net/http"
"os"
"time"
"git.coolaj86.com/coolaj86/go-telebitd/mplexer/packer"
"github.com/gorilla/websocket"
)
@ -13,11 +18,13 @@ import (
// Listener defines a listener for use with http servers
type Listener struct {
//ParentAddr net.Addr
Conns chan *Conn
ws *websocket.Conn
//Conns chan *Conn
ws *websocket.Conn
ctx context.Context
parser *packer.Parser
}
// NewListener creates a channel for connections and returns the listener
// Listen creates a channel for connections and returns the listener
func (m *MultiplexLocal) Listen(ctx context.Context) (*Listener, error) {
authz, err := m.SortingHat.Authz()
if nil != err {
@ -32,32 +39,145 @@ func (m *MultiplexLocal) Listen(ctx context.Context) (*Listener, error) {
if nil != err {
return nil, err
}
//conns := make(chan *packer.Conn)
//parser := &packer.NewParser(ctx, conns)
/*
go func() {
conn, err := packer.Accept()
if err != nil {
fmt.Fprintf(os.Stderr, "failed to accept new relayed connection: %s\n", err)
return
}
conns <- conn
}()
*/
handler := &Handler{}
listener := &Listener{
Conns: make(chan *Conn),
//Conns: conns,
parser: packer.NewParser(ctx, handler),
}
go m.listen(ctx, wsconn, listener)
return listener, nil
}
type Handler struct {
}
func (h *Handler) WriteMessage(packer.Addr, []byte) {
panic(errors.New("not implemented"))
}
func (m *MultiplexLocal) listen(ctx context.Context, wsconn *websocket.Conn, listener *Listener) {
// will cancel if ws errors out or closes
// (TODO: this may also be redundant)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Ping every 15 seconds, or stop listening
go func() {
for {
time.Sleep(15 * time.Second)
deadline := time.Now().Add(45 * time.Second)
if err := wsconn.WriteControl(websocket.PingMessage, "", deadline); nil != err {
fmt.Fprintf(os.Stderr, "failed to write ping message to websocket: %s\n", err)
cancel()
break
}
}
}()
// The write loop (which fails if ping fails)
go func() {
// TODO optimal buffer size
b := make([]byte, 128*1024)
for {
n, err := listener.packer.Read(b)
if n > 0 {
if err := wsconn.WriteMessage(websocket.BinaryMessage, b); nil != err {
fmt.Fprintf(os.Stderr, "failed to write packer message to websocket: %s\n", err)
break
}
}
if nil != err {
if io.EOF != err {
fmt.Fprintf(os.Stderr, "failed to read message from packer: %s\n", err)
break
}
fmt.Fprintf(os.Stderr, "[TODO debug] closed packer: %s\n", err)
break
}
}
// TODO handle EOF as websocket.CloseNormal?
message := websocket.FormatCloseMessage(websocket.CloseGoingAway, "closing connection")
deadline := time.Now().Add(10 * time.Second)
if err := wsconn.WriteControl(websocket.CloseMessage, message, deadline); nil != err {
fmt.Fprintf(os.Stderr, "failed to write close message to websocket: %s\n", err)
}
_ = wsconn.Close()
}()
// The read loop (also fails if ping fails)
for {
_, message, err := wsconn.ReadMessage()
if err != nil {
fmt.Fprintf(os.Stderr, "failed to read message from websocket: %s\n", err)
break
}
//
_, err = listener.packer.Write(message)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to process message from websocket: %s\n", err)
break
}
}
// just to be sure
listener.packer.Close()
wsconn.Close()
return
}
/*
// Feed will block while pushing a net.Conn onto Conns
func (l *Listener) Feed(conn *Conn) {
l.Conns <- conn
}
*/
// net.Listener interface
/*
// Accept will block and wait for a new net.Conn
func (l *Listener) Accept() (*Conn, error) {
conn, ok := <-l.Conns
if ok {
return conn, nil
select {
case conn, ok := <-l.Conns:
if ok {
return conn, nil
}
return nil, io.EOF
case <-l.ctx.Done():
// TODO is another error more suitable?
// TODO is this redundant?
return nil, io.EOF
}
return nil, io.EOF
}
*/
func (l *Listener) Accept() (*packer.Conn, error) {
return l.Accept()
}
// Close will close the Conns channel
func (l *Listener) Close() error {
close(l.Conns)
return nil
//close(l.Conns)
//return nil
return l.packer.Close()
}
// Addr returns nil to fulfill the net.Listener interface

View File

@ -7,12 +7,15 @@ import (
"net"
"os"
"time"
"git.coolaj86.com/coolaj86/go-telebitd/mplexer/packer"
)
type MultiplexLocal struct {
Relay string
SortingHat SortingHat
Timeout time.Duration
listener *Listener
}
func New(relay string, hat SortingHat) *MultiplexLocal {
@ -24,6 +27,11 @@ func New(relay string, hat SortingHat) *MultiplexLocal {
}
func (m *MultiplexLocal) ListenAndServe(ctx context.Context) error {
// Cancels if Accept() returns an error (i.e. because it was closed)
// (TODO: this may be redundant)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
listener, err := m.Listen(ctx)
if nil != err {
return err
@ -39,7 +47,11 @@ func (m *MultiplexLocal) ListenAndServe(ctx context.Context) error {
}
}
func (m *MultiplexLocal) serve(ctx context.Context, pconn *Conn) {
func (m *MultiplexLocal) Close() error {
return m.listener.Close()
}
func (m *MultiplexLocal) serve(ctx context.Context, pconn *packer.Conn) {
//paddr := pconn.LocalAddr().(*Addr) // packer.Addr
paddr := pconn.LocalAddr()
//addr.Network()
@ -54,10 +66,10 @@ func (m *MultiplexLocal) serve(ctx context.Context, pconn *Conn) {
if target, err := m.SortingHat.LookupTarget(paddr); nil != target {
if nil != err {
// TODO get a log channel or some such
fmt.Fprintf(os.Stderr, "lookup failed for tunneled client: %s", err)
fmt.Fprintf(os.Stderr, "lookup failed for tunneled client: %s\n", err)
err := pconn.Error(err)
if nil != err {
fmt.Fprintf(os.Stderr, "failed to signal error back to relay: %s", err)
fmt.Fprintf(os.Stderr, "failed to signal error back to relay: %s\n", err)
}
return
}
@ -65,7 +77,7 @@ func (m *MultiplexLocal) serve(ctx context.Context, pconn *Conn) {
}
}
func pipePacker(ctx context.Context, pconn *Conn, target net.Conn, timeout time.Duration) {
func pipePacker(ctx context.Context, pconn *packer.Conn, target net.Conn, timeout time.Duration) {
// how can this be done so that target errors are
// sent back to the relay server?

View File

@ -1,4 +1,4 @@
package mplexer
package packer
import (
"fmt"
@ -24,6 +24,7 @@ const (
type Addr struct {
scheme Scheme
termination Termination
family string // TODO what should be the format? "tcpv6"?
addr string
port int
}

View File

@ -1,4 +1,4 @@
package mplexer
package packer
import (
"errors"
@ -7,12 +7,11 @@ import (
)
type Conn struct {
// TODO
relayRemoteAddr string
relayRemotePort int
relaySourceProto string
relaySourceAddr string
relaySourcePort int
relaySourceAddr Addr
relayRemoteAddr Addr
relay net.Conn
local net.Conn
updated time.Time
}
// TODO conn.go -> conn/conn.go
@ -26,23 +25,20 @@ func NewConn() *Conn {
// Read can be made to time out and return an Error with Timeout() == true
// after a fixed time limit; see SetDeadline and SetReadDeadline.
func (c *Conn) Read(b []byte) (n int, err error) {
panic(errors.New("not implemented"))
return 0, nil
return c.relay.Read(b)
}
// Write writes data to the connection.
// Write can be made to time out and return an Error with Timeout() == true
// after a fixed time limit; see SetDeadline and SetWriteDeadline.
func (c *Conn) Write(b []byte) (n int, err error) {
panic(errors.New("not implemented"))
return 0, nil
return c.relay.Write(b)
}
// Close closes the connection.
// Any blocked Read or Write operations will be unblocked and return errors.
func (c *Conn) Close() error {
panic(errors.New("not implemented"))
return nil
return c.relay.Close()
}
// Error signals an error back to the relay
@ -61,14 +57,14 @@ func (c *Conn) LocalAddr() net.Addr {
// LocalAddr returns the local network address.
func (c *Conn) LocalAddr() *Addr {
panic(errors.New("not implemented"))
return &Addr{}
// TODO is this the right one?
return &c.relaySourceAddr
}
// RemoteAddr returns the remote network address.
func (c *Conn) RemoteAddr() net.Addr {
panic(errors.New("not implemented"))
return &net.IPAddr{}
// TODO is this the right one?
return &c.relayRemoteAddr
}
// SetDeadline sets the read and write deadlines associated
@ -94,16 +90,14 @@ func (c *Conn) RemoteAddr() net.Addr {
// failure on I/O can be detected using
// errors.Is(err, syscall.ETIMEDOUT).
func (c *Conn) SetDeadline(t time.Time) error {
panic(errors.New("not implemented"))
return nil
return c.relay.SetDeadline(t)
}
// SetReadDeadline sets the deadline for future Read calls
// and any currently-blocked Read call.
// A zero value for t means Read will not time out.
func (c *Conn) SetReadDeadline(t time.Time) error {
panic(errors.New("not implemented"))
return nil
return c.relay.SetReadDeadline(t)
}
// SetWriteDeadline sets the deadline for future Write calls
@ -112,6 +106,5 @@ func (c *Conn) SetReadDeadline(t time.Time) error {
// some of the data was successfully written.
// A zero value for t means Write will not time out.
func (c *Conn) SetWriteDeadline(t time.Time) error {
panic(errors.New("not implemented"))
return nil
return c.relay.SetWriteDeadline(t)
}

12
mplexer/packer/packer.go Normal file
View File

@ -0,0 +1,12 @@
package packer
import (
"strconv"
)
func Marshal(addr Addr, body []byte) ([]byte, []byte) {
header := []byte(`IPv4,192.168.1.101,6743,` + strconv.Itoa(len(body)) + `,http,80,ex1.telebit.io`)
raw := []byte{255 - 1, byte(len(header))}
header = append(raw, header...)
return header, body
}

88
mplexer/packer/parser.go Normal file
View File

@ -0,0 +1,88 @@
package packer
import (
"context"
"errors"
"fmt"
)
type Parser struct {
ctx context.Context
handler Handler
newConns chan *Conn
conns map[string]*Conn
state ParserState
parseState State
dataReady chan struct{}
data []byte
written int
}
type ParserState struct {
written int
version byte
headerLen int
header []byte
payloadLen int
addr Addr
payloadWritten int
}
type State int
const (
V1 byte = 255 - (1 + iota)
V2
)
const (
VersionState State = 0
)
func NewParser(ctx context.Context, handler Handler) *Parser {
return &Parser{
ctx: ctx,
conns: make(map[string]*Conn),
newConns: make(chan *Conn, 2), // Buffered to make testing easier
dataReady: make(chan struct{}, 2),
data: []byte{},
handler: handler,
}
}
type Handler interface {
WriteMessage(Addr, []byte)
}
// Write receives tunnel data and creates or writes to connections
func (p *Parser) Write(b []byte) (int, error) {
if len(b) < 1 {
return 0, errors.New("developer error: wrote 0 bytes")
}
// so that we can overwrite the main state
// as soon as a full message has completed
// but still keep the number of bytes written
if 0 == p.state.written {
p.written = 0
}
switch p.parseState {
case VersionState:
fmt.Println("version state", b[0])
p.state.version = b[0]
b = b[1:]
p.state.written += 1
p.parseState += 1
default:
// do nothing
}
switch p.state.version {
case V1:
fmt.Println("v1 unmarshal")
return p.written, p.unpackV1(b)
default:
return 0, errors.New("incorrect version or version not implemented")
}
}

View File

@ -0,0 +1,69 @@
package packer
import (
"context"
"fmt"
"net"
"strconv"
"testing"
"time"
)
type testHandler struct {
conns map[string]*Conn
chunksParsed int
bytesRead int
}
func (th *testHandler) WriteMessage(a Addr, b []byte) {
addr := &a
_, ok := th.conns[addr.Network()]
if !ok {
rconn, wconn := net.Pipe()
conn := &Conn{
updated: time.Now(),
relayRemoteAddr: *addr,
relay: rconn,
local: wconn,
}
th.conns[addr.Network()] = conn
}
th.chunksParsed += 1
th.bytesRead += len(b)
}
func TestParseWholeBlock(t *testing.T) {
ctx := context.Background()
//ctx, cancel := context.WithCancel(ctx)
th := &testHandler{
conns: map[string]*Conn{},
}
p := NewParser(ctx, th)
body := []byte(`Hello, World!`)
fmt.Println("payload len", len(body))
header := []byte("IPv4,192.168.1.101,6743," + strconv.Itoa(len(body)) + ",http,80,ex1.telebit.io,\n")
fmt.Println("header len", len(header))
raw := []byte{255 - 1, byte(len(header))}
raw = append(raw, header...)
raw = append(raw, body...)
fmt.Println("total len", len(raw))
n, err := p.Write(raw)
if nil != err {
t.Fatal(err)
}
if 1 != len(th.conns) {
t.Fatal("should have parsed one connection")
}
if 1 != th.chunksParsed {
t.Fatal("should have parsed one chunck")
}
if len(body) != th.bytesRead {
t.Fatalf("should have parsed a body of %d bytes, but saw %d\n", len(body), th.bytesRead)
}
if n != len(raw) {
t.Fatalf("should have parsed all %d bytes, not just %d\n", n, len(raw))
}
}

72
mplexer/packer/server.go Normal file
View File

@ -0,0 +1,72 @@
package packer
import (
"context"
"errors"
)
type Server struct {
ctx context.Context
newConns chan *Conn
data []byte
dataReady chan struct{}
}
func (s *Server) Accept() (*Conn, error) {
select {
case <-s.ctx.Done():
return nil, errors.New("TODO: ErrClosed")
case conn := <-s.newConns:
return conn, nil
}
}
// Read packs transforms local responses into wrapped data for the tunnel
func (s *Server) Read(b []byte) (int, error) {
select {
case <-s.ctx.Done():
return 0, errors.New("TODO: EOF / ErrClosed")
case <-s.dataReady:
if 0 == len(s.data) {
return s.Read(b)
}
return s.read(b)
}
}
func (s *Server) read(b []byte) (int, error) {
// TODO mutex data while reading, against writing?
c := len(b) // capacity
a := len(s.data) // available
n := c
// see if the available data is smaller than the receiving buffer
if a < c {
n = a
}
// copy available data up to capacity
for i := 0; i < n; i++ {
b[i] = s.data[i]
}
// shrink the data slice by amount read
s.data = s.data[n:]
// if there's data left over, flag as ready to read again
// otherwise... flag as ready to write?
if len(b) > 0 {
s.dataReady <- struct{}{}
} else {
//p.writeReady <- struct{}{}
}
// Note a read error should not be possible here
// as all traffic (including errors) can be wrapped
return n, nil
}
// Close (TODO) should politely close all connections, if possible (set Read() to io.EOF, or use ErrClosed?)
func (s *Server) Close() error {
return errors.New("not implemented")
}

196
mplexer/packer/v1.go Normal file
View File

@ -0,0 +1,196 @@
package packer
import (
"errors"
"fmt"
"strconv"
"strings"
)
const (
HeaderLengthState State = 1 + iota
HeaderState
PayloadState
)
const (
FamilyIndex int = iota
AddressIndex
PortIndex
LengthIndex
ServiceIndex
)
type Header struct {
Family string
Address string
Port string
Service string
}
func (p *Parser) unpackV1(b []byte) error {
z := 0
for {
if z > 10 {
panic("stuck in an infinite loop?")
}
z += 1
n := len(b)
// at least one loop
if z > 1 && n < 1 {
fmt.Println("v1 end", z, n)
break
}
var err error
switch p.parseState {
case HeaderLengthState:
fmt.Println("v1 h len")
b = p.unpackV1HeaderLength(b)
case HeaderState:
fmt.Println("v1 header")
b, err = p.unpackV1Header(b, n)
if nil != err {
fmt.Println("v1 header err", err)
return err
}
case PayloadState:
fmt.Println("v1 payload")
// if this payload is complete, reset all state
if p.state.payloadWritten == p.state.payloadLen {
p.state = ParserState{}
}
b, err = p.unpackV1Payload(b, n)
if nil != err {
return err
}
default:
// do nothing
return errors.New("error unpacking")
}
}
return nil
}
func (p *Parser) unpackV1HeaderLength(b []byte) []byte {
p.state.headerLen = int(b[0])
fmt.Println("unpacked header len", p.state.headerLen)
b = b[1:]
p.state.written += 1
p.parseState += 1
return b
}
func (p *Parser) unpackV1Header(b []byte, n int) ([]byte, error) {
fmt.Println("got", len(b), "bytes", string(b))
m := len(p.state.header)
k := p.state.headerLen - m
if n < k {
k = n
}
p.state.written += k
c := b[0:k]
b = b[k:]
fmt.Println("has", m, "want", k, "more and have", len(b), "more")
p.state.header = append(p.state.header, c...)
if p.state.headerLen != len(p.state.header) {
return b, nil
}
parts := strings.Split(string(p.state.header), ",")
p.state.header = nil
if len(parts) < 5 {
return nil, errors.New("error unpacking header")
}
payloadLenStr := parts[LengthIndex]
payloadLen, err := strconv.Atoi(payloadLenStr)
if nil != err {
return nil, errors.New("error unpacking header payload length")
}
p.state.payloadLen = payloadLen
port, _ := strconv.Atoi(parts[PortIndex])
service := parts[ServiceIndex]
if "control" == service {
return nil, errors.New("'control' messages not implemented")
}
addr := Addr{
family: parts[FamilyIndex],
addr: parts[AddressIndex],
port: port,
scheme: Scheme(service),
}
p.state.addr = addr
/*
p.state.conn = p.conns[addr.Network()]
if nil == p.state.conn {
rconn, wconn := net.Pipe()
conn := Conn{
updated: time.Now(),
relayRemoteAddr: addr,
relay: rconn,
local: wconn,
}
copied := conn
p.state.conn = &copied
p.conns[addr.Network()] = p.state.conn
p.newConns <- p.state.conn
}
*/
p.parseState += 1
return b, nil
}
func (p *Parser) unpackV1Payload(b []byte, n int) ([]byte, error) {
// Handle "connect" and "end"
if 0 == p.state.payloadLen {
/*
p.newMsg <- msg{
addr: Addr,
bytes: []byte{},
}
addr := &p.state.conn.relayRemoteAddr
if "end" == string(addr.scheme) {
if err := p.state.conn.Close(); nil != err {
// TODO log potential error?
}
}
return b, nil
*/
p.handler.WriteMessage(p.state.addr, []byte{})
return b, nil
}
k := p.state.payloadLen - p.state.payloadWritten
if k < n {
k = n
}
c := b[0:k]
b = b[k:]
// TODO don't let a write on one connection block others,
// and also put backpressure on just that connection
/*
m, err := p.state.conn.local.Write(c)
p.state.written += m
p.state.payloadWritten += m
if nil != err {
// TODO we want to surface this error somewhere, but not to the websocket
return b, nil
}
*/
p.handler.WriteMessage(p.state.addr, c)
p.state.written += k
p.state.payloadWritten += k
p.written = p.state.written
// if this payload is complete, reset all state
if p.state.payloadWritten == p.state.payloadLen {
p.state = ParserState{}
}
return b, nil
}

View File

@ -1,10 +1,10 @@
package mplexer
import (
"net"
"git.coolaj86.com/coolaj86/go-telebitd/mplexer/packer"
)
type SortingHat interface {
LookupTarget(*Addr) (net.Conn, error)
LookupTarget(*packer.Addr) (*packer.Conn, error)
Authz() (string, error)
}