diff --git a/rvpn/connection/connection.go b/rvpn/connection/connection.go index f41fbdb..bb8662a 100755 --- a/rvpn/connection/connection.go +++ b/rvpn/connection/connection.go @@ -4,6 +4,8 @@ import ( "encoding/hex" "time" + "git.daplie.com/Daplie/go-rvpn-server/rvpn/packer" + "sync" "io" @@ -12,8 +14,8 @@ import ( ) var upgrader = websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, + ReadBufferSize: 4096, + WriteBufferSize: 4096, } // Connection track websocket and faciliates in and out data @@ -188,24 +190,36 @@ func (c *Connection) Reader() { loginfo.Println("Reader Start ", c) - c.conn.SetReadLimit(1024) + c.conn.SetReadLimit(65535) for { - _, message, err := c.conn.ReadMessage() + msgType, message, err := c.conn.ReadMessage() - loginfo.Println("ReadMessage") + loginfo.Println("ReadMessage", msgType, err) + loginfo.Println(hex.Dump(message)) + loginfo.Println(message) c.Update() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) { c.State(false) loginfo.Printf("error: %v", err) - loginfo.Println(c.conn) + //loginfo.Println(c.conn) } break } - loginfo.Println(hex.Dump(message)) + + // unpack the message. + _, _ = packer.ReadMessage(message) + + // p.Header.SetAddress(rAddr) + // p.Header.Port, err = strconv.Atoi(rPort) + // p.Header.Port = 8080 + // p.Header.Service = "http" + // p.Data.AppendBytes(buffer[0:cnt]) + // buf := p.PackV1() + c.addIn(int64(len(message))) - loginfo.Println(c) + loginfo.Println("end of read") } } diff --git a/rvpn/genericlistener/handle_wss_client.go b/rvpn/genericlistener/handle_wss_client.go.dead similarity index 100% rename from rvpn/genericlistener/handle_wss_client.go rename to rvpn/genericlistener/handle_wss_client.go.dead diff --git a/rvpn/genericlistener/listener_generic.go b/rvpn/genericlistener/listener_generic.go index 97f2d30..1c68b28 100644 --- a/rvpn/genericlistener/listener_generic.go +++ b/rvpn/genericlistener/listener_generic.go @@ -174,7 +174,7 @@ func handleConnection(ctx context.Context, wConn *WedgeConn) { // - handle http // - attempt to identify as WSS session // - attempt to identify as ADMIN/API session -// - else handle as raw https +// - else handle as raw http // - handle other? func handleStream(ctx context.Context, wConn *WedgeConn) { loginfo.Println("handle Stream") @@ -197,7 +197,7 @@ func handleStream(ctx context.Context, wConn *WedgeConn) { r, err := http.ReadRequest(bufio.NewReader(bytes.NewReader(peek))) if err != nil { - loginfo.Println("identifed as HTTP, failed request", err) + loginfo.Println("identifed as HTTP, failed request parsing", err) return } @@ -275,28 +275,32 @@ func handleExternalHTTPRequest(ctx context.Context, conn net.Conn) { rAddr := remoteSplit[0] rPort := remoteSplit[1] - if conn, ok := connectionTable.ConnByDomain(hostname); !ok { + //find the connection by domain name + conn, ok := connectionTable.ConnByDomain(hostname) + if !ok { //matching connection can not be found based on ConnByDomain loginfo.Println("unable to match ", hostname, " to an existing connection") //http.Error(, "Domain not supported", http.StatusBadRequest) - - } else { - - loginfo.Println("Domain Accepted") - loginfo.Println(conn, rAddr, rPort) - p := packer.NewPacker() - p.Header.SetAddress(rAddr) - p.Header.Port, err = strconv.Atoi(rPort) - p.Header.Port = 8080 - p.Header.Service = "http" - p.Data.AppendBytes(buffer[0:cnt]) - buf := p.PackV1() - - sendTrack := connection.NewSendTrack(buf.Bytes(), hostname) - conn.SendCh() <- sendTrack + return } - } + loginfo.Println("Domain Accepted") + loginfo.Println(conn, rAddr, rPort) + p := packer.NewPacker() + p.Header.SetAddress(rAddr) + p.Header.Port, err = strconv.Atoi(rPort) + if err != nil { + loginfo.Println("Unable to set Remote port", err) + return + } + + p.Header.Service = "http" + p.Data.AppendBytes(buffer[0:cnt]) + buf := p.PackV1() + + sendTrack := connection.NewSendTrack(buf.Bytes(), hostname) + conn.SendCh() <- sendTrack + } } //handleAdminClient - diff --git a/rvpn/genericlistener/manager.go b/rvpn/genericlistener/manager.go index f71d6d7..57f8df5 100644 --- a/rvpn/genericlistener/manager.go +++ b/rvpn/genericlistener/manager.go @@ -98,7 +98,7 @@ func (gl *GenericListeners) Run(ctx context.Context, initialPort int) { // check to see if port is already running for listener := range gl.listeners { if gl.listeners[listener] == registration.port { - loginfo.Println("listener already running") + loginfo.Println("listener already running", registration.port) registration.status = listenerExists registration.commCh <- registration } @@ -112,7 +112,6 @@ func (gl *GenericListeners) Run(ctx context.Context, initialPort int) { } else if status.status == listenerFault { loginfo.Println("Unable to create a new listerer", registration.port) } - } } diff --git a/rvpn/genericlistener/setup.go b/rvpn/genericlistener/setup.go index 5353fd7..79d6113 100644 --- a/rvpn/genericlistener/setup.go +++ b/rvpn/genericlistener/setup.go @@ -12,6 +12,6 @@ var ( ) func init() { - loginfo = log.New(os.Stdout, "INFO: external: ", logFlags) - logdebug = log.New(os.Stdout, "DEBUG: external:", logFlags) + loginfo = log.New(os.Stdout, "INFO: genericlistener: ", logFlags) + logdebug = log.New(os.Stdout, "DEBUG: genericlistener:", logFlags) } diff --git a/rvpn/packer/packer.go b/rvpn/packer/packer.go index 36cf083..96de887 100644 --- a/rvpn/packer/packer.go +++ b/rvpn/packer/packer.go @@ -3,6 +3,13 @@ package packer import ( "bytes" "fmt" + "net" + "strconv" +) + +const ( + packerV1 byte = 255 - 1 + packerV2 byte = 255 - 2 ) //Packer -- contains both header and data @@ -19,9 +26,93 @@ func NewPacker() (p *Packer) { return } +//ReadMessage - +func ReadMessage(b []byte) (p *Packer, err error) { + fmt.Println("ReadMessage") + var pos int + + err = nil + // detect protocol in use + if b[0] == packerV1 { + p = NewPacker() + + // Handle Header Length + pos = pos + 1 + p.Header.HeaderLen = b[pos] + + //handle address family + pos = pos + 1 + end := bytes.IndexAny(b[pos:], ",") + if end == -1 { + err = fmt.Errorf("missing , while parsing address family") + return nil, err + } + + bAddrFamily := b[pos : pos+end] + if bytes.ContainsAny(bAddrFamily, addressFamilyText[FamilyIPv4]) { + p.Header.family = FamilyIPv4 + } else if bytes.ContainsAny(bAddrFamily, addressFamilyText[FamilyIPv6]) { + p.Header.family = FamilyIPv6 + } else { + err = fmt.Errorf("Address family not supported %d", bAddrFamily) + } + + //handle address + pos = pos + end + 1 + end = bytes.IndexAny(b[pos:], ",") + if end == -1 { + err = fmt.Errorf("missing , while parsing address") + return nil, err + } + p.Header.address = net.ParseIP(string(b[pos : pos+end])) + + //handle import + pos = pos + end + 1 + end = bytes.IndexAny(b[pos:], ",") + if end == -1 { + err = fmt.Errorf("missing , while parsing address") + return nil, err + } + + p.Header.Port, err = strconv.Atoi(string(b[pos : pos+end])) + if err != nil { + err = fmt.Errorf("error converting port %s", err) + } + + //handle data length + pos = pos + end + 1 + end = bytes.IndexAny(b[pos:], ",") + if end == -1 { + err = fmt.Errorf("missing , while parsing address") + return nil, err + } + + p.Data.DataLen, err = strconv.Atoi(string(b[pos : pos+end])) + if err != nil { + err = fmt.Errorf("error converting data length %s", err) + } + + //handle Service + pos = pos + end + 1 + end = pos + int(p.Header.HeaderLen) + p.Header.Service = string(b[pos:end]) + + //handle payload + pos = pos + end + 1 + + loginfo.Println(p.Header.Port) + + } else { + err = fmt.Errorf("Version %d not supported", b[0:0]) + } + + return + +} + //PackV1 -- Outputs version 1 of packer func (p *Packer) PackV1() (b bytes.Buffer) { - version := byte(1) + version := packerV1 var headerBuf bytes.Buffer headerBuf.WriteString(p.Header.FamilyText()) @@ -35,7 +126,7 @@ func (p *Packer) PackV1() (b bytes.Buffer) { headerBuf.WriteString(p.Header.Service) var metaBuf bytes.Buffer - metaBuf.WriteByte(byte(255) - version) + metaBuf.WriteByte(version) metaBuf.WriteByte(byte(headerBuf.Len())) var buf bytes.Buffer diff --git a/rvpn/packer/packer_data.go b/rvpn/packer/packer_data.go index 092dd8f..5164b86 100644 --- a/rvpn/packer/packer_data.go +++ b/rvpn/packer/packer_data.go @@ -4,7 +4,8 @@ import "bytes" //packerData -- Contains packer data type packerData struct { - buffer *bytes.Buffer + buffer *bytes.Buffer + DataLen int } func newPackerData() (p *packerData) { diff --git a/rvpn/packer/packer_header.go b/rvpn/packer/packer_header.go index c771f50..7319622 100644 --- a/rvpn/packer/packer_header.go +++ b/rvpn/packer/packer_header.go @@ -7,10 +7,11 @@ type addressFamily int // packerHeader structure to hold our header information. type packerHeader struct { - family addressFamily - address net.IP - Port int - Service string + family addressFamily + address net.IP + Port int + Service string + HeaderLen byte } //Family -- ENUM for Address Family @@ -29,6 +30,7 @@ func newPackerHeader() (p *packerHeader) { p.SetAddress("127.0.0.1") p.Port = 65535 p.Service = "na" + p.HeaderLen = 0 return } diff --git a/rvpn/packer/setup.go b/rvpn/packer/setup.go index 33dd55e..bee01b8 100644 --- a/rvpn/packer/setup.go +++ b/rvpn/packer/setup.go @@ -3,11 +3,13 @@ package packer import "log" import "os" -func init() { - logFlags := log.Ldate | log.Lmicroseconds | log.Lshortfile - loginfo := log.New(os.Stdout, "INFO: packer: ", logFlags) - logdebug := log.New(os.Stdout, "DEBUG: packer:", logFlags) +var ( + loginfo *log.Logger + logdebug *log.Logger + logFlags = log.Ldate | log.Lmicroseconds | log.Lshortfile +) - loginfo.Println("") - logdebug.Println("") +func init() { + loginfo = log.New(os.Stdout, "INFO: packer: ", logFlags) + logdebug = log.New(os.Stdout, "DEBUG: packer:", logFlags) }