I am using the aws-sdk-js-v3 library to stream multiple data sets to the same S3 object through a single stream.
import dotenv from 'dotenv'
import {S3} from '@aws-sdk/client-s3'
import { Upload } from '@aws-sdk/lib-storage';
import {PassThrough} from 'stream';
import {randomBytes} from 'crypto'
import { env } from 'process';
export async function finishS3Stream(upload,stream){
  stream.end();
  await upload.done();
}
export async function writeToStream(stream, data){
  // If write() returns false, the stream's internal buffer has passed the highWaterMark
  // threshold, so we should wait for a 'drain' event before writing more data
  return new Promise((resolve) => {
    if (!stream.write(data)) {
      console.log("drain needed")
      stream.once('drain', resolve);
    } else {
      resolve();
    }
  });
}
export function createS3Stream(key,bucket) {
  const client = new S3()
  const stream = new PassThrough();
  stream.on('error', (e) => console.log(e))
  const upload = new Upload({
    params: {
      Bucket: bucket,
      Key: key,
      Body: stream,
    },
    client,
  });
  return {
    stream,
    upload,
  };
}
async function main(){
  dotenv.config()
  const bucket = env.BUCKET
  const key = env.KEY
  console.log("creating stream")
  const {stream,upload} = createS3Stream(key,bucket)
  const data1 = randomBytes(5242880)
  console.log('writing data1 to stream')
  await writeToStream(stream,data1)
  const data2 = randomBytes(5242880)
  console.log('writing data2 to stream')
  await writeToStream(stream,data2)
  console.log('closing stream')
  await finishS3Stream(upload,stream)
}

main()
For some reason, though, the main function doesn't wait for the stream to drain once it hits the highWaterMark, and the application exits with exit code 0.
Output
creating stream
writing data1 to stream
drain needed
How can I get the application to wait for the stream to drain? Why is it not waiting for the promise to resolve, and why does it exit before writing the next data set?
For those who are struggling to use the AWS documentation to write multiple files or multiple data sets to the same stream, here is the solution. You need to call done() on the Upload without awaiting it, so that the S3 client starts reading from the stream. In the original code nothing ever consumes the PassThrough stream, so the 'drain' event never fires; with no pending I/O left, the Node.js event loop empties and the process exits with code 0 while the promise is still unresolved. Then, when you close the stream, you await the promise that done() returned.
export async function finishS3Stream(upload,stream){
  stream.end();
  // wait for the upload promise to resolve
  await upload;
}
export async function writeToStream(stream, data){
  // If write() returns false, the stream's internal buffer has passed the highWaterMark
  // threshold, so we should wait for a 'drain' event before writing more data
  return new Promise((resolve) => {
    if (!stream.write(data)) {
      console.log("drain needed")
      stream.once('drain', resolve);
    } else {
      resolve();
    }
  });
}
export function createS3Stream(key,bucket) {
  const client = new S3()
  const stream = new PassThrough();
  stream.on('error', (e) => console.log(e))
  // Call done() right away, without awaiting it, so the client starts reading the stream.
  // It returns a Promise that resolves once the upload completes.
  const upload = new Upload({
    params: {
      Bucket: bucket,
      Key: key,
      Body: stream,
    },
    client,
  }).done();
  return {
    stream,
    upload,
  };
}
async function main(){
  dotenv.config()
  const bucket = env.BUCKET
  const key = env.KEY
  console.log("creating stream")
  const {stream,upload} = createS3Stream(key,bucket)
  const data1 = randomBytes(5242880)
  console.log('writing data1 to stream')
  await writeToStream(stream,data1)
  const data2 = randomBytes(5242880)
  console.log('writing data2 to stream')
  await writeToStream(stream,data2)
  console.log('closing stream')
  await finishS3Stream(upload,stream)
}

main()
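As a side note, if you want to keep the Upload instance itself around instead of only the promise (for example to call abort() or, if I remember the lib-storage API correctly, to listen for httpUploadProgress events), a minimal variation is to kick off done() the same way but return both the instance and the promise. Treat the progress listener below as an assumption to verify against your SDK version; the only part that matters is that done() is called before the stream fills up.

// Same imports as at the top of the question (@aws-sdk/client-s3, @aws-sdk/lib-storage, stream)
export function createS3Stream(key,bucket) {
  const client = new S3()
  const stream = new PassThrough();
  stream.on('error', (e) => console.log(e))
  const upload = new Upload({
    params: {
      Bucket: bucket,
      Key: key,
      Body: stream,
    },
    client,
  });
  // Assumption: lib-storage's Upload emits httpUploadProgress events while parts are sent
  upload.on('httpUploadProgress', (progress) => console.log(progress))
  // Start consuming the PassThrough stream now, but keep the promise instead of awaiting it
  const done = upload.done()
  return {
    stream,
    upload,
    done,
  };
}

In main you would then destructure done as well and pass it to finishS3Stream in place of upload; everything else stays the same.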