import { ProxyMarked, Remote, proxy } from 'comlink';

import { initIpfsNode } from 'src/services/ipfs/node/factory';

import {
  CybIpfsNode,
  IpfsContentType,
  IpfsOptsType,
} from 'src/services/ipfs/ipfs';

import QueueManager from 'src/services/QueueManager/QueueManager';

// import { CozoDbWorkerApi } from 'src/services/backend/workers/db/worker';

import {
  QueueItemCallback,
  QueueItemOptions,
  QueuePriority,
} from 'src/services/QueueManager/types';
import { ParticleCid } from 'src/types/base';
import { LinkDto } from 'src/services/CozoDb/types/dto';
import { BehaviorSubject, Subject } from 'rxjs';

import { exposeWorkerApi } from '../factoryMethods';

import { SyncService } from '../../services/sync/sync';
import { SyncServiceParams } from '../../services/sync/types';

import DbApi from '../../services/dataSource/indexedDb/dbApiWrapper';

import BroadcastChannelSender from '../../channels/BroadcastChannelSender';
import DeferredDbSaver from '../../services/DeferredDbSaver/DeferredDbSaver';
import { SyncEntryName } from '../../types/services';

const createBackgroundWorkerApi = () => {
  const dbInstance$ = new Subject<DbApi | undefined>();

  const ipfsInstance$ = new BehaviorSubject<CybIpfsNode | undefined>(undefined);

  const params$ = new BehaviorSubject<SyncServiceParams>({
    myAddress: null,
  });

  let ipfsNode: CybIpfsNode | undefined;
  const defferedDbSaver = new DeferredDbSaver(dbInstance$);

  const ipfsQueue = new QueueManager(ipfsInstance$, {
    defferedDbSaver,
  });
  const broadcastApi = new BroadcastChannelSender();

  // service to sync updates about cyberlinks, transactions, swarm etc.
  // eslint-disable-next-line @typescript-eslint/no-unused-vars
  const syncService = new SyncService({
    waitForParticleResolve: async (
      cid: ParticleCid,
      priority: QueuePriority = QueuePriority.MEDIUM
    ) => ipfsQueue.enqueueAndWait(cid, { postProcessing: true, priority }),
    dbInstance$,
    ipfsInstance$,
    params$,
  });

  const init = async (dbApiProxy: DbApi & ProxyMarked) => {
    dbInstance$.next(dbApiProxy);
  };

  const stopIpfs = async () => {
    if (ipfsNode) {
      await ipfsNode.stop();
    }
    ipfsInstance$.next(undefined);
    broadcastApi.postServiceStatus('ipfs', 'inactive');
  };

  const startIpfs = async (ipfsOpts: IpfsOptsType) => {
    try {
      if (ipfsNode) {
        console.log('Ipfs node already started!');
        await ipfsNode.stop();
      }
      broadcastApi.postServiceStatus('ipfs', 'starting');
      ipfsNode = await initIpfsNode(ipfsOpts);

      ipfsInstance$.next(ipfsNode);

      setTimeout(() => broadcastApi.postServiceStatus('ipfs', 'started'), 0);
      return true;
    } catch (err) {
      console.log('----ipfs node init error ', err);
      const msg = err instanceof Error ? err.message : (err as string);
      broadcastApi.postServiceStatus('ipfs', 'error', msg);
      throw Error(msg);
    }
  };

  const defferedDbApi = {
    importCyberlinks: (links: LinkDto[]) => {
      defferedDbSaver.enqueueLinks(links);
    },
  };

  const ipfsApi = {
    start: startIpfs,
    stop: stopIpfs,
    getIpfsNode: async () => ipfsNode && proxy(ipfsNode),
    config: async () => ipfsNode?.config,
    info: async () => ipfsNode?.info(),
    fetchWithDetails: async (cid: string, parseAs?: IpfsContentType) =>
      ipfsNode?.fetchWithDetails(cid, parseAs),
    enqueue: async (
      cid: string,
      callback: QueueItemCallback,
      options: QueueItemOptions
    ) => ipfsQueue!.enqueue(cid, callback, options),
    enqueueAndWait: async (cid: string, options?: QueueItemOptions) =>
      ipfsQueue!.enqueueAndWait(cid, options),
    dequeue: async (cid: string) => ipfsQueue.cancel(cid),
    dequeueByParent: async (parent: string) => ipfsQueue.cancelByParent(parent),
    clearQueue: async () => ipfsQueue.clear(),
    addContent: async (content: string | File) => ipfsNode?.addContent(content),
  };

  return {
    init,
    isInitialized: () => !!ipfsInstance$.value,
    // syncDrive,
    ipfsApi: proxy(ipfsApi),
    defferedDbApi: proxy(defferedDbApi),
    ipfsQueue: proxy(ipfsQueue),
    restartSync: (name: SyncEntryName) => syncService.restart(name),
    setParams: (params: Partial<SyncServiceParams>) =>
      params$.next({ ...params$.value, ...params }),
  };
};

const backgroundWorker = createBackgroundWorkerApi();
export type IpfsApi = Remote<typeof backgroundWorker.ipfsApi>;
export type BackgroundWorker = typeof backgroundWorker;

// Expose the API to the main thread as shared/regular worker
exposeWorkerApi(self, backgroundWorker);

Synonyms

cyb/src/services/backend/workers/db/worker.ts
pussy-ts/src/services/backend/workers/db/worker.ts
cyb/src/services/backend/workers/background/worker.ts

Neighbours