telebit/table/table.go

271 lines
5.9 KiB
Go
Raw Normal View History

2020-06-29 06:35:19 +00:00
package table
import (
"fmt"
"net"
2020-07-21 06:35:45 +00:00
"os"
2020-06-29 06:35:19 +00:00
"sync"
"io"
"strconv"
"strings"
2020-07-20 22:20:59 +00:00
telebit "git.rootprojects.org/root/telebit"
2020-07-21 06:35:45 +00:00
"git.rootprojects.org/root/telebit/dbg"
2020-06-29 06:35:19 +00:00
"github.com/gorilla/websocket"
)
// Servers represent actual connections
var Servers *sync.Map
// Table makes sense to be in-memory, but it could be serialized if needed
var Table *sync.Map
func init() {
Servers = &sync.Map{}
Table = &sync.Map{}
}
func Add(server *SubscriberConn) {
var srvMap *sync.Map
srvMapX, ok := Servers.Load(server.Grants.Subject)
if ok {
srvMap = srvMapX.(*sync.Map)
} else {
srvMap = &sync.Map{}
}
srvMap.Store(server.RemoteAddr, server)
Servers.Store(server.Grants.Subject, srvMap)
// Add this server to the domain name matrix
for _, domainname := range server.Grants.Domains {
2020-06-29 06:35:19 +00:00
var srvMap *sync.Map
srvMapX, ok := Table.Load(domainname)
2020-06-29 06:35:19 +00:00
if ok {
srvMap = srvMapX.(*sync.Map)
} else {
srvMap = &sync.Map{}
}
srvMap.Store(server.RemoteAddr, server)
Table.Store(domainname, srvMap)
2020-06-29 06:35:19 +00:00
}
}
func RemoveServer(server *SubscriberConn) bool {
// TODO remove by RemoteAddr
//return false
2020-07-21 06:35:45 +00:00
fmt.Fprintf(
os.Stderr,
"[warn] RemoveServer() still calls Remove(subject) instead of removing by RemoteAddr\n",
)
return Remove(server.Grants.Subject)
}
2020-06-29 06:35:19 +00:00
func Remove(subject string) bool {
srvMapX, ok := Servers.Load(subject)
2020-07-21 06:35:45 +00:00
fmt.Printf("Remove(%s): exists? %t\n", subject, ok)
2020-06-29 06:35:19 +00:00
if !ok {
return false
}
srvMap := srvMapX.(*sync.Map)
srvMap.Range(func(k, v interface{}) bool {
srv := v.(*SubscriberConn)
srv.Clients.Range(func(k, v interface{}) bool {
conn := v.(net.Conn)
_ = conn.Close()
return true
})
srv.WSConn.Close()
for _, domainname := range srv.Grants.Domains {
srvMapX, ok := Table.Load(domainname)
if !ok {
continue
}
srvMap = srvMapX.(*sync.Map)
srvMap.Delete(srv.RemoteAddr)
n := 0
srvMap.Range(func(k, v interface{}) bool {
n++
return true
})
if 0 == n {
// TODO comment out to handle the bad case of 0 servers / empty map
Table.Delete(domainname)
}
}
2020-06-29 06:35:19 +00:00
return true
})
Servers.Delete(subject)
return true
}
// SubscriberConn represents a tunneled server, its grants, and its clients
type SubscriberConn struct {
RemoteAddr string
WSConn *websocket.Conn
WSTun net.Conn // *telebit.WebsocketTunnel
Grants *telebit.Grants
Clients *sync.Map
// TODO is this the right codec type?
MultiEncoder *telebit.Encoder
MultiDecoder *telebit.Decoder
// to fulfill Router interface
}
func (s *SubscriberConn) RouteBytes(src, dst telebit.Addr, payload []byte) {
2020-07-15 10:46:02 +00:00
id := fmt.Sprintf("%s:%d", src.Hostname(), src.Port())
if dbg.Debug {
2020-07-21 06:35:45 +00:00
fmt.Fprintf(
os.Stderr,
"[debug] Routing some more bytes: %s\n",
dbg.Trunc(payload, len(payload)),
)
fmt.Printf("\tid %s\nsrc %+v\n", id, src)
fmt.Printf("\tdst %s %+v\n", dst.Scheme(), dst)
}
2020-06-29 06:35:19 +00:00
clientX, ok := s.Clients.Load(id)
if !ok {
// TODO send back closed client error
2020-07-21 06:35:45 +00:00
fmt.Printf("RouteBytes({ %s }, %v, ...) [debug] no client found for %s\n", id, dst)
2020-06-29 06:35:19 +00:00
return
}
client, _ := clientX.(net.Conn)
2020-07-15 10:46:02 +00:00
if "end" == dst.Scheme() {
2020-07-21 06:35:45 +00:00
fmt.Printf("RouteBytes: { %s }.Close(): %v\n", id, dst)
2020-07-15 10:46:02 +00:00
_ = client.Close()
return
}
2020-06-29 06:35:19 +00:00
for {
n, err := client.Write(payload)
if dbg.Debug {
2020-07-21 06:35:45 +00:00
fmt.Fprintf(os.Stderr, "[debug] table Write %s\n", dbg.Trunc(payload, len(payload)))
}
2020-07-15 10:46:02 +00:00
if nil == err || io.EOF == err {
2020-06-29 06:35:19 +00:00
break
}
2020-07-15 10:46:02 +00:00
if n > 0 && io.ErrShortWrite == err {
payload = payload[n:]
continue
}
break
// TODO send back closed client error
//return err
2020-06-29 06:35:19 +00:00
}
}
func (s *SubscriberConn) Serve(client net.Conn) error {
var wconn *telebit.ConnWrap
switch conn := client.(type) {
case *telebit.ConnWrap:
wconn = conn
default:
// this probably isn't strictly necessary
panic("*SubscriberConn.Serve is special in that it must receive &ConnWrap{ Conn: conn }")
}
id := client.RemoteAddr().String()
2020-07-21 06:35:45 +00:00
if dbg.Debug {
fmt.Fprintf(os.Stderr, "[debug] NEW ID (ip:port) %s\n", id)
}
2020-06-29 06:35:19 +00:00
s.Clients.Store(id, client)
2020-07-21 06:35:45 +00:00
//fmt.Fprintf(os.Stderr, "[debug] immediately cancel client to simplify testing / debugging\n")
2020-07-09 09:03:04 +00:00
//_ = client.Close()
2020-06-29 06:35:19 +00:00
// TODO
// - Encode each client to the tunnel
// - Find the right client for decoded messages
// TODO which order is remote / local?
srcParts := strings.Split(client.RemoteAddr().String(), ":")
srcAddr := srcParts[0]
srcPort, _ := strconv.Atoi(srcParts[1])
dstParts := strings.Split(client.LocalAddr().String(), ":")
dstAddr := dstParts[0]
dstPort, _ := strconv.Atoi(dstParts[1])
2020-07-21 06:35:45 +00:00
if dbg.Debug {
fmt.Fprintf(os.Stderr, "[debug] srcParts %v\n", srcParts)
fmt.Fprintf(os.Stderr, "[debug] dstParts %v\n", dstParts)
}
2020-07-15 10:46:02 +00:00
servername := wconn.Servername()
2020-06-29 06:35:19 +00:00
termination := telebit.Unknown
scheme := telebit.None
2020-07-15 10:46:02 +00:00
if "" != servername {
dstAddr = servername
//scheme = telebit.TLS
scheme = telebit.HTTPS
}
2020-06-29 06:35:19 +00:00
if 80 == dstPort {
2020-07-15 10:46:02 +00:00
scheme = telebit.HTTPS
} else if 443 == dstPort {
2020-06-29 06:35:19 +00:00
// TODO dstAddr = wconn.Servername()
scheme = telebit.HTTP
}
src := telebit.NewAddr(
scheme,
termination,
srcAddr,
srcPort,
)
dst := telebit.NewAddr(
scheme,
termination,
dstAddr,
dstPort,
)
2020-07-21 06:35:45 +00:00
if dbg.Debug {
fmt.Fprintf(os.Stderr, "[debug] NewAddr src %+v\n", src)
fmt.Fprintf(os.Stderr, "[debug] NewAddr dst %+v\n", dst)
}
2020-06-29 06:35:19 +00:00
err := s.MultiEncoder.Encode(wconn, *src, *dst)
_ = wconn.Close()
2020-07-21 06:35:45 +00:00
if dbg.Debug {
fmt.Fprintf(os.Stderr, "[debug] Encoder Complete %+v %+v\n", id, err)
}
2020-06-29 06:35:19 +00:00
s.Clients.Delete(id)
return err
}
func GetServer(servername string) (*SubscriberConn, bool) {
var srv *SubscriberConn
load := -1
// TODO match *.whatever.com
srvMapX, ok := Table.Load(servername)
if !ok {
return nil, false
}
srvMap := srvMapX.(*sync.Map)
srvMap.Range(func(k, v interface{}) bool {
myLoad := 0
mySrv := v.(*SubscriberConn)
mySrv.Clients.Range(func(k, v interface{}) bool {
2020-07-15 11:00:21 +00:00
myLoad += 1
2020-06-29 06:35:19 +00:00
return true
})
// pick the least loaded server
if -1 == load || myLoad < load {
load = myLoad
srv = mySrv
}
return true
})
return srv, true
}