telebit/relay/api/table.go

185 lines
4.9 KiB
Go
Raw Normal View History

2020-05-01 05:47:46 +00:00
package api
2017-03-22 21:43:36 +00:00
import (
"context"
"fmt"
2020-05-01 05:47:46 +00:00
"log"
2017-03-22 21:43:36 +00:00
"time"
)
const (
initialDomains = 0
incrementDomains = 0
)
//Table maintains the set of connections
type Table struct {
connections map[*Connection][]string
2020-05-01 05:47:46 +00:00
Domains map[string]*DomainLoadBalance
register chan *Registration
unregister chan *Connection
domainAnnounce chan *DomainMapping
domainRevoke chan *DomainMapping
dwell int
idle int
2020-04-30 10:43:36 +00:00
balanceMethod string
}
//NewTable -- consructor
2020-04-30 10:43:36 +00:00
func NewTable(dwell, idle int, balanceMethod string) (p *Table) {
p = new(Table)
p.connections = make(map[*Connection][]string)
2020-05-01 05:47:46 +00:00
p.Domains = make(map[string]*DomainLoadBalance)
p.register = make(chan *Registration)
p.unregister = make(chan *Connection)
p.domainAnnounce = make(chan *DomainMapping)
p.domainRevoke = make(chan *DomainMapping)
p.dwell = dwell
p.idle = idle
2020-04-30 10:43:36 +00:00
p.balanceMethod = balanceMethod
return
}
//Connections Property
func (c *Table) Connections() map[*Connection][]string {
return c.connections
}
//ConnByDomain -- Obtains a connection from a domain announcement. A domain may be announced more than once
//if that is the case the system stores these connections and then sends traffic back round-robin
//back to the WSS connections
func (c *Table) ConnByDomain(domain string) (*Connection, bool) {
2020-05-01 05:47:46 +00:00
for dn := range c.Domains {
log.Println(dn, domain)
}
2020-05-01 05:47:46 +00:00
if domainsLB, ok := c.Domains[domain]; ok {
log.Println("found")
conn := domainsLB.NextMember()
return conn, ok
}
return nil, false
}
2017-02-19 20:32:03 +00:00
//reaper --
func (c *Table) reaper(delay int, idle int) {
_ = "breakpoint"
2017-02-19 20:32:03 +00:00
for {
2020-05-01 05:47:46 +00:00
log.Println("Reaper waiting for ", delay, " seconds")
2017-02-19 20:32:03 +00:00
time.Sleep(time.Duration(delay) * time.Second)
2020-05-01 05:47:46 +00:00
log.Println("Running scanning ", len(c.connections))
2017-02-19 20:32:03 +00:00
for d := range c.connections {
if !d.State() {
2017-02-19 20:32:03 +00:00
if time.Since(d.lastUpdate).Seconds() > float64(idle) {
2020-05-01 05:47:46 +00:00
log.Println("reaper removing ", d.lastUpdate, time.Since(d.lastUpdate).Seconds())
2017-02-19 20:32:03 +00:00
delete(c.connections, d)
}
}
}
}
}
//GetConnection -- find connection by server-id
func (c *Table) GetConnection(serverID int64) (*Connection, error) {
for conn := range c.connections {
if conn.ConnectionID() == serverID {
return conn, nil
}
}
return nil, fmt.Errorf("Server-id %d not found", serverID)
}
//Run -- Execute
2020-04-30 10:43:36 +00:00
func (c *Table) Run(ctx context.Context) {
2020-05-01 05:47:46 +00:00
log.Println("ConnectionTable starting")
2017-02-19 20:32:03 +00:00
go c.reaper(c.dwell, c.idle)
2017-02-19 20:32:03 +00:00
for {
select {
case <-ctx.Done():
2020-05-01 05:47:46 +00:00
log.Println("Cancel signal hit")
return
case registration := <-c.register:
2020-05-01 05:47:46 +00:00
log.Println("register fired")
connection := NewConnection(c, registration.conn, registration.source, registration.initialDomains,
registration.connectionTrack, registration.serverName)
2017-02-13 21:59:10 +00:00
c.connections[connection] = make([]string, initialDomains)
registration.commCh <- true
// handle initial domain additions
for _, domain := range connection.initialDomains {
// add to the domains regirstation
newDomain := domain
2020-05-01 05:47:46 +00:00
log.Println("adding domain ", newDomain, " to connection ", connection.conn.RemoteAddr().String())
//check to see if domain is already present.
2020-05-01 05:47:46 +00:00
if _, ok := c.Domains[newDomain]; ok {
//append to a list of connections for that domain
2020-05-01 05:47:46 +00:00
c.Domains[newDomain].AddConnection(connection)
} else {
//if not, then add as the 1st to the list of connections
2020-05-01 05:47:46 +00:00
c.Domains[newDomain] = NewDomainLoadBalance(c.balanceMethod)
c.Domains[newDomain].AddConnection(connection)
}
// add to the connection domain list
s := c.connections[connection]
c.connections[connection] = append(s, newDomain)
}
go connection.Writer()
go connection.Reader(ctx)
case connection := <-c.unregister:
2020-05-01 05:47:46 +00:00
log.Println("closing connection ", connection.conn.RemoteAddr().String())
//does connection exist in the connection table -- should never be an issue
if _, ok := c.connections[connection]; ok {
//iterate over the connections for the domain
for _, domain := range c.connections[connection] {
2020-05-01 05:47:46 +00:00
log.Println("remove domain", domain)
//removing domain, make sure it is present (should never be a problem)
2020-05-01 05:47:46 +00:00
if _, ok := c.Domains[domain]; ok {
2020-05-01 05:47:46 +00:00
domainLB := c.Domains[domain]
domainLB.RemoveConnection(connection)
//check to see if domain is free of connections, if yes, delete map entry
if domainLB.count > 0 {
//ignore...perhaps we will do something here dealing wtih the lb method
} else {
2020-05-01 05:47:46 +00:00
delete(c.Domains, domain)
}
}
}
//delete(c.connections, connection)
//close(connection.send)
}
case domainMapping := <-c.domainAnnounce:
2020-05-01 05:47:46 +00:00
log.Println("domainMapping fired ", domainMapping)
//check to make sure connection is already regiered, you can no register a domain without an apporved connection
//if connection, ok := connections[domainMapping.connection]; ok {
//} else {
//}
}
}
}
//Register -- Property
func (c *Table) Register() chan *Registration {
return c.register
}