diff options
Diffstat (limited to 'api-select.go')
-rw-r--r-- | api-select.go | 114 |
1 files changed, 66 insertions, 48 deletions
diff --git a/api-select.go b/api-select.go index a9b6f17..7ecd953 100644 --- a/api-select.go +++ b/api-select.go @@ -1,6 +1,6 @@ /* - * Minio Go Library for Amazon S3 Compatible Cloud Storage - * (C) 2018 Minio, Inc. + * MinIO Go Library for Amazon S3 Compatible Cloud Storage + * (C) 2018 MinIO, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,8 +31,8 @@ import ( "net/url" "strings" - "github.com/minio/minio-go/pkg/encrypt" - "github.com/minio/minio-go/pkg/s3utils" + "github.com/minio/minio-go/v6/pkg/encrypt" + "github.com/minio/minio-go/v6/pkg/s3utils" ) // CSVFileHeaderInfo - is the parameter for whether to utilize headers. @@ -90,19 +90,19 @@ type ParquetInputOptions struct{} type CSVInputOptions struct { FileHeaderInfo CSVFileHeaderInfo RecordDelimiter string - FieldDelimiter string - QuoteCharacter string - QuoteEscapeCharacter string - Comments string + FieldDelimiter string `xml:",omitempty"` + QuoteCharacter string `xml:",omitempty"` + QuoteEscapeCharacter string `xml:",omitempty"` + Comments string `xml:",omitempty"` } // CSVOutputOptions csv output specific options type CSVOutputOptions struct { - QuoteFields CSVQuoteFields + QuoteFields CSVQuoteFields `xml:",omitempty"` RecordDelimiter string - FieldDelimiter string - QuoteCharacter string - QuoteEscapeCharacter string + FieldDelimiter string `xml:",omitempty"` + QuoteCharacter string `xml:",omitempty"` + QuoteEscapeCharacter string `xml:",omitempty"` } // JSONInputOptions json input specific options @@ -191,13 +191,20 @@ type StatsMessage struct { BytesReturned int64 } +// messageType represents the type of message. +type messageType string + +const ( + errorMsg messageType = "error" + commonMsg = "event" +) + // eventType represents the type of event. type eventType string // list of event-types returned by Select API. const ( endEvent eventType = "End" - errorEvent = "Error" recordsEvent = "Records" progressEvent = "Progress" statsEvent = "Stats" @@ -244,6 +251,12 @@ func (c Client) SelectObjectContent(ctx context.Context, bucketName, objectName return nil, err } + return NewSelectResults(resp, bucketName) +} + +// NewSelectResults creates a Select Result parser that parses the response +// and returns a Reader that will return parsed and assembled select output. +func NewSelectResults(resp *http.Response, bucketName string) (*SelectResults, error) { if resp.StatusCode != http.StatusOK { return nil, httpRespToErrorResponse(resp, bucketName, "") } @@ -314,53 +327,58 @@ func (s *SelectResults) start(pipeWriter *io.PipeWriter) { // bytes can be read or parsed. payloadLen := prelude.PayloadLen() - // Get content-type of the payload. - c := contentType(headers.Get("content-type")) + m := messageType(headers.Get("message-type")) - // Get event type of the payload. - e := eventType(headers.Get("event-type")) - - // Handle all supported events. - switch e { - case endEvent: - pipeWriter.Close() - closeResponse(s.resp) - return - case errorEvent: - pipeWriter.CloseWithError(errors.New("Error Type of " + headers.Get("error-type") + " " + headers.Get("error-message"))) + switch m { + case errorMsg: + pipeWriter.CloseWithError(errors.New(headers.Get("error-code") + ":\"" + headers.Get("error-message") + "\"")) closeResponse(s.resp) return - case recordsEvent: - if _, err = io.Copy(pipeWriter, io.LimitReader(crcReader, payloadLen)); err != nil { - pipeWriter.CloseWithError(err) + case commonMsg: + // Get content-type of the payload. + c := contentType(headers.Get("content-type")) + + // Get event type of the payload. + e := eventType(headers.Get("event-type")) + + // Handle all supported events. + switch e { + case endEvent: + pipeWriter.Close() closeResponse(s.resp) return - } - case progressEvent: - switch c { - case xmlContent: - if err = xmlDecoder(io.LimitReader(crcReader, payloadLen), s.progress); err != nil { + case recordsEvent: + if _, err = io.Copy(pipeWriter, io.LimitReader(crcReader, payloadLen)); err != nil { pipeWriter.CloseWithError(err) closeResponse(s.resp) return } - default: - pipeWriter.CloseWithError(fmt.Errorf("Unexpected content-type %s sent for event-type %s", c, progressEvent)) - closeResponse(s.resp) - return - } - case statsEvent: - switch c { - case xmlContent: - if err = xmlDecoder(io.LimitReader(crcReader, payloadLen), s.stats); err != nil { - pipeWriter.CloseWithError(err) + case progressEvent: + switch c { + case xmlContent: + if err = xmlDecoder(io.LimitReader(crcReader, payloadLen), s.progress); err != nil { + pipeWriter.CloseWithError(err) + closeResponse(s.resp) + return + } + default: + pipeWriter.CloseWithError(fmt.Errorf("Unexpected content-type %s sent for event-type %s", c, progressEvent)) + closeResponse(s.resp) + return + } + case statsEvent: + switch c { + case xmlContent: + if err = xmlDecoder(io.LimitReader(crcReader, payloadLen), s.stats); err != nil { + pipeWriter.CloseWithError(err) + closeResponse(s.resp) + return + } + default: + pipeWriter.CloseWithError(fmt.Errorf("Unexpected content-type %s sent for event-type %s", c, statsEvent)) closeResponse(s.resp) return } - default: - pipeWriter.CloseWithError(fmt.Errorf("Unexpected content-type %s sent for event-type %s", c, statsEvent)) - closeResponse(s.resp) - return } } |