From 663caa5cc7f98b1dead1e36444c7b6db26e74600 Mon Sep 17 00:00:00 2001 From: tigerbot Date: Wed, 29 Mar 2017 17:49:02 -0600 Subject: [PATCH 01/10] first commit of client hard coded values and fairly limited error handling --- go-rvpn-client/main.go | 28 +++++++++++++ rvpn/client/client.go | 61 ++++++++++++++++++++++++++++ rvpn/client/local_conns.go | 83 ++++++++++++++++++++++++++++++++++++++ rvpn/client/setup.go | 15 +++++++ 4 files changed, 187 insertions(+) create mode 100644 go-rvpn-client/main.go create mode 100644 rvpn/client/client.go create mode 100644 rvpn/client/local_conns.go create mode 100644 rvpn/client/setup.go diff --git a/go-rvpn-client/main.go b/go-rvpn-client/main.go new file mode 100644 index 0000000..03e4585 --- /dev/null +++ b/go-rvpn-client/main.go @@ -0,0 +1,28 @@ +package main + +import ( + "git.daplie.com/Daplie/go-rvpn-server/rvpn/client" + jwt "github.com/dgrijalva/jwt-go" +) + +func main() { + tokenData := jwt.MapClaims{ + "domains": []string{ + "localhost.foo.daplie.me", + "localhost.bar.daplie.me", + }, + } + token := jwt.NewWithClaims(jwt.SigningMethodHS256, tokenData) + tokenStr, err := token.SignedString([]byte("abc123")) + if err != nil { + panic(err) + } + + config := client.Config{ + Server: "wss://localhost.daplie.me:9999", + Services: map[string]int{"https": 8443}, + Token: tokenStr, + Insecure: true, + } + panic(client.Run(&config)) +} diff --git a/rvpn/client/client.go b/rvpn/client/client.go new file mode 100644 index 0000000..c9e2450 --- /dev/null +++ b/rvpn/client/client.go @@ -0,0 +1,61 @@ +package client + +import ( + "crypto/tls" + "net/url" + + "fmt" + + "git.daplie.com/Daplie/go-rvpn-server/rvpn/packer" + "github.com/gorilla/websocket" +) + +type Config struct { + Server string + Token string + Services map[string]int + Insecure bool +} + +func Run(config *Config) error { + serverURL, err := url.Parse(config.Server) + if err != nil { + return fmt.Errorf("Invalid server URL: %v", err) + } + if serverURL.Scheme == "" { + serverURL.Scheme = "wss" + } + serverURL.Path = "" + + query := make(url.Values) + query.Set("access_token", config.Token) + serverURL.RawQuery = query.Encode() + + dialer := websocket.Dialer{} + if config.Insecure { + dialer.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + } + + conn, _, err := dialer.Dial(serverURL.String(), nil) + if err != nil { + return fmt.Errorf("First connection to server failed - check auth: %v", err) + } + + localConns := newLocalConns(conn, config.Services) + for { + _, message, err := conn.ReadMessage() + if err != nil { + return fmt.Errorf("websocket read errored: %v", err) + } + + p, err := packer.ReadMessage(message) + if err != nil { + return fmt.Errorf("packer read failed: %v", err) + } + + err = localConns.Write(p) + if err != nil { + return fmt.Errorf("failed to write data: %v", err) + } + } +} diff --git a/rvpn/client/local_conns.go b/rvpn/client/local_conns.go new file mode 100644 index 0000000..d434449 --- /dev/null +++ b/rvpn/client/local_conns.go @@ -0,0 +1,83 @@ +package client + +import ( + "fmt" + "net" + "sync" + + "github.com/gorilla/websocket" + + "io" + + "git.daplie.com/Daplie/go-rvpn-server/rvpn/packer" +) + +type localConns struct { + lock sync.RWMutex + locals map[string]net.Conn + services map[string]int + remote *websocket.Conn +} + +func newLocalConns(remote *websocket.Conn, services map[string]int) *localConns { + l := new(localConns) + l.services = services + l.remote = remote + l.locals = make(map[string]net.Conn) + return l +} + +func (l *localConns) Write(p *packer.Packer) error { + l.lock.RLock() + defer l.lock.RUnlock() + + key := fmt.Sprintf("%s:%d", p.Header.Address(), p.Header.Port) + if conn := l.locals[key]; conn != nil { + _, err := conn.Write(p.Data.Data()) + return err + } + + go l.startConnection(p) + return nil +} + +func (l *localConns) startConnection(orig *packer.Packer) { + key := fmt.Sprintf("%s:%d", orig.Header.Address(), orig.Header.Port) + addr := fmt.Sprintf("127.0.0.1:%d", l.services[orig.Header.Service]) + conn, err := net.Dial("tcp", addr) + if err != nil { + loginfo.Println("failed to open connection to", addr, err) + return + } + loginfo.Println("opened connection to", addr, "with key", key) + defer loginfo.Println("finished connection to", addr, "with key", key) + + conn.Write(orig.Data.Data()) + + l.lock.Lock() + l.locals[key] = conn + l.lock.Unlock() + defer func() { + l.lock.Lock() + delete(l.locals, key) + l.lock.Unlock() + conn.Close() + }() + + buf := make([]byte, 4096) + for { + size, err := conn.Read(buf) + if err != nil { + if err != io.EOF { + loginfo.Println("failed to read from local connection to", addr, err) + } + return + } + + p := packer.NewPacker() + p.Header = orig.Header + p.Data.AppendBytes(buf[:size]) + packed := p.PackV1() + l.remote.WriteMessage(websocket.BinaryMessage, packed.Bytes()) + } +} diff --git a/rvpn/client/setup.go b/rvpn/client/setup.go new file mode 100644 index 0000000..e08f076 --- /dev/null +++ b/rvpn/client/setup.go @@ -0,0 +1,15 @@ +package client + +import ( + "log" + "os" +) + +const ( + logFlags = log.Ldate | log.Lmicroseconds | log.Lshortfile +) + +var ( + loginfo = log.New(os.Stdout, "INFO: client: ", logFlags) + logdebug = log.New(os.Stdout, "DEBUG: client:", logFlags) +) From 99676ef4bfc1a2538f0b8fd08e06a6489674e1eb Mon Sep 17 00:00:00 2001 From: tigerbot Date: Thu, 30 Mar 2017 13:28:00 -0600 Subject: [PATCH 02/10] made it easier to send multiple messages with the same header --- rvpn/client/local_conns.go | 9 ++-- rvpn/genericlistener/listener_generic.go | 34 +++++++------ rvpn/packer/packer.go | 37 +++++++------- rvpn/packer/packer_data.go | 4 -- rvpn/packer/packer_header.go | 61 ++++++++++++++---------- 5 files changed, 75 insertions(+), 70 deletions(-) diff --git a/rvpn/client/local_conns.go b/rvpn/client/local_conns.go index d434449..704a98c 100644 --- a/rvpn/client/local_conns.go +++ b/rvpn/client/local_conns.go @@ -31,7 +31,7 @@ func (l *localConns) Write(p *packer.Packer) error { l.lock.RLock() defer l.lock.RUnlock() - key := fmt.Sprintf("%s:%d", p.Header.Address(), p.Header.Port) + key := fmt.Sprintf("%s:%d", p.Address(), p.Port()) if conn := l.locals[key]; conn != nil { _, err := conn.Write(p.Data.Data()) return err @@ -42,8 +42,8 @@ func (l *localConns) Write(p *packer.Packer) error { } func (l *localConns) startConnection(orig *packer.Packer) { - key := fmt.Sprintf("%s:%d", orig.Header.Address(), orig.Header.Port) - addr := fmt.Sprintf("127.0.0.1:%d", l.services[orig.Header.Service]) + key := fmt.Sprintf("%s:%d", orig.Address(), orig.Port()) + addr := fmt.Sprintf("127.0.0.1:%d", l.services[orig.Service()]) conn, err := net.Dial("tcp", addr) if err != nil { loginfo.Println("failed to open connection to", addr, err) @@ -74,8 +74,7 @@ func (l *localConns) startConnection(orig *packer.Packer) { return } - p := packer.NewPacker() - p.Header = orig.Header + p := packer.NewPacker(&orig.Header) p.Data.AppendBytes(buf[:size]) packed := p.PackV1() l.remote.WriteMessage(websocket.BinaryMessage, packed.Bytes()) diff --git a/rvpn/genericlistener/listener_generic.go b/rvpn/genericlistener/listener_generic.go index 4716a2b..d239951 100644 --- a/rvpn/genericlistener/listener_generic.go +++ b/rvpn/genericlistener/listener_generic.go @@ -292,11 +292,19 @@ func handleExternalHTTPRequest(ctx context.Context, extConn *WedgeConn, hostname track := NewTrack(extConn, hostname) serverStatus.ExtConnectionRegister(track) - loginfo.Println("Domain Accepted", hostname, extConn.RemoteAddr().String()) + remoteStr := extConn.RemoteAddr().String() + loginfo.Println("Domain Accepted", hostname, remoteStr) - rAddr, rPort, err := net.SplitHostPort(extConn.RemoteAddr().String()) - if err != nil { - loginfo.Println("unable to decode hostport", extConn.RemoteAddr().String()) + var header *packer.Header + if rAddr, rPort, err := net.SplitHostPort(remoteStr); err != nil { + loginfo.Println("unable to decode hostport", remoteStr, err) + } else if port, err := strconv.Atoi(rPort); err != nil { + loginfo.Printf("unable to parse port string %q: %v\n", rPort, err) + } else if header, err = packer.NewHeader(rAddr, port, service); err != nil { + loginfo.Println("unable to create packer header", err) + } + + if header == nil { return } @@ -309,18 +317,8 @@ func handleExternalHTTPRequest(ctx context.Context, extConn *WedgeConn, hostname loginfo.Println("Before Packer", hex.Dump(buffer)) - cnt := len(buffer) - - p := packer.NewPacker() - p.Header.SetAddress(rAddr) - p.Header.Port, err = strconv.Atoi(rPort) - if err != nil { - loginfo.Println("Unable to set Remote port", err) - return - } - - p.Header.Service = service - p.Data.AppendBytes(buffer[0:cnt]) + p := packer.NewPacker(header) + p.Data.AppendBytes(buffer) buf := p.PackV1() //loginfo.Println(hex.Dump(buf.Bytes())) @@ -329,8 +327,8 @@ func handleExternalHTTPRequest(ctx context.Context, extConn *WedgeConn, hostname sendTrack := NewSendTrack(buf.Bytes(), hostname) serverStatus.SendExtRequest(conn, sendTrack) - _, err = extConn.Discard(cnt) - if err != nil { + cnt := len(buffer) + if _, err = extConn.Discard(cnt); err != nil { loginfo.Println("unable to discard", cnt, err) return } diff --git a/rvpn/packer/packer.go b/rvpn/packer/packer.go index 7f4af1b..3948bf2 100644 --- a/rvpn/packer/packer.go +++ b/rvpn/packer/packer.go @@ -16,16 +16,19 @@ const ( //Packer -- contains both header and data type Packer struct { - Header *packerHeader - Data *packerData + Header + Data packerData } -//NewPacker -- Structre -func NewPacker() (p *Packer) { - p = new(Packer) - p.Header = newPackerHeader() - p.Data = newPackerData() - return +// NewPacker creates a new Packer struct using the information from the provided header as +// its own header. (Because the header is stored directly and not as a pointer/reference +// it should be safe to override items like the service without affecting the template header.) +func NewPacker(header *Header) *Packer { + p := new(Packer) + if header != nil { + p.Header = *header + } + return p } func splitHeader(header []byte, names []string) (map[string]string, error) { @@ -48,7 +51,7 @@ func ReadMessage(b []byte) (*Packer, error) { // Detect protocol in use if b[0] == packerV1 { // Separate the header and body using the header length in the second byte. - p := NewPacker() + p := NewPacker(nil) header := b[2 : b[1]+2] data := b[b[1]+2:] @@ -69,8 +72,8 @@ func ReadMessage(b []byte) (*Packer, error) { p.Header.address = net.ParseIP(parts["address"]) if p.Header.address == nil { return nil, fmt.Errorf("Invalid network address %q", parts["address"]) - } else if p.Header.Family() == FamilyIPv4 && p.Header.address.To4() == nil { - return nil, fmt.Errorf("Address %q is not in address family %s", parts["address"], p.Header.FamilyText()) + } else if p.Header.family == FamilyIPv4 && p.Header.address.To4() == nil { + return nil, fmt.Errorf("Address %q is not in address family %s", parts["address"], p.Header.Family()) } //handle port @@ -79,7 +82,7 @@ func ReadMessage(b []byte) (*Packer, error) { } else if port <= 0 || port > 65535 { return nil, fmt.Errorf("Port %d out of range", port) } else { - p.Header.Port = port + p.Header.port = port } //handle data length @@ -90,7 +93,7 @@ func ReadMessage(b []byte) (*Packer, error) { } //handle Service - p.Header.Service = parts["service"] + p.Header.service = parts["service"] //handle payload p.Data.AppendBytes(data) @@ -103,11 +106,11 @@ func ReadMessage(b []byte) (*Packer, error) { //PackV1 -- Outputs version 1 of packer func (p *Packer) PackV1() bytes.Buffer { header := strings.Join([]string{ - p.Header.FamilyText(), - p.Header.AddressString(), - strconv.Itoa(p.Header.Port), + p.Header.Family(), + p.Header.Address(), + strconv.Itoa(p.Header.Port()), strconv.Itoa(p.Data.DataLen()), - p.Header.Service, + p.Header.Service(), }, ",") var buf bytes.Buffer diff --git a/rvpn/packer/packer_data.go b/rvpn/packer/packer_data.go index 08775ad..3dfbadc 100644 --- a/rvpn/packer/packer_data.go +++ b/rvpn/packer/packer_data.go @@ -9,10 +9,6 @@ type packerData struct { buffer bytes.Buffer } -func newPackerData() *packerData { - return new(packerData) -} - func (p *packerData) AppendString(dataString string) (int, error) { return p.buffer.WriteString(dataString) } diff --git a/rvpn/packer/packer_header.go b/rvpn/packer/packer_header.go index be7cbb2..668cec4 100644 --- a/rvpn/packer/packer_header.go +++ b/rvpn/packer/packer_header.go @@ -7,12 +7,15 @@ import ( type addressFamily int -// packerHeader structure to hold our header information. -type packerHeader struct { +// The Header struct holds most of the information contained in the header for packets +// between the client and the server (the length of the data is not included here). It +// is used to uniquely identify remote connections on the servers end and to communicate +// which service the remote client is trying to connect to. +type Header struct { family addressFamily address net.IP - Port int - Service string + port int + service string } //Family -- ENUM for Address Family @@ -26,16 +29,19 @@ var addressFamilyText = [...]string{ "IPv6", } -func newPackerHeader() (p *packerHeader) { - p = new(packerHeader) - p.SetAddress("127.0.0.1") - p.Port = 65535 - p.Service = "na" - return +// NewHeader create a new Header object. +func NewHeader(address string, port int, service string) (*Header, error) { + h := new(Header) + if err := h.setAddress(address); err != nil { + return nil, err + } + h.port = port + h.service = service + return h, nil } -//SetAddress -- Set Address. which sets address family automatically -func (p *packerHeader) SetAddress(addr string) { +// setAddress parses the provided address string and automatically sets the IP family. +func (p *Header) setAddress(addr string) error { p.address = net.ParseIP(addr) if p.address.To4() != nil { @@ -43,30 +49,33 @@ func (p *packerHeader) SetAddress(addr string) { } else if p.address.To16() != nil { p.family = FamilyIPv6 } else { - panic(fmt.Sprintf("setAddress does not support %q", addr)) + return fmt.Errorf("invalid IP address %q", addr) } + return nil } -func (p *packerHeader) AddressBytes() []byte { - if ip4 := p.address.To4(); ip4 != nil { - p.address = ip4 - } - - return []byte(p.address) +// Family returns the string corresponding to the address's IP family. +func (p *Header) Family() string { + return addressFamilyText[p.family] } -func (p *packerHeader) AddressString() string { +// Address returns the string form of the header's remote address. +func (p *Header) Address() string { return p.address.String() } -func (p *packerHeader) Address() net.IP { - return p.address +// Port returns the connected port of the remote connection. +func (p *Header) Port() int { + return p.port } -func (p *packerHeader) Family() addressFamily { - return p.family +// SetService overrides the header's original service. This is primarily useful +// for sending 'error' and 'end' messages. +func (p *Header) SetService(service string) { + p.service = service } -func (p *packerHeader) FamilyText() string { - return addressFamilyText[p.family] +// Service returns the service stored in the header. +func (p *Header) Service() string { + return p.service } From c80c87c6677beb34dc5ed71a51ce5931e8334a18 Mon Sep 17 00:00:00 2001 From: tigerbot Date: Thu, 30 Mar 2017 18:04:28 -0600 Subject: [PATCH 03/10] changed the routine structure of the client --- go-rvpn-client/main.go | 7 +- rvpn/client/client.go | 28 ++---- rvpn/client/local_conns.go | 82 --------------- rvpn/client/ws_handler.go | 199 +++++++++++++++++++++++++++++++++++++ 4 files changed, 212 insertions(+), 104 deletions(-) delete mode 100644 rvpn/client/local_conns.go create mode 100644 rvpn/client/ws_handler.go diff --git a/go-rvpn-client/main.go b/go-rvpn-client/main.go index 03e4585..58a2a65 100644 --- a/go-rvpn-client/main.go +++ b/go-rvpn-client/main.go @@ -1,6 +1,8 @@ package main import ( + "context" + "git.daplie.com/Daplie/go-rvpn-server/rvpn/client" jwt "github.com/dgrijalva/jwt-go" ) @@ -18,11 +20,14 @@ func main() { panic(err) } + ctx, quit := context.WithCancel(context.Background()) + defer quit() + config := client.Config{ Server: "wss://localhost.daplie.me:9999", Services: map[string]int{"https": 8443}, Token: tokenStr, Insecure: true, } - panic(client.Run(&config)) + panic(client.Run(ctx, &config)) } diff --git a/rvpn/client/client.go b/rvpn/client/client.go index c9e2450..a0821f5 100644 --- a/rvpn/client/client.go +++ b/rvpn/client/client.go @@ -1,12 +1,11 @@ package client import ( + "context" "crypto/tls" + "fmt" "net/url" - "fmt" - - "git.daplie.com/Daplie/go-rvpn-server/rvpn/packer" "github.com/gorilla/websocket" ) @@ -17,7 +16,7 @@ type Config struct { Insecure bool } -func Run(config *Config) error { +func Run(ctx context.Context, config *Config) error { serverURL, err := url.Parse(config.Server) if err != nil { return fmt.Errorf("Invalid server URL: %v", err) @@ -36,26 +35,13 @@ func Run(config *Config) error { dialer.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} } + handler := NewWsHandler(config.Services) + conn, _, err := dialer.Dial(serverURL.String(), nil) if err != nil { return fmt.Errorf("First connection to server failed - check auth: %v", err) } - localConns := newLocalConns(conn, config.Services) - for { - _, message, err := conn.ReadMessage() - if err != nil { - return fmt.Errorf("websocket read errored: %v", err) - } - - p, err := packer.ReadMessage(message) - if err != nil { - return fmt.Errorf("packer read failed: %v", err) - } - - err = localConns.Write(p) - if err != nil { - return fmt.Errorf("failed to write data: %v", err) - } - } + handler.HandleConn(ctx, conn) + return nil } diff --git a/rvpn/client/local_conns.go b/rvpn/client/local_conns.go deleted file mode 100644 index 704a98c..0000000 --- a/rvpn/client/local_conns.go +++ /dev/null @@ -1,82 +0,0 @@ -package client - -import ( - "fmt" - "net" - "sync" - - "github.com/gorilla/websocket" - - "io" - - "git.daplie.com/Daplie/go-rvpn-server/rvpn/packer" -) - -type localConns struct { - lock sync.RWMutex - locals map[string]net.Conn - services map[string]int - remote *websocket.Conn -} - -func newLocalConns(remote *websocket.Conn, services map[string]int) *localConns { - l := new(localConns) - l.services = services - l.remote = remote - l.locals = make(map[string]net.Conn) - return l -} - -func (l *localConns) Write(p *packer.Packer) error { - l.lock.RLock() - defer l.lock.RUnlock() - - key := fmt.Sprintf("%s:%d", p.Address(), p.Port()) - if conn := l.locals[key]; conn != nil { - _, err := conn.Write(p.Data.Data()) - return err - } - - go l.startConnection(p) - return nil -} - -func (l *localConns) startConnection(orig *packer.Packer) { - key := fmt.Sprintf("%s:%d", orig.Address(), orig.Port()) - addr := fmt.Sprintf("127.0.0.1:%d", l.services[orig.Service()]) - conn, err := net.Dial("tcp", addr) - if err != nil { - loginfo.Println("failed to open connection to", addr, err) - return - } - loginfo.Println("opened connection to", addr, "with key", key) - defer loginfo.Println("finished connection to", addr, "with key", key) - - conn.Write(orig.Data.Data()) - - l.lock.Lock() - l.locals[key] = conn - l.lock.Unlock() - defer func() { - l.lock.Lock() - delete(l.locals, key) - l.lock.Unlock() - conn.Close() - }() - - buf := make([]byte, 4096) - for { - size, err := conn.Read(buf) - if err != nil { - if err != io.EOF { - loginfo.Println("failed to read from local connection to", addr, err) - } - return - } - - p := packer.NewPacker(&orig.Header) - p.Data.AppendBytes(buf[:size]) - packed := p.PackV1() - l.remote.WriteMessage(websocket.BinaryMessage, packed.Bytes()) - } -} diff --git a/rvpn/client/ws_handler.go b/rvpn/client/ws_handler.go new file mode 100644 index 0000000..be3e59b --- /dev/null +++ b/rvpn/client/ws_handler.go @@ -0,0 +1,199 @@ +package client + +import ( + "context" + "fmt" + "io" + "net" + "sync" + "time" + + "github.com/gorilla/websocket" + + "git.daplie.com/Daplie/go-rvpn-server/rvpn/packer" +) + +// WsHandler handles all of reading and writing for the websocket connection to the RVPN server +// and the TCP connections to the local servers. +type WsHandler struct { + lock sync.Mutex + localConns map[string]net.Conn + + servicePorts map[string]int + + ctx context.Context + dataChan chan *packer.Packer +} + +// NewWsHandler creates a new handler ready to be given a websocket connection. The services +// argument specifies what port each service type should be directed to on the local interface. +func NewWsHandler(services map[string]int) *WsHandler { + h := new(WsHandler) + h.servicePorts = services + h.localConns = make(map[string]net.Conn) + return h +} + +// HandleConn handles all of the traffic on the provided websocket connection. The function +// will not return until the connection ends. +// +// The WsHandler is designed to handle exactly one connection at a time. If HandleConn is called +// again while the instance is still handling another connection (or if the previous connection +// failed to fully cleanup) calling HandleConn again will panic. +func (h *WsHandler) HandleConn(ctx context.Context, conn *websocket.Conn) { + if h.dataChan != nil { + panic("WsHandler.HandleConn called while handling a previous connection") + } + if len(h.localConns) > 0 { + panic(fmt.Sprintf("WsHandler has lingering local connections: %v", h.localConns)) + } + h.dataChan = make(chan *packer.Packer) + + // The sub context allows us to clean up all of the goroutines associated with this websocket + // if it closes at any point for any reason. + subCtx, socketQuit := context.WithCancel(ctx) + defer socketQuit() + h.ctx = subCtx + + // Start the routine that will write all of the data from the local connection to the + // remote websocket connection. + go h.writeRemote(conn) + + for { + _, message, err := conn.ReadMessage() + if err != nil { + loginfo.Println("failed to read message from websocket", err) + return + } + + p, err := packer.ReadMessage(message) + if err != nil { + loginfo.Println("failed to parse message from websocket", err) + return + } + + h.writeLocal(p) + } +} + +func (h *WsHandler) writeRemote(conn *websocket.Conn) { + defer h.closeConnections() + defer func() { h.dataChan = nil }() + + for { + select { + case <-h.ctx.Done(): + // We can't tell if this happened because the websocket is already closed/errored or + // if it happened because the main context closed (in which case it would be preferable + // to properly close the connection). As such we try to close the connection and ignore + // all errors if it doesn't work. + message := websocket.FormatCloseMessage(websocket.CloseGoingAway, "closing connection") + deadline := time.Now().Add(10 * time.Second) + conn.WriteControl(websocket.CloseMessage, message, deadline) + conn.Close() + return + + case p := <-h.dataChan: + packed := p.PackV1() + conn.WriteMessage(websocket.BinaryMessage, packed.Bytes()) + } + } +} + +func (h *WsHandler) sendSpecial(header *packer.Header, service string) { + p := packer.NewPacker(header) + p.SetService(service) + + // Avoid blocking on the data channel if the websocket is already closed + select { + case h.dataChan <- p: + case <-h.ctx.Done(): + } +} + +func (h *WsHandler) getLocalConn(p *packer.Packer) net.Conn { + h.lock.Lock() + defer h.lock.Unlock() + + key := fmt.Sprintf("%s:%d", p.Address(), p.Port()) + // Simplest case: it's already open, just return it. + if conn := h.localConns[key]; conn != nil { + return conn + } + + port := h.servicePorts[p.Service()] + if port == 0 { + loginfo.Println("cannot open connection for invalid service", p.Service()) + return nil + } + + conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", port)) + if err != nil { + loginfo.Println("unable to open local connection on port", port, err) + return nil + } + + loginfo.Println("opened new connection to port", port, "for", key) + h.localConns[key] = conn + go h.readLocal(key, &p.Header) + return conn +} + +func (h *WsHandler) writeLocal(p *packer.Packer) { + conn := h.getLocalConn(p) + if conn == nil { + h.sendSpecial(&p.Header, "error") + return + } + + if p.Service() == "error" || p.Service() == "end" { + conn.Close() + return + } + + if _, err := conn.Write(p.Data.Data()); err != nil { + h.sendSpecial(&p.Header, "error") + loginfo.Println("failed to write to local connection", err) + } +} + +func (h *WsHandler) readLocal(key string, header *packer.Header) { + h.lock.Lock() + conn := h.localConns[key] + h.lock.Unlock() + + defer conn.Close() + defer func() { + h.lock.Lock() + delete(h.localConns, key) + h.lock.Unlock() + }() + defer loginfo.Println("finished with client", key) + + buf := make([]byte, 4096) + for { + size, err := conn.Read(buf) + if err != nil { + if err == io.EOF { + h.sendSpecial(header, "end") + } else { + loginfo.Println("failed to read from local connection for", key, err) + h.sendSpecial(header, "error") + } + return + } + + p := packer.NewPacker(header) + p.Data.AppendBytes(buf[:size]) + h.dataChan <- p + } +} + +func (h *WsHandler) closeConnections() { + h.lock.Lock() + defer h.lock.Unlock() + + for _, conn := range h.localConns { + conn.Close() + } +} From fbed26d94b4a6d3cf88b4dcc6d9d4c6fa2ee89da Mon Sep 17 00:00:00 2001 From: tigerbot Date: Thu, 30 Mar 2017 18:13:27 -0600 Subject: [PATCH 04/10] changed client to retry after disconnect --- rvpn/client/client.go | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/rvpn/client/client.go b/rvpn/client/client.go index a0821f5..e98660e 100644 --- a/rvpn/client/client.go +++ b/rvpn/client/client.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "fmt" "net/url" + "time" "github.com/gorilla/websocket" ) @@ -37,11 +38,22 @@ func Run(ctx context.Context, config *Config) error { handler := NewWsHandler(config.Services) - conn, _, err := dialer.Dial(serverURL.String(), nil) - if err != nil { - return fmt.Errorf("First connection to server failed - check auth: %v", err) - } + authenticated := false + for { + if conn, _, err := dialer.Dial(serverURL.String(), nil); err == nil { + authenticated = true + handler.HandleConn(ctx, conn) + } else if !authenticated { + return fmt.Errorf("First connection to server failed - check auth: %v", err) + } + loginfo.Println("disconnected from remote server") - handler.HandleConn(ctx, conn) - return nil + // Sleep for a few seconds before trying again, but only if the context is still active + select { + case <-ctx.Done(): + return nil + case <-time.After(5 * time.Second): + } + loginfo.Println("attempting reconnect to remote server") + } } From 3848be53bd4252274564b8423600c791fdf6b543 Mon Sep 17 00:00:00 2001 From: tigerbot Date: Mon, 3 Apr 2017 15:51:44 -0600 Subject: [PATCH 05/10] tweaked behavior after local connection read --- rvpn/client/ws_handler.go | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/rvpn/client/ws_handler.go b/rvpn/client/ws_handler.go index be3e59b..212970d 100644 --- a/rvpn/client/ws_handler.go +++ b/rvpn/client/ws_handler.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "net" + "strings" "sync" "time" @@ -100,11 +101,16 @@ func (h *WsHandler) writeRemote(conn *websocket.Conn) { } } -func (h *WsHandler) sendSpecial(header *packer.Header, service string) { +func (h *WsHandler) sendPackedMessage(header *packer.Header, data []byte, service string) { p := packer.NewPacker(header) - p.SetService(service) + if len(data) > 0 { + p.Data.AppendBytes(data) + } + if service != "" { + p.SetService(service) + } - // Avoid blocking on the data channel if the websocket is already closed + // Avoid blocking on the data channel if the websocket closes or is already closed select { case h.dataChan <- p: case <-h.ctx.Done(): @@ -142,7 +148,7 @@ func (h *WsHandler) getLocalConn(p *packer.Packer) net.Conn { func (h *WsHandler) writeLocal(p *packer.Packer) { conn := h.getLocalConn(p) if conn == nil { - h.sendSpecial(&p.Header, "error") + h.sendPackedMessage(&p.Header, nil, "error") return } @@ -152,7 +158,7 @@ func (h *WsHandler) writeLocal(p *packer.Packer) { } if _, err := conn.Write(p.Data.Data()); err != nil { - h.sendSpecial(&p.Header, "error") + h.sendPackedMessage(&p.Header, nil, "error") loginfo.Println("failed to write to local connection", err) } } @@ -174,18 +180,16 @@ func (h *WsHandler) readLocal(key string, header *packer.Header) { for { size, err := conn.Read(buf) if err != nil { - if err == io.EOF { - h.sendSpecial(header, "end") + if err == io.EOF || strings.Contains(err.Error(), "use of closed network connection") { + h.sendPackedMessage(header, nil, "end") } else { loginfo.Println("failed to read from local connection for", key, err) - h.sendSpecial(header, "error") + h.sendPackedMessage(header, nil, "error") } return } - p := packer.NewPacker(header) - p.Data.AppendBytes(buf[:size]) - h.dataChan <- p + h.sendPackedMessage(header, buf[:size], "") } } From b44fe740af11ae2405457267910c7aeea060db9a Mon Sep 17 00:00:00 2001 From: tigerbot Date: Mon, 3 Apr 2017 17:37:29 -0600 Subject: [PATCH 06/10] move SNI to separate package for both client and server --- rvpn/server/listener_generic.go | 3 ++- rvpn/{server/tls_get_hello.go => sni/tls_get_hostname.go} | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) rename rvpn/{server/tls_get_hello.go => sni/tls_get_hostname.go} (91%) diff --git a/rvpn/server/listener_generic.go b/rvpn/server/listener_generic.go index ad56bdc..0f68e52 100644 --- a/rvpn/server/listener_generic.go +++ b/rvpn/server/listener_generic.go @@ -18,6 +18,7 @@ import ( "github.com/gorilla/websocket" "git.daplie.com/Daplie/go-rvpn-server/rvpn/packer" + "git.daplie.com/Daplie/go-rvpn-server/rvpn/sni" ) type contextKey string @@ -160,7 +161,7 @@ func handleConnection(ctx context.Context, wConn *WedgeConn) { wssHostName := ctx.Value(ctxWssHostName).(string) adminHostName := ctx.Value(ctxAdminHostName).(string) - sniHostName, err := getHello(peek) + sniHostName, err := sni.GetHostname(peek) if err != nil { loginfo.Println(err) return diff --git a/rvpn/server/tls_get_hello.go b/rvpn/sni/tls_get_hostname.go similarity index 91% rename from rvpn/server/tls_get_hello.go rename to rvpn/sni/tls_get_hostname.go index 8aa78c4..606fdf7 100644 --- a/rvpn/server/tls_get_hello.go +++ b/rvpn/sni/tls_get_hostname.go @@ -1,10 +1,11 @@ -package server +package sni import ( "errors" ) -func getHello(b []byte) (string, error) { +// GetHostname uses SNI to determine the intended target of a new TLS connection. +func GetHostname(b []byte) (string, error) { rest := b[5:] current := 0 handshakeType := rest[0] From c1c18b71b0d2230b4d181ab21ef6e11c5b626640 Mon Sep 17 00:00:00 2001 From: tigerbot Date: Mon, 3 Apr 2017 17:56:28 -0600 Subject: [PATCH 07/10] added support for different port w/ different domains on same service --- go-rvpn-client/main.go | 13 ++++++++++-- rvpn/client/client.go | 14 ++++++++++++- rvpn/client/ws_handler.go | 42 +++++++++++++++++++++++++++++++++------ 3 files changed, 60 insertions(+), 9 deletions(-) diff --git a/go-rvpn-client/main.go b/go-rvpn-client/main.go index 58a2a65..63aba8a 100644 --- a/go-rvpn-client/main.go +++ b/go-rvpn-client/main.go @@ -24,8 +24,17 @@ func main() { defer quit() config := client.Config{ - Server: "wss://localhost.daplie.me:9999", - Services: map[string]int{"https": 8443}, + Server: "wss://localhost.daplie.me:9999", + Services: map[string]map[string]int{ + "https": map[string]int{ + "*": 8443, + "localhost.foo.daplie.me": 4443, + }, + "http": map[string]int{ + "*": 8443, + "localhost.foo.daplie.me": 4443, + }, + }, Token: tokenStr, Insecure: true, } diff --git a/rvpn/client/client.go b/rvpn/client/client.go index e98660e..a903305 100644 --- a/rvpn/client/client.go +++ b/rvpn/client/client.go @@ -10,13 +10,19 @@ import ( "github.com/gorilla/websocket" ) +// The Config struct holds all of the information needed to establish and handle a connection +// with the RVPN server. type Config struct { Server string Token string - Services map[string]int Insecure bool + Services map[string]map[string]int } +// Run establishes a connection with the RVPN server specified in the config. If the first attempt +// to connect fails it is assumed that something is wrong with the authentication and it will +// return an error. Otherwise it will continuously attempt to reconnect whenever the connection +// is broken. func Run(ctx context.Context, config *Config) error { serverURL, err := url.Parse(config.Server) if err != nil { @@ -36,11 +42,17 @@ func Run(ctx context.Context, config *Config) error { dialer.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} } + for name, portList := range config.Services { + if _, ok := portList["*"]; !ok { + return fmt.Errorf(`service %s missing port for "*"`, name) + } + } handler := NewWsHandler(config.Services) authenticated := false for { if conn, _, err := dialer.Dial(serverURL.String(), nil); err == nil { + loginfo.Println("connected to remote server") authenticated = true handler.HandleConn(ctx, conn) } else if !authenticated { diff --git a/rvpn/client/ws_handler.go b/rvpn/client/ws_handler.go index 212970d..7501d88 100644 --- a/rvpn/client/ws_handler.go +++ b/rvpn/client/ws_handler.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "net" + "regexp" "strings" "sync" "time" @@ -12,15 +13,18 @@ import ( "github.com/gorilla/websocket" "git.daplie.com/Daplie/go-rvpn-server/rvpn/packer" + "git.daplie.com/Daplie/go-rvpn-server/rvpn/sni" ) +var hostRegexp = regexp.MustCompile(`(?im)(?:^|[\r\n])Host: *([^\r\n]+)[\r\n]`) + // WsHandler handles all of reading and writing for the websocket connection to the RVPN server // and the TCP connections to the local servers. type WsHandler struct { lock sync.Mutex localConns map[string]net.Conn - servicePorts map[string]int + servicePorts map[string]map[string]int ctx context.Context dataChan chan *packer.Packer @@ -28,7 +32,7 @@ type WsHandler struct { // NewWsHandler creates a new handler ready to be given a websocket connection. The services // argument specifies what port each service type should be directed to on the local interface. -func NewWsHandler(services map[string]int) *WsHandler { +func NewWsHandler(services map[string]map[string]int) *WsHandler { h := new(WsHandler) h.servicePorts = services h.localConns = make(map[string]net.Conn) @@ -127,9 +131,35 @@ func (h *WsHandler) getLocalConn(p *packer.Packer) net.Conn { return conn } - port := h.servicePorts[p.Service()] + service := strings.ToLower(p.Service()) + portList := h.servicePorts[service] + if portList == nil { + loginfo.Println("cannot open connection for invalid service", service) + return nil + } + + var hostname string + if service == "http" { + if match := hostRegexp.FindSubmatch(p.Data.Data()); match != nil { + hostname = strings.Split(string(match[1]), ":")[0] + } + } else if service == "https" { + hostname, _ = sni.GetHostname(p.Data.Data()) + } else { + hostname = "*" + } + if hostname == "" { + loginfo.Println("missing servername for", service, key) + return nil + } + hostname = strings.ToLower(hostname) + + port := portList[hostname] if port == 0 { - loginfo.Println("cannot open connection for invalid service", p.Service()) + port = portList["*"] + } + if port == 0 { + loginfo.Println("unable to determine local port for", service, hostname) return nil } @@ -139,8 +169,8 @@ func (h *WsHandler) getLocalConn(p *packer.Packer) net.Conn { return nil } - loginfo.Println("opened new connection to port", port, "for", key) h.localConns[key] = conn + loginfo.Printf("new client %q for %s:%d (%d clients)\n", key, hostname, port, len(h.localConns)) go h.readLocal(key, &p.Header) return conn } @@ -172,9 +202,9 @@ func (h *WsHandler) readLocal(key string, header *packer.Header) { defer func() { h.lock.Lock() delete(h.localConns, key) + loginfo.Printf("closing client %q: (%d clients)\n", key, len(h.localConns)) h.lock.Unlock() }() - defer loginfo.Println("finished with client", key) buf := make([]byte, 4096) for { From c024c1b88f7906385a193a011c2f98d2258992d9 Mon Sep 17 00:00:00 2001 From: tigerbot Date: Tue, 4 Apr 2017 12:09:20 -0600 Subject: [PATCH 08/10] added cmd line arguments to the client --- go-rvpn-client/main.go | 214 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 190 insertions(+), 24 deletions(-) diff --git a/go-rvpn-client/main.go b/go-rvpn-client/main.go index 63aba8a..9ab43bc 100644 --- a/go-rvpn-client/main.go +++ b/go-rvpn-client/main.go @@ -2,41 +2,207 @@ package main import ( "context" + "fmt" + "regexp" + "strconv" + "strings" + + jwt "github.com/dgrijalva/jwt-go" + flag "github.com/spf13/pflag" + "github.com/spf13/viper" "git.daplie.com/Daplie/go-rvpn-server/rvpn/client" - jwt "github.com/dgrijalva/jwt-go" ) -func main() { - tokenData := jwt.MapClaims{ - "domains": []string{ - "localhost.foo.daplie.me", - "localhost.bar.daplie.me", - }, +var httpRegexp = regexp.MustCompile(`(?i)^http`) + +func init() { + flag.StringSlice("locals", []string{}, "comma separated list of : or "+ + ":: to which matching incoming connections should forward. "+ + "Ex: smtps:8465,https:example.com:8443") + flag.StringSlice("domains", []string{}, "comma separated list of domain names to set to the tunnel") + viper.BindPFlag("locals", flag.Lookup("locals")) + viper.BindPFlag("domains", flag.Lookup("domains")) + + flag.BoolP("insecure", "k", false, "Allow TLS connections to stunneld without valid certs") + flag.String("stunneld", "", "the domain (or ip address) at which the RVPN server is running") + flag.String("secret", "", "the same secret used by stunneld (used for JWT authentication)") + flag.String("token", "", "a pre-generated token to give the server (instead of generating one with --secret)") + viper.BindPFlag("raw.insecure", flag.Lookup("insecure")) + viper.BindPFlag("raw.stunneld", flag.Lookup("stunneld")) + viper.BindPFlag("raw.secret", flag.Lookup("secret")) + viper.BindPFlag("raw.token", flag.Lookup("token")) +} + +type proxy struct { + protocol string + hostname string + port int +} + +func addLocals(proxies []proxy, location string) []proxy { + parts := strings.Split(location, ":") + if len(parts) > 3 { + panic(fmt.Sprintf("provided invalid location %q", location)) } - token := jwt.NewWithClaims(jwt.SigningMethodHS256, tokenData) - tokenStr, err := token.SignedString([]byte("abc123")) - if err != nil { - panic(err) + + // If all that was provided as a "local" is the domain name we assume that domain + // has HTTP and HTTPS handlers on the default ports. + if len(parts) == 1 { + proxies = append(proxies, proxy{"http", parts[0], 80}) + proxies = append(proxies, proxy{"https", parts[0], 443}) + return proxies + } + + // Make everything lower case and trim any slashes in something like https://john.example.com + parts[0] = strings.ToLower(parts[0]) + parts[1] = strings.ToLower(strings.Trim(parts[1], "/")) + + if len(parts) == 2 { + if strings.Contains(parts[1], ".") { + if parts[0] == "http" { + parts = append(parts, "80") + } else if parts[0] == "https" { + parts = append(parts, "443") + } else { + panic(fmt.Sprintf("port must be specified for %q", location)) + } + } else { + // https:3443 -> https:*:3443 + parts = []string{parts[0], "*", parts[1]} + } + } + + if port, err := strconv.Atoi(parts[2]); err != nil { + panic(fmt.Sprintf("port must be a valid number, not %q: %v", parts[2], err)) + } else if port <= 0 || port > 65535 { + panic(fmt.Sprintf("%d is an invalid port for local services", port)) + } else { + proxies = append(proxies, proxy{parts[0], parts[1], port}) + } + return proxies +} + +func addDomains(proxies []proxy, location string) []proxy { + parts := strings.Split(location, ":") + if len(parts) > 3 { + panic(fmt.Sprintf("provided invalid location %q", location)) + } else if len(parts) == 2 { + panic("invalid argument for --domains, use format or ::") + } + + // If the scheme and port weren't provided use the zero values + if len(parts) == 1 { + return append(proxies, proxy{"", parts[0], 0}) + } + + if port, err := strconv.Atoi(parts[2]); err != nil { + panic(fmt.Sprintf("port must be a valid number, not %q: %v", parts[2], err)) + } else if port <= 0 || port > 65535 { + panic(fmt.Sprintf("%d is an invalid port for local services", port)) + } else { + proxies = append(proxies, proxy{parts[0], parts[1], port}) + } + return proxies +} + +func extractServicePorts(proxies []proxy) map[string]map[string]int { + result := make(map[string]map[string]int, 2) + + for _, p := range proxies { + if p.protocol != "" && p.port != 0 { + hostPorts := result[p.protocol] + if hostPorts == nil { + result[p.protocol] = make(map[string]int) + hostPorts = result[p.protocol] + } + + // Only HTTP and HTTPS allow us to determine the hostname from the request, so only + // those protocols support different ports for the same service. + if !httpRegexp.MatchString(p.protocol) || p.hostname == "" { + p.hostname = "*" + } + if port, ok := hostPorts[p.hostname]; ok && port != p.port { + panic(fmt.Sprintf("duplicate ports for %s://%s", p.protocol, p.hostname)) + } + hostPorts[p.hostname] = p.port + } + } + + // Make sure we have defaults for HTTPS and HTTP. + if result["https"] == nil { + result["https"] = make(map[string]int, 1) + } + if result["https"]["*"] == 0 { + result["https"]["*"] = 8443 + } + + if result["http"] == nil { + result["http"] = make(map[string]int, 1) + } + if result["http"]["*"] == 0 { + result["http"]["*"] = result["https"]["*"] + } + + return result +} + +func main() { + flag.Parse() + + proxies := make([]proxy, 0) + for _, option := range viper.GetStringSlice("locals") { + for _, location := range strings.Split(option, ",") { + proxies = addLocals(proxies, location) + } + } + for _, option := range viper.GetStringSlice("domains") { + for _, location := range strings.Split(option, ",") { + proxies = addDomains(proxies, location) + } + } + + servicePorts := extractServicePorts(proxies) + domainMap := make(map[string]bool) + for _, p := range proxies { + if p.hostname != "" && p.hostname != "*" { + domainMap[p.hostname] = true + } + } + + if viper.GetString("raw.stunneld") == "" { + panic("must provide remote RVPN server to connect to") + } + + var token string + if viper.GetString("raw.token") != "" { + token = viper.GetString("raw.token") + } else if viper.GetString("raw.secret") != "" { + domains := make([]string, 0, len(domainMap)) + for name := range domainMap { + domains = append(domains, name) + } + tokenData := jwt.MapClaims{"domains": domains} + + secret := []byte(viper.GetString("raw.secret")) + jwtToken := jwt.NewWithClaims(jwt.SigningMethodHS256, tokenData) + if tokenStr, err := jwtToken.SignedString(secret); err != nil { + panic(err) + } else { + token = tokenStr + } + } else { + panic("must provide either token or secret") } ctx, quit := context.WithCancel(context.Background()) defer quit() config := client.Config{ - Server: "wss://localhost.daplie.me:9999", - Services: map[string]map[string]int{ - "https": map[string]int{ - "*": 8443, - "localhost.foo.daplie.me": 4443, - }, - "http": map[string]int{ - "*": 8443, - "localhost.foo.daplie.me": 4443, - }, - }, - Token: tokenStr, - Insecure: true, + Insecure: viper.GetBool("raw.insecure"), + Server: viper.GetString("raw.stunneld"), + Services: servicePorts, + Token: token, } panic(client.Run(ctx, &config)) } From 3e30137b36662b9ac474a2aec741a9e4a054cba0 Mon Sep 17 00:00:00 2001 From: tigerbot Date: Mon, 17 Apr 2017 14:57:32 -0600 Subject: [PATCH 09/10] fixed bug masked by changes I opted not to commit --- rvpn/server/connection.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rvpn/server/connection.go b/rvpn/server/connection.go index a2ab6f2..7048870 100755 --- a/rvpn/server/connection.go +++ b/rvpn/server/connection.go @@ -2,8 +2,8 @@ package server import ( "context" + "fmt" "io" - "strconv" "sync" "time" @@ -252,7 +252,7 @@ func (c *Connection) Reader(ctx context.Context) { // unpack the message. p, err := packer.ReadMessage(message) - key := p.Header.Address().String() + ":" + strconv.Itoa(p.Header.Port) + key := fmt.Sprintf("%s:%d", p.Address(), p.Port()) track, err := connectionTrack.Lookup(key) //loginfo.Println(hex.Dump(p.Data.Data())) From 76b5e1b102e417770a72b450eb9f77603ab8ccab Mon Sep 17 00:00:00 2001 From: tigerbot Date: Mon, 17 Apr 2017 14:59:45 -0600 Subject: [PATCH 10/10] fixed some go formatting problems --- ...lect_status_dead time.go => api_collect_status_dead_time.go} | 0 rvpn/server/manager.go | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename rvpn/server/{api_collect_status_dead time.go => api_collect_status_dead_time.go} (100%) diff --git a/rvpn/server/api_collect_status_dead time.go b/rvpn/server/api_collect_status_dead_time.go similarity index 100% rename from rvpn/server/api_collect_status_dead time.go rename to rvpn/server/api_collect_status_dead_time.go diff --git a/rvpn/server/manager.go b/rvpn/server/manager.go index 17a6b20..f3033f9 100644 --- a/rvpn/server/manager.go +++ b/rvpn/server/manager.go @@ -52,7 +52,7 @@ type servers struct { secretKey string certbundle tls.Certificate register chan *ListenerRegistration - servers *servers + servers *servers wssHostName string adminHostName string cancelCheck int