Context
My team and I are building a reverse proxy that intercepts all outgoing requests to S3 so we can audit and control access from the different apps.
We've successfully implemented almost all operations by streaming the file contents. For example, for the single-operation upload we used s3manager.Uploader to stream the body of the incoming request (which is an io.Reader) to S3, and for downloads (both the single and the multipart flavor) we simply io.Copy from the s3.GetObjectOutput.Body (which is an io.ReadCloser) into the response.
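For reference, those two streaming paths look roughly like this (a minimal sketch against aws-sdk-go v1; the handler names and simplified signatures are assumptions for illustration, not our actual code):

func (ph *VaultProxyHandler) HandlePutObject(w http.ResponseWriter, r *http.Request, uploader *s3manager.Uploader, bucket string, key string) {
    // the uploader reads r.Body in chunks, so the request body is never fully buffered
    _, err := uploader.Upload(&s3manager.UploadInput{
        Bucket: aws.String(bucket),
        Key:    aws.String(key),
        Body:   r.Body, // an io.Reader is enough here
    })
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
    }
}

func (ph *VaultProxyHandler) HandleGetObject(w http.ResponseWriter, r *http.Request, s3api s3iface.S3API, bucket string, key string) {
    output, err := s3api.GetObject(&s3.GetObjectInput{
        Bucket: aws.String(bucket),
        Key:    aws.String(key),
    })
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    defer output.Body.Close()
    // stream the object straight to the client; once writing has started,
    // errors can no longer be reported via http.Error
    io.Copy(w, output.Body)
}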
The problem:
The only operation we still couldn't implement via streaming is uploading a part (in the context of a multipart upload). The problem is that s3.UploadPartInput needs an aws.ReadSeekCloser, so to pass the body of the incoming request you have to buffer it somewhere first (for example, in memory).
This is what we have so far:
func (ph *VaultProxyHandler) HandleUploadPart(w http.ResponseWriter, r *http.Request, s3api s3iface.S3API, bucket string, key string, uploadID string, part int64) {
    buf := bytes.NewBuffer(nil)
    // this loads the entire body into memory
    if _, err := io.Copy(buf, r.Body); err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    payload := buf.Bytes()
    input := &s3.UploadPartInput{
        Bucket:     aws.String(bucket),
        Key:        aws.String(key),
        UploadId:   aws.String(uploadID),
        PartNumber: aws.Int64(part),
        Body:       aws.ReadSeekCloser(bytes.NewReader(payload)),
    }
    output, err := s3api.UploadPart(input)
    // and so on...
}
Question:
Is there a way to stream the incoming request of an UploadPart to S3? (By "stream" I mean without storing the entire body in memory.)
I finally found a way to reverse-proxy an incoming UploadPart as a stream, by building the request with the AWS SDK and signing it with an unsigned payload.
Here is a basic example:
type AwsService struct {
    Region   string
    S3Client s3iface.S3API
    Signer   *v4.Signer
}

func NewAwsService(region string, accessKey string, secretKey string, sessionToken string) (*AwsService, error) {
    creds := credentials.NewStaticCredentials(accessKey, secretKey, sessionToken)
    awsConfig := aws.NewConfig().
        WithRegion(region).
        WithCredentials(creds).
        WithCredentialsChainVerboseErrors(true)
    sess, err := session.NewSession(awsConfig)
    if err != nil {
        return nil, err
    }
    svc := s3.New(sess)
    signer := v4.NewSigner(creds, v4.WithUnsignedPayload)
    return &AwsService{
        Region:   region,
        S3Client: svc,
        Signer:   signer,
    }, nil
}
func (s *AwsService) UploadPart(bucket string, key string, part int, uploadID string, payloadReader io.Reader, contentLength int64) (string, error) {
    input := &s3.UploadPartInput{
        Bucket:        aws.String(bucket),
        Key:           aws.String(key),
        UploadId:      aws.String(uploadID),
        PartNumber:    aws.Int64(int64(part)),
        ContentLength: aws.Int64(contentLength),
        Body:          aws.ReadSeekCloser(payloadReader),
    }
    req, output := s.S3Client.UploadPartRequest(input)
    if _, err := s.Signer.Sign(req.HTTPRequest, req.Body, s3.ServiceName, s.Region, time.Now()); err != nil {
        return "", err
    }
    if err := req.Send(); err != nil {
        return "", err
    }
    return *output.ETag, nil
}
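Note that because the body is just a wrapped io.Reader that cannot really be seeked, the SDK cannot work out the part size on its own, so ContentLength has to be supplied explicitly; in the handler below it is taken from the incoming request's Content-Length header.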
Then, it can be invoked from a handler:
func HandleUploadPart(w http.ResponseWriter, r *http.Request) {
    query := r.URL.Query()
    bucket := query.Get("bucket")
    key := query.Get("key")
    part, err := strconv.Atoi(query.Get("part"))
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    uploadID := query.Get("upload-id")
    payloadReader := r.Body
    contentLength, err := strconv.ParseInt(r.Header.Get("Content-Length"), 10, 64)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    // the region is fixed when awsService is constructed, so it is not read from the query here
    etag, err := awsService.UploadPart(bucket, key, part, uploadID, payloadReader, contentLength)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    w.Header().Set("ETag", etag)
}
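For completeness, here is a minimal sketch of how the pieces above could be wired together (the package-level awsService variable, the route, the port, and reading credentials and the region from the environment are all assumptions for the example; a real proxy might instead keep one AwsService per region):

var awsService *AwsService

func main() {
    var err error
    // credentials and region would normally come from the proxy's configuration
    awsService, err = NewAwsService(
        os.Getenv("AWS_REGION"),
        os.Getenv("AWS_ACCESS_KEY_ID"),
        os.Getenv("AWS_SECRET_ACCESS_KEY"),
        os.Getenv("AWS_SESSION_TOKEN"),
    )
    if err != nil {
        log.Fatal(err)
    }
    http.HandleFunc("/upload-part", HandleUploadPart)
    log.Fatal(http.ListenAndServe(":8080", nil))
}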
Downsides: