javascript, node.js, google-cloud-platform, google-cloud-storage

Problem saving multiple files to cloud storage


My problem appears only sometimes (which is really strange) when saving multiple files simultaneously to Cloud Storage. I am using the @google-cloud/storage library, version 6.9.5. I was able to find other people having the same issue, but none of their solutions worked for me (neither changing the stream options nor applying a timeout). At first I thought the bandwidth limit might be exceeded, but I am almost sure we are not saving over 50 GB of data (I tried to check this with the Cloud monitoring charts; however, bandwidth usage is only shown if other services are using Cloud Storage - in my case I get "No data available").

For further explanation: my code has a function called "uploadFileToGCS", which is invoked around 57 times per process. A process saves 35 JPG images (20 KB each), 1 TIFF image (500 KB), and 7 JSON files (150 KB each). 30 processes run simultaneously, so roughly 57 × 30 ≈ 1,700 uploads are started at nearly the same time.

Here is exactly what the error says (different files throw it - sometimes a JPG, sometimes a JSON; there is no pattern):

error - unhandledRejection: FetchError: request to https://storage.googleapis.com/upload/storage/v1/b/satellite-photos/o?uploadType=multipart&name=64d162a80103b55f87a6cdb4_64db59204f7cd1f34a65e5c7_2023_6_31_raw.tiff failed, reason: read ECONNRESET
    at ClientRequest.<anonymous> (D:\WORK\nirby-project\node_modules\next\dist\compiled\node-fetch\index.js:1:65756)
    at ClientRequest.emit (node:events:525:35)
    at TLSSocket.socketErrorListener (node:_http_client:496:9)
    at TLSSocket.emit (node:events:513:28)
    at emitErrorNT (node:internal/streams/destroy:151:8)
    at emitErrorCloseNT (node:internal/streams/destroy:116:3)
    at process.processTicksAndRejections (node:internal/process/task_queues:82:21) {
  type: 'system',
  errno: 'ECONNRESET',
  code: 'ECONNRESET'
}

The function below saves 5 images (and is itself invoked 7 times at the same time):

saveLayers.ts:

import { saveSinglePhoto } from "~/helpers/sentinel/saveSinglePhoto";

interface INdviData {
   min: number;
   max: number;
   averageNDVI: number;
   buffer: Buffer;
}

const saveLayers = async (
   fileNameBase: string,
   trueColorData: Buffer,
   ndviData: INdviData,
   contrastNdviData: Buffer,
   sclData: Buffer,
   clmData: Buffer
) => {
   console.log(fileNameBase);

   const data = await Promise.all([
       saveSinglePhoto(trueColorData, fileNameBase, "TRUECOLOR"),
       saveSinglePhoto(ndviData.buffer, fileNameBase, "NDVI"),
       saveSinglePhoto(contrastNdviData, fileNameBase, "CONTRAST_NDVI"),
       saveSinglePhoto(sclData, fileNameBase, "SCL"),
       saveSinglePhoto(clmData, fileNameBase, "CLM"),
   ]);

   return data;
};

export default saveLayers;

saveSinglePhoto.ts:

import { SatelliteData } from "~/interfaces/sentinel/SatelliteData";
import uploadFileToGCS from "../gcs/uploadFileToGCS";

const baseUrl = `https://storage.googleapis.com/${process.env.GCLOUD_STORAGE_BUCKET}/`;

export const saveSinglePhoto = async (
    buffer: Buffer,
    fileNameBase: string,
    layerId: string
): Promise<SatelliteData | null> => {
    const fileName = fileNameBase + layerId + ".jpg";

    await uploadFileToGCS(fileName, buffer, "image/jpeg");

    const satelliteData: SatelliteData = {
        layerId: layerId,
        fileUrl: baseUrl + fileName,
    };

    return satelliteData;
};

uploadFileToGCS.ts:

import { Storage } from "@google-cloud/storage";

const gcsKey = JSON.parse(Buffer.from(process.env.GCLOUD_CRED_FILE, "base64").toString());

const storage = new Storage({
    credentials: {
        client_email: gcsKey.client_email,
        private_key: gcsKey.private_key,
    },
    projectId: process.env.GCLOUD_PROJECT_ID,
});

const uploadFileToGCS = (filename: string, data: any, contentType: string) => {
    return new Promise((resolve, reject) => {
        const file = storage.bucket(process.env.GCLOUD_STORAGE_BUCKET).file(filename);

        const stream = file.createWriteStream({
            metadata: {
                contentType,
            },
            resumable: false,
            validation: false,
            timeout: 86400, // note: this timeout is in milliseconds, so ~86 seconds
        });

        stream.on("error", (err) => {
            reject(err);
        });

        stream.on("finish", () => {
            resolve("ok");
        });

        stream.end(data);
    });
};

export default uploadFileToGCS;
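
For reference, the storage client also exposes retry settings on its constructor; below is a minimal sketch of that configuration (the values are illustrative - I have not verified that they help with this particular error):

import { Storage } from "@google-cloud/storage";

// Illustrative retry configuration; retryOptions is part of StorageOptions
// in @google-cloud/storage v6 and retries transient request failures.
const storageWithRetries = new Storage({
    projectId: process.env.GCLOUD_PROJECT_ID,
    retryOptions: {
        autoRetry: true,         // retry transient failures automatically
        maxRetries: 5,           // retry attempts per request
        retryDelayMultiplier: 2, // exponential backoff factor
    },
});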

======= EDIT =======

Cloud Tasks seems to be a solution to my problem; however, my server gets stuck while invoking createTask - I get no response and everything freezes:

import { v2beta3 } from "@google-cloud/tasks";
import { google } from "@google-cloud/tasks/build/protos/protos";

const client = new v2beta3.CloudTasksClient();

const project = process.env.GCLOUD_PROJECT_ID;
const location = process.env.GCLOUD_QUEUE_LOCATION;
const queue = process.env.GCLOUD_QUEUE_NAME;

const parent = client.queuePath(project, location, queue);

const gcsKey = JSON.parse(Buffer.from(process.env.GCLOUD_CRED_FILE, "base64").toString());
const email = gcsKey.client_email;

const createUploadTask = async (filename: string, bucket: string, contentType: string, data: any) => {
    const payload = {
        filename,
        bucket,
        contentType,
        data,
    };

    const url = `${process.env.GCLOUD_FUNCTIONS_URL}/uploadToCS`;

    const task = {
        httpRequest: {
            httpMethod: google.cloud.tasks.v2beta3.HttpMethod.POST,
            url,
            oidcToken: {
                serviceAccountEmail: email,
                audience: url,
            },
            headers: {
                "Content-Type": "application/json",
            },
            body: Buffer.from(JSON.stringify(payload)).toString("base64"),
        },
    };

    const [response] = await client.createTask({ parent, task });
    const name = response.name ?? "name not exists";

    return name;
};

export default createUploadTask;
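
For completeness, a hypothetical call site (the file name and buffer below are placeholders). Since the payload goes through JSON.stringify, a raw Buffer would be expanded into a byte-array object, so encoding it to base64 first seems safer:

// Hypothetical usage; the file name and buffer are placeholders.
const exampleBuffer = Buffer.from("example bytes");

const taskName = await createUploadTask(
    "example_TRUECOLOR.jpg",
    process.env.GCLOUD_STORAGE_BUCKET!,
    "image/jpeg",
    exampleBuffer.toString("base64") // avoid JSON-serializing a raw Buffer
);
console.log("created task:", taskName);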


Solution

  • I've decided to change my code to keep an array of SatelliteFiles (file name + buffer + content type) and to use the p-limit library. Now I upload all files in only this one place in the code.

    This new approach gave me results similar to these:

    Number of files to upload:  186
    All files uploaded to GCS:  207.94119999930263  ms
    

    Here is my code for other people dealing with this problem:

    import pLimit from "p-limit";

    // Shape of the files collected for upload (file name + buffer + content type).
    interface SatelliteFile {
        fileName: string;
        buffer: Buffer;
        contentType: string;
    }

    const downloadSatelliteFiles = async (files: SatelliteFile[]) => {
        const limit = pLimit(5); // at most 5 uploads in flight at a time
        const promises: Promise<void>[] = [];

        files.forEach((file) => {
            promises.push(
                // Return the upload promise so Promise.all really waits for it.
                limit(() => uploadFileToGCS(file.fileName, file.buffer, file.contentType))
            );
        });

        await Promise.all(promises);

        return;
    };
    
    const uploadFileToGCS = (filename: string, data: any, contentType: string): Promise<void> => {
        return new Promise<void>((resolve, reject) => {
            const file = storage.bucket(process.env.GCLOUD_STORAGE_BUCKET).file(filename);

            const stream = file.createWriteStream({
                metadata: {
                    contentType,
                    cacheControl: "no-cache",
                },
                resumable: false,
            });
            stream.on("error", (err) => {
                console.log("UPLOAD_ERROR");
                console.log(err);
                // Reject so the promise settles and the failure surfaces in Promise.all.
                reject(err);
            });
            stream.on("finish", () => {
                resolve();
            });
            stream.end(data);
        });
    };
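
    A hypothetical call site for the above (the file names and buffers are empty placeholders):

    // Hypothetical usage of downloadSatelliteFiles; buffers are placeholders.
    const files: SatelliteFile[] = [
        { fileName: "base_TRUECOLOR.jpg", buffer: Buffer.alloc(0), contentType: "image/jpeg" },
        { fileName: "base_raw.tiff", buffer: Buffer.alloc(0), contentType: "image/tiff" },
        { fileName: "base_meta.json", buffer: Buffer.alloc(0), contentType: "application/json" },
    ];

    await downloadSatelliteFiles(files); // uploads at most 5 files concurrently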