move to packages

This commit is contained in:
AJ ONeal 2020-04-30 23:47:46 -06:00
parent 789b9af66e
commit 052a3f276b
33 changed files with 426 additions and 336 deletions

1
.gitignore vendored
View File

@ -6,5 +6,4 @@ certs
/cmd/telebitd/telebitd
/telebit
/cmd/telebit/telebit
/m
/debug

View File

@ -51,12 +51,13 @@ func Run(ctx context.Context, config *Config) error {
authenticated := false
for {
fmt.Printf("debug serverURL:\n%+v", serverURL)
if conn, _, err := dialer.Dial(serverURL.String(), nil); err == nil {
loginfo.Println("connected to remote server")
authenticated = true
handler.HandleConn(ctx, conn)
} else if !authenticated {
return fmt.Errorf("First connection to server failed - check auth: %v", err)
return fmt.Errorf("first connection to server failed - check auth: %s", err.Error())
}
loginfo.Println("disconnected from remote server")

View File

@ -10,9 +10,11 @@ import (
"log"
"net/http"
"os"
"strings"
telebit "git.coolaj86.com/coolaj86/go-telebitd"
"git.coolaj86.com/coolaj86/go-telebitd/relay"
"git.coolaj86.com/coolaj86/go-telebitd/relay/api"
"git.coolaj86.com/coolaj86/go-telebitd/server"
jwt "github.com/dgrijalva/jwt-go"
@ -36,7 +38,7 @@ var (
argServerAdminBinding string
argServerExternalBinding string
argDeadTime int
connectionTable *server.Table
connectionTable *api.Table
secretKey string
wssHostName = "localhost.rootprojects.org"
adminHostName = telebit.InvalidAdminDomain
@ -114,14 +116,14 @@ func main() {
ctx, cancelContext := context.WithCancel(context.Background())
defer cancelContext()
serverStatus := server.NewStatus(ctx)
serverStatus := api.NewStatus(ctx)
serverStatus.AdminDomain = adminHostName
serverStatus.WssDomain = wssHostName
serverStatus.Name = nickname
serverStatus.DeadTime = server.NewStatusDeadTime(dwell, idle, cancelcheck)
serverStatus.DeadTime = api.NewStatusDeadTime(dwell, idle, cancelcheck)
serverStatus.LoadbalanceDefaultMethod = lbDefaultMethod
connectionTable := server.NewTable(dwell, idle, lbDefaultMethod)
connectionTable := api.NewTable(dwell, idle, lbDefaultMethod)
tlsConfig := &tls.Config{
GetCertificate: func(*tls.ClientHelloInfo) (*tls.Certificate, error) {
@ -139,11 +141,29 @@ func main() {
authorizer := func(r *http.Request) (*server.Authz, error) {
// do we have a valid wss_client?
tokenString := r.URL.Query().Get("access_token")
var tokenString string
if auth := strings.Split(r.Header.Get("Authorization"), " "); len(auth) > 1 {
// TODO handle Basic auth tokens as well
tokenString = auth[1]
}
if "" == tokenString {
tokenString = r.URL.Query().Get("access_token")
}
_, err := jwt.Parse(tokenString, func(token *jwt.Token) (interface{}, error) {
return []byte(secretKey), nil
})
if nil != err {
return nil, err
}
authz := &server.Authz{
Domains: []string{
"target.rootprojects.org",
},
}
return authz, err
/*
tokenString := r.URL.Query().Get("access_token")

View File

@ -1,40 +1,48 @@
package server
package admin
import (
"context"
"log"
"net"
"net/http"
"runtime"
"strconv"
"strings"
"github.com/gorilla/mux"
telebit "git.coolaj86.com/coolaj86/go-telebitd"
"git.coolaj86.com/coolaj86/go-telebitd/envelope"
"git.coolaj86.com/coolaj86/go-telebitd/relay/api"
"git.coolaj86.com/coolaj86/go-telebitd/server"
"github.com/gorilla/mux"
)
const (
endPointPrefix = "/api/org.rootprojects.tunnel/"
)
var connectionTable *Table
var serverStatus *Status
var serverStatusAPI *Status
var connectionTable *api.Table
var serverStatus *api.Status
var serverStatusAPI *api.Status
//handleAdminClient -
//ListenAndServe -
// - expecting an existing oneConnListener with a qualified wss client connected.
// - auth will happen again since we were just peeking at the token.
func handleAdminClient(ctx context.Context, oneConn *oneConnListener) {
serverStatus = ctx.Value(ctxServerStatus).(*Status)
func ListenAndServe(mx *server.MPlexy, adminListener net.Listener) error {
//serverStatus = mx.ctx.Value(ctxServerStatus).(*Status)
connectionTable = mx.Status.ConnectionTable
serverStatusAPI = mx.Status
connectionTable = serverStatus.ConnectionTable
serverStatusAPI = serverStatus
router := mux.NewRouter().StrictSlash(true)
router.PathPrefix("/admin/").Handler(http.StripPrefix("/admin/", http.FileServer(http.Dir("html/admin"))))
router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
loginfo.Println("HandleFunc /")
log.Println("HandleFunc /")
_, err := mx.AuthorizeAdmin(r)
if err == nil {
// TODO
w.Write([]byte("TODO: handle bad auth"))
return
}
serverStatus.AdminStats.IncRequests()
@ -64,30 +72,20 @@ func handleAdminClient(ctx context.Context, oneConn *oneConnListener) {
Addr: ":80",
Handler: router,
}
err := s.Serve(oneConn)
if err != nil {
loginfo.Println("Serve error: ", err)
}
select {
case <-ctx.Done():
loginfo.Println("Cancel signal hit")
return
}
return s.Serve(adminListener)
}
func getStatusEndpoint(w http.ResponseWriter, r *http.Request) {
pc, _, _, _ := runtime.Caller(0)
loginfo.Println(runtime.FuncForPC(pc).Name())
log.Println(runtime.FuncForPC(pc).Name())
serverStatus.AdminStats.IncRequests()
statusContainer := NewStatusAPI(serverStatusAPI)
statusContainer := api.NewStatusAPI(serverStatusAPI)
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
env := envelope.NewEnvelope("domains/GET")
env := NewResponse("domains/GET")
env.Result = statusContainer
env.GenerateWriter(w)
serverStatus.AdminStats.IncResponses()
@ -95,15 +93,15 @@ func getStatusEndpoint(w http.ResponseWriter, r *http.Request) {
func getDomainsEndpoint(w http.ResponseWriter, r *http.Request) {
pc, _, _, _ := runtime.Caller(0)
loginfo.Println(runtime.FuncForPC(pc).Name())
log.Println(runtime.FuncForPC(pc).Name())
serverStatus.AdminStats.IncRequests()
domainsContainer := NewDomainsAPI(connectionTable.domains)
domainsContainer := api.NewDomainsAPI(connectionTable.Domains)
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
env := envelope.NewEnvelope("domains/GET")
env := NewResponse("domains/GET")
env.Result = domainsContainer
env.GenerateWriter(w)
serverStatus.AdminStats.IncResponses()
@ -111,11 +109,11 @@ func getDomainsEndpoint(w http.ResponseWriter, r *http.Request) {
func getDomainEndpoint(w http.ResponseWriter, r *http.Request) {
pc, _, _, _ := runtime.Caller(0)
loginfo.Println(runtime.FuncForPC(pc).Name())
log.Println(runtime.FuncForPC(pc).Name())
serverStatus.AdminStats.IncRequests()
env := envelope.NewEnvelope("domain/GET")
env := NewResponse("domain/GET")
params := mux.Vars(r)
if id, ok := params["domain-name"]; !ok {
@ -124,16 +122,16 @@ func getDomainEndpoint(w http.ResponseWriter, r *http.Request) {
env.ErrorDescription = "domain API requires a domain-name"
} else {
domainName := id
if domainLB, ok := connectionTable.domains[domainName]; !ok {
if domainLB, ok := connectionTable.Domains[domainName]; !ok {
env.Error = "domain-name was not found"
env.ErrorURI = r.RequestURI
env.ErrorDescription = "domain-name not found"
} else {
var domainAPIContainer []*ServerDomainAPI
var domainAPIContainer []*api.ServerDomainAPI
conns := domainLB.Connections()
for pos := range conns {
conn := conns[pos]
domainAPI := NewServerDomainAPI(conn, conn.DomainTrack[domainName])
domainAPI := api.NewServerDomainAPI(conn, conn.DomainTrack[domainName])
domainAPIContainer = append(domainAPIContainer, domainAPI)
}
env.Result = domainAPIContainer
@ -146,21 +144,21 @@ func getDomainEndpoint(w http.ResponseWriter, r *http.Request) {
func getServersEndpoint(w http.ResponseWriter, r *http.Request) {
pc, _, _, _ := runtime.Caller(0)
loginfo.Println(runtime.FuncForPC(pc).Name())
log.Println(runtime.FuncForPC(pc).Name())
serverStatus.AdminStats.IncRequests()
serverContainer := NewServerAPIContainer()
serverContainer := api.NewServerAPIContainer()
for c := range connectionTable.Connections() {
serverAPI := NewServersAPI(c)
serverAPI := api.NewServersAPI(c)
serverContainer.Servers = append(serverContainer.Servers, serverAPI)
}
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
env := envelope.NewEnvelope("servers/GET")
env := NewResponse("servers/GET")
env.Result = serverContainer
env.GenerateWriter(w)
serverStatus.AdminStats.IncResponses()
@ -168,11 +166,11 @@ func getServersEndpoint(w http.ResponseWriter, r *http.Request) {
func getServerEndpoint(w http.ResponseWriter, r *http.Request) {
pc, _, _, _ := runtime.Caller(0)
loginfo.Println(runtime.FuncForPC(pc).Name())
log.Println(runtime.FuncForPC(pc).Name())
serverStatus.AdminStats.IncRequests()
env := envelope.NewEnvelope("server/GET")
env := NewResponse("server/GET")
params := mux.Vars(r)
if id, ok := params["server-id"]; !ok {
@ -193,8 +191,8 @@ func getServerEndpoint(w http.ResponseWriter, r *http.Request) {
env.ErrorURI = r.RequestURI
env.ErrorDescription = "missing server-id, make sure desired service-id is in servers"
} else {
loginfo.Println("test")
serverAPI := NewServerAPI(conn)
log.Println("test")
serverAPI := api.NewServerAPI(conn)
env.Result = serverAPI
}

View File

@ -1,4 +1,4 @@
package envelope
package admin
import (
"bytes"
@ -7,8 +7,8 @@ import (
"time"
)
//Envelope -- Standard response structure
type Envelope struct {
//Response -- Standard response structure
type Response struct {
TransactionType string `json:"type"`
Schema string `json:"schema"`
TransactionTimeStamp int64 `json:"txts"`
@ -19,12 +19,12 @@ type Envelope struct {
Result interface{} `json:"result"`
}
//NewEnvelope -- Constructor
func NewEnvelope(transactionType string) (p *Envelope) {
//NewResponse -- Constructor
func NewResponse(transactionType string) (p *Response) {
// TODO BUG use atomic
transactionID++
p = &Envelope{}
p = &Response{}
p.TransactionType = transactionType
p.TransactionID = transactionID
p.TransactionTimeStamp = time.Now().Unix()
@ -34,13 +34,13 @@ func NewEnvelope(transactionType string) (p *Envelope) {
}
//Generate -- encode into JSON and return string
func (e *Envelope) Generate() string {
func (e *Response) Generate() string {
buf := new(bytes.Buffer)
json.NewEncoder(buf).Encode(e)
return buf.String()
}
//GenerateWriter --
func (e *Envelope) GenerateWriter(w io.Writer) {
func (e *Response) GenerateWriter(w io.Writer) {
json.NewEncoder(w).Encode(e)
}

View File

@ -1,4 +1,4 @@
package envelope
package admin
import (
"log"

View File

@ -1,4 +1,4 @@
package server
package api
//ConnectionStatsAPI --
type ConnectionStatsAPI struct {

View File

@ -1,4 +1,4 @@
package server
package api
//DomainsAPI -- A collections of all the domains
//List of Domains -> DomainAPI
@ -36,14 +36,14 @@ type DomainAPI struct {
func NewDomainAPI(domain string, domainLoadBalance *DomainLoadBalance) (p *DomainAPI) {
p = new(DomainAPI)
p.DomainName = domain
for pos := range domainLoadBalance.connections {
ds := NewDomainServerAPI(domain, domainLoadBalance.connections[pos])
for pos := range domainLoadBalance.Connections() {
ds := NewDomainServerAPI(domain, domainLoadBalance.Connections()[pos])
p.Servers = append(p.Servers, ds)
p.TotalServers++
p.Traffic.BytesIn += domainLoadBalance.connections[pos].BytesIn()
p.Traffic.BytesOut += domainLoadBalance.connections[pos].BytesOut()
p.Traffic.Requests += domainLoadBalance.connections[pos].requests
p.Traffic.Responses += domainLoadBalance.connections[pos].responses
p.Traffic.BytesIn += domainLoadBalance.Connections()[pos].BytesIn()
p.Traffic.BytesOut += domainLoadBalance.Connections()[pos].BytesOut()
p.Traffic.Requests += domainLoadBalance.Connections()[pos].Requests
p.Traffic.Responses += domainLoadBalance.Connections()[pos].Responses
}
return
}

View File

@ -1,6 +1,8 @@
package server
package api
import "time"
import (
"time"
)
//ServerAPI -- Structure to support the server API
type ServerAPI struct {
@ -25,7 +27,7 @@ func NewServerAPI(c *Connection) (s *ServerAPI) {
s.Idle = time.Since(c.LastUpdate()).Seconds()
s.BytesIn = c.BytesIn()
s.BytesOut = c.BytesOut()
s.Source = c.source
s.Source = c.Source()
for domainName := range c.DomainTrack {

View File

@ -1,4 +1,4 @@
package server
package api
//ServerDomainsAPI -- Structure to support the server API
type ServerDomainsAPI struct {
@ -17,8 +17,8 @@ func NewServerDomainsAPI(c *Connection, d *DomainTrack) (s *ServerDomainsAPI) {
s.ServerID = c.ConnectionID()
s.BytesIn = d.BytesIn()
s.BytesOut = d.BytesOut()
s.Requests = d.requests
s.Responses = d.responses
s.Requests = d.Requests()
s.Responses = d.Responses()
return
}
@ -53,8 +53,8 @@ func NewServerDomainAPI(c *Connection, d *DomainTrack) (s *ServerDomainAPI) {
s.ServerID = c.ConnectionID()
s.BytesIn = d.BytesIn()
s.BytesOut = d.BytesOut()
s.Requests = d.requests
s.Responses = d.responses
s.Requests = d.Requests()
s.Responses = d.Responses()
s.Source = c.Source()
return
}

View File

@ -1,6 +1,10 @@
package server
package api
import "time"
import (
"time"
)
//ServersAPI -- Structure to support the server API
type ServersAPI struct {
@ -27,8 +31,8 @@ func NewServersAPI(c *Connection) (s *ServersAPI) {
s.Idle = time.Since(c.LastUpdate()).Seconds()
s.BytesIn = c.BytesIn()
s.BytesOut = c.BytesOut()
s.Requests = c.requests
s.Responses = c.responses
s.Requests = c.Requests
s.Responses = c.Responses
s.Source = c.Source()
s.State = c.State()

View File

@ -1,4 +1,4 @@
package server
package api
import (
"time"
@ -26,7 +26,7 @@ func NewStatusAPI(c *Status) (s *StatusAPI) {
s.WssDomain = c.WssDomain
s.AdminDomain = c.AdminDomain
s.LoadbalanceDefaultMethod = c.LoadbalanceDefaultMethod
s.DeadTime = NewStatusDeadTimeAPI(c.DeadTime.dwell, c.DeadTime.idle, c.DeadTime.cancelcheck)
s.DeadTime = NewStatusDeadTimeAPI(c.DeadTime.dwell, c.DeadTime.idle, c.DeadTime.Cancelcheck)
s.AdminStats = NewTrafficAPI(c.AdminStats.Requests, c.AdminStats.Responses, c.AdminStats.BytesIn, c.AdminStats.BytesOut)
s.TrafficStats = NewTrafficAPI(c.TrafficStats.Requests, c.TrafficStats.Responses, c.TrafficStats.BytesIn, c.TrafficStats.BytesOut)
s.ExtConnections = NewConnectionStatsAPI(c.ExtConnections.Connections, c.ExtConnections.TotalConnections)

View File

@ -1,4 +1,4 @@
package server
package api
//StatusDeadTimeAPI -- structure for deadtime configuration
type StatusDeadTimeAPI struct {

View File

@ -1,4 +1,4 @@
package server
package api
//TrafficStats --
type TrafficAPI struct {

View File

@ -1,8 +1,9 @@
package server
package api
import (
"context"
"fmt"
"log"
"net"
"sync"
)
@ -40,19 +41,19 @@ func NewTracking() (p *Tracking) {
//Run -
func (p *Tracking) Run(ctx context.Context) {
loginfo.Println("Tracking Running")
log.Println("Tracking Running")
for {
select {
case <-ctx.Done():
loginfo.Println("Cancel signal hit")
log.Println("Cancel signal hit")
return
case connection := <-p.register:
p.mutex.Lock()
key := connection.conn.RemoteAddr().String()
loginfo.Println("register fired", key)
log.Println("register fired", key)
p.connections[key] = connection
p.list()
p.mutex.Unlock()
@ -60,7 +61,7 @@ func (p *Tracking) Run(ctx context.Context) {
case connection := <-p.unregister:
p.mutex.Lock()
key := connection.RemoteAddr().String()
loginfo.Println("unregister fired", key)
log.Println("unregister fired", key)
if _, ok := p.connections[key]; ok {
delete(p.connections, key)
}
@ -72,7 +73,7 @@ func (p *Tracking) Run(ctx context.Context) {
func (p *Tracking) list() {
for c := range p.connections {
loginfo.Println(c)
log.Println(c)
}
}

View File

@ -1,9 +1,10 @@
package server
package api
import (
"context"
"fmt"
"io"
"log"
"sync"
"time"
@ -12,6 +13,8 @@ import (
"git.coolaj86.com/coolaj86/go-telebitd/packer"
)
var connectionID int64 = 0
// Connection track websocket and faciliates in and out data
type Connection struct {
mutex sync.Mutex
@ -43,11 +46,11 @@ type Connection struct {
// bytes out
bytesOut int64
// requests
requests int64
// Requests
Requests int64 // TODO atomic
// response
responses int64
// Response
Responses int64 // TODO atomic
// Connect Time
connectTime time.Time
@ -79,8 +82,8 @@ func NewConnection(connectionTable *Table, conn *websocket.Conn, remoteAddress s
p.serverName = serverName
p.bytesIn = 0
p.bytesOut = 0
p.requests = 0
p.responses = 0
p.Requests = 0
p.Responses = 0
p.send = make(chan *SendTrack)
p.connectTime = time.Now()
p.initialDomains = initialDomains
@ -153,11 +156,13 @@ func (c *Connection) addOut(num int64) {
}
func (c *Connection) addRequests() {
c.requests = c.requests + 1
// TODO atomic
c.Requests++
}
func (c *Connection) addResponse() {
c.responses = c.responses + 1
// TODO atomic
c.Responses++
}
//ConnectionTable -- property
@ -208,7 +213,7 @@ func (c *Connection) NextWriter(wssMessageType int) (io.WriteCloser, error) {
}
// Is returning a nil error actually the proper thing to do here?
loginfo.Println("NextWriter aborted, state is not true")
log.Println("NextWriter aborted, state is not true")
return nil, nil
}
@ -229,23 +234,23 @@ func (c *Connection) Reader(ctx context.Context) {
defer func() {
c.connectionTable.unregister <- c
c.conn.Close()
loginfo.Println("reader defer", c)
log.Println("reader defer", c)
}()
loginfo.Println("Reader Start ", c)
log.Println("Reader Start ", c)
//c.conn.SetReadLimit(65535)
for {
_, message, err := c.conn.ReadMessage()
//loginfo.Println("ReadMessage", msgType, err)
//log.Println("ReadMessage", msgType, err)
c.Update()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
c.SetState(false)
loginfo.Printf("error: %v", err)
log.Printf("error: %v", err)
}
break
}
@ -255,10 +260,10 @@ func (c *Connection) Reader(ctx context.Context) {
key := fmt.Sprintf("%s:%d", p.Address(), p.Port())
track, err := connectionTrack.Lookup(key)
//loginfo.Println(hex.Dump(p.Data.Data()))
//log.Println(hex.Dump(p.Data.Data()))
if err != nil {
//loginfo.Println("Unable to locate Tracking for ", key)
//log.Println("Unable to locate Tracking for ", key)
continue
}
@ -273,7 +278,7 @@ func (c *Connection) Reader(ctx context.Context) {
c.addIn(int64(len(message)))
c.addResponse()
//loginfo.Println("end of read")
//log.Println("end of read")
}
}
@ -281,14 +286,14 @@ func (c *Connection) Reader(ctx context.Context) {
func (c *Connection) Writer() {
defer c.conn.Close()
loginfo.Println("Writer Start ", c)
log.Println("Writer Start ", c)
for {
select {
case message := <-c.send:
w, err := c.NextWriter(websocket.BinaryMessage)
loginfo.Println("next writer ", w)
log.Println("next writer ", w)
if err != nil {
c.SetState(false)
return
@ -312,12 +317,12 @@ func (c *Connection) Writer() {
//if ok then add to structure, else warn there is something wrong
domainTrack.AddIn(messageLen)
domainTrack.AddRequests()
loginfo.Println("adding ", messageLen, " to ", message.domain)
log.Println("adding ", messageLen, " to ", message.domain)
} else {
logdebug.Println("attempting to add bytes to ", message.domain, "it does not exist")
logdebug.Println(c.DomainTrack)
log.Println("attempting to add bytes to ", message.domain, "it does not exist")
log.Println(c.DomainTrack)
}
loginfo.Println(c)
log.Println(c)
}
}
}

View File

@ -1,7 +1,9 @@
package server
package api
import (
"github.com/gorilla/websocket"
)
//Registration -- A connection registration structure used to bring up a connection

View File

@ -1,7 +1,8 @@
package server
package api
import (
"fmt"
"log"
"sync"
)
@ -54,7 +55,7 @@ func (p *DomainLoadBalance) NextMember() (conn *Connection) {
defer p.mutex.Unlock()
//check for round robin, if not RR then drop out and call calculate
loginfo.Println("NextMember:", p)
log.Println("NextMember:", p)
if p.method == lbmRoundRobin {
p.lastmember++
if p.lastmember >= p.count {
@ -75,33 +76,33 @@ func (p *DomainLoadBalance) NextMember() (conn *Connection) {
//this should not affect the next member calculation in RR. However it many in other
//methods
func (p *DomainLoadBalance) AddConnection(conn *Connection) []*Connection {
loginfo.Println("AddConnection", fmt.Sprintf("%p", conn))
log.Println("AddConnection", fmt.Sprintf("%p", conn))
p.mutex.Lock()
defer p.mutex.Unlock()
p.connections = append(p.connections, conn)
p.count++
loginfo.Println("AddConnection", p)
log.Println("AddConnection", p)
return p.connections
}
//RemoveConnection -- removes a matching connection from the list. This may
//affect the nextmember calculation if found so the recalc flag is set.
func (p *DomainLoadBalance) RemoveConnection(conn *Connection) {
loginfo.Println("RemoveConnection", fmt.Sprintf("%p", conn))
log.Println("RemoveConnection", fmt.Sprintf("%p", conn))
p.mutex.Lock()
defer p.mutex.Unlock()
//scan all the connections
for pos := range p.connections {
loginfo.Println("RemoveConnection", pos, len(p.connections), p.count)
log.Println("RemoveConnection", pos, len(p.connections), p.count)
if p.connections[pos] == conn {
//found connection remove it
loginfo.Printf("found connection %p", conn)
log.Printf("found connection %p", conn)
p.connections[pos], p.connections = p.connections[len(p.connections)-1], p.connections[:len(p.connections)-1]
p.count--
break
}
}
loginfo.Println("RemoveConnection:", p)
log.Println("RemoveConnection:", p)
}

View File

@ -1,4 +1,4 @@
package server
package api
//DomainMapping --
type DomainMapping struct {

View File

@ -1,4 +1,4 @@
package server
package api
//DomainTrack -- Tracking specifics for domains
type DomainTrack struct {

View File

@ -1,4 +1,4 @@
package server
package api
//SendTrack -- Used as a channel communication to id domain asssociated to domain for outbound WSS
type SendTrack struct {

View File

@ -1,8 +1,10 @@
package server
package api
import (
"context"
"time"
"git.coolaj86.com/coolaj86/go-telebitd/tunnel"
)
//Status --
@ -64,7 +66,7 @@ func (p *Status) ExtConnectionRegister(newTrack *Track) {
//ExtConnectionUnregister --
//unregisters an ext facing connection
//intercept and update global statistics
func (p *Status) ExtConnectionUnregister(extConn *WedgeConn) {
func (p *Status) ExtConnectionUnregister(extConn *tunnel.WedgeConn) {
p.ConnectionTracking.unregister <- extConn
p.ExtConnections.DecConnections()

View File

@ -1,10 +1,10 @@
package server
package api
//StatusDeadTime -- structure for deadtime configuration
type StatusDeadTime struct {
dwell int
idle int
cancelcheck int
Cancelcheck int
}
//NewStatusDeadTime -- constructor
@ -12,6 +12,6 @@ func NewStatusDeadTime(dwell, idle, cancelcheck int) (p *StatusDeadTime) {
p = new(StatusDeadTime)
p.dwell = dwell
p.idle = idle
p.cancelcheck = cancelcheck
p.Cancelcheck = cancelcheck
return
}

View File

@ -1,4 +1,4 @@
package server
package api
import "sync"

View File

@ -1,4 +1,4 @@
package server
package api
//ConnectionStats --
type ConnectionStats struct {

View File

@ -1,4 +1,4 @@
package server
package api
//TrafficStats --
type TrafficStats struct {

View File

@ -1,8 +1,9 @@
package server
package api
import (
"context"
"fmt"
"log"
"time"
)
@ -14,7 +15,7 @@ const (
//Table maintains the set of connections
type Table struct {
connections map[*Connection][]string
domains map[string]*DomainLoadBalance
Domains map[string]*DomainLoadBalance
register chan *Registration
unregister chan *Connection
domainAnnounce chan *DomainMapping
@ -28,7 +29,7 @@ type Table struct {
func NewTable(dwell, idle int, balanceMethod string) (p *Table) {
p = new(Table)
p.connections = make(map[*Connection][]string)
p.domains = make(map[string]*DomainLoadBalance)
p.Domains = make(map[string]*DomainLoadBalance)
p.register = make(chan *Registration)
p.unregister = make(chan *Connection)
p.domainAnnounce = make(chan *DomainMapping)
@ -48,11 +49,11 @@ func (c *Table) Connections() map[*Connection][]string {
//if that is the case the system stores these connections and then sends traffic back round-robin
//back to the WSS connections
func (c *Table) ConnByDomain(domain string) (*Connection, bool) {
for dn := range c.domains {
loginfo.Println(dn, domain)
for dn := range c.Domains {
log.Println(dn, domain)
}
if domainsLB, ok := c.domains[domain]; ok {
loginfo.Println("found")
if domainsLB, ok := c.Domains[domain]; ok {
log.Println("found")
conn := domainsLB.NextMember()
return conn, ok
}
@ -63,14 +64,14 @@ func (c *Table) ConnByDomain(domain string) (*Connection, bool) {
func (c *Table) reaper(delay int, idle int) {
_ = "breakpoint"
for {
loginfo.Println("Reaper waiting for ", delay, " seconds")
log.Println("Reaper waiting for ", delay, " seconds")
time.Sleep(time.Duration(delay) * time.Second)
loginfo.Println("Running scanning ", len(c.connections))
log.Println("Running scanning ", len(c.connections))
for d := range c.connections {
if !d.State() {
if time.Since(d.lastUpdate).Seconds() > float64(idle) {
loginfo.Println("reaper removing ", d.lastUpdate, time.Since(d.lastUpdate).Seconds())
log.Println("reaper removing ", d.lastUpdate, time.Since(d.lastUpdate).Seconds())
delete(c.connections, d)
}
}
@ -91,7 +92,7 @@ func (c *Table) GetConnection(serverID int64) (*Connection, error) {
//Run -- Execute
func (c *Table) Run(ctx context.Context) {
loginfo.Println("ConnectionTable starting")
log.Println("ConnectionTable starting")
go c.reaper(c.dwell, c.idle)
@ -99,11 +100,11 @@ func (c *Table) Run(ctx context.Context) {
select {
case <-ctx.Done():
loginfo.Println("Cancel signal hit")
log.Println("Cancel signal hit")
return
case registration := <-c.register:
loginfo.Println("register fired")
log.Println("register fired")
connection := NewConnection(c, registration.conn, registration.source, registration.initialDomains,
registration.connectionTrack, registration.serverName)
@ -115,17 +116,17 @@ func (c *Table) Run(ctx context.Context) {
// add to the domains regirstation
newDomain := domain
loginfo.Println("adding domain ", newDomain, " to connection ", connection.conn.RemoteAddr().String())
log.Println("adding domain ", newDomain, " to connection ", connection.conn.RemoteAddr().String())
//check to see if domain is already present.
if _, ok := c.domains[newDomain]; ok {
if _, ok := c.Domains[newDomain]; ok {
//append to a list of connections for that domain
c.domains[newDomain].AddConnection(connection)
c.Domains[newDomain].AddConnection(connection)
} else {
//if not, then add as the 1st to the list of connections
c.domains[newDomain] = NewDomainLoadBalance(c.balanceMethod)
c.domains[newDomain].AddConnection(connection)
c.Domains[newDomain] = NewDomainLoadBalance(c.balanceMethod)
c.Domains[newDomain].AddConnection(connection)
}
// add to the connection domain list
@ -136,26 +137,26 @@ func (c *Table) Run(ctx context.Context) {
go connection.Reader(ctx)
case connection := <-c.unregister:
loginfo.Println("closing connection ", connection.conn.RemoteAddr().String())
log.Println("closing connection ", connection.conn.RemoteAddr().String())
//does connection exist in the connection table -- should never be an issue
if _, ok := c.connections[connection]; ok {
//iterate over the connections for the domain
for _, domain := range c.connections[connection] {
loginfo.Println("remove domain", domain)
log.Println("remove domain", domain)
//removing domain, make sure it is present (should never be a problem)
if _, ok := c.domains[domain]; ok {
if _, ok := c.Domains[domain]; ok {
domainLB := c.domains[domain]
domainLB := c.Domains[domain]
domainLB.RemoveConnection(connection)
//check to see if domain is free of connections, if yes, delete map entry
if domainLB.count > 0 {
//ignore...perhaps we will do something here dealing wtih the lb method
} else {
delete(c.domains, domain)
delete(c.Domains, domain)
}
}
}
@ -165,7 +166,7 @@ func (c *Table) Run(ctx context.Context) {
}
case domainMapping := <-c.domainAnnounce:
loginfo.Println("domainMapping fired ", domainMapping)
log.Println("domainMapping fired ", domainMapping)
//check to make sure connection is already regiered, you can no register a domain without an apporved connection
//if connection, ok := connections[domainMapping.connection]; ok {

View File

@ -3,26 +3,43 @@ package relay
import (
"context"
"crypto/tls"
"log"
"net"
"net/http"
"git.coolaj86.com/coolaj86/go-telebitd/relay/admin"
"git.coolaj86.com/coolaj86/go-telebitd/relay/api"
"git.coolaj86.com/coolaj86/go-telebitd/server"
"git.coolaj86.com/coolaj86/go-telebitd/tunnel"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
)
// Relay is probably a layer that doesn't need to exist
type Relay struct {
ctx context.Context
status *server.Status
status *api.Status
mx *server.MPlexy
table *server.Table
table *api.Table
}
func New(ctx context.Context, tlsConfig *tls.Config, authz server.Authorizer, status *server.Status, table *server.Table) *Relay {
return &Relay{
// New initializes and returns a relay service
func New(ctx context.Context, tlsConfig *tls.Config, authz server.Authorizer, status *api.Status, table *api.Table) *Relay {
// TODO do we need this already setup here? or is it just for logging?
status.ConnectionTracking = api.NewTracking()
status.ConnectionTable = table
authAdmin := authz
r := &Relay{
ctx: ctx,
status: status,
table: table,
mx: server.New(ctx, tlsConfig, authz, status),
mx: server.New(ctx, tlsConfig, authAdmin, authz, status), // TODO Accept
}
return r
}
// ListenAndServe sets up all of the tcp, http, https, and tunnel servers
func (r *Relay) ListenAndServe(port int) error {
serverStatus := r.status
@ -36,11 +53,7 @@ func (r *Relay) ListenAndServe(port int) error {
// - if tls, establish, protocol peek buffer, else decrypted
// - match protocol
connectionTracking := server.NewTracking()
serverStatus.ConnectionTracking = connectionTracking
go connectionTracking.Run(r.ctx)
serverStatus.ConnectionTable = r.table
go r.status.ConnectionTracking.Run(r.ctx)
go serverStatus.ConnectionTable.Run(r.ctx)
//serverStatus.GenericListeners = genericListeners
@ -48,5 +61,65 @@ func (r *Relay) ListenAndServe(port int) error {
// blocks until it can listen, which it can't until started
go r.mx.MultiListenAndServe(port)
// funnel target devices into WebSocket pool
tunnelListener := tunnel.NewListener()
r.mx.AcceptTargetServer = func(conn net.Conn) {
tunnelListener.Feed(conn)
}
go listenAndServeTargets(r.mx, tunnelListener)
// funnel admin clients to API
adminListener := tunnel.NewListener()
r.mx.AcceptAdminClient = func(conn net.Conn) {
adminListener.Feed(conn)
}
go admin.ListenAndServe(r.mx, adminListener)
return r.mx.Run()
}
func listenAndServeTargets(mx *server.MPlexy, handler net.Listener) error {
serverStatus := mx.Status
router := mux.NewRouter().StrictSlash(true)
router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
log.Println("HandleFunc /")
switch url := r.URL.Path; url {
case "/":
log.Println("websocket opening ", r.RemoteAddr, " ", r.Host)
authz, err := mx.AuthorizeTarget(r)
var upgrader = websocket.Upgrader{
ReadBufferSize: 65535,
WriteBufferSize: 65535,
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println("WebSocket upgrade failed", err)
return
}
log.Println("before connection table")
serverName := authz.Domains[0]
newRegistration := api.NewRegistration(conn, r.RemoteAddr, authz.Domains, serverStatus.ConnectionTracking, serverName)
serverStatus.WSSConnectionRegister(newRegistration)
if ok := <-newRegistration.CommCh(); !ok {
log.Println("connection registration failed ", newRegistration)
return
}
log.Println("connection registration accepted ", newRegistration)
}
})
// TODO setup for http/2
s := &http.Server{
Addr: ":80",
Handler: router,
}
return s.Serve(handler)
}

View File

@ -14,12 +14,11 @@ import (
"strings"
"time"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
telebit "git.coolaj86.com/coolaj86/go-telebitd"
"git.coolaj86.com/coolaj86/go-telebitd/packer"
"git.coolaj86.com/coolaj86/go-telebitd/relay/api"
"git.coolaj86.com/coolaj86/go-telebitd/sni"
"git.coolaj86.com/coolaj86/go-telebitd/tunnel"
)
type contextKey string
@ -27,9 +26,6 @@ type contextKey string
//CtxConnectionTrack
const (
ctxServerStatus contextKey = "serverstatus"
//ctxConnectionTable contextKey = "connectionTable"
ctxConfig contextKey = "tlsConfig"
ctxListenerRegistration contextKey = "listenerRegistration"
ctxConnectionTrack contextKey = "connectionTrack"
@ -37,6 +33,7 @@ const (
ctxAdminHostName contextKey = "adminHostName"
ctxCancelCheck contextKey = "cancelcheck"
ctxLoadbalanceDefaultMethod contextKey = "lbdefaultmethod"
//ctxConnectionTable contextKey = "connectionTable"
)
// TODO isn't this restriction in the TLS lib?
@ -102,7 +99,7 @@ func (mx *MPlexy) multiListenAndServe(ctx context.Context, listenerRegistration
fmt.Println("New connection from %v on %v", conn.LocalAddr(), conn.RemoteAddr())
// TODO maybe put these into something like mx.newConnCh and have an mx.Accept()?
wedgeConn := NewWedgeConn(conn)
wedgeConn := tunnel.NewWedgeConn(conn)
go mx.accept(ctx, wedgeConn)
}
}
@ -111,22 +108,18 @@ func (mx *MPlexy) multiListenAndServe(ctx context.Context, listenerRegistration
//accept -
// - accept a wedgeConnection along with all the other required attritvues
// - peek into the buffer, determine TLS or unencrypted
// - if TSL, then terminate with a TLS endpoint, pass to handleStream
// - if clearText, pass to handleStream
func (mx *MPlexy) accept(ctx context.Context, wConn *WedgeConn) {
// TODO shouldn't this responsibility fall elsewhere?
// (otherwise I think we're keeping this function in memory while something else fails to end)
// (i.e. something, somewhere is missing a `go doStuff()`
defer wConn.Close()
// - if TSL, then terminate with a TLS endpoint, pass to acceptEcryptedStream
// - if clearText, pass to acceptPlainStream
func (mx *MPlexy) accept(ctx context.Context, wConn *tunnel.WedgeConn) {
peekCnt := 10
encryptMode := encryptNone
loginfo.Println("conn", wConn, wConn.LocalAddr().String(), wConn.RemoteAddr().String())
loginfo.Println("new conn", wConn, wConn.LocalAddr().String(), wConn.RemoteAddr().String())
peek, err := wConn.Peek(peekCnt)
if err != nil {
loginfo.Println("error while peeking")
wConn.Close()
return
}
@ -152,85 +145,61 @@ func (mx *MPlexy) accept(ctx context.Context, wConn *WedgeConn) {
}
oneConn := &oneConnListener{wConn}
tlsConfig := ctx.Value(ctxConfig).(*tls.Config)
if encryptMode == encryptSSLV2 {
loginfo.Println("<= SSLv2 is not accepted")
wConn.Close()
return
}
if encryptMode == encryptNone {
loginfo.Println("Handle Unencrypted")
mx.handleStream(ctx, wConn)
mx.acceptPlainStream(ctx, wConn, false)
return
}
loginfo.Println("Handle Encryption")
// check SNI heading
// if matched, then looks like a WSS connection
// else external don't pull off TLS.
peek, err = wConn.PeekAll()
if err != nil {
loginfo.Println("error while peeking")
loginfo.Println(hex.Dump(peek[0:]))
return
mx.acceptEncryptedStream(ctx, wConn)
}
wssHostName := ctx.Value(ctxWssHostName).(string)
adminHostName := ctx.Value(ctxAdminHostName).(string)
func (mx *MPlexy) acceptEncryptedStream(ctx context.Context, wConn *tunnel.WedgeConn) {
// Peek at SNI (ServerName) from TLS Hello header
peek, err := wConn.PeekAll()
if err != nil {
loginfo.Println("Bad socket: read error from", wConn.RemoteAddr(), err)
loginfo.Println(hex.Dump(peek[0:]))
wConn.Close()
return
}
sniHostName, err := sni.GetHostname(peek)
if err != nil {
loginfo.Println("Bad socket: no SNI from", wConn.RemoteAddr(), err)
loginfo.Println(err)
wConn.Close()
return
}
loginfo.Println("sni:", sniHostName)
loginfo.Println("SNI:", sniHostName)
// This is where a target device connects to receive traffic
if sniHostName == wssHostName {
//handle WSS Path
tlsListener := tls.NewListener(oneConn, tlsConfig)
conn, err := tlsListener.Accept()
if err != nil {
loginfo.Println(err)
if sniHostName == mx.wssHostName || sniHostName == mx.adminHostName {
// The TLS should be terminated and handled internally
tlsConfig := ctx.Value(ctxConfig).(*tls.Config)
conn := tls.Client(wConn, tlsConfig)
tlsWedgeConn := tunnel.NewWedgeConn(conn)
mx.acceptPlainStream(ctx, tlsWedgeConn, true)
return
}
tlsWedgeConn := NewWedgeConn(conn)
mx.handleStream(ctx, tlsWedgeConn)
return
//oneConn := &oneConnListener{wConn}
// TLS remains intact and shall be routed downstream, wholesale
loginfo.Println("processing non terminating traffic", mx.wssHostName, sniHostName)
go mx.routeToTarget(ctx, wConn, sniHostName, "https")
}
// This is where an admin of the relay manages it
if sniHostName == adminHostName {
// TODO mx.Admin.CheckRemoteIP(conn) here
// handle admin path
tlsListener := tls.NewListener(oneConn, tlsConfig)
conn, err := tlsListener.Accept()
if err != nil {
loginfo.Println(err)
return
}
tlsWedgeConn := NewWedgeConn(conn)
mx.handleStream(ctx, tlsWedgeConn)
return
}
//traffic not terminating on the rvpn do not decrypt
loginfo.Println("processing non terminating traffic", wssHostName, sniHostName)
handleExternalHTTPRequest(ctx, wConn, sniHostName, "https")
}
//handleStream --
//acceptPlainStream --
// - we have an unencrypted stream connection with the ability to peek
// - attempt to identify HTTP
// - handle http
@ -238,15 +207,15 @@ func (mx *MPlexy) accept(ctx context.Context, wConn *WedgeConn) {
// - attempt to identify as ADMIN/API session
// - else handle as raw http
// - handle other?
func (mx *MPlexy) handleStream(ctx context.Context, wConn *WedgeConn) {
loginfo.Println("handle Stream")
loginfo.Println("conn", wConn.LocalAddr().String(), wConn.RemoteAddr().String())
func (mx *MPlexy) acceptPlainStream(ctx context.Context, wConn *tunnel.WedgeConn, encrypted bool) {
loginfo.Println("Plain Conn", wConn.LocalAddr().String(), wConn.RemoteAddr().String())
// TODO couldn't this be dangerous? Or is it limited to a single packet?
// TODO couldn't reading everything be dangerous? Or is it limited to a single packet?
peek, err := wConn.PeekAll()
if err != nil {
loginfo.Println("error while peeking", err)
loginfo.Println(hex.Dump(peek[0:]))
wConn.Close()
return
}
@ -255,11 +224,13 @@ func (mx *MPlexy) handleStream(ctx context.Context, wConn *WedgeConn) {
// HTTP Identifcation // CRLF
if !bytes.Contains(peek[:], []byte{0x0d, 0x0a}) {
wConn.Close()
return
}
//string protocol
if !bytes.ContainsAny(peek[:], "HTTP/") {
wConn.Close()
return
}
@ -268,40 +239,51 @@ func (mx *MPlexy) handleStream(ctx context.Context, wConn *WedgeConn) {
r, err := http.ReadRequest(bufio.NewReader(bytes.NewReader(peek)))
if err != nil {
loginfo.Println("identified as HTTP, failed request parsing", err)
return
}
// TODO add newtypes
// TODO check if this is a websocket
_, err = mx.authorize(r)
if err == nil {
loginfo.Println("Valid WSS dected...sending to handler")
oneConn := &oneConnListener{wConn}
mx.handleWssClient(ctx, oneConn)
//do we have a invalid domain indicating Admin?
//if yes, prep the oneConn and send it to the handler
wConn.Close()
return
}
if strings.Contains(r.Host, telebit.InvalidAdminDomain) {
loginfo.Println("admin")
oneConn := &oneConnListener{wConn}
handleAdminClient(ctx, oneConn)
// TODO mx.Admin.CheckRemoteIP(conn) here
// handle admin path
mx.AcceptAdminClient(wConn)
return
}
loginfo.Println("unsupported")
// TODO add newtypes
// TODO check if this is a websocket
_, err = mx.AuthorizeTarget(r)
if err == nil {
loginfo.Println("Valid WSS dected...sending to handler")
mx.AcceptTargetServer(wConn)
return
}
// TODO sniHostName is the key to the route, which could also be a port or hostname
//traffic not terminating on the rvpn do not decrypt
loginfo.Println("processing non terminating traffic", mx.wssHostName, r.Host)
loginfo.Println(hex.Dump(peek))
if !encrypted {
// TODO request and cache http resources as a feature??
go mx.routeToTarget(ctx, wConn, r.Host, "http")
return
}
//handleExternalHTTPRequest -
// This is not presently possible
loginfo.Println("impossible condition: local decryption of routable client", mx.wssHostName, r.Host)
go mx.routeToTarget(ctx, wConn, r.Host, "https")
}
//routeToTarget -
// - get a wConn and start processing requests
func handleExternalHTTPRequest(ctx context.Context, extConn *WedgeConn, hostname, service string) {
func (mx *MPlexy) routeToTarget(ctx context.Context, extConn *tunnel.WedgeConn, hostname, service string) {
// TODO is this the right place to do this?
defer extConn.Close()
//connectionTracking := ctx.Value(ctxConnectionTrack).(*Tracking)
serverStatus := ctx.Value(ctxServerStatus).(*Status)
serverStatus := ctx.Value(ctxServerStatus).(*api.Status)
defer func() {
serverStatus.ExtConnectionUnregister(extConn)
@ -317,7 +299,7 @@ func handleExternalHTTPRequest(ctx context.Context, extConn *WedgeConn, hostname
return
}
track := NewTrack(extConn, hostname)
track := api.NewTrack(extConn, hostname)
serverStatus.ExtConnectionRegister(track)
remoteStr := extConn.RemoteAddr().String()
@ -352,7 +334,7 @@ func handleExternalHTTPRequest(ctx context.Context, extConn *WedgeConn, hostname
//loginfo.Println(hex.Dump(buf.Bytes()))
//Bundle up the send request and dispatch
sendTrack := NewSendTrack(buf.Bytes(), hostname)
sendTrack := api.NewSendTrack(buf.Bytes(), hostname)
serverStatus.SendExtRequest(conn, sendTrack)
cnt := len(buffer)
@ -363,63 +345,3 @@ func handleExternalHTTPRequest(ctx context.Context, extConn *WedgeConn, hostname
}
}
//handleWssClient -
// - expecting an existing oneConnListener with a qualified wss client connected.
// - auth will happen again since we were just peeking at the token.
func (mx *MPlexy) handleWssClient(ctx context.Context, oneConn *oneConnListener) {
serverStatus := ctx.Value(ctxServerStatus).(*Status)
//connectionTable := ctx.Value(ctxConnectionTable).(*Table)
router := mux.NewRouter().StrictSlash(true)
router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
loginfo.Println("HandleFunc /")
switch url := r.URL.Path; url {
case "/":
loginfo.Println("websocket opening ", r.RemoteAddr, " ", r.Host)
authz, err := mx.authorize(r)
var upgrader = websocket.Upgrader{
ReadBufferSize: 65535,
WriteBufferSize: 65535,
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
loginfo.Println("WebSocket upgrade failed", err)
return
}
loginfo.Println("before connection table")
serverName := authz.Domains[0]
newRegistration := NewRegistration(conn, r.RemoteAddr, authz.Domains, serverStatus.ConnectionTracking, serverName)
serverStatus.WSSConnectionRegister(newRegistration)
if ok := <-newRegistration.CommCh(); !ok {
loginfo.Println("connection registration failed ", newRegistration)
return
}
loginfo.Println("connection registration accepted ", newRegistration)
}
})
s := &http.Server{
Addr: ":80",
Handler: router,
}
err := s.Serve(oneConn)
if err != nil {
loginfo.Println("Serve error: ", err)
}
select {
case <-ctx.Done():
loginfo.Println("Cancel signal hit")
return
}
}

View File

@ -5,6 +5,8 @@ import (
"crypto/tls"
"net"
"net/http"
"git.coolaj86.com/coolaj86/go-telebitd/relay/api"
)
//ListenerRegistrationStatus - post registration status
@ -65,34 +67,43 @@ func NewListenerRegistration(port int) (p *ListenerRegistration) {
type MPlexy struct {
listeners map[*net.Listener]int
ctx context.Context
connnectionTable *Table
connectionTracking *Tracking
authorize Authorizer
connnectionTable *api.Table
connectionTracking *api.Tracking
AuthorizeTarget Authorizer
AuthorizeAdmin Authorizer
tlsConfig *tls.Config
register chan *ListenerRegistration
wssHostName string
adminHostName string
cancelCheck int
lbDefaultMethod string
serverStatus *Status
//xservers *MPlexy
Status *api.Status
AcceptTargetServer func(net.Conn)
AcceptAdminClient func(net.Conn)
}
// New creates tcp (and https and wss?) listeners
func New(ctx context.Context, tlsConfig *tls.Config, authz Authorizer, serverStatus *Status) (mx *MPlexy) {
func New(
ctx context.Context,
tlsConfig *tls.Config,
authAdmin Authorizer,
authz Authorizer,
serverStatus *api.Status,
) (mx *MPlexy) {
mx = &MPlexy{
listeners: make(map[*net.Listener]int),
ctx: ctx,
connnectionTable: serverStatus.ConnectionTable,
connectionTracking: serverStatus.ConnectionTracking,
authorize: authz,
AuthorizeTarget: authz,
AuthorizeAdmin: authz,
tlsConfig: tlsConfig,
register: make(chan *ListenerRegistration),
wssHostName: serverStatus.WssDomain,
adminHostName: serverStatus.AdminDomain,
cancelCheck: serverStatus.DeadTime.cancelcheck,
cancelCheck: serverStatus.DeadTime.Cancelcheck,
lbDefaultMethod: serverStatus.LoadbalanceDefaultMethod,
serverStatus: serverStatus,
Status: serverStatus,
}
return mx
}
@ -117,7 +128,7 @@ func (mx *MPlexy) Run() error {
ctx = context.WithValue(ctx, ctxAdminHostName, mx.adminHostName)
ctx = context.WithValue(ctx, ctxCancelCheck, mx.cancelCheck)
ctx = context.WithValue(ctx, ctxLoadbalanceDefaultMethod, mx.lbDefaultMethod)
ctx = context.WithValue(ctx, ctxServerStatus, mx.serverStatus)
ctx = context.WithValue(ctx, ctxServerStatus, mx.Status)
for {
select {
@ -138,7 +149,7 @@ func (mx *MPlexy) Run() error {
}
}
loginfo.Println("listener starting up ", registration.port)
loginfo.Println(ctx.Value(ctxConnectionTrack).(*Tracking))
loginfo.Println(ctx.Value(ctxConnectionTrack).(*api.Tracking))
go mx.multiListenAndServe(ctx, registration)
status := <-registration.commCh

View File

@ -11,7 +11,7 @@ type oneConnListener struct {
func (l *oneConnListener) Accept() (net.Conn, error) {
if l.conn == nil {
loginfo.Println("Accept EOF")
loginfo.Println("oneConnListener Accept EOF")
return nil, io.EOF
}
@ -22,11 +22,11 @@ func (l *oneConnListener) Accept() (net.Conn, error) {
}
func (l *oneConnListener) Close() error {
loginfo.Println("close")
loginfo.Println("oneConnListener close")
return nil
}
func (l *oneConnListener) Addr() net.Addr {
loginfo.Println("addr")
loginfo.Println("oneConnLister addr")
return nil
}

View File

@ -1,4 +1,4 @@
package server
package tunnel
import (
"bufio"

48
tunnel/tunnel.go Normal file
View File

@ -0,0 +1,48 @@
package tunnel
import (
"io"
"net"
)
// Listener defines a listener for use with http servers
type Listener struct {
Conns chan net.Conn
//ParentAddr net.Addr
}
// NewListener creates a channel for connections and returns the listener
func NewListener() *Listener {
return &Listener{
Conns: make(chan net.Conn),
}
}
// Feed will block while pushing a net.Conn onto Conns
func (l *Listener) Feed(conn net.Conn) {
l.Conns <- conn
}
// net.Listener interface
// Accept will block and wait for a new net.Conn
func (l *Listener) Accept() (net.Conn, error) {
conn, ok := <-l.Conns
if ok {
return conn, nil
}
return nil, io.EOF
}
// Close will close the Conns channel
func (l *Listener) Close() error {
close(l.Conns)
return nil
}
// Addr returns nil to fulfill the net.Listener interface
func (l *Listener) Addr() net.Addr {
// Addr may (or may not) return the original TCP or TLS listener's address
//return l.ParentAddr
return nil
}