diff --git a/.gitignore b/.gitignore index 136a8fb..40185c6 100644 --- a/.gitignore +++ b/.gitignore @@ -6,5 +6,4 @@ certs /cmd/telebitd/telebitd /telebit /cmd/telebit/telebit -/m /debug diff --git a/client/client.go b/client/client.go index a903305..1d8453a 100644 --- a/client/client.go +++ b/client/client.go @@ -51,12 +51,13 @@ func Run(ctx context.Context, config *Config) error { authenticated := false for { + fmt.Printf("debug serverURL:\n%+v", serverURL) if conn, _, err := dialer.Dial(serverURL.String(), nil); err == nil { loginfo.Println("connected to remote server") authenticated = true handler.HandleConn(ctx, conn) } else if !authenticated { - return fmt.Errorf("First connection to server failed - check auth: %v", err) + return fmt.Errorf("first connection to server failed - check auth: %s", err.Error()) } loginfo.Println("disconnected from remote server") diff --git a/cmd/telebitd/telebitd.go b/cmd/telebitd/telebitd.go index ff3f1a4..a6c382a 100644 --- a/cmd/telebitd/telebitd.go +++ b/cmd/telebitd/telebitd.go @@ -10,9 +10,11 @@ import ( "log" "net/http" "os" + "strings" telebit "git.coolaj86.com/coolaj86/go-telebitd" "git.coolaj86.com/coolaj86/go-telebitd/relay" + "git.coolaj86.com/coolaj86/go-telebitd/relay/api" "git.coolaj86.com/coolaj86/go-telebitd/server" jwt "github.com/dgrijalva/jwt-go" @@ -36,7 +38,7 @@ var ( argServerAdminBinding string argServerExternalBinding string argDeadTime int - connectionTable *server.Table + connectionTable *api.Table secretKey string wssHostName = "localhost.rootprojects.org" adminHostName = telebit.InvalidAdminDomain @@ -114,14 +116,14 @@ func main() { ctx, cancelContext := context.WithCancel(context.Background()) defer cancelContext() - serverStatus := server.NewStatus(ctx) + serverStatus := api.NewStatus(ctx) serverStatus.AdminDomain = adminHostName serverStatus.WssDomain = wssHostName serverStatus.Name = nickname - serverStatus.DeadTime = server.NewStatusDeadTime(dwell, idle, cancelcheck) + serverStatus.DeadTime = api.NewStatusDeadTime(dwell, idle, cancelcheck) serverStatus.LoadbalanceDefaultMethod = lbDefaultMethod - connectionTable := server.NewTable(dwell, idle, lbDefaultMethod) + connectionTable := api.NewTable(dwell, idle, lbDefaultMethod) tlsConfig := &tls.Config{ GetCertificate: func(*tls.ClientHelloInfo) (*tls.Certificate, error) { @@ -139,11 +141,29 @@ func main() { authorizer := func(r *http.Request) (*server.Authz, error) { // do we have a valid wss_client? - tokenString := r.URL.Query().Get("access_token") + + var tokenString string + if auth := strings.Split(r.Header.Get("Authorization"), " "); len(auth) > 1 { + // TODO handle Basic auth tokens as well + tokenString = auth[1] + } + if "" == tokenString { + tokenString = r.URL.Query().Get("access_token") + } + _, err := jwt.Parse(tokenString, func(token *jwt.Token) (interface{}, error) { return []byte(secretKey), nil }) - return nil, err + if nil != err { + return nil, err + } + + authz := &server.Authz{ + Domains: []string{ + "target.rootprojects.org", + }, + } + return authz, err /* tokenString := r.URL.Query().Get("access_token") diff --git a/server/api_interface.go b/relay/admin/admin.go similarity index 75% rename from server/api_interface.go rename to relay/admin/admin.go index df5f907..dd7378f 100644 --- a/server/api_interface.go +++ b/relay/admin/admin.go @@ -1,40 +1,48 @@ -package server +package admin import ( - "context" + "log" + "net" "net/http" "runtime" "strconv" "strings" - "github.com/gorilla/mux" - telebit "git.coolaj86.com/coolaj86/go-telebitd" - "git.coolaj86.com/coolaj86/go-telebitd/envelope" + "git.coolaj86.com/coolaj86/go-telebitd/relay/api" + "git.coolaj86.com/coolaj86/go-telebitd/server" + + "github.com/gorilla/mux" ) const ( endPointPrefix = "/api/org.rootprojects.tunnel/" ) -var connectionTable *Table -var serverStatus *Status -var serverStatusAPI *Status +var connectionTable *api.Table +var serverStatus *api.Status +var serverStatusAPI *api.Status -//handleAdminClient - +//ListenAndServe - // - expecting an existing oneConnListener with a qualified wss client connected. // - auth will happen again since we were just peeking at the token. -func handleAdminClient(ctx context.Context, oneConn *oneConnListener) { - serverStatus = ctx.Value(ctxServerStatus).(*Status) +func ListenAndServe(mx *server.MPlexy, adminListener net.Listener) error { + //serverStatus = mx.ctx.Value(ctxServerStatus).(*Status) + + connectionTable = mx.Status.ConnectionTable + serverStatusAPI = mx.Status - connectionTable = serverStatus.ConnectionTable - serverStatusAPI = serverStatus router := mux.NewRouter().StrictSlash(true) - router.PathPrefix("/admin/").Handler(http.StripPrefix("/admin/", http.FileServer(http.Dir("html/admin")))) - router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - loginfo.Println("HandleFunc /") + log.Println("HandleFunc /") + + _, err := mx.AuthorizeAdmin(r) + if err == nil { + // TODO + w.Write([]byte("TODO: handle bad auth")) + return + } serverStatus.AdminStats.IncRequests() @@ -64,30 +72,20 @@ func handleAdminClient(ctx context.Context, oneConn *oneConnListener) { Addr: ":80", Handler: router, } - - err := s.Serve(oneConn) - if err != nil { - loginfo.Println("Serve error: ", err) - } - - select { - case <-ctx.Done(): - loginfo.Println("Cancel signal hit") - return - } + return s.Serve(adminListener) } func getStatusEndpoint(w http.ResponseWriter, r *http.Request) { pc, _, _, _ := runtime.Caller(0) - loginfo.Println(runtime.FuncForPC(pc).Name()) + log.Println(runtime.FuncForPC(pc).Name()) serverStatus.AdminStats.IncRequests() - statusContainer := NewStatusAPI(serverStatusAPI) + statusContainer := api.NewStatusAPI(serverStatusAPI) w.Header().Set("Content-Type", "application/json; charset=UTF-8") - env := envelope.NewEnvelope("domains/GET") + env := NewResponse("domains/GET") env.Result = statusContainer env.GenerateWriter(w) serverStatus.AdminStats.IncResponses() @@ -95,15 +93,15 @@ func getStatusEndpoint(w http.ResponseWriter, r *http.Request) { func getDomainsEndpoint(w http.ResponseWriter, r *http.Request) { pc, _, _, _ := runtime.Caller(0) - loginfo.Println(runtime.FuncForPC(pc).Name()) + log.Println(runtime.FuncForPC(pc).Name()) serverStatus.AdminStats.IncRequests() - domainsContainer := NewDomainsAPI(connectionTable.domains) + domainsContainer := api.NewDomainsAPI(connectionTable.Domains) w.Header().Set("Content-Type", "application/json; charset=UTF-8") - env := envelope.NewEnvelope("domains/GET") + env := NewResponse("domains/GET") env.Result = domainsContainer env.GenerateWriter(w) serverStatus.AdminStats.IncResponses() @@ -111,11 +109,11 @@ func getDomainsEndpoint(w http.ResponseWriter, r *http.Request) { func getDomainEndpoint(w http.ResponseWriter, r *http.Request) { pc, _, _, _ := runtime.Caller(0) - loginfo.Println(runtime.FuncForPC(pc).Name()) + log.Println(runtime.FuncForPC(pc).Name()) serverStatus.AdminStats.IncRequests() - env := envelope.NewEnvelope("domain/GET") + env := NewResponse("domain/GET") params := mux.Vars(r) if id, ok := params["domain-name"]; !ok { @@ -124,16 +122,16 @@ func getDomainEndpoint(w http.ResponseWriter, r *http.Request) { env.ErrorDescription = "domain API requires a domain-name" } else { domainName := id - if domainLB, ok := connectionTable.domains[domainName]; !ok { + if domainLB, ok := connectionTable.Domains[domainName]; !ok { env.Error = "domain-name was not found" env.ErrorURI = r.RequestURI env.ErrorDescription = "domain-name not found" } else { - var domainAPIContainer []*ServerDomainAPI + var domainAPIContainer []*api.ServerDomainAPI conns := domainLB.Connections() for pos := range conns { conn := conns[pos] - domainAPI := NewServerDomainAPI(conn, conn.DomainTrack[domainName]) + domainAPI := api.NewServerDomainAPI(conn, conn.DomainTrack[domainName]) domainAPIContainer = append(domainAPIContainer, domainAPI) } env.Result = domainAPIContainer @@ -146,21 +144,21 @@ func getDomainEndpoint(w http.ResponseWriter, r *http.Request) { func getServersEndpoint(w http.ResponseWriter, r *http.Request) { pc, _, _, _ := runtime.Caller(0) - loginfo.Println(runtime.FuncForPC(pc).Name()) + log.Println(runtime.FuncForPC(pc).Name()) serverStatus.AdminStats.IncRequests() - serverContainer := NewServerAPIContainer() + serverContainer := api.NewServerAPIContainer() for c := range connectionTable.Connections() { - serverAPI := NewServersAPI(c) + serverAPI := api.NewServersAPI(c) serverContainer.Servers = append(serverContainer.Servers, serverAPI) } w.Header().Set("Content-Type", "application/json; charset=UTF-8") - env := envelope.NewEnvelope("servers/GET") + env := NewResponse("servers/GET") env.Result = serverContainer env.GenerateWriter(w) serverStatus.AdminStats.IncResponses() @@ -168,11 +166,11 @@ func getServersEndpoint(w http.ResponseWriter, r *http.Request) { func getServerEndpoint(w http.ResponseWriter, r *http.Request) { pc, _, _, _ := runtime.Caller(0) - loginfo.Println(runtime.FuncForPC(pc).Name()) + log.Println(runtime.FuncForPC(pc).Name()) serverStatus.AdminStats.IncRequests() - env := envelope.NewEnvelope("server/GET") + env := NewResponse("server/GET") params := mux.Vars(r) if id, ok := params["server-id"]; !ok { @@ -193,8 +191,8 @@ func getServerEndpoint(w http.ResponseWriter, r *http.Request) { env.ErrorURI = r.RequestURI env.ErrorDescription = "missing server-id, make sure desired service-id is in servers" } else { - loginfo.Println("test") - serverAPI := NewServerAPI(conn) + log.Println("test") + serverAPI := api.NewServerAPI(conn) env.Result = serverAPI } diff --git a/envelope/envelope.go b/relay/admin/response.go similarity index 75% rename from envelope/envelope.go rename to relay/admin/response.go index 38d11e3..219c5d9 100644 --- a/envelope/envelope.go +++ b/relay/admin/response.go @@ -1,4 +1,4 @@ -package envelope +package admin import ( "bytes" @@ -7,8 +7,8 @@ import ( "time" ) -//Envelope -- Standard response structure -type Envelope struct { +//Response -- Standard response structure +type Response struct { TransactionType string `json:"type"` Schema string `json:"schema"` TransactionTimeStamp int64 `json:"txts"` @@ -19,12 +19,12 @@ type Envelope struct { Result interface{} `json:"result"` } -//NewEnvelope -- Constructor -func NewEnvelope(transactionType string) (p *Envelope) { +//NewResponse -- Constructor +func NewResponse(transactionType string) (p *Response) { // TODO BUG use atomic transactionID++ - p = &Envelope{} + p = &Response{} p.TransactionType = transactionType p.TransactionID = transactionID p.TransactionTimeStamp = time.Now().Unix() @@ -34,13 +34,13 @@ func NewEnvelope(transactionType string) (p *Envelope) { } //Generate -- encode into JSON and return string -func (e *Envelope) Generate() string { +func (e *Response) Generate() string { buf := new(bytes.Buffer) json.NewEncoder(buf).Encode(e) return buf.String() } //GenerateWriter -- -func (e *Envelope) GenerateWriter(w io.Writer) { +func (e *Response) GenerateWriter(w io.Writer) { json.NewEncoder(w).Encode(e) } diff --git a/envelope/setup.go b/relay/admin/setup.go similarity index 95% rename from envelope/setup.go rename to relay/admin/setup.go index ccd443b..4145546 100644 --- a/envelope/setup.go +++ b/relay/admin/setup.go @@ -1,4 +1,4 @@ -package envelope +package admin import ( "log" diff --git a/server/api_collect_connections.go b/relay/api/api_collect_connections.go similarity index 96% rename from server/api_collect_connections.go rename to relay/api/api_collect_connections.go index f0201e3..f19c6de 100644 --- a/server/api_collect_connections.go +++ b/relay/api/api_collect_connections.go @@ -1,4 +1,4 @@ -package server +package api //ConnectionStatsAPI -- type ConnectionStatsAPI struct { diff --git a/server/api_collect_domains.go b/relay/api/api_collect_domains.go similarity index 81% rename from server/api_collect_domains.go rename to relay/api/api_collect_domains.go index 1fe974a..79460c5 100644 --- a/server/api_collect_domains.go +++ b/relay/api/api_collect_domains.go @@ -1,4 +1,4 @@ -package server +package api //DomainsAPI -- A collections of all the domains //List of Domains -> DomainAPI @@ -36,14 +36,14 @@ type DomainAPI struct { func NewDomainAPI(domain string, domainLoadBalance *DomainLoadBalance) (p *DomainAPI) { p = new(DomainAPI) p.DomainName = domain - for pos := range domainLoadBalance.connections { - ds := NewDomainServerAPI(domain, domainLoadBalance.connections[pos]) + for pos := range domainLoadBalance.Connections() { + ds := NewDomainServerAPI(domain, domainLoadBalance.Connections()[pos]) p.Servers = append(p.Servers, ds) p.TotalServers++ - p.Traffic.BytesIn += domainLoadBalance.connections[pos].BytesIn() - p.Traffic.BytesOut += domainLoadBalance.connections[pos].BytesOut() - p.Traffic.Requests += domainLoadBalance.connections[pos].requests - p.Traffic.Responses += domainLoadBalance.connections[pos].responses + p.Traffic.BytesIn += domainLoadBalance.Connections()[pos].BytesIn() + p.Traffic.BytesOut += domainLoadBalance.Connections()[pos].BytesOut() + p.Traffic.Requests += domainLoadBalance.Connections()[pos].Requests + p.Traffic.Responses += domainLoadBalance.Connections()[pos].Responses } return } diff --git a/server/api_collect_server.go b/relay/api/api_collect_server.go similarity index 95% rename from server/api_collect_server.go rename to relay/api/api_collect_server.go index 7cfea54..e884da5 100644 --- a/server/api_collect_server.go +++ b/relay/api/api_collect_server.go @@ -1,6 +1,8 @@ -package server +package api -import "time" +import ( + "time" +) //ServerAPI -- Structure to support the server API type ServerAPI struct { @@ -25,7 +27,7 @@ func NewServerAPI(c *Connection) (s *ServerAPI) { s.Idle = time.Since(c.LastUpdate()).Seconds() s.BytesIn = c.BytesIn() s.BytesOut = c.BytesOut() - s.Source = c.source + s.Source = c.Source() for domainName := range c.DomainTrack { diff --git a/server/api_collect_server_domains.go b/relay/api/api_collect_server_domains.go similarity index 92% rename from server/api_collect_server_domains.go rename to relay/api/api_collect_server_domains.go index 426a68f..1d02676 100644 --- a/server/api_collect_server_domains.go +++ b/relay/api/api_collect_server_domains.go @@ -1,4 +1,4 @@ -package server +package api //ServerDomainsAPI -- Structure to support the server API type ServerDomainsAPI struct { @@ -17,8 +17,8 @@ func NewServerDomainsAPI(c *Connection, d *DomainTrack) (s *ServerDomainsAPI) { s.ServerID = c.ConnectionID() s.BytesIn = d.BytesIn() s.BytesOut = d.BytesOut() - s.Requests = d.requests - s.Responses = d.responses + s.Requests = d.Requests() + s.Responses = d.Responses() return } @@ -53,8 +53,8 @@ func NewServerDomainAPI(c *Connection, d *DomainTrack) (s *ServerDomainAPI) { s.ServerID = c.ConnectionID() s.BytesIn = d.BytesIn() s.BytesOut = d.BytesOut() - s.Requests = d.requests - s.Responses = d.responses + s.Requests = d.Requests() + s.Responses = d.Responses() s.Source = c.Source() return } diff --git a/server/api_collect_servers.go b/relay/api/api_collect_servers.go similarity index 94% rename from server/api_collect_servers.go rename to relay/api/api_collect_servers.go index 302df90..381a344 100644 --- a/server/api_collect_servers.go +++ b/relay/api/api_collect_servers.go @@ -1,6 +1,10 @@ -package server +package api -import "time" +import ( + "time" + + +) //ServersAPI -- Structure to support the server API type ServersAPI struct { @@ -27,8 +31,8 @@ func NewServersAPI(c *Connection) (s *ServersAPI) { s.Idle = time.Since(c.LastUpdate()).Seconds() s.BytesIn = c.BytesIn() s.BytesOut = c.BytesOut() - s.Requests = c.requests - s.Responses = c.responses + s.Requests = c.Requests + s.Responses = c.Responses s.Source = c.Source() s.State = c.State() diff --git a/server/api_collect_status.go b/relay/api/api_collect_status.go similarity index 97% rename from server/api_collect_status.go rename to relay/api/api_collect_status.go index f796d7f..c8e4ab1 100644 --- a/server/api_collect_status.go +++ b/relay/api/api_collect_status.go @@ -1,4 +1,4 @@ -package server +package api import ( "time" @@ -26,7 +26,7 @@ func NewStatusAPI(c *Status) (s *StatusAPI) { s.WssDomain = c.WssDomain s.AdminDomain = c.AdminDomain s.LoadbalanceDefaultMethod = c.LoadbalanceDefaultMethod - s.DeadTime = NewStatusDeadTimeAPI(c.DeadTime.dwell, c.DeadTime.idle, c.DeadTime.cancelcheck) + s.DeadTime = NewStatusDeadTimeAPI(c.DeadTime.dwell, c.DeadTime.idle, c.DeadTime.Cancelcheck) s.AdminStats = NewTrafficAPI(c.AdminStats.Requests, c.AdminStats.Responses, c.AdminStats.BytesIn, c.AdminStats.BytesOut) s.TrafficStats = NewTrafficAPI(c.TrafficStats.Requests, c.TrafficStats.Responses, c.TrafficStats.BytesIn, c.TrafficStats.BytesOut) s.ExtConnections = NewConnectionStatsAPI(c.ExtConnections.Connections, c.ExtConnections.TotalConnections) diff --git a/server/api_collect_status_dead_time.go b/relay/api/api_collect_status_dead_time.go similarity index 96% rename from server/api_collect_status_dead_time.go rename to relay/api/api_collect_status_dead_time.go index 75c9221..c7b6639 100644 --- a/server/api_collect_status_dead_time.go +++ b/relay/api/api_collect_status_dead_time.go @@ -1,4 +1,4 @@ -package server +package api //StatusDeadTimeAPI -- structure for deadtime configuration type StatusDeadTimeAPI struct { diff --git a/server/api_collect_status_traffic.go b/relay/api/api_collect_status_traffic.go similarity index 95% rename from server/api_collect_status_traffic.go rename to relay/api/api_collect_status_traffic.go index 60692f8..9ebfc27 100644 --- a/server/api_collect_status_traffic.go +++ b/relay/api/api_collect_status_traffic.go @@ -1,4 +1,4 @@ -package server +package api //TrafficStats -- type TrafficAPI struct { diff --git a/server/conn_tracking.go b/relay/api/conn_tracking.go similarity index 88% rename from server/conn_tracking.go rename to relay/api/conn_tracking.go index 7acfb7c..e20adf5 100644 --- a/server/conn_tracking.go +++ b/relay/api/conn_tracking.go @@ -1,8 +1,9 @@ -package server +package api import ( "context" "fmt" + "log" "net" "sync" ) @@ -40,19 +41,19 @@ func NewTracking() (p *Tracking) { //Run - func (p *Tracking) Run(ctx context.Context) { - loginfo.Println("Tracking Running") + log.Println("Tracking Running") for { select { case <-ctx.Done(): - loginfo.Println("Cancel signal hit") + log.Println("Cancel signal hit") return case connection := <-p.register: p.mutex.Lock() key := connection.conn.RemoteAddr().String() - loginfo.Println("register fired", key) + log.Println("register fired", key) p.connections[key] = connection p.list() p.mutex.Unlock() @@ -60,7 +61,7 @@ func (p *Tracking) Run(ctx context.Context) { case connection := <-p.unregister: p.mutex.Lock() key := connection.RemoteAddr().String() - loginfo.Println("unregister fired", key) + log.Println("unregister fired", key) if _, ok := p.connections[key]; ok { delete(p.connections, key) } @@ -72,7 +73,7 @@ func (p *Tracking) Run(ctx context.Context) { func (p *Tracking) list() { for c := range p.connections { - loginfo.Println(c) + log.Println(c) } } diff --git a/server/connection.go b/relay/api/connection.go similarity index 88% rename from server/connection.go rename to relay/api/connection.go index 3ad0937..b15d0e9 100755 --- a/server/connection.go +++ b/relay/api/connection.go @@ -1,9 +1,10 @@ -package server +package api import ( "context" "fmt" "io" + "log" "sync" "time" @@ -12,6 +13,8 @@ import ( "git.coolaj86.com/coolaj86/go-telebitd/packer" ) +var connectionID int64 = 0 + // Connection track websocket and faciliates in and out data type Connection struct { mutex sync.Mutex @@ -43,11 +46,11 @@ type Connection struct { // bytes out bytesOut int64 - // requests - requests int64 + // Requests + Requests int64 // TODO atomic - // response - responses int64 + // Response + Responses int64 // TODO atomic // Connect Time connectTime time.Time @@ -79,8 +82,8 @@ func NewConnection(connectionTable *Table, conn *websocket.Conn, remoteAddress s p.serverName = serverName p.bytesIn = 0 p.bytesOut = 0 - p.requests = 0 - p.responses = 0 + p.Requests = 0 + p.Responses = 0 p.send = make(chan *SendTrack) p.connectTime = time.Now() p.initialDomains = initialDomains @@ -153,11 +156,13 @@ func (c *Connection) addOut(num int64) { } func (c *Connection) addRequests() { - c.requests = c.requests + 1 + // TODO atomic + c.Requests++ } func (c *Connection) addResponse() { - c.responses = c.responses + 1 + // TODO atomic + c.Responses++ } //ConnectionTable -- property @@ -208,7 +213,7 @@ func (c *Connection) NextWriter(wssMessageType int) (io.WriteCloser, error) { } // Is returning a nil error actually the proper thing to do here? - loginfo.Println("NextWriter aborted, state is not true") + log.Println("NextWriter aborted, state is not true") return nil, nil } @@ -229,23 +234,23 @@ func (c *Connection) Reader(ctx context.Context) { defer func() { c.connectionTable.unregister <- c c.conn.Close() - loginfo.Println("reader defer", c) + log.Println("reader defer", c) }() - loginfo.Println("Reader Start ", c) + log.Println("Reader Start ", c) //c.conn.SetReadLimit(65535) for { _, message, err := c.conn.ReadMessage() - //loginfo.Println("ReadMessage", msgType, err) + //log.Println("ReadMessage", msgType, err) c.Update() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) { c.SetState(false) - loginfo.Printf("error: %v", err) + log.Printf("error: %v", err) } break } @@ -255,10 +260,10 @@ func (c *Connection) Reader(ctx context.Context) { key := fmt.Sprintf("%s:%d", p.Address(), p.Port()) track, err := connectionTrack.Lookup(key) - //loginfo.Println(hex.Dump(p.Data.Data())) + //log.Println(hex.Dump(p.Data.Data())) if err != nil { - //loginfo.Println("Unable to locate Tracking for ", key) + //log.Println("Unable to locate Tracking for ", key) continue } @@ -273,7 +278,7 @@ func (c *Connection) Reader(ctx context.Context) { c.addIn(int64(len(message))) c.addResponse() - //loginfo.Println("end of read") + //log.Println("end of read") } } @@ -281,14 +286,14 @@ func (c *Connection) Reader(ctx context.Context) { func (c *Connection) Writer() { defer c.conn.Close() - loginfo.Println("Writer Start ", c) + log.Println("Writer Start ", c) for { select { case message := <-c.send: w, err := c.NextWriter(websocket.BinaryMessage) - loginfo.Println("next writer ", w) + log.Println("next writer ", w) if err != nil { c.SetState(false) return @@ -312,12 +317,12 @@ func (c *Connection) Writer() { //if ok then add to structure, else warn there is something wrong domainTrack.AddIn(messageLen) domainTrack.AddRequests() - loginfo.Println("adding ", messageLen, " to ", message.domain) + log.Println("adding ", messageLen, " to ", message.domain) } else { - logdebug.Println("attempting to add bytes to ", message.domain, "it does not exist") - logdebug.Println(c.DomainTrack) + log.Println("attempting to add bytes to ", message.domain, "it does not exist") + log.Println(c.DomainTrack) } - loginfo.Println(c) + log.Println(c) } } } diff --git a/server/connection_registration.go b/relay/api/connection_registration.go similarity index 98% rename from server/connection_registration.go rename to relay/api/connection_registration.go index bd51dc0..2a167d5 100644 --- a/server/connection_registration.go +++ b/relay/api/connection_registration.go @@ -1,7 +1,9 @@ -package server +package api import ( "github.com/gorilla/websocket" + + ) //Registration -- A connection registration structure used to bring up a connection diff --git a/server/domain_loadbalance.go b/relay/api/domain_loadbalance.go similarity index 87% rename from server/domain_loadbalance.go rename to relay/api/domain_loadbalance.go index 59f58df..f137641 100644 --- a/server/domain_loadbalance.go +++ b/relay/api/domain_loadbalance.go @@ -1,7 +1,8 @@ -package server +package api import ( "fmt" + "log" "sync" ) @@ -54,7 +55,7 @@ func (p *DomainLoadBalance) NextMember() (conn *Connection) { defer p.mutex.Unlock() //check for round robin, if not RR then drop out and call calculate - loginfo.Println("NextMember:", p) + log.Println("NextMember:", p) if p.method == lbmRoundRobin { p.lastmember++ if p.lastmember >= p.count { @@ -75,33 +76,33 @@ func (p *DomainLoadBalance) NextMember() (conn *Connection) { //this should not affect the next member calculation in RR. However it many in other //methods func (p *DomainLoadBalance) AddConnection(conn *Connection) []*Connection { - loginfo.Println("AddConnection", fmt.Sprintf("%p", conn)) + log.Println("AddConnection", fmt.Sprintf("%p", conn)) p.mutex.Lock() defer p.mutex.Unlock() p.connections = append(p.connections, conn) p.count++ - loginfo.Println("AddConnection", p) + log.Println("AddConnection", p) return p.connections } //RemoveConnection -- removes a matching connection from the list. This may //affect the nextmember calculation if found so the recalc flag is set. func (p *DomainLoadBalance) RemoveConnection(conn *Connection) { - loginfo.Println("RemoveConnection", fmt.Sprintf("%p", conn)) + log.Println("RemoveConnection", fmt.Sprintf("%p", conn)) p.mutex.Lock() defer p.mutex.Unlock() //scan all the connections for pos := range p.connections { - loginfo.Println("RemoveConnection", pos, len(p.connections), p.count) + log.Println("RemoveConnection", pos, len(p.connections), p.count) if p.connections[pos] == conn { //found connection remove it - loginfo.Printf("found connection %p", conn) + log.Printf("found connection %p", conn) p.connections[pos], p.connections = p.connections[len(p.connections)-1], p.connections[:len(p.connections)-1] p.count-- break } } - loginfo.Println("RemoveConnection:", p) + log.Println("RemoveConnection:", p) } diff --git a/server/domain_mapping.go b/relay/api/domain_mapping.go similarity index 96% rename from server/domain_mapping.go rename to relay/api/domain_mapping.go index 8f8be8b..a1c2e05 100644 --- a/server/domain_mapping.go +++ b/relay/api/domain_mapping.go @@ -1,4 +1,4 @@ -package server +package api //DomainMapping -- type DomainMapping struct { diff --git a/server/domain_track.go b/relay/api/domain_track.go similarity index 98% rename from server/domain_track.go rename to relay/api/domain_track.go index 89be377..5a145e1 100644 --- a/server/domain_track.go +++ b/relay/api/domain_track.go @@ -1,4 +1,4 @@ -package server +package api //DomainTrack -- Tracking specifics for domains type DomainTrack struct { diff --git a/server/send_track.go b/relay/api/send_track.go similarity index 95% rename from server/send_track.go rename to relay/api/send_track.go index 98352f4..66e8c8b 100644 --- a/server/send_track.go +++ b/relay/api/send_track.go @@ -1,4 +1,4 @@ -package server +package api //SendTrack -- Used as a channel communication to id domain asssociated to domain for outbound WSS type SendTrack struct { diff --git a/server/status.go b/relay/api/status.go similarity index 94% rename from server/status.go rename to relay/api/status.go index 3fd8add..cec684b 100644 --- a/server/status.go +++ b/relay/api/status.go @@ -1,8 +1,10 @@ -package server +package api import ( "context" "time" + + "git.coolaj86.com/coolaj86/go-telebitd/tunnel" ) //Status -- @@ -64,7 +66,7 @@ func (p *Status) ExtConnectionRegister(newTrack *Track) { //ExtConnectionUnregister -- //unregisters an ext facing connection //intercept and update global statistics -func (p *Status) ExtConnectionUnregister(extConn *WedgeConn) { +func (p *Status) ExtConnectionUnregister(extConn *tunnel.WedgeConn) { p.ConnectionTracking.unregister <- extConn p.ExtConnections.DecConnections() diff --git a/server/status_dead_time.go b/relay/api/status_dead_time.go similarity index 83% rename from server/status_dead_time.go rename to relay/api/status_dead_time.go index c53672a..75e6ad0 100644 --- a/server/status_dead_time.go +++ b/relay/api/status_dead_time.go @@ -1,10 +1,10 @@ -package server +package api //StatusDeadTime -- structure for deadtime configuration type StatusDeadTime struct { dwell int idle int - cancelcheck int + Cancelcheck int } //NewStatusDeadTime -- constructor @@ -12,6 +12,6 @@ func NewStatusDeadTime(dwell, idle, cancelcheck int) (p *StatusDeadTime) { p = new(StatusDeadTime) p.dwell = dwell p.idle = idle - p.cancelcheck = cancelcheck + p.Cancelcheck = cancelcheck return } diff --git a/server/status_req_type.go b/relay/api/status_req_type.go similarity index 98% rename from server/status_req_type.go rename to relay/api/status_req_type.go index c1b7d32..ae3b536 100644 --- a/server/status_req_type.go +++ b/relay/api/status_req_type.go @@ -1,4 +1,4 @@ -package server +package api import "sync" diff --git a/server/status_traffic_connections.go b/relay/api/status_traffic_connections.go similarity index 97% rename from server/status_traffic_connections.go rename to relay/api/status_traffic_connections.go index 47131db..8eaa72f 100644 --- a/server/status_traffic_connections.go +++ b/relay/api/status_traffic_connections.go @@ -1,4 +1,4 @@ -package server +package api //ConnectionStats -- type ConnectionStats struct { diff --git a/server/status_traffic_stats.go b/relay/api/status_traffic_stats.go similarity index 97% rename from server/status_traffic_stats.go rename to relay/api/status_traffic_stats.go index 2a19a0b..06698bf 100644 --- a/server/status_traffic_stats.go +++ b/relay/api/status_traffic_stats.go @@ -1,4 +1,4 @@ -package server +package api //TrafficStats -- type TrafficStats struct { diff --git a/server/connection_table.go b/relay/api/table.go similarity index 76% rename from server/connection_table.go rename to relay/api/table.go index 9777711..e23f8b8 100755 --- a/server/connection_table.go +++ b/relay/api/table.go @@ -1,8 +1,9 @@ -package server +package api import ( "context" "fmt" + "log" "time" ) @@ -14,7 +15,7 @@ const ( //Table maintains the set of connections type Table struct { connections map[*Connection][]string - domains map[string]*DomainLoadBalance + Domains map[string]*DomainLoadBalance register chan *Registration unregister chan *Connection domainAnnounce chan *DomainMapping @@ -28,7 +29,7 @@ type Table struct { func NewTable(dwell, idle int, balanceMethod string) (p *Table) { p = new(Table) p.connections = make(map[*Connection][]string) - p.domains = make(map[string]*DomainLoadBalance) + p.Domains = make(map[string]*DomainLoadBalance) p.register = make(chan *Registration) p.unregister = make(chan *Connection) p.domainAnnounce = make(chan *DomainMapping) @@ -48,11 +49,11 @@ func (c *Table) Connections() map[*Connection][]string { //if that is the case the system stores these connections and then sends traffic back round-robin //back to the WSS connections func (c *Table) ConnByDomain(domain string) (*Connection, bool) { - for dn := range c.domains { - loginfo.Println(dn, domain) + for dn := range c.Domains { + log.Println(dn, domain) } - if domainsLB, ok := c.domains[domain]; ok { - loginfo.Println("found") + if domainsLB, ok := c.Domains[domain]; ok { + log.Println("found") conn := domainsLB.NextMember() return conn, ok } @@ -63,14 +64,14 @@ func (c *Table) ConnByDomain(domain string) (*Connection, bool) { func (c *Table) reaper(delay int, idle int) { _ = "breakpoint" for { - loginfo.Println("Reaper waiting for ", delay, " seconds") + log.Println("Reaper waiting for ", delay, " seconds") time.Sleep(time.Duration(delay) * time.Second) - loginfo.Println("Running scanning ", len(c.connections)) + log.Println("Running scanning ", len(c.connections)) for d := range c.connections { if !d.State() { if time.Since(d.lastUpdate).Seconds() > float64(idle) { - loginfo.Println("reaper removing ", d.lastUpdate, time.Since(d.lastUpdate).Seconds()) + log.Println("reaper removing ", d.lastUpdate, time.Since(d.lastUpdate).Seconds()) delete(c.connections, d) } } @@ -91,7 +92,7 @@ func (c *Table) GetConnection(serverID int64) (*Connection, error) { //Run -- Execute func (c *Table) Run(ctx context.Context) { - loginfo.Println("ConnectionTable starting") + log.Println("ConnectionTable starting") go c.reaper(c.dwell, c.idle) @@ -99,11 +100,11 @@ func (c *Table) Run(ctx context.Context) { select { case <-ctx.Done(): - loginfo.Println("Cancel signal hit") + log.Println("Cancel signal hit") return case registration := <-c.register: - loginfo.Println("register fired") + log.Println("register fired") connection := NewConnection(c, registration.conn, registration.source, registration.initialDomains, registration.connectionTrack, registration.serverName) @@ -115,17 +116,17 @@ func (c *Table) Run(ctx context.Context) { // add to the domains regirstation newDomain := domain - loginfo.Println("adding domain ", newDomain, " to connection ", connection.conn.RemoteAddr().String()) + log.Println("adding domain ", newDomain, " to connection ", connection.conn.RemoteAddr().String()) //check to see if domain is already present. - if _, ok := c.domains[newDomain]; ok { + if _, ok := c.Domains[newDomain]; ok { //append to a list of connections for that domain - c.domains[newDomain].AddConnection(connection) + c.Domains[newDomain].AddConnection(connection) } else { //if not, then add as the 1st to the list of connections - c.domains[newDomain] = NewDomainLoadBalance(c.balanceMethod) - c.domains[newDomain].AddConnection(connection) + c.Domains[newDomain] = NewDomainLoadBalance(c.balanceMethod) + c.Domains[newDomain].AddConnection(connection) } // add to the connection domain list @@ -136,26 +137,26 @@ func (c *Table) Run(ctx context.Context) { go connection.Reader(ctx) case connection := <-c.unregister: - loginfo.Println("closing connection ", connection.conn.RemoteAddr().String()) + log.Println("closing connection ", connection.conn.RemoteAddr().String()) //does connection exist in the connection table -- should never be an issue if _, ok := c.connections[connection]; ok { //iterate over the connections for the domain for _, domain := range c.connections[connection] { - loginfo.Println("remove domain", domain) + log.Println("remove domain", domain) //removing domain, make sure it is present (should never be a problem) - if _, ok := c.domains[domain]; ok { + if _, ok := c.Domains[domain]; ok { - domainLB := c.domains[domain] + domainLB := c.Domains[domain] domainLB.RemoveConnection(connection) //check to see if domain is free of connections, if yes, delete map entry if domainLB.count > 0 { //ignore...perhaps we will do something here dealing wtih the lb method } else { - delete(c.domains, domain) + delete(c.Domains, domain) } } } @@ -165,7 +166,7 @@ func (c *Table) Run(ctx context.Context) { } case domainMapping := <-c.domainAnnounce: - loginfo.Println("domainMapping fired ", domainMapping) + log.Println("domainMapping fired ", domainMapping) //check to make sure connection is already regiered, you can no register a domain without an apporved connection //if connection, ok := connections[domainMapping.connection]; ok { diff --git a/relay/relay.go b/relay/relay.go index a86e9f9..612b207 100644 --- a/relay/relay.go +++ b/relay/relay.go @@ -3,26 +3,43 @@ package relay import ( "context" "crypto/tls" + "log" + "net" + "net/http" + "git.coolaj86.com/coolaj86/go-telebitd/relay/admin" + "git.coolaj86.com/coolaj86/go-telebitd/relay/api" "git.coolaj86.com/coolaj86/go-telebitd/server" + "git.coolaj86.com/coolaj86/go-telebitd/tunnel" + + "github.com/gorilla/mux" + "github.com/gorilla/websocket" ) +// Relay is probably a layer that doesn't need to exist type Relay struct { ctx context.Context - status *server.Status + status *api.Status mx *server.MPlexy - table *server.Table + table *api.Table } -func New(ctx context.Context, tlsConfig *tls.Config, authz server.Authorizer, status *server.Status, table *server.Table) *Relay { - return &Relay{ +// New initializes and returns a relay service +func New(ctx context.Context, tlsConfig *tls.Config, authz server.Authorizer, status *api.Status, table *api.Table) *Relay { + // TODO do we need this already setup here? or is it just for logging? + status.ConnectionTracking = api.NewTracking() + status.ConnectionTable = table + authAdmin := authz + r := &Relay{ ctx: ctx, status: status, table: table, - mx: server.New(ctx, tlsConfig, authz, status), + mx: server.New(ctx, tlsConfig, authAdmin, authz, status), // TODO Accept } + return r } +// ListenAndServe sets up all of the tcp, http, https, and tunnel servers func (r *Relay) ListenAndServe(port int) error { serverStatus := r.status @@ -36,11 +53,7 @@ func (r *Relay) ListenAndServe(port int) error { // - if tls, establish, protocol peek buffer, else decrypted // - match protocol - connectionTracking := server.NewTracking() - serverStatus.ConnectionTracking = connectionTracking - go connectionTracking.Run(r.ctx) - - serverStatus.ConnectionTable = r.table + go r.status.ConnectionTracking.Run(r.ctx) go serverStatus.ConnectionTable.Run(r.ctx) //serverStatus.GenericListeners = genericListeners @@ -48,5 +61,65 @@ func (r *Relay) ListenAndServe(port int) error { // blocks until it can listen, which it can't until started go r.mx.MultiListenAndServe(port) + // funnel target devices into WebSocket pool + tunnelListener := tunnel.NewListener() + r.mx.AcceptTargetServer = func(conn net.Conn) { + tunnelListener.Feed(conn) + } + go listenAndServeTargets(r.mx, tunnelListener) + + // funnel admin clients to API + adminListener := tunnel.NewListener() + r.mx.AcceptAdminClient = func(conn net.Conn) { + adminListener.Feed(conn) + } + go admin.ListenAndServe(r.mx, adminListener) + return r.mx.Run() } + +func listenAndServeTargets(mx *server.MPlexy, handler net.Listener) error { + serverStatus := mx.Status + + router := mux.NewRouter().StrictSlash(true) + router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + log.Println("HandleFunc /") + switch url := r.URL.Path; url { + case "/": + log.Println("websocket opening ", r.RemoteAddr, " ", r.Host) + + authz, err := mx.AuthorizeTarget(r) + var upgrader = websocket.Upgrader{ + ReadBufferSize: 65535, + WriteBufferSize: 65535, + } + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Println("WebSocket upgrade failed", err) + return + } + + log.Println("before connection table") + + serverName := authz.Domains[0] + + newRegistration := api.NewRegistration(conn, r.RemoteAddr, authz.Domains, serverStatus.ConnectionTracking, serverName) + serverStatus.WSSConnectionRegister(newRegistration) + + if ok := <-newRegistration.CommCh(); !ok { + log.Println("connection registration failed ", newRegistration) + return + } + + log.Println("connection registration accepted ", newRegistration) + } + }) + + // TODO setup for http/2 + s := &http.Server{ + Addr: ":80", + Handler: router, + } + return s.Serve(handler) +} diff --git a/server/listener_generic.go b/server/listener_generic.go index a478005..c809636 100644 --- a/server/listener_generic.go +++ b/server/listener_generic.go @@ -14,22 +14,18 @@ import ( "strings" "time" - "github.com/gorilla/mux" - "github.com/gorilla/websocket" - telebit "git.coolaj86.com/coolaj86/go-telebitd" "git.coolaj86.com/coolaj86/go-telebitd/packer" + "git.coolaj86.com/coolaj86/go-telebitd/relay/api" "git.coolaj86.com/coolaj86/go-telebitd/sni" + "git.coolaj86.com/coolaj86/go-telebitd/tunnel" ) type contextKey string //CtxConnectionTrack const ( - ctxServerStatus contextKey = "serverstatus" - - //ctxConnectionTable contextKey = "connectionTable" - + ctxServerStatus contextKey = "serverstatus" ctxConfig contextKey = "tlsConfig" ctxListenerRegistration contextKey = "listenerRegistration" ctxConnectionTrack contextKey = "connectionTrack" @@ -37,6 +33,7 @@ const ( ctxAdminHostName contextKey = "adminHostName" ctxCancelCheck contextKey = "cancelcheck" ctxLoadbalanceDefaultMethod contextKey = "lbdefaultmethod" + //ctxConnectionTable contextKey = "connectionTable" ) // TODO isn't this restriction in the TLS lib? @@ -102,7 +99,7 @@ func (mx *MPlexy) multiListenAndServe(ctx context.Context, listenerRegistration fmt.Println("New connection from %v on %v", conn.LocalAddr(), conn.RemoteAddr()) // TODO maybe put these into something like mx.newConnCh and have an mx.Accept()? - wedgeConn := NewWedgeConn(conn) + wedgeConn := tunnel.NewWedgeConn(conn) go mx.accept(ctx, wedgeConn) } } @@ -111,22 +108,18 @@ func (mx *MPlexy) multiListenAndServe(ctx context.Context, listenerRegistration //accept - // - accept a wedgeConnection along with all the other required attritvues // - peek into the buffer, determine TLS or unencrypted -// - if TSL, then terminate with a TLS endpoint, pass to handleStream -// - if clearText, pass to handleStream -func (mx *MPlexy) accept(ctx context.Context, wConn *WedgeConn) { - // TODO shouldn't this responsibility fall elsewhere? - // (otherwise I think we're keeping this function in memory while something else fails to end) - // (i.e. something, somewhere is missing a `go doStuff()` - defer wConn.Close() +// - if TSL, then terminate with a TLS endpoint, pass to acceptEcryptedStream +// - if clearText, pass to acceptPlainStream +func (mx *MPlexy) accept(ctx context.Context, wConn *tunnel.WedgeConn) { peekCnt := 10 encryptMode := encryptNone - loginfo.Println("conn", wConn, wConn.LocalAddr().String(), wConn.RemoteAddr().String()) + loginfo.Println("new conn", wConn, wConn.LocalAddr().String(), wConn.RemoteAddr().String()) peek, err := wConn.Peek(peekCnt) - if err != nil { loginfo.Println("error while peeking") + wConn.Close() return } @@ -152,85 +145,61 @@ func (mx *MPlexy) accept(ctx context.Context, wConn *WedgeConn) { } - oneConn := &oneConnListener{wConn} - tlsConfig := ctx.Value(ctxConfig).(*tls.Config) - if encryptMode == encryptSSLV2 { loginfo.Println("<= SSLv2 is not accepted") + wConn.Close() return } if encryptMode == encryptNone { loginfo.Println("Handle Unencrypted") - mx.handleStream(ctx, wConn) + mx.acceptPlainStream(ctx, wConn, false) return } loginfo.Println("Handle Encryption") + mx.acceptEncryptedStream(ctx, wConn) +} - // check SNI heading - // if matched, then looks like a WSS connection - // else external don't pull off TLS. +func (mx *MPlexy) acceptEncryptedStream(ctx context.Context, wConn *tunnel.WedgeConn) { + // Peek at SNI (ServerName) from TLS Hello header - peek, err = wConn.PeekAll() + peek, err := wConn.PeekAll() if err != nil { - loginfo.Println("error while peeking") + loginfo.Println("Bad socket: read error from", wConn.RemoteAddr(), err) loginfo.Println(hex.Dump(peek[0:])) + wConn.Close() return } - wssHostName := ctx.Value(ctxWssHostName).(string) - adminHostName := ctx.Value(ctxAdminHostName).(string) - sniHostName, err := sni.GetHostname(peek) if err != nil { + loginfo.Println("Bad socket: no SNI from", wConn.RemoteAddr(), err) loginfo.Println(err) + wConn.Close() return } - loginfo.Println("sni:", sniHostName) + loginfo.Println("SNI:", sniHostName) - // This is where a target device connects to receive traffic - if sniHostName == wssHostName { - //handle WSS Path - tlsListener := tls.NewListener(oneConn, tlsConfig) - - conn, err := tlsListener.Accept() - if err != nil { - loginfo.Println(err) - return - } - - tlsWedgeConn := NewWedgeConn(conn) - mx.handleStream(ctx, tlsWedgeConn) + if sniHostName == mx.wssHostName || sniHostName == mx.adminHostName { + // The TLS should be terminated and handled internally + tlsConfig := ctx.Value(ctxConfig).(*tls.Config) + conn := tls.Client(wConn, tlsConfig) + tlsWedgeConn := tunnel.NewWedgeConn(conn) + mx.acceptPlainStream(ctx, tlsWedgeConn, true) return } - // This is where an admin of the relay manages it - if sniHostName == adminHostName { - // TODO mx.Admin.CheckRemoteIP(conn) here + //oneConn := &oneConnListener{wConn} - // handle admin path - tlsListener := tls.NewListener(oneConn, tlsConfig) - - conn, err := tlsListener.Accept() - if err != nil { - loginfo.Println(err) - return - } - - tlsWedgeConn := NewWedgeConn(conn) - mx.handleStream(ctx, tlsWedgeConn) - return - } - - //traffic not terminating on the rvpn do not decrypt - loginfo.Println("processing non terminating traffic", wssHostName, sniHostName) - handleExternalHTTPRequest(ctx, wConn, sniHostName, "https") + // TLS remains intact and shall be routed downstream, wholesale + loginfo.Println("processing non terminating traffic", mx.wssHostName, sniHostName) + go mx.routeToTarget(ctx, wConn, sniHostName, "https") } -//handleStream -- +//acceptPlainStream -- // - we have an unencrypted stream connection with the ability to peek // - attempt to identify HTTP // - handle http @@ -238,15 +207,15 @@ func (mx *MPlexy) accept(ctx context.Context, wConn *WedgeConn) { // - attempt to identify as ADMIN/API session // - else handle as raw http // - handle other? -func (mx *MPlexy) handleStream(ctx context.Context, wConn *WedgeConn) { - loginfo.Println("handle Stream") - loginfo.Println("conn", wConn.LocalAddr().String(), wConn.RemoteAddr().String()) +func (mx *MPlexy) acceptPlainStream(ctx context.Context, wConn *tunnel.WedgeConn, encrypted bool) { + loginfo.Println("Plain Conn", wConn.LocalAddr().String(), wConn.RemoteAddr().String()) - // TODO couldn't this be dangerous? Or is it limited to a single packet? + // TODO couldn't reading everything be dangerous? Or is it limited to a single packet? peek, err := wConn.PeekAll() if err != nil { loginfo.Println("error while peeking", err) loginfo.Println(hex.Dump(peek[0:])) + wConn.Close() return } @@ -255,11 +224,13 @@ func (mx *MPlexy) handleStream(ctx context.Context, wConn *WedgeConn) { // HTTP Identifcation // CRLF if !bytes.Contains(peek[:], []byte{0x0d, 0x0a}) { + wConn.Close() return } //string protocol if !bytes.ContainsAny(peek[:], "HTTP/") { + wConn.Close() return } @@ -268,40 +239,51 @@ func (mx *MPlexy) handleStream(ctx context.Context, wConn *WedgeConn) { r, err := http.ReadRequest(bufio.NewReader(bytes.NewReader(peek))) if err != nil { loginfo.Println("identified as HTTP, failed request parsing", err) - return - } - - // TODO add newtypes - // TODO check if this is a websocket - _, err = mx.authorize(r) - - if err == nil { - loginfo.Println("Valid WSS dected...sending to handler") - oneConn := &oneConnListener{wConn} - mx.handleWssClient(ctx, oneConn) - - //do we have a invalid domain indicating Admin? - //if yes, prep the oneConn and send it to the handler + wConn.Close() return } if strings.Contains(r.Host, telebit.InvalidAdminDomain) { loginfo.Println("admin") - oneConn := &oneConnListener{wConn} - handleAdminClient(ctx, oneConn) + // TODO mx.Admin.CheckRemoteIP(conn) here + // handle admin path + mx.AcceptAdminClient(wConn) return } - loginfo.Println("unsupported") + + // TODO add newtypes + // TODO check if this is a websocket + _, err = mx.AuthorizeTarget(r) + if err == nil { + loginfo.Println("Valid WSS dected...sending to handler") + mx.AcceptTargetServer(wConn) + return + } + + // TODO sniHostName is the key to the route, which could also be a port or hostname + //traffic not terminating on the rvpn do not decrypt + loginfo.Println("processing non terminating traffic", mx.wssHostName, r.Host) loginfo.Println(hex.Dump(peek)) - return + if !encrypted { + // TODO request and cache http resources as a feature?? + go mx.routeToTarget(ctx, wConn, r.Host, "http") + return + } + + // This is not presently possible + loginfo.Println("impossible condition: local decryption of routable client", mx.wssHostName, r.Host) + go mx.routeToTarget(ctx, wConn, r.Host, "https") } -//handleExternalHTTPRequest - +//routeToTarget - // - get a wConn and start processing requests -func handleExternalHTTPRequest(ctx context.Context, extConn *WedgeConn, hostname, service string) { +func (mx *MPlexy) routeToTarget(ctx context.Context, extConn *tunnel.WedgeConn, hostname, service string) { + // TODO is this the right place to do this? + defer extConn.Close() + //connectionTracking := ctx.Value(ctxConnectionTrack).(*Tracking) - serverStatus := ctx.Value(ctxServerStatus).(*Status) + serverStatus := ctx.Value(ctxServerStatus).(*api.Status) defer func() { serverStatus.ExtConnectionUnregister(extConn) @@ -317,7 +299,7 @@ func handleExternalHTTPRequest(ctx context.Context, extConn *WedgeConn, hostname return } - track := NewTrack(extConn, hostname) + track := api.NewTrack(extConn, hostname) serverStatus.ExtConnectionRegister(track) remoteStr := extConn.RemoteAddr().String() @@ -352,7 +334,7 @@ func handleExternalHTTPRequest(ctx context.Context, extConn *WedgeConn, hostname //loginfo.Println(hex.Dump(buf.Bytes())) //Bundle up the send request and dispatch - sendTrack := NewSendTrack(buf.Bytes(), hostname) + sendTrack := api.NewSendTrack(buf.Bytes(), hostname) serverStatus.SendExtRequest(conn, sendTrack) cnt := len(buffer) @@ -363,63 +345,3 @@ func handleExternalHTTPRequest(ctx context.Context, extConn *WedgeConn, hostname } } - -//handleWssClient - -// - expecting an existing oneConnListener with a qualified wss client connected. -// - auth will happen again since we were just peeking at the token. -func (mx *MPlexy) handleWssClient(ctx context.Context, oneConn *oneConnListener) { - serverStatus := ctx.Value(ctxServerStatus).(*Status) - - //connectionTable := ctx.Value(ctxConnectionTable).(*Table) - - router := mux.NewRouter().StrictSlash(true) - router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - loginfo.Println("HandleFunc /") - switch url := r.URL.Path; url { - case "/": - loginfo.Println("websocket opening ", r.RemoteAddr, " ", r.Host) - - authz, err := mx.authorize(r) - var upgrader = websocket.Upgrader{ - ReadBufferSize: 65535, - WriteBufferSize: 65535, - } - - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - loginfo.Println("WebSocket upgrade failed", err) - return - } - - loginfo.Println("before connection table") - - serverName := authz.Domains[0] - - newRegistration := NewRegistration(conn, r.RemoteAddr, authz.Domains, serverStatus.ConnectionTracking, serverName) - serverStatus.WSSConnectionRegister(newRegistration) - - if ok := <-newRegistration.CommCh(); !ok { - loginfo.Println("connection registration failed ", newRegistration) - return - } - - loginfo.Println("connection registration accepted ", newRegistration) - } - }) - - s := &http.Server{ - Addr: ":80", - Handler: router, - } - - err := s.Serve(oneConn) - if err != nil { - loginfo.Println("Serve error: ", err) - } - - select { - case <-ctx.Done(): - loginfo.Println("Cancel signal hit") - return - } -} diff --git a/server/manager.go b/server/manager.go index 61e6d9a..e47d01b 100644 --- a/server/manager.go +++ b/server/manager.go @@ -5,6 +5,8 @@ import ( "crypto/tls" "net" "net/http" + + "git.coolaj86.com/coolaj86/go-telebitd/relay/api" ) //ListenerRegistrationStatus - post registration status @@ -65,34 +67,43 @@ func NewListenerRegistration(port int) (p *ListenerRegistration) { type MPlexy struct { listeners map[*net.Listener]int ctx context.Context - connnectionTable *Table - connectionTracking *Tracking - authorize Authorizer + connnectionTable *api.Table + connectionTracking *api.Tracking + AuthorizeTarget Authorizer + AuthorizeAdmin Authorizer tlsConfig *tls.Config register chan *ListenerRegistration wssHostName string adminHostName string cancelCheck int lbDefaultMethod string - serverStatus *Status - //xservers *MPlexy + Status *api.Status + AcceptTargetServer func(net.Conn) + AcceptAdminClient func(net.Conn) } // New creates tcp (and https and wss?) listeners -func New(ctx context.Context, tlsConfig *tls.Config, authz Authorizer, serverStatus *Status) (mx *MPlexy) { +func New( + ctx context.Context, + tlsConfig *tls.Config, + authAdmin Authorizer, + authz Authorizer, + serverStatus *api.Status, +) (mx *MPlexy) { mx = &MPlexy{ listeners: make(map[*net.Listener]int), ctx: ctx, connnectionTable: serverStatus.ConnectionTable, connectionTracking: serverStatus.ConnectionTracking, - authorize: authz, + AuthorizeTarget: authz, + AuthorizeAdmin: authz, tlsConfig: tlsConfig, register: make(chan *ListenerRegistration), wssHostName: serverStatus.WssDomain, adminHostName: serverStatus.AdminDomain, - cancelCheck: serverStatus.DeadTime.cancelcheck, + cancelCheck: serverStatus.DeadTime.Cancelcheck, lbDefaultMethod: serverStatus.LoadbalanceDefaultMethod, - serverStatus: serverStatus, + Status: serverStatus, } return mx } @@ -117,7 +128,7 @@ func (mx *MPlexy) Run() error { ctx = context.WithValue(ctx, ctxAdminHostName, mx.adminHostName) ctx = context.WithValue(ctx, ctxCancelCheck, mx.cancelCheck) ctx = context.WithValue(ctx, ctxLoadbalanceDefaultMethod, mx.lbDefaultMethod) - ctx = context.WithValue(ctx, ctxServerStatus, mx.serverStatus) + ctx = context.WithValue(ctx, ctxServerStatus, mx.Status) for { select { @@ -138,7 +149,7 @@ func (mx *MPlexy) Run() error { } } loginfo.Println("listener starting up ", registration.port) - loginfo.Println(ctx.Value(ctxConnectionTrack).(*Tracking)) + loginfo.Println(ctx.Value(ctxConnectionTrack).(*api.Tracking)) go mx.multiListenAndServe(ctx, registration) status := <-registration.commCh diff --git a/server/one_conn.go b/server/one_conn.go index 9ced6a5..f8e09de 100644 --- a/server/one_conn.go +++ b/server/one_conn.go @@ -11,7 +11,7 @@ type oneConnListener struct { func (l *oneConnListener) Accept() (net.Conn, error) { if l.conn == nil { - loginfo.Println("Accept EOF") + loginfo.Println("oneConnListener Accept EOF") return nil, io.EOF } @@ -22,11 +22,11 @@ func (l *oneConnListener) Accept() (net.Conn, error) { } func (l *oneConnListener) Close() error { - loginfo.Println("close") + loginfo.Println("oneConnListener close") return nil } func (l *oneConnListener) Addr() net.Addr { - loginfo.Println("addr") + loginfo.Println("oneConnLister addr") return nil } diff --git a/server/conn_wedge.go b/tunnel/conn_wedge.go similarity index 98% rename from server/conn_wedge.go rename to tunnel/conn_wedge.go index b94ff09..034f96c 100644 --- a/server/conn_wedge.go +++ b/tunnel/conn_wedge.go @@ -1,4 +1,4 @@ -package server +package tunnel import ( "bufio" diff --git a/tunnel/tunnel.go b/tunnel/tunnel.go new file mode 100644 index 0000000..b7b3aed --- /dev/null +++ b/tunnel/tunnel.go @@ -0,0 +1,48 @@ +package tunnel + +import ( + "io" + "net" +) + +// Listener defines a listener for use with http servers +type Listener struct { + Conns chan net.Conn + //ParentAddr net.Addr +} + +// NewListener creates a channel for connections and returns the listener +func NewListener() *Listener { + return &Listener{ + Conns: make(chan net.Conn), + } +} + +// Feed will block while pushing a net.Conn onto Conns +func (l *Listener) Feed(conn net.Conn) { + l.Conns <- conn +} + +// net.Listener interface + +// Accept will block and wait for a new net.Conn +func (l *Listener) Accept() (net.Conn, error) { + conn, ok := <-l.Conns + if ok { + return conn, nil + } + return nil, io.EOF +} + +// Close will close the Conns channel +func (l *Listener) Close() error { + close(l.Conns) + return nil +} + +// Addr returns nil to fulfill the net.Listener interface +func (l *Listener) Addr() net.Addr { + // Addr may (or may not) return the original TCP or TLS listener's address + //return l.ParentAddr + return nil +}