Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gensupport: Allow user to provide his own buffer used for uploading #632

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
27 changes: 27 additions & 0 deletions googleapi/googleapi.go
Expand Up @@ -211,6 +211,31 @@ type MediaOption interface {
setOptions(o *MediaOptions)
}

type noopOption struct{}

func (bp noopOption) setOptions(o *MediaOptions) {
}

type bufferOption []byte

func (bp bufferOption) setOptions(o *MediaOptions) {
o.Buffer = bp
}

// WithBuffer returns MediaOption which sets buffer used for media uploads.
// Buffer capacity needs to be at least MinUploadChunkSize, if it's not
// this option is a no op.
// If used together with ChunkSize, buffer needs to have at least ChunkSize capacity.
// If not set, each upload will allocate its own memory.
zimnx marked this conversation as resolved.
Show resolved Hide resolved
// Buffer can be reused only after request complete. Using the same buffer
// in concurrent calls will lead to data race.
func WithBuffer(buffer []byte) MediaOption {
if cap(buffer) < MinUploadChunkSize {
return noopOption{}
}
return bufferOption(buffer)
}

type contentTypeOption string

func (ct contentTypeOption) setOptions(o *MediaOptions) {
Expand Down Expand Up @@ -251,6 +276,8 @@ type MediaOptions struct {
ForceEmptyContentType bool

ChunkSize int

Buffer []byte
}

// ProcessMediaOptions stores options from opts in a MediaOptions.
Expand Down
12 changes: 12 additions & 0 deletions internal/gensupport/buffer.go
Expand Up @@ -28,6 +28,18 @@ func NewMediaBuffer(media io.Reader, chunkSize int) *MediaBuffer {
return &MediaBuffer{media: media, chunk: make([]byte, 0, chunkSize)}
}

// NewMediaBuffer initializes a MediaBuffer.
func NewMediaBufferWithBuffer(media io.Reader, chunkSize int, buffer []byte) *MediaBuffer {
// If buffer isn't long enough, allocate new one.
if cap(buffer) < chunkSize {
return NewMediaBuffer(media, chunkSize)
}

// Implementation expects buffer of zero length.
buffer = buffer[:0]
return &MediaBuffer{media: media, chunk: buffer}
zimnx marked this conversation as resolved.
Show resolved Hide resolved
}

// Chunk returns the current buffered chunk, the offset in the underlying media
// from which the chunk is drawn, and the size of the chunk.
// Successive calls to Chunk return the same chunk between calls to Next.
Expand Down
11 changes: 8 additions & 3 deletions internal/gensupport/media.go
Expand Up @@ -200,11 +200,16 @@ func typeHeader(contentType string) textproto.MIMEHeader {
//
// After PrepareUpload has been called, media should no longer be used: the
// media content should be accessed via one of the return values.
func PrepareUpload(media io.Reader, chunkSize int) (r io.Reader, mb *MediaBuffer, singleChunk bool) {
func PrepareUpload(media io.Reader, chunkSize int, buffer []byte) (r io.Reader, mb *MediaBuffer, singleChunk bool) {
if chunkSize == 0 { // do not chunk
return media, nil, true
}
mb = NewMediaBuffer(media, chunkSize)
if buffer != nil {
mb = NewMediaBufferWithBuffer(media, chunkSize, buffer)
} else {
mb = NewMediaBuffer(media, chunkSize)
}

_, _, _, err := mb.Chunk()
// If err is io.EOF, we can upload this in a single request. Otherwise, err is
// either nil or a non-EOF error. If it is the latter, then the next call to
Expand Down Expand Up @@ -234,7 +239,7 @@ func NewInfoFromMedia(r io.Reader, options []googleapi.MediaOption) *MediaInfo {
if !opts.ForceEmptyContentType {
r, mi.mType = DetermineContentType(r, opts.ContentType)
}
mi.media, mi.buffer, mi.singleChunk = PrepareUpload(r, opts.ChunkSize)
mi.media, mi.buffer, mi.singleChunk = PrepareUpload(r, opts.ChunkSize, opts.Buffer)
return mi
}

Expand Down
41 changes: 29 additions & 12 deletions internal/gensupport/media_test.go
Expand Up @@ -149,27 +149,29 @@ func TestDetermineContentType(t *testing.T) {

func TestNewInfoFromMedia(t *testing.T) {
const textType = "text/plain; charset=utf-8"
chunkBuffer := make([]byte, 0)
zimnx marked this conversation as resolved.
Show resolved Hide resolved
for _, test := range []struct {
desc string
r io.Reader
opts []googleapi.MediaOption
wantType string
wantMedia, wantBuffer, wantSingleChunk bool
desc string
r io.Reader
opts []googleapi.MediaOption
wantType string
wantChunkBuffer []byte
wantMedia, wantAnyBuffer, wantSingleChunk bool
}{
{
desc: "an empty reader results in a MediaBuffer with a single, empty chunk",
r: new(bytes.Buffer),
opts: nil,
wantType: textType,
wantBuffer: true,
wantAnyBuffer: true,
wantSingleChunk: true,
},
{
desc: "ContentType is observed",
r: new(bytes.Buffer),
opts: []googleapi.MediaOption{googleapi.ContentType("xyz")},
wantType: "xyz",
wantBuffer: true,
wantAnyBuffer: true,
wantSingleChunk: true,
},
{
Expand All @@ -185,15 +187,15 @@ func TestNewInfoFromMedia(t *testing.T) {
r: strings.NewReader("12345"),
opts: []googleapi.MediaOption{googleapi.ChunkSize(100)},
wantType: textType,
wantBuffer: true,
wantAnyBuffer: true,
wantSingleChunk: true,
},
{
desc: "chunk size == data size: MediaBuffer with single chunk",
r: &nullReader{googleapi.MinUploadChunkSize},
opts: []googleapi.MediaOption{googleapi.ChunkSize(1)},
wantType: "application/octet-stream",
wantBuffer: true,
wantAnyBuffer: true,
wantSingleChunk: true,
},
{
Expand All @@ -202,7 +204,15 @@ func TestNewInfoFromMedia(t *testing.T) {
r: &nullReader{2 * googleapi.MinUploadChunkSize},
opts: []googleapi.MediaOption{googleapi.ChunkSize(1)},
wantType: "application/octet-stream",
wantBuffer: true,
wantAnyBuffer: true,
wantSingleChunk: false,
},
{
desc: "WithBuffer is observed",
r: new(bytes.Buffer),
opts: []googleapi.MediaOption{googleapi.WithBuffer(chunkBuffer)},
wantType: textType,
wantChunkBuffer: chunkBuffer,
wantSingleChunk: false,
},
} {
Expand All @@ -214,8 +224,15 @@ func TestNewInfoFromMedia(t *testing.T) {
if got, want := (mi.media != nil), test.wantMedia; got != want {
t.Errorf("%s: media non-nil: got %t, want %t", test.desc, got, want)
}
if got, want := (mi.buffer != nil), test.wantBuffer; got != want {
t.Errorf("%s: buffer non-nil: got %t, want %t", test.desc, got, want)
if test.wantAnyBuffer {
if got, want := (mi.buffer != nil), test.wantAnyBuffer; got != want {
t.Errorf("%s: buffer non-nil: got %t, want %t", test.desc, got, want)
}
}
if test.wantChunkBuffer != nil {
if got, want := reflect.ValueOf(mi.buffer.chunk).Pointer(), reflect.ValueOf(test.wantChunkBuffer).Pointer(); got != want {
t.Errorf("%s: chunk buffer: got %v, want %v", test.desc, got, want)
}
}
if got, want := mi.singleChunk, test.wantSingleChunk; got != want {
t.Errorf("%s: singleChunk: got %t, want %t", test.desc, got, want)
Expand Down
46 changes: 46 additions & 0 deletions internal/gensupport/resumable_test.go
Expand Up @@ -364,3 +364,49 @@ func TestRetry_EachChunkHasItsOwnRetryDeadline(t *testing.T) {
}
}
}

func TestResumableUploadWithPredefinedBuffer(t *testing.T) {
const (
mediaSize = 256
)
media := strings.Repeat("a", mediaSize)
tr := &interruptibleTransport{
buf: make([]byte, 0, mediaSize),
events: []event{
{"bytes 0-255/*", 200},
},
bodies: bodyTracker{},
}
buffer := make([]byte, 0, mediaSize)
rx := &ResumableUpload{
Client: &http.Client{Transport: tr},
Media: NewMediaBufferWithBuffer(strings.NewReader(media), mediaSize, buffer),
MediaType: "text/plain",
Callback: func(int64) {},
}

res, err := rx.Upload(context.Background())
if err == nil {
res.Body.Close()
}
if err != nil || res == nil || res.StatusCode != http.StatusOK {
if res == nil {
t.Fatalf("Upload not successful, res=nil: %v", err)
} else {
t.Fatalf("Upload not successful, statusCode=%v, err=%v", res.StatusCode, err)
}
}
if !reflect.DeepEqual(tr.buf, []byte(media)) {
t.Fatalf("transferred contents:\ngot %s\nwant %s", tr.buf, []byte(media))
}
// Media fits in single chunk, input buffer should have media content inside.
if !reflect.DeepEqual(buffer[:cap(buffer)], []byte(media)) {
t.Fatalf("buffer contents:\ngot %v\nwant %v", buffer, []byte(media))
}
if len(tr.events) > 0 {
t.Fatalf("did not observe all expected events. leftover events: %v", tr.events)
}
if len(tr.bodies) > 0 {
t.Errorf("unclosed request bodies: %v", tr.bodies)
}
}