From 8f2e4f58c080534d9e86a6b135108fb088a54ff5 Mon Sep 17 00:00:00 2001 From: Henry Camacho Date: Thu, 2 Mar 2017 18:47:59 -0600 Subject: [PATCH] Working version of RVPN - got too cute with the package names, needed to bring everything into one package, except for packer. - system is passing traffic now, ran a load test generating 1000 connections, seems ok. - removed a lot of message logging since traffic is passing. --- README.md | 1 + rvpn/admin/listener_admin.go | 60 ---------------- rvpn/connectiontrack/setup.go | 17 +++++ rvpn/genericlistener/conn_tracking.go | 68 +++++++++++++++++++ rvpn/genericlistener/conn_wedge.go | 3 - .../connection.go | 35 ++++++---- .../connection_registration.go | 7 +- .../connection_table.go | 13 ++-- rvpn/{admin => genericlistener}/domain_api.go | 2 +- .../domain_mapping.go | 2 +- .../domain_track.go | 2 +- .../genericlistener/handle_wss_client.go.dead | 3 - rvpn/genericlistener/listener_admin.go.dead | 1 + rvpn/genericlistener/listener_generic.go | 47 +++++-------- rvpn/genericlistener/manager.go | 27 +++++--- .../send_track.go | 2 +- rvpn/{admin => genericlistener}/server_api.go | 6 +- rvpn/packer/packer.go | 7 +- rvpn/packer/packer_data.go | 10 ++- rvpn/rvpnmain/run.go | 14 ++-- 20 files changed, 177 insertions(+), 150 deletions(-) delete mode 100644 rvpn/admin/listener_admin.go create mode 100644 rvpn/connectiontrack/setup.go create mode 100644 rvpn/genericlistener/conn_tracking.go rename rvpn/{connection => genericlistener}/connection.go (90%) rename rvpn/{connection => genericlistener}/connection_registration.go (83%) rename rvpn/{connection => genericlistener}/connection_table.go (92%) rename rvpn/{admin => genericlistener}/domain_api.go (96%) rename rvpn/{connection => genericlistener}/domain_mapping.go (95%) rename rvpn/{connection => genericlistener}/domain_track.go (96%) create mode 100644 rvpn/genericlistener/listener_admin.go.dead rename rvpn/{connection => genericlistener}/send_track.go (92%) rename rvpn/{admin => genericlistener}/server_api.go (88%) diff --git a/README.md b/README.md index 03f376b..551eea8 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,7 @@ Once the lister is fired up, I sends back a regisration status to the manager al ```bash hcamacho@Hanks-MBP:go-rvpn-server $ go get +hcamacho@Hanks-MBP:go-rvpn-server $ go build ``` diff --git a/rvpn/admin/listener_admin.go b/rvpn/admin/listener_admin.go deleted file mode 100644 index eaef4e3..0000000 --- a/rvpn/admin/listener_admin.go +++ /dev/null @@ -1,60 +0,0 @@ -package admin - -import ( - "encoding/json" - "fmt" - "net/http" - - "git.daplie.com/Daplie/go-rvpn-server/rvpn/connection" - - "github.com/gorilla/mux" -) - -var ( - connTable *connection.Table -) - -//LaunchAdminListener - starts up http listeners and handles various URI paths -func LaunchAdminListener(serverBinding *string, connectionTable *connection.Table) (err error) { - loginfo.Println("starting launchAdminListener", *serverBinding) - - connTable = connectionTable - - router := mux.NewRouter().StrictSlash(true) - router.HandleFunc("/", index) - router.HandleFunc("/api/servers", apiServers) - - s := &http.Server{ - Addr: *serverBinding, - Handler: router, - } - - err = s.ListenAndServeTLS("certs/fullchain.pem", "certs/privkey.pem") - if err != nil { - loginfo.Println("ListenAndServe: ", err) - } - return -} - -func index(w http.ResponseWriter, r *http.Request) { - fmt.Fprintln(w, "Welcome!") -} - -func apiServers(w http.ResponseWriter, r *http.Request) { - fmt.Println("here") - serverContainer := NewServerAPIContainer() - - for c := range connTable.Connections() { - serverAPI := NewServerAPI(c) - serverContainer.Servers = append(serverContainer.Servers, serverAPI) - - } - - w.Header().Set("Content-Type", "application/json; charset=UTF-8") - json.NewEncoder(w).Encode(serverContainer) - -} - -func handleRequest(w http.ResponseWriter, r *http.Request) { - http.Error(w, "Domain not supported", http.StatusBadRequest) -} diff --git a/rvpn/connectiontrack/setup.go b/rvpn/connectiontrack/setup.go new file mode 100644 index 0000000..54f10fe --- /dev/null +++ b/rvpn/connectiontrack/setup.go @@ -0,0 +1,17 @@ +package connectiontrack + +import ( + "log" + "os" +) + +var ( + loginfo *log.Logger + logdebug *log.Logger + logFlags = log.Ldate | log.Lmicroseconds | log.Lshortfile +) + +func init() { + loginfo = log.New(os.Stdout, "INFO: connectiontrack: ", logFlags) + logdebug = log.New(os.Stdout, "DEBUG: connectiontrack:", logFlags) +} diff --git a/rvpn/genericlistener/conn_tracking.go b/rvpn/genericlistener/conn_tracking.go new file mode 100644 index 0000000..6674214 --- /dev/null +++ b/rvpn/genericlistener/conn_tracking.go @@ -0,0 +1,68 @@ +package genericlistener + +import "net" +import "context" +import "fmt" + +//Tracking -- +type Tracking struct { + connections map[string]net.Conn + register chan net.Conn + unregister chan net.Conn +} + +//NewTracking -- Constructor +func NewTracking() (p *Tracking) { + p = new(Tracking) + p.connections = make(map[string]net.Conn) + p.register = make(chan net.Conn) + p.unregister = make(chan net.Conn) + return +} + +//Run - +func (p *Tracking) Run(ctx context.Context) { + loginfo.Println("Tracking Running") + + for { + select { + + case <-ctx.Done(): + loginfo.Println("Cancel signal hit") + return + + case connection := <-p.register: + key := connection.RemoteAddr().String() + loginfo.Println("register fired", key) + p.connections[key] = connection + p.list() + + case connection := <-p.unregister: + key := connection.RemoteAddr().String() + loginfo.Println("unregister fired", key) + p.connections[key] = connection + if _, ok := p.connections[key]; ok { + delete(p.connections, key) + } + p.list() + } + } +} + +func (p *Tracking) list() { + for c := range p.connections { + loginfo.Println(c) + } +} + +//Lookup -- +// - get connection from key +func (p *Tracking) Lookup(key string) (c net.Conn, err error) { + if _, ok := p.connections[key]; ok { + c = p.connections[key] + } else { + err = fmt.Errorf("Lookup failed for %s", key) + c = nil + } + return +} diff --git a/rvpn/genericlistener/conn_wedge.go b/rvpn/genericlistener/conn_wedge.go index c0e7452..4eafe5a 100644 --- a/rvpn/genericlistener/conn_wedge.go +++ b/rvpn/genericlistener/conn_wedge.go @@ -29,13 +29,11 @@ func NewWedgeConnSize(c net.Conn, size int) (p *WedgeConn) { //Peek - Get a number of bytes outof the buffer, but allow the buffer to be replayed once read func (w *WedgeConn) Peek(n int) ([]byte, error) { - loginfo.Println("Peek") return w.reader.Peek(n) } //Read -- A normal reader. func (w *WedgeConn) Read(p []byte) (int, error) { - loginfo.Println("Read") cnt, err := w.reader.Read(p) return cnt, err } @@ -49,7 +47,6 @@ func (w *WedgeConn) Buffered() int { // - get all the chars available // - pass then back func (w *WedgeConn) PeekAll() (buf []byte, err error) { - loginfo.Println("PeekAll") _, err = w.Peek(1) if err != nil { diff --git a/rvpn/connection/connection.go b/rvpn/genericlistener/connection.go similarity index 90% rename from rvpn/connection/connection.go rename to rvpn/genericlistener/connection.go index bb8662a..e8ff2b7 100755 --- a/rvpn/connection/connection.go +++ b/rvpn/genericlistener/connection.go @@ -1,7 +1,7 @@ -package connection +package genericlistener import ( - "encoding/hex" + "strconv" "time" "git.daplie.com/Daplie/go-rvpn-server/rvpn/packer" @@ -10,6 +10,8 @@ import ( "io" + "context" + "github.com/gorilla/websocket" ) @@ -55,12 +57,14 @@ type Connection struct { //initialDomains - a list of domains from the JWT initialDomains []interface{} + connectionTrack *Tracking + ///wssState tracks a highlevel status of the connection, false means do nothing. wssState bool } //NewConnection -- Constructor -func NewConnection(connectionTable *Table, conn *websocket.Conn, remoteAddress string, initialDomains []interface{}) (p *Connection) { +func NewConnection(connectionTable *Table, conn *websocket.Conn, remoteAddress string, initialDomains []interface{}, connectionTrack *Tracking) (p *Connection) { p = new(Connection) p.mutex = &sync.Mutex{} p.connectionTable = connectionTable @@ -71,6 +75,7 @@ func NewConnection(connectionTable *Table, conn *websocket.Conn, remoteAddress s p.send = make(chan *SendTrack) p.connectTime = time.Now() p.initialDomains = initialDomains + p.connectionTrack = connectionTrack p.DomainTrack = make(map[string]*DomainTrack) for _, domain := range initialDomains { @@ -181,7 +186,9 @@ func (c *Connection) Write(w io.WriteCloser, message []byte) (cnt int, err error } //Reader -- export the reader function -func (c *Connection) Reader() { +func (c *Connection) Reader(ctx context.Context) { + connectionTrack := c.connectionTrack + defer func() { c.connectionTable.unregister <- c c.conn.Close() @@ -195,28 +202,28 @@ func (c *Connection) Reader() { msgType, message, err := c.conn.ReadMessage() loginfo.Println("ReadMessage", msgType, err) - loginfo.Println(hex.Dump(message)) - loginfo.Println(message) + c.Update() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) { c.State(false) loginfo.Printf("error: %v", err) - //loginfo.Println(c.conn) } break } // unpack the message. - _, _ = packer.ReadMessage(message) + p, err := packer.ReadMessage(message) + key := p.Header.Address().String() + ":" + strconv.Itoa(p.Header.Port) + test, err := connectionTrack.Lookup(key) - // p.Header.SetAddress(rAddr) - // p.Header.Port, err = strconv.Atoi(rPort) - // p.Header.Port = 8080 - // p.Header.Service = "http" - // p.Data.AppendBytes(buffer[0:cnt]) - // buf := p.PackV1() + if err != nil { + loginfo.Println("Unable to locate Tracking for ", key) + continue + } + + test.Write(p.Data.Data()) c.addIn(int64(len(message))) loginfo.Println("end of read") diff --git a/rvpn/connection/connection_registration.go b/rvpn/genericlistener/connection_registration.go similarity index 83% rename from rvpn/connection/connection_registration.go rename to rvpn/genericlistener/connection_registration.go index 0ec8660..974874c 100644 --- a/rvpn/connection/connection_registration.go +++ b/rvpn/genericlistener/connection_registration.go @@ -1,4 +1,4 @@ -package connection +package genericlistener import "github.com/gorilla/websocket" @@ -17,15 +17,18 @@ type Registration struct { //initialDomains - a list of domains from the JWT initialDomains []interface{} + + connectionTrack *Tracking } //NewRegistration -- Constructor -func NewRegistration(conn *websocket.Conn, remoteAddress string, initialDomains []interface{}) (p *Registration) { +func NewRegistration(conn *websocket.Conn, remoteAddress string, initialDomains []interface{}, connectionTrack *Tracking) (p *Registration) { p = new(Registration) p.conn = conn p.source = remoteAddress p.commCh = make(chan bool) p.initialDomains = initialDomains + p.connectionTrack = connectionTrack return } diff --git a/rvpn/connection/connection_table.go b/rvpn/genericlistener/connection_table.go similarity index 92% rename from rvpn/connection/connection_table.go rename to rvpn/genericlistener/connection_table.go index 4b752a5..59f5c54 100755 --- a/rvpn/connection/connection_table.go +++ b/rvpn/genericlistener/connection_table.go @@ -1,4 +1,4 @@ -package connection +package genericlistener import "fmt" import "time" @@ -78,7 +78,7 @@ func (c *Table) Run(ctx context.Context) { case registration := <-c.register: loginfo.Println("register fired") - connection := NewConnection(c, registration.conn, registration.source, registration.initialDomains) + connection := NewConnection(c, registration.conn, registration.source, registration.initialDomains, registration.connectionTrack) c.connections[connection] = make([]string, initialDomains) registration.commCh <- true @@ -87,7 +87,7 @@ func (c *Table) Run(ctx context.Context) { // add to the domains regirstation newDomain := string(domain.(string)) - loginfo.Println("adding domain ", newDomain, " to connection ", connection) + loginfo.Println("adding domain ", newDomain, " to connection ", connection.conn.RemoteAddr().String()) c.domains[newDomain] = connection // add to the connection domain list @@ -95,11 +95,10 @@ func (c *Table) Run(ctx context.Context) { c.connections[connection] = append(s, newDomain) } go connection.Writer() - go connection.Reader() - loginfo.Println("register exiting") + go connection.Reader(ctx) case connection := <-c.unregister: - loginfo.Println("closing connection ", connection) + loginfo.Println("closing connection ", connection.conn.RemoteAddr().String()) if _, ok := c.connections[connection]; ok { for _, domain := range c.connections[connection] { fmt.Println("removing domain ", domain) @@ -122,8 +121,6 @@ func (c *Table) Run(ctx context.Context) { //} } - fmt.Println("domain ", c.domains) - fmt.Println("connections ", c.connections) } } diff --git a/rvpn/admin/domain_api.go b/rvpn/genericlistener/domain_api.go similarity index 96% rename from rvpn/admin/domain_api.go rename to rvpn/genericlistener/domain_api.go index 3e3ee0d..bcbf522 100644 --- a/rvpn/admin/domain_api.go +++ b/rvpn/genericlistener/domain_api.go @@ -1,4 +1,4 @@ -package admin +package genericlistener //DomainAPI -- Structure to hold the domain tracking for JSON type DomainAPI struct { diff --git a/rvpn/connection/domain_mapping.go b/rvpn/genericlistener/domain_mapping.go similarity index 95% rename from rvpn/connection/domain_mapping.go rename to rvpn/genericlistener/domain_mapping.go index b2b349a..7c3b49b 100644 --- a/rvpn/connection/domain_mapping.go +++ b/rvpn/genericlistener/domain_mapping.go @@ -1,4 +1,4 @@ -package connection +package genericlistener //DomainMapping -- type DomainMapping struct { diff --git a/rvpn/connection/domain_track.go b/rvpn/genericlistener/domain_track.go similarity index 96% rename from rvpn/connection/domain_track.go rename to rvpn/genericlistener/domain_track.go index e2282a7..2378434 100644 --- a/rvpn/connection/domain_track.go +++ b/rvpn/genericlistener/domain_track.go @@ -1,4 +1,4 @@ -package connection +package genericlistener //DomainTrack -- Tracking specifics for domains type DomainTrack struct { diff --git a/rvpn/genericlistener/handle_wss_client.go.dead b/rvpn/genericlistener/handle_wss_client.go.dead index 298eb54..3ff14a2 100644 --- a/rvpn/genericlistener/handle_wss_client.go.dead +++ b/rvpn/genericlistener/handle_wss_client.go.dead @@ -9,9 +9,6 @@ import ( jwt "github.com/dgrijalva/jwt-go" "github.com/gorilla/mux" "github.com/gorilla/websocket" - - "git.daplie.com/Daplie/go-rvpn-server/rvpn/admin" - "git.daplie.com/Daplie/go-rvpn-server/rvpn/connection" ) //LaunchWssListener - obtains a onetime connection from wedge listener diff --git a/rvpn/genericlistener/listener_admin.go.dead b/rvpn/genericlistener/listener_admin.go.dead new file mode 100644 index 0000000..87dd787 --- /dev/null +++ b/rvpn/genericlistener/listener_admin.go.dead @@ -0,0 +1 @@ +package genericlistener diff --git a/rvpn/genericlistener/listener_generic.go b/rvpn/genericlistener/listener_generic.go index 1c68b28..6fbf343 100644 --- a/rvpn/genericlistener/listener_generic.go +++ b/rvpn/genericlistener/listener_generic.go @@ -21,19 +21,19 @@ import ( "bufio" - "git.daplie.com/Daplie/go-rvpn-server/rvpn/admin" - "git.daplie.com/Daplie/go-rvpn-server/rvpn/connection" "git.daplie.com/Daplie/go-rvpn-server/rvpn/packer" ) type contextKey string +//CtxConnectionTrack const ( ctxSecretKey contextKey = "secretKey" ctxConnectionTable contextKey = "connectionTable" ctxConfig contextKey = "config" ctxDeadTime contextKey = "deadtime" ctxListenerRegistration contextKey = "listenerRegistration" + ctxConnectionTrack contextKey = "connectionTrack" ) const ( @@ -118,9 +118,6 @@ func handleConnection(ctx context.Context, wConn *WedgeConn) { loginfo.Println("error while peeking") return } - loginfo.Println(hex.Dump(peek[0:peekCnt])) - loginfo.Println(hex.Dump(peek[2:4])) - loginfo.Println("after peek") //take a look for a TLS header. if bytes.Contains(peek[0:0], []byte{0x80}) && bytes.Contains(peek[2:4], []byte{0x01, 0x03}) { @@ -181,8 +178,6 @@ func handleStream(ctx context.Context, wConn *WedgeConn) { loginfo.Println("conn", wConn, wConn.LocalAddr().String(), wConn.RemoteAddr().String()) peek, err := wConn.PeekAll() - loginfo.Println(hex.Dump(peek[0:])) - if err != nil { loginfo.Println("error while peeking") loginfo.Println(hex.Dump(peek[0:])) @@ -228,34 +223,32 @@ func handleStream(ctx context.Context, wConn *WedgeConn) { } } } - - loginfo.Println(hex.Dump(peek[0:])) } //handleExternalHTTPRequest - // - get a wConn and start processing requests func handleExternalHTTPRequest(ctx context.Context, conn net.Conn) { - defer conn.Close() + connectionTracking := ctx.Value(ctxConnectionTrack).(*Tracking) + connectionTracking.register <- conn - connectionTable := ctx.Value(ctxConnectionTable).(*connection.Table) + defer func() { + connectionTracking.unregister <- conn + conn.Close() + }() + + connectionTable := ctx.Value(ctxConnectionTable).(*Table) var buffer [512]byte - for { cnt, err := conn.Read(buffer[0:]) if err != nil { return } - loginfo.Println("conn ", conn) - loginfo.Println("byte read", cnt) - readBuffer := bytes.NewBuffer(buffer[0:cnt]) reader := bufio.NewReader(readBuffer) r, err := http.ReadRequest(reader) - loginfo.Println(r) - if err != nil { loginfo.Println("error parsing request") return @@ -284,8 +277,7 @@ func handleExternalHTTPRequest(ctx context.Context, conn net.Conn) { return } - loginfo.Println("Domain Accepted") - loginfo.Println(conn, rAddr, rPort) + loginfo.Println("Domain Accepted", conn, rAddr, rPort) p := packer.NewPacker() p.Header.SetAddress(rAddr) p.Header.Port, err = strconv.Atoi(rPort) @@ -298,7 +290,7 @@ func handleExternalHTTPRequest(ctx context.Context, conn net.Conn) { p.Data.AppendBytes(buffer[0:cnt]) buf := p.PackV1() - sendTrack := connection.NewSendTrack(buf.Bytes(), hostname) + sendTrack := NewSendTrack(buf.Bytes(), hostname) conn.SendCh() <- sendTrack } } @@ -307,7 +299,7 @@ func handleExternalHTTPRequest(ctx context.Context, conn net.Conn) { // - expecting an existing oneConnListener with a qualified wss client connected. // - auth will happen again since we were just peeking at the token. func handleAdminClient(ctx context.Context, oneConn *oneConnListener) { - connectionTable := ctx.Value(ctxConnectionTable).(*connection.Table) + connectionTable := ctx.Value(ctxConnectionTable).(*Table) router := mux.NewRouter().StrictSlash(true) @@ -332,10 +324,10 @@ func handleAdminClient(ctx context.Context, oneConn *oneConnListener) { router.HandleFunc("/api/servers", func(w http.ResponseWriter, r *http.Request) { fmt.Println("here") - serverContainer := admin.NewServerAPIContainer() + serverContainer := NewServerAPIContainer() for c := range connectionTable.Connections() { - serverAPI := admin.NewServerAPI(c) + serverAPI := NewServerAPI(c) serverContainer.Servers = append(serverContainer.Servers, serverAPI) } @@ -367,7 +359,7 @@ func handleAdminClient(ctx context.Context, oneConn *oneConnListener) { // - auth will happen again since we were just peeking at the token. func handleWssClient(ctx context.Context, oneConn *oneConnListener) { secretKey := ctx.Value(ctxSecretKey).(string) - connectionTable := ctx.Value(ctxConnectionTable).(*connection.Table) + connectionTable := ctx.Value(ctxConnectionTable).(*Table) router := mux.NewRouter().StrictSlash(true) router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { @@ -388,13 +380,9 @@ func handleWssClient(ctx context.Context, oneConn *oneConnListener) { return } - loginfo.Println("help access_token valid") - claims := result.Claims.(jwt.MapClaims) domains, ok := claims["domains"].([]interface{}) - loginfo.Println("domains", domains) - var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, @@ -410,7 +398,8 @@ func handleWssClient(ctx context.Context, oneConn *oneConnListener) { //newConnection := connection.NewConnection(connectionTable, conn, r.RemoteAddr, domains) - newRegistration := connection.NewRegistration(conn, r.RemoteAddr, domains) + connectionTrack := ctx.Value(ctxConnectionTrack).(*Tracking) + newRegistration := NewRegistration(conn, r.RemoteAddr, domains, connectionTrack) connectionTable.Register() <- newRegistration ok = <-newRegistration.CommCh() if !ok { diff --git a/rvpn/genericlistener/manager.go b/rvpn/genericlistener/manager.go index 57f8df5..9abda26 100644 --- a/rvpn/genericlistener/manager.go +++ b/rvpn/genericlistener/manager.go @@ -4,8 +4,6 @@ import ( "context" "crypto/tls" "net" - - "git.daplie.com/Daplie/go-rvpn-server/rvpn/connection" ) //ListenerRegistrationStatus - post registration status @@ -47,22 +45,24 @@ func NewListenerRegistration(port int) (p *ListenerRegistration) { //GenericListeners - type GenericListeners struct { - listeners map[*net.Listener]int - ctx context.Context - connnectionTable *connection.Table - secretKey string - certbundle tls.Certificate - deadTime int - register chan *ListenerRegistration - genericListeners *GenericListeners + listeners map[*net.Listener]int + ctx context.Context + connnectionTable *Table + connectionTracking *Tracking + secretKey string + certbundle tls.Certificate + deadTime int + register chan *ListenerRegistration + genericListeners *GenericListeners } //NewGenerListeners -- -func NewGenerListeners(ctx context.Context, connectionTable *connection.Table, secretKey string, certbundle tls.Certificate, deadTime int) (p *GenericListeners) { +func NewGenerListeners(ctx context.Context, connectionTable *Table, connectionTrack *Tracking, secretKey string, certbundle tls.Certificate, deadTime int) (p *GenericListeners) { p = new(GenericListeners) p.listeners = make(map[*net.Listener]int) p.ctx = ctx p.connnectionTable = connectionTable + p.connectionTracking = connectionTrack p.secretKey = secretKey p.certbundle = certbundle p.deadTime = deadTime @@ -80,6 +80,10 @@ func (gl *GenericListeners) Run(ctx context.Context, initialPort int) { ctx = context.WithValue(ctx, ctxSecretKey, gl.secretKey) ctx = context.WithValue(ctx, ctxConnectionTable, gl.connnectionTable) + + loginfo.Println(gl.connectionTracking) + + ctx = context.WithValue(ctx, ctxConnectionTrack, gl.connectionTracking) ctx = context.WithValue(ctx, ctxConfig, config) ctx = context.WithValue(ctx, ctxDeadTime, gl.deadTime) ctx = context.WithValue(ctx, ctxListenerRegistration, gl.register) @@ -104,6 +108,7 @@ func (gl *GenericListeners) Run(ctx context.Context, initialPort int) { } } loginfo.Println("listener starting up ", registration.port) + loginfo.Println(ctx.Value(ctxConnectionTrack).(*Tracking)) go GenericListenAndServe(ctx, registration) status := <-registration.commCh diff --git a/rvpn/connection/send_track.go b/rvpn/genericlistener/send_track.go similarity index 92% rename from rvpn/connection/send_track.go rename to rvpn/genericlistener/send_track.go index f7430b2..817a179 100644 --- a/rvpn/connection/send_track.go +++ b/rvpn/genericlistener/send_track.go @@ -1,4 +1,4 @@ -package connection +package genericlistener //SendTrack -- Used as a channel communication to id domain asssociated to domain for outbound WSS type SendTrack struct { diff --git a/rvpn/admin/server_api.go b/rvpn/genericlistener/server_api.go similarity index 88% rename from rvpn/admin/server_api.go rename to rvpn/genericlistener/server_api.go index c1b9562..2f84e27 100644 --- a/rvpn/admin/server_api.go +++ b/rvpn/genericlistener/server_api.go @@ -1,10 +1,8 @@ -package admin +package genericlistener import ( "fmt" "time" - - "git.daplie.com/Daplie/go-rvpn-server/rvpn/connection" ) //ServerAPI -- Structure to support the server API @@ -17,7 +15,7 @@ type ServerAPI struct { } //NewServerAPI - Constructor -func NewServerAPI(c *connection.Connection) (s *ServerAPI) { +func NewServerAPI(c *Connection) (s *ServerAPI) { s = new(ServerAPI) s.ServerName = fmt.Sprintf("%p", c) s.Domains = make([]*DomainAPI, 0) diff --git a/rvpn/packer/packer.go b/rvpn/packer/packer.go index 96de887..2c6f6db 100644 --- a/rvpn/packer/packer.go +++ b/rvpn/packer/packer.go @@ -95,12 +95,11 @@ func ReadMessage(b []byte) (p *Packer, err error) { //handle Service pos = pos + end + 1 end = pos + int(p.Header.HeaderLen) - p.Header.Service = string(b[pos:end]) + p.Header.Service = string(b[pos : p.Header.HeaderLen+2]) //handle payload - pos = pos + end + 1 - - loginfo.Println(p.Header.Port) + pos = int(p.Header.HeaderLen + 2) + p.Data.AppendBytes(b[pos:]) } else { err = fmt.Errorf("Version %d not supported", b[0:0]) diff --git a/rvpn/packer/packer_data.go b/rvpn/packer/packer_data.go index 5164b86..fce36c8 100644 --- a/rvpn/packer/packer_data.go +++ b/rvpn/packer/packer_data.go @@ -14,12 +14,18 @@ func newPackerData() (p *packerData) { return } -func (p packerData) AppendString(dataString string) (n int, err error) { +func (p *packerData) AppendString(dataString string) (n int, err error) { n, err = p.buffer.WriteString(dataString) return } -func (p packerData) AppendBytes(dataBytes []byte) (n int, err error) { +func (p *packerData) AppendBytes(dataBytes []byte) (n int, err error) { n, err = p.buffer.Write(dataBytes) return } + +//Data -- +func (p *packerData) Data() (b []byte) { + b = p.buffer.Bytes() + return +} diff --git a/rvpn/rvpnmain/run.go b/rvpn/rvpnmain/run.go index ccaf437..a949dc9 100644 --- a/rvpn/rvpnmain/run.go +++ b/rvpn/rvpnmain/run.go @@ -10,7 +10,6 @@ import ( "context" - "git.daplie.com/Daplie/go-rvpn-server/rvpn/connection" "git.daplie.com/Daplie/go-rvpn-server/rvpn/genericlistener" "git.daplie.com/Daplie/go-rvpn-server/rvpn/xlate" ) @@ -25,7 +24,7 @@ var ( argServerAdminBinding string argServerExternalBinding string argDeadTime int - connectionTable *connection.Table + connectionTable *genericlistener.Table wssMapping *xlate.WssMapping secretKey = "abc123" ) @@ -58,9 +57,6 @@ func Run() { ctx, cancelContext := context.WithCancel(context.Background()) defer cancelContext() - connectionTable = connection.NewTable() - go connectionTable.Run(ctx) - // Setup for GenericListenServe. // - establish context for the generic listener // - startup listener @@ -70,7 +66,13 @@ func Run() { // - if tls, establish, protocol peek buffer, else decrypted // - match protocol - genericListeners := genericlistener.NewGenerListeners(ctx, connectionTable, secretKey, certbundle, argDeadTime) + connectionTracking := genericlistener.NewTracking() + go connectionTracking.Run(ctx) + + connectionTable = genericlistener.NewTable() + go connectionTable.Run(ctx) + + genericListeners := genericlistener.NewGenerListeners(ctx, connectionTable, connectionTracking, secretKey, certbundle, argDeadTime) go genericListeners.Run(ctx, 8443) //go genericlistener.GenericListenAndServe(ctx, connectionTable, secretKey, argGenericBinding, certbundle, argDeadTime)