Debugging, return traffic is coming back, unpacker build

- it does not look like the client is limiting the amount of traffic coming in, and it does not look like it is chunking.
- need to know the max chunk.
- increased to 64K
- unpacker code v1
- fixed packer logging.
This commit is contained in:
Henry Camacho 2017-03-01 21:02:20 -06:00
parent f16d666b5d
commit 98da3d491c
9 changed files with 157 additions and 44 deletions

View File

@ -4,6 +4,8 @@ import (
"encoding/hex" "encoding/hex"
"time" "time"
"git.daplie.com/Daplie/go-rvpn-server/rvpn/packer"
"sync" "sync"
"io" "io"
@ -12,8 +14,8 @@ import (
) )
var upgrader = websocket.Upgrader{ var upgrader = websocket.Upgrader{
ReadBufferSize: 1024, ReadBufferSize: 4096,
WriteBufferSize: 1024, WriteBufferSize: 4096,
} }
// Connection track websocket and faciliates in and out data // Connection track websocket and faciliates in and out data
@ -188,24 +190,36 @@ func (c *Connection) Reader() {
loginfo.Println("Reader Start ", c) loginfo.Println("Reader Start ", c)
c.conn.SetReadLimit(1024) c.conn.SetReadLimit(65535)
for { for {
_, message, err := c.conn.ReadMessage() msgType, message, err := c.conn.ReadMessage()
loginfo.Println("ReadMessage") loginfo.Println("ReadMessage", msgType, err)
loginfo.Println(hex.Dump(message))
loginfo.Println(message)
c.Update() c.Update()
if err != nil { if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
c.State(false) c.State(false)
loginfo.Printf("error: %v", err) loginfo.Printf("error: %v", err)
loginfo.Println(c.conn) //loginfo.Println(c.conn)
} }
break break
} }
loginfo.Println(hex.Dump(message))
// unpack the message.
_, _ = packer.ReadMessage(message)
// p.Header.SetAddress(rAddr)
// p.Header.Port, err = strconv.Atoi(rPort)
// p.Header.Port = 8080
// p.Header.Service = "http"
// p.Data.AppendBytes(buffer[0:cnt])
// buf := p.PackV1()
c.addIn(int64(len(message))) c.addIn(int64(len(message)))
loginfo.Println(c) loginfo.Println("end of read")
} }
} }

View File

@ -174,7 +174,7 @@ func handleConnection(ctx context.Context, wConn *WedgeConn) {
// - handle http // - handle http
// - attempt to identify as WSS session // - attempt to identify as WSS session
// - attempt to identify as ADMIN/API session // - attempt to identify as ADMIN/API session
// - else handle as raw https // - else handle as raw http
// - handle other? // - handle other?
func handleStream(ctx context.Context, wConn *WedgeConn) { func handleStream(ctx context.Context, wConn *WedgeConn) {
loginfo.Println("handle Stream") loginfo.Println("handle Stream")
@ -197,7 +197,7 @@ func handleStream(ctx context.Context, wConn *WedgeConn) {
r, err := http.ReadRequest(bufio.NewReader(bytes.NewReader(peek))) r, err := http.ReadRequest(bufio.NewReader(bytes.NewReader(peek)))
if err != nil { if err != nil {
loginfo.Println("identifed as HTTP, failed request", err) loginfo.Println("identifed as HTTP, failed request parsing", err)
return return
} }
@ -275,28 +275,32 @@ func handleExternalHTTPRequest(ctx context.Context, conn net.Conn) {
rAddr := remoteSplit[0] rAddr := remoteSplit[0]
rPort := remoteSplit[1] rPort := remoteSplit[1]
if conn, ok := connectionTable.ConnByDomain(hostname); !ok { //find the connection by domain name
conn, ok := connectionTable.ConnByDomain(hostname)
if !ok {
//matching connection can not be found based on ConnByDomain //matching connection can not be found based on ConnByDomain
loginfo.Println("unable to match ", hostname, " to an existing connection") loginfo.Println("unable to match ", hostname, " to an existing connection")
//http.Error(, "Domain not supported", http.StatusBadRequest) //http.Error(, "Domain not supported", http.StatusBadRequest)
return
} else {
loginfo.Println("Domain Accepted")
loginfo.Println(conn, rAddr, rPort)
p := packer.NewPacker()
p.Header.SetAddress(rAddr)
p.Header.Port, err = strconv.Atoi(rPort)
p.Header.Port = 8080
p.Header.Service = "http"
p.Data.AppendBytes(buffer[0:cnt])
buf := p.PackV1()
sendTrack := connection.NewSendTrack(buf.Bytes(), hostname)
conn.SendCh() <- sendTrack
} }
}
loginfo.Println("Domain Accepted")
loginfo.Println(conn, rAddr, rPort)
p := packer.NewPacker()
p.Header.SetAddress(rAddr)
p.Header.Port, err = strconv.Atoi(rPort)
if err != nil {
loginfo.Println("Unable to set Remote port", err)
return
}
p.Header.Service = "http"
p.Data.AppendBytes(buffer[0:cnt])
buf := p.PackV1()
sendTrack := connection.NewSendTrack(buf.Bytes(), hostname)
conn.SendCh() <- sendTrack
}
} }
//handleAdminClient - //handleAdminClient -

View File

@ -98,7 +98,7 @@ func (gl *GenericListeners) Run(ctx context.Context, initialPort int) {
// check to see if port is already running // check to see if port is already running
for listener := range gl.listeners { for listener := range gl.listeners {
if gl.listeners[listener] == registration.port { if gl.listeners[listener] == registration.port {
loginfo.Println("listener already running") loginfo.Println("listener already running", registration.port)
registration.status = listenerExists registration.status = listenerExists
registration.commCh <- registration registration.commCh <- registration
} }
@ -112,7 +112,6 @@ func (gl *GenericListeners) Run(ctx context.Context, initialPort int) {
} else if status.status == listenerFault { } else if status.status == listenerFault {
loginfo.Println("Unable to create a new listerer", registration.port) loginfo.Println("Unable to create a new listerer", registration.port)
} }
} }
} }

View File

@ -12,6 +12,6 @@ var (
) )
func init() { func init() {
loginfo = log.New(os.Stdout, "INFO: external: ", logFlags) loginfo = log.New(os.Stdout, "INFO: genericlistener: ", logFlags)
logdebug = log.New(os.Stdout, "DEBUG: external:", logFlags) logdebug = log.New(os.Stdout, "DEBUG: genericlistener:", logFlags)
} }

View File

@ -3,6 +3,13 @@ package packer
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"net"
"strconv"
)
const (
packerV1 byte = 255 - 1
packerV2 byte = 255 - 2
) )
//Packer -- contains both header and data //Packer -- contains both header and data
@ -19,9 +26,93 @@ func NewPacker() (p *Packer) {
return return
} }
//ReadMessage -
func ReadMessage(b []byte) (p *Packer, err error) {
fmt.Println("ReadMessage")
var pos int
err = nil
// detect protocol in use
if b[0] == packerV1 {
p = NewPacker()
// Handle Header Length
pos = pos + 1
p.Header.HeaderLen = b[pos]
//handle address family
pos = pos + 1
end := bytes.IndexAny(b[pos:], ",")
if end == -1 {
err = fmt.Errorf("missing , while parsing address family")
return nil, err
}
bAddrFamily := b[pos : pos+end]
if bytes.ContainsAny(bAddrFamily, addressFamilyText[FamilyIPv4]) {
p.Header.family = FamilyIPv4
} else if bytes.ContainsAny(bAddrFamily, addressFamilyText[FamilyIPv6]) {
p.Header.family = FamilyIPv6
} else {
err = fmt.Errorf("Address family not supported %d", bAddrFamily)
}
//handle address
pos = pos + end + 1
end = bytes.IndexAny(b[pos:], ",")
if end == -1 {
err = fmt.Errorf("missing , while parsing address")
return nil, err
}
p.Header.address = net.ParseIP(string(b[pos : pos+end]))
//handle import
pos = pos + end + 1
end = bytes.IndexAny(b[pos:], ",")
if end == -1 {
err = fmt.Errorf("missing , while parsing address")
return nil, err
}
p.Header.Port, err = strconv.Atoi(string(b[pos : pos+end]))
if err != nil {
err = fmt.Errorf("error converting port %s", err)
}
//handle data length
pos = pos + end + 1
end = bytes.IndexAny(b[pos:], ",")
if end == -1 {
err = fmt.Errorf("missing , while parsing address")
return nil, err
}
p.Data.DataLen, err = strconv.Atoi(string(b[pos : pos+end]))
if err != nil {
err = fmt.Errorf("error converting data length %s", err)
}
//handle Service
pos = pos + end + 1
end = pos + int(p.Header.HeaderLen)
p.Header.Service = string(b[pos:end])
//handle payload
pos = pos + end + 1
loginfo.Println(p.Header.Port)
} else {
err = fmt.Errorf("Version %d not supported", b[0:0])
}
return
}
//PackV1 -- Outputs version 1 of packer //PackV1 -- Outputs version 1 of packer
func (p *Packer) PackV1() (b bytes.Buffer) { func (p *Packer) PackV1() (b bytes.Buffer) {
version := byte(1) version := packerV1
var headerBuf bytes.Buffer var headerBuf bytes.Buffer
headerBuf.WriteString(p.Header.FamilyText()) headerBuf.WriteString(p.Header.FamilyText())
@ -35,7 +126,7 @@ func (p *Packer) PackV1() (b bytes.Buffer) {
headerBuf.WriteString(p.Header.Service) headerBuf.WriteString(p.Header.Service)
var metaBuf bytes.Buffer var metaBuf bytes.Buffer
metaBuf.WriteByte(byte(255) - version) metaBuf.WriteByte(version)
metaBuf.WriteByte(byte(headerBuf.Len())) metaBuf.WriteByte(byte(headerBuf.Len()))
var buf bytes.Buffer var buf bytes.Buffer

View File

@ -4,7 +4,8 @@ import "bytes"
//packerData -- Contains packer data //packerData -- Contains packer data
type packerData struct { type packerData struct {
buffer *bytes.Buffer buffer *bytes.Buffer
DataLen int
} }
func newPackerData() (p *packerData) { func newPackerData() (p *packerData) {

View File

@ -7,10 +7,11 @@ type addressFamily int
// packerHeader structure to hold our header information. // packerHeader structure to hold our header information.
type packerHeader struct { type packerHeader struct {
family addressFamily family addressFamily
address net.IP address net.IP
Port int Port int
Service string Service string
HeaderLen byte
} }
//Family -- ENUM for Address Family //Family -- ENUM for Address Family
@ -29,6 +30,7 @@ func newPackerHeader() (p *packerHeader) {
p.SetAddress("127.0.0.1") p.SetAddress("127.0.0.1")
p.Port = 65535 p.Port = 65535
p.Service = "na" p.Service = "na"
p.HeaderLen = 0
return return
} }

View File

@ -3,11 +3,13 @@ package packer
import "log" import "log"
import "os" import "os"
func init() { var (
logFlags := log.Ldate | log.Lmicroseconds | log.Lshortfile loginfo *log.Logger
loginfo := log.New(os.Stdout, "INFO: packer: ", logFlags) logdebug *log.Logger
logdebug := log.New(os.Stdout, "DEBUG: packer:", logFlags) logFlags = log.Ldate | log.Lmicroseconds | log.Lshortfile
)
loginfo.Println("") func init() {
logdebug.Println("") loginfo = log.New(os.Stdout, "INFO: packer: ", logFlags)
logdebug = log.New(os.Stdout, "DEBUG: packer:", logFlags)
} }