Skip to content

ReqPacket Operators

References of operators that can be applied to rxReq.

batch()

Converts an Observable of ReqPacket[] to an Observable of ReqPacket based on the mergeFilter parameter. This is useful in conjunction with bufferTime().

If the mergeFilter parameter is omitted, the filters are simply combined.

ts
import { bufferTime } from "rxjs";
import { batch, latestEach, now } from "rx-nostr";

// ...

// Forward REQ observing kind1 timeline.
const timelineReq = createRxForwardReq();

// Backward REQ fetching kind0 on demand.
const profileReq = createRxBackwardReq();

rxNostr.use(timelineReq).subscribe((packet) => {
  const event = packet.event;

  // Get kind0 of user appeared in the timeline.
  profileReq.emit({
    kinds: [0],
    authors: [event.pubkey],
    limit: 1,
  });
});

// To prevent a large number of REQs from being issued in a short period of time,
// REQs are issued in batches every second
const batchedReq = profileReq.pipe(bufferTime(1000), batch());

rxNostr
  .use(batchedReq)
  .pipe(latestEach((packet) => packet.event.pubkey))
  .subscribe((packet) => {
    console.log("kind0", packet);
  });

timelineReq.emit({ kinds: [1], since: now });

chunk()

Split ReqPacket into several ReqPackets as needed.

Currently rx-nostr cannot automatically respect the max_filters defined in NIP-11, so chunk() must be used if an REQ with a large number of filters can be issued.

The first argument is a predicate that determines if a split is necessary, and the second argument is toChunks that specifies the method of split.

ts
import { chunk } from "rx-nostr";

const chunkedReq = rxReq.pipe(
  chunk(
    (filters) => filters.length > 100,
    (filters) => {
      const pile = [...filters];
      const chunks = [];

      while (pile.length > 0) {
        chunks.push(pile.splice(0, 100));
      }

      return chunks;
    },
  ),
);

rxNostr.use(chunkedReq).subscribe(() => {
  // ...
});