rename package packer->telebit
This commit is contained in:
parent
1f39f57837
commit
53478d35fd
|
@ -1,4 +1,4 @@
|
||||||
package packer
|
package telebit
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
|
@ -1,62 +0,0 @@
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"net"
|
|
||||||
|
|
||||||
"git.coolaj86.com/coolaj86/go-telebitd/mplexer"
|
|
||||||
)
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
r := &Router{
|
|
||||||
secret: os.Getenv("SECRET"),
|
|
||||||
}
|
|
||||||
m := &mplexer.MultiplexLocal{
|
|
||||||
Relay: os.Getenv("RELAY"),
|
|
||||||
SortingHat: r,
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
// TODO more m.ListenAndServe(mux) style?
|
|
||||||
m.ListenAndServe(ctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
type Router struct {
|
|
||||||
secret string
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Router) Authz() (string, error) {
|
|
||||||
return r.secret, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// this function is very client-specific logic
|
|
||||||
func (r *Router) LookupTarget(paddr packer.Addr) (net.Conn, error) {
|
|
||||||
//if target := LookupPort(paddr.Servername()); nil != target { }
|
|
||||||
if target := r.LookupServername(paddr.Port()); nil != target {
|
|
||||||
tconn, err := net.Dial(target.Network(), target.Hostname())
|
|
||||||
if nil != err {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
/*
|
|
||||||
// TODO for http proxy
|
|
||||||
return mplexer.TargetOptions {
|
|
||||||
Hostname // default localhost
|
|
||||||
Termination // default TLS
|
|
||||||
XFWD // default... no?
|
|
||||||
Port // default 0
|
|
||||||
Conn // should be dialed beforehand
|
|
||||||
}, nil
|
|
||||||
*/
|
|
||||||
return tconn, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Router) LookupServername(servername string) mplexer.Addr {
|
|
||||||
return &mplexer.NewAddr(
|
|
||||||
mplexer.HTTPS,
|
|
||||||
mplexer.TCP, // TCP -> termination.None? / Plain?
|
|
||||||
"localhost",
|
|
||||||
3000,
|
|
||||||
)
|
|
||||||
}
|
|
|
@ -9,7 +9,7 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.coolaj86.com/coolaj86/go-telebitd/mplexer/packer"
|
telebit "git.coolaj86.com/coolaj86/go-telebitd/mplexer"
|
||||||
|
|
||||||
"github.com/caddyserver/certmagic"
|
"github.com/caddyserver/certmagic"
|
||||||
jwt "github.com/dgrijalva/jwt-go"
|
jwt "github.com/dgrijalva/jwt-go"
|
||||||
|
@ -109,7 +109,7 @@ func main() {
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
acme := &packer.ACME{
|
acme := &telebit.ACME{
|
||||||
Email: *email,
|
Email: *email,
|
||||||
StoragePath: *certpath,
|
StoragePath: *certpath,
|
||||||
Agree: *acmeAgree,
|
Agree: *acmeAgree,
|
||||||
|
@ -119,14 +119,14 @@ func main() {
|
||||||
EnableTLSALPNChallenge: enableTLSALPN01,
|
EnableTLSALPNChallenge: enableTLSALPN01,
|
||||||
}
|
}
|
||||||
|
|
||||||
mux := packer.NewRouteMux()
|
mux := telebit.NewRouteMux()
|
||||||
mux.HandleTLS("*", acme, mux)
|
mux.HandleTLS("*", acme, mux)
|
||||||
for _, fwd := range forwards {
|
for _, fwd := range forwards {
|
||||||
mux.ForwardTCP("*", "localhost:"+fwd.port, 120*time.Second)
|
mux.ForwardTCP("*", "localhost:"+fwd.port, 120*time.Second)
|
||||||
//mux.ForwardTCP(fwd.pattern, "localhost:"+fwd.port, 120*time.Second)
|
//mux.ForwardTCP(fwd.pattern, "localhost:"+fwd.port, 120*time.Second)
|
||||||
}
|
}
|
||||||
|
|
||||||
tun, err := packer.DialWebsocketTunnel(ctx, *relay, *token)
|
tun, err := telebit.DialWebsocketTunnel(ctx, *relay, *token)
|
||||||
if nil != err {
|
if nil != err {
|
||||||
fmt.Println("relay:", relay)
|
fmt.Println("relay:", relay)
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
|
@ -134,7 +134,7 @@ func main() {
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Printf("Listening at %s\n", *relay)
|
fmt.Printf("Listening at %s\n", *relay)
|
||||||
log.Fatal("Closed server: ", packer.ListenAndServe(tun, mux))
|
log.Fatal("Closed server: ", telebit.ListenAndServe(tun, mux))
|
||||||
}
|
}
|
||||||
|
|
||||||
type ACMEProvider struct {
|
type ACMEProvider struct {
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package packer
|
package telebit
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"net"
|
"net"
|
|
@ -1,4 +1,4 @@
|
||||||
package packer
|
package telebit
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"net"
|
"net"
|
|
@ -1,4 +1,4 @@
|
||||||
package packer
|
package telebit
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
|
@ -1,4 +1,4 @@
|
||||||
package packer
|
package telebit
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"net"
|
"net"
|
|
@ -1,4 +1,4 @@
|
||||||
package packer
|
package telebit
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
|
@ -1,4 +1,4 @@
|
||||||
package packer
|
package telebit
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
|
@ -1,188 +1,176 @@
|
||||||
package mplexer
|
package telebit
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.coolaj86.com/coolaj86/go-telebitd/mplexer/packer"
|
|
||||||
|
|
||||||
"github.com/gorilla/websocket"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Listener defines a listener for use with http servers
|
// A Listener transforms a multiplexed websocket connection into individual net.Conn-like connections.
|
||||||
type Listener struct {
|
type Listener struct {
|
||||||
//ParentAddr net.Addr
|
//wsconn *websocket.Conn
|
||||||
//Conns chan *Conn
|
tun net.Conn
|
||||||
ws *websocket.Conn
|
incoming chan *Conn
|
||||||
ctx context.Context
|
close chan struct{}
|
||||||
parser *packer.Parser
|
encoder *Encoder
|
||||||
|
chunksParsed int
|
||||||
|
bytesRead int
|
||||||
|
conns map[string]net.Conn
|
||||||
|
//conns map[string]*Conn
|
||||||
}
|
}
|
||||||
|
|
||||||
// Listen creates a channel for connections and returns the listener
|
// Listen creates a new Listener and sets it up to receive and distribute connections.
|
||||||
func (m *MultiplexLocal) Listen(ctx context.Context) (*Listener, error) {
|
func Listen(tun net.Conn) *Listener {
|
||||||
authz, err := m.SortingHat.Authz()
|
ctx := context.TODO()
|
||||||
if nil != err {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
wsd := websocket.Dialer{}
|
// Feed the socket into the Encoder and Decoder
|
||||||
headers := http.Header{}
|
|
||||||
headers.Set("Authorization", fmt.Sprintf("Bearer %s", authz))
|
|
||||||
// *http.Response
|
|
||||||
wsconn, _, err := wsd.DialContext(ctx, m.Relay, headers)
|
|
||||||
if nil != err {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
//conns := make(chan *packer.Conn)
|
|
||||||
//parser := &packer.NewParser(ctx, conns)
|
|
||||||
|
|
||||||
/*
|
|
||||||
go func() {
|
|
||||||
conn, err := packer.Accept()
|
|
||||||
if err != nil {
|
|
||||||
fmt.Fprintf(os.Stderr, "failed to accept new relayed connection: %s\n", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
conns <- conn
|
|
||||||
}()
|
|
||||||
*/
|
|
||||||
|
|
||||||
handler := &Handler{}
|
|
||||||
listener := &Listener{
|
listener := &Listener{
|
||||||
//Conns: conns,
|
tun: tun,
|
||||||
parser: packer.NewParser(ctx, handler),
|
incoming: make(chan *Conn, 1), // buffer ever so slightly
|
||||||
|
close: make(chan struct{}),
|
||||||
|
encoder: NewEncoder(ctx, tun),
|
||||||
|
conns: map[string]net.Conn{},
|
||||||
|
//conns: map[string]*Conn{},
|
||||||
}
|
}
|
||||||
go m.listen(ctx, wsconn, listener)
|
|
||||||
return listener, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type Handler struct {
|
// TODO perhaps the wrapper should have a mutex
|
||||||
}
|
// rather than having a goroutine in the encoder
|
||||||
|
|
||||||
func (h *Handler) WriteMessage(packer.Addr, []byte) {
|
|
||||||
panic(errors.New("not implemented"))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MultiplexLocal) listen(ctx context.Context, wsconn *websocket.Conn, listener *Listener) {
|
|
||||||
// will cancel if ws errors out or closes
|
|
||||||
// (TODO: this may also be redundant)
|
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
// Ping every 15 seconds, or stop listening
|
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
err := listener.encoder.Run()
|
||||||
time.Sleep(15 * time.Second)
|
fmt.Printf("encoder stopped entirely: %q", err)
|
||||||
deadline := time.Now().Add(45 * time.Second)
|
tun.Close()
|
||||||
if err := wsconn.WriteControl(websocket.PingMessage, []byte(""), deadline); nil != err {
|
|
||||||
fmt.Fprintf(os.Stderr, "failed to write ping message to websocket: %s\n", err)
|
|
||||||
cancel()
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// The write loop (which fails if ping fails)
|
// Decode the stream as it comes in
|
||||||
|
decoder := NewDecoder(tun)
|
||||||
go func() {
|
go func() {
|
||||||
// TODO optimal buffer size
|
// TODO pass error to Accept()
|
||||||
b := make([]byte, 128*1024)
|
err := decoder.Decode(listener)
|
||||||
|
|
||||||
|
// The listener itself must be closed explicitly because
|
||||||
|
// there's an encoder with a callback between the websocket
|
||||||
|
// and the multiplexer, so it doesn't know to stop listening otherwise
|
||||||
|
listener.Close()
|
||||||
|
fmt.Printf("the main stream is done: %q\n", err)
|
||||||
|
}()
|
||||||
|
|
||||||
|
return listener
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListenAndServe listens on a websocket and handles the incomming net.Conn-like connections with a Handler
|
||||||
|
func ListenAndServe(tun net.Conn, mux Handler) error {
|
||||||
|
listener := Listen(tun)
|
||||||
|
return Serve(listener, mux)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Serve Accept()s connections which have already been unwrapped and serves them with the given Handler
|
||||||
|
func Serve(listener *Listener, mux Handler) error {
|
||||||
for {
|
for {
|
||||||
n, err := listener.parser.Read(b)
|
client, err := listener.Accept()
|
||||||
if n > 0 {
|
if nil != err {
|
||||||
if err := wsconn.WriteMessage(websocket.BinaryMessage, b); nil != err {
|
return err
|
||||||
fmt.Fprintf(os.Stderr, "failed to write packer message to websocket: %s\n", err)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
err = mux.Serve(client)
|
||||||
if nil != err {
|
if nil != err {
|
||||||
if io.EOF != err {
|
if io.EOF != err {
|
||||||
fmt.Fprintf(os.Stderr, "failed to read message from packer: %s\n", err)
|
fmt.Printf("client could not be served: %q\n", err.Error())
|
||||||
break
|
|
||||||
}
|
|
||||||
fmt.Fprintf(os.Stderr, "[TODO debug] closed packer: %s\n", err)
|
|
||||||
break
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// TODO handle EOF as websocket.CloseNormal?
|
client.Close()
|
||||||
message := websocket.FormatCloseMessage(websocket.CloseGoingAway, "closing connection")
|
|
||||||
deadline := time.Now().Add(10 * time.Second)
|
|
||||||
if err := wsconn.WriteControl(websocket.CloseMessage, message, deadline); nil != err {
|
|
||||||
fmt.Fprintf(os.Stderr, "failed to write close message to websocket: %s\n", err)
|
|
||||||
}
|
|
||||||
_ = wsconn.Close()
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// The read loop (also fails if ping fails)
|
|
||||||
for {
|
|
||||||
_, message, err := wsconn.ReadMessage()
|
|
||||||
if err != nil {
|
|
||||||
fmt.Fprintf(os.Stderr, "failed to read message from websocket: %s\n", err)
|
|
||||||
break
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
|
||||||
_, err = listener.packer.Write(message)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Fprintf(os.Stderr, "failed to process message from websocket: %s\n", err)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// just to be sure
|
|
||||||
listener.packer.Close()
|
|
||||||
wsconn.Close()
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
// Accept returns a tunneled network connection
|
||||||
// Feed will block while pushing a net.Conn onto Conns
|
func (l *Listener) Accept() (net.Conn, error) {
|
||||||
func (l *Listener) Feed(conn *Conn) {
|
|
||||||
l.Conns <- conn
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
// net.Listener interface
|
|
||||||
|
|
||||||
/*
|
|
||||||
// Accept will block and wait for a new net.Conn
|
|
||||||
func (l *Listener) Accept() (*Conn, error) {
|
|
||||||
select {
|
select {
|
||||||
case conn, ok := <-l.Conns:
|
case rconn, ok := <-l.incoming:
|
||||||
if ok {
|
if ok {
|
||||||
return conn, nil
|
return rconn, nil
|
||||||
}
|
}
|
||||||
return nil, io.EOF
|
return nil, io.EOF
|
||||||
|
|
||||||
case <-l.ctx.Done():
|
case <-l.close:
|
||||||
// TODO is another error more suitable?
|
return nil, http.ErrServerClosed
|
||||||
// TODO is this redundant?
|
|
||||||
return nil, io.EOF
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
|
|
||||||
func (l *Listener) Accept() (*packer.Conn, error) {
|
// Close stops accepting new connections and closes the underlying websocket.
|
||||||
return l.Accept()
|
// TODO return errors.
|
||||||
}
|
|
||||||
|
|
||||||
// Close will close the Conns channel
|
|
||||||
func (l *Listener) Close() error {
|
func (l *Listener) Close() error {
|
||||||
//close(l.Conns)
|
l.tun.Close()
|
||||||
//return nil
|
close(l.incoming)
|
||||||
return l.packer.Close()
|
l.close <- struct{}{}
|
||||||
}
|
|
||||||
|
|
||||||
// Addr returns nil to fulfill the net.Listener interface
|
|
||||||
func (l *Listener) Addr() net.Addr {
|
|
||||||
// Addr may (or may not) return the original TCP or TLS listener's address
|
|
||||||
//return l.ParentAddr
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RouteBytes receives address information and a buffer and creates or re-uses a pipe that can be Accept()ed.
|
||||||
|
func (l *Listener) RouteBytes(srcAddr, dstAddr Addr, b []byte) {
|
||||||
|
// TODO use context to be able to cancel many at once?
|
||||||
|
l.chunksParsed++
|
||||||
|
|
||||||
|
src := &srcAddr
|
||||||
|
dst := &dstAddr
|
||||||
|
pipe := l.getPipe(src, dst, len(b))
|
||||||
|
//fmt.Printf("%s\n", b)
|
||||||
|
|
||||||
|
// handle errors before data writes because I don't
|
||||||
|
// remember where the error message goes
|
||||||
|
if "error" == string(dst.scheme) {
|
||||||
|
pipe.Close()
|
||||||
|
delete(l.conns, src.Network())
|
||||||
|
fmt.Printf("a stream errored remotely: %v\n", src)
|
||||||
|
}
|
||||||
|
|
||||||
|
// write data, if any
|
||||||
|
if len(b) > 0 {
|
||||||
|
l.bytesRead += len(b)
|
||||||
|
pipe.Write(b)
|
||||||
|
}
|
||||||
|
// EOF, if needed
|
||||||
|
if "end" == string(dst.scheme) {
|
||||||
|
fmt.Println("[debug] end")
|
||||||
|
pipe.Close()
|
||||||
|
delete(l.conns, src.Network())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *Listener) getPipe(src, dst *Addr, count int) net.Conn {
|
||||||
|
connID := src.Network()
|
||||||
|
pipe, ok := l.conns[connID]
|
||||||
|
|
||||||
|
// Pipe exists
|
||||||
|
if ok {
|
||||||
|
return pipe
|
||||||
|
}
|
||||||
|
fmt.Printf("New client (%d byte hello)\n\tfrom %#v\n\tto %#v:\n", count, src, dst)
|
||||||
|
|
||||||
|
// Create pipe
|
||||||
|
rawPipe, pipe := net.Pipe()
|
||||||
|
newconn := &Conn{
|
||||||
|
//updated: time.Now(),
|
||||||
|
relaySourceAddr: *src,
|
||||||
|
relayTargetAddr: *dst,
|
||||||
|
relay: rawPipe,
|
||||||
|
}
|
||||||
|
l.conns[connID] = pipe
|
||||||
|
l.incoming <- newconn
|
||||||
|
|
||||||
|
// Handle encoding
|
||||||
|
go func() {
|
||||||
|
// TODO handle err
|
||||||
|
err := l.encoder.Encode(pipe, *src, *dst)
|
||||||
|
// the error may be EOF or ErrServerClosed or ErrGoingAwawy or some such
|
||||||
|
// or it might be an actual error
|
||||||
|
// In any case, we'll just close it all
|
||||||
|
newconn.Close()
|
||||||
|
pipe.Close()
|
||||||
|
fmt.Printf("a stream is done: %q\n", err)
|
||||||
|
}()
|
||||||
|
|
||||||
|
return pipe
|
||||||
|
}
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package packer
|
package telebit
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
|
@ -1,160 +0,0 @@
|
||||||
package mplexer
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"net"
|
|
||||||
"os"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.coolaj86.com/coolaj86/go-telebitd/mplexer/packer"
|
|
||||||
)
|
|
||||||
|
|
||||||
type MultiplexLocal struct {
|
|
||||||
Relay string
|
|
||||||
SortingHat SortingHat
|
|
||||||
Timeout time.Duration
|
|
||||||
listener *Listener
|
|
||||||
}
|
|
||||||
|
|
||||||
func New(relay string, hat SortingHat) *MultiplexLocal {
|
|
||||||
return &MultiplexLocal{
|
|
||||||
Relay: relay,
|
|
||||||
SortingHat: hat,
|
|
||||||
Timeout: 30 * time.Second,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MultiplexLocal) ListenAndServe(ctx context.Context) error {
|
|
||||||
// Cancels if Accept() returns an error (i.e. because it was closed)
|
|
||||||
// (TODO: this may be redundant)
|
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
listener, err := m.Listen(ctx)
|
|
||||||
if nil != err {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
|
||||||
pconn, err := listener.Accept() // packer.Conn
|
|
||||||
if nil != err {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
go m.serve(ctx, pconn)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MultiplexLocal) Close() error {
|
|
||||||
return m.listener.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MultiplexLocal) serve(ctx context.Context, pconn *packer.Conn) {
|
|
||||||
//paddr := pconn.LocalAddr().(*Addr) // packer.Addr
|
|
||||||
paddr := pconn.LocalAddr()
|
|
||||||
//addr.Network()
|
|
||||||
//addr.String()
|
|
||||||
paddr.Scheme()
|
|
||||||
//paddr.Encrypted()
|
|
||||||
//paddr.Servername()
|
|
||||||
|
|
||||||
// todo: some sort of logic to avoid infinite loop to self?
|
|
||||||
// (that's probably not possible since the connection could
|
|
||||||
// route several layers deep)
|
|
||||||
if target, err := m.SortingHat.LookupTarget(paddr); nil != target {
|
|
||||||
if nil != err {
|
|
||||||
// TODO get a log channel or some such
|
|
||||||
fmt.Fprintf(os.Stderr, "lookup failed for tunneled client: %s\n", err)
|
|
||||||
err := pconn.Error(err)
|
|
||||||
if nil != err {
|
|
||||||
fmt.Fprintf(os.Stderr, "failed to signal error back to relay: %s\n", err)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
pipePacker(ctx, pconn, target, m.Timeout)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func pipePacker(ctx context.Context, pconn *packer.Conn, target net.Conn, timeout time.Duration) {
|
|
||||||
// how can this be done so that target errors are
|
|
||||||
// sent back to the relay server?
|
|
||||||
|
|
||||||
// Also something like ReadAhead(size) should signal
|
|
||||||
// to read and send up to `size` bytes without waiting
|
|
||||||
// for a response - since we can't signal 'non-read' as
|
|
||||||
// is the normal operation of tcp... or can we?
|
|
||||||
// And how do we distinguish idle from dropped?
|
|
||||||
// Maybe this should have been a udp protocol???
|
|
||||||
|
|
||||||
defer pconn.Close()
|
|
||||||
defer target.Close()
|
|
||||||
|
|
||||||
srcCh := make(chan []byte)
|
|
||||||
dstCh := make(chan []byte)
|
|
||||||
errCh := make(chan error)
|
|
||||||
|
|
||||||
// Source (Relay) Read Channel
|
|
||||||
go func() {
|
|
||||||
// TODO what's the optimal size to buffer?
|
|
||||||
// TODO user buffered reader
|
|
||||||
b := make([]byte, 128*1024)
|
|
||||||
for {
|
|
||||||
pconn.SetDeadline(time.Now().Add(timeout))
|
|
||||||
n, err := pconn.Read(b)
|
|
||||||
if n > 0 {
|
|
||||||
srcCh <- b
|
|
||||||
}
|
|
||||||
if nil != err {
|
|
||||||
// TODO let client log this server-side error (unless EOF)
|
|
||||||
// (nil here because we probably can't send the error to the relay)
|
|
||||||
errCh <- nil
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Target (Local) Read Channel
|
|
||||||
go func() {
|
|
||||||
// TODO what's the optimal size to buffer?
|
|
||||||
// TODO user buffered reader
|
|
||||||
b := make([]byte, 128*1024)
|
|
||||||
for {
|
|
||||||
target.SetDeadline(time.Now().Add(timeout))
|
|
||||||
n, err := target.Read(b)
|
|
||||||
if n > 0 {
|
|
||||||
dstCh <- b
|
|
||||||
}
|
|
||||||
if nil != err {
|
|
||||||
if io.EOF == err {
|
|
||||||
err = nil
|
|
||||||
}
|
|
||||||
errCh <- err
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
break
|
|
||||||
case b := <-srcCh:
|
|
||||||
target.SetDeadline(time.Now().Add(timeout))
|
|
||||||
_, err := target.Write(b)
|
|
||||||
if nil != err {
|
|
||||||
// TODO log error locally
|
|
||||||
pconn.Error(err)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
case b := <-dstCh:
|
|
||||||
pconn.SetDeadline(time.Now().Add(timeout))
|
|
||||||
_, err := pconn.Write(b)
|
|
||||||
if nil != err {
|
|
||||||
// TODO log error locally
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,4 +1,4 @@
|
||||||
package packer
|
package telebit
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
|
@ -1,176 +0,0 @@
|
||||||
package packer
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"net"
|
|
||||||
"net/http"
|
|
||||||
)
|
|
||||||
|
|
||||||
// A Listener transforms a multiplexed websocket connection into individual net.Conn-like connections.
|
|
||||||
type Listener struct {
|
|
||||||
//wsconn *websocket.Conn
|
|
||||||
tun net.Conn
|
|
||||||
incoming chan *Conn
|
|
||||||
close chan struct{}
|
|
||||||
encoder *Encoder
|
|
||||||
chunksParsed int
|
|
||||||
bytesRead int
|
|
||||||
conns map[string]net.Conn
|
|
||||||
//conns map[string]*Conn
|
|
||||||
}
|
|
||||||
|
|
||||||
// Listen creates a new Listener and sets it up to receive and distribute connections.
|
|
||||||
func Listen(tun net.Conn) *Listener {
|
|
||||||
ctx := context.TODO()
|
|
||||||
|
|
||||||
// Feed the socket into the Encoder and Decoder
|
|
||||||
listener := &Listener{
|
|
||||||
tun: tun,
|
|
||||||
incoming: make(chan *Conn, 1), // buffer ever so slightly
|
|
||||||
close: make(chan struct{}),
|
|
||||||
encoder: NewEncoder(ctx, tun),
|
|
||||||
conns: map[string]net.Conn{},
|
|
||||||
//conns: map[string]*Conn{},
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO perhaps the wrapper should have a mutex
|
|
||||||
// rather than having a goroutine in the encoder
|
|
||||||
go func() {
|
|
||||||
err := listener.encoder.Run()
|
|
||||||
fmt.Printf("encoder stopped entirely: %q", err)
|
|
||||||
tun.Close()
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Decode the stream as it comes in
|
|
||||||
decoder := NewDecoder(tun)
|
|
||||||
go func() {
|
|
||||||
// TODO pass error to Accept()
|
|
||||||
err := decoder.Decode(listener)
|
|
||||||
|
|
||||||
// The listener itself must be closed explicitly because
|
|
||||||
// there's an encoder with a callback between the websocket
|
|
||||||
// and the multiplexer, so it doesn't know to stop listening otherwise
|
|
||||||
listener.Close()
|
|
||||||
fmt.Printf("the main stream is done: %q\n", err)
|
|
||||||
}()
|
|
||||||
|
|
||||||
return listener
|
|
||||||
}
|
|
||||||
|
|
||||||
// ListenAndServe listens on a websocket and handles the incomming net.Conn-like connections with a Handler
|
|
||||||
func ListenAndServe(tun net.Conn, mux Handler) error {
|
|
||||||
listener := Listen(tun)
|
|
||||||
return Serve(listener, mux)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Serve Accept()s connections which have already been unwrapped and serves them with the given Handler
|
|
||||||
func Serve(listener *Listener, mux Handler) error {
|
|
||||||
for {
|
|
||||||
client, err := listener.Accept()
|
|
||||||
if nil != err {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
err = mux.Serve(client)
|
|
||||||
if nil != err {
|
|
||||||
if io.EOF != err {
|
|
||||||
fmt.Printf("client could not be served: %q\n", err.Error())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
client.Close()
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Accept returns a tunneled network connection
|
|
||||||
func (l *Listener) Accept() (net.Conn, error) {
|
|
||||||
select {
|
|
||||||
case rconn, ok := <-l.incoming:
|
|
||||||
if ok {
|
|
||||||
return rconn, nil
|
|
||||||
}
|
|
||||||
return nil, io.EOF
|
|
||||||
|
|
||||||
case <-l.close:
|
|
||||||
return nil, http.ErrServerClosed
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close stops accepting new connections and closes the underlying websocket.
|
|
||||||
// TODO return errors.
|
|
||||||
func (l *Listener) Close() error {
|
|
||||||
l.tun.Close()
|
|
||||||
close(l.incoming)
|
|
||||||
l.close <- struct{}{}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// RouteBytes receives address information and a buffer and creates or re-uses a pipe that can be Accept()ed.
|
|
||||||
func (l *Listener) RouteBytes(srcAddr, dstAddr Addr, b []byte) {
|
|
||||||
// TODO use context to be able to cancel many at once?
|
|
||||||
l.chunksParsed++
|
|
||||||
|
|
||||||
src := &srcAddr
|
|
||||||
dst := &dstAddr
|
|
||||||
pipe := l.getPipe(src, dst, len(b))
|
|
||||||
//fmt.Printf("%s\n", b)
|
|
||||||
|
|
||||||
// handle errors before data writes because I don't
|
|
||||||
// remember where the error message goes
|
|
||||||
if "error" == string(dst.scheme) {
|
|
||||||
pipe.Close()
|
|
||||||
delete(l.conns, src.Network())
|
|
||||||
fmt.Printf("a stream errored remotely: %v\n", src)
|
|
||||||
}
|
|
||||||
|
|
||||||
// write data, if any
|
|
||||||
if len(b) > 0 {
|
|
||||||
l.bytesRead += len(b)
|
|
||||||
pipe.Write(b)
|
|
||||||
}
|
|
||||||
// EOF, if needed
|
|
||||||
if "end" == string(dst.scheme) {
|
|
||||||
fmt.Println("[debug] end")
|
|
||||||
pipe.Close()
|
|
||||||
delete(l.conns, src.Network())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *Listener) getPipe(src, dst *Addr, count int) net.Conn {
|
|
||||||
connID := src.Network()
|
|
||||||
pipe, ok := l.conns[connID]
|
|
||||||
|
|
||||||
// Pipe exists
|
|
||||||
if ok {
|
|
||||||
return pipe
|
|
||||||
}
|
|
||||||
fmt.Printf("New client (%d byte hello)\n\tfrom %#v\n\tto %#v:\n", count, src, dst)
|
|
||||||
|
|
||||||
// Create pipe
|
|
||||||
rawPipe, pipe := net.Pipe()
|
|
||||||
newconn := &Conn{
|
|
||||||
//updated: time.Now(),
|
|
||||||
relaySourceAddr: *src,
|
|
||||||
relayTargetAddr: *dst,
|
|
||||||
relay: rawPipe,
|
|
||||||
}
|
|
||||||
l.conns[connID] = pipe
|
|
||||||
l.incoming <- newconn
|
|
||||||
|
|
||||||
// Handle encoding
|
|
||||||
go func() {
|
|
||||||
// TODO handle err
|
|
||||||
err := l.encoder.Encode(pipe, *src, *dst)
|
|
||||||
// the error may be EOF or ErrServerClosed or ErrGoingAwawy or some such
|
|
||||||
// or it might be an actual error
|
|
||||||
// In any case, we'll just close it all
|
|
||||||
newconn.Close()
|
|
||||||
pipe.Close()
|
|
||||||
fmt.Printf("a stream is done: %q\n", err)
|
|
||||||
}()
|
|
||||||
|
|
||||||
return pipe
|
|
||||||
}
|
|
|
@ -1,4 +1,4 @@
|
||||||
package packer
|
package telebit
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"strconv"
|
"strconv"
|
|
@ -1,4 +1,4 @@
|
||||||
package packer
|
package telebit
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
|
@ -1,4 +1,4 @@
|
||||||
package packer
|
package telebit
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"math/rand"
|
"math/rand"
|
|
@ -1,4 +1,4 @@
|
||||||
package packer
|
package telebit
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
|
@ -1,4 +1,4 @@
|
||||||
package packer
|
package telebit
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
|
@ -1,10 +0,0 @@
|
||||||
package mplexer
|
|
||||||
|
|
||||||
import (
|
|
||||||
"git.coolaj86.com/coolaj86/go-telebitd/mplexer/packer"
|
|
||||||
)
|
|
||||||
|
|
||||||
type SortingHat interface {
|
|
||||||
LookupTarget(*packer.Addr) (*packer.Conn, error)
|
|
||||||
Authz() (string, error)
|
|
||||||
}
|
|
|
@ -1,4 +1,4 @@
|
||||||
package packer
|
package telebit
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"crypto/tls"
|
"crypto/tls"
|
|
@ -1,4 +1,4 @@
|
||||||
package packer
|
package telebit
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
|
@ -1,4 +1,4 @@
|
||||||
package packer
|
package telebit
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
|
@ -1,4 +1,4 @@
|
||||||
package packer
|
package telebit
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
Loading…
Reference in New Issue