From a13e45a324317a110b934ab303f302de653f8688 Mon Sep 17 00:00:00 2001 From: zutto Date: Thu, 11 Jan 2018 20:15:20 +0200 Subject: [PATCH] init --- README.md | 9 ++++ TimeoutPipe.go | 110 ++++++++++++++++++++++++++++++++++++++++++ TimeoutPipe_test.go | 115 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 234 insertions(+) create mode 100644 README.md create mode 100644 TimeoutPipe.go create mode 100644 TimeoutPipe_test.go diff --git a/README.md b/README.md new file mode 100644 index 0000000..63559c7 --- /dev/null +++ b/README.md @@ -0,0 +1,9 @@ +*wip + + +# io.pipe that times out + + +this is a work-in-progress. + +Purpose of this is to intentionally cause r/w to close after certain time. \ No newline at end of file diff --git a/TimeoutPipe.go b/TimeoutPipe.go new file mode 100644 index 0000000..983e706 --- /dev/null +++ b/TimeoutPipe.go @@ -0,0 +1,110 @@ +package TimeoutPipe + +import ( + "errors" + "io" + "time" +) + +var TimeoutErr error = errors.New("Pipe has timed out!") + +//TimeoutPipe is a io.pipe that times out. +//Purpose of this is to stop goroutine r/w deadlocks by intentionally causing error +//less code to implement on the potentially-deadlocking-r/w goroutines, no need to worry about channels or timers. +type TimeoutPipe struct { + BufferSize int + + Reader *io.PipeReader + Writer *io.PipeWriter + + internalReader *io.PipeReader + internalWriter *io.PipeWriter + + timer *time.Timer + timeout time.Duration +} + +//NewTimeoutPipe retusn a new instance of TimeoutPipe interface. +func NewTimeoutPipe() *TimeoutPipe { + + r, Wr := io.Pipe() + Re, w := io.Pipe() + t := TimeoutPipe{ + BufferSize: 1500, + internalReader: r, + internalWriter: w, + Reader: Re, + Writer: Wr, + } + return &t +} + +//Pipe returns instance of io.pipereader & io.pipewriter +func (t *TimeoutPipe) Pipe(timeout time.Duration) (*io.PipeReader, *io.PipeWriter) { + t.timeout = timeout + t.rwProxy() + t.start() + return t.Reader, t.Writer +} + +func (t *TimeoutPipe) start() { + t.timer = time.AfterFunc(t.timeout, func() { + t.closePipes(TimeoutErr) + }) +} + +func (t *TimeoutPipe) rwProxy() { + + go func() { + var buffer []byte = make([]byte, t.BufferSize) + for { + var written int = 0 + + readLength, err := t.internalReader.Read(buffer) + if err != nil || readLength == 0 { + switch err { + case io.EOF: + t.internalWriter.Write(buffer[:readLength]) + t.closePipes(err) + return + case io.ErrClosedPipe: + t.closePipes(err) + return + default: + t.closePipes(err) + return + } + } + t.ResetTimer() + for written < readLength { + w, err := t.internalWriter.Write(buffer[written:readLength]) + written += w + if err != nil { + t.closePipes(err) + } + } + } + }() +} + +func (t *TimeoutPipe) closePipes(e error) { + if e != nil { + t.internalReader.Close() + t.internalWriter.Close() + + t.Reader.CloseWithError(e) + t.Writer.Close() + } else { + t.internalReader.Close() + t.internalWriter.Close() + + t.Reader.Close() + t.Writer.Close() + } +} + +// +func (t *TimeoutPipe) ResetTimer() { + t.timer.Stop() + t.timer.Reset(t.timeout) +} diff --git a/TimeoutPipe_test.go b/TimeoutPipe_test.go new file mode 100644 index 0000000..7310b80 --- /dev/null +++ b/TimeoutPipe_test.go @@ -0,0 +1,115 @@ +package TimeoutPipe + +import ( + "sync" + "testing" + "time" +) + +var dummyData []byte = []byte("testing 12345") +var smallBuffer []byte = make([]byte, 2) +var largeBuffer []byte = make([]byte, 100) + +func TestNoTimeout(t *testing.T) { + + x := NewTimeoutPipe() + read, write := x.Pipe(time.Second * 5) + + _, e := write.Write(dummyData) + if e != nil { + t.Error("Writer died unexpectedly!") + } + r, re := read.Read(largeBuffer) + if re != nil { + t.Error("Reader died unexpectedly!") + } + + if r < len(dummyData) || r > len(dummyData) { + t.Error("unexpected write or read length") + } +} + +//if this function gets stuck -- something is wrong. +func TestTimeout(t *testing.T) { + x := NewTimeoutPipe() + read, _ := x.Pipe(time.Second * 1) + + _, re := read.Read(largeBuffer) + if re != nil { + //this is what we want + } + +} + +func TestBufferedData(t *testing.T) { + wg := sync.WaitGroup{} + x := NewTimeoutPipe() + read, write := x.Pipe(time.Second * 1) + + wg.Add(1) + go func() { + _, e := write.Write(dummyData) + if e != nil { + return + t.Error("Writer died unexpectedly!") + } + + wg.Done() + }() + + wg.Add(1) + go func() { + for { + _, re := read.Read(smallBuffer) + if re != nil { + break + } + } + wg.Done() + }() + + wg.Wait() +} + +func TestWriteLargeData(t *testing.T) { + //we dont actually need to write large data, just set buffer size to very low ^^ + x := NewTimeoutPipe() + x.BufferSize = 2 + read, write := x.Pipe(time.Second * 5) + written := 0 + + go func() { + for { + _, re := read.Read(largeBuffer) + if re != nil { + break + t.Error("Reader died unexpectedly!") + } + + } + }() + for written < len(dummyData) { + w, e := write.Write(dummyData) + if e != nil { + t.Error("Writer died unexpectedly!") + } + written += w + + } + +} + +func TestReset(t *testing.T) { + ti := time.Now() + x := NewTimeoutPipe() + x.Pipe(time.Second * 1) + for i := 0; i < 3; i++ { + x.ResetTimer() + time.Sleep(time.Second * 1) + } + + if time.Since(ti) < 1*time.Second { + t.Error("Timer was not reset properly!") + } + +}