cyb/src/services/community/community.ts

import { Observable } from 'rxjs';
import { PATTERN_CYBER } from 'src/constants/patterns';
import { NeuronAddress, ParticleCid } from 'src/types/base';
import { getIpfsHash } from 'src/utils/ipfs/helpers';
import { createCyblogChannel } from 'src/utils/logging/cyblog';
import DbApiWrapper from '../backend/services/DbApi/DbApi';
import { FetchIpfsFunc } from '../backend/services/sync/types';
import { CommunityDto } from '../CozoDb/types/dto';
import { FetchParticleAsync, QueuePriority } from '../QueueManager/types';
import { getFollowers, getFollowsAsCid } from './lcd';

export type SyncCommunityResult = {
  action: 'reset' | 'add' | 'complete';
  items: CommunityDto[];
};

const cyblogCh = createCyblogChannel({
  thread: 'bckd',
  unit: 'fetchStoredSyncCommunity',
});

// eslint-disable-next-line import/prefer-default-export, import/no-unused-modules
export const fetchStoredSyncCommunity$ = (
  dbApi: DbApiWrapper,
  address: NeuronAddress,
  fetchParticleAsync?: FetchIpfsFunc,
  signal?: AbortSignal
): Observable<SyncCommunityResult> => {
  return new Observable<SyncCommunityResult>((subscriber) => {
    subscriber.next({ action: 'reset', items: [] });

    (async () => {
      const storedCommunity = await dbApi.getCommunity(address);

      subscriber.next({ action: 'add', items: storedCommunity });

      const communityUpdatesMap = new Map<ParticleCid, CommunityDto>(
        storedCommunity.map((c) => [c.particle, c])
      );

      const getExistingOrDefault = (cid: ParticleCid): Partial<CommunityDto> =>
        communityUpdatesMap.get(cid) || {
          ownerId: address,
          name: '',
          following: false,
          follower: false,
        };

      const followsCids = await getFollowsAsCid(address, signal);
      const followers = await getFollowers(address, signal);

      const newFollowerCids = followsCids.filter(
        (cid) => !storedCommunity.some((i) => i.particle === cid && i.following)
      );

      const newFollowingNeurons = followers.filter(
        (addr) => !storedCommunity.some((i) => i.neuron === addr && i.follower)
      );

      cyblogCh.info(
        `>>>$ sync community ${address} processing, stored ${storedCommunity.length} new followers: ${newFollowerCids.length} new following: ${newFollowingNeurons.length}`
      );

      const followersCommunity = await Promise.all(
        newFollowingNeurons.map(async (neuron) => {
          const cid = await getIpfsHash(neuron);

          const communityItem = {
            ...getExistingOrDefault(cid),
            particle: cid,
            neuron,
            follower: true,
          } as CommunityDto;

          await dbApi.putCommunity(communityItem);
          communityUpdatesMap.set(cid, communityItem);
          return communityItem;
        })
      );

      subscriber.next({ action: 'add', items: followersCommunity });

      await Promise.all(
        newFollowerCids.map(async (cid: ParticleCid) => {
          const neuron = (await fetchParticleAsync!(cid, QueuePriority.URGENT))?.result
            ?.textPreview;
          if (neuron?.match(PATTERN_CYBER)) {
            const communityItem = {
              ...getExistingOrDefault(cid),
              neuron,
              particle: cid,
              following: true,
            } as CommunityDto;

            await dbApi.putCommunity(communityItem);
            communityUpdatesMap.set(cid, communityItem);
            subscriber.next({ action: 'add', items: [communityItem] });
          }
        })
      );

      cyblogCh.info(`>>>$ sync community ${address}, done`);
      // const communityUpdates = [...communityUpdatesMap.values()];

      // if (communityUpdates.length > 0) {
      //   subscriber.next(communityUpdates);
      // }
      subscriber.next({ action: 'complete', items: [] });

      subscriber.complete();
    })().catch((err) => {
      cyblogCh.error(`>>>$ sync community ${address}, error`, { error: err });
      subscriber.error(err);
    });
  });
};

// eslint-disable-next-line import/no-unused-modules
export const fetchCommunity = async (
  address: NeuronAddress,
  fetchParticleAsync?: FetchParticleAsync,
  onResolve?: (community: CommunityDto[]) => void,
  signal?: AbortSignal
) => {
  const communityUpdatesMap = new Map<ParticleCid, CommunityDto>();

  const getExistingOrDefault = (cid: ParticleCid): Partial<CommunityDto> =>
    communityUpdatesMap.get(cid) || {
      ownerId: address,
      name: '',
      following: false,
      follower: false,
    };

  const followsCids = await getFollowsAsCid(address, signal);
  const followers = await getFollowers(address, signal);

  console.log(`>>> sync community ${address} processing without store`);

  const followsPromise = Promise.all(
    followsCids.map(async (cid) => {
      const neuron = (await fetchParticleAsync!(cid))?.result?.textPreview;
      if (neuron?.match(PATTERN_CYBER)) {
        const communityItem = {
          ...getExistingOrDefault(cid),
          neuron,
          particle: cid,
          following: true,
        } as CommunityDto;
        communityUpdatesMap.set(cid, communityItem);
        onResolve && !signal?.aborted && onResolve([communityItem]);
      }
    })
  );

  const followersPromise = Promise.all(
    followers.map(async (neuron) => {
      const cid = await getIpfsHash(neuron);

      const communityItem = {
        ...getExistingOrDefault(cid),
        particle: cid,
        neuron,
        follower: true,
      } as CommunityDto;

      communityUpdatesMap.set(cid, communityItem);
      onResolve && !signal?.aborted && onResolve([communityItem]);
    })
  );

  await Promise.all([followersPromise, followsPromise]);
};

Synonyms

pussy-ts/src/services/community/community.ts

Neighbours