Search code examples
multithreadingmulticoredeno

Deno on multi-core machines


In Node.js there is the cluster module to utilize all available cores on the machine which is pretty great, especially when used with the node module pm2. But I am pretty stoked about some features of Deno but I have wondered about how to best run it on a multi-core machine.

I understand that there is workers which works great for a specific task but for normal web requests it seems like performance of multi-core machines is wasted somewhat? What is the best strategy to get maximum availability and utilization of my hardware in Deno?

I am a bit worried that if you only have a single process going on and there is some CPU intensive task for whatever reason it will "block" all other requests coming in. In node.js the cluster module would solve this, since another process would handle the request but I am unsure on how to handle this in Deno?

I think you could run several instances in Deno on different ports and then have some kind of load balancer in front of it but that seems like quite a complex setup in comparison. I also get that you could use some kind of service like Deno Deploy or whatever, but I already have hardware that I want to run it on.

What are the alternatives for me? Thanks in advance for you sage advice and better wisdom.


Solution

  • In Deno, like in a web browser, you should be able to use Web Workers to utilize 100% of a multi-core CPU.

    In a cluster you need a "manager" node (which can be a worker itself too as needed/appropriate). In a similar fashion the Web Worker API can be used to create however many dedicated workers as desired. This means the main thread should never block as it can delegate all tasks that will potentially block to its workers. Tasks that won't block (e.g. simple database or other I/O bound calls) can be done directly on the main thread like normal.

    Deno also supports navigator.hardwareConcurrency so you can query about available hardware and determine the number of desired workers accordingly. You might not need to define any limits though. Spawning a new dedicated worker from the same source as a previously spawned dedicated worker may be fast enough to do so on demand. Even so there may be value in reusing dedicated workers rather than spawning a new one for every request.

    With Transferable Objects large data sets can be made available to/from workers without copying the data. This along with messaging makes it pretty straight forward to delegate tasks while avoiding performance bottlenecks from copying large data sets.

    Depending on your use cases you might also use a library like Comlink "that removes the mental barrier of thinking about postMessage and hides the fact that you are working with workers."

    e.g.

    main.ts

    import { serve } from "https://deno.land/[email protected]/http/server.ts";
    
    import ComlinkRequestHandler from "./ComlinkRequestHandler.ts";
    
    serve(async function handler(request) {
      const worker = new Worker(new URL("./worker.ts", import.meta.url).href, {
        type: "module",
      });
    
      const handler = ComlinkRequestHandler.wrap(worker);
    
      return await handler(request);
    });
    

    worker.ts

    /// <reference no-default-lib="true"/>
    /// <reference lib="deno.worker" />
    
    import ComlinkRequestHandler from "./ComlinkRequestHandler.ts";
    
    ComlinkRequestHandler.expose(async (request) => {
      const body = await request.text();
      return new Response(`Hello to ${request.url}\n\nReceived:\n\n${body}\n`);
    });
    

    ComlinkRequestHandler.ts

    import * as Comlink from "https://cdn.skypack.dev/[email protected]?dts";
    
    interface RequestMessage extends Omit<RequestInit, "body" | "signal"> {
      url: string;
      headers: Record<string, string>;
      hasBody: boolean;
    }
    
    interface ResponseMessage extends ResponseInit {
      headers: Record<string, string>;
      hasBody: boolean;
    }
    
    export default class ComlinkRequestHandler {
      #handler: (request: Request) => Promise<Response>;
      #responseBodyReader: ReadableStreamDefaultReader<Uint8Array> | undefined;
    
      static expose(handler: (request: Request) => Promise<Response>) {
        Comlink.expose(new ComlinkRequestHandler(handler));
      }
    
      static wrap(worker: Worker) {
        const { handleRequest, nextResponseBodyChunk } =
          Comlink.wrap<ComlinkRequestHandler>(worker);
    
        return async (request: Request): Promise<Response> => {
          const requestBodyReader = request.body?.getReader();
    
          const requestMessage: RequestMessage = {
            url: request.url,
            hasBody: requestBodyReader !== undefined,
            cache: request.cache,
            credentials: request.credentials,
            headers: Object.fromEntries(request.headers.entries()),
            integrity: request.integrity,
            keepalive: request.keepalive,
            method: request.method,
            mode: request.mode,
            redirect: request.redirect,
            referrer: request.referrer,
            referrerPolicy: request.referrerPolicy,
          };
    
          const nextRequestBodyChunk = Comlink.proxy(async () => {
            if (requestBodyReader === undefined) return undefined;
            const { value } = await requestBodyReader.read();
            return value;
          });
    
          const { hasBody: responseHasBody, ...responseInit } = await handleRequest(
            requestMessage,
            nextRequestBodyChunk
          );
    
          const responseBodyInit: BodyInit | null = responseHasBody
            ? new ReadableStream({
                start(controller) {
                  async function push() {
                    const value = await nextResponseBodyChunk();
                    if (value === undefined) {
                      controller.close();
                      return;
                    }
                    controller.enqueue(value);
                    push();
                  }
    
                  push();
                },
              })
            : null;
    
          return new Response(responseBodyInit, responseInit);
        };
      }
    
      constructor(handler: (request: Request) => Promise<Response>) {
        this.#handler = handler;
      }
    
      async handleRequest(
        { url, hasBody, ...init }: RequestMessage,
        nextRequestBodyChunk: () => Promise<Uint8Array | undefined>
      ): Promise<ResponseMessage> {
        const request = new Request(
          url,
          hasBody
            ? {
                ...init,
                body: new ReadableStream({
                  start(controller) {
                    async function push() {
                      const value = await nextRequestBodyChunk();
                      if (value === undefined) {
                        controller.close();
                        return;
                      }
                      controller.enqueue(value);
                      push();
                    }
    
                    push();
                  },
                }),
              }
            : init
        );
        const response = await this.#handler(request);
        this.#responseBodyReader = response.body?.getReader();
        return {
          hasBody: this.#responseBodyReader !== undefined,
          headers: Object.fromEntries(response.headers.entries()),
          status: response.status,
          statusText: response.statusText,
        };
      }
    
      async nextResponseBodyChunk(): Promise<Uint8Array | undefined> {
        if (this.#responseBodyReader === undefined) return undefined;
        const { value } = await this.#responseBodyReader.read();
        return value;
      }
    }
    

    Example usage:

    % deno run --allow-net --allow-read main.ts
    
    % curl -X POST --data '{"answer":42}' http://localhost:8000/foo/bar
    Hello to http://localhost:8000/foo/bar
    
    Received:
    
    {"answer":42}
    

    There's probably a better way to do this (e.g. via Comlink.transferHandlers and registering transfer handlers for Request, Response, and/or ReadableStream) but the idea is the same and will handle even large request or response payloads as the bodies are streamed via messaging.