diff options
author | Félix Sipma <felix+debian@gueux.org> | 2018-01-05 11:09:44 +0100 |
---|---|---|
committer | Félix Sipma <felix+debian@gueux.org> | 2018-01-05 11:09:44 +0100 |
commit | 9aa20edb43ab70f1865d4d1ae680939faa46c8b7 (patch) | |
tree | 66742c50531fcce59814c2e55f8fa1f0a2e5463f /api-put-object-multipart.go | |
parent | fd69b3bb0c26856d0938842c0e4e7d3ff959ca3d (diff) | |
parent | 2e53196f9027ebb270b9e9a251ad39383a500c8f (diff) |
Update upstream source from tag 'upstream/4.0.5'
Update to upstream version '4.0.5'
with Debian dir fc9f5488be66217572c92dce9419d3a3fba2cc24
Diffstat (limited to 'api-put-object-multipart.go')
-rw-r--r-- | api-put-object-multipart.go | 139 |
1 files changed, 70 insertions, 69 deletions
diff --git a/api-put-object-multipart.go b/api-put-object-multipart.go index 1938378..f5b8893 100644 --- a/api-put-object-multipart.go +++ b/api-put-object-multipart.go @@ -1,5 +1,6 @@ /* - * Minio Go Library for Amazon S3 Compatible Cloud Storage (C) 2015, 2016 Minio, Inc. + * Minio Go Library for Amazon S3 Compatible Cloud Storage + * Copyright 2015-2017 Minio, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,12 +19,16 @@ package minio import ( "bytes" + "context" + "encoding/base64" + "encoding/hex" "encoding/xml" "fmt" "io" "io/ioutil" "net/http" "net/url" + "runtime/debug" "sort" "strconv" "strings" @@ -31,9 +36,9 @@ import ( "github.com/minio/minio-go/pkg/s3utils" ) -func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Reader, size int64, - metadata map[string][]string, progress io.Reader) (n int64, err error) { - n, err = c.putObjectMultipartNoStream(bucketName, objectName, reader, size, metadata, progress) +func (c Client) putObjectMultipart(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, + opts PutObjectOptions) (n int64, err error) { + n, err = c.putObjectMultipartNoStream(ctx, bucketName, objectName, reader, opts) if err != nil { errResp := ToErrorResponse(err) // Verify if multipart functionality is not available, if not @@ -44,14 +49,13 @@ func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Read return 0, ErrEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName) } // Fall back to uploading as single PutObject operation. - return c.putObjectNoChecksum(bucketName, objectName, reader, size, metadata, progress) + return c.putObjectNoChecksum(ctx, bucketName, objectName, reader, size, opts) } } return n, err } -func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader io.Reader, size int64, - metadata map[string][]string, progress io.Reader) (n int64, err error) { +func (c Client) putObjectMultipartNoStream(ctx context.Context, bucketName, objectName string, reader io.Reader, opts PutObjectOptions) (n int64, err error) { // Input validation. if err = s3utils.CheckValidBucketName(bucketName); err != nil { return 0, err @@ -68,85 +72,93 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader var complMultipartUpload completeMultipartUpload // Calculate the optimal parts info for a given size. - totalPartsCount, partSize, _, err := optimalPartInfo(size) + totalPartsCount, partSize, _, err := optimalPartInfo(-1) if err != nil { return 0, err } // Initiate a new multipart upload. - uploadID, err := c.newUploadID(bucketName, objectName, metadata) + uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts) if err != nil { return 0, err } defer func() { if err != nil { - c.abortMultipartUpload(bucketName, objectName, uploadID) + c.abortMultipartUpload(ctx, bucketName, objectName, uploadID) } }() // Part number always starts with '1'. partNumber := 1 - // Initialize a temporary buffer. - tmpBuffer := new(bytes.Buffer) - // Initialize parts uploaded map. partsInfo := make(map[int]ObjectPart) + // Create a buffer. + buf := make([]byte, partSize) + defer debug.FreeOSMemory() + for partNumber <= totalPartsCount { // Choose hash algorithms to be calculated by hashCopyN, // avoid sha256 with non-v4 signature request or // HTTPS connection. hashAlgos, hashSums := c.hashMaterials() - // Calculates hash sums while copying partSize bytes into tmpBuffer. - prtSize, rErr := hashCopyN(hashAlgos, hashSums, tmpBuffer, reader, partSize) - if rErr != nil && rErr != io.EOF { + length, rErr := io.ReadFull(reader, buf) + if rErr == io.EOF { + break + } + if rErr != nil && rErr != io.ErrUnexpectedEOF { return 0, rErr } - var reader io.Reader + // Calculates hash sums while copying partSize bytes into cw. + for k, v := range hashAlgos { + v.Write(buf[:length]) + hashSums[k] = v.Sum(nil) + } + // Update progress reader appropriately to the latest offset // as we read from the source. - reader = newHook(tmpBuffer, progress) + rd := newHook(bytes.NewReader(buf[:length]), opts.Progress) + + // Checksums.. + var ( + md5Base64 string + sha256Hex string + ) + if hashSums["md5"] != nil { + md5Base64 = base64.StdEncoding.EncodeToString(hashSums["md5"]) + } + if hashSums["sha256"] != nil { + sha256Hex = hex.EncodeToString(hashSums["sha256"]) + } // Proceed to upload the part. var objPart ObjectPart - objPart, err = c.uploadPart(bucketName, objectName, uploadID, reader, partNumber, - hashSums["md5"], hashSums["sha256"], prtSize, metadata) + objPart, err = c.uploadPart(ctx, bucketName, objectName, uploadID, rd, partNumber, + md5Base64, sha256Hex, int64(length), opts.UserMetadata) if err != nil { - // Reset the temporary buffer upon any error. - tmpBuffer.Reset() return totalUploadedSize, err } // Save successfully uploaded part metadata. partsInfo[partNumber] = objPart - // Reset the temporary buffer. - tmpBuffer.Reset() - // Save successfully uploaded size. - totalUploadedSize += prtSize + totalUploadedSize += int64(length) // Increment part number. partNumber++ // For unknown size, Read EOF we break away. // We do not have to upload till totalPartsCount. - if size < 0 && rErr == io.EOF { + if rErr == io.EOF { break } } - // Verify if we uploaded all the data. - if size > 0 { - if totalUploadedSize != size { - return totalUploadedSize, ErrUnexpectedEOF(totalUploadedSize, size, bucketName, objectName) - } - } - // Loop over total uploaded parts to save them in // Parts array before completing the multipart request. for i := 1; i < partNumber; i++ { @@ -162,7 +174,7 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader // Sort all completed parts. sort.Sort(completedParts(complMultipartUpload.Parts)) - if _, err = c.completeMultipartUpload(bucketName, objectName, uploadID, complMultipartUpload); err != nil { + if _, err = c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload); err != nil { return totalUploadedSize, err } @@ -171,7 +183,7 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader } // initiateMultipartUpload - Initiates a multipart upload and returns an upload ID. -func (c Client) initiateMultipartUpload(bucketName, objectName string, metadata map[string][]string) (initiateMultipartUploadResult, error) { +func (c Client) initiateMultipartUpload(ctx context.Context, bucketName, objectName string, opts PutObjectOptions) (initiateMultipartUploadResult, error) { // Input validation. if err := s3utils.CheckValidBucketName(bucketName); err != nil { return initiateMultipartUploadResult{}, err @@ -185,17 +197,7 @@ func (c Client) initiateMultipartUpload(bucketName, objectName string, metadata urlValues.Set("uploads", "") // Set ContentType header. - customHeader := make(http.Header) - for k, v := range metadata { - if len(v) > 0 { - customHeader.Set(k, v[0]) - } - } - - // Set a default content-type header if the latter is not provided - if v, ok := metadata["Content-Type"]; !ok || len(v) == 0 { - customHeader.Set("Content-Type", "application/octet-stream") - } + customHeader := opts.Header() reqMetadata := requestMetadata{ bucketName: bucketName, @@ -205,7 +207,7 @@ func (c Client) initiateMultipartUpload(bucketName, objectName string, metadata } // Execute POST on an objectName to initiate multipart upload. - resp, err := c.executeMethod("POST", reqMetadata) + resp, err := c.executeMethod(ctx, "POST", reqMetadata) defer closeResponse(resp) if err != nil { return initiateMultipartUploadResult{}, err @@ -227,8 +229,8 @@ func (c Client) initiateMultipartUpload(bucketName, objectName string, metadata const serverEncryptionKeyPrefix = "x-amz-server-side-encryption" // uploadPart - Uploads a part in a multipart upload. -func (c Client) uploadPart(bucketName, objectName, uploadID string, reader io.Reader, - partNumber int, md5Sum, sha256Sum []byte, size int64, metadata map[string][]string) (ObjectPart, error) { +func (c Client) uploadPart(ctx context.Context, bucketName, objectName, uploadID string, reader io.Reader, + partNumber int, md5Base64, sha256Hex string, size int64, metadata map[string]string) (ObjectPart, error) { // Input validation. if err := s3utils.CheckValidBucketName(bucketName); err != nil { return ObjectPart{}, err @@ -261,24 +263,24 @@ func (c Client) uploadPart(bucketName, objectName, uploadID string, reader io.Re for k, v := range metadata { if len(v) > 0 { if strings.HasPrefix(strings.ToLower(k), serverEncryptionKeyPrefix) { - customHeader.Set(k, v[0]) + customHeader.Set(k, v) } } } reqMetadata := requestMetadata{ - bucketName: bucketName, - objectName: objectName, - queryValues: urlValues, - customHeader: customHeader, - contentBody: reader, - contentLength: size, - contentMD5Bytes: md5Sum, - contentSHA256Bytes: sha256Sum, + bucketName: bucketName, + objectName: objectName, + queryValues: urlValues, + customHeader: customHeader, + contentBody: reader, + contentLength: size, + contentMD5Base64: md5Base64, + contentSHA256Hex: sha256Hex, } // Execute PUT on each part. - resp, err := c.executeMethod("PUT", reqMetadata) + resp, err := c.executeMethod(ctx, "PUT", reqMetadata) defer closeResponse(resp) if err != nil { return ObjectPart{}, err @@ -299,7 +301,7 @@ func (c Client) uploadPart(bucketName, objectName, uploadID string, reader io.Re } // completeMultipartUpload - Completes a multipart upload by assembling previously uploaded parts. -func (c Client) completeMultipartUpload(bucketName, objectName, uploadID string, +func (c Client) completeMultipartUpload(ctx context.Context, bucketName, objectName, uploadID string, complete completeMultipartUpload) (completeMultipartUploadResult, error) { // Input validation. if err := s3utils.CheckValidBucketName(bucketName); err != nil { @@ -312,7 +314,6 @@ func (c Client) completeMultipartUpload(bucketName, objectName, uploadID string, // Initialize url queries. urlValues := make(url.Values) urlValues.Set("uploadId", uploadID) - // Marshal complete multipart body. completeMultipartUploadBytes, err := xml.Marshal(complete) if err != nil { @@ -322,16 +323,16 @@ func (c Client) completeMultipartUpload(bucketName, objectName, uploadID string, // Instantiate all the complete multipart buffer. completeMultipartUploadBuffer := bytes.NewReader(completeMultipartUploadBytes) reqMetadata := requestMetadata{ - bucketName: bucketName, - objectName: objectName, - queryValues: urlValues, - contentBody: completeMultipartUploadBuffer, - contentLength: int64(len(completeMultipartUploadBytes)), - contentSHA256Bytes: sum256(completeMultipartUploadBytes), + bucketName: bucketName, + objectName: objectName, + queryValues: urlValues, + contentBody: completeMultipartUploadBuffer, + contentLength: int64(len(completeMultipartUploadBytes)), + contentSHA256Hex: sum256Hex(completeMultipartUploadBytes), } // Execute POST to complete multipart upload for an objectName. - resp, err := c.executeMethod("POST", reqMetadata) + resp, err := c.executeMethod(ctx, "POST", reqMetadata) defer closeResponse(resp) if err != nil { return completeMultipartUploadResult{}, err |