import { Buffer } from 'buffer';

import { TxEventMap, WsReadyState } from './types';

type Listeners = {
  [K in keyof TxEventMap]?: TxEventMap[K][];
};

/**
 * TxTracer is almost same with the `TendermintTxTracer` in the @keplr-wallet/cosmos library.
 * Changes for some mistake on the original `TendermintTxTracer` and this would be remove if the changes are merged to the original library.
 */
class TxTracer {
  protected ws!: WebSocket;

  protected newBlockSubscribes: {
    handler: (block: any) => void;
  }[] = [];

  // Key is "id" for jsonrpc
  protected txSubscribes: Map<
    number,
    {
      params: Record<string, string | number | boolean>;
      resolver: (data?: unknown) => void;
      rejector: (e: Error) => void;
    }
  > = new Map();

  // Key is "id" for jsonrpc
  protected pendingQueries: Map<
    number,
    {
      method: string;
      params: Record<string, string | number | boolean>;
      resolver: (data?: unknown) => void;
      rejector: (e: Error) => void;
    }
  > = new Map();

  protected listeners: Listeners = {};

  constructor(
    protected readonly url: string,
    protected readonly wsEndpoint: string,
    protected readonly options: {
      wsObject?: new (url: string, protocols?: string | string[]) => WebSocket;
    } = {}
  ) {
    this.open();
  }

  protected getWsEndpoint(): string {
    let { url } = this;
    if (url.startsWith('http')) {
      url = url.replace('http', 'ws');
    }
    if (!url.endsWith(this.wsEndpoint)) {
      const wsEndpoint = this.wsEndpoint.startsWith('/')
        ? this.wsEndpoint
        : `/${this.wsEndpoint}`;

      url = url.endsWith('/') ? url + wsEndpoint.slice(1) : url + wsEndpoint;
    }

    return url;
  }

  open() {
    this.ws = this.options.wsObject
      ? new this.options.wsObject(this.getWsEndpoint())
      : new WebSocket(this.getWsEndpoint());
    this.ws.onopen = this.onOpen;
    this.ws.onmessage = this.onMessage;
    this.ws.onclose = this.onClose;
  }

  close() {
    this.ws.close();
  }

  get numberOfSubscriberOrPendingQuery(): number {
    return (
      this.newBlockSubscribes.length +
      this.txSubscribes.size +
      this.pendingQueries.size
    );
  }

  get readyState(): WsReadyState {
    switch (this.ws.readyState) {
      case 0:
        return WsReadyState.CONNECTING;
      case 1:
        return WsReadyState.OPEN;
      case 2:
        return WsReadyState.CLOSING;
      case 3:
        return WsReadyState.CLOSED;
      default:
        return WsReadyState.NONE;
    }
  }

  addEventListener<T extends keyof TxEventMap>(
    type: T,
    listener: TxEventMap[T]
  ) {
    if (!this.listeners[type]) {
      this.listeners[type] = [];
    }

    // eslint-disable-next-line @typescript-eslint/ban-ts-comment
    // @ts-ignore
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.listeners[type]!.push(listener);
  }

  protected readonly onOpen = (e: Event) => {
    if (this.newBlockSubscribes.length > 0) {
      this.sendSubscribeBlockRpc();
    }

    for (const [id, tx] of this.txSubscribes) {
      this.sendSubscribeTxRpc(id, tx.params);
    }

    for (const [id, query] of this.pendingQueries) {
      this.sendQueryRpc(id, query.method, query.params);
    }

    for (const listener of this.listeners.open ?? []) {
      listener(e);
    }
  };

  protected readonly onMessage = (e: MessageEvent) => {
    for (const listener of this.listeners.message ?? []) {
      listener(e);
    }

    if (e.data) {
      try {
        const obj = JSON.parse(e.data);

        if (obj?.id) {
          if (this.pendingQueries.has(obj.id)) {
            if (obj.error) {
              // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
              this.pendingQueries
                .get(obj.id)!
                .rejector(new Error(obj.error.data || obj.error.message));
            } else {
              // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
              this.pendingQueries.get(obj.id)!.resolver(obj.result);
            }

            this.pendingQueries.delete(obj.id);
          }
        }

        if (obj?.result?.data?.type === 'tendermint/event/NewBlock') {
          for (const handler of this.newBlockSubscribes) {
            handler.handler(obj.result.data.value);
          }
        }

        if (obj?.result?.data?.type === 'tendermint/event/Tx') {
          if (obj?.id) {
            if (this.txSubscribes.has(obj.id)) {
              if (obj.error) {
                // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
                this.txSubscribes
                  .get(obj.id)!
                  .rejector(new Error(obj.error.data || obj.error.message));
              } else {
                // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
                this.txSubscribes
                  .get(obj.id)!
                  .resolver(obj.result.data.value.TxResult.result);
              }

              this.txSubscribes.delete(obj.id);
            }
          }
        }
      } catch (e: any) {
        console.error(
          `Tendermint websocket jsonrpc response is not JSON: ${
            e.message || e.toString()
          }`
        );
      }
    }
  };

  protected readonly onClose = (e: CloseEvent) => {
    for (const listener of this.listeners.close ?? []) {
      listener(e);
    }
  };

  /**
   * SubscribeBlock receives the handler for the block.
   * The handelrs shares the subscription of block.
   * @param handler
   * @return unsubscriber
   */
  subscribeBlock(handler: (block: any) => void): () => void {
    this.newBlockSubscribes.push({
      handler,
    });

    if (this.newBlockSubscribes.length === 1) {
      this.sendSubscribeBlockRpc();
    }

    return () => {
      this.newBlockSubscribes = this.newBlockSubscribes.filter(
        (s) => s.handler !== handler
      );
    };
  }

  protected sendSubscribeBlockRpc(): void {
    if (this.readyState === WsReadyState.OPEN) {
      this.ws.send(
        JSON.stringify({
          jsonrpc: '2.0',
          method: 'subscribe',
          params: { query: "tm.event='NewBlock'" },
          id: 1,
        })
      );
    }
  }

  // Query the tx and subscribe the tx.
  traceTx(
    query: Uint8Array | Record<string, string | number | boolean>
  ): Promise<any> {
    return new Promise<any>((resolve) => {
      // At first, try to query the tx at the same time of subscribing the tx.
      // But, the querying's error will be ignored.
      this.queryTx(query)
        .then((result) => {
          if (query instanceof Uint8Array) {
            resolve(result);
            return;
          }

          if (result?.total_count !== '0') {
            resolve(result);
          }
        })
        .catch(() => {
          // noop
        });

      this.subscribeTx(query).then(resolve);
    });
  }

  subscribeTx(
    query: Uint8Array | Record<string, string | number | boolean>
  ): Promise<any> {
    if (query instanceof Uint8Array) {
      const id = this.createRandomId();

      const params = {
        query: `tm.event='Tx' AND tx.hash='${Buffer.from(query)
          .toString('hex')
          .toUpperCase()}'`,
      };

      return new Promise<unknown>((resolve, reject) => {
        this.txSubscribes.set(id, {
          params,
          resolver: resolve,
          rejector: reject,
        });

        this.sendSubscribeTxRpc(id, params);
      });
    }
    const id = this.createRandomId();

    const params = {
      query: `tm.event='Tx' and ${Object.keys(query)
        .map((key) => {
          return {
            key,
            value: query[key],
          };
        })
        .map((obj) => {
          return `${obj.key}=${
            typeof obj.value === 'string' ? `'${obj.value}'` : obj.value
          }`;
        })
        .join(' and ')}`,
      page: '1',
      per_page: '1',
      order_by: 'desc',
    };

    return new Promise<unknown>((resolve, reject) => {
      this.txSubscribes.set(id, {
        params,
        resolver: resolve,
        rejector: reject,
      });

      this.sendSubscribeTxRpc(id, params);
    });
  }

  protected sendSubscribeTxRpc(
    id: number,
    params: Record<string, string | number | boolean>
  ): void {
    if (this.readyState === WsReadyState.OPEN) {
      this.ws.send(
        JSON.stringify({
          jsonrpc: '2.0',
          method: 'subscribe',
          params,
          id,
        })
      );
    }
  }

  queryTx(
    query: Uint8Array | Record<string, string | number | boolean>
  ): Promise<any> {
    if (query instanceof Uint8Array) {
      return this.query('tx', {
        hash: Buffer.from(query).toString('base64'),
        prove: false,
      });
    }
    const params = {
      query: Object.keys(query)
        .map((key) => {
          return {
            key,
            value: query[key],
          };
        })
        .map((obj) => {
          return `${obj.key}=${
            typeof obj.value === 'string' ? `'${obj.value}'` : obj.value
          }`;
        })
        .join(' and '),
      page: '1',
      per_page: '1',
      order_by: 'desc',
    };

    return this.query('tx_search', params);
  }

  protected query(
    method: string,
    params: Record<string, string | number | boolean>
  ): Promise<any> {
    const id = this.createRandomId();

    return new Promise<unknown>((resolve, reject) => {
      this.pendingQueries.set(id, {
        method,
        params,
        resolver: resolve,
        rejector: reject,
      });

      this.sendQueryRpc(id, method, params);
    });
  }

  protected sendQueryRpc(
    id: number,
    method: string,
    params: Record<string, string | number | boolean>
  ) {
    if (this.readyState === WsReadyState.OPEN) {
      this.ws.send(
        JSON.stringify({
          jsonrpc: '2.0',
          method,
          params,
          id,
        })
      );
    }
  }

  protected createRandomId(): number {
    return parseInt(
      Array.from({ length: 6 })
        .map(() => Math.floor(Math.random() * 100))
        .join('')
    );
  }
}

export default TxTracer;

Synonyms

cyb/src/features/ibc-history/tx/TracerTx.ts

Neighbours