model encodings package more closely

This commit is contained in:
AJ ONeal 2020-05-20 16:23:58 -06:00
parent 9a97d153e8
commit 1014ac6fd5
2 changed files with 100 additions and 53 deletions

View File

@ -7,23 +7,32 @@ import (
"sync" "sync"
) )
// TODO: try to be more like encoding/csv, or more like encoding/pem and encoding/json?
// Encoder converts TCP to MPLEXY-TCP
type Encoder struct { type Encoder struct {
ctx context.Context ctx context.Context
subctx context.Context subctx context.Context
mux sync.Mutex mux sync.Mutex
w io.WriteCloser out io.WriteCloser
wErr chan error outErr chan error
bufferSize int
} }
func NewEncoder(ctx context.Context, w io.WriteCloser) *Encoder { // NewEncoder returns an Encoder instance
func NewEncoder(ctx context.Context, wout io.WriteCloser) *Encoder {
enc := &Encoder{ enc := &Encoder{
ctx: ctx, ctx: ctx,
w: w, out: wout,
wErr: make(chan error), outErr: make(chan error),
bufferSize: defaultBufferSize,
} }
return enc return enc
} }
// Run loops over a select of contexts and error channels
// to cancel and close south-side connections, if needed.
// TODO should this be pushed elsewhere to handled?
func (enc *Encoder) Run() error { func (enc *Encoder) Run() error {
ctx, cancel := context.WithCancel(enc.ctx) ctx, cancel := context.WithCancel(enc.ctx)
defer cancel() defer cancel()
@ -35,35 +44,31 @@ func (enc *Encoder) Run() error {
// TODO: do children respond to children cancelling? // TODO: do children respond to children cancelling?
case <-enc.ctx.Done(): case <-enc.ctx.Done():
// TODO // TODO
_ = enc.w.Close() _ = enc.out.Close()
return errors.New("context cancelled") return errors.New("context cancelled")
case err := <-enc.wErr: case err := <-enc.outErr:
// if a write fails for one, it fail for all
return err return err
} }
} }
} }
// Start will Run() the encoder in a goroutine
func (enc *Encoder) Start() error { func (enc *Encoder) Start() error {
go enc.Run() go enc.Run()
return nil return nil
} }
// TODO inverse // Encode adds MPLEXY headers to raw net traffic, and is intended to be used on each client connection
func (enc *Encoder) Encode(rin io.ReadCloser, src Addr) error {
// StreamEncode can (and should) be called multiple times (once per client).
func (enc *Encoder) StreamEncode(src Addr, r io.ReadCloser, bufferSize int) error {
rx := make(chan []byte) rx := make(chan []byte)
rxErr := make(chan error) rxErr := make(chan error)
if 0 == bufferSize {
bufferSize = 8192
}
go func() { go func() {
for { for {
b := make([]byte, bufferSize) b := make([]byte, enc.bufferSize)
//fmt.Println("loopers gonna loop") //fmt.Println("loopers gonna loop")
n, err := r.Read(b) n, err := rin.Read(b)
if n > 0 { if n > 0 {
rx <- b[:n] rx <- b[:n]
} }
@ -81,52 +86,51 @@ func (enc *Encoder) StreamEncode(src Addr, r io.ReadCloser, bufferSize int) erro
// would it be sufficient to expect the reader to be closed by the caller instead? // would it be sufficient to expect the reader to be closed by the caller instead?
case <-enc.ctx.Done(): case <-enc.ctx.Done():
// TODO: verify that closing the reader will cause the goroutine to be released // TODO: verify that closing the reader will cause the goroutine to be released
r.Close() rin.Close()
return errors.New("cancelled by context") return errors.New("cancelled by context")
case <-enc.subctx.Done(): case <-enc.subctx.Done():
r.Close() rin.Close()
return errors.New("cancelled by context") return errors.New("cancelled by context")
case b := <-rx: case b := <-rx:
header, _, err := Encode(src, Addr{}, "", b) header, _, err := Encode(src, Addr{}, "", b)
if nil != err { if nil != err {
r.Close() rin.Close()
return err return err
} }
_, err = enc.write(header) _, err = enc.write(header, b)
if nil != err { if nil != err {
r.Close() rin.Close()
return err
}
_, err = enc.write(b)
if nil != err {
r.Close()
return err return err
} }
case err := <-rxErr: case err := <-rxErr:
// it can be assumed that err will close though, right? // it can be assumed that err will close though, right?
r.Close() rin.Close()
if io.EOF == err { if io.EOF == err {
header, _, _ := Encode(src, Addr{scheme: "end"}, "", nil) header, _, _ := Encode(src, Addr{scheme: "end"}, "", nil)
// ignore err, which may have already closed // ignore err, which may have already closed
_, _ = enc.write(header) _, _ = enc.write(header, nil)
return nil return nil
} }
header, _, _ := Encode(src, Addr{scheme: "error"}, "", []byte(err.Error())) header, _, _ := Encode(src, Addr{scheme: "error"}, "", []byte(err.Error()))
// ignore err, which may have already closed // ignore err, which may have already closed
_, _ = enc.write(header) _, _ = enc.write(header, nil)
return err return err
} }
} }
} }
func (enc *Encoder) write(b []byte) (int, error) { func (enc *Encoder) write(h, b []byte) (int, error) {
// mutex here so that we can get back error info // mutex here so that we can get back error info
enc.mux.Lock() enc.mux.Lock()
n, err := enc.w.Write(b) var m int
n, err := enc.out.Write(h)
if nil == err && len(b) > 0 {
m, err = enc.out.Write(b)
}
enc.mux.Unlock() enc.mux.Unlock()
if nil != err { if nil != err {
enc.wErr <- err enc.outErr <- err
} }
return n, err return n + m, err
} }

View File

@ -10,19 +10,47 @@ import (
func TestEncodeWholeBlock(t *testing.T) { func TestEncodeWholeBlock(t *testing.T) {
ch := make(chan string) ch := make(chan string)
a1 := "A.1: hello"
a2 := "A.2: smello"
b1 := "B.1: hello again"
b2 := "B.2: hello a third time"
m := map[string]bool{
a1: false,
a2: false,
b1: false,
b2: false,
}
go func() { go func() {
for { for {
str := <-ch str := <-ch
fmt.Printf("Read: %q\n", str) // TODO check the headers too
if len(str) > 0 && 0xFE == str[0] {
fmt.Printf("TODO header: %q\n", str)
continue
}
b, ok := m[str]
if !ok {
// possible corruption
t.Fatalf("unexpected string %q", str)
}
if b {
// possible corruption also
t.Fatalf("duplicate string %q", str)
}
m[str] = true
} }
}() }()
ctx := context.Background() ctx := context.Background()
rp, wp := net.Pipe() rin, wout := net.Pipe()
go func() { go func() {
for { for {
b := make([]byte, 1024) b := make([]byte, 1024)
n, err := rp.Read(b) n, err := rin.Read(b)
if nil != err { if nil != err {
fmt.Printf("Error: %s\n", err) fmt.Printf("Error: %s\n", err)
return return
@ -31,25 +59,25 @@ func TestEncodeWholeBlock(t *testing.T) {
ch <- string(r) ch <- string(r)
} }
}() }()
encoder := NewEncoder(ctx, wp) encoder := NewEncoder(ctx, wout)
encoder.Start() encoder.Start()
time.Sleep(time.Millisecond) time.Sleep(time.Millisecond)
// single client // single client
go func() { go func() {
wp, rp := net.Pipe() wout, rin := net.Pipe()
go func() { go func() {
wp.Write([]byte("hello")) wout.Write([]byte(a1))
wp.Write([]byte("smello")) wout.Write([]byte(a2))
}() }()
err := encoder.StreamEncode(Addr{ err := encoder.Encode(rin, Addr{
family: "IPv4", family: "IPv4",
addr: "192.168.1.102", addr: "192.168.1.102",
port: 4834, port: 4834,
}, rp, 0) })
if nil != err { if nil != err {
fmt.Printf("Enc Err: %q\n", err) fmt.Printf("Enc Err: %q\n", err)
} }
@ -57,22 +85,37 @@ func TestEncodeWholeBlock(t *testing.T) {
// single client // single client
go func() { go func() {
wp, rp := net.Pipe() wout, rin := net.Pipe()
go func() { go func() {
wp.Write([]byte("hello again")) wout.Write([]byte(b1))
wp.Write([]byte("hello a third time")) wout.Write([]byte(b2))
}() }()
err := encoder.StreamEncode(Addr{ err := encoder.Encode(rin, Addr{
family: "IPv4", family: "IPv4",
addr: "192.168.1.103", addr: "192.168.1.103",
port: 4834, port: 4834,
}, rp, 0) })
if nil != err { if nil != err {
fmt.Printf("Enc Err 2: %q\n", err) fmt.Printf("Enc Err 2: %q\n", err)
} }
}() }()
time.Sleep(time.Second) // TODO must be a better way to do this
time.Sleep(10 * time.Millisecond)
for k, v := range m {
if !v {
t.Fatalf("failed to encode and transmit a value: %q", k)
} }
}
}
/*
func TestEncodeEnd(t *testing.T) {
}
func TestEncodeError(t *testing.T) {
}
*/