cancel and close connections on enoder read and write errors

This commit is contained in:
AJ ONeal 2020-07-13 09:14:11 +00:00
parent 3ac48f0db1
commit e5f26a25da
3 changed files with 19 additions and 7 deletions

View File

@ -180,13 +180,14 @@ func upgradeWebsocket(w http.ResponseWriter, r *http.Request) {
// The remote address of the server is useful for identification. // The remote address of the server is useful for identification.
// The local address of the server (port to which it connected) is not very meaningful. // The local address of the server (port to which it connected) is not very meaningful.
// Rather the client's local address (the specific relay server) would be more useful. // Rather the client's local address (the specific relay server) would be more useful.
ctxEncoder, cancelEncoder := context.WithCancel(context.Background())
server := &table.SubscriberConn{ server := &table.SubscriberConn{
RemoteAddr: r.RemoteAddr, RemoteAddr: r.RemoteAddr,
WSConn: conn, WSConn: conn,
WSTun: wsTun, WSTun: wsTun,
Grants: grants, Grants: grants,
Clients: &sync.Map{}, Clients: &sync.Map{},
MultiEncoder: telebit.NewEncoder(context.TODO(), wsTun), MultiEncoder: telebit.NewEncoder(ctxEncoder, wsTun),
MultiDecoder: telebit.NewDecoder(wsTun), MultiDecoder: telebit.NewDecoder(wsTun),
} }
// TODO should this happen at NewEncoder()? // TODO should this happen at NewEncoder()?
@ -196,6 +197,8 @@ func upgradeWebsocket(w http.ResponseWriter, r *http.Request) {
go func() { go func() {
// (this listener is also a telebit.Router) // (this listener is also a telebit.Router)
err := server.MultiDecoder.Decode(server) err := server.MultiDecoder.Decode(server)
cancelEncoder() // TODO why don't failed writes solve this?
//_ = server.MultiEncoder.Close()
// The tunnel itself must be closed explicitly because // The tunnel itself must be closed explicitly because
// there's an encoder with a callback between the websocket // there's an encoder with a callback between the websocket
@ -204,7 +207,8 @@ func upgradeWebsocket(w http.ResponseWriter, r *http.Request) {
// TODO close all clients // TODO close all clients
fmt.Printf("a subscriber stream is done: %q\n", err) fmt.Printf("a subscriber stream is done: %q\n", err)
// TODO check what happens when we leave a junk connection // TODO check what happens when we leave a junk connection
table.Remove(server.Grants.Subject) //fmt.Println("[debug] [warn] removing server turned off")
table.RemoveServer(server)
}() }()
table.Add(server) table.Add(server)

View File

@ -81,6 +81,7 @@ func (enc *Encoder) Encode(rin io.Reader, src, dst Addr) error {
} }
}() }()
// TODO set a timeout as a failsafe
for { for {
//fmt.Println("poopers gonna poop") //fmt.Println("poopers gonna poop")
select { select {
@ -89,10 +90,10 @@ func (enc *Encoder) Encode(rin io.Reader, src, dst Addr) error {
case <-enc.ctx.Done(): case <-enc.ctx.Done():
// TODO: verify that closing the reader will cause the goroutine to be released // TODO: verify that closing the reader will cause the goroutine to be released
//rin.Close() //rin.Close()
return errors.New("cancelled by context") return errors.New("cancelled by encoder read or parent context")
case <-enc.subctx.Done(): case <-enc.subctx.Done():
//rin.Close() //rin.Close()
return errors.New("cancelled by context") return errors.New("cancelled by encoder write context")
case b := <-rx: case b := <-rx:
header, _, err := Encode(b, src, Addr{scheme: src.scheme, addr: "", port: -1}) header, _, err := Encode(b, src, Addr{scheme: src.scheme, addr: "", port: -1})
if nil != err { if nil != err {

View File

@ -49,9 +49,11 @@ func Add(server *SubscriberConn) {
} }
} }
func RemoveByAddr(subject string) bool { func RemoveServer(server *SubscriberConn) bool {
// TODO // TODO remove by RemoteAddr
return false //return false
fmt.Printf("[warn] RemoveServer() still calls Remove(subject) instead of removing by RemoteAddr\n")
return Remove(server.Grants.Subject)
} }
func Remove(subject string) bool { func Remove(subject string) bool {
@ -158,10 +160,12 @@ func (s *SubscriberConn) Serve(client net.Conn) error {
srcParts := strings.Split(client.RemoteAddr().String(), ":") srcParts := strings.Split(client.RemoteAddr().String(), ":")
srcAddr := srcParts[0] srcAddr := srcParts[0]
srcPort, _ := strconv.Atoi(srcParts[1]) srcPort, _ := strconv.Atoi(srcParts[1])
fmt.Println("[debug] srcParts", srcParts)
dstParts := strings.Split(client.LocalAddr().String(), ":") dstParts := strings.Split(client.LocalAddr().String(), ":")
dstAddr := dstParts[0] dstAddr := dstParts[0]
dstPort, _ := strconv.Atoi(dstParts[1]) dstPort, _ := strconv.Atoi(dstParts[1])
fmt.Println("[debug] dstParts", dstParts)
termination := telebit.Unknown termination := telebit.Unknown
scheme := telebit.None scheme := telebit.None
@ -185,8 +189,11 @@ func (s *SubscriberConn) Serve(client net.Conn) error {
dstAddr, dstAddr,
dstPort, dstPort,
) )
fmt.Println("[debug] NewAddr src", src)
fmt.Println("[debug] NewAddr dst", dst)
err := s.MultiEncoder.Encode(wconn, *src, *dst) err := s.MultiEncoder.Encode(wconn, *src, *dst)
_ = wconn.Close()
s.Clients.Delete(id) s.Clients.Delete(id)
return err return err
} }