summaryrefslogtreecommitdiff
path: root/api-put-object-streaming.go
diff options
context:
space:
mode:
Diffstat (limited to 'api-put-object-streaming.go')
-rw-r--r--api-put-object-streaming.go117
1 files changed, 49 insertions, 68 deletions
diff --git a/api-put-object-streaming.go b/api-put-object-streaming.go
index 0d4639e..579cb54 100644
--- a/api-put-object-streaming.go
+++ b/api-put-object-streaming.go
@@ -1,5 +1,6 @@
/*
- * Minio Go Library for Amazon S3 Compatible Cloud Storage (C) 2017 Minio, Inc.
+ * Minio Go Library for Amazon S3 Compatible Cloud Storage
+ * Copyright 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,6 +18,7 @@
package minio
import (
+ "context"
"fmt"
"io"
"net/http"
@@ -26,33 +28,23 @@ import (
"github.com/minio/minio-go/pkg/s3utils"
)
-// PutObjectStreaming using AWS streaming signature V4
-func (c Client) PutObjectStreaming(bucketName, objectName string, reader io.Reader) (n int64, err error) {
- return c.PutObjectWithProgress(bucketName, objectName, reader, nil, nil)
-}
-
// putObjectMultipartStream - upload a large object using
// multipart upload and streaming signature for signing payload.
// Comprehensive put object operation involving multipart uploads.
//
// Following code handles these types of readers.
//
-// - *os.File
// - *minio.Object
// - Any reader which has a method 'ReadAt()'
//
-func (c Client) putObjectMultipartStream(bucketName, objectName string,
- reader io.Reader, size int64, metadata map[string][]string, progress io.Reader) (n int64, err error) {
-
- // Verify if reader is *minio.Object, *os.File or io.ReaderAt.
- // NOTE: Verification of object is kept for a specific purpose
- // while it is going to be duck typed similar to io.ReaderAt.
- // It is to indicate that *minio.Object implements io.ReaderAt.
- // and such a functionality is used in the subsequent code path.
- if isFile(reader) || !isObject(reader) && isReadAt(reader) {
- n, err = c.putObjectMultipartStreamFromReadAt(bucketName, objectName, reader.(io.ReaderAt), size, metadata, progress)
+func (c Client) putObjectMultipartStream(ctx context.Context, bucketName, objectName string,
+ reader io.Reader, size int64, opts PutObjectOptions) (n int64, err error) {
+
+ if !isObject(reader) && isReadAt(reader) {
+ // Verify if the reader implements ReadAt and it is not a *minio.Object then we will use parallel uploader.
+ n, err = c.putObjectMultipartStreamFromReadAt(ctx, bucketName, objectName, reader.(io.ReaderAt), size, opts)
} else {
- n, err = c.putObjectMultipartStreamNoChecksum(bucketName, objectName, reader, size, metadata, progress)
+ n, err = c.putObjectMultipartStreamNoChecksum(ctx, bucketName, objectName, reader, size, opts)
}
if err != nil {
errResp := ToErrorResponse(err)
@@ -64,7 +56,7 @@ func (c Client) putObjectMultipartStream(bucketName, objectName string,
return 0, ErrEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName)
}
// Fall back to uploading as single PutObject operation.
- return c.putObjectNoChecksum(bucketName, objectName, reader, size, metadata, progress)
+ return c.putObjectNoChecksum(ctx, bucketName, objectName, reader, size, opts)
}
}
return n, err
@@ -94,8 +86,8 @@ type uploadPartReq struct {
// temporary files for staging all the data, these temporary files are
// cleaned automatically when the caller i.e http client closes the
// stream after uploading all the contents successfully.
-func (c Client) putObjectMultipartStreamFromReadAt(bucketName, objectName string,
- reader io.ReaderAt, size int64, metadata map[string][]string, progress io.Reader) (n int64, err error) {
+func (c Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketName, objectName string,
+ reader io.ReaderAt, size int64, opts PutObjectOptions) (n int64, err error) {
// Input validation.
if err = s3utils.CheckValidBucketName(bucketName); err != nil {
return 0, err
@@ -111,7 +103,7 @@ func (c Client) putObjectMultipartStreamFromReadAt(bucketName, objectName string
}
// Initiate a new multipart upload.
- uploadID, err := c.newUploadID(bucketName, objectName, metadata)
+ uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
if err != nil {
return 0, err
}
@@ -122,7 +114,7 @@ func (c Client) putObjectMultipartStreamFromReadAt(bucketName, objectName string
// to relinquish storage space.
defer func() {
if err != nil {
- c.abortMultipartUpload(bucketName, objectName, uploadID)
+ c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
}
}()
@@ -150,10 +142,9 @@ func (c Client) putObjectMultipartStreamFromReadAt(bucketName, objectName string
uploadPartsCh <- uploadPartReq{PartNum: p, Part: nil}
}
close(uploadPartsCh)
-
// Receive each part number from the channel allowing three parallel uploads.
- for w := 1; w <= totalWorkers; w++ {
- go func() {
+ for w := 1; w <= opts.getNumThreads(); w++ {
+ go func(partSize int64) {
// Each worker will draw from the part channel and upload in parallel.
for uploadReq := range uploadPartsCh {
@@ -170,13 +161,13 @@ func (c Client) putObjectMultipartStreamFromReadAt(bucketName, objectName string
}
// Get a section reader on a particular offset.
- sectionReader := newHook(io.NewSectionReader(reader, readOffset, partSize), progress)
+ sectionReader := newHook(io.NewSectionReader(reader, readOffset, partSize), opts.Progress)
// Proceed to upload the part.
var objPart ObjectPart
- objPart, err = c.uploadPart(bucketName, objectName, uploadID,
+ objPart, err = c.uploadPart(ctx, bucketName, objectName, uploadID,
sectionReader, uploadReq.PartNum,
- nil, nil, partSize, metadata)
+ "", "", partSize, opts.UserMetadata)
if err != nil {
uploadedPartsCh <- uploadedPartRes{
Size: 0,
@@ -197,7 +188,7 @@ func (c Client) putObjectMultipartStreamFromReadAt(bucketName, objectName string
Error: nil,
}
}
- }()
+ }(partSize)
}
// Gather the responses as they occur and update any
@@ -229,7 +220,7 @@ func (c Client) putObjectMultipartStreamFromReadAt(bucketName, objectName string
// Sort all completed parts.
sort.Sort(completedParts(complMultipartUpload.Parts))
- _, err = c.completeMultipartUpload(bucketName, objectName, uploadID, complMultipartUpload)
+ _, err = c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload)
if err != nil {
return totalUploadedSize, err
}
@@ -238,8 +229,8 @@ func (c Client) putObjectMultipartStreamFromReadAt(bucketName, objectName string
return totalUploadedSize, nil
}
-func (c Client) putObjectMultipartStreamNoChecksum(bucketName, objectName string,
- reader io.Reader, size int64, metadata map[string][]string, progress io.Reader) (n int64, err error) {
+func (c Client) putObjectMultipartStreamNoChecksum(ctx context.Context, bucketName, objectName string,
+ reader io.Reader, size int64, opts PutObjectOptions) (n int64, err error) {
// Input validation.
if err = s3utils.CheckValidBucketName(bucketName); err != nil {
return 0, err
@@ -253,9 +244,8 @@ func (c Client) putObjectMultipartStreamNoChecksum(bucketName, objectName string
if err != nil {
return 0, err
}
-
// Initiates a new multipart request
- uploadID, err := c.newUploadID(bucketName, objectName, metadata)
+ uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
if err != nil {
return 0, err
}
@@ -266,7 +256,7 @@ func (c Client) putObjectMultipartStreamNoChecksum(bucketName, objectName string
// storage space.
defer func() {
if err != nil {
- c.abortMultipartUpload(bucketName, objectName, uploadID)
+ c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
}
}()
@@ -281,17 +271,16 @@ func (c Client) putObjectMultipartStreamNoChecksum(bucketName, objectName string
for partNumber = 1; partNumber <= totalPartsCount; partNumber++ {
// Update progress reader appropriately to the latest offset
// as we read from the source.
- hookReader := newHook(reader, progress)
+ hookReader := newHook(reader, opts.Progress)
// Proceed to upload the part.
if partNumber == totalPartsCount {
partSize = lastPartSize
}
-
var objPart ObjectPart
- objPart, err = c.uploadPart(bucketName, objectName, uploadID,
+ objPart, err = c.uploadPart(ctx, bucketName, objectName, uploadID,
io.LimitReader(hookReader, partSize),
- partNumber, nil, nil, partSize, metadata)
+ partNumber, "", "", partSize, opts.UserMetadata)
if err != nil {
return totalUploadedSize, err
}
@@ -328,7 +317,7 @@ func (c Client) putObjectMultipartStreamNoChecksum(bucketName, objectName string
// Sort all completed parts.
sort.Sort(completedParts(complMultipartUpload.Parts))
- _, err = c.completeMultipartUpload(bucketName, objectName, uploadID, complMultipartUpload)
+ _, err = c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload)
if err != nil {
return totalUploadedSize, err
}
@@ -339,7 +328,7 @@ func (c Client) putObjectMultipartStreamNoChecksum(bucketName, objectName string
// putObjectNoChecksum special function used Google Cloud Storage. This special function
// is used for Google Cloud Storage since Google's multipart API is not S3 compatible.
-func (c Client) putObjectNoChecksum(bucketName, objectName string, reader io.Reader, size int64, metaData map[string][]string, progress io.Reader) (n int64, err error) {
+func (c Client) putObjectNoChecksum(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions) (n int64, err error) {
// Input validation.
if err := s3utils.CheckValidBucketName(bucketName); err != nil {
return 0, err
@@ -355,17 +344,22 @@ func (c Client) putObjectNoChecksum(bucketName, objectName string, reader io.Rea
}
if size > 0 {
if isReadAt(reader) && !isObject(reader) {
- reader = io.NewSectionReader(reader.(io.ReaderAt), 0, size)
+ seeker, _ := reader.(io.Seeker)
+ offset, err := seeker.Seek(0, io.SeekCurrent)
+ if err != nil {
+ return 0, ErrInvalidArgument(err.Error())
+ }
+ reader = io.NewSectionReader(reader.(io.ReaderAt), offset, size)
}
}
// Update progress reader appropriately to the latest offset as we
// read from the source.
- readSeeker := newHook(reader, progress)
+ readSeeker := newHook(reader, opts.Progress)
// This function does not calculate sha256 and md5sum for payload.
// Execute put object.
- st, err := c.putObjectDo(bucketName, objectName, readSeeker, nil, nil, size, metaData)
+ st, err := c.putObjectDo(ctx, bucketName, objectName, readSeeker, "", "", size, opts)
if err != nil {
return 0, err
}
@@ -377,7 +371,7 @@ func (c Client) putObjectNoChecksum(bucketName, objectName string, reader io.Rea
// putObjectDo - executes the put object http operation.
// NOTE: You must have WRITE permissions on a bucket to add an object to it.
-func (c Client) putObjectDo(bucketName, objectName string, reader io.Reader, md5Sum []byte, sha256Sum []byte, size int64, metaData map[string][]string) (ObjectInfo, error) {
+func (c Client) putObjectDo(ctx context.Context, bucketName, objectName string, reader io.Reader, md5Base64, sha256Hex string, size int64, opts PutObjectOptions) (ObjectInfo, error) {
// Input validation.
if err := s3utils.CheckValidBucketName(bucketName); err != nil {
return ObjectInfo{}, err
@@ -385,35 +379,22 @@ func (c Client) putObjectDo(bucketName, objectName string, reader io.Reader, md5
if err := s3utils.CheckValidObjectName(objectName); err != nil {
return ObjectInfo{}, err
}
-
// Set headers.
- customHeader := make(http.Header)
-
- // Set metadata to headers
- for k, v := range metaData {
- if len(v) > 0 {
- customHeader.Set(k, v[0])
- }
- }
-
- // If Content-Type is not provided, set the default application/octet-stream one
- if v, ok := metaData["Content-Type"]; !ok || len(v) == 0 {
- customHeader.Set("Content-Type", "application/octet-stream")
- }
+ customHeader := opts.Header()
// Populate request metadata.
reqMetadata := requestMetadata{
- bucketName: bucketName,
- objectName: objectName,
- customHeader: customHeader,
- contentBody: reader,
- contentLength: size,
- contentMD5Bytes: md5Sum,
- contentSHA256Bytes: sha256Sum,
+ bucketName: bucketName,
+ objectName: objectName,
+ customHeader: customHeader,
+ contentBody: reader,
+ contentLength: size,
+ contentMD5Base64: md5Base64,
+ contentSHA256Hex: sha256Hex,
}
// Execute PUT an objectName.
- resp, err := c.executeMethod("PUT", reqMetadata)
+ resp, err := c.executeMethod(ctx, "PUT", reqMetadata)
defer closeResponse(resp)
if err != nil {
return ObjectInfo{}, err