diff --git a/external/streams/streams-lib.js b/external/streams/streams-lib.js index a031378d1..cc04d5a66 100644 --- a/external/streams/streams-lib.js +++ b/external/streams/streams-lib.js @@ -1960,7 +1960,8 @@ function ReadableStreamClose(stream) { if (IsReadableStreamDefaultReader(reader) === true) { for (var i = 0; i < reader._readRequests.length; i++) { - var _resolve = reader._readRequests[i]; + var _resolve = reader._readRequests[i]._resolve; + _resolve(CreateIterResultObject(undefined, true)); } reader._readRequests = []; diff --git a/src/shared/util.js b/src/shared/util.js index bab25f012..bde41df2f 100644 --- a/src/shared/util.js +++ b/src/shared/util.js @@ -1214,24 +1214,50 @@ var createObjectURL = (function createObjectURLClosure() { }; })(); +function resolveCall(fn, args, thisArg = null) { + if (!fn) { + return Promise.resolve(undefined); + } + return new Promise((resolve, reject) => { + resolve(fn.apply(thisArg, args)); + }); +} + +function resolveOrReject(capability, success, reason) { + if (success) { + capability.resolve(); + } else { + capability.reject(reason); + } +} + +function finalize(promise) { + return Promise.resolve(promise).catch(() => {}); +} + function MessageHandler(sourceName, targetName, comObj) { this.sourceName = sourceName; this.targetName = targetName; this.comObj = comObj; - this.callbackIndex = 1; + this.callbackId = 1; + this.streamId = 1; this.postMessageTransfers = true; - var callbacksCapabilities = this.callbacksCapabilities = Object.create(null); - var ah = this.actionHandler = Object.create(null); + this.streamSinks = Object.create(null); + this.streamControllers = Object.create(null); + let callbacksCapabilities = this.callbacksCapabilities = Object.create(null); + let ah = this.actionHandler = Object.create(null); this._onComObjOnMessage = (event) => { - var data = event.data; + let data = event.data; if (data.targetName !== this.sourceName) { return; } - if (data.isReply) { - var callbackId = data.callbackId; + if (data.stream) { + this._processStreamMessage(data); + } else if (data.isReply) { + let callbackId = data.callbackId; if (data.callbackId in callbacksCapabilities) { - var callback = callbacksCapabilities[callbackId]; + let callback = callbacksCapabilities[callbackId]; delete callbacksCapabilities[callbackId]; if ('error' in data) { callback.reject(data.error); @@ -1242,13 +1268,13 @@ function MessageHandler(sourceName, targetName, comObj) { error('Cannot resolve callback ' + callbackId); } } else if (data.action in ah) { - var action = ah[data.action]; + let action = ah[data.action]; if (data.callbackId) { - var sourceName = this.sourceName; - var targetName = data.sourceName; + let sourceName = this.sourceName; + let targetName = data.sourceName; Promise.resolve().then(function () { return action[0].call(action[1], data.data); - }).then(function (result) { + }).then((result) => { comObj.postMessage({ sourceName, targetName, @@ -1256,7 +1282,7 @@ function MessageHandler(sourceName, targetName, comObj) { callbackId: data.callbackId, data: result, }); - }, function (reason) { + }, (reason) => { if (reason instanceof Error) { // Serialize error to avoid "DataCloneError" reason = reason + ''; @@ -1269,6 +1295,8 @@ function MessageHandler(sourceName, targetName, comObj) { error: reason, }); }); + } else if (data.streamId) { + this._createStreamSink(data); } else { action[0].call(action[1], data.data); } @@ -1289,9 +1317,9 @@ MessageHandler.prototype = { }, /** * Sends a message to the comObj to invoke the action with the supplied data. - * @param {String} actionName Action to call. - * @param {JSON} data JSON data to send. - * @param {Array} [transfers] Optional list of transfers/ArrayBuffers + * @param {String} actionName - Action to call. + * @param {JSON} data - JSON data to send. + * @param {Array} [transfers] - Optional list of transfers/ArrayBuffers */ send(actionName, data, transfers) { var message = { @@ -1304,14 +1332,14 @@ MessageHandler.prototype = { }, /** * Sends a message to the comObj to invoke the action with the supplied data. - * Expects that other side will callback with the response. - * @param {String} actionName Action to call. - * @param {JSON} data JSON data to send. - * @param {Array} [transfers] Optional list of transfers/ArrayBuffers. + * Expects that the other side will callback with the response. + * @param {String} actionName - Action to call. + * @param {JSON} data - JSON data to send. + * @param {Array} [transfers] - Optional list of transfers/ArrayBuffers. * @returns {Promise} Promise to be resolved with response data. */ sendWithPromise(actionName, data, transfers) { - var callbackId = this.callbackIndex++; + var callbackId = this.callbackId++; var message = { sourceName: this.sourceName, targetName: this.targetName, @@ -1328,10 +1356,215 @@ MessageHandler.prototype = { } return capability.promise; }, + /** + * Sends a message to the comObj to invoke the action with the supplied data. + * Expect that the other side will callback to signal 'start_complete'. + * @param {String} actionName - Action to call. + * @param {JSON} data - JSON data to send. + * @param {Object} queueingStrategy - strategy to signal backpressure based on + * internal queue. + * @param {Array} [transfers] - Optional list of transfers/ArrayBuffers. + * @return {ReadableStream} ReadableStream to read data in chunks. + */ + sendWithStream(actionName, data, queueingStrategy, transfers) { + let streamId = this.streamId++; + let sourceName = this.sourceName; + let targetName = this.targetName; + + return new ReadableStream({ + start: (controller) => { + let startCapability = createPromiseCapability(); + this.streamControllers[streamId] = { + controller, + startCall: startCapability, + }; + this.postMessage({ + sourceName, + targetName, + action: actionName, + streamId, + data, + desiredSize: controller.desiredSize, + }); + // Return Promise for Async process, to signal success/failure. + return startCapability.promise; + }, + + pull: (controller) => { + let pullCapability = createPromiseCapability(); + this.streamControllers[streamId].pullCall = pullCapability; + this.postMessage({ + sourceName, + targetName, + stream: 'pull', + streamId, + desiredSize: controller.desiredSize, + }); + // Returning Promise will not call "pull" + // again until current pull is resolved. + return pullCapability.promise; + }, + + cancel: (reason) => { + let cancelCapability = createPromiseCapability(); + this.streamControllers[streamId].cancelCall = cancelCapability; + this.postMessage({ + sourceName, + targetName, + stream: 'cancel', + reason, + streamId, + }); + // Return Promise to signal success or failure. + return cancelCapability.promise; + }, + }, queueingStrategy); + }, + + _createStreamSink(data) { + let self = this; + let action = this.actionHandler[data.action]; + let streamId = data.streamId; + let desiredSize = data.desiredSize; + let sourceName = this.sourceName; + let targetName = data.sourceName; + let capability = createPromiseCapability(); + + let sendStreamRequest = ({ stream, chunk, success, reason, }) => { + this.comObj.postMessage({ sourceName, targetName, stream, streamId, + chunk, success, reason, }); + }; + + let streamSink = { + enqueue(chunk, size = 1) { + let lastDesiredSize = this.desiredSize; + this.desiredSize -= size; + // Enqueue decreases the desiredSize property of sink, + // so when it changes from positive to negative, + // set ready as unresolved promise. + if (lastDesiredSize > 0 && this.desiredSize <= 0) { + this.sinkCapability = createPromiseCapability(); + this.ready = this.sinkCapability.promise; + } + sendStreamRequest({ stream: 'enqueue', chunk, }); + }, + + close() { + sendStreamRequest({ stream: 'close', }); + delete self.streamSinks[streamId]; + }, + + error(reason) { + sendStreamRequest({ stream: 'error', reason, }); + }, + + sinkCapability: capability, + onPull: null, + onCancel: null, + desiredSize, + ready: null, + }; + + streamSink.sinkCapability.resolve(); + streamSink.ready = streamSink.sinkCapability.promise; + this.streamSinks[streamId] = streamSink; + resolveCall(action[0], [data.data, streamSink], action[1]).then(() => { + sendStreamRequest({ stream: 'start_complete', success: true, }); + }, (reason) => { + sendStreamRequest({ stream: 'start_complete', success: false, reason, }); + }); + }, + + _processStreamMessage(data) { + let sourceName = this.sourceName; + let targetName = data.sourceName; + let streamId = data.streamId; + + let sendStreamResponse = ({ stream, success, reason, }) => { + this.comObj.postMessage({ sourceName, targetName, stream, + success, streamId, reason, }); + }; + + let deleteStreamController = () => { + // Delete streamController only when start, pull and + // cancel callbacks are resolved, to avoid "TypeError". + Promise.all([ + this.streamControllers[data.streamId].startCall, + this.streamControllers[data.streamId].pullCall, + this.streamControllers[data.streamId].cancelCall + ].map(function(capability) { + return capability && finalize(capability.promise); + })).then(() => { + delete this.streamControllers[data.streamId]; + }); + }; + + switch (data.stream) { + case 'start_complete': + resolveOrReject(this.streamControllers[data.streamId].startCall, + data.success, data.reason); + break; + case 'pull_complete': + resolveOrReject(this.streamControllers[data.streamId].pullCall, + data.success, data.reason); + break; + case 'pull': + // Ignore any pull after close is called. + if (!this.streamSinks[data.streamId]) { + sendStreamResponse({ stream: 'pull_complete', success: true, }); + break; + } + // Pull increases the desiredSize property of sink, + // so when it changes from negative to positive, + // set ready property as resolved promise. + if (this.streamSinks[data.streamId].desiredSize <= 0 && + data.desiredSize > 0) { + this.streamSinks[data.streamId].sinkCapability.resolve(); + } + // Reset desiredSize property of sink on every pull. + this.streamSinks[data.streamId].desiredSize = data.desiredSize; + resolveCall(this.streamSinks[data.streamId].onPull).then(() => { + sendStreamResponse({ stream: 'pull_complete', success: true, }); + }, (reason) => { + sendStreamResponse({ stream: 'pull_complete', + success: false, reason, }); + }); + break; + case 'enqueue': + this.streamControllers[data.streamId].controller.enqueue(data.chunk); + break; + case 'close': + this.streamControllers[data.streamId].controller.close(); + deleteStreamController(); + break; + case 'error': + this.streamControllers[data.streamId].controller.error(data.reason); + deleteStreamController(); + break; + case 'cancel_complete': + resolveOrReject(this.streamControllers[data.streamId].cancelCall, + data.success, data.reason); + deleteStreamController(); + break; + case 'cancel': + resolveCall(this.streamSinks[data.streamId].onCancel, + [data.reason]).then(() => { + sendStreamResponse({ stream: 'cancel_complete', success: true, }); + }, (reason) => { + sendStreamResponse({ stream: 'cancel_complete', + success: false, reason, }); + }); + delete this.streamSinks[data.streamId]; + break; + default: + throw new Error('Unexpected stream case'); + } + }, + /** * Sends raw message to the comObj. * @private - * @param message {Object} Raw message. + * @param {Object} message - Raw message. * @param transfers List of transfers/ArrayBuffers, or undefined. */ postMessage(message, transfers) { diff --git a/test/unit/clitests.json b/test/unit/clitests.json index 2ac082796..38a018da1 100644 --- a/test/unit/clitests.json +++ b/test/unit/clitests.json @@ -19,6 +19,7 @@ "type1_parser_spec.js", "ui_utils_spec.js", "unicode_spec.js", - "util_spec.js" + "util_spec.js", + "util_stream_spec.js" ] } diff --git a/test/unit/jasmine-boot.js b/test/unit/jasmine-boot.js index 8ff77ce60..eb6604df8 100644 --- a/test/unit/jasmine-boot.js +++ b/test/unit/jasmine-boot.js @@ -45,15 +45,15 @@ function initializePDFJS(callback) { 'pdfjs/display/global', 'pdfjs-test/unit/annotation_spec', 'pdfjs-test/unit/api_spec', 'pdfjs-test/unit/bidi_spec', 'pdfjs-test/unit/cff_parser_spec', 'pdfjs-test/unit/cmap_spec', - 'pdfjs-test/unit/crypto_spec', 'pdfjs-test/unit/document_spec', - 'pdfjs-test/unit/dom_utils_spec', 'pdfjs-test/unit/evaluator_spec', - 'pdfjs-test/unit/fonts_spec', 'pdfjs-test/unit/function_spec', - 'pdfjs-test/unit/metadata_spec', 'pdfjs-test/unit/murmurhash3_spec', - 'pdfjs-test/unit/network_spec', 'pdfjs-test/unit/parser_spec', - 'pdfjs-test/unit/primitives_spec', 'pdfjs-test/unit/stream_spec', - 'pdfjs-test/unit/type1_parser_spec', 'pdfjs-test/unit/ui_utils_spec', - 'pdfjs-test/unit/unicode_spec', 'pdfjs-test/unit/util_spec', - 'pdfjs-test/unit/custom_spec' + 'pdfjs-test/unit/crypto_spec', 'pdfjs-test/unit/custom_spec', + 'pdfjs-test/unit/document_spec', 'pdfjs-test/unit/dom_utils_spec', + 'pdfjs-test/unit/evaluator_spec', 'pdfjs-test/unit/fonts_spec', + 'pdfjs-test/unit/function_spec', 'pdfjs-test/unit/metadata_spec', + 'pdfjs-test/unit/murmurhash3_spec', 'pdfjs-test/unit/network_spec', + 'pdfjs-test/unit/parser_spec', 'pdfjs-test/unit/primitives_spec', + 'pdfjs-test/unit/stream_spec', 'pdfjs-test/unit/type1_parser_spec', + 'pdfjs-test/unit/ui_utils_spec', 'pdfjs-test/unit/unicode_spec', + 'pdfjs-test/unit/util_spec', 'pdfjs-test/unit/util_stream_spec' ].map(function (moduleName) { return SystemJS.import(moduleName); })).then(function (modules) { diff --git a/test/unit/util_spec.js b/test/unit/util_spec.js index 4c8c64d40..74e6d773c 100644 --- a/test/unit/util_spec.js +++ b/test/unit/util_spec.js @@ -20,46 +20,46 @@ import { describe('util', function() { describe('stringToPDFString', function() { it('handles ISO Latin 1 strings', function() { - var str = '\x8Dstring\x8E'; + let str = '\x8Dstring\x8E'; expect(stringToPDFString(str)).toEqual('\u201Cstring\u201D'); }); it('handles UTF-16BE strings', function() { - var str = '\xFE\xFF\x00\x73\x00\x74\x00\x72\x00\x69\x00\x6E\x00\x67'; + let str = '\xFE\xFF\x00\x73\x00\x74\x00\x72\x00\x69\x00\x6E\x00\x67'; expect(stringToPDFString(str)).toEqual('string'); }); it('handles empty strings', function() { // ISO Latin 1 - var str1 = ''; + let str1 = ''; expect(stringToPDFString(str1)).toEqual(''); // UTF-16BE - var str2 = '\xFE\xFF'; + let str2 = '\xFE\xFF'; expect(stringToPDFString(str2)).toEqual(''); }); }); describe('removeNullCharacters', function() { it('should not modify string without null characters', function() { - var str = 'string without null chars'; + let str = 'string without null chars'; expect(removeNullCharacters(str)).toEqual('string without null chars'); }); it('should modify string with null characters', function() { - var str = 'string\x00With\x00Null\x00Chars'; + let str = 'string\x00With\x00Null\x00Chars'; expect(removeNullCharacters(str)).toEqual('stringWithNullChars'); }); }); describe('ReadableStream', function() { it('should return an Object', function () { - var readable = new ReadableStream(); + let readable = new ReadableStream(); expect(typeof readable).toEqual('object'); }); it('should have property getReader', function () { - var readable = new ReadableStream(); + let readable = new ReadableStream(); expect(typeof readable.getReader).toEqual('function'); }); }); diff --git a/test/unit/util_stream_spec.js b/test/unit/util_stream_spec.js new file mode 100644 index 000000000..285ec1f9c --- /dev/null +++ b/test/unit/util_stream_spec.js @@ -0,0 +1,378 @@ +/* Copyright 2017 Mozilla Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { createPromiseCapability, MessageHandler } from '../../src/shared/util'; + +describe('util_stream', function () { + // Temporary fake port for sending messages between main and worker. + class FakePort { + constructor() { + this._listeners = []; + this._deferred = Promise.resolve(undefined); + } + + postMessage(obj) { + let event = { data: obj, }; + this._deferred.then(() => { + this._listeners.forEach(function (listener) { + listener.call(this, event); + }, this); + }); + } + + addEventListener(name, listener) { + this._listeners.push(listener); + } + + removeEventListener(name, listener) { + let i = this._listeners.indexOf(listener); + this._listeners.splice(i, 1); + } + + terminate() { + this._listeners = []; + } + } + + // Sleep function to wait for sometime, similar to setTimeout but faster. + function sleep(ticks) { + return Promise.resolve().then(() => { + return (ticks && sleep(ticks - 1)); + }); + } + + describe('sendWithStream', function () { + it('should return a ReadableStream', function () { + let port = new FakePort(); + let messageHandler1 = new MessageHandler('main', 'worker', port); + let readable = messageHandler1.sendWithStream('fakeHandler'); + // Check if readable is an instance of ReadableStream. + expect(typeof readable).toEqual('object'); + expect(typeof readable.getReader).toEqual('function'); + }); + + it('should read using a reader', function (done) { + let log = ''; + let port = new FakePort(); + let messageHandler1 = new MessageHandler('main', 'worker', port); + let messageHandler2 = new MessageHandler('worker', 'main', port); + messageHandler2.on('fakeHandler', (data, sink) => { + sink.onPull = function () { + log += 'p'; + }; + sink.onCancel = function (reason) { + log += 'c'; + }; + sink.ready.then(() => { + sink.enqueue('hi'); + return sink.ready; + }).then(() => { + sink.close(); + }); + return sleep(5); + }); + let readable = messageHandler1.sendWithStream('fakeHandler', {}, { + highWaterMark: 1, + size() { + return 1; + }, + }); + let reader = readable.getReader(); + sleep(10).then(() => { + expect(log).toEqual(''); + return reader.read(); + }).then((result) => { + expect(log).toEqual('p'); + expect(result.value).toEqual('hi'); + expect(result.done).toEqual(false); + return sleep(10); + }).then(() => { + return reader.read(); + }).then((result) => { + expect(result.value).toEqual(undefined); + expect(result.done).toEqual(true); + done(); + }); + }); + + it('should not read any data when cancelled', function (done) { + let log = ''; + let port = new FakePort(); + let messageHandler2 = new MessageHandler('worker', 'main', port); + messageHandler2.on('fakeHandler', (data, sink) => { + sink.onPull = function () { + log += 'p'; + }; + sink.onCancel = function (reason) { + log += 'c'; + }; + log += '0'; + sink.ready.then(() => { + log += '1'; + sink.enqueue([1, 2, 3, 4], 4); + return sink.ready; + }).then(() => { + log += '2'; + sink.enqueue([5, 6, 7, 8], 4); + return sink.ready; + }).then(() => { + log += '3'; + sink.close(); + }, () => { + log += '4'; + }); + }); + let messageHandler1 = new MessageHandler('main', 'worker', port); + let readable = messageHandler1.sendWithStream('fakeHandler', {}, { + highWaterMark: 4, + size(arr) { + return arr.length; + }, + }); + + let reader = readable.getReader(); + sleep(10).then(() => { + expect(log).toEqual('01'); + return reader.read(); + }).then((result) => { + expect(result.value).toEqual([1, 2, 3, 4]); + expect(result.done).toEqual(false); + return sleep(10); + }).then(() => { + expect(log).toEqual('01p2'); + return reader.cancel(); + }).then(() => { + expect(log).toEqual('01p2c'); + done(); + }); + }); + + it('should not read when errored', function(done) { + let log = ''; + let port = new FakePort(); + let messageHandler2 = new MessageHandler('worker', 'main', port); + messageHandler2.on('fakeHandler', (data, sink) => { + sink.onPull = function () { + log += 'p'; + }; + sink.onCancel = function (reason) { + log += 'c'; + }; + sink.ready.then(() => { + sink.enqueue([1, 2, 3, 4], 4); + return sink.ready; + }).then(() => { + log += 'error'; + sink.error('error'); + }); + }); + let messageHandler1 = new MessageHandler('main', 'worker', port); + let readable = messageHandler1.sendWithStream('fakeHandler', {}, { + highWaterMark: 4, + size(arr) { + return arr.length; + }, + }); + + let reader = readable.getReader(); + + sleep(10).then(() => { + expect(log).toEqual(''); + return reader.read(); + }).then((result) => { + expect(result.value).toEqual([1, 2, 3, 4]); + expect(result.done).toEqual(false); + return reader.read(); + }).then(() => { + }, (reason) => { + expect(reason).toEqual('error'); + done(); + }); + }); + + it('should read data with blocking promise', function (done) { + let log = ''; + let port = new FakePort(); + let messageHandler2 = new MessageHandler('worker', 'main', port); + messageHandler2.on('fakeHandler', (data, sink) => { + sink.onPull = function () { + log += 'p'; + }; + sink.onCancel = function (reason) { + log += 'c'; + }; + log += '0'; + sink.ready.then(() => { + log += '1'; + sink.enqueue([1, 2, 3, 4], 4); + return sink.ready; + }).then(() => { + log += '2'; + sink.enqueue([5, 6, 7, 8], 4); + return sink.ready; + }).then(() => { + sink.close(); + }); + }); + + let messageHandler1 = new MessageHandler('main', 'worker', port); + let readable = messageHandler1.sendWithStream('fakeHandler', {}, { + highWaterMark: 4, + size(arr) { + return arr.length; + }, + }); + + let reader = readable.getReader(); + // Sleep for 10ms, so that read() is not unblocking the ready promise. + // Chain all read() to stream in sequence. + sleep(10).then(() => { + expect(log).toEqual('01'); + return reader.read(); + }).then((result) => { + expect(result.value).toEqual([1, 2, 3, 4]); + expect(result.done).toEqual(false); + return sleep(10); + }).then(() => { + expect(log).toEqual('01p2'); + return reader.read(); + }).then((result) => { + expect(result.value).toEqual([5, 6, 7, 8]); + expect(result.done).toEqual(false); + return sleep(10); + }).then(() => { + expect(log).toEqual('01p2p'); + return reader.read(); + }).then((result) => { + expect(result.value).toEqual(undefined); + expect(result.done).toEqual(true); + done(); + }); + }); + + it('should read data with blocking promise and buffer whole data' + + ' into stream', function (done) { + let log = ''; + let port = new FakePort(); + let messageHandler2 = new MessageHandler('worker', 'main', port); + messageHandler2.on('fakeHandler', (data, sink) => { + sink.onPull = function () { + log += 'p'; + }; + sink.onCancel = function (reason) { + log += 'c'; + }; + log += '0'; + sink.ready.then(() => { + log += '1'; + sink.enqueue([1, 2, 3, 4], 4); + return sink.ready; + }).then(() => { + log += '2'; + sink.enqueue([5, 6, 7, 8], 4); + return sink.ready; + }).then(() => { + sink.close(); + }); + return sleep(10); + }); + + let messageHandler1 = new MessageHandler('main', 'worker', port); + let readable = messageHandler1.sendWithStream('fakeHandler', {}, { + highWaterMark: 8, + size(arr) { + return arr.length; + }, + }); + + let reader = readable.getReader(); + + sleep(10).then(() => { + expect(log).toEqual('012'); + return reader.read(); + }).then((result) => { + expect(result.value).toEqual([1, 2, 3, 4]); + expect(result.done).toEqual(false); + return sleep(10); + }).then(() => { + expect(log).toEqual('012p'); + return reader.read(); + }).then((result) => { + expect(result.value).toEqual([5, 6, 7, 8]); + expect(result.done).toEqual(false); + return sleep(10); + }).then(() => { + expect(log).toEqual('012p'); + return reader.read(); + }).then((result) => { + expect(result.value).toEqual(undefined); + expect(result.done).toEqual(true); + done(); + }); + }); + + it('should ignore any pull after close is called', function (done) { + let log = ''; + let port = new FakePort(); + let capability = createPromiseCapability(); + let messageHandler2 = new MessageHandler('worker', 'main', port); + messageHandler2.on('fakeHandler', (data, sink) => { + sink.onPull = function () { + log += 'p'; + }; + sink.onCancel = function (reason) { + log += 'c'; + }; + log += '0'; + sink.ready.then(() => { + log += '1'; + sink.enqueue([1, 2, 3, 4], 4); + }); + return capability.promise.then(() => { + sink.close(); + }); + }); + + let messageHandler1 = new MessageHandler('main', 'worker', port); + let readable = messageHandler1.sendWithStream('fakeHandler', {}, { + highWaterMark: 10, + size(arr) { + return arr.length; + }, + }); + + let reader = readable.getReader(); + + sleep(10).then(() => { + expect(log).toEqual('01'); + capability.resolve(); + return capability.promise.then(() => { + return reader.read(); + }); + }).then((result) => { + expect(result.value).toEqual([1, 2, 3, 4]); + expect(result.done).toEqual(false); + return sleep(10); + }).then(() => { + expect(log).toEqual('01'); + return reader.read(); + }).then((result) => { + expect(result.value).toEqual(undefined); + expect(result.done).toEqual(true); + done(); + }); + }); + }); +});