summaryrefslogtreecommitdiff
path: root/api-put-object-multipart.go
diff options
context:
space:
mode:
authorFélix Sipma <felix+debian@gueux.org>2018-01-05 11:09:44 +0100
committerFélix Sipma <felix+debian@gueux.org>2018-01-05 11:09:44 +0100
commit9aa20edb43ab70f1865d4d1ae680939faa46c8b7 (patch)
tree66742c50531fcce59814c2e55f8fa1f0a2e5463f /api-put-object-multipart.go
parentfd69b3bb0c26856d0938842c0e4e7d3ff959ca3d (diff)
parent2e53196f9027ebb270b9e9a251ad39383a500c8f (diff)
Update upstream source from tag 'upstream/4.0.5'
Update to upstream version '4.0.5' with Debian dir fc9f5488be66217572c92dce9419d3a3fba2cc24
Diffstat (limited to 'api-put-object-multipart.go')
-rw-r--r--api-put-object-multipart.go139
1 files changed, 70 insertions, 69 deletions
diff --git a/api-put-object-multipart.go b/api-put-object-multipart.go
index 1938378..f5b8893 100644
--- a/api-put-object-multipart.go
+++ b/api-put-object-multipart.go
@@ -1,5 +1,6 @@
/*
- * Minio Go Library for Amazon S3 Compatible Cloud Storage (C) 2015, 2016 Minio, Inc.
+ * Minio Go Library for Amazon S3 Compatible Cloud Storage
+ * Copyright 2015-2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,12 +19,16 @@ package minio
import (
"bytes"
+ "context"
+ "encoding/base64"
+ "encoding/hex"
"encoding/xml"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
+ "runtime/debug"
"sort"
"strconv"
"strings"
@@ -31,9 +36,9 @@ import (
"github.com/minio/minio-go/pkg/s3utils"
)
-func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Reader, size int64,
- metadata map[string][]string, progress io.Reader) (n int64, err error) {
- n, err = c.putObjectMultipartNoStream(bucketName, objectName, reader, size, metadata, progress)
+func (c Client) putObjectMultipart(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64,
+ opts PutObjectOptions) (n int64, err error) {
+ n, err = c.putObjectMultipartNoStream(ctx, bucketName, objectName, reader, opts)
if err != nil {
errResp := ToErrorResponse(err)
// Verify if multipart functionality is not available, if not
@@ -44,14 +49,13 @@ func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Read
return 0, ErrEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName)
}
// Fall back to uploading as single PutObject operation.
- return c.putObjectNoChecksum(bucketName, objectName, reader, size, metadata, progress)
+ return c.putObjectNoChecksum(ctx, bucketName, objectName, reader, size, opts)
}
}
return n, err
}
-func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader io.Reader, size int64,
- metadata map[string][]string, progress io.Reader) (n int64, err error) {
+func (c Client) putObjectMultipartNoStream(ctx context.Context, bucketName, objectName string, reader io.Reader, opts PutObjectOptions) (n int64, err error) {
// Input validation.
if err = s3utils.CheckValidBucketName(bucketName); err != nil {
return 0, err
@@ -68,85 +72,93 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
var complMultipartUpload completeMultipartUpload
// Calculate the optimal parts info for a given size.
- totalPartsCount, partSize, _, err := optimalPartInfo(size)
+ totalPartsCount, partSize, _, err := optimalPartInfo(-1)
if err != nil {
return 0, err
}
// Initiate a new multipart upload.
- uploadID, err := c.newUploadID(bucketName, objectName, metadata)
+ uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
if err != nil {
return 0, err
}
defer func() {
if err != nil {
- c.abortMultipartUpload(bucketName, objectName, uploadID)
+ c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
}
}()
// Part number always starts with '1'.
partNumber := 1
- // Initialize a temporary buffer.
- tmpBuffer := new(bytes.Buffer)
-
// Initialize parts uploaded map.
partsInfo := make(map[int]ObjectPart)
+ // Create a buffer.
+ buf := make([]byte, partSize)
+ defer debug.FreeOSMemory()
+
for partNumber <= totalPartsCount {
// Choose hash algorithms to be calculated by hashCopyN,
// avoid sha256 with non-v4 signature request or
// HTTPS connection.
hashAlgos, hashSums := c.hashMaterials()
- // Calculates hash sums while copying partSize bytes into tmpBuffer.
- prtSize, rErr := hashCopyN(hashAlgos, hashSums, tmpBuffer, reader, partSize)
- if rErr != nil && rErr != io.EOF {
+ length, rErr := io.ReadFull(reader, buf)
+ if rErr == io.EOF {
+ break
+ }
+ if rErr != nil && rErr != io.ErrUnexpectedEOF {
return 0, rErr
}
- var reader io.Reader
+ // Calculates hash sums while copying partSize bytes into cw.
+ for k, v := range hashAlgos {
+ v.Write(buf[:length])
+ hashSums[k] = v.Sum(nil)
+ }
+
// Update progress reader appropriately to the latest offset
// as we read from the source.
- reader = newHook(tmpBuffer, progress)
+ rd := newHook(bytes.NewReader(buf[:length]), opts.Progress)
+
+ // Checksums..
+ var (
+ md5Base64 string
+ sha256Hex string
+ )
+ if hashSums["md5"] != nil {
+ md5Base64 = base64.StdEncoding.EncodeToString(hashSums["md5"])
+ }
+ if hashSums["sha256"] != nil {
+ sha256Hex = hex.EncodeToString(hashSums["sha256"])
+ }
// Proceed to upload the part.
var objPart ObjectPart
- objPart, err = c.uploadPart(bucketName, objectName, uploadID, reader, partNumber,
- hashSums["md5"], hashSums["sha256"], prtSize, metadata)
+ objPart, err = c.uploadPart(ctx, bucketName, objectName, uploadID, rd, partNumber,
+ md5Base64, sha256Hex, int64(length), opts.UserMetadata)
if err != nil {
- // Reset the temporary buffer upon any error.
- tmpBuffer.Reset()
return totalUploadedSize, err
}
// Save successfully uploaded part metadata.
partsInfo[partNumber] = objPart
- // Reset the temporary buffer.
- tmpBuffer.Reset()
-
// Save successfully uploaded size.
- totalUploadedSize += prtSize
+ totalUploadedSize += int64(length)
// Increment part number.
partNumber++
// For unknown size, Read EOF we break away.
// We do not have to upload till totalPartsCount.
- if size < 0 && rErr == io.EOF {
+ if rErr == io.EOF {
break
}
}
- // Verify if we uploaded all the data.
- if size > 0 {
- if totalUploadedSize != size {
- return totalUploadedSize, ErrUnexpectedEOF(totalUploadedSize, size, bucketName, objectName)
- }
- }
-
// Loop over total uploaded parts to save them in
// Parts array before completing the multipart request.
for i := 1; i < partNumber; i++ {
@@ -162,7 +174,7 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
// Sort all completed parts.
sort.Sort(completedParts(complMultipartUpload.Parts))
- if _, err = c.completeMultipartUpload(bucketName, objectName, uploadID, complMultipartUpload); err != nil {
+ if _, err = c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload); err != nil {
return totalUploadedSize, err
}
@@ -171,7 +183,7 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
}
// initiateMultipartUpload - Initiates a multipart upload and returns an upload ID.
-func (c Client) initiateMultipartUpload(bucketName, objectName string, metadata map[string][]string) (initiateMultipartUploadResult, error) {
+func (c Client) initiateMultipartUpload(ctx context.Context, bucketName, objectName string, opts PutObjectOptions) (initiateMultipartUploadResult, error) {
// Input validation.
if err := s3utils.CheckValidBucketName(bucketName); err != nil {
return initiateMultipartUploadResult{}, err
@@ -185,17 +197,7 @@ func (c Client) initiateMultipartUpload(bucketName, objectName string, metadata
urlValues.Set("uploads", "")
// Set ContentType header.
- customHeader := make(http.Header)
- for k, v := range metadata {
- if len(v) > 0 {
- customHeader.Set(k, v[0])
- }
- }
-
- // Set a default content-type header if the latter is not provided
- if v, ok := metadata["Content-Type"]; !ok || len(v) == 0 {
- customHeader.Set("Content-Type", "application/octet-stream")
- }
+ customHeader := opts.Header()
reqMetadata := requestMetadata{
bucketName: bucketName,
@@ -205,7 +207,7 @@ func (c Client) initiateMultipartUpload(bucketName, objectName string, metadata
}
// Execute POST on an objectName to initiate multipart upload.
- resp, err := c.executeMethod("POST", reqMetadata)
+ resp, err := c.executeMethod(ctx, "POST", reqMetadata)
defer closeResponse(resp)
if err != nil {
return initiateMultipartUploadResult{}, err
@@ -227,8 +229,8 @@ func (c Client) initiateMultipartUpload(bucketName, objectName string, metadata
const serverEncryptionKeyPrefix = "x-amz-server-side-encryption"
// uploadPart - Uploads a part in a multipart upload.
-func (c Client) uploadPart(bucketName, objectName, uploadID string, reader io.Reader,
- partNumber int, md5Sum, sha256Sum []byte, size int64, metadata map[string][]string) (ObjectPart, error) {
+func (c Client) uploadPart(ctx context.Context, bucketName, objectName, uploadID string, reader io.Reader,
+ partNumber int, md5Base64, sha256Hex string, size int64, metadata map[string]string) (ObjectPart, error) {
// Input validation.
if err := s3utils.CheckValidBucketName(bucketName); err != nil {
return ObjectPart{}, err
@@ -261,24 +263,24 @@ func (c Client) uploadPart(bucketName, objectName, uploadID string, reader io.Re
for k, v := range metadata {
if len(v) > 0 {
if strings.HasPrefix(strings.ToLower(k), serverEncryptionKeyPrefix) {
- customHeader.Set(k, v[0])
+ customHeader.Set(k, v)
}
}
}
reqMetadata := requestMetadata{
- bucketName: bucketName,
- objectName: objectName,
- queryValues: urlValues,
- customHeader: customHeader,
- contentBody: reader,
- contentLength: size,
- contentMD5Bytes: md5Sum,
- contentSHA256Bytes: sha256Sum,
+ bucketName: bucketName,
+ objectName: objectName,
+ queryValues: urlValues,
+ customHeader: customHeader,
+ contentBody: reader,
+ contentLength: size,
+ contentMD5Base64: md5Base64,
+ contentSHA256Hex: sha256Hex,
}
// Execute PUT on each part.
- resp, err := c.executeMethod("PUT", reqMetadata)
+ resp, err := c.executeMethod(ctx, "PUT", reqMetadata)
defer closeResponse(resp)
if err != nil {
return ObjectPart{}, err
@@ -299,7 +301,7 @@ func (c Client) uploadPart(bucketName, objectName, uploadID string, reader io.Re
}
// completeMultipartUpload - Completes a multipart upload by assembling previously uploaded parts.
-func (c Client) completeMultipartUpload(bucketName, objectName, uploadID string,
+func (c Client) completeMultipartUpload(ctx context.Context, bucketName, objectName, uploadID string,
complete completeMultipartUpload) (completeMultipartUploadResult, error) {
// Input validation.
if err := s3utils.CheckValidBucketName(bucketName); err != nil {
@@ -312,7 +314,6 @@ func (c Client) completeMultipartUpload(bucketName, objectName, uploadID string,
// Initialize url queries.
urlValues := make(url.Values)
urlValues.Set("uploadId", uploadID)
-
// Marshal complete multipart body.
completeMultipartUploadBytes, err := xml.Marshal(complete)
if err != nil {
@@ -322,16 +323,16 @@ func (c Client) completeMultipartUpload(bucketName, objectName, uploadID string,
// Instantiate all the complete multipart buffer.
completeMultipartUploadBuffer := bytes.NewReader(completeMultipartUploadBytes)
reqMetadata := requestMetadata{
- bucketName: bucketName,
- objectName: objectName,
- queryValues: urlValues,
- contentBody: completeMultipartUploadBuffer,
- contentLength: int64(len(completeMultipartUploadBytes)),
- contentSHA256Bytes: sum256(completeMultipartUploadBytes),
+ bucketName: bucketName,
+ objectName: objectName,
+ queryValues: urlValues,
+ contentBody: completeMultipartUploadBuffer,
+ contentLength: int64(len(completeMultipartUploadBytes)),
+ contentSHA256Hex: sum256Hex(completeMultipartUploadBytes),
}
// Execute POST to complete multipart upload for an objectName.
- resp, err := c.executeMethod("POST", reqMetadata)
+ resp, err := c.executeMethod(ctx, "POST", reqMetadata)
defer closeResponse(resp)
if err != nil {
return completeMultipartUploadResult{}, err