Skip to content

[stable30] improve handling of large single-part s3 uploads #52759

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: stable30
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions apps/dav/lib/Upload/AssemblyStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ public function stream_seek($offset, $whence = SEEK_SET) {
$offset = $this->size + $offset;
}

if ($offset === $this->pos) {
return true;
}

if ($offset > $this->size) {
return false;
}
Expand All @@ -95,7 +99,7 @@ public function stream_seek($offset, $whence = SEEK_SET) {

$stream = $this->getStream($this->nodes[$nodeIndex]);
$nodeOffset = $offset - $nodeStart;
if (fseek($stream, $nodeOffset) === -1) {
if ($nodeOffset > 0 && fseek($stream, $nodeOffset) === -1) {
return false;
}
$this->currentNode = $nodeIndex;
Expand Down Expand Up @@ -126,9 +130,14 @@ public function stream_read($count) {
}
}

do {
$collectedData = '';
// read data until we either got all the data requested or there is no more stream left
while ($count > 0 && !is_null($this->currentStream)) {
$data = fread($this->currentStream, $count);
$read = strlen($data);

$count -= $read;
$collectedData .= $data;
$this->currentNodeRead += $read;

if (feof($this->currentStream)) {
Expand All @@ -145,14 +154,11 @@ public function stream_read($count) {
$this->currentStream = null;
}
}
// if no data read, try again with the next node because
// returning empty data can make the caller think there is no more
// data left to read
} while ($read === 0 && !is_null($this->currentStream));
}

// update position
$this->pos += $read;
return $data;
$this->pos += strlen($collectedData);
return $collectedData;
}

/**
Expand Down
22 changes: 19 additions & 3 deletions apps/dav/tests/unit/Upload/AssemblyStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,16 @@ public function testGetContents($expected, $nodes): void {
/**
* @dataProvider providesNodes()
*/
public function testGetContentsFread($expected, $nodes): void {
public function testGetContentsFread($expected, $nodes, $chunkLength = 3): void {
$stream = \OCA\DAV\Upload\AssemblyStream::wrap($nodes);

$content = '';
while (!feof($stream)) {
$content .= fread($stream, 3);
$chunk = fread($stream, $chunkLength);
$content .= $chunk;
if ($chunkLength !== 3) {
$this->assertEquals($chunkLength, strlen($chunk));
}
}

$this->assertEquals($expected, $content);
Expand Down Expand Up @@ -102,7 +106,19 @@ public function providesNodes() {
]],
'a ton of nodes' => [
$tonofdata, $tonofnodes
]
],
'one read over multiple nodes' => [
'1234567890', [
$this->buildNode('0', '1234'),
$this->buildNode('1', '5678'),
$this->buildNode('2', '90'),
], 10],
'two reads over multiple nodes' => [
'1234567890', [
$this->buildNode('0', '1234'),
$this->buildNode('1', '5678'),
$this->buildNode('2', '90'),
], 5],
];
}

Expand Down
7 changes: 7 additions & 0 deletions lib/private/Files/ObjectStore/ObjectStoreStorage.php
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,13 @@ public function file_put_contents($path, $data) {
}

public function writeStream(string $path, $stream, ?int $size = null): int {
if ($size === null) {
$stats = fstat($stream);
if (is_array($stats) && isset($stats['size'])) {
$size = $stats['size'];
}
}

$stat = $this->stat($path);
if (empty($stat)) {
// create new file
Expand Down
33 changes: 23 additions & 10 deletions lib/private/Files/ObjectStore/S3ObjectTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -140,20 +140,33 @@ protected function writeMultiPart(string $urn, StreamInterface $stream, ?string
* @since 7.0.0
*/
public function writeObject($urn, $stream, ?string $mimetype = null) {
$canSeek = fseek($stream, 0, SEEK_CUR) === 0;
$psrStream = Utils::streamFor($stream);

// (!$psrStream->isSeekable() || $psrStream->getSize() === null) is true for a non-seekable stream,
// so the single-part optimisation below does not apply to it directly
$buffer = new Psr7\Stream(fopen('php://memory', 'rwb+'));
Utils::copyToStream($psrStream, $buffer, $this->putSizeLimit);
$buffer->seek(0);
if ($buffer->getSize() < $this->putSizeLimit) {
// buffer is fully seekable, so use it directly for the small upload
$this->writeSingle($urn, $buffer, $mimetype);

$size = $psrStream->getSize();
if ($size === null || !$canSeek) {
// The s3 single-part upload requires the size to be known for the stream.
// So for input streams that don't have a known size, we need to copy (part of)
// the input into a temporary stream so the size can be determined
$buffer = new Psr7\Stream(fopen('php://temp', 'rw+'));
Utils::copyToStream($psrStream, $buffer, $this->putSizeLimit);
$buffer->seek(0);
if ($buffer->getSize() < $this->putSizeLimit) {
// buffer is fully seekable, so use it directly for the small upload
$this->writeSingle($urn, $buffer, $mimetype);
} else {
$loadStream = new Psr7\AppendStream([$buffer, $psrStream]);
$this->writeMultiPart($urn, $loadStream, $mimetype);
}
} else {
$loadStream = new Psr7\AppendStream([$buffer, $psrStream]);
$this->writeMultiPart($urn, $loadStream, $mimetype);
if ($size < $this->putSizeLimit) {
$this->writeSingle($urn, $psrStream, $mimetype);
} else {
$this->writeMultiPart($urn, $psrStream, $mimetype);
}
}
$psrStream->close();
}

/**
Expand Down
Loading