update all vendored dependencies

This commit is contained in:
Will Norris 2018-02-02 10:23:34 +00:00
parent 0c20cbe5b5
commit 1933f5bf1c
284 changed files with 37534 additions and 11024 deletions

View file

@ -15,7 +15,6 @@
package storage
import (
"fmt"
"net/http"
"reflect"
@ -106,21 +105,17 @@ func (a *ACLHandle) bucketDefaultList(ctx context.Context) ([]ACLRule, error) {
return err
})
if err != nil {
return nil, fmt.Errorf("storage: error listing default object ACL for bucket %q: %v", a.bucket, err)
return nil, err
}
return toACLRules(acls.Items), nil
}
func (a *ACLHandle) bucketDefaultDelete(ctx context.Context, entity ACLEntity) error {
err := runWithRetry(ctx, func() error {
return runWithRetry(ctx, func() error {
req := a.c.raw.DefaultObjectAccessControls.Delete(a.bucket, string(entity))
a.configureCall(req, ctx)
return req.Do()
})
if err != nil {
return fmt.Errorf("storage: error deleting default ACL entry for bucket %q, entity %q: %v", a.bucket, entity, err)
}
return nil
}
func (a *ACLHandle) bucketList(ctx context.Context) ([]ACLRule, error) {
@ -133,7 +128,7 @@ func (a *ACLHandle) bucketList(ctx context.Context) ([]ACLRule, error) {
return err
})
if err != nil {
return nil, fmt.Errorf("storage: error listing bucket ACL for bucket %q: %v", a.bucket, err)
return nil, err
}
r := make([]ACLRule, len(acls.Items))
for i, v := range acls.Items {
@ -156,7 +151,7 @@ func (a *ACLHandle) bucketSet(ctx context.Context, entity ACLEntity, role ACLRol
return err
})
if err != nil {
return fmt.Errorf("storage: error updating bucket ACL entry for bucket %q, entity %q: %v", a.bucket, entity, err)
return err
}
return nil
}
@ -168,7 +163,7 @@ func (a *ACLHandle) bucketDelete(ctx context.Context, entity ACLEntity) error {
return req.Do()
})
if err != nil {
return fmt.Errorf("storage: error deleting bucket ACL entry for bucket %q, entity %q: %v", a.bucket, entity, err)
return err
}
return nil
}
@ -183,7 +178,7 @@ func (a *ACLHandle) objectList(ctx context.Context) ([]ACLRule, error) {
return err
})
if err != nil {
return nil, fmt.Errorf("storage: error listing object ACL for bucket %q, file %q: %v", a.bucket, a.object, err)
return nil, err
}
return toACLRules(acls.Items), nil
}
@ -206,30 +201,18 @@ func (a *ACLHandle) objectSet(ctx context.Context, entity ACLEntity, role ACLRol
req = a.c.raw.ObjectAccessControls.Update(a.bucket, a.object, string(entity), acl)
}
a.configureCall(req, ctx)
err := runWithRetry(ctx, func() error {
return runWithRetry(ctx, func() error {
_, err := req.Do()
return err
})
if err != nil {
if isBucketDefault {
return fmt.Errorf("storage: error updating default ACL entry for bucket %q, entity %q: %v", a.bucket, entity, err)
} else {
return fmt.Errorf("storage: error updating object ACL entry for bucket %q, object %q, entity %q: %v", a.bucket, a.object, entity, err)
}
}
return nil
}
func (a *ACLHandle) objectDelete(ctx context.Context, entity ACLEntity) error {
err := runWithRetry(ctx, func() error {
return runWithRetry(ctx, func() error {
req := a.c.raw.ObjectAccessControls.Delete(a.bucket, a.object, string(entity))
a.configureCall(req, ctx)
return req.Do()
})
if err != nil {
return fmt.Errorf("storage: error deleting object ACL entry for bucket %q, file %q, entity %q: %v", a.bucket, a.object, entity, err)
}
return nil
}
func (a *ACLHandle) configureCall(call interface {

View file

@ -35,7 +35,7 @@ type BucketHandle struct {
acl ACLHandle
defaultObjectACL ACLHandle
conds *BucketConditions
userProject string // project for requester-pays buckets
userProject string // project for Requester Pays buckets
}
// Bucket returns a BucketHandle, which provides operations on the named bucket.
@ -197,8 +197,10 @@ func (b *BucketHandle) newPatchCall(uattrs *BucketAttrsToUpdate) (*raw.BucketsPa
}
// BucketAttrs represents the metadata for a Google Cloud Storage bucket.
// Read-only fields are ignored by BucketHandle.Create.
type BucketAttrs struct {
// Name is the name of the bucket.
// This field is read-only.
Name string
// ACL is the list of access control rules on the bucket.
@ -212,6 +214,7 @@ type BucketAttrs struct {
Location string
// MetaGeneration is the metadata generation of the bucket.
// This field is read-only.
MetaGeneration int64
// StorageClass is the default storage class of the bucket. This defines
@ -224,16 +227,19 @@ type BucketAttrs struct {
StorageClass string
// Created is the creation time of the bucket.
// This field is read-only.
Created time.Time
// VersioningEnabled reports whether this bucket has versioning enabled.
// This field is read-only.
VersioningEnabled bool
// Labels are the bucket's labels.
Labels map[string]string
// RequesterPays reports whether the bucket is a Requester Pays bucket.
// Clients performing operations on Requester Pays buckets must provide
// a user project (see BucketHandle.UserProject), which will be billed
// for the operations.
RequesterPays bool
// Lifecycle is the lifecycle configuration for objects in the bucket.
Lifecycle Lifecycle
@ -503,8 +509,10 @@ func (c *BucketConditions) validate(method string) error {
}
// UserProject returns a new BucketHandle that passes the project ID as the user
// project for all subsequent calls. A user project is required for all operations
// on requester-pays buckets.
// project for all subsequent calls. Calls with a user project will be billed to that
// project rather than to the bucket's owning project.
//
// A user project is required for all operations on Requester Pays buckets.
func (b *BucketHandle) UserProject(projectID string) *BucketHandle {
b2 := *b
b2.userProject = projectID
@ -601,6 +609,7 @@ func toLifecycle(rl *raw.BucketLifecycle) Lifecycle {
if rr.Condition.CreatedBefore != "" {
r.Condition.CreatedBefore, _ = time.Parse(rfc3339Date, rr.Condition.CreatedBefore)
}
l.Rules = append(l.Rules, r)
}
return l
}

View file

@ -23,8 +23,6 @@ All of the methods of this package use exponential backoff to retry calls
that fail with certain errors, as described in
https://cloud.google.com/storage/docs/exponential-backoff.
Note: This package is in beta. Some backwards-incompatible changes may occur.
Creating a Client
@ -36,6 +34,13 @@ To start working with this package, create a client:
// TODO: Handle error.
}
The client will use your default application credentials.
If you only wish to access public data, you can create
an unauthenticated client with
client, err := storage.NewClient(ctx, option.WithoutAuthentication())
Buckets
A Google Cloud Storage bucket is a collection of objects. To work with a

View file

@ -23,21 +23,28 @@ import (
// IAM provides access to IAM access control for the bucket.
func (b *BucketHandle) IAM() *iam.Handle {
return iam.InternalNewHandleClient(&iamClient{raw: b.c.raw}, b.name)
return iam.InternalNewHandleClient(&iamClient{
raw: b.c.raw,
userProject: b.userProject,
}, b.name)
}
// iamClient implements the iam.client interface.
type iamClient struct {
raw *raw.Service
raw *raw.Service
userProject string
}
func (c *iamClient) Get(ctx context.Context, resource string) (*iampb.Policy, error) {
req := c.raw.Buckets.GetIamPolicy(resource)
setClientHeader(req.Header())
call := c.raw.Buckets.GetIamPolicy(resource)
setClientHeader(call.Header())
if c.userProject != "" {
call.UserProject(c.userProject)
}
var rp *raw.Policy
var err error
err = runWithRetry(ctx, func() error {
rp, err = req.Context(ctx).Do()
rp, err = call.Context(ctx).Do()
return err
})
if err != nil {
@ -48,21 +55,27 @@ func (c *iamClient) Get(ctx context.Context, resource string) (*iampb.Policy, er
func (c *iamClient) Set(ctx context.Context, resource string, p *iampb.Policy) error {
rp := iamToStoragePolicy(p)
req := c.raw.Buckets.SetIamPolicy(resource, rp)
setClientHeader(req.Header())
call := c.raw.Buckets.SetIamPolicy(resource, rp)
setClientHeader(call.Header())
if c.userProject != "" {
call.UserProject(c.userProject)
}
return runWithRetry(ctx, func() error {
_, err := req.Context(ctx).Do()
_, err := call.Context(ctx).Do()
return err
})
}
func (c *iamClient) Test(ctx context.Context, resource string, perms []string) ([]string, error) {
req := c.raw.Buckets.TestIamPermissions(resource, perms)
setClientHeader(req.Header())
call := c.raw.Buckets.TestIamPermissions(resource, perms)
setClientHeader(call.Header())
if c.userProject != "" {
call.UserProject(c.userProject)
}
var res *raw.TestIamPermissionsResponse
var err error
err = runWithRetry(ctx, func() error {
res, err = req.Context(ctx).Do()
res, err = call.Context(ctx).Do()
return err
})
if err != nil {

179
vendor/cloud.google.com/go/storage/notifications.go generated vendored Normal file
View file

@ -0,0 +1,179 @@
// Copyright 2017 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"errors"
"fmt"
"regexp"
"golang.org/x/net/context"
raw "google.golang.org/api/storage/v1"
)
// A Notification describes how to send Cloud PubSub messages when certain
// events occur in a bucket.
type Notification struct {
//The ID of the notification.
ID string
// The ID of the topic to which this subscription publishes.
TopicID string
// The ID of the project to which the topic belongs.
TopicProjectID string
// Only send notifications about listed event types. If empty, send notifications
// for all event types.
// See https://cloud.google.com/storage/docs/pubsub-notifications#events.
EventTypes []string
// If present, only apply this notification configuration to object names that
// begin with this prefix.
ObjectNamePrefix string
// An optional list of additional attributes to attach to each Cloud PubSub
// message published for this notification subscription.
CustomAttributes map[string]string
// The contents of the message payload.
// See https://cloud.google.com/storage/docs/pubsub-notifications#payload.
PayloadFormat string
}
// Values for Notification.PayloadFormat.
const (
// Send no payload with notification messages.
NoPayload = "NONE"
// Send object metadata as JSON with notification messages.
JSONPayload = "JSON_API_V1"
)
// Values for Notification.EventTypes.
const (
// Event that occurs when an object is successfully created.
ObjectFinalizeEvent = "OBJECT_FINALIZE"
// Event that occurs when the metadata of an existing object changes.
ObjectMetadataUpdateEvent = "OBJECT_METADATA_UPDATE"
// Event that occurs when an object is permanently deleted.
ObjectDeleteEvent = "OBJECT_DELETE"
// Event that occurs when the live version of an object becomes an
// archived version.
ObjectArchiveEvent = "OBJECT_ARCHIVE"
)
func toNotification(rn *raw.Notification) *Notification {
n := &Notification{
ID: rn.Id,
EventTypes: rn.EventTypes,
ObjectNamePrefix: rn.ObjectNamePrefix,
CustomAttributes: rn.CustomAttributes,
PayloadFormat: rn.PayloadFormat,
}
n.TopicProjectID, n.TopicID = parseNotificationTopic(rn.Topic)
return n
}
var topicRE = regexp.MustCompile("^//pubsub.googleapis.com/projects/([^/]+)/topics/([^/]+)")
// parseNotificationTopic extracts the project and topic IDs from from the full
// resource name returned by the service. If the name is malformed, it returns
// "?" for both IDs.
func parseNotificationTopic(nt string) (projectID, topicID string) {
matches := topicRE.FindStringSubmatch(nt)
if matches == nil {
return "?", "?"
}
return matches[1], matches[2]
}
func toRawNotification(n *Notification) *raw.Notification {
return &raw.Notification{
Id: n.ID,
Topic: fmt.Sprintf("//pubsub.googleapis.com/projects/%s/topics/%s",
n.TopicProjectID, n.TopicID),
EventTypes: n.EventTypes,
ObjectNamePrefix: n.ObjectNamePrefix,
CustomAttributes: n.CustomAttributes,
PayloadFormat: string(n.PayloadFormat),
}
}
// AddNotification adds a notification to b. You must set n's TopicProjectID, TopicID
// and PayloadFormat, and must not set its ID. The other fields are all optional. The
// returned Notification's ID can be used to refer to it.
func (b *BucketHandle) AddNotification(ctx context.Context, n *Notification) (*Notification, error) {
if n.ID != "" {
return nil, errors.New("storage: AddNotification: ID must not be set")
}
if n.TopicProjectID == "" {
return nil, errors.New("storage: AddNotification: missing TopicProjectID")
}
if n.TopicID == "" {
return nil, errors.New("storage: AddNotification: missing TopicID")
}
call := b.c.raw.Notifications.Insert(b.name, toRawNotification(n))
setClientHeader(call.Header())
if b.userProject != "" {
call.UserProject(b.userProject)
}
rn, err := call.Context(ctx).Do()
if err != nil {
return nil, err
}
return toNotification(rn), nil
}
// Notifications returns all the Notifications configured for this bucket, as a map
// indexed by notification ID.
func (b *BucketHandle) Notifications(ctx context.Context) (map[string]*Notification, error) {
call := b.c.raw.Notifications.List(b.name)
setClientHeader(call.Header())
if b.userProject != "" {
call.UserProject(b.userProject)
}
var res *raw.Notifications
var err error
err = runWithRetry(ctx, func() error {
res, err = call.Context(ctx).Do()
return err
})
if err != nil {
return nil, err
}
return notificationsToMap(res.Items), nil
}
func notificationsToMap(rns []*raw.Notification) map[string]*Notification {
m := map[string]*Notification{}
for _, rn := range rns {
m[rn.Id] = toNotification(rn)
}
return m
}
// DeleteNotification deletes the notification with the given ID.
func (b *BucketHandle) DeleteNotification(ctx context.Context, id string) error {
call := b.c.raw.Notifications.Delete(b.name, id)
setClientHeader(call.Header())
if b.userProject != "" {
call.UserProject(b.userProject)
}
return call.Context(ctx).Do()
}

View file

@ -24,14 +24,20 @@ var crc32cTable = crc32.MakeTable(crc32.Castagnoli)
// Reader reads a Cloud Storage object.
// It implements io.Reader.
//
// Typically, a Reader computes the CRC of the downloaded content and compares it to
// the stored CRC, returning an error from Read if there is a mismatch. This integrity check
// is skipped if transcoding occurs. See https://cloud.google.com/storage/docs/transcoding.
type Reader struct {
body io.ReadCloser
remain, size int64
contentType string
cacheControl string
checkCRC bool // should we check the CRC?
wantCRC uint32 // the CRC32c value the server sent in the header
gotCRC uint32 // running crc
body io.ReadCloser
remain, size int64
contentType string
contentEncoding string
cacheControl string
checkCRC bool // should we check the CRC?
wantCRC uint32 // the CRC32c value the server sent in the header
gotCRC uint32 // running crc
checkedCRC bool // did we check the CRC? (For tests.)
}
// Close closes the Reader. It must be called when done reading.
@ -49,9 +55,12 @@ func (r *Reader) Read(p []byte) (int, error) {
// Check CRC here. It would be natural to check it in Close, but
// everybody defers Close on the assumption that it doesn't return
// anything worth looking at.
if r.remain == 0 && r.gotCRC != r.wantCRC {
return n, fmt.Errorf("storage: bad CRC on read: got %d, want %d",
r.gotCRC, r.wantCRC)
if r.remain == 0 { // Only check if we have Content-Length.
r.checkedCRC = true
if r.gotCRC != r.wantCRC {
return n, fmt.Errorf("storage: bad CRC on read: got %d, want %d",
r.gotCRC, r.wantCRC)
}
}
}
return n, err
@ -74,6 +83,11 @@ func (r *Reader) ContentType() string {
return r.contentType
}
// ContentEncoding returns the content encoding of the object.
func (r *Reader) ContentEncoding() string {
return r.contentEncoding
}
// CacheControl returns the cache control of the object.
func (r *Reader) CacheControl() string {
return r.cacheControl

View file

@ -30,6 +30,8 @@ import (
"net/http"
"net/url"
"reflect"
"regexp"
"sort"
"strconv"
"strings"
"time"
@ -110,7 +112,10 @@ func NewClient(ctx context.Context, opts ...option.ClientOption) (*Client, error
//
// Close need not be called at program exit.
func (c *Client) Close() error {
// Set fields to nil so that subsequent uses
// will panic.
c.hc = nil
c.raw = nil
return nil
}
@ -167,7 +172,7 @@ type SignedURLOptions struct {
// Optional.
ContentType string
// Headers is a list of extention headers the client must provide
// Headers is a list of extension headers the client must provide
// in order to use the generated signed URL.
// Optional.
Headers []string
@ -179,6 +184,60 @@ type SignedURLOptions struct {
MD5 string
}
var (
canonicalHeaderRegexp = regexp.MustCompile(`(?i)^(x-goog-[^:]+):(.*)?$`)
excludedCanonicalHeaders = map[string]bool{
"x-goog-encryption-key": true,
"x-goog-encryption-key-sha256": true,
}
)
// sanitizeHeaders applies the specifications for canonical extension headers at
// https://cloud.google.com/storage/docs/access-control/signed-urls#about-canonical-extension-headers.
func sanitizeHeaders(hdrs []string) []string {
headerMap := map[string][]string{}
for _, hdr := range hdrs {
// No leading or trailing whitespaces.
sanitizedHeader := strings.TrimSpace(hdr)
// Only keep canonical headers, discard any others.
headerMatches := canonicalHeaderRegexp.FindStringSubmatch(sanitizedHeader)
if len(headerMatches) == 0 {
continue
}
header := strings.ToLower(strings.TrimSpace(headerMatches[1]))
if excludedCanonicalHeaders[headerMatches[1]] {
// Do not keep any deliberately excluded canonical headers when signing.
continue
}
value := strings.TrimSpace(headerMatches[2])
if len(value) > 0 {
// Remove duplicate headers by appending the values of duplicates
// in their order of appearance.
headerMap[header] = append(headerMap[header], value)
}
}
var sanitizedHeaders []string
for header, values := range headerMap {
// There should be no spaces around the colon separating the
// header name from the header value or around the values
// themselves. The values should be separated by commas.
// NOTE: The semantics for headers without a value are not clear.
// However from specifications these should be edge-cases
// anyway and we should assume that there will be no
// canonical headers using empty values. Any such headers
// are discarded at the regexp stage above.
sanitizedHeaders = append(
sanitizedHeaders,
fmt.Sprintf("%s:%s", header, strings.Join(values, ",")),
)
}
sort.Strings(sanitizedHeaders)
return sanitizedHeaders
}
// SignedURL returns a URL for the specified object. Signed URLs allow
// the users access to a restricted resource for a limited time without having a
// Google account or signing in. For more information about the signed
@ -205,6 +264,7 @@ func SignedURL(bucket, name string, opts *SignedURLOptions) (string, error) {
return "", errors.New("storage: invalid MD5 checksum")
}
}
opts.Headers = sanitizeHeaders(opts.Headers)
signBytes := opts.SignBytes
if opts.PrivateKey != nil {
@ -255,14 +315,15 @@ func SignedURL(bucket, name string, opts *SignedURLOptions) (string, error) {
// ObjectHandle provides operations on an object in a Google Cloud Storage bucket.
// Use BucketHandle.Object to get a handle.
type ObjectHandle struct {
c *Client
bucket string
object string
acl ACLHandle
gen int64 // a negative value indicates latest
conds *Conditions
encryptionKey []byte // AES-256 key
userProject string // for requester-pays buckets
c *Client
bucket string
object string
acl ACLHandle
gen int64 // a negative value indicates latest
conds *Conditions
encryptionKey []byte // AES-256 key
userProject string // for requester-pays buckets
readCompressed bool // Accept-Encoding: gzip
}
// ACL provides access to the object's access control list.
@ -346,11 +407,17 @@ func (o *ObjectHandle) Update(ctx context.Context, uattrs ObjectAttrsToUpdate) (
var forceSendFields, nullFields []string
if uattrs.ContentType != nil {
attrs.ContentType = optional.ToString(uattrs.ContentType)
forceSendFields = append(forceSendFields, "ContentType")
// For ContentType, sending the empty string is a no-op.
// Instead we send a null.
if attrs.ContentType == "" {
nullFields = append(nullFields, "ContentType")
} else {
forceSendFields = append(forceSendFields, "ContentType")
}
}
if uattrs.ContentLanguage != nil {
attrs.ContentLanguage = optional.ToString(uattrs.ContentLanguage)
// For ContentLanguage It's an error to send the empty string.
// For ContentLanguage it's an error to send the empty string.
// Instead we send a null.
if attrs.ContentLanguage == "" {
nullFields = append(nullFields, "ContentLanguage")
@ -458,6 +525,13 @@ func (o *ObjectHandle) Delete(ctx context.Context) error {
return err
}
// ReadCompressed when true causes the read to happen without decompressing.
func (o *ObjectHandle) ReadCompressed(compressed bool) *ObjectHandle {
o2 := *o
o2.readCompressed = compressed
return &o2
}
// NewReader creates a new Reader to read the contents of the
// object.
// ErrObjectNotExist will be returned if the object is not found.
@ -505,6 +579,9 @@ func (o *ObjectHandle) NewRangeReader(ctx context.Context, offset, length int64)
if o.userProject != "" {
req.Header.Set("X-Goog-User-Project", o.userProject)
}
if o.readCompressed {
req.Header.Set("Accept-Encoding", "gzip")
}
if err := setEncryptionHeaders(req.Header, o.encryptionKey, false); err != nil {
return nil, err
}
@ -567,13 +644,14 @@ func (o *ObjectHandle) NewRangeReader(ctx context.Context, offset, length int64)
crc, checkCRC = parseCRC32c(res)
}
return &Reader{
body: body,
size: size,
remain: remain,
contentType: res.Header.Get("Content-Type"),
cacheControl: res.Header.Get("Cache-Control"),
wantCRC: crc,
checkCRC: checkCRC,
body: body,
size: size,
remain: remain,
contentType: res.Header.Get("Content-Type"),
contentEncoding: res.Header.Get("Content-Encoding"),
cacheControl: res.Header.Get("Cache-Control"),
wantCRC: crc,
checkCRC: checkCRC,
}, nil
}
@ -629,11 +707,10 @@ func (o *ObjectHandle) validate() error {
return nil
}
// parseKey converts the binary contents of a private key file
// to an *rsa.PrivateKey. It detects whether the private key is in a
// PEM container or not. If so, it extracts the the private key
// from PEM container before conversion. It only supports PEM
// containers with no passphrase.
// parseKey converts the binary contents of a private key file to an
// *rsa.PrivateKey. It detects whether the private key is in a PEM container or
// not. If so, it extracts the private key from PEM container before
// conversion. It only supports PEM containers with no passphrase.
func parseKey(key []byte) (*rsa.PrivateKey, error) {
if block, _ := pem.Decode(key); block != nil {
key = block.Bytes

View file

@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"io"
"sync"
"unicode/utf8"
"golang.org/x/net/context"
@ -68,8 +69,10 @@ type Writer struct {
pw *io.PipeWriter
donec chan struct{} // closed after err and obj are set.
err error
obj *ObjectAttrs
mu sync.Mutex
err error
}
func (w *Writer) open() error {
@ -87,7 +90,7 @@ func (w *Writer) open() error {
w.opened = true
if w.ChunkSize < 0 {
return errors.New("storage: Writer.ChunkSize must non-negative")
return errors.New("storage: Writer.ChunkSize must be non-negative")
}
mediaOpts := []googleapi.MediaOption{
googleapi.ChunkSize(w.ChunkSize),
@ -114,8 +117,10 @@ func (w *Writer) open() error {
call.ProgressUpdater(func(n, _ int64) { w.ProgressFunc(n) })
}
if err := setEncryptionHeaders(call.Header(), w.o.encryptionKey, false); err != nil {
w.mu.Lock()
w.err = err
pr.CloseWithError(w.err)
w.mu.Unlock()
pr.CloseWithError(err)
return
}
var resp *raw.Object
@ -125,18 +130,27 @@ func (w *Writer) open() error {
call.UserProject(w.o.userProject)
}
setClientHeader(call.Header())
// We will only retry here if the initial POST, which obtains a URI for
// the resumable upload, fails with a retryable error. The upload itself
// has its own retry logic.
err = runWithRetry(w.ctx, func() error {
var err2 error
resp, err2 = call.Do()
return err2
})
// If the chunk size is zero, then no chunking is done on the Reader,
// which means we cannot retry: the first call will read the data, and if
// it fails, there is no way to re-read.
if w.ChunkSize == 0 {
resp, err = call.Do()
} else {
// We will only retry here if the initial POST, which obtains a URI for
// the resumable upload, fails with a retryable error. The upload itself
// has its own retry logic.
err = runWithRetry(w.ctx, func() error {
var err2 error
resp, err2 = call.Do()
return err2
})
}
}
if err != nil {
w.mu.Lock()
w.err = err
pr.CloseWithError(w.err)
w.mu.Unlock()
pr.CloseWithError(err)
return
}
w.obj = newObject(resp)
@ -151,8 +165,11 @@ func (w *Writer) open() error {
// use the error returned from Writer.Close to determine if
// the upload was successful.
func (w *Writer) Write(p []byte) (n int, err error) {
if w.err != nil {
return 0, w.err
w.mu.Lock()
werr := w.err
w.mu.Unlock()
if werr != nil {
return 0, werr
}
if !w.opened {
if err := w.open(); err != nil {
@ -175,11 +192,15 @@ func (w *Writer) Close() error {
return err
}
<-w.donec
w.mu.Lock()
defer w.mu.Unlock()
return w.err
}
// CloseWithError aborts the write operation with the provided error.
// CloseWithError always returns nil.
//
// Deprecated: cancel the context passed to NewWriter instead.
func (w *Writer) CloseWithError(err error) error {
if !w.opened {
return nil