Tags: go, amazon-s3, stream, reverse-proxy, aws-sdk-go

How to stream the UploadPart S3 operation from an incoming request to AWS S3 using Go?


Context

My team and I are building a reverse proxy that intercepts all outgoing requests to S3 so that we can audit and control access from our different apps.

We have implemented almost all operations by streaming the file contents. For instance, for single-operation uploads we use s3manager.Uploader to stream the body of the incoming request (an io.Reader) to S3, and for downloads (both single and multipart) we use plain io.Copy to write out the response from s3.GetObjectOutput.Body (an io.ReadCloser).
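For readers unfamiliar with the download side, the io.Copy pattern described above can be sketched with the standard library alone. This is a minimal sketch, not our proxy code: streamBody and proxied are hypothetical names, and a strings.Reader wrapped in io.NopCloser stands in for s3.GetObjectOutput.Body.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// streamBody (hypothetical helper) copies body to w in small chunks,
// so the whole object is never held in memory, and closes body when done.
func streamBody(w http.ResponseWriter, body io.ReadCloser) error {
	defer body.Close()
	_, err := io.Copy(w, body)
	return err
}

// proxied runs streamBody against an in-memory recorder; the io.NopCloser
// reader is a stand-in for the Body field of an S3 GetObject response.
func proxied(content string) string {
	rec := httptest.NewRecorder()
	body := io.NopCloser(strings.NewReader(content))
	if err := streamBody(rec, body); err != nil {
		return "error: " + err.Error()
	}
	return rec.Body.String()
}

func main() {
	fmt.Println(proxied("object contents"))
}
```

The point of the sketch is only that io.Copy needs nothing more than an io.Reader on one side and an io.Writer on the other, which is why downloads were easy to stream.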

The problem:

The only operation we have not been able to implement via streaming is UploadPart (as part of a multipart upload). The problem is that s3.UploadPartInput needs a seekable body (an io.ReadSeeker, which we build with aws.ReadSeekCloser), while the body of the incoming request is a plain io.Reader, so to pass it along you have to buffer it somewhere first (for example, in memory).
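To make the mismatch concrete, here is a small sketch that checks whether a reader can seek. The helper names (isSeekable, streamLike, buffered) are hypothetical, and io.LimitReader is used only as a stand-in for a network body that supports Read but not Seek.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// isSeekable reports whether r also implements io.Seeker.
func isSeekable(r io.Reader) bool {
	_, ok := r.(io.Seeker)
	return ok
}

// streamLike returns a reader that only exposes Read, similar to a
// request body arriving over the network (stand-in, not a real request).
func streamLike(s string) io.Reader {
	return io.LimitReader(strings.NewReader(s), int64(len(s)))
}

// buffered returns a reader over bytes already held in memory.
func buffered(s string) io.Reader {
	return bytes.NewReader([]byte(s))
}

func main() {
	fmt.Println("stream seekable:", isSeekable(streamLike("part")))
	fmt.Println("buffer seekable:", isSeekable(buffered("part")))
}
```

A bytes.Reader can seek because the data is already in memory, which is exactly the cost we are trying to avoid.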

This is what we have so far:

func (ph *VaultProxyHandler) HandleUploadPart(w http.ResponseWriter, r *http.Request, s3api s3iface.S3API, bucket string, key string, uploadID string, part int64) {
    buf := bytes.NewBuffer(nil)
    
    // this loads the entire body into memory
    if _, err := io.Copy(buf, r.Body); err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

    payload := buf.Bytes()

    input := &s3.UploadPartInput{
        Bucket:     aws.String(bucket),
        Key:        aws.String(key),
        UploadId:   aws.String(uploadID),
        PartNumber: aws.Int64(part),
        Body:       aws.ReadSeekCloser(bytes.NewReader(payload)),
    }

    output, err := s3api.UploadPart(input)

    // and so on...
}

Question:

Is there a way to stream an incoming UploadPart request to S3? (By "stream" I mean without storing the entire body in memory.)


Solution

  • I finally found a way to reverse-proxy an incoming UploadPart with stream processing: build the request with the AWS SDK and sign it with an unsigned payload.

    Here is a basic example:

    type AwsService struct {
        Region   string
        S3Client s3iface.S3API
        Signer   *v4.Signer
    }
    
    func NewAwsService(region string, accessKey string, secretKey string, sessionToken string) (*AwsService, error) {
        creds := credentials.NewStaticCredentials(accessKey, secretKey, sessionToken)
        awsConfig := aws.NewConfig().
            WithRegion(region).
            WithCredentials(creds).
            WithCredentialsChainVerboseErrors(true)
        sess, err := session.NewSession(awsConfig)
        if err != nil {
            return nil, err
        }
        svc := s3.New(sess)
    
        signer := v4.NewSigner(creds)
        v4.WithUnsignedPayload(signer)
    
        return &AwsService{
            Region:   region,
            S3Client: svc,
            Signer:   signer,
        }, nil
    }
    
    func (s *AwsService) UploadPart(bucket string, key string, part int, uploadID string, payloadReader io.Reader, contentLength int64) (string, error) {
    
        input := &s3.UploadPartInput{
            Bucket:        aws.String(bucket),
            Key:           aws.String(key),
            UploadId:      aws.String(uploadID),
            PartNumber:    aws.Int64(int64(part)),
            ContentLength: aws.Int64(contentLength),
            Body:          aws.ReadSeekCloser(payloadReader),
        }
    
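        req, output := s.S3Client.UploadPartRequest(input)
    
        // Sign with an unsigned payload so the body does not have to be read to compute its hash.
        if _, err := s.Signer.Sign(req.HTTPRequest, req.Body, s3.ServiceName, s.Region, time.Now()); err != nil {
            return "", err
        }
        if err := req.Send(); err != nil {
            return "", err
        }
    
        // aws.StringValue returns "" instead of panicking if ETag is nil
        return aws.StringValue(output.ETag), nil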
    }
    

    Then, it can be invoked from a handler:

    func HandleUploadPart(w http.ResponseWriter, r *http.Request) {
    
        query := r.URL.Query()
        bucket := query.Get("bucket")
        key := query.Get("key")
        part, err := strconv.Atoi(query.Get("part"))
        if err != nil {
            // a malformed part number is a client error
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
        }
        uploadID := query.Get("upload-id")
        payloadReader := r.Body
    
        contentLength, err := strconv.ParseInt(r.Header.Get("Content-Length"), 10, 64)
        if err != nil {
            // a missing or malformed Content-Length is also a client error
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
        }
    
        // awsService is a package-level *AwsService built with NewAwsService,
        // which already carries the region, so it is not passed here
        etag, err := awsService.UploadPart(bucket, key, part, uploadID, payloadReader, contentLength)
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
    
        w.Header().Set("ETag", etag)
    }
    

    Downsides:

    • The client must know the content length in advance and send it.
    • The payload cannot be signed, so the request signature does not cover the body.
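Because of the first downside, it may help to reject requests with an unknown length before anything is sent to S3. The following is a minimal sketch using only the standard library. requireContentLength and statusFor are hypothetical names, and answering with 411 Length Required is an assumption about how you want to report the problem, not something the AWS SDK mandates.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// requireContentLength (hypothetical middleware) rejects requests whose
// length is unknown, e.g. chunked uploads, where r.ContentLength is -1.
func requireContentLength(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.ContentLength < 0 {
			http.Error(w, "Content-Length is required", http.StatusLengthRequired)
			return
		}
		next.ServeHTTP(w, r)
	})
}

// statusFor sends a synthetic request with the given length through the
// middleware and returns the resulting HTTP status code.
func statusFor(contentLength int64) int {
	req := httptest.NewRequest(http.MethodPut, "/upload-part", strings.NewReader("data"))
	req.ContentLength = contentLength
	rec := httptest.NewRecorder()
	ok := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
	requireContentLength(ok).ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	fmt.Println("known length:", statusFor(4))
	fmt.Println("unknown length:", statusFor(-1))
}
```

Wrapping the upload-part handler this way keeps the length check out of the handler body and fails fast for clients that use chunked transfer encoding.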