From e2c8a920892bd59e85df82864fc7f70f3f262951 Mon Sep 17 00:00:00 2001 From: AJ ONeal Date: Wed, 20 May 2020 16:52:06 -0600 Subject: [PATCH] cont: model encodings --- mplexer/packer/encoder.go | 31 +++++++++++++------------------ mplexer/packer/encoder_test.go | 24 ++++++++++++++++++------ 2 files changed, 31 insertions(+), 24 deletions(-) diff --git a/mplexer/packer/encoder.go b/mplexer/packer/encoder.go index 5b11f3b..b65818f 100644 --- a/mplexer/packer/encoder.go +++ b/mplexer/packer/encoder.go @@ -11,16 +11,17 @@ import ( // Encoder converts TCP to MPLEXY-TCP type Encoder struct { - ctx context.Context - subctx context.Context - mux sync.Mutex - out io.WriteCloser + ctx context.Context + subctx context.Context + mux sync.Mutex + //out io.WriteCloser + out io.Writer outErr chan error bufferSize int } // NewEncoder returns an Encoder instance -func NewEncoder(ctx context.Context, wout io.WriteCloser) *Encoder { +func NewEncoder(ctx context.Context, wout io.Writer) *Encoder { enc := &Encoder{ ctx: ctx, out: wout, @@ -44,7 +45,7 @@ func (enc *Encoder) Run() error { // TODO: do children respond to children cancelling? case <-enc.ctx.Done(): // TODO - _ = enc.out.Close() + //_ = enc.out.Close() return errors.New("context cancelled") case err := <-enc.outErr: // if a write fails for one, it fail for all @@ -53,14 +54,8 @@ func (enc *Encoder) Run() error { } } -// Start will Run() the encoder in a goroutine -func (enc *Encoder) Start() error { - go enc.Run() - return nil -} - // Encode adds MPLEXY headers to raw net traffic, and is intended to be used on each client connection -func (enc *Encoder) Encode(rin io.ReadCloser, src Addr) error { +func (enc *Encoder) Encode(rin io.Reader, src Addr) error { rx := make(chan []byte) rxErr := make(chan error) @@ -86,25 +81,25 @@ func (enc *Encoder) Encode(rin io.ReadCloser, src Addr) error { // would it be sufficient to expect the reader to be closed by the caller instead? case <-enc.ctx.Done(): // TODO: verify that closing the reader will cause the goroutine to be released - rin.Close() + //rin.Close() return errors.New("cancelled by context") case <-enc.subctx.Done(): - rin.Close() + //rin.Close() return errors.New("cancelled by context") case b := <-rx: header, _, err := Encode(src, Addr{}, "", b) if nil != err { - rin.Close() + //rin.Close() return err } _, err = enc.write(header, b) if nil != err { - rin.Close() + //rin.Close() return err } case err := <-rxErr: // it can be assumed that err will close though, right? - rin.Close() + //rin.Close() if io.EOF == err { header, _, _ := Encode(src, Addr{scheme: "end"}, "", nil) // ignore err, which may have already closed diff --git a/mplexer/packer/encoder_test.go b/mplexer/packer/encoder_test.go index 1761875..4620fa6 100644 --- a/mplexer/packer/encoder_test.go +++ b/mplexer/packer/encoder_test.go @@ -3,6 +3,7 @@ package packer import ( "context" "fmt" + "io" "net" "testing" "time" @@ -45,14 +46,15 @@ func TestEncodeWholeBlock(t *testing.T) { } }() - ctx := context.Background() + // TODO nix context here + ctx := context.TODO() rin, wout := net.Pipe() go func() { for { b := make([]byte, 1024) n, err := rin.Read(b) if nil != err { - fmt.Printf("Error: %s\n", err) + t.Fatalf("Error: %s\n", err) return } r := b[:n] @@ -60,8 +62,18 @@ func TestEncodeWholeBlock(t *testing.T) { } }() encoder := NewEncoder(ctx, wout) - encoder.Start() + go func() { + err := encoder.Run() + if nil != err { + if io.EOF != err { + t.Fatalf("Encoder Run Err: %q\n", err) + } + } + wout.Close() + }() + + // TODO eliminate Run and don't sleep here time.Sleep(time.Millisecond) // single client @@ -79,7 +91,7 @@ func TestEncodeWholeBlock(t *testing.T) { port: 4834, }) if nil != err { - fmt.Printf("Enc Err: %q\n", err) + t.Fatalf("Enc Err 1: %q\n", err) } }() @@ -98,11 +110,11 @@ func TestEncodeWholeBlock(t *testing.T) { port: 4834, }) if nil != err { - fmt.Printf("Enc Err 2: %q\n", err) + t.Fatalf("Enc Err 2: %q\n", err) } }() - // TODO must be a better way to do this + // TODO use wait group time.Sleep(10 * time.Millisecond) for k, v := range m {