/* eslint-disable no-restricted-syntax */
import { Observable, combineLatest } from 'rxjs';
import { map } from 'rxjs/operators';

import BroadcastChannelSender from '../../channels/BroadcastChannelSender';

import ParticlesResolverQueue from './services/ParticlesResolverQueue/ParticlesResolverQueue';

// import SyncIpfsLoop from './services/SyncIpfsLoop/SyncIpfsLoop';
import SyncTransactionsLoop from './services/SyncTransactionsLoop/SyncTransactionsLoop';
import SyncParticlesLoop from './services/SyncParticlesLoop/SyncParticlesLoop';

import { ServiceDeps } from './services/types';
import {
  MY_FRIENDS_SYNC_INTERVAL,
  MY_PARTICLES_SYNC_INTERVAL,
} from './services/consts';
import SyncMyFriendsLoop from './services/SyncMyFriendsLoop/SyncMyFriendsLoop';
import { SyncEntryName } from '../../types/services';
import BaseSyncLoop from './services/BaseSyncLoop/BaseSyncLoop';
import createCommunitySync$ from './services/CommunitySync/CommunitySync';
import { createCyblogChannel } from 'src/utils/logging/cyblog';

const cyblogCh = createCyblogChannel({ thread: 'bckd' });

// eslint-disable-next-line import/prefer-default-export
export class SyncService {
  private isInitialized$: Observable<boolean>;

  private channelApi = new BroadcastChannelSender();

  private loops: Partial<Record<SyncEntryName, BaseSyncLoop>> = {};

  constructor(deps: ServiceDeps) {
    const { dbInstance$, ipfsInstance$ } = deps;
    this.isInitialized$ = combineLatest([dbInstance$, ipfsInstance$]).pipe(
      map(([dbInstance, ipfsInstance]) => !!dbInstance && !!ipfsInstance)
    );
    // subscribe when started
    this.isInitialized$.subscribe({
      next: (result) => {
        return result && this.channelApi.postServiceStatus('sync', 'started');
      },
      error: (err) => this.channelApi.postServiceStatus('sync', 'error', err),
    });

    const particlesResolver = new ParticlesResolverQueue(deps).start();

    const communitySync$ = createCommunitySync$(deps);
    communitySync$.subscribe((community) => {
      cyblogCh.info('--> community fetched', {
        unit: 'community',
        data: community,
      });
    });

    const followings$ = communitySync$.pipe(
      map((c) => c.filter((i) => i.following)),
      map((c) => c.map((i) => i.neuron))
    );

    // new SyncIpfsLoop(deps, particlesResolver).start();

    new SyncTransactionsLoop('transactions', deps, particlesResolver).start();

    new SyncParticlesLoop(
      'particles',
      MY_PARTICLES_SYNC_INTERVAL,
      deps,
      particlesResolver
    ).start();

    new SyncMyFriendsLoop(
      'my-friends',
      MY_FRIENDS_SYNC_INTERVAL,
      { ...deps, followings$ },
      particlesResolver
      // { warmupMs: 1000 }
    ).start();
  }

  public restart(name: SyncEntryName) {
    this.loops[name]?.restart();
  }
}

Synonyms

cyb/src/services/backend/services/sync/sync.ts

Neighbours