// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

;(function (factory) {
  var objectTypes = {
    'boolean': false,
    'function': true,
    'object': true,
    'number': false,
    'string': false,
    'undefined': false
  };

  var root = (objectTypes[typeof window] && window) || this,
    freeExports = objectTypes[typeof exports] && exports && !exports.nodeType && exports,
    freeModule = objectTypes[typeof module] && module && !module.nodeType && module,
    moduleExports = freeModule && freeModule.exports === freeExports && freeExports,
    freeGlobal = objectTypes[typeof global] && global;

  if (freeGlobal && (freeGlobal.global === freeGlobal || freeGlobal.window === freeGlobal)) {
    root = freeGlobal;
  }

  // Because of build optimizers
  if (typeof define === 'function' && define.amd) {
    define(['rx', 'exports'], function (Rx, exports) {
      root.Rx = factory(root, exports, Rx);
      return root.Rx;
    });
  } else if (typeof module === 'object' && module && module.exports === freeExports) {
    module.exports = factory(root, module.exports, require('./rx'));
  } else {
    root.Rx = factory(root, {}, root.Rx);
  }
}.call(this, function (root, exp, Rx, undefined) {

  // References
  var Observable = Rx.Observable,
    observableProto = Observable.prototype,
    AnonymousObservable = Rx.AnonymousObservable,
    CompositeDisposable = Rx.CompositeDisposable,
    Subject = Rx.Subject,
    Observer = Rx.Observer,
    disposableEmpty = Rx.Disposable.empty,
    disposableCreate = Rx.Disposable.create,
    inherits = Rx.internals.inherits,
    addProperties = Rx.internals.addProperties,
    timeoutScheduler = Rx.Scheduler.timeout,
    identity = Rx.helpers.identity;

  var objectDisposed = 'Object has been disposed';

  /**
   * Throws if the receiver (`this`) has its `isDisposed` flag set.
   * Must be invoked with `.call(instance)`.
   */
  function checkDisposed() {
    if (this.isDisposed) {
      throw new Error(objectDisposed);
    }
  }

  var PausableObservable = (function (_super) {

    inherits(PausableObservable, _super);

    /**
     * Publishes the source and connects/disconnects it whenever the
     * (de-duplicated) pauser emits a truthy/falsy value.
     */
    function subscribe(observer) {
      var conn = this.source.publish(),
        subscription = conn.subscribe(observer),
        connection = disposableEmpty;

      var pausable = this.subject.distinctUntilChanged().subscribe(function (b) {
        if (b) {
          connection = conn.connect();
        } else {
          connection.dispose();
          connection = disposableEmpty;
        }
      });

      // `connection` is reassigned by the pauser callback above, so it has to
      // be read at dispose time. Handing the variable's current value straight
      // to CompositeDisposable would capture `disposableEmpty` and leave a
      // later connection undisposed when the subscriber unsubscribes.
      var currentConnection = disposableCreate(function () {
        connection.dispose();
        connection = disposableEmpty;
      });

      return new CompositeDisposable(subscription, currentConnection, pausable);
    }

    /**
     * @param {Observable} source The sequence to pause/resume.
     * @param {Subject} [subject] Pauser; a new Subject is created when omitted.
     */
    function PausableObservable(source, subject) {
      this.source = source;
      this.subject = subject || new Subject();
      this.isPaused = true;
      _super.call(this, subscribe);
    }

    /** Pushes `false` to the pauser unless already paused. */
    PausableObservable.prototype.pause = function () {
      if (this.isPaused === true) {
        return;
      }
      this.isPaused = true;
      this.subject.onNext(false);
    };

    /** Pushes `true` to the pauser unless already running. */
    PausableObservable.prototype.resume = function () {
      if (this.isPaused === false) {
        return;
      }
      this.isPaused = false;
      this.subject.onNext(true);
    };

    return PausableObservable;

  }(Observable));

  /**
   * Pauses the underlying observable sequence based upon the observable sequence which yields true/false.
   * @example
   * var pauser = new Rx.Subject();
   * var source = Rx.Observable.interval(100).pausable(pauser);
   * @param {Observable} pauser The observable sequence used to pause the underlying sequence.
   * @returns {Observable} The observable sequence which is paused based upon the pauser.
   */
  observableProto.pausable = function (pauser) {
    return new PausableObservable(this, pauser);
  };

  /**
   * Two-input "combine latest": once both `source` and `subject` have
   * produced a value, every new value from either one emits
   * `resultSelector(latestSource, latestSubject)`.
   * Only completion of `source` completes the result; `subject` has no
   * completion handler. Errors from either input are forwarded.
   */
  function combineLatestSource(source, subject, resultSelector) {
    return new AnonymousObservable(function (observer) {
      var n = 2,
        hasValue = [false, false],
        hasValueAll = false,
        isDone = false,
        values = new Array(n);

      function next(x, i) {
        values[i] = x;
        var res;
        hasValue[i] = true;
        if (hasValueAll || (hasValueAll = hasValue.every(identity))) {
          try {
            res = resultSelector.apply(null, values);
          } catch (ex) {
            observer.onError(ex);
            return;
          }
          observer.onNext(res);
        } else if (isDone) {
          observer.onCompleted();
        }
      }

      return new CompositeDisposable(
        source.subscribe(
          function (x) {
            next(x, 0);
          },
          observer.onError.bind(observer),
          function () {
            isDone = true;
            observer.onCompleted();
          }),
        subject.subscribe(
          function (x) {
            next(x, 1);
          },
          observer.onError.bind(observer))
      );
    });
  }

  var PausableBufferedObservable = (function (_super) {

    inherits(PausableBufferedObservable, _super);

    /**
     * Combines the source with the (de-duplicated) pauser. While the pauser's
     * latest value is falsy, source values are queued in `q`; when it turns
     * truthy the queue is flushed in order, after which values pass through.
     */
    function subscribe(observer) {
      var q = [], previous = true;

      var subscription =
        combineLatestSource(
          this.source,
          this.subject.distinctUntilChanged(),
          function (data, shouldFire) {
            return { data: data, shouldFire: shouldFire };
          })
          .subscribe(
            function (results) {
              if (results.shouldFire && previous) {
                observer.onNext(results.data);
              }
              if (results.shouldFire && !previous) {
                // Resumed: flush everything buffered while paused.
                while (q.length > 0) {
                  observer.onNext(q.shift());
                }
                previous = true;
              } else if (!results.shouldFire && !previous) {
                q.push(results.data);
              } else if (!results.shouldFire && previous) {
                previous = false;
              }
            },
            observer.onError.bind(observer),
            observer.onCompleted.bind(observer)
          );

      // Start in the paused state.
      // NOTE(review): this calls onNext on the pauser, so a caller-supplied
      // pauser presumably has to be a Subject-like object - confirm.
      this.subject.onNext(false);

      return subscription;
    }

    /**
     * @param {Observable} source The sequence to pause/resume with buffering.
     * @param {Subject} [subject] Pauser; a new Subject is created when omitted.
     */
    function PausableBufferedObservable(source, subject) {
      this.source = source;
      this.subject = subject || new Subject();
      this.isPaused = true;
      _super.call(this, subscribe);
    }

    /** Pushes `false` to the pauser unless already paused. */
    PausableBufferedObservable.prototype.pause = function () {
      if (this.isPaused === true) {
        return;
      }
      this.isPaused = true;
      this.subject.onNext(false);
    };

    /** Pushes `true` to the pauser unless already running. */
    PausableBufferedObservable.prototype.resume = function () {
      if (this.isPaused === false) {
        return;
      }
      this.isPaused = false;
      this.subject.onNext(true);
    };

    return PausableBufferedObservable;

  }(Observable));

  /**
   * Pauses the underlying observable sequence based upon the observable sequence which yields true/false,
   * and yields the values that were buffered while paused.
   * @example
   * var pauser = new Rx.Subject();
   * var source = Rx.Observable.interval(100).pausableBuffered(pauser);
   * @param {Observable} subject The observable sequence used to pause the underlying sequence.
   * @returns {Observable} The observable sequence which is paused based upon the pauser.
   */
  observableProto.pausableBuffered = function (subject) {
    return new PausableBufferedObservable(this, subject);
  };

  /**
   * Attaches a controller to the observable sequence with the ability to queue.
   * @example
   * var source = Rx.Observable.interval(100).controlled();
   * source.request(3); // Reads 3 values
   * @param {Boolean} [enableQueue=true] Whether values that arrive while nothing is requested are queued.
   * @returns {Observable} A ControlledObservable exposing `request(numberOfItems)`.
   */
  observableProto.controlled = function (enableQueue) {
    if (enableQueue == null) {
      enableQueue = true;
    }
    return new ControlledObservable(this, enableQueue);
  };

  var ControlledObservable = (function (_super) {

    inherits(ControlledObservable, _super);

    function subscribe(observer) {
      return this.source.subscribe(observer);
    }

    /**
     * @param {Observable} source The sequence to control.
     * @param {Boolean} enableQueue Passed through to the ControlledSubject.
     */
    function ControlledObservable(source, enableQueue) {
      _super.call(this, subscribe);
      this.subject = new ControlledSubject(enableQueue);
      this.source = source.multicast(this.subject).refCount();
    }

    /**
     * Requests items from the controlled sequence.
     * @param {Number} [numberOfItems=-1] Number of items to request; -1 is treated as unbounded.
     * @returns {Disposable} The disposable returned by the ControlledSubject's `request`.
     */
    ControlledObservable.prototype.request = function (numberOfItems) {
      if (numberOfItems == null) {
        numberOfItems = -1;
      }
      return this.subject.request(numberOfItems);
    };

    return ControlledObservable;

  }(Observable));

  var ControlledSubject = Rx.ControlledSubject = (function (_super) {

    function subscribe(observer) {
      return this.subject.subscribe(observer);
    }

    inherits(ControlledSubject, _super);

    /**
     * Subject that only forwards values for which a request is outstanding.
     * @param {Boolean} [enableQueue=true] When true, unrequested values are
     *   queued; when false they are dropped.
     */
    function ControlledSubject(enableQueue) {
      if (enableQueue == null) {
        enableQueue = true;
      }

      _super.call(this, subscribe);
      this.subject = new Subject();
      this.enableQueue = enableQueue;
      this.queue = enableQueue ? [] : null;
      this.requestedCount = 0;
      this.requestedDisposable = disposableEmpty;
      this.error = null;
      this.hasFailed = false;
      this.hasCompleted = false;
      this.controlledDisposable = disposableEmpty;
    }

    addProperties(ControlledSubject.prototype, Observer, {
      /**
       * Records completion; forwards it immediately unless queued values are
       * still waiting (in which case `_processRequest` forwards it later).
       */
      onCompleted: function () {
        checkDisposed.call(this);
        this.hasCompleted = true;

        if (!this.enableQueue || this.queue.length === 0) {
          this.subject.onCompleted();
        }
      },

      /**
       * Records the error; forwards it immediately unless queued values are
       * still waiting (in which case `_processRequest` forwards it later).
       */
      onError: function (error) {
        checkDisposed.call(this);
        this.hasFailed = true;
        this.error = error;

        if (!this.enableQueue || this.queue.length === 0) {
          this.subject.onError(error);
        }
      },

      /**
       * Forwards `value` when a request is outstanding; otherwise queues it
       * (queue enabled) or drops it (queue disabled).
       */
      onNext: function (value) {
        checkDisposed.call(this);

        if (this.requestedCount === 0) {
          if (this.enableQueue) {
            this.queue.push(value);
          }
          return;
        }

        // -1 means "unbounded" and is never counted down. Pre-decrement so
        // the request is released on the value that satisfies it; the former
        // post-decrement comparison could never be true in this branch.
        if (this.requestedCount !== -1 && --this.requestedCount === 0) {
          this.disposeCurrentRequest();
        }

        this.subject.onNext(value);
      },

      /**
       * Serves a request from the queue and, once nothing is queued, forwards
       * any recorded error/completion.
       * @param {Number} numberOfItems Requested count (-1 = unbounded).
       * @returns {Object} `{ numberOfItems, returnValue }` where
       *   `numberOfItems` is the demand still unmet and `returnValue` is true
       *   when the queue still holds items (the request was fully served from
       *   the queue, so no pending request needs to be registered).
       */
      _processRequest: function (numberOfItems) {
        if (this.enableQueue) {
          // Drain as long as there is something queued and demand remains.
          // (Comparing queue length against the requested count would emit
          // nothing when more items are requested than are queued.)
          while (this.queue.length > 0 && (numberOfItems > 0 || numberOfItems === -1)) {
            this.subject.onNext(this.queue.shift());
            if (numberOfItems > 0) {
              numberOfItems--;
            }
          }

          if (this.queue.length !== 0) {
            return { numberOfItems: numberOfItems, returnValue: true };
          }
          // Queue is empty: fall through so a termination that was held back
          // behind queued values is delivered now.
        }

        if (this.hasFailed) {
          this.subject.onError(this.error);
          this.controlledDisposable.dispose();
          this.controlledDisposable = disposableEmpty;
        } else if (this.hasCompleted) {
          this.subject.onCompleted();
          this.controlledDisposable.dispose();
          this.controlledDisposable = disposableEmpty;
        }

        return { numberOfItems: numberOfItems, returnValue: false };
      },

      /**
       * Requests `number` items, replacing any request still outstanding.
       * @param {Number} number Number of items requested (-1 = unbounded).
       * @returns {Disposable} Disposing it resets the outstanding count to 0;
       *   `Disposable.empty` when the queue alone satisfied the request.
       * @throws {Error} If this subject has been disposed.
       */
      request: function (number) {
        checkDisposed.call(this);
        this.disposeCurrentRequest();

        var self = this,
          r = this._processRequest(number);

        if (r.returnValue) {
          return disposableEmpty;
        }

        this.requestedCount = r.numberOfItems;
        this.requestedDisposable = disposableCreate(function () {
          self.requestedCount = 0;
        });

        return this.requestedDisposable;
      },

      /** Disposes the outstanding request (if any) and resets it to empty. */
      disposeCurrentRequest: function () {
        this.requestedDisposable.dispose();
        this.requestedDisposable = disposableEmpty;
      },

      /** Marks this subject disposed and releases the inner subject and request. */
      dispose: function () {
        this.isDisposed = true;
        this.error = null;
        this.subject.dispose();
        this.requestedDisposable.dispose();
      }
    });

    return ControlledSubject;
  }(Observable));

  return Rx;
}));