cyb/src/features/ibc-history/tx/TracerTx.ts

import { toBase64, toHex } from 'src/utils/encoding';

import { TxEventMap, WsReadyState } from './types';

type Listeners = {
  [K in keyof TxEventMap]?: TxEventMap[K][];
};

/**
 * TxTracer is based on the `TendermintTxTracer` pattern from @cosmjs/tendermint-rpc.
 * Changes for some mistake on the original `TendermintTxTracer` and this would be remove if the changes are merged to the original library.
 */
class TxTracer {
  protected ws!: WebSocket;

  protected newBlockSubscribes: {
    handler: (block: any) => void;
  }[] = [];

  // Key is "id" for jsonrpc
  protected txSubscribes: Map<
    number,
    {
      params: Record<string, string | number | boolean>;
      resolver: (data?: unknown) => void;
      rejector: (e: Error) => void;
    }
  > = new Map();

  // Key is "id" for jsonrpc
  protected pendingQueries: Map<
    number,
    {
      method: string;
      params: Record<string, string | number | boolean>;
      resolver: (data?: unknown) => void;
      rejector: (e: Error) => void;
    }
  > = new Map();

  protected listeners: Listeners = {};

  constructor(
    protected readonly url: string,
    protected readonly wsEndpoint: string,
    protected readonly options: {
      wsObject?: new (url: string, protocols?: string | string[]) => WebSocket;
    } = {}
  ) {
    this.open();
  }

  protected getWsEndpoint(): string {
    let { url } = this;
    if (url.startsWith('http')) {
      url = url.replace('http', 'ws');
    }
    if (!url.endsWith(this.wsEndpoint)) {
      const wsEndpoint = this.wsEndpoint.startsWith('/') ? this.wsEndpoint : `/${this.wsEndpoint}`;

      url = url.endsWith('/') ? url + wsEndpoint.slice(1) : url + wsEndpoint;
    }

    return url;
  }

  open() {
    this.ws = this.options.wsObject
      ? new this.options.wsObject(this.getWsEndpoint())
      : new WebSocket(this.getWsEndpoint());
    this.ws.onopen = this.onOpen;
    this.ws.onmessage = this.onMessage;
    this.ws.onclose = this.onClose;
    this.ws.onerror = this.onError;
  }

  close() {
    this.ws.close();
  }

  get numberOfSubscriberOrPendingQuery(): number {
    return this.newBlockSubscribes.length + this.txSubscribes.size + this.pendingQueries.size;
  }

  get readyState(): WsReadyState {
    switch (this.ws.readyState) {
      case 0:
        return WsReadyState.CONNECTING;
      case 1:
        return WsReadyState.OPEN;
      case 2:
        return WsReadyState.CLOSING;
      case 3:
        return WsReadyState.CLOSED;
      default:
        return WsReadyState.NONE;
    }
  }

  addEventListener<T extends keyof TxEventMap>(type: T, listener: TxEventMap[T]) {
    if (!this.listeners[type]) {
      this.listeners[type] = [];
    }

    // eslint-disable-next-line @typescript-eslint/ban-ts-comment
    // @ts-expect-error
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.listeners[type]!.push(listener);
  }

  protected readonly onOpen = (e: Event) => {
    if (this.newBlockSubscribes.length > 0) {
      this.sendSubscribeBlockRpc();
    }

    for (const [id, tx] of this.txSubscribes) {
      this.sendSubscribeTxRpc(id, tx.params);
    }

    for (const [id, query] of this.pendingQueries) {
      this.sendQueryRpc(id, query.method, query.params);
    }

    for (const listener of this.listeners.open ?? []) {
      listener(e);
    }
  };

  protected readonly onMessage = (e: MessageEvent) => {
    for (const listener of this.listeners.message ?? []) {
      listener(e);
    }

    if (e.data) {
      try {
        const obj = JSON.parse(e.data);

        if (obj?.id) {
          if (this.pendingQueries.has(obj.id)) {
            if (obj.error) {
              // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
              this.pendingQueries
                .get(obj.id)!
                .rejector(new Error(obj.error.data || obj.error.message));
            } else {
              // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
              this.pendingQueries.get(obj.id)!.resolver(obj.result);
            }

            this.pendingQueries.delete(obj.id);
          }
        }

        if (obj?.result?.data?.type === 'tendermint/event/NewBlock') {
          for (const handler of this.newBlockSubscribes) {
            handler.handler(obj.result.data.value);
          }
        }

        if (obj?.result?.data?.type === 'tendermint/event/Tx') {
          if (obj?.id) {
            if (this.txSubscribes.has(obj.id)) {
              if (obj.error) {
                // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
                this.txSubscribes
                  .get(obj.id)!
                  .rejector(new Error(obj.error.data || obj.error.message));
              } else {
                // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
                this.txSubscribes.get(obj.id)!.resolver(obj.result.data.value.TxResult.result);
              }

              this.txSubscribes.delete(obj.id);
            }
          }
        }
      } catch (e: any) {
        console.error(
          `Tendermint websocket jsonrpc response is not JSON: ${e.message || e.toString()}`
        );
      }
    }
  };

  protected readonly onClose = (e: CloseEvent) => {
    // Reject all pending queries and subscriptions when WebSocket closes
    const error = new Error(`WebSocket closed: ${e.reason || 'connection lost'}`);

    for (const [_id, query] of this.pendingQueries) {
      query.rejector(error);
    }
    this.pendingQueries.clear();

    for (const [_id, sub] of this.txSubscribes) {
      sub.rejector(error);
    }
    this.txSubscribes.clear();

    for (const listener of this.listeners.close ?? []) {
      listener(e);
    }
  };

  protected readonly onError = (_e: Event) => {
    // Reject all pending queries and subscriptions on WebSocket error
    const error = new Error('WebSocket error');

    for (const [_id, query] of this.pendingQueries) {
      query.rejector(error);
    }
    this.pendingQueries.clear();

    for (const [_id, sub] of this.txSubscribes) {
      sub.rejector(error);
    }
    this.txSubscribes.clear();
  };

  /**
   * SubscribeBlock receives the handler for the block.
   * The handelrs shares the subscription of block.
   * @param handler
   * @return unsubscriber
   */
  subscribeBlock(handler: (block: any) => void): () => void {
    this.newBlockSubscribes.push({
      handler,
    });

    if (this.newBlockSubscribes.length === 1) {
      this.sendSubscribeBlockRpc();
    }

    return () => {
      this.newBlockSubscribes = this.newBlockSubscribes.filter((s) => s.handler !== handler);
    };
  }

  protected sendSubscribeBlockRpc(): void {
    if (this.readyState === WsReadyState.OPEN) {
      this.ws.send(
        JSON.stringify({
          jsonrpc: '2.0',
          method: 'subscribe',
          params: { query: "tm.event='NewBlock'" },
          id: 1,
        })
      );
    }
  }

  // Query the tx and subscribe the tx.
  traceTx(
    query: Uint8Array | Record<string, string | number | boolean>,
    {
      timeoutMs = 120000,
      connectionTimeoutMs = 15000,
    }: { timeoutMs?: number; connectionTimeoutMs?: number } = {}
  ): Promise<any> {
    const timeoutPromise = new Promise<never>((_, reject) => {
      setTimeout(() => reject(new Error(`traceTx timed out after ${timeoutMs}ms`)), timeoutMs);
    });

    const connectionPromise =
      this.readyState !== WsReadyState.OPEN
        ? new Promise<void>((resolve, reject) => {
            const connectionTimer = setTimeout(() => {
              reject(new Error(`WebSocket connection timed out after ${connectionTimeoutMs}ms`));
            }, connectionTimeoutMs);

            this.addEventListener('open', () => {
              clearTimeout(connectionTimer);
              resolve();
            });
          })
        : Promise.resolve();

    const tracePromise = connectionPromise.then(
      () =>
        new Promise<any>((resolve) => {
          this.queryTx(query)
            .then((result) => {
              if (query instanceof Uint8Array) {
                resolve(result);
                return;
              }

              if (result?.total_count !== '0') {
                resolve(result);
              }
            })
            .catch(() => {
              // noop
            });

          this.subscribeTx(query).then(resolve);
        })
    );

    return Promise.race([tracePromise, timeoutPromise]);
  }

  subscribeTx(query: Uint8Array | Record<string, string | number | boolean>): Promise<any> {
    if (query instanceof Uint8Array) {
      const id = this.createRandomId();

      const params = {
        query: `tm.event='Tx' AND tx.hash='${toHex(query).toUpperCase()}'`,
      };

      return new Promise<unknown>((resolve, reject) => {
        this.txSubscribes.set(id, {
          params,
          resolver: resolve,
          rejector: reject,
        });

        this.sendSubscribeTxRpc(id, params);
      });
    }
    const id = this.createRandomId();

    const params = {
      query: `tm.event='Tx' AND ${Object.keys(query)
        .map((key) => {
          return {
            key,
            value: query[key],
          };
        })
        .map((obj) => {
          return `${obj.key}=${typeof obj.value === 'string' ? `'${obj.value}'` : obj.value}`;
        })
        .join(' AND ')}`,
      page: '1',
      per_page: '1',
      order_by: 'desc',
    };

    return new Promise<unknown>((resolve, reject) => {
      this.txSubscribes.set(id, {
        params,
        resolver: resolve,
        rejector: reject,
      });

      this.sendSubscribeTxRpc(id, params);
    });
  }

  protected sendSubscribeTxRpc(
    id: number,
    params: Record<string, string | number | boolean>
  ): void {
    if (this.readyState === WsReadyState.OPEN) {
      this.ws.send(
        JSON.stringify({
          jsonrpc: '2.0',
          method: 'subscribe',
          params,
          id,
        })
      );
    }
  }

  queryTx(query: Uint8Array | Record<string, string | number | boolean>): Promise<any> {
    if (query instanceof Uint8Array) {
      return this.query('tx', {
        hash: toBase64(query),
        prove: false,
      });
    }
    const params = {
      query: Object.keys(query)
        .map((key) => {
          return {
            key,
            value: query[key],
          };
        })
        .map((obj) => {
          return `${obj.key}=${typeof obj.value === 'string' ? `'${obj.value}'` : obj.value}`;
        })
        .join(' AND '),
      page: '1',
      per_page: '1',
      order_by: 'desc',
    };

    return this.query('tx_search', params);
  }

  protected query(method: string, params: Record<string, string | number | boolean>): Promise<any> {
    const id = this.createRandomId();

    return new Promise<unknown>((resolve, reject) => {
      this.pendingQueries.set(id, {
        method,
        params,
        resolver: resolve,
        rejector: reject,
      });

      this.sendQueryRpc(id, method, params);
    });
  }

  protected sendQueryRpc(
    id: number,
    method: string,
    params: Record<string, string | number | boolean>
  ) {
    if (this.readyState === WsReadyState.OPEN) {
      this.ws.send(
        JSON.stringify({
          jsonrpc: '2.0',
          method,
          params,
          id,
        })
      );
    }
  }

  protected createRandomId(): number {
    return parseInt(
      Array.from({ length: 6 })
        .map(() => Math.floor(Math.random() * 100))
        .join(''),
      10
    );
  }
}

export default TxTracer;

Synonyms

pussy-ts/src/services/ibc-history/tx/TracerTx.ts

Neighbours