使用 WHATWG Streams API 实现规范

Streams API一种现代化的方式,可以逐步且异步地生成和使用数据。多个规范开始使用它,例如 Fetch文件流WebTransport 等。本文档将简要说明如何在 Gecko 中实现此类规范。

在流对象上调用函数

您大多可以按照给定规范中的步骤进行操作,因为 Gecko 中的实现是故意以一种方式编写的,使给定规范的文本可以与函数调用 1:1 匹配。假设规范中写道

view 入队到 stream 中。

该文本可以用 C++ 写成如下形式

stream->EnqueueNative(cx, view, rv);

请注意,函数名称以 Native 结尾,以将其与 Web IDL 的 enqueue 方法区分开来。请参见下面的列表,了解规范术语与函数之间的完整映射关系。

创建流

流的创建通常可以通过调用 CreateNative() 来完成。如果规范有以下要求,则可能需要调用其他方法:

  • 需要字节流并使用术语“设置字节读取支持”。在这种情况下,您需要调用 ByteNative 变体。

  • 定义一个继承基本流接口的新接口。在这种情况下,您需要定义一个子类并在其 init 方法中调用 SetUpNative()

    • 为了使循环收集正常工作,您需要将 HoldDropJSObjectsCaller::Explicit 传递给超类构造函数,并在构造函数/析构函数中分别调用 mozilla::HoldJSObjects(this)/mozilla::DropJSObjects(this)

CreateNative()/SetUpNative() 两个函数都需要一个参数来实现回调函数的自定义算法,其对应的规范短语可能是:

  1. readable 为一个新的ReadableStream

  2. pullAlgorithm 为以下步骤

    1. (…)

  3. 使用 pullAlgorithm 设置为 pullAlgorithm 来设置 stream

这可以大致转换为以下 C++ 代码

class MySourceAlgorithms : UnderlyingSourceAlgorithmsWrapper {
   already_AddRefed<Promise> PullCallbackImpl(
      JSContext* aCx, ReadableStreamController& aController,
      ErrorResult& aRv) override;
};

already_AddRefed<ReadableStream> CreateMyReadableStream(
   JSContext* aCx, nsIGlobalObject* aGlobal, ErrorResult& aRv) {
   // Step 2: Let pullAlgorithm be the following steps:
   auto algorithms = MakeRefPtr<MySourceAlgorithms>();

   // Step 1: Let readable be a new ReadableStream.
   // Step 3: Set up stream with pullAlgorithm set to pullAlgorithm.
   RefPtr<ReadableStream> readable = ReadableStream::CreateNative(
      aCx,
      aGlobal,
      *algorithms,
      /* aHighWaterMark */ Nothing(),
      /* aSizeAlgorithm */ nullptr,
      aRv
   );
}

请注意,new ReadableStream() 和“设置”步骤为了方便起见在 CreateNative() 中一起完成。对于子类,这需要再次拆分。

class MyReadableStream : public ReadableStream {
 public:
   MyReadableStream(nsIGlobalObject* aGlobal)
      : ReadableStream(aGlobal, ReadableStream::HoldDropJSObjectsCaller::Explicit) {
      mozilla::HoldJSObjects(this);
   }

   ~MyReadableStream() {
      mozilla::DropJSObjects(this);
   }

   void Init(ErrorResult& aRv) {
      // Step 2: Let pullAlgorithm be the following steps:
      auto algorithms = MakeRefPtr<MySourceAlgorithms>();

      // Step 3: Set up stream with pullAlgorithm set to pullAlgorithm.
      //
      // NOTE:
      // For now there's no SetUpNative but only SetUpByteNative.
      // File a bug on DOM: Streams if you need to create a subclass
      // for non-byte ReadableStream.
      SetUpNative(aCx, *algorithms, Nothing(), nullptr, aRv);
   }
}

使用算法创建流后,大致流程如下所示:

sequenceDiagram JavaScript->>ReadableStream: await reader.read() ReadableStream->>UnderlyingSourceAlgorithmsWrapper: PullCallback() UnderlyingSourceAlgorithmsWrapper->>(数据源): (实现细节) NOTE left of (数据源): (可以是文件 I/O、网络 I/O 等) (数据源)->>UnderlyingSourceAlgorithmsWrapper: (发出通知) UnderlyingSourceAlgorithmsWrapper->>ReadableStream: EnqueueNative() ReadableStream->>JavaScript: 解析 reader.read()

实现回调函数

如流程所示,真正的实现将在算法内部完成,在本例中为 PullCallbackImpl()。假设有一个规范术语:

  1. pullAlgorithm 为以下步骤

    1. 将 JavaScript 字符串值“Hello Fox!” 入队。

这可以转换为以下 C++ 代码

class MySourceAlgorithms : UnderlyingSourceAlgorithmsWrapper {
   // Step 1: Let `pullAlgorithm` be the following steps:
   already_AddRefed<Promise> PullCallbackImpl(
      JSContext* aCx, ReadableStreamController& aController, ErrorResult& aRv) {
      RefPtr<ReadableStream> stream = aController.Stream();

      // Step 1.1: Enqueue a JavaScript string value "Hello Fox!".
      JS::Rooted<JSString*> hello(aCx, JS_NewStringCopyZ(aCx, "Hello Fox!"));
      stream->EnqueueNative(aCx, JS::StringValue(hello), aRv);

      // Return a promise if the task is asynchronous, or nullptr if not.
      return nullptr;

      // NOTE:
      // Please don't use aController directly, as it's more for JavaScript.
      // The *Native() functions are safer with additional assertions and more
      // automatic state management.
      // Please file a bug if there's no *Native() function that fulfills your need.
      // In the future this function should receive a ReadableStream instead.

      // Also note that you'll need to touch JS APIs frequently as the functions
      // often expect JS::Value.
   };
};

请注意,PullCallbackImpl 返回一个 Promise。在 Promise 解析之前,该函数不会再次被调用。调用序列大致如下所示,其中包含重复的读取请求:

  1. 来自 JS 的 await read()

  2. PullCallbackImpl() 调用,它返回一个 Promise

  3. 来自 JS 的第二个 await read()

  4. (时间流逝)

  5. Promise 解析

  6. 第二个 PullCallbackImpl() 调用

同样适用于 WritableStreamTransformStream 中的写入和转换回调,只是它们分别使用 UnderlyingSinkAlgorithmsWrapperTransformerAlgorithmsWrapper

将现有的 XPCOM 流公开为 WHATWG 流

您可能只想将现有的 XPCOM 流公开给 JavaScript,而无需进行任何其他自定义。幸运的是,有一些辅助函数可以做到这一点。您可以使用

  • InputToReadableStreamAlgorithms 将数据从 nsIAsyncInputStream 发送到 ReadableStream

  • WritableStreamToOutputAlgorithms 将数据从 WritableStream 接收到的 nsIAsyncOutputStream

使用方法如下所示

// For nsIAsyncInputStream:
already_AddRefed<ReadableStream> ConvertInputStreamToReadableStream(
   JSContext* aCx, nsIGlobalObject* aGlobal, nsIAsyncInputStream* aInput,
   ErrorResult& aRv) {
   auto algorithms = MakeRefPtr<InputToReadableStreamAlgorithms>(
         stream->GetParentObject(), aInput);
   return do_AddRef(ReadableStream::CreateNative(aCx, aGlobal, *algorithms,
                                                 Nothing(), nullptr, aRv));
}

// For nsIAsyncOutputStream
already_AddRefed<ReadableStream> ConvertOutputStreamToWritableStream(
   JSContext* aCx, nsIGlobalObject* aGlobal, nsIAsyncOutputStream* aInput,
   ErrorResult& aRv) {
   auto algorithms = MakeRefPtr<WritableStreamToOutputAlgorithms>(
         stream->GetParentObject(), aInput);
   return do_AddRef(WritableStream::CreateNative(aCx, aGlobal, *algorithms,
                                                 Nothing(), nullptr, aRv));
}

规范术语与函数的映射

  1. ReadableStream

  2. WritableStream

    • 设置:

      • CreateNative():当规范与 new WritableStream 一起使用该术语时,您可以调用此方法。

      • SetUpNative():当规范与 WritableStream 的子类一起使用该术语时,您需要使用此方法。在子类的构造函数中调用此方法。

    • 错误ErrorNative()

  3. TransformStream:目前,它仅使用 TransfromStreamDefaultController 中的函数,这些函数将作为转换或刷新算法的参数提供。

映射仅根据需要实现,并且不涵盖规范中的每个函数。如果您需要此处缺少的内容,请在 Bugzilla 中的 DOM: Streams 组件上提交 Bug。