telebit/internal/telebit/decoder.go

77 lines
1.7 KiB
Go
Raw Permalink Normal View History

2020-05-22 10:41:24 +00:00
package telebit
2020-05-19 07:06:10 +00:00
import (
2020-07-15 04:09:17 +00:00
"fmt"
2020-05-19 07:06:10 +00:00
"io"
2020-07-21 06:35:45 +00:00
"os"
2020-11-13 12:19:12 +00:00
"git.rootprojects.org/root/telebit/internal/dbg"
2020-05-19 07:06:10 +00:00
)
2020-05-21 05:30:24 +00:00
// Decoder handles a Reader stream containing mplexy-encoded clients
2020-05-19 07:06:10 +00:00
type Decoder struct {
2020-05-21 05:30:24 +00:00
in io.Reader
bufferSize int
2020-05-19 07:06:10 +00:00
}
// NewDecoder returns an initialized Decoder
2020-05-21 05:30:24 +00:00
func NewDecoder(rin io.Reader) *Decoder {
2020-05-19 07:06:10 +00:00
return &Decoder{
2020-05-21 05:30:24 +00:00
in: rin,
bufferSize: defaultBufferSize,
2020-05-19 07:06:10 +00:00
}
}
2020-05-21 05:30:24 +00:00
// Decode will call WriteMessage as often as addressable data exists,
2020-05-19 07:06:10 +00:00
// reading up to bufferSize (default 8192) at a time
// (header + data, though headers are often sent separately from data).
2020-05-21 05:30:24 +00:00
func (d *Decoder) Decode(out Router) error {
p := NewParser(out)
2020-05-19 07:06:10 +00:00
rx := make(chan []byte)
rxErr := make(chan error)
go func() {
for {
2020-05-21 05:30:24 +00:00
b := make([]byte, d.bufferSize)
n, err := d.in.Read(b)
if dbg.Debug {
2020-07-21 06:35:45 +00:00
fmt.Fprintf(os.Stderr, "[debug] [decoder] [srv] Tunnel read %d %s\n", n, dbg.Trunc(b, n))
}
2020-05-19 07:06:10 +00:00
if n > 0 {
rx <- b[:n]
}
if nil != err {
2020-07-21 06:35:45 +00:00
fmt.Fprintf(os.Stderr, "[decoder] [srv] Tunnel read err: %s\n", err)
2020-05-19 07:06:10 +00:00
rxErr <- err
return
}
}
}()
for {
select {
case b := <-rx:
2020-07-15 10:46:02 +00:00
n, err := p.Write(b)
if dbg.Debug {
2020-07-21 06:35:45 +00:00
fmt.Fprintf(os.Stderr, "[debug] [decoder] [srv] Tunnel write %d %d %s\n", n, len(b), dbg.Trunc(b, len(b)))
}
2020-07-15 10:46:02 +00:00
// TODO BUG: handle when 'n' bytes written is less than len(b)
2020-05-19 07:06:10 +00:00
if nil != err {
2020-07-21 06:35:45 +00:00
fmt.Fprintf(os.Stderr, "[decoder] [srv] Tunnel write err: %s\n", err)
2020-05-19 07:06:10 +00:00
// an error to write represents an unrecoverable error,
// not just a downstream client error
2020-05-21 05:30:24 +00:00
//d.in.Close()
2020-05-19 07:06:10 +00:00
return err
}
case err := <-rxErr:
2020-05-21 05:30:24 +00:00
//d.in.Close()
2020-05-19 07:06:10 +00:00
if io.EOF == err {
// it can be assumed that err will close though, right
return nil
}
return err
}
}
}