My problem appears only sometimes (which is really strange) while saving multiple files simultaneously to cloud storage. I am using the @google-cloud/storage library, version 6.9.5. I found people having the same issue, but none of their solutions worked for me (neither changing the stream options nor applying a timeout). At first I thought the bandwidth limit might be exceeded, but I am almost sure we are not saving over 50 GB of data (I tried to check it in the cloud charts; however, bandwidth usage is only shown when other services are using Cloud Storage - in my case I get "No data available").
For further explanation: my code has a function called "uploadFileToGCS" which is invoked around 57 times per process. A process involves saving 35 JPG images (20 KB each), 1 TIFF image (500 KB), and 7 JSON files (150 KB each). 30 processes are invoked simultaneously, so there are roughly 30 × 57 ≈ 1,700 near-simultaneous upload requests.
Here is exactly what the error says (different files throw it - sometimes a JPG, sometimes a JSON; there is no rule):
error - unhandledRejection: FetchError: request to https://storage.googleapis.com/upload/storage/v1/b/satellite-photos/o?uploadType=multipart&name=64d162a80103b55f87a6cdb4_64db59204f7cd1f34a65e5c7_2023_6_31_raw.tiff failed, reason: read ECONNRESET
at ClientRequest.<anonymous> (D:\WORK\nirby-project\node_modules\next\dist\compiled\node-fetch\index.js:1:65756)
at ClientRequest.emit (node:events:525:35)
at TLSSocket.socketErrorListener (node:_http_client:496:9)
at TLSSocket.emit (node:events:513:28)
at emitErrorNT (node:internal/streams/destroy:151:8)
at emitErrorCloseNT (node:internal/streams/destroy:116:3)
at process.processTicksAndRejections (node:internal/process/task_queues:82:21) {
type: 'system',
errno: 'ECONNRESET',
code: 'ECONNRESET'
}
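Worth noting: the failure surfaces as an unhandledRejection, so at least one upload promise is never caught anywhere. A minimal sketch of handling it at the call site (using the uploadFileToGCS function shown below):

// Sketch: catch the rejection where the upload is awaited,
// instead of letting it escape as an unhandledRejection.
try {
  await uploadFileToGCS(fileName, buffer, "image/jpeg");
} catch (err) {
  console.error("upload failed:", err); // retry or report here
}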
The function below saves 5 images and is invoked 7 times at the same time:
saveLayers.ts:
import { saveSinglePhoto } from "~/helpers/sentinel/saveSinglePhoto";

interface INdviData {
  min: number;
  max: number;
  averageNDVI: number;
  buffer: Buffer;
}

const saveLayers = async (
  fileNameBase: string,
  trueColorData: Buffer,
  ndviData: INdviData,
  contrastNdviData: Buffer,
  sclData: Buffer,
  clmData: Buffer
) => {
  console.log(fileNameBase);

  const data = await Promise.all([
    saveSinglePhoto(trueColorData, fileNameBase, "TRUECOLOR"),
    saveSinglePhoto(ndviData.buffer, fileNameBase, "NDVI"),
    saveSinglePhoto(contrastNdviData, fileNameBase, "CONTRAST_NDVI"),
    saveSinglePhoto(sclData, fileNameBase, "SCL"),
    saveSinglePhoto(clmData, fileNameBase, "CLM"),
  ]);

  return data;
};

export default saveLayers;
saveSinglePhoto.ts:
import { SatelliteData } from "~/interfaces/sentinel/SatelliteData";
import uploadFileToGCS from "../gcs/uploadFileToGCS";

const baseUrl = `https://storage.googleapis.com/${process.env.GCLOUD_STORAGE_BUCKET}/`;

export const saveSinglePhoto = async (
  buffer: Buffer,
  fileNameBase: string,
  layerId: string
): Promise<SatelliteData | null> => {
  const fileName = fileNameBase + layerId + ".jpg";
  await uploadFileToGCS(fileName, buffer, "image/jpeg");

  const satelliteData: SatelliteData = {
    layerId: layerId,
    fileUrl: baseUrl + fileName,
  };
  return satelliteData;
};
uploadFileToGCS.ts:
import { Storage } from "@google-cloud/storage";

// The service-account key is provided as base64-encoded JSON.
const gcsKey = JSON.parse(Buffer.from(process.env.GCLOUD_CRED_FILE, "base64").toString());

const storage = new Storage({
  credentials: {
    client_email: gcsKey.client_email,
    private_key: gcsKey.private_key,
  },
  projectId: process.env.GCLOUD_PROJECT_ID,
});

const uploadFileToGCS = (filename: string, data: any, contentType: string) => {
  return new Promise((resolve, reject) => {
    const file = storage.bucket(process.env.GCLOUD_STORAGE_BUCKET).file(filename);
    const stream = file.createWriteStream({
      metadata: {
        contentType,
      },
      resumable: false,
      validation: false,
      timeout: 86400, // note: this option is in milliseconds, so this is ~86 s, not 24 h
    });
    stream.on("error", (err) => {
      reject(err);
    });
    stream.on("finish", () => {
      resolve("ok");
    });
    stream.end(data);
  });
};

export default uploadFileToGCS;
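For reference, @google-cloud/storage also exposes client-level retry settings that can absorb transient resets. A minimal sketch; the values are illustrative assumptions, not a confirmed fix:

import { Storage, IdempotencyStrategy } from "@google-cloud/storage";

// Illustrative retry configuration (values are assumptions).
const storageWithRetries = new Storage({
  projectId: process.env.GCLOUD_PROJECT_ID,
  retryOptions: {
    autoRetry: true,          // retry transient failures automatically
    maxRetries: 5,
    retryDelayMultiplier: 2,  // exponential backoff between attempts
    totalTimeout: 600,        // give up after 600 s across all attempts
    idempotencyStrategy: IdempotencyStrategy.RetryAlways,
  },
});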
=======EDIT========
Cloud Tasks seemed to be a solution to my problem - however, my server gets stuck while invoking createTask: I get no response and everything freezes.
import { v2beta3 } from "@google-cloud/tasks";
import { google } from "@google-cloud/tasks/build/protos/protos";

const client = new v2beta3.CloudTasksClient();

const project = process.env.GCLOUD_PROJECT_ID;
const location = process.env.GCLOUD_QUEUE_LOCATION;
const queue = process.env.GCLOUD_QUEUE_NAME;
const parent = client.queuePath(project, location, queue);

const gcsKey = JSON.parse(Buffer.from(process.env.GCLOUD_CRED_FILE, "base64").toString());
const email = gcsKey.client_email;

const createUploadTask = async (
  filename: string,
  bucket: string,
  contentType: string,
  data: any
) => {
  const payload = {
    filename,
    bucket,
    contentType,
    data,
  };
  const url = `${process.env.GCLOUD_FUNCTIONS_URL}/uploadToCS`;
  const task = {
    httpRequest: {
      httpMethod: google.cloud.tasks.v2beta3.HttpMethod.POST,
      url,
      oidcToken: {
        serviceAccountEmail: email,
        audience: url,
      },
      headers: {
        "Content-Type": "application/json",
      },
      // Cloud Tasks expects the HTTP body as base64-encoded bytes.
      body: Buffer.from(JSON.stringify(payload)).toString("base64"),
    },
  };
  const [response] = await client.createTask({ parent, task });
  const name = response.name ?? "name not exists";
  return name;
};

export default createUploadTask;
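The task targets an HTTP function at /uploadToCS, which is not shown here. A hypothetical sketch of that handler, inferred from the payload shape above (names and framework are assumptions):

import type { Request, Response } from "express";
import { Storage } from "@google-cloud/storage";

const storage = new Storage();

// Hypothetical /uploadToCS handler; the payload shape is inferred from createUploadTask.
export const uploadToCS = async (req: Request, res: Response) => {
  const { filename, bucket, contentType, data } = req.body;
  // JSON.stringify(Buffer) serializes to { type: "Buffer", data: number[] },
  // so the raw bytes must be rebuilt before writing.
  const buffer = Buffer.from(data.data);
  await storage.bucket(bucket).file(filename).save(buffer, {
    metadata: { contentType },
    resumable: false,
  });
  res.status(200).send("ok");
};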
I've decided to change my code to keep an array of SatelliteFiles (file name + buffer + content type) and to use the p-limit library. Now I am uploading all files in only this one place in the code.
This new approach gave me results like these:
Number of files to upload: 186
All files uploaded to GCS: 207.94119999930263 ms
Here is my code for other people dealing with this problem:
import pLimit from "p-limit";

const downloadSatelliteFiles = async (files: SatelliteFile[]) => {
  const limit = pLimit(5); // at most 5 uploads in flight at once

  const promises: Promise<unknown>[] = [];
  files.forEach((file) => {
    promises.push(
      // Return the upload promise from the callback so Promise.all really waits for it.
      limit(() => uploadFileToGCS(file.fileName, file.buffer, file.contentType))
    );
  });
  await Promise.all(promises);
  return;
};
const uploadFileToGCS = (filename: string, data: any, contentType: string) => {
  return new Promise((resolve, reject) => {
    const file = storage.bucket(process.env.GCLOUD_STORAGE_BUCKET).file(filename);
    const stream = file.createWriteStream({
      metadata: {
        contentType,
        cacheControl: "no-cache",
      },
      resumable: false,
    });
    stream.on("error", (err) => {
      console.log("UPLOAD_ERROR");
      console.log(err);
      // Reject as well, so the promise settles and callers can handle the failure.
      reject(err);
    });
    stream.on("finish", () => {
      resolve("ok");
    });
    stream.end(data);
  });
};
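For completeness, a minimal usage sketch; the SatelliteFile shape and the file names are assumed from the description above:

// Assumed shape: file name + buffer + content type.
interface SatelliteFile {
  fileName: string;
  buffer: Buffer;
  contentType: string;
}

const files: SatelliteFile[] = [
  {
    fileName: "64d162a8_TRUECOLOR.jpg",
    buffer: Buffer.from("...jpeg bytes..."),
    contentType: "image/jpeg",
  },
  {
    fileName: "64d162a8_meta.json",
    buffer: Buffer.from(JSON.stringify({ layerId: "NDVI" })),
    contentType: "application/json",
  },
];

await downloadSatelliteFiles(files);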