All files readableStream.ts

0% Statements 0/93
0% Branches 0/1
0% Functions 0/1
0% Lines 0/93

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94                                                                                                                                                                                           
/**
 * Copyright (c) OpenLens Authors. All rights reserved.
 * Licensed under MIT License. See LICENSE in root directory for more information.
 */

import { Readable } from "readable-stream";
import type { TypedArray } from "type-fest";

/**
 * ReadableWebToNodeStream
 *
 * Copied from https://github.com/Borewit/readable-web-to-node-stream
 *
 * Adds read error handler
 *
 * */
export class ReadableWebToNodeStream<T extends TypedArray> extends Readable {

  public bytesRead = 0;
  public released = false;

  /**
   * Default web API stream reader
   * https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultReader
   */
  private reader: ReadableStreamDefaultReader<T>;
  private pendingRead?: Promise<ReadableStreamReadResult<T>>;

  /**
   *
   * @param stream ReadableStream: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream
   */
  constructor(stream: ReadableStream<T>) {
    super();
    this.reader = stream.getReader();
  }

  /**
   * Implementation of readable._read(size).
   * When readable._read() is called, if data is available from the resource,
   * the implementation should begin pushing that data into the read queue
   * https://nodejs.org/api/stream.html#stream_readable_read_size_1
   */
  public async _read() {
    // Should start pushing data into the queue
    // Read data from the underlying Web-API-readable-stream
    if (this.released) {
      this.push(null); // Signal EOF

      return;
    }

    try {
      this.pendingRead = this.reader.read();
      const data = await this.pendingRead;

      // clear the promise before pushing pushing new data to the queue and allow sequential calls to _read()
      delete this.pendingRead;

      if (data.done || this.released) {
        this.push(null); // Signal EOF
      } else {
        this.bytesRead += data.value.length;
        this.push(data.value); // Push new data to the queue
      }
    } catch (error) {
      this.push(null); // Signal EOF
    }
  }

  /**
   * If there is no unresolved read call to Web-API ReadableStream immediately returns;
   * otherwise will wait until the read is resolved.
   */
  public async waitForReadToComplete() {
    if (this.pendingRead) {
      await this.pendingRead;
    }
  }

  /**
   * Close wrapper
   */
  public async close(): Promise<void> {
    await this.syncAndRelease();
  }

  private async syncAndRelease() {
    this.released = true;
    await this.waitForReadToComplete();
    await this.reader.releaseLock();
  }
}