summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorColin Marc <hi@colinmarc.com>2022-02-08 18:28:23 +0100
committerColin Marc <hi@colinmarc.com>2022-02-08 18:44:08 +0100
commit11bad6d16f5dac9909100f1230d7eff23ebf6010 (patch)
tree715563e4d1c7c27570247ce19e3306088144c8d8
parent5298db4b4a9ef3b575e99c3023ab3a8472e389af (diff)
When seeking forward small amounts, don't reconnect
Currently, when seeking, we always discard the BlockReader and reconnect at the new offset. This is often wasteful, and especially so when seeking forward would keep us in the same chunk, since on reconnecting we discard the beginning of the chunk anyway. This adds a new method to BlockReader, Skip, which attempts to discard bytes in order to skip forward, and makes use of that for small (<64k) forward seeks.
-rw-r--r--file_reader.go21
-rw-r--r--file_reader_test.go39
-rw-r--r--internal/transfer/block_reader.go36
3 files changed, 81 insertions, 15 deletions
diff --git a/file_reader.go b/file_reader.go
index d902095..e1e1879 100644
--- a/file_reader.go
+++ b/file_reader.go
@@ -141,13 +141,14 @@ func (f *FileReader) Seek(offset int64, whence int) (int64, error) {
}
var off int64
- if whence == 0 {
+ switch whence {
+ case io.SeekStart:
off = offset
- } else if whence == 1 {
+ case io.SeekCurrent:
off = f.offset + offset
- } else if whence == 2 {
+ case io.SeekEnd:
off = f.info.Size() + offset
- } else {
+ default:
return f.offset, fmt.Errorf("invalid whence: %d", whence)
}
@@ -157,11 +158,23 @@ func (f *FileReader) Seek(offset int64, whence int) (int64, error) {
if f.offset != off {
f.offset = off
+
if f.blockReader != nil {
+ // If the seek is within the next few chunks, it's much more
+ // efficient to throw away a few bytes than to reconnect and start
+ // a read at the new offset.
+ err := f.blockReader.Skip(f.offset - f.blockReader.Offset)
+ if err == nil {
+ return f.offset, nil
+ }
+
+ // It isn't possible to seek within the current block, so reset such
+ // that we can connect to the new block.
f.blockReader.Close()
f.blockReader = nil
}
}
+
return f.offset, nil
}
diff --git a/file_reader_test.go b/file_reader_test.go
index 3f435d9..de05ad8 100644
--- a/file_reader_test.go
+++ b/file_reader_test.go
@@ -23,9 +23,13 @@ const (
testStr = "Abominable are the tumblers into which he pours his poison."
testStrOff = 48847
- testStr2 = "http://www.gutenberg.org"
- testStr2Off = 1256988
- testStr2NegativeOff = -288
+ testStr2 = "tumblers"
+ testStr2Off = 48866
+ testStr2RelativeOff = 19
+
+ testStr3 = "http://www.gutenberg.org"
+ testStr3Off = 1256988
+ testStr3NegativeOff = -288
testChecksum = "27c076e4987344253650d3335a5d08ce"
)
@@ -171,16 +175,16 @@ func TestFileReadAt(t *testing.T) {
assert.EqualValues(t, string(buf), testStr)
- buf = make([]byte, len(testStr2))
+ buf = make([]byte, len(testStr3))
off = 0
for off < len(buf) {
- n, err := file.ReadAt(buf[off:], int64(testStr2Off+off))
+ n, err := file.ReadAt(buf[off:], int64(testStr3Off+off))
require.NoError(t, err)
assert.True(t, n > 0)
off += n
}
- assert.EqualValues(t, testStr2, string(buf))
+ assert.EqualValues(t, testStr3, string(buf))
}
func TestFileReadAtEOF(t *testing.T) {
@@ -240,16 +244,35 @@ func TestFileSeek(t *testing.T) {
assert.EqualValues(t, len(testStr), n)
assert.EqualValues(t, testStr, string(buf.Bytes()))
- // now seek forward to another block and read a string
- off, err = file.Seek(testStr2NegativeOff, 2)
+ // Do a small forward seek within the block.
+ off, err = file.Seek(testStrOff, 0)
+ assert.NoError(t, err)
+ assert.EqualValues(t, testStrOff, off)
+ br := file.blockReader
+
+ off, err = file.Seek(testStr2RelativeOff, 1)
assert.NoError(t, err)
assert.EqualValues(t, testStr2Off, off)
+ // Make sure we didn't reconnect.
+ assert.Equal(t, br, file.blockReader)
+
buf.Reset()
n, err = io.CopyN(buf, file, int64(len(testStr2)))
assert.NoError(t, err)
assert.EqualValues(t, len(testStr2), n)
assert.EqualValues(t, testStr2, string(buf.Bytes()))
+
+ // now seek forward to another block and read a string
+ off, err = file.Seek(testStr3NegativeOff, 2)
+ assert.NoError(t, err)
+ assert.EqualValues(t, testStr3Off, off)
+
+ buf.Reset()
+ n, err = io.CopyN(buf, file, int64(len(testStr3)))
+ assert.NoError(t, err)
+ assert.EqualValues(t, len(testStr3), n)
+ assert.EqualValues(t, testStr3, string(buf.Bytes()))
}
func TestFileReadDir(t *testing.T) {
diff --git a/internal/transfer/block_reader.go b/internal/transfer/block_reader.go
index 7f07f9b..6b32657 100644
--- a/internal/transfer/block_reader.go
+++ b/internal/transfer/block_reader.go
@@ -39,6 +39,8 @@ type BlockReader struct {
closed bool
}
+const maxSkip = 65536
+
// SetDeadline sets the deadline for future Read calls. A zero value for t
// means Read will not time out.
func (br *BlockReader) SetDeadline(t time.Time) error {
@@ -115,6 +117,33 @@ func (br *BlockReader) Read(b []byte) (int, error) {
return 0, err
}
+// Skip attempts to discard bytes in the stream in order to skip forward. This
+// is an optimization for the case that the amount to skip is very small. It
+// returns an error if the skip was not attempted at all (because the
+// BlockReader isn't connected, or the offset is out of bounds or too far
+// ahead), or if the skip failed for some other reason.
+func (br *BlockReader) Skip(off int64) error {
+ blockSize := int64(br.Block.GetB().GetNumBytes())
+ amountToSkip := off - br.Offset
+
+ if br.stream == nil || off < 0 || off >= blockSize ||
+ amountToSkip < 0 || amountToSkip > maxSkip {
+ return errors.New("unable to skip")
+ }
+
+ _, err := io.CopyN(io.Discard, br.stream, amountToSkip)
+ if err != nil {
+ if err == io.EOF {
+ err = io.ErrUnexpectedEOF
+ }
+
+ br.stream = nil
+ br.datanodes.recordFailure(err)
+ }
+
+ return err
+}
+
// Close implements io.Closer.
func (br *BlockReader) Close() error {
br.closed = true
@@ -165,12 +194,13 @@ func (br *BlockReader) connectNext() error {
return fmt.Errorf("unsupported checksum type: %d", checksumType)
}
+ chunkOffset := int64(readInfo.GetChunkOffset())
chunkSize := int(checksumInfo.GetBytesPerChecksum())
stream := newBlockReadStream(conn, chunkSize, checksumTab)
- // The read will start aligned to a chunk boundary, so we need to seek forward
- // to the requested offset.
- amountToDiscard := br.Offset - int64(readInfo.GetChunkOffset())
+ // The read will start aligned to a chunk boundary, so we need to skip
+ // forward to the requested offset.
+ amountToDiscard := br.Offset - chunkOffset
if amountToDiscard > 0 {
_, err := io.CopyN(ioutil.Discard, stream, amountToDiscard)
if err != nil {