cyb/src/features/ibc-history/polling-status-subscription.ts

import Axios, { AxiosInstance, AxiosRequestConfig } from 'axios';

/** Polls a `/status` endpoint on a given Axios RPC config and publishes to an arbitrary set of subsribers. */
class PollingStatusSubscription {
  protected readonly rpcInstance: AxiosInstance;

  protected _subscriptionCount = 0;

  protected _handlers: ((data: any) => void)[] = [];

  constructor(
    protected readonly rpc: string,
    protected readonly rpcConfig?: AxiosRequestConfig
  ) {
    this.rpcInstance = Axios.create({
      ...{
        baseURL: rpc,
      },
      ...rpcConfig,
    });
  }

  get subscriptionCount(): number {
    return this._subscriptionCount;
  }

  /**
   * @param handler
   * @return unsubscriber
   */
  subscribe(handler: (data: any) => void): () => void {
    this._handlers.push(handler);

    this.increaseSubscriptionCount();

    return () => {
      this._handlers = this._handlers.filter((h) => h !== handler);
      this.decreaseSubscriptionCount();
    };
  }

  protected async startSubscription() {
    const BASE_INTERVAL = 7500;
    const MAX_INTERVAL = 60000;
    const CIRCUIT_BREAKER_THRESHOLD = 10;

    let currentInterval = BASE_INTERVAL;
    let consecutiveErrors = 0;

    while (this._subscriptionCount > 0) {
      // eslint-disable-next-line no-await-in-loop
      await new Promise((resolve) => {
        setTimeout(resolve, currentInterval);
      });

      // Circuit breaker: pause longer after many consecutive failures
      if (consecutiveErrors >= CIRCUIT_BREAKER_THRESHOLD) {
        console.warn(
          `PollingStatusSubscription: circuit breaker triggered after ${consecutiveErrors} failures, pausing 5 min`
        );
        // eslint-disable-next-line no-await-in-loop
        await new Promise((resolve) => {
          setTimeout(resolve, 300000);
        });
        consecutiveErrors = 0;
        currentInterval = BASE_INTERVAL;
      }

      try {
        // eslint-disable-next-line no-await-in-loop
        const response = await this.rpcInstance.get('/status');
        if (response.status === 200) {
          this._handlers.forEach((handler) => handler(response.data));
          // Reset on success
          consecutiveErrors = 0;
          currentInterval = BASE_INTERVAL;
        }
      } catch (e: any) {
        consecutiveErrors += 1;
        // Exponential backoff: double interval up to max
        currentInterval = Math.min(currentInterval * 2, MAX_INTERVAL);
        console.error(`Failed to fetch /status (attempt ${consecutiveErrors}): ${e?.toString()}`);
      }
    }
  }

  protected increaseSubscriptionCount() {
    this._subscriptionCount++;

    if (this._subscriptionCount === 1) {
      // No need to await
      this.startSubscription();
    }
  }

  protected decreaseSubscriptionCount() {
    this._subscriptionCount--;
  }
}

export default PollingStatusSubscription;

Synonyms

pussy-ts/src/services/ibc-history/polling-status-subscription.ts

Neighbours