diff options
Diffstat (limited to 'api-put-object.go')
-rw-r--r-- | api-put-object.go | 328 |
1 files changed, 182 insertions, 146 deletions
diff --git a/api-put-object.go b/api-put-object.go index 2ea4987..1fda1bc 100644 --- a/api-put-object.go +++ b/api-put-object.go @@ -1,5 +1,6 @@ /* - * Minio Go Library for Amazon S3 Compatible Cloud Storage (C) 2015, 2016 Minio, Inc. + * Minio Go Library for Amazon S3 Compatible Cloud Storage + * Copyright 2015-2017 Minio, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,117 +18,84 @@ package minio import ( + "bytes" + "context" + "fmt" "io" - "os" - "reflect" - "runtime" - "strings" + "net/http" + "runtime/debug" + "sort" - "github.com/minio/minio-go/pkg/credentials" + "github.com/minio/minio-go/pkg/encrypt" "github.com/minio/minio-go/pkg/s3utils" ) -// toInt - converts go value to its integer representation based -// on the value kind if it is an integer. -func toInt(value reflect.Value) (size int64) { - size = -1 - if value.IsValid() { - switch value.Kind() { - case reflect.Int: - fallthrough - case reflect.Int8: - fallthrough - case reflect.Int16: - fallthrough - case reflect.Int32: - fallthrough - case reflect.Int64: - size = value.Int() - } +// PutObjectOptions represents options specified by user for PutObject call +type PutObjectOptions struct { + UserMetadata map[string]string + Progress io.Reader + ContentType string + ContentEncoding string + ContentDisposition string + CacheControl string + EncryptMaterials encrypt.Materials + NumThreads uint +} + +// getNumThreads - gets the number of threads to be used in the multipart +// put object operation +func (opts PutObjectOptions) getNumThreads() (numThreads int) { + if opts.NumThreads > 0 { + numThreads = int(opts.NumThreads) + } else { + numThreads = totalWorkers } - return size + return } -// getReaderSize - Determine the size of Reader if available. -func getReaderSize(reader io.Reader) (size int64, err error) { - size = -1 - if reader == nil { - return -1, nil - } - // Verify if there is a method by name 'Size'. - sizeFn := reflect.ValueOf(reader).MethodByName("Size") - // Verify if there is a method by name 'Len'. - lenFn := reflect.ValueOf(reader).MethodByName("Len") - if sizeFn.IsValid() { - if sizeFn.Kind() == reflect.Func { - // Call the 'Size' function and save its return value. - result := sizeFn.Call([]reflect.Value{}) - if len(result) == 1 { - size = toInt(result[0]) - } - } - } else if lenFn.IsValid() { - if lenFn.Kind() == reflect.Func { - // Call the 'Len' function and save its return value. - result := lenFn.Call([]reflect.Value{}) - if len(result) == 1 { - size = toInt(result[0]) - } - } +// Header - constructs the headers from metadata entered by user in +// PutObjectOptions struct +func (opts PutObjectOptions) Header() (header http.Header) { + header = make(http.Header) + + if opts.ContentType != "" { + header["Content-Type"] = []string{opts.ContentType} } else { - // Fallback to Stat() method, two possible Stat() structs exist. - switch v := reader.(type) { - case *os.File: - var st os.FileInfo - st, err = v.Stat() - if err != nil { - // Handle this case specially for "windows", - // certain files for example 'Stdin', 'Stdout' and - // 'Stderr' it is not allowed to fetch file information. - if runtime.GOOS == "windows" { - if strings.Contains(err.Error(), "GetFileInformationByHandle") { - return -1, nil - } - } - return - } - // Ignore if input is a directory, throw an error. - if st.Mode().IsDir() { - return -1, ErrInvalidArgument("Input file cannot be a directory.") - } - // Ignore 'Stdin', 'Stdout' and 'Stderr', since they - // represent *os.File type but internally do not - // implement Seekable calls. Ignore them and treat - // them like a stream with unknown length. - switch st.Name() { - case "stdin", "stdout", "stderr": - return - // Ignore read/write stream of os.Pipe() which have unknown length too. - case "|0", "|1": - return - } - var pos int64 - pos, err = v.Seek(0, 1) // SeekCurrent. - if err != nil { - return -1, err - } - size = st.Size() - pos - case *Object: - var st ObjectInfo - st, err = v.Stat() - if err != nil { - return - } - var pos int64 - pos, err = v.Seek(0, 1) // SeekCurrent. - if err != nil { - return -1, err - } - size = st.Size - pos + header["Content-Type"] = []string{"application/octet-stream"} + } + if opts.ContentEncoding != "" { + header["Content-Encoding"] = []string{opts.ContentEncoding} + } + if opts.ContentDisposition != "" { + header["Content-Disposition"] = []string{opts.ContentDisposition} + } + if opts.CacheControl != "" { + header["Cache-Control"] = []string{opts.CacheControl} + } + if opts.EncryptMaterials != nil { + header[amzHeaderIV] = []string{opts.EncryptMaterials.GetIV()} + header[amzHeaderKey] = []string{opts.EncryptMaterials.GetKey()} + header[amzHeaderMatDesc] = []string{opts.EncryptMaterials.GetDesc()} + } + for k, v := range opts.UserMetadata { + if !isAmzHeader(k) && !isStandardHeader(k) && !isSSEHeader(k) { + header["X-Amz-Meta-"+k] = []string{v} + } else { + header[k] = []string{v} + } + } + return +} + +// validate() checks if the UserMetadata map has standard headers or client side +// encryption headers and raises an error if so. +func (opts PutObjectOptions) validate() (err error) { + for k := range opts.UserMetadata { + if isStandardHeader(k) || isCSEHeader(k) { + return ErrInvalidArgument(k + " unsupported request parameter for user defined metadata") } } - // Returns the size here. - return size, err + return nil } // completedParts is a collection of parts sortable by their part numbers. @@ -149,39 +117,12 @@ func (a completedParts) Less(i, j int) bool { return a[i].PartNumber < a[j].Part // - For size input as -1 PutObject does a multipart Put operation // until input stream reaches EOF. Maximum object size that can // be uploaded through this operation will be 5TiB. -func (c Client) PutObject(bucketName, objectName string, reader io.Reader, contentType string) (n int64, err error) { - return c.PutObjectWithMetadata(bucketName, objectName, reader, map[string][]string{ - "Content-Type": []string{contentType}, - }, nil) +func (c Client) PutObject(bucketName, objectName string, reader io.Reader, objectSize int64, + opts PutObjectOptions) (n int64, err error) { + return c.PutObjectWithContext(context.Background(), bucketName, objectName, reader, objectSize, opts) } -// PutObjectWithSize - is a helper PutObject similar in behavior to PutObject() -// but takes the size argument explicitly, this function avoids doing reflection -// internally to figure out the size of input stream. Also if the input size is -// lesser than 0 this function returns an error. -func (c Client) PutObjectWithSize(bucketName, objectName string, reader io.Reader, readerSize int64, metadata map[string][]string, progress io.Reader) (n int64, err error) { - return c.putObjectCommon(bucketName, objectName, reader, readerSize, metadata, progress) -} - -// PutObjectWithMetadata using AWS streaming signature V4 -func (c Client) PutObjectWithMetadata(bucketName, objectName string, reader io.Reader, metadata map[string][]string, progress io.Reader) (n int64, err error) { - return c.PutObjectWithProgress(bucketName, objectName, reader, metadata, progress) -} - -// PutObjectWithProgress using AWS streaming signature V4 -func (c Client) PutObjectWithProgress(bucketName, objectName string, reader io.Reader, metadata map[string][]string, progress io.Reader) (n int64, err error) { - // Size of the object. - var size int64 - - // Get reader size. - size, err = getReaderSize(reader) - if err != nil { - return 0, err - } - return c.putObjectCommon(bucketName, objectName, reader, size, metadata, progress) -} - -func (c Client) putObjectCommon(bucketName, objectName string, reader io.Reader, size int64, metadata map[string][]string, progress io.Reader) (n int64, err error) { +func (c Client) putObjectCommon(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions) (n int64, err error) { // Check for largest object size allowed. if size > int64(maxMultipartPutObjectSize) { return 0, ErrEntityTooLarge(size, maxMultipartPutObjectSize, bucketName, objectName) @@ -190,29 +131,124 @@ func (c Client) putObjectCommon(bucketName, objectName string, reader io.Reader, // NOTE: Streaming signature is not supported by GCS. if s3utils.IsGoogleEndpoint(c.endpointURL) { // Do not compute MD5 for Google Cloud Storage. - return c.putObjectNoChecksum(bucketName, objectName, reader, size, metadata, progress) + return c.putObjectNoChecksum(ctx, bucketName, objectName, reader, size, opts) } if c.overrideSignerType.IsV2() { - if size > 0 && size < minPartSize { - return c.putObjectNoChecksum(bucketName, objectName, reader, size, metadata, progress) + if size >= 0 && size < minPartSize { + return c.putObjectNoChecksum(ctx, bucketName, objectName, reader, size, opts) } - return c.putObjectMultipart(bucketName, objectName, reader, size, metadata, progress) + return c.putObjectMultipart(ctx, bucketName, objectName, reader, size, opts) } - - // If size cannot be found on a stream, it is not possible - // to upload using streaming signature, fall back to multipart. if size < 0 { - return c.putObjectMultipart(bucketName, objectName, reader, size, metadata, progress) + return c.putObjectMultipartStreamNoLength(ctx, bucketName, objectName, reader, opts) } - // Set streaming signature. - c.overrideSignerType = credentials.SignatureV4Streaming - if size < minPartSize { - return c.putObjectNoChecksum(bucketName, objectName, reader, size, metadata, progress) + return c.putObjectNoChecksum(ctx, bucketName, objectName, reader, size, opts) } - // For all sizes greater than 64MiB do multipart. - return c.putObjectMultipartStream(bucketName, objectName, reader, size, metadata, progress) + return c.putObjectMultipartStream(ctx, bucketName, objectName, reader, size, opts) +} + +func (c Client) putObjectMultipartStreamNoLength(ctx context.Context, bucketName, objectName string, reader io.Reader, opts PutObjectOptions) (n int64, err error) { + // Input validation. + if err = s3utils.CheckValidBucketName(bucketName); err != nil { + return 0, err + } + if err = s3utils.CheckValidObjectName(objectName); err != nil { + return 0, err + } + + // Total data read and written to server. should be equal to + // 'size' at the end of the call. + var totalUploadedSize int64 + + // Complete multipart upload. + var complMultipartUpload completeMultipartUpload + + // Calculate the optimal parts info for a given size. + totalPartsCount, partSize, _, err := optimalPartInfo(-1) + if err != nil { + return 0, err + } + // Initiate a new multipart upload. + uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts) + if err != nil { + return 0, err + } + + defer func() { + if err != nil { + c.abortMultipartUpload(ctx, bucketName, objectName, uploadID) + } + }() + + // Part number always starts with '1'. + partNumber := 1 + + // Initialize parts uploaded map. + partsInfo := make(map[int]ObjectPart) + + // Create a buffer. + buf := make([]byte, partSize) + defer debug.FreeOSMemory() + + for partNumber <= totalPartsCount { + length, rErr := io.ReadFull(reader, buf) + if rErr == io.EOF && partNumber > 1 { + break + } + if rErr != nil && rErr != io.ErrUnexpectedEOF { + return 0, rErr + } + // Update progress reader appropriately to the latest offset + // as we read from the source. + rd := newHook(bytes.NewReader(buf[:length]), opts.Progress) + + // Proceed to upload the part. + var objPart ObjectPart + objPart, err = c.uploadPart(ctx, bucketName, objectName, uploadID, rd, partNumber, + "", "", int64(length), opts.UserMetadata) + if err != nil { + return totalUploadedSize, err + } + + // Save successfully uploaded part metadata. + partsInfo[partNumber] = objPart + + // Save successfully uploaded size. + totalUploadedSize += int64(length) + + // Increment part number. + partNumber++ + + // For unknown size, Read EOF we break away. + // We do not have to upload till totalPartsCount. + if rErr == io.EOF { + break + } + } + + // Loop over total uploaded parts to save them in + // Parts array before completing the multipart request. + for i := 1; i < partNumber; i++ { + part, ok := partsInfo[i] + if !ok { + return 0, ErrInvalidArgument(fmt.Sprintf("Missing part number %d", i)) + } + complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{ + ETag: part.ETag, + PartNumber: part.PartNumber, + }) + } + + // Sort all completed parts. + sort.Sort(completedParts(complMultipartUpload.Parts)) + if _, err = c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload); err != nil { + return totalUploadedSize, err + } + + // Return final size. + return totalUploadedSize, nil } |