telebit/relay/mplexy/mplexy.go

189 lines
5.2 KiB
Go
Raw Normal View History

2020-05-01 06:12:16 +00:00
package mplexy
import (
"context"
"crypto/tls"
"net"
2020-04-30 10:43:36 +00:00
"net/http"
2020-05-01 05:47:46 +00:00
2020-05-01 06:12:16 +00:00
"git.coolaj86.com/coolaj86/go-telebitd/log"
2020-05-01 05:47:46 +00:00
"git.coolaj86.com/coolaj86/go-telebitd/relay/api"
)
2020-05-01 07:06:14 +00:00
// InvalidAdminDomain is for bootstrapping the setup of a relay device
var InvalidAdminDomain = "admin.telebit.invalid"
var loginfo = log.Loginfo
2020-05-01 06:12:16 +00:00
var connectionID int64 = 0
// ListenerRegistrationStatus is the post-registration status reported for a
// listener registration (listenerAdded, listenerExists or listenerFault).
type ListenerRegistrationStatus int
2020-04-30 10:43:36 +00:00
// Authz represents grants or privileges of a client.
//
// Only Domains is implemented so far; the remaining items are the planned scope:
//
//	clientID
//	domains that may be forwarded
//	# of domains that may be forwarded
//	ports that may be forwarded (i.e. allow special ports < 1024, exclude 443, 25, etc)
//	# of ports that may be forwarded
//	# of concurrent connections
//	# bandwidth rate (i.e. 5 mbps)
//	# bandwidth cap per time period (i.e. 100 MB / hour)
//	# throttled rate (i.e. 0 (kill), or 1 mbps)
type Authz struct {
	// Domains lists the domains that may be forwarded for the client.
	Domains []string
}
// Authorizer is called when a new client connects and we need to know something about it.
// It is given an *http.Request and returns the grants (*Authz) or an error.
type Authorizer func(*http.Request) (*Authz, error)
// Outcomes of a listener registration, carried in ListenerRegistration.status.
const (
// listenerAdded is the status under which Run records the listener in its table.
listenerAdded ListenerRegistrationStatus = iota
// listenerExists is set by Run when the requested port is already being served.
listenerExists
// listenerFault is the status Run logs as a failure to create the listener.
listenerFault
)
// ListenerRegistration is a registration structure used to bring up a listener.
// It is sent on the MPlexy register channel; the Run loop then handles adding
// and starting up the listener, or reports an error, and the result comes back
// on commCh.
type ListenerRegistration struct {
// The listener; Run uses it as the key of its listener table once the status is listenerAdded.
listener *net.Listener
// The listener port
port int
// The status
status ListenerRegistrationStatus
// The error (presumably set together with listenerFault — TODO confirm against multiListenAndServe)
err error
// communications channel between go routines; unbuffered (see NewListenerRegistration)
commCh chan *ListenerRegistration
}
//NewListenerRegistration -- Constructor
func NewListenerRegistration(port int) (p *ListenerRegistration) {
p = new(ListenerRegistration)
p.port = port
p.commCh = make(chan *ListenerRegistration)
2020-04-30 10:43:36 +00:00
return p
}
2020-04-30 10:43:36 +00:00
// MPlexy -
type MPlexy struct {
listeners map[*net.Listener]int
ctx context.Context
2020-05-01 05:47:46 +00:00
connnectionTable *api.Table
connectionTracking *api.Tracking
AuthorizeTarget Authorizer
AuthorizeAdmin Authorizer
2020-04-30 10:43:36 +00:00
tlsConfig *tls.Config
register chan *ListenerRegistration
wssHostName string
adminHostName string
cancelCheck int
lbDefaultMethod string
2020-05-01 05:47:46 +00:00
Status *api.Status
AcceptTargetServer func(net.Conn)
AcceptAdminClient func(net.Conn)
}
2020-04-30 10:43:36 +00:00
// New creates tcp (and https and wss?) listeners
2020-05-01 05:47:46 +00:00
func New(
ctx context.Context,
tlsConfig *tls.Config,
authAdmin Authorizer,
authz Authorizer,
serverStatus *api.Status,
) (mx *MPlexy) {
2020-04-30 10:43:36 +00:00
mx = &MPlexy{
listeners: make(map[*net.Listener]int),
ctx: ctx,
connnectionTable: serverStatus.ConnectionTable,
connectionTracking: serverStatus.ConnectionTracking,
2020-05-01 05:47:46 +00:00
AuthorizeTarget: authz,
AuthorizeAdmin: authz,
2020-04-30 10:43:36 +00:00
tlsConfig: tlsConfig,
register: make(chan *ListenerRegistration),
wssHostName: serverStatus.WssDomain,
adminHostName: serverStatus.AdminDomain,
2020-05-01 05:47:46 +00:00
cancelCheck: serverStatus.DeadTime.Cancelcheck,
2020-04-30 10:43:36 +00:00
lbDefaultMethod: serverStatus.LoadbalanceDefaultMethod,
2020-05-01 05:47:46 +00:00
Status: serverStatus,
2020-04-30 10:43:36 +00:00
}
return mx
}
2020-05-01 07:06:14 +00:00
// AdminDomain returns the Admin Domain as set on startup
// (the AdminDomain of the *api.Status that was passed to New).
func (mx *MPlexy) AdminDomain() string {
return mx.adminHostName
}
//Run -- Execute
// - execute the GenericLister
// - pass initial port, we'll announce that
2020-04-30 10:43:36 +00:00
func (mx *MPlexy) Run() error {
loginfo.Println("[mplexy] ConnectionTable starting")
loginfo.Println("[mplexy] ct ", mx.connectionTracking)
2020-04-30 10:43:36 +00:00
ctx := mx.ctx
2020-04-30 10:43:36 +00:00
// For just this bit
ctx = context.WithValue(ctx, ctxConnectionTrack, mx.connectionTracking)
2020-04-30 10:43:36 +00:00
// For all Listeners
ctx = context.WithValue(ctx, ctxConfig, mx.tlsConfig)
ctx = context.WithValue(ctx, ctxListenerRegistration, mx.register)
ctx = context.WithValue(ctx, ctxWssHostName, mx.wssHostName)
ctx = context.WithValue(ctx, ctxCancelCheck, mx.cancelCheck)
ctx = context.WithValue(ctx, ctxLoadbalanceDefaultMethod, mx.lbDefaultMethod)
2020-05-01 05:47:46 +00:00
ctx = context.WithValue(ctx, ctxServerStatus, mx.Status)
2020-04-30 10:43:36 +00:00
for {
select {
2020-04-30 10:43:36 +00:00
case <-ctx.Done():
loginfo.Println("[mplexy] Cancel signal hit")
2020-04-30 10:43:36 +00:00
return nil
2020-04-30 10:43:36 +00:00
case registration := <-mx.register:
loginfo.Println("[mplexy] register fired", registration.port)
2020-04-30 10:43:36 +00:00
// check to see if port is already running
for listener := range mx.listeners {
if mx.listeners[listener] == registration.port {
loginfo.Println("[mplexy] listener already running", registration.port)
2020-04-30 10:43:36 +00:00
registration.status = listenerExists
registration.commCh <- registration
}
}
2020-05-01 07:06:14 +00:00
loginfo.Println("[mplexy] listener starting up ", registration.port)
loginfo.Println("[mplexy]", ctx.Value(ctxConnectionTrack).(*api.Tracking))
2020-04-30 10:43:36 +00:00
go mx.multiListenAndServe(ctx, registration)
status := <-registration.commCh
if status.status == listenerAdded {
mx.listeners[status.listener] = status.port
} else if status.status == listenerFault {
loginfo.Println("[mplexy] Unable to create a new listerer", registration.port)
2020-04-30 10:43:36 +00:00
}
}
2020-04-30 10:43:36 +00:00
}
return nil
}
2020-05-01 07:06:14 +00:00
// Start calls go Run()
2020-04-30 10:43:36 +00:00
func (mx *MPlexy) Start() {
go mx.Run()
}
2020-04-30 10:43:36 +00:00
// MultiListenAndServe requests another listener (to the same application) on
// a new port by handing a fresh registration to the Run loop. The register
// channel is unbuffered, so the call blocks until Run picks the registration
// up; it does not wait for the listener's start-up result.
func (mx *MPlexy) MultiListenAndServe(port int) {
	// TODO how to associate a listening device with a given plain port
	registration := NewListenerRegistration(port)
	mx.register <- registration
}