From 1014ac6fd55b4a686646c2709bd483ade36ffbd5 Mon Sep 17 00:00:00 2001 From: AJ ONeal Date: Wed, 20 May 2020 16:23:58 -0600 Subject: [PATCH] model encodings package more closely --- mplexer/packer/encoder.go | 80 ++++++++++++++++++---------------- mplexer/packer/encoder_test.go | 73 ++++++++++++++++++++++++------- 2 files changed, 100 insertions(+), 53 deletions(-) diff --git a/mplexer/packer/encoder.go b/mplexer/packer/encoder.go index 823d4a6..5b11f3b 100644 --- a/mplexer/packer/encoder.go +++ b/mplexer/packer/encoder.go @@ -7,23 +7,32 @@ import ( "sync" ) +// TODO: try to be more like encoding/csv, or more like encoding/pem and encoding/json? + +// Encoder converts TCP to MPLEXY-TCP type Encoder struct { - ctx context.Context - subctx context.Context - mux sync.Mutex - w io.WriteCloser - wErr chan error + ctx context.Context + subctx context.Context + mux sync.Mutex + out io.WriteCloser + outErr chan error + bufferSize int } -func NewEncoder(ctx context.Context, w io.WriteCloser) *Encoder { +// NewEncoder returns an Encoder instance +func NewEncoder(ctx context.Context, wout io.WriteCloser) *Encoder { enc := &Encoder{ - ctx: ctx, - w: w, - wErr: make(chan error), + ctx: ctx, + out: wout, + outErr: make(chan error), + bufferSize: defaultBufferSize, } return enc } +// Run loops over a select of contexts and error channels +// to cancel and close south-side connections, if needed. +// TODO should this be pushed elsewhere to handled? func (enc *Encoder) Run() error { ctx, cancel := context.WithCancel(enc.ctx) defer cancel() @@ -35,35 +44,31 @@ func (enc *Encoder) Run() error { // TODO: do children respond to children cancelling? case <-enc.ctx.Done(): // TODO - _ = enc.w.Close() + _ = enc.out.Close() return errors.New("context cancelled") - case err := <-enc.wErr: + case err := <-enc.outErr: + // if a write fails for one, it fail for all return err } } } +// Start will Run() the encoder in a goroutine func (enc *Encoder) Start() error { go enc.Run() return nil } -// TODO inverse - -// StreamEncode can (and should) be called multiple times (once per client). -func (enc *Encoder) StreamEncode(src Addr, r io.ReadCloser, bufferSize int) error { +// Encode adds MPLEXY headers to raw net traffic, and is intended to be used on each client connection +func (enc *Encoder) Encode(rin io.ReadCloser, src Addr) error { rx := make(chan []byte) rxErr := make(chan error) - if 0 == bufferSize { - bufferSize = 8192 - } - go func() { for { - b := make([]byte, bufferSize) + b := make([]byte, enc.bufferSize) //fmt.Println("loopers gonna loop") - n, err := r.Read(b) + n, err := rin.Read(b) if n > 0 { rx <- b[:n] } @@ -81,52 +86,51 @@ func (enc *Encoder) StreamEncode(src Addr, r io.ReadCloser, bufferSize int) erro // would it be sufficient to expect the reader to be closed by the caller instead? case <-enc.ctx.Done(): // TODO: verify that closing the reader will cause the goroutine to be released - r.Close() + rin.Close() return errors.New("cancelled by context") case <-enc.subctx.Done(): - r.Close() + rin.Close() return errors.New("cancelled by context") case b := <-rx: header, _, err := Encode(src, Addr{}, "", b) if nil != err { - r.Close() + rin.Close() return err } - _, err = enc.write(header) + _, err = enc.write(header, b) if nil != err { - r.Close() - return err - } - _, err = enc.write(b) - if nil != err { - r.Close() + rin.Close() return err } case err := <-rxErr: // it can be assumed that err will close though, right? - r.Close() + rin.Close() if io.EOF == err { header, _, _ := Encode(src, Addr{scheme: "end"}, "", nil) // ignore err, which may have already closed - _, _ = enc.write(header) + _, _ = enc.write(header, nil) return nil } header, _, _ := Encode(src, Addr{scheme: "error"}, "", []byte(err.Error())) // ignore err, which may have already closed - _, _ = enc.write(header) + _, _ = enc.write(header, nil) return err } } } -func (enc *Encoder) write(b []byte) (int, error) { +func (enc *Encoder) write(h, b []byte) (int, error) { // mutex here so that we can get back error info enc.mux.Lock() - n, err := enc.w.Write(b) + var m int + n, err := enc.out.Write(h) + if nil == err && len(b) > 0 { + m, err = enc.out.Write(b) + } enc.mux.Unlock() if nil != err { - enc.wErr <- err + enc.outErr <- err } - return n, err + return n + m, err } diff --git a/mplexer/packer/encoder_test.go b/mplexer/packer/encoder_test.go index d91d2b9..1761875 100644 --- a/mplexer/packer/encoder_test.go +++ b/mplexer/packer/encoder_test.go @@ -10,19 +10,47 @@ import ( func TestEncodeWholeBlock(t *testing.T) { ch := make(chan string) + + a1 := "A.1: hello" + a2 := "A.2: smello" + b1 := "B.1: hello again" + b2 := "B.2: hello a third time" + m := map[string]bool{ + a1: false, + a2: false, + b1: false, + b2: false, + } + go func() { for { str := <-ch - fmt.Printf("Read: %q\n", str) + // TODO check the headers too + if len(str) > 0 && 0xFE == str[0] { + fmt.Printf("TODO header: %q\n", str) + continue + } + + b, ok := m[str] + if !ok { + // possible corruption + t.Fatalf("unexpected string %q", str) + } + if b { + // possible corruption also + t.Fatalf("duplicate string %q", str) + } + + m[str] = true } }() ctx := context.Background() - rp, wp := net.Pipe() + rin, wout := net.Pipe() go func() { for { b := make([]byte, 1024) - n, err := rp.Read(b) + n, err := rin.Read(b) if nil != err { fmt.Printf("Error: %s\n", err) return @@ -31,25 +59,25 @@ func TestEncodeWholeBlock(t *testing.T) { ch <- string(r) } }() - encoder := NewEncoder(ctx, wp) + encoder := NewEncoder(ctx, wout) encoder.Start() time.Sleep(time.Millisecond) // single client go func() { - wp, rp := net.Pipe() + wout, rin := net.Pipe() go func() { - wp.Write([]byte("hello")) - wp.Write([]byte("smello")) + wout.Write([]byte(a1)) + wout.Write([]byte(a2)) }() - err := encoder.StreamEncode(Addr{ + err := encoder.Encode(rin, Addr{ family: "IPv4", addr: "192.168.1.102", port: 4834, - }, rp, 0) + }) if nil != err { fmt.Printf("Enc Err: %q\n", err) } @@ -57,22 +85,37 @@ func TestEncodeWholeBlock(t *testing.T) { // single client go func() { - wp, rp := net.Pipe() + wout, rin := net.Pipe() go func() { - wp.Write([]byte("hello again")) - wp.Write([]byte("hello a third time")) + wout.Write([]byte(b1)) + wout.Write([]byte(b2)) }() - err := encoder.StreamEncode(Addr{ + err := encoder.Encode(rin, Addr{ family: "IPv4", addr: "192.168.1.103", port: 4834, - }, rp, 0) + }) if nil != err { fmt.Printf("Enc Err 2: %q\n", err) } }() - time.Sleep(time.Second) + // TODO must be a better way to do this + time.Sleep(10 * time.Millisecond) + + for k, v := range m { + if !v { + t.Fatalf("failed to encode and transmit a value: %q", k) + } + } } + +/* +func TestEncodeEnd(t *testing.T) { +} + +func TestEncodeError(t *testing.T) { +} +*/