diff --git a/cmd/telebit/admin.go b/cmd/telebit/admin.go index ce012d6..6cf6ae2 100644 --- a/cmd/telebit/admin.go +++ b/cmd/telebit/admin.go @@ -180,13 +180,14 @@ func upgradeWebsocket(w http.ResponseWriter, r *http.Request) { // The remote address of the server is useful for identification. // The local address of the server (port to which it connected) is not very meaningful. // Rather the client's local address (the specific relay server) would be more useful. + ctxEncoder, cancelEncoder := context.WithCancel(context.Background()) server := &table.SubscriberConn{ RemoteAddr: r.RemoteAddr, WSConn: conn, WSTun: wsTun, Grants: grants, Clients: &sync.Map{}, - MultiEncoder: telebit.NewEncoder(context.TODO(), wsTun), + MultiEncoder: telebit.NewEncoder(ctxEncoder, wsTun), MultiDecoder: telebit.NewDecoder(wsTun), } // TODO should this happen at NewEncoder()? @@ -196,6 +197,8 @@ func upgradeWebsocket(w http.ResponseWriter, r *http.Request) { go func() { // (this listener is also a telebit.Router) err := server.MultiDecoder.Decode(server) + cancelEncoder() // TODO why don't failed writes solve this? + //_ = server.MultiEncoder.Close() // The tunnel itself must be closed explicitly because // there's an encoder with a callback between the websocket @@ -204,7 +207,8 @@ func upgradeWebsocket(w http.ResponseWriter, r *http.Request) { // TODO close all clients fmt.Printf("a subscriber stream is done: %q\n", err) // TODO check what happens when we leave a junk connection - table.Remove(server.Grants.Subject) + //fmt.Println("[debug] [warn] removing server turned off") + table.RemoveServer(server) }() table.Add(server) diff --git a/mplexer/encoder.go b/mplexer/encoder.go index 9aa2fe2..f328369 100644 --- a/mplexer/encoder.go +++ b/mplexer/encoder.go @@ -81,6 +81,7 @@ func (enc *Encoder) Encode(rin io.Reader, src, dst Addr) error { } }() + // TODO set a timeout as a failsafe for { //fmt.Println("poopers gonna poop") select { @@ -89,10 +90,10 @@ func (enc *Encoder) Encode(rin io.Reader, src, dst Addr) error { case <-enc.ctx.Done(): // TODO: verify that closing the reader will cause the goroutine to be released //rin.Close() - return errors.New("cancelled by context") + return errors.New("cancelled by encoder read or parent context") case <-enc.subctx.Done(): //rin.Close() - return errors.New("cancelled by context") + return errors.New("cancelled by encoder write context") case b := <-rx: header, _, err := Encode(b, src, Addr{scheme: src.scheme, addr: "", port: -1}) if nil != err { diff --git a/table/table.go b/table/table.go index acc6673..29fc490 100644 --- a/table/table.go +++ b/table/table.go @@ -49,9 +49,11 @@ func Add(server *SubscriberConn) { } } -func RemoveByAddr(subject string) bool { - // TODO - return false +func RemoveServer(server *SubscriberConn) bool { + // TODO remove by RemoteAddr + //return false + fmt.Printf("[warn] RemoveServer() still calls Remove(subject) instead of removing by RemoteAddr\n") + return Remove(server.Grants.Subject) } func Remove(subject string) bool { @@ -158,10 +160,12 @@ func (s *SubscriberConn) Serve(client net.Conn) error { srcParts := strings.Split(client.RemoteAddr().String(), ":") srcAddr := srcParts[0] srcPort, _ := strconv.Atoi(srcParts[1]) + fmt.Println("[debug] srcParts", srcParts) dstParts := strings.Split(client.LocalAddr().String(), ":") dstAddr := dstParts[0] dstPort, _ := strconv.Atoi(dstParts[1]) + fmt.Println("[debug] dstParts", dstParts) termination := telebit.Unknown scheme := telebit.None @@ -185,8 +189,11 @@ func (s *SubscriberConn) Serve(client net.Conn) error { dstAddr, dstPort, ) + fmt.Println("[debug] NewAddr src", src) + fmt.Println("[debug] NewAddr dst", dst) err := s.MultiEncoder.Encode(wconn, *src, *dst) + _ = wconn.Close() s.Clients.Delete(id) return err }