pussy-ts/src/services/QueueManager/__tests__/QueueManager.test.ts

import QueueManager from '../QueueManager';
import { BehaviorSubject, of } from 'rxjs';
import { CybIpfsNode } from '../../ipfs/ipfs';
import { QueueStrategy } from '../QueueStrategy';
import { IDeferredDbSaver } from '../types';
import { valuesExpected } from 'src/utils/test-utils/test-utils';
import { fetchIpfsContent } from 'src/services/ipfs/utils/utils-ipfs';

// const mockTimeout = () => (source) => {
//   console.log('TIMEOUT');
//   return source;
// };

// Replace the real timeout with the mock
// rxjsOperators.timeout = mockTimeout;

// jest.mock('rxjs', () => {
//   const originalOperators = jest.requireActual('rxjs');
//   return {
//     ...originalOperators,
//     debounceTime: jest.fn().mockImplementation(() => (source$) => source$),
//   };
// });

jest.mock('../../backend/services/DeferredDbSaver/DeferredDbSaver'); // adjust the path as needed
jest.mock('../../ipfs/utils/ipfsCacheDb');

jest.mock('src/services/ipfs/utils/cluster.ts', () => ({ add: jest.fn() }));
jest.mock('src/services/backend/channels/BroadcastChannelSender');

jest.mock('src/services/ipfs/utils/utils-ipfs.ts', () => ({
  ipfsCacheDb: jest.fn(),
  cyberCluster: jest.fn(),
  fetchIpfsContent: jest.fn(),
  addContenToIpfs: jest.fn(),
}));

const TIMEOUT_MS = 300;

const queueStrategy = new QueueStrategy(
  {
    db: { timeout: TIMEOUT_MS, maxConcurrentExecutions: 2 },
    node: { timeout: TIMEOUT_MS, maxConcurrentExecutions: 2 },
    gateway: { timeout: TIMEOUT_MS, maxConcurrentExecutions: 2 },
  },
  ['db', 'node', 'gateway']
);

const cid1 = 'cid1';
const cid2 = 'cid2';
const cid3 = 'cid3';
const cid4 = 'cid4';

const nextTick = () => {
  return new Promise((resolve) => {
    setTimeout(resolve, 0);
  });
};

const onNextTick = (callback) => {
  setTimeout(() => {
    callback();
  }, 0);
};
function wrapPromiseWithSignal(
  promise: Promise<string>,
  signal?: AbortSignal
): Promise<string> {
  return new Promise((resolve, reject) => {
    promise.then((result) => {
      resolve(result);
    });
    console.log('------sssss', signal?.aborted);
    signal?.addEventListener('abort', (e) => {
      // @ts-ignore
      console.log('------abort', e, e?.target, e?.target?.reason);

      if (e?.target?.reason !== 'timeout') {
        reject(new DOMException('canceled', 'AbortError'));
      }
    });
  });
}

const getPromise = (
  result = 'result',
  timeout = 500,
  signal?: AbortSignal
): Promise<string> =>
  wrapPromiseWithSignal(
    new Promise<string>((resolve) => {
      setTimeout(() => resolve(`result ${result}`), timeout);
    }),
    signal
  );

const mockNode: jest.Mocked<CybIpfsNode> = {
  nodeType: 'helia',
  reconnectToSwarm: jest.fn(),
};
// jest.useFakeTimers();

describe('QueueManager without timers', () => {
  let queueManager: QueueManager;

  beforeEach(() => {
    // setup QueueManager instance before each test
    const deferredDbSaverMock: IDeferredDbSaver = {
      enqueueIpfsContent: jest.fn(),
      enqueueLinks: jest.fn(),
    };
    queueManager = new QueueManager(of(mockNode), {
      strategy: queueStrategy,
      queueDebounceMs: 1,
      defferedDbSaver: deferredDbSaverMock,
    });
  });

  test('should instantiate without errors', () => {
    expect(queueManager).toBeInstanceOf(QueueManager);
  });

  test('should execute enqueue as promise and get result', async () => {
    (fetchIpfsContent as jest.Mock).mockResolvedValue('done!');
    const result = await queueManager.enqueueAndWait('xxx');
    expect(result?.result).toBe('done!');
  });

  test('should execute enqueue as promise and get undefined', async () => {
    (fetchIpfsContent as jest.Mock).mockRejectedValue(undefined);
    const result = await queueManager.enqueueAndWait('xxx');
    expect(result?.status).toBe('not_found');
  });

  test('should keep in pending items thats is out of maxConcurrentExecutions', () => {
    (fetchIpfsContent as jest.Mock).mockResolvedValue('good-result');

    queueManager.enqueue('1', jest.fn);
    queueManager.enqueue('2', jest.fn);
    queueManager.enqueue('3', jest.fn);
    const itemList = queueManager.getQueueList();
    onNextTick(() => {
      expect(itemList[0].status).toEqual('executing');
      expect(itemList[1].status).toEqual('executing');
      expect(itemList[2].status).toEqual('pending');
    });
  });

  test('should cancel queue items', (done) => {
    const statuses = valuesExpected(['pending', 'executing', 'cancelled']);
    (fetchIpfsContent as jest.Mock).mockImplementation(
      (cid: string, source: string, { controller }) =>
        getPromise('result', 1000, controller.signal)
    );

    queueManager.enqueue(cid1, (cid, status) => {
      expect(cid).toBe(cid1);
      expect(status).toBe(statuses.next().value);

      if (status === 'cancelled') {
        expect(queueManager.getQueueList()[0]?.controller?.signal.aborted).toBe(
          true
        );

        expect(queueManager.getQueueList().length).toEqual(0);
      }
    });
    queueManager.cancel(cid1);

    onNextTick(() => {
      done();
    });
  });

  test('should handle timeout and switch to next source', (done) => {
    const statuses = valuesExpected([
      'pending', // db
      'executing', // db
      'timeout', // db
      'pending', // node
      'executing', // node
      'timeout', // node
      'pending', // gateway
      'executing', // gateway
      'timeout', // gateway
      'not_found',
    ]);

    (fetchIpfsContent as jest.Mock).mockImplementation(
      (cid: string, source: string, { controller }) =>
        getPromise('result', 50000, controller?.signal)
    );
    queueManager.enqueue(cid1, (cid, status, source) => {
      const expectedStatus = statuses.next().value;
      expect(cid).toBe(cid1);
      expect(status).toBe(expectedStatus);
      if (expectedStatus === 'timeout' && source === 'gateway') {
        done();
      }
    });
  });

  test('should enqueue item, try to resolve all sources and return not_found', (done) => {
    (fetchIpfsContent as jest.Mock).mockResolvedValue(undefined);
    const responsesExpected = valuesExpected([
      ['pending', 'db'],
      ['executing', 'db'],
      ['error', 'db'],
      ['pending', 'node'],
      ['executing', 'node'],
      ['error', 'node'],
      ['pending', 'gateway'],
      ['executing', 'gateway'],
      ['error', 'gateway'],
      ['not_found', 'gateway'],
    ]);

    queueManager.enqueue(cid1, (cid, status, source) => {
      expect(cid).toBe(cid1);

      const [statusExpected, sourceExpected] = responsesExpected.next().value;
      expect(status).toBe(statusExpected);
      expect(source).toBe(sourceExpected);
      // jest.runOnlyPendingTimers();
      if (statusExpected === 'not_found') {
        done();
      }
    });
  });

  it('should execute queue items in order by priority', (done) => {
    (fetchIpfsContent as jest.Mock).mockImplementation(() =>
      getPromise('good-result', 100)
    );
    queueManager.enqueue(cid1, jest.fn);
    queueManager.enqueue(cid2, jest.fn);

    const executingByPriority: string[] = [];
    queueManager.enqueue(cid3, (cid) => executingByPriority.push(cid), {
      priority: 0.5,
    });

    queueManager.enqueue(cid4, (cid) => executingByPriority.push(cid), {
      priority: 0.9,
    });

    onNextTick(() => {
      const queue = queueManager.getQueueMap();
      expect(queue.size).toBe(4);
      expect(queue.get(cid4)!.status).toBe('executing');
      expect(queue.get(cid3)!.status).toBe('executing');
      expect(queue.get(cid2)!.status).toBe('pending');
      expect(queue.get(cid1)!.status).toBe('pending');
      done();
    });
  });

  // it('should execute queue items in order by viewportPriority in real-time', (done) => {
  //   try {
  //     (fetchIpfsContent as jest.Mock).mockImplementation(() =>
  //       getPromise('x', 100)
  //     );
  //     queueManager.enqueue('1', jest.fn);
  //     queueManager.enqueue('2', jest.fn);

  //     const executingByPriority: string[] = [];
  //     const setExecutingByPriority = (cid: string, status: QueueItemStatus) =>
  //       status === 'executing' && executingByPriority.push(cid);

  //     queueManager.enqueue('3', setExecutingByPriority, {
  //       priority: 0.5,
  //       viewPortPriority: 0.1,
  //     });

  //     queueManager.enqueue('4', setExecutingByPriority, {
  //       priority: 0.6,
  //       viewPortPriority: 0.1,
  //     });

  //     waitUtilQueueDebounce(() => {
  //       const queue = queueManager.getQueueMap();
  //       // const queue = queueManager.getQueueList();
  //       expect(queue.size).toBe(4);
  //       expect(queue.get('4').status).toBe('executing');
  //       expect(queue.get('3').status).toBe('executing');
  //       expect(queue.get('2').status).toBe('pending');
  //       expect(queue.get('1').status).toBe('pending');

  //       // Update priorities by viewport
  //       queueManager.updateViewPortPriority('3', 0.5);
  //       queueManager.updateViewPortPriority('4', 0);
  //     });

  //     setTimeout(() => {
  //       const queue = queueManager.getQueueMap();
  //       expect(queue.size).toBe(4);

  //       expect(executingByPriority[1]).toBe('3');

  //       expect(executingByPriority[0]).toBe('4');
  //       done();
  //     }, QUEUE_DEBOUNCE_MS + 1);
  //   } finally {
  //     (fetchIpfsContent as jest.Mock).mockClear();
  //   }
  // });

  test('should execute queue items and deprioritize based on viewPortPriority', (done) => {
    (fetchIpfsContent as jest.Mock).mockImplementation(
      (cid: string, source: string, { controller }) =>
        getPromise('result', 5000, controller?.signal)
    );
    [cid1, cid2, cid3].map((cid) =>
      queueManager.enqueue(cid, jest.fn, { initialSource: 'node' })
    );

    onNextTick(() => {
      const queue = queueManager.getQueueList();
      console.log('---qm 1', queueManager.getQueueMap());

      expect(queue[0].status).toBe('executing');
      expect(queue[1].status).toBe('executing');
      expect(queue[2].status).toBe('pending');

      // Update priorities by viewport
      queueManager.updateViewPortPriority(cid1, -1);
      console.log('---qm 11', queueManager.getQueueMap());
      queueManager.enqueue(cid4, jest.fn, { initialSource: 'node' });
      console.log('---qm 2', queueManager.getQueueMap());

      onNextTick(() => {
        console.log('---qm 3', queueManager.getQueueMap());
        const queueMap = queueManager.getQueueMap();
        expect(queueMap.get(cid1).status).toBe('pending');
        expect(queueMap.get(cid2).status).toBe('executing');
        expect(queueMap.get(cid3).status).toBe('executing');
        expect(queueMap.get(cid4).status).toBe('pending');
        done();
      });
    });
  });
});

Synonyms

pussy-ts/src/services/QueueManager/QueueManager.test.ts
cyb/src/services/QueueManager/QueueManager.test.ts
cyb/src/services/QueueManager/__tests__/QueueManager.test.ts

Neighbours