cyb/src/services/backend/workers/factoryMethods.ts

import { expose, proxy, Remote, releaseProxy, transferHandlers, wrap } from 'comlink';
import { Observable, Observer, Subscribable, Subscription } from 'rxjs'; // v7.8.0
import { IPFSContentTransferHandler } from './serializers';

type WorkerType = SharedWorker | Worker;

const isSharedWorkersSupported = typeof SharedWorker !== 'undefined';

const isSharedWorkerUsed =
  isSharedWorkersSupported && !process.env.IS_DEV && !process.env.IS_TAURI;

// apply serializers for custom types
export function installTransferHandlers() {
  transferHandlers.set('IPFSContent', IPFSContentTransferHandler);
  transferHandlers.set('observable', {
    canHandle: (value: unknown): value is Observable<unknown> => {
      return value instanceof Observable;
    },
    deserialize: (value: MessagePort) => {
      return new Observable<unknown>((observer) => {
        const remote = transferHandlers.get('proxy')!.deserialize(value) as Remote<
          Subscribable<unknown>
        >;

        remote
          .subscribe(
            proxy({
              next: (next: unknown) => observer.next(next),
              error: (error: unknown) => observer.error(error),
              complete: () => observer.complete(),
            })
          )
          .then((subscription) =>
            observer.add(() => {
              subscription.unsubscribe();
              remote[releaseProxy]();
            })
          );
      });
    },
    serialize: (value: Observable<unknown>) => {
      return transferHandlers.get('proxy')!.serialize({
        subscribe: (observer: Remote<Observer<unknown>>) =>
          value.subscribe({
            next: (next: unknown) => observer.next(next).then(),
            error: (error: unknown) => observer.error(typeof error === 'string' ? error : String(error)).then(),
            complete: () => observer.complete().then(),
          }),
      });
    },
  });

  transferHandlers.set('subscription', {
    canHandle: (value: unknown): value is Subscription => {
      return value instanceof Subscription;
    },
    deserialize: (value: MessagePort) => {
      return new Subscription(() => {
        const remote = transferHandlers.get('proxy')!.deserialize(value) as Remote<Subscription>;

        remote.unsubscribe().then(() => {
          remote[releaseProxy]();
        });
      });
    },
    serialize: (value: Subscription) => {
      return transferHandlers.get('proxy')!.serialize({
        unsubscribe: () => value.unsubscribe(),
      });
    },
  });
}

function safeStringify(obj: any): string {
  try {
    return JSON.stringify(obj);
  } catch (_error) {
    return String(obj);
  }
}

// Override console.log to send logs to main thread
function overrideLogging(worker: Worker | MessagePort) {
  const consoleLogMap = {
    log: { original: console.log },
    error: { original: console.error },
    warn: { original: console.warn },
  };
  const replaceConsoleLog = (method: keyof typeof consoleLogMap) => {
    const { original } = consoleLogMap[method];

    consoleLogMap[method].original = console[method];

    console[method] = (...args) => {
      original.apply(console, args);
      const serializableArgs = args.map((arg) => safeStringify(arg));

      worker.postMessage({ type: 'console', method, args: serializableArgs });
    };
  };

  Object.keys(consoleLogMap).forEach((method) =>
    replaceConsoleLog(method as keyof typeof consoleLogMap)
  );
}

// Install handlers for logging from worker
function installLoggingHandler(worker: Worker | MessagePort, name: string) {
  // Add event listener
  worker.addEventListener('message', (event) => {
    if (event.data.type === 'console') {
      const { method, args } = event.data;

      console[method](name, ...args);
    }
  });
}

// Create Shared Worker with fallback to usual Worker(in case of DEV too)
export function createWorkerApi<T>(
  workerUrl: URL,
  workerName: string
): { worker: WorkerType; workerApiProxy: Remote<T> } {
  installTransferHandlers();
  // && !process.env.IS_DEV
  if (isSharedWorkerUsed) {
    const worker = new SharedWorker(workerUrl, { name: workerName });
    installLoggingHandler(worker.port, workerName);
    return { worker, workerApiProxy: wrap<T>(worker.port) };
  }

  const worker = new Worker(workerUrl);
  // installLoggingHandler(worker, workerName);
  return { worker, workerApiProxy: wrap<T>(worker) };
}


// Wrap already-created worker with comlink proxy (Worker must be created inline for rspack bundling)
export function createWorkerFromInstance<T>(
  worker: Worker,
  workerName: string
): { worker: WorkerType; workerApiProxy: Remote<T> } {
  installTransferHandlers();
  // installLoggingHandler(worker, workerName);
  return { worker, workerApiProxy: wrap<T>(worker) };
}

export function exposeWorkerApi<T>(worker: WorkerType, api: T) {
  installTransferHandlers();
  if (typeof worker.onconnect !== 'undefined') {
    worker.onconnect = (e) => {
      const port = e.ports[0];
      overrideLogging(port);

      expose(api, port);
    };
  } else {
    // Don't use overrideLogging for regular workers - it calls self.postMessage()
    // which interferes with comlink's message handling. Use BroadcastChannel instead.
    expose(api);
  }
}

Synonyms

pussy-ts/src/services/backend/workers/factoryMethods.ts

Neighbours