You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
398 lines
13 KiB
398 lines
13 KiB
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. |
|
|
|
;(function (factory) { |
|
var objectTypes = { |
|
'boolean': false, |
|
'function': true, |
|
'object': true, |
|
'number': false, |
|
'string': false, |
|
'undefined': false |
|
}; |
|
|
|
var root = (objectTypes[typeof window] && window) || this, |
|
freeExports = objectTypes[typeof exports] && exports && !exports.nodeType && exports, |
|
freeModule = objectTypes[typeof module] && module && !module.nodeType && module, |
|
moduleExports = freeModule && freeModule.exports === freeExports && freeExports, |
|
freeGlobal = objectTypes[typeof global] && global; |
|
|
|
if (freeGlobal && (freeGlobal.global === freeGlobal || freeGlobal.window === freeGlobal)) { |
|
root = freeGlobal; |
|
} |
|
|
|
// Because of build optimizers |
|
if (typeof define === 'function' && define.amd) { |
|
define(['rx', 'exports'], function (Rx, exports) { |
|
root.Rx = factory(root, exports, Rx); |
|
return root.Rx; |
|
}); |
|
} else if (typeof module === 'object' && module && module.exports === freeExports) { |
|
module.exports = factory(root, module.exports, require('./rx')); |
|
} else { |
|
root.Rx = factory(root, {}, root.Rx); |
|
} |
|
}.call(this, function (root, exp, Rx, undefined) { |
|
|
|
// References |
|
var Observable = Rx.Observable, |
|
observableProto = Observable.prototype, |
|
AnonymousObservable = Rx.AnonymousObservable, |
|
CompositeDisposable = Rx.CompositeDisposable, |
|
Subject = Rx.Subject, |
|
Observer = Rx.Observer, |
|
disposableEmpty = Rx.Disposable.empty, |
|
disposableCreate = Rx.Disposable.create, |
|
inherits = Rx.internals.inherits, |
|
addProperties = Rx.internals.addProperties, |
|
timeoutScheduler = Rx.Scheduler.timeout, |
|
identity = Rx.helpers.identity; |
|
|
|
var objectDisposed = 'Object has been disposed'; |
|
function checkDisposed() { if (this.isDisposed) { throw new Error(objectDisposed); } } |
|
|
|
var PausableObservable = (function (_super) { |
|
|
|
inherits(PausableObservable, _super); |
|
|
|
function subscribe(observer) { |
|
var conn = this.source.publish(), |
|
subscription = conn.subscribe(observer), |
|
connection = disposableEmpty; |
|
|
|
var pausable = this.subject.distinctUntilChanged().subscribe(function (b) { |
|
if (b) { |
|
connection = conn.connect(); |
|
} else { |
|
connection.dispose(); |
|
connection = disposableEmpty; |
|
} |
|
}); |
|
|
|
return new CompositeDisposable(subscription, connection, pausable); |
|
} |
|
|
|
function PausableObservable(source, subject) { |
|
this.source = source; |
|
this.subject = subject || new Subject(); |
|
this.isPaused = true; |
|
_super.call(this, subscribe); |
|
} |
|
|
|
PausableObservable.prototype.pause = function () { |
|
if (this.isPaused === true){ |
|
return; |
|
} |
|
this.isPaused = true; |
|
this.subject.onNext(false); |
|
}; |
|
|
|
PausableObservable.prototype.resume = function () { |
|
if (this.isPaused === false){ |
|
return; |
|
} |
|
this.isPaused = false; |
|
this.subject.onNext(true); |
|
}; |
|
|
|
return PausableObservable; |
|
|
|
}(Observable)); |
|
|
|
/** |
|
* Pauses the underlying observable sequence based upon the observable sequence which yields true/false. |
|
* @example |
|
* var pauser = new Rx.Subject(); |
|
* var source = Rx.Observable.interval(100).pausable(pauser); |
|
* @param {Observable} pauser The observable sequence used to pause the underlying sequence. |
|
* @returns {Observable} The observable sequence which is paused based upon the pauser. |
|
*/ |
|
observableProto.pausable = function (pauser) { |
|
return new PausableObservable(this, pauser); |
|
}; |
|
function combineLatestSource(source, subject, resultSelector) { |
|
return new AnonymousObservable(function (observer) { |
|
var n = 2, |
|
hasValue = [false, false], |
|
hasValueAll = false, |
|
isDone = false, |
|
values = new Array(n); |
|
|
|
function next(x, i) { |
|
values[i] = x |
|
var res; |
|
hasValue[i] = true; |
|
if (hasValueAll || (hasValueAll = hasValue.every(identity))) { |
|
try { |
|
res = resultSelector.apply(null, values); |
|
} catch (ex) { |
|
observer.onError(ex); |
|
return; |
|
} |
|
observer.onNext(res); |
|
} else if (isDone) { |
|
observer.onCompleted(); |
|
} |
|
} |
|
|
|
return new CompositeDisposable( |
|
source.subscribe( |
|
function (x) { |
|
next(x, 0); |
|
}, |
|
observer.onError.bind(observer), |
|
function () { |
|
isDone = true; |
|
observer.onCompleted(); |
|
}), |
|
subject.subscribe( |
|
function (x) { |
|
next(x, 1); |
|
}, |
|
observer.onError.bind(observer)) |
|
); |
|
}); |
|
} |
|
|
|
var PausableBufferedObservable = (function (_super) { |
|
|
|
inherits(PausableBufferedObservable, _super); |
|
|
|
function subscribe(observer) { |
|
var q = [], previous = true; |
|
|
|
var subscription = |
|
combineLatestSource( |
|
this.source, |
|
this.subject.distinctUntilChanged(), |
|
function (data, shouldFire) { |
|
return { data: data, shouldFire: shouldFire }; |
|
}) |
|
.subscribe( |
|
function (results) { |
|
if (results.shouldFire && previous) { |
|
observer.onNext(results.data); |
|
} |
|
if (results.shouldFire && !previous) { |
|
while (q.length > 0) { |
|
observer.onNext(q.shift()); |
|
} |
|
previous = true; |
|
} else if (!results.shouldFire && !previous) { |
|
q.push(results.data); |
|
} else if (!results.shouldFire && previous) { |
|
previous = false; |
|
} |
|
|
|
}, |
|
observer.onError.bind(observer), |
|
observer.onCompleted.bind(observer) |
|
); |
|
|
|
this.subject.onNext(false); |
|
|
|
return subscription; |
|
} |
|
|
|
function PausableBufferedObservable(source, subject) { |
|
this.source = source; |
|
this.subject = subject || new Subject(); |
|
this.isPaused = true; |
|
_super.call(this, subscribe); |
|
} |
|
|
|
PausableBufferedObservable.prototype.pause = function () { |
|
if (this.isPaused === true){ |
|
return; |
|
} |
|
this.isPaused = true; |
|
this.subject.onNext(false); |
|
}; |
|
|
|
PausableBufferedObservable.prototype.resume = function () { |
|
if (this.isPaused === false){ |
|
return; |
|
} |
|
this.isPaused = false; |
|
this.subject.onNext(true); |
|
}; |
|
|
|
return PausableBufferedObservable; |
|
|
|
}(Observable)); |
|
|
|
/** |
|
* Pauses the underlying observable sequence based upon the observable sequence which yields true/false, |
|
* and yields the values that were buffered while paused. |
|
* @example |
|
* var pauser = new Rx.Subject(); |
|
* var source = Rx.Observable.interval(100).pausableBuffered(pauser); |
|
* @param {Observable} pauser The observable sequence used to pause the underlying sequence. |
|
* @returns {Observable} The observable sequence which is paused based upon the pauser. |
|
*/ |
|
observableProto.pausableBuffered = function (subject) { |
|
return new PausableBufferedObservable(this, subject); |
|
}; |
|
|
|
/** |
|
* Attaches a controller to the observable sequence with the ability to queue. |
|
* @example |
|
* var source = Rx.Observable.interval(100).controlled(); |
|
* source.request(3); // Reads 3 values |
|
* @param {Observable} pauser The observable sequence used to pause the underlying sequence. |
|
* @returns {Observable} The observable sequence which is paused based upon the pauser. |
|
*/ |
|
observableProto.controlled = function (enableQueue) { |
|
if (enableQueue == null) { enableQueue = true; } |
|
return new ControlledObservable(this, enableQueue); |
|
}; |
|
var ControlledObservable = (function (_super) { |
|
|
|
inherits(ControlledObservable, _super); |
|
|
|
function subscribe (observer) { |
|
return this.source.subscribe(observer); |
|
} |
|
|
|
function ControlledObservable (source, enableQueue) { |
|
_super.call(this, subscribe); |
|
this.subject = new ControlledSubject(enableQueue); |
|
this.source = source.multicast(this.subject).refCount(); |
|
} |
|
|
|
ControlledObservable.prototype.request = function (numberOfItems) { |
|
if (numberOfItems == null) { numberOfItems = -1; } |
|
return this.subject.request(numberOfItems); |
|
}; |
|
|
|
return ControlledObservable; |
|
|
|
}(Observable)); |
|
|
|
var ControlledSubject = Rx.ControlledSubject = (function (_super) { |
|
|
|
function subscribe (observer) { |
|
return this.subject.subscribe(observer); |
|
} |
|
|
|
inherits(ControlledSubject, _super); |
|
|
|
function ControlledSubject(enableQueue) { |
|
if (enableQueue == null) { |
|
enableQueue = true; |
|
} |
|
|
|
_super.call(this, subscribe); |
|
this.subject = new Subject(); |
|
this.enableQueue = enableQueue; |
|
this.queue = enableQueue ? [] : null; |
|
this.requestedCount = 0; |
|
this.requestedDisposable = disposableEmpty; |
|
this.error = null; |
|
this.hasFailed = false; |
|
this.hasCompleted = false; |
|
this.controlledDisposable = disposableEmpty; |
|
} |
|
|
|
addProperties(ControlledSubject.prototype, Observer, { |
|
onCompleted: function () { |
|
checkDisposed.call(this); |
|
this.hasCompleted = true; |
|
|
|
if (!this.enableQueue || this.queue.length === 0) { |
|
this.subject.onCompleted(); |
|
} |
|
}, |
|
onError: function (error) { |
|
checkDisposed.call(this); |
|
this.hasFailed = true; |
|
this.error = error; |
|
|
|
if (!this.enableQueue || this.queue.length === 0) { |
|
this.subject.onError(error); |
|
} |
|
}, |
|
onNext: function (value) { |
|
checkDisposed.call(this); |
|
var hasRequested = false; |
|
|
|
if (this.requestedCount === 0) { |
|
if (this.enableQueue) { |
|
this.queue.push(value); |
|
} |
|
} else { |
|
if (this.requestedCount !== -1) { |
|
if (this.requestedCount-- === 0) { |
|
this.disposeCurrentRequest(); |
|
} |
|
} |
|
hasRequested = true; |
|
} |
|
|
|
if (hasRequested) { |
|
this.subject.onNext(value); |
|
} |
|
}, |
|
_processRequest: function (numberOfItems) { |
|
if (this.enableQueue) { |
|
//console.log('queue length', this.queue.length); |
|
|
|
while (this.queue.length >= numberOfItems && numberOfItems > 0) { |
|
//console.log('number of items', numberOfItems); |
|
this.subject.onNext(this.queue.shift()); |
|
numberOfItems--; |
|
} |
|
|
|
if (this.queue.length !== 0) { |
|
return { numberOfItems: numberOfItems, returnValue: true }; |
|
} else { |
|
return { numberOfItems: numberOfItems, returnValue: false }; |
|
} |
|
} |
|
|
|
if (this.hasFailed) { |
|
this.subject.onError(this.error); |
|
this.controlledDisposable.dispose(); |
|
this.controlledDisposable = disposableEmpty; |
|
} else if (this.hasCompleted) { |
|
this.subject.onCompleted(); |
|
this.controlledDisposable.dispose(); |
|
this.controlledDisposable = disposableEmpty; |
|
} |
|
|
|
return { numberOfItems: numberOfItems, returnValue: false }; |
|
}, |
|
request: function (number) { |
|
checkDisposed.call(this); |
|
this.disposeCurrentRequest(); |
|
var self = this, |
|
r = this._processRequest(number); |
|
|
|
number = r.numberOfItems; |
|
if (!r.returnValue) { |
|
this.requestedCount = number; |
|
this.requestedDisposable = disposableCreate(function () { |
|
self.requestedCount = 0; |
|
}); |
|
|
|
return this.requestedDisposable |
|
} else { |
|
return disposableEmpty; |
|
} |
|
}, |
|
disposeCurrentRequest: function () { |
|
this.requestedDisposable.dispose(); |
|
this.requestedDisposable = disposableEmpty; |
|
}, |
|
|
|
dispose: function () { |
|
this.isDisposed = true; |
|
this.error = null; |
|
this.subject.dispose(); |
|
this.requestedDisposable.dispose(); |
|
} |
|
}); |
|
|
|
return ControlledSubject; |
|
}(Observable)); |
|
return Rx; |
|
})); |