Diffstat (limited to 'api-put-object.go')
-rw-r--r--  api-put-object.go  328
1 file changed, 182 insertions(+), 146 deletions(-)
diff --git a/api-put-object.go b/api-put-object.go
index 2ea4987..1fda1bc 100644
--- a/api-put-object.go
+++ b/api-put-object.go
@@ -1,5 +1,6 @@
/*
- * Minio Go Library for Amazon S3 Compatible Cloud Storage (C) 2015, 2016 Minio, Inc.
+ * Minio Go Library for Amazon S3 Compatible Cloud Storage
+ * Copyright 2015-2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,117 +18,84 @@
package minio
import (
+ "bytes"
+ "context"
+ "fmt"
"io"
- "os"
- "reflect"
- "runtime"
- "strings"
+ "net/http"
+ "runtime/debug"
+ "sort"
- "github.com/minio/minio-go/pkg/credentials"
+ "github.com/minio/minio-go/pkg/encrypt"
"github.com/minio/minio-go/pkg/s3utils"
)
-// toInt - converts go value to its integer representation based
-// on the value kind if it is an integer.
-func toInt(value reflect.Value) (size int64) {
- size = -1
- if value.IsValid() {
- switch value.Kind() {
- case reflect.Int:
- fallthrough
- case reflect.Int8:
- fallthrough
- case reflect.Int16:
- fallthrough
- case reflect.Int32:
- fallthrough
- case reflect.Int64:
- size = value.Int()
- }
+// PutObjectOptions represents options specified by the user for the PutObject call.
+type PutObjectOptions struct {
+ UserMetadata map[string]string
+ Progress io.Reader
+ ContentType string
+ ContentEncoding string
+ ContentDisposition string
+ CacheControl string
+ EncryptMaterials encrypt.Materials
+ NumThreads uint
+}
+
+// getNumThreads - gets the number of threads to be used in the multipart
+// put object operation
+func (opts PutObjectOptions) getNumThreads() (numThreads int) {
+ if opts.NumThreads > 0 {
+ numThreads = int(opts.NumThreads)
+ } else {
+ numThreads = totalWorkers
}
- return size
+ return
}
-// getReaderSize - Determine the size of Reader if available.
-func getReaderSize(reader io.Reader) (size int64, err error) {
- size = -1
- if reader == nil {
- return -1, nil
- }
- // Verify if there is a method by name 'Size'.
- sizeFn := reflect.ValueOf(reader).MethodByName("Size")
- // Verify if there is a method by name 'Len'.
- lenFn := reflect.ValueOf(reader).MethodByName("Len")
- if sizeFn.IsValid() {
- if sizeFn.Kind() == reflect.Func {
- // Call the 'Size' function and save its return value.
- result := sizeFn.Call([]reflect.Value{})
- if len(result) == 1 {
- size = toInt(result[0])
- }
- }
- } else if lenFn.IsValid() {
- if lenFn.Kind() == reflect.Func {
- // Call the 'Len' function and save its return value.
- result := lenFn.Call([]reflect.Value{})
- if len(result) == 1 {
- size = toInt(result[0])
- }
- }
+// Header - constructs the request headers from the metadata set by the user in
+// the PutObjectOptions struct.
+func (opts PutObjectOptions) Header() (header http.Header) {
+ header = make(http.Header)
+
+ if opts.ContentType != "" {
+ header["Content-Type"] = []string{opts.ContentType}
} else {
- // Fallback to Stat() method, two possible Stat() structs exist.
- switch v := reader.(type) {
- case *os.File:
- var st os.FileInfo
- st, err = v.Stat()
- if err != nil {
- // Handle this case specially for "windows",
- // certain files for example 'Stdin', 'Stdout' and
- // 'Stderr' it is not allowed to fetch file information.
- if runtime.GOOS == "windows" {
- if strings.Contains(err.Error(), "GetFileInformationByHandle") {
- return -1, nil
- }
- }
- return
- }
- // Ignore if input is a directory, throw an error.
- if st.Mode().IsDir() {
- return -1, ErrInvalidArgument("Input file cannot be a directory.")
- }
- // Ignore 'Stdin', 'Stdout' and 'Stderr', since they
- // represent *os.File type but internally do not
- // implement Seekable calls. Ignore them and treat
- // them like a stream with unknown length.
- switch st.Name() {
- case "stdin", "stdout", "stderr":
- return
- // Ignore read/write stream of os.Pipe() which have unknown length too.
- case "|0", "|1":
- return
- }
- var pos int64
- pos, err = v.Seek(0, 1) // SeekCurrent.
- if err != nil {
- return -1, err
- }
- size = st.Size() - pos
- case *Object:
- var st ObjectInfo
- st, err = v.Stat()
- if err != nil {
- return
- }
- var pos int64
- pos, err = v.Seek(0, 1) // SeekCurrent.
- if err != nil {
- return -1, err
- }
- size = st.Size - pos
+ header["Content-Type"] = []string{"application/octet-stream"}
+ }
+ if opts.ContentEncoding != "" {
+ header["Content-Encoding"] = []string{opts.ContentEncoding}
+ }
+ if opts.ContentDisposition != "" {
+ header["Content-Disposition"] = []string{opts.ContentDisposition}
+ }
+ if opts.CacheControl != "" {
+ header["Cache-Control"] = []string{opts.CacheControl}
+ }
+ if opts.EncryptMaterials != nil {
+ header[amzHeaderIV] = []string{opts.EncryptMaterials.GetIV()}
+ header[amzHeaderKey] = []string{opts.EncryptMaterials.GetKey()}
+ header[amzHeaderMatDesc] = []string{opts.EncryptMaterials.GetDesc()}
+ }
+ for k, v := range opts.UserMetadata {
+ if !isAmzHeader(k) && !isStandardHeader(k) && !isSSEHeader(k) {
+ header["X-Amz-Meta-"+k] = []string{v}
+ } else {
+ header[k] = []string{v}
+ }
+ }
+ return
+}
+
+// validate() checks if the UserMetadata map contains any standard or client side
+// encryption headers and returns an error if it does.
+func (opts PutObjectOptions) validate() (err error) {
+ for k := range opts.UserMetadata {
+ if isStandardHeader(k) || isCSEHeader(k) {
+ return ErrInvalidArgument(k + " unsupported request parameter for user defined metadata")
}
}
- // Returns the size here.
- return size, err
+ return nil
}
// completedParts is a collection of parts sortable by their part numbers.
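With the new options type above, a caller fills a single PutObjectOptions value instead of passing a raw header map; Header() then derives the request headers, defaulting Content-Type to application/octet-stream and prefixing plain user metadata keys with X-Amz-Meta-. A minimal sketch of that mapping, assuming the library is imported as minio; the metadata key "project" and all values are placeholders:

    package main

    import (
        "fmt"

        minio "github.com/minio/minio-go"
    )

    func main() {
        // All values here are made up for illustration.
        opts := minio.PutObjectOptions{
            ContentType:  "application/json",
            CacheControl: "max-age=600",
            UserMetadata: map[string]string{"project": "alpha"},
        }
        // "project" is neither a standard nor an SSE header, so Header()
        // emits it as "X-Amz-Meta-project"; the other fields map one-to-one.
        for k, v := range opts.Header() {
            fmt.Println(k, v)
        }
    }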
@@ -149,39 +117,12 @@ func (a completedParts) Less(i, j int) bool { return a[i].PartNumber < a[j].Part
// - For size input as -1 PutObject does a multipart Put operation
// until input stream reaches EOF. Maximum object size that can
// be uploaded through this operation will be 5TiB.
-func (c Client) PutObject(bucketName, objectName string, reader io.Reader, contentType string) (n int64, err error) {
- return c.PutObjectWithMetadata(bucketName, objectName, reader, map[string][]string{
- "Content-Type": []string{contentType},
- }, nil)
+func (c Client) PutObject(bucketName, objectName string, reader io.Reader, objectSize int64,
+ opts PutObjectOptions) (n int64, err error) {
+ return c.PutObjectWithContext(context.Background(), bucketName, objectName, reader, objectSize, opts)
}
-// PutObjectWithSize - is a helper PutObject similar in behavior to PutObject()
-// but takes the size argument explicitly, this function avoids doing reflection
-// internally to figure out the size of input stream. Also if the input size is
-// lesser than 0 this function returns an error.
-func (c Client) PutObjectWithSize(bucketName, objectName string, reader io.Reader, readerSize int64, metadata map[string][]string, progress io.Reader) (n int64, err error) {
- return c.putObjectCommon(bucketName, objectName, reader, readerSize, metadata, progress)
-}
-
-// PutObjectWithMetadata using AWS streaming signature V4
-func (c Client) PutObjectWithMetadata(bucketName, objectName string, reader io.Reader, metadata map[string][]string, progress io.Reader) (n int64, err error) {
- return c.PutObjectWithProgress(bucketName, objectName, reader, metadata, progress)
-}
-
-// PutObjectWithProgress using AWS streaming signature V4
-func (c Client) PutObjectWithProgress(bucketName, objectName string, reader io.Reader, metadata map[string][]string, progress io.Reader) (n int64, err error) {
- // Size of the object.
- var size int64
-
- // Get reader size.
- size, err = getReaderSize(reader)
- if err != nil {
- return 0, err
- }
- return c.putObjectCommon(bucketName, objectName, reader, size, metadata, progress)
-}
-
-func (c Client) putObjectCommon(bucketName, objectName string, reader io.Reader, size int64, metadata map[string][]string, progress io.Reader) (n int64, err error) {
+func (c Client) putObjectCommon(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions) (n int64, err error) {
// Check for largest object size allowed.
if size > int64(maxMultipartPutObjectSize) {
return 0, ErrEntityTooLarge(size, maxMultipartPutObjectSize, bucketName, objectName)
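For reference, the exported PutObject now takes the object size explicitly together with a PutObjectOptions value, instead of discovering the size through reflection on the reader. A hedged usage sketch; the endpoint, credentials, bucket, object and file names below are placeholders:

    package main

    import (
        "log"
        "os"

        minio "github.com/minio/minio-go"
    )

    func main() {
        // Placeholder endpoint and credentials.
        c, err := minio.New("play.minio.io:9000", "ACCESS-KEY", "SECRET-KEY", true)
        if err != nil {
            log.Fatalln(err)
        }
        f, err := os.Open("backup.tar.gz") // hypothetical local file
        if err != nil {
            log.Fatalln(err)
        }
        defer f.Close()
        st, err := f.Stat()
        if err != nil {
            log.Fatalln(err)
        }
        // Size is passed explicitly; -1 selects the unknown-length
        // multipart path added further down in this change.
        n, err := c.PutObject("my-bucket", "backups/backup.tar.gz", f, st.Size(),
            minio.PutObjectOptions{ContentType: "application/gzip"})
        if err != nil {
            log.Fatalln(err)
        }
        log.Println("uploaded", n, "bytes")
    }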
@@ -190,29 +131,124 @@ func (c Client) putObjectCommon(bucketName, objectName string, reader io.Reader,
// NOTE: Streaming signature is not supported by GCS.
if s3utils.IsGoogleEndpoint(c.endpointURL) {
// Do not compute MD5 for Google Cloud Storage.
- return c.putObjectNoChecksum(bucketName, objectName, reader, size, metadata, progress)
+ return c.putObjectNoChecksum(ctx, bucketName, objectName, reader, size, opts)
}
if c.overrideSignerType.IsV2() {
- if size > 0 && size < minPartSize {
- return c.putObjectNoChecksum(bucketName, objectName, reader, size, metadata, progress)
+ if size >= 0 && size < minPartSize {
+ return c.putObjectNoChecksum(ctx, bucketName, objectName, reader, size, opts)
}
- return c.putObjectMultipart(bucketName, objectName, reader, size, metadata, progress)
+ return c.putObjectMultipart(ctx, bucketName, objectName, reader, size, opts)
}
-
- // If size cannot be found on a stream, it is not possible
- // to upload using streaming signature, fall back to multipart.
if size < 0 {
- return c.putObjectMultipart(bucketName, objectName, reader, size, metadata, progress)
+ return c.putObjectMultipartStreamNoLength(ctx, bucketName, objectName, reader, opts)
}
- // Set streaming signature.
- c.overrideSignerType = credentials.SignatureV4Streaming
-
if size < minPartSize {
- return c.putObjectNoChecksum(bucketName, objectName, reader, size, metadata, progress)
+ return c.putObjectNoChecksum(ctx, bucketName, objectName, reader, size, opts)
}
-
// For all sizes greater than 64MiB do multipart.
- return c.putObjectMultipartStream(bucketName, objectName, reader, size, metadata, progress)
+ return c.putObjectMultipartStream(ctx, bucketName, objectName, reader, size, opts)
+}
+
+func (c Client) putObjectMultipartStreamNoLength(ctx context.Context, bucketName, objectName string, reader io.Reader, opts PutObjectOptions) (n int64, err error) {
+ // Input validation.
+ if err = s3utils.CheckValidBucketName(bucketName); err != nil {
+ return 0, err
+ }
+ if err = s3utils.CheckValidObjectName(objectName); err != nil {
+ return 0, err
+ }
+
+ // Total data read and written to the server, returned to the
+ // caller at the end of the call.
+ var totalUploadedSize int64
+
+ // Complete multipart upload.
+ var complMultipartUpload completeMultipartUpload
+
+ // Calculate the optimal parts info for a given size.
+ totalPartsCount, partSize, _, err := optimalPartInfo(-1)
+ if err != nil {
+ return 0, err
+ }
+ // Initiate a new multipart upload.
+ uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
+ if err != nil {
+ return 0, err
+ }
+
+ defer func() {
+ if err != nil {
+ c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
+ }
+ }()
+
+ // Part number always starts with '1'.
+ partNumber := 1
+
+ // Initialize parts uploaded map.
+ partsInfo := make(map[int]ObjectPart)
+
+ // Create a buffer.
+ buf := make([]byte, partSize)
+ defer debug.FreeOSMemory()
+
+ for partNumber <= totalPartsCount {
+ length, rErr := io.ReadFull(reader, buf)
+ if rErr == io.EOF && partNumber > 1 {
+ break
+ }
+ if rErr != nil && rErr != io.ErrUnexpectedEOF {
+ return 0, rErr
+ }
+ // Update progress reader appropriately to the latest offset
+ // as we read from the source.
+ rd := newHook(bytes.NewReader(buf[:length]), opts.Progress)
+
+ // Proceed to upload the part.
+ var objPart ObjectPart
+ objPart, err = c.uploadPart(ctx, bucketName, objectName, uploadID, rd, partNumber,
+ "", "", int64(length), opts.UserMetadata)
+ if err != nil {
+ return totalUploadedSize, err
+ }
+
+ // Save successfully uploaded part metadata.
+ partsInfo[partNumber] = objPart
+
+ // Save successfully uploaded size.
+ totalUploadedSize += int64(length)
+
+ // Increment part number.
+ partNumber++
+
+ // For an unknown size, break away on read EOF; we do not
+ // have to upload all the way to totalPartsCount.
+ if rErr == io.EOF {
+ break
+ }
+ }
+
+ // Loop over total uploaded parts to save them in
+ // Parts array before completing the multipart request.
+ for i := 1; i < partNumber; i++ {
+ part, ok := partsInfo[i]
+ if !ok {
+ return 0, ErrInvalidArgument(fmt.Sprintf("Missing part number %d", i))
+ }
+ complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
+ ETag: part.ETag,
+ PartNumber: part.PartNumber,
+ })
+ }
+
+ // Sort all completed parts.
+ sort.Sort(completedParts(complMultipartUpload.Parts))
+ if _, err = c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload); err != nil {
+ return totalUploadedSize, err
+ }
+
+ // Return final size.
+ return totalUploadedSize, nil
}
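
The read loop above relies on io.ReadFull semantics: a short final read surfaces as io.ErrUnexpectedEOF and is still uploaded, while a bare io.EOF on a later iteration ends the loop. A standalone sketch of that pattern (plain Go, not minio-go API; buffer size and input are arbitrary):

    package main

    import (
        "fmt"
        "io"
        "strings"
    )

    // readParts mirrors the loop above: fill a fixed-size buffer with
    // io.ReadFull, keep a short read (io.ErrUnexpectedEOF) as the last
    // part, and stop once a subsequent read returns a bare io.EOF.
    func readParts(r io.Reader, partSize int) (parts [][]byte, err error) {
        buf := make([]byte, partSize)
        for partNumber := 1; ; partNumber++ {
            n, rErr := io.ReadFull(r, buf)
            if rErr == io.EOF && partNumber > 1 {
                break // previous part was the last one
            }
            if rErr != nil && rErr != io.ErrUnexpectedEOF {
                return nil, rErr
            }
            parts = append(parts, append([]byte(nil), buf[:n]...))
        }
        return parts, nil
    }

    func main() {
        parts, err := readParts(strings.NewReader("hello world"), 4)
        if err != nil {
            fmt.Println("error:", err)
            return
        }
        for i, p := range parts {
            fmt.Printf("part %d: %q\n", i+1, string(p))
        }
    }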