diff options
author | Colin Marc <hi@colinmarc.com> | 2022-02-08 18:28:23 +0100 |
---|---|---|
committer | Colin Marc <hi@colinmarc.com> | 2022-02-08 18:44:08 +0100 |
commit | 11bad6d16f5dac9909100f1230d7eff23ebf6010 (patch) | |
tree | 715563e4d1c7c27570247ce19e3306088144c8d8 | |
parent | 5298db4b4a9ef3b575e99c3023ab3a8472e389af (diff) |
When seeking forward small amounts, don't reconnect
Currently, when seeking, we always discard the BlockReader and reconnect at the
new offset. This is often wasteful — especially when a forward seek would keep
us within the same chunk, since on reconnecting we discard the beginning of the
chunk anyway.
This adds a new method to BlockReader, Skip, which attempts to discard bytes in
order to skip forward, and makes use of that for small (<64k) forward seeks.
-rw-r--r-- | file_reader.go | 21 | ||||
-rw-r--r-- | file_reader_test.go | 39 | ||||
-rw-r--r-- | internal/transfer/block_reader.go | 36 |
3 files changed, 81 insertions, 15 deletions
diff --git a/file_reader.go b/file_reader.go index d902095..e1e1879 100644 --- a/file_reader.go +++ b/file_reader.go @@ -141,13 +141,14 @@ func (f *FileReader) Seek(offset int64, whence int) (int64, error) { } var off int64 - if whence == 0 { + switch whence { + case io.SeekStart: off = offset - } else if whence == 1 { + case io.SeekCurrent: off = f.offset + offset - } else if whence == 2 { + case io.SeekEnd: off = f.info.Size() + offset - } else { + default: return f.offset, fmt.Errorf("invalid whence: %d", whence) } @@ -157,11 +158,23 @@ func (f *FileReader) Seek(offset int64, whence int) (int64, error) { if f.offset != off { f.offset = off + if f.blockReader != nil { + // If the seek is within the next few chunks, it's much more + // efficient to throw away a few bytes than to reconnect and start + // a read at the new offset. + err := f.blockReader.Skip(f.offset - f.blockReader.Offset) + if err == nil { + return f.offset, nil + } + + // It isn't possible to seek within the current block, so reset such + // that we can connect to the new block. f.blockReader.Close() f.blockReader = nil } } + return f.offset, nil } diff --git a/file_reader_test.go b/file_reader_test.go index 3f435d9..de05ad8 100644 --- a/file_reader_test.go +++ b/file_reader_test.go @@ -23,9 +23,13 @@ const ( testStr = "Abominable are the tumblers into which he pours his poison." 
testStrOff = 48847 - testStr2 = "http://www.gutenberg.org" - testStr2Off = 1256988 - testStr2NegativeOff = -288 + testStr2 = "tumblers" + testStr2Off = 48866 + testStr2RelativeOff = 19 + + testStr3 = "http://www.gutenberg.org" + testStr3Off = 1256988 + testStr3NegativeOff = -288 testChecksum = "27c076e4987344253650d3335a5d08ce" ) @@ -171,16 +175,16 @@ func TestFileReadAt(t *testing.T) { assert.EqualValues(t, string(buf), testStr) - buf = make([]byte, len(testStr2)) + buf = make([]byte, len(testStr3)) off = 0 for off < len(buf) { - n, err := file.ReadAt(buf[off:], int64(testStr2Off+off)) + n, err := file.ReadAt(buf[off:], int64(testStr3Off+off)) require.NoError(t, err) assert.True(t, n > 0) off += n } - assert.EqualValues(t, testStr2, string(buf)) + assert.EqualValues(t, testStr3, string(buf)) } func TestFileReadAtEOF(t *testing.T) { @@ -240,16 +244,35 @@ func TestFileSeek(t *testing.T) { assert.EqualValues(t, len(testStr), n) assert.EqualValues(t, testStr, string(buf.Bytes())) - // now seek forward to another block and read a string - off, err = file.Seek(testStr2NegativeOff, 2) + // Do a small forward seek within the block. + off, err = file.Seek(testStrOff, 0) + assert.NoError(t, err) + assert.EqualValues(t, testStrOff, off) + br := file.blockReader + + off, err = file.Seek(testStr2RelativeOff, 1) assert.NoError(t, err) assert.EqualValues(t, testStr2Off, off) + // Make sure we didn't reconnect. 
+ assert.Equal(t, br, file.blockReader) + buf.Reset() n, err = io.CopyN(buf, file, int64(len(testStr2))) assert.NoError(t, err) assert.EqualValues(t, len(testStr2), n) assert.EqualValues(t, testStr2, string(buf.Bytes())) + + // now seek forward to another block and read a string + off, err = file.Seek(testStr3NegativeOff, 2) + assert.NoError(t, err) + assert.EqualValues(t, testStr3Off, off) + + buf.Reset() + n, err = io.CopyN(buf, file, int64(len(testStr3))) + assert.NoError(t, err) + assert.EqualValues(t, len(testStr3), n) + assert.EqualValues(t, testStr3, string(buf.Bytes())) } func TestFileReadDir(t *testing.T) { diff --git a/internal/transfer/block_reader.go b/internal/transfer/block_reader.go index 7f07f9b..6b32657 100644 --- a/internal/transfer/block_reader.go +++ b/internal/transfer/block_reader.go @@ -39,6 +39,8 @@ type BlockReader struct { closed bool } +const maxSkip = 65536 + // SetDeadline sets the deadline for future Read calls. A zero value for t // means Read will not time out. func (br *BlockReader) SetDeadline(t time.Time) error { @@ -115,6 +117,33 @@ func (br *BlockReader) Read(b []byte) (int, error) { return 0, err } +// Skip attempts to discard bytes in the stream in order to skip forward. This +// is an optimization for the case that the amount to skip is very small. It +// returns an error if skip was not attempted at all (because the BlockReader +// isn't connected, or the offset is out of bounds or too far ahead) or the seek +// failed for some other reason. 
+func (br *BlockReader) Skip(off int64) error { + blockSize := int64(br.Block.GetB().GetNumBytes()) + amountToSkip := off - br.Offset + + if br.stream == nil || off < 0 || off >= blockSize || + amountToSkip < 0 || amountToSkip > maxSkip { + return errors.New("unable to skip") + } + + _, err := io.CopyN(io.Discard, br.stream, amountToSkip) + if err != nil { + if err == io.EOF { + err = io.ErrUnexpectedEOF + } + + br.stream = nil + br.datanodes.recordFailure(err) + } + + return err +} + // Close implements io.Closer. func (br *BlockReader) Close() error { br.closed = true @@ -165,12 +194,13 @@ func (br *BlockReader) connectNext() error { return fmt.Errorf("unsupported checksum type: %d", checksumType) } + chunkOffset := int64(readInfo.GetChunkOffset()) chunkSize := int(checksumInfo.GetBytesPerChecksum()) stream := newBlockReadStream(conn, chunkSize, checksumTab) - // The read will start aligned to a chunk boundary, so we need to seek forward - // to the requested offset. - amountToDiscard := br.Offset - int64(readInfo.GetChunkOffset()) + // The read will start aligned to a chunk boundary, so we need to skip + // forward to the requested offset. + amountToDiscard := br.Offset - chunkOffset if amountToDiscard > 0 { _, err := io.CopyN(ioutil.Discard, stream, amountToDiscard) if err != nil { |