A piano to hack on while going to/from work.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

398 lines
13 KiB

// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
;(function (factory) {
// Lookup table keyed by `typeof` result: true only for 'function' and 'object',
// i.e. the types that can act as a host/global/module object below.
var objectTypes = {
'boolean': false,
'function': true,
'object': true,
'number': false,
'string': false,
'undefined': false
};
// root: `window` when it is an object/function, otherwise the `this` this wrapper was called with.
var root = (objectTypes[typeof window] && window) || this,
// freeExports: `exports` when it is an object/function and has no truthy `nodeType`; falsy otherwise.
// NOTE(review): the `nodeType` test presumably rejects DOM elements exposed as globals — confirm.
freeExports = objectTypes[typeof exports] && exports && !exports.nodeType && exports,
// freeModule: same test applied to `module`.
freeModule = objectTypes[typeof module] && module && !module.nodeType && module,
// moduleExports: `exports` only when `module.exports` is that same object (not read again in this block).
moduleExports = freeModule && freeModule.exports === freeExports && freeExports,
// freeGlobal: `global` when it is an object/function.
freeGlobal = objectTypes[typeof global] && global;
// Prefer `global` as the root when it refers to itself through `.global` or `.window`.
if (freeGlobal && (freeGlobal.global === freeGlobal || freeGlobal.window === freeGlobal)) {
root = freeGlobal;
}
// Because of build optimizers
// Three registration paths follow; each calls `factory(root, exportsObject, Rx)`.
if (typeof define === 'function' && define.amd) {
// AMD: depend on the 'rx' module, publish the factory result as root.Rx and as the module value.
define(['rx', 'exports'], function (Rx, exports) {
root.Rx = factory(root, exports, Rx);
return root.Rx;
});
} else if (typeof module === 'object' && module && module.exports === freeExports) {
// CommonJS: load the core library from the sibling './rx' file and export the factory result.
module.exports = factory(root, module.exports, require('./rx'));
} else {
// Plain script: extend the Rx object already present on the root, with a throwaway exports object.
root.Rx = factory(root, {}, root.Rx);
}
}.call(this, function (root, exp, Rx, undefined) {
// Local aliases for the Rx members this module relies on.
// Each alias is read once, in the same order as before, when the factory runs.
var Observable = Rx.Observable;
var observableProto = Observable.prototype;
var AnonymousObservable = Rx.AnonymousObservable;
var CompositeDisposable = Rx.CompositeDisposable;
var Subject = Rx.Subject;
var Observer = Rx.Observer;
var disposableEmpty = Rx.Disposable.empty;
var disposableCreate = Rx.Disposable.create;
var inherits = Rx.internals.inherits;
var addProperties = Rx.internals.addProperties;
var timeoutScheduler = Rx.Scheduler.timeout;
var identity = Rx.helpers.identity;
// Message carried by the error thrown when a disposed object is used.
var objectDisposed = 'Object has been disposed';

/**
 * Guard meant to be invoked with an explicit receiver (`checkDisposed.call(obj)`).
 * Does nothing unless the receiver's `isDisposed` property is truthy.
 * @throws {Error} When `this.isDisposed` is truthy.
 */
function checkDisposed() {
  if (!this.isDisposed) {
    return;
  }
  throw new Error(objectDisposed);
}
/**
 * Observable whose connection to `source` is opened and closed by boolean
 * signals read from `subject`. Values produced while disconnected are not buffered.
 *
 * `pause()` / `resume()` push `false` / `true` into `subject` via `onNext`, so
 * they only work when `subject` exposes `onNext` (the default `Subject` does).
 */
var PausableObservable = (function (_super) {
  inherits(PausableObservable, _super);

  /**
   * Subscription logic handed to the base constructor.
   * @param {Observer} observer Receives the values of the published source.
   * @returns {Disposable} Tears down the observer subscription, the live
   *   source connection (if any) and the pauser subscription.
   */
  function subscribe(observer) {
    var conn = this.source.publish(),
      subscription = conn.subscribe(observer),
      connection = disposableEmpty;

    // A truthy signal connects the published source; a falsy one drops the connection.
    var pausable = this.subject.distinctUntilChanged().subscribe(function (b) {
      if (b) {
        connection = conn.connect();
      } else {
        connection.dispose();
        connection = disposableEmpty;
      }
    });

    // `connection` is reassigned every time the pauser toggles. Passing its
    // current value to the composite would capture only the initial
    // `disposableEmpty`, leaving a later connection (and the source behind it)
    // alive after unsubscribe. Dispose whatever connection is current instead.
    var liveConnection = disposableCreate(function () {
      connection.dispose();
      connection = disposableEmpty;
    });

    return new CompositeDisposable(subscription, liveConnection, pausable);
  }

  /**
   * @param {Observable} source Sequence to pause and resume.
   * @param {Subject} [subject] Pauser; a new Subject is created when omitted.
   */
  function PausableObservable(source, subject) {
    this.source = source;
    this.subject = subject || new Subject();
    this.isPaused = true;
    _super.call(this, subscribe);
  }

  /** Signals `false` to the pauser unless already marked as paused. */
  PausableObservable.prototype.pause = function () {
    if (this.isPaused === true) {
      return;
    }
    this.isPaused = true;
    this.subject.onNext(false);
  };

  /** Signals `true` to the pauser unless already marked as running. */
  PausableObservable.prototype.resume = function () {
    if (this.isPaused === false) {
      return;
    }
    this.isPaused = false;
    this.subject.onNext(true);
  };

  return PausableObservable;
}(Observable));
/**
 * Wraps this sequence in a PausableObservable driven by the given pauser.
 * Truthy pauser values let the source flow; falsy values stop it.
 * @example
 * var pauser = new Rx.Subject();
 * var source = Rx.Observable.interval(100).pausable(pauser);
 * @param {Observable} pauser Sequence of true/false signals controlling the source.
 *   When omitted, the wrapper creates its own Subject.
 * @returns {Observable} The pausable wrapper around this sequence.
 */
observableProto.pausable = function (pauser) {
  var pausableSource = new PausableObservable(this, pauser);
  return pausableSource;
};
/**
 * Combines the latest value of `source` with the latest value of `subject`.
 * Once both have produced a value, every new value from either side is passed
 * through `resultSelector(sourceValue, subjectValue)` and the result emitted.
 * Only completion of `source` is forwarded; no completion handler is
 * registered on `subject`. Errors from either side are forwarded.
 * @param {Observable} source Primary sequence (argument 0 of the selector).
 * @param {Observable} subject Secondary sequence (argument 1 of the selector).
 * @param {Function} resultSelector Builds the emitted value; an exception it
 *   throws is routed to the observer's onError.
 * @returns {Observable} Sequence of selector results.
 */
function combineLatestSource(source, subject, resultSelector) {
  return new AnonymousObservable(function (observer) {
    var SOURCE_SLOT = 0,
      SUBJECT_SLOT = 1,
      latest = new Array(2),
      seen = [false, false],
      allSeen = false,
      sourceFinished = false;

    // Stores the newest value for one side and emits when both sides have reported.
    function record(value, slot) {
      latest[slot] = value;
      seen[slot] = true;

      if (!allSeen) {
        allSeen = seen.every(identity);
      }

      if (!allSeen) {
        // Still waiting on the other side; if the source is already done, signal completion.
        if (sourceFinished) {
          observer.onCompleted();
        }
        return;
      }

      var combined;
      try {
        combined = resultSelector.apply(null, latest);
      } catch (err) {
        observer.onError(err);
        return;
      }
      observer.onNext(combined);
    }

    var sourceSubscription = source.subscribe(
      function (value) {
        record(value, SOURCE_SLOT);
      },
      observer.onError.bind(observer),
      function () {
        sourceFinished = true;
        observer.onCompleted();
      });

    var subjectSubscription = subject.subscribe(
      function (flag) {
        record(flag, SUBJECT_SLOT);
      },
      observer.onError.bind(observer));

    return new CompositeDisposable(sourceSubscription, subjectSubscription);
  });
}
/**
 * Observable that forwards `source` values while the pauser's latest signal is
 * truthy and queues them while it is falsy; the queue is flushed, in order,
 * when the pauser switches back to truthy.
 *
 * `pause()` / `resume()` push `false` / `true` into `subject` via `onNext`, and
 * every subscription also pushes an initial `false`, so `subject` must expose
 * `onNext` (the default `Subject` does).
 */
var PausableBufferedObservable = (function (_super) {
  inherits(PausableBufferedObservable, _super);

  /**
   * Subscription logic handed to the base constructor.
   * @param {Observer} observer Receives live and flushed values.
   * @returns {Disposable} Subscription to the combined source/pauser sequence.
   */
  function subscribe(observer) {
    var q = [],
      hasPrevious = false,
      previousShouldFire = false;

    function drainQueue() {
      while (q.length > 0) {
        observer.onNext(q.shift());
      }
    }

    var subscription =
      combineLatestSource(
        this.source,
        this.subject.distinctUntilChanged(),
        function (data, shouldFire) {
          return { data: data, shouldFire: shouldFire };
        })
        .subscribe(
          function (results) {
            var shouldFire = !!results.shouldFire;

            // A result whose flag differs from the previous one was caused by
            // the pauser; its `data` is the already-handled latest source value.
            // The very first result has nothing to differ from: it carries data
            // that has not been delivered yet. Treating it as a toggle (the old
            // behaviour, which started from `previous = true`) silently dropped
            // the first value received while paused.
            if (hasPrevious && shouldFire !== previousShouldFire) {
              previousShouldFire = shouldFire;
              if (shouldFire) {
                drainQueue();
              }
              return;
            }

            hasPrevious = true;
            previousShouldFire = shouldFire;
            if (shouldFire) {
              observer.onNext(results.data);
            } else {
              q.push(results.data);
            }
          },
          // NOTE(review): values still queued when the source errors or completes
          // are discarded, as before — confirm whether they should be flushed first.
          observer.onError.bind(observer),
          observer.onCompleted.bind(observer)
        );

    // Start in the paused state.
    this.subject.onNext(false);

    return subscription;
  }

  /**
   * @param {Observable} source Sequence to pause and resume.
   * @param {Subject} [subject] Pauser; a new Subject is created when omitted.
   */
  function PausableBufferedObservable(source, subject) {
    this.source = source;
    this.subject = subject || new Subject();
    this.isPaused = true;
    _super.call(this, subscribe);
  }

  /** Signals `false` to the pauser unless already marked as paused. */
  PausableBufferedObservable.prototype.pause = function () {
    if (this.isPaused === true) {
      return;
    }
    this.isPaused = true;
    this.subject.onNext(false);
  };

  /** Signals `true` to the pauser unless already marked as running. */
  PausableBufferedObservable.prototype.resume = function () {
    if (this.isPaused === false) {
      return;
    }
    this.isPaused = false;
    this.subject.onNext(true);
  };

  return PausableBufferedObservable;
}(Observable));
/**
 * Wraps this sequence in a PausableBufferedObservable: values are forwarded
 * while the pauser's latest signal is true and buffered while it is false,
 * with the buffer released once the pauser signals true again.
 * @example
 * var pauser = new Rx.Subject();
 * var source = Rx.Observable.interval(100).pausableBuffered(pauser);
 * @param {Observable} subject Sequence of true/false signals controlling the source.
 *   When omitted, the wrapper creates its own Subject.
 * @returns {Observable} The pausable, buffering wrapper around this sequence.
 */
observableProto.pausableBuffered = function (subject) {
  var bufferedSource = new PausableBufferedObservable(this, subject);
  return bufferedSource;
};
/**
 * Wraps this sequence in a ControlledObservable, whose values are released
 * only in response to `request(n)` calls.
 * @example
 * var source = Rx.Observable.interval(100).controlled();
 * source.request(3); // Reads 3 values
 * @param {Boolean} [enableQueue] Passed through to the controller; treated as
 *   true when null or undefined.
 * @returns {Observable} The controlled wrapper, exposing `request`.
 */
observableProto.controlled = function (enableQueue) {
  var useQueue = enableQueue == null ? true : enableQueue;
  return new ControlledObservable(this, useQueue);
};
/**
 * Observable that routes its source through a ControlledSubject and exposes
 * that subject's `request` method.
 */
var ControlledObservable = (function (_super) {
  inherits(ControlledObservable, _super);

  /**
   * @param {Observable} source Sequence to control.
   * @param {Boolean} enableQueue Forwarded to the ControlledSubject constructor.
   */
  function ControlledObservable (source, enableQueue) {
    _super.call(this, subscribe);
    var controller = new ControlledSubject(enableQueue);
    this.subject = controller;
    // The source is multicast through the controller and ref-counted.
    this.source = source.multicast(controller).refCount();
  }

  // Subscribing delegates to the multicast source built in the constructor.
  function subscribe (observer) {
    var shared = this.source;
    return shared.subscribe(observer);
  }

  /**
   * Asks the controller for items.
   * @param {Number} [numberOfItems] Item count; -1 is used when null or undefined.
   * @returns {Disposable} Whatever the controller's `request` returns.
   */
  ControlledObservable.prototype.request = function (numberOfItems) {
    return this.subject.request(numberOfItems == null ? -1 : numberOfItems);
  };

  return ControlledObservable;
}(Observable));
/**
 * Subject that releases incoming values to its subscribers only against an
 * outstanding request. With `enableQueue` on, values arriving without a
 * request are queued for later requests; with it off they are discarded.
 * A `requestedCount` of -1 means "no limit"; 0 means "no outstanding request".
 */
var ControlledSubject = Rx.ControlledSubject = (function (_super) {

  // Subscribers attach to the inner subject, which only sees released values.
  function subscribe (observer) {
    return this.subject.subscribe(observer);
  }

  inherits(ControlledSubject, _super);

  /**
   * @param {Boolean} [enableQueue] Whether unrequested values are queued;
   *   treated as true when null or undefined.
   */
  function ControlledSubject(enableQueue) {
    if (enableQueue == null) {
      enableQueue = true;
    }

    _super.call(this, subscribe);
    this.subject = new Subject();
    this.enableQueue = enableQueue;
    this.queue = enableQueue ? [] : null;
    this.requestedCount = 0;
    this.requestedDisposable = disposableEmpty;
    this.error = null;
    this.hasFailed = false;
    this.hasCompleted = false;
    this.controlledDisposable = disposableEmpty;
  }

  addProperties(ControlledSubject.prototype, Observer, {
    /**
     * Records completion; forwards it immediately unless queued values are
     * still waiting, in which case `_processRequest` forwards it after the
     * queue has been drained.
     */
    onCompleted: function () {
      checkDisposed.call(this);
      this.hasCompleted = true;

      if (!this.enableQueue || this.queue.length === 0) {
        this.subject.onCompleted();
      }
    },
    /**
     * Records the error; forwards it immediately unless queued values are
     * still waiting, in which case `_processRequest` forwards it after the
     * queue has been drained.
     */
    onError: function (error) {
      checkDisposed.call(this);
      this.hasFailed = true;
      this.error = error;

      if (!this.enableQueue || this.queue.length === 0) {
        this.subject.onError(error);
      }
    },
    /**
     * Releases the value when a request is outstanding; otherwise queues it
     * (queue mode) or drops it.
     */
    onNext: function (value) {
      checkDisposed.call(this);

      if (this.requestedCount === 0) {
        if (this.enableQueue) {
          this.queue.push(value);
        }
        return;
      }

      // Pre-decrement: the request is finished when the count *reaches* zero.
      // (The previous post-decrement compared the non-zero old value, so the
      // finished request was never disposed.) Dispose before forwarding so a
      // request issued from within the observer's handler is not clobbered.
      if (this.requestedCount !== -1 && --this.requestedCount === 0) {
        this.disposeCurrentRequest();
      }

      this.subject.onNext(value);
    },
    /**
     * Serves a request from the queue as far as possible.
     * @param {Number} numberOfItems Items requested; -1 means unbounded.
     * @returns {{numberOfItems: Number, returnValue: Boolean}} The count still
     *   owed, and `returnValue: true` when the queue was NOT exhausted (the
     *   request was fully satisfied from queued values).
     */
    _processRequest: function (numberOfItems) {
      if (this.enableQueue) {
        var unbounded = numberOfItems === -1;

        // Release queued values until the request is met or the queue runs
        // dry. The old condition required the queue to hold at least the whole
        // requested amount, so a request larger than the queue released
        // nothing, and an unbounded request never drained the queue at all.
        while (this.queue.length > 0 && (unbounded || numberOfItems > 0)) {
          this.subject.onNext(this.queue.shift());
          if (!unbounded) {
            numberOfItems--;
          }
        }

        if (this.queue.length !== 0) {
          return { numberOfItems: numberOfItems, returnValue: true };
        }
        // Queue is empty: fall through so a terminal event that was held back
        // while values were queued finally reaches the subscribers.
      }

      if (this.hasFailed) {
        this.subject.onError(this.error);
        this.controlledDisposable.dispose();
        this.controlledDisposable = disposableEmpty;
      } else if (this.hasCompleted) {
        this.subject.onCompleted();
        this.controlledDisposable.dispose();
        this.controlledDisposable = disposableEmpty;
      }

      return { numberOfItems: numberOfItems, returnValue: false };
    },
    /**
     * Replaces any outstanding request with a new one.
     * @param {Number} number Items requested; -1 means unbounded.
     * @returns {Disposable} Cancels the remaining part of the request; the
     *   empty disposable when the queue alone satisfied it.
     */
    request: function (number) {
      checkDisposed.call(this);
      this.disposeCurrentRequest();
      var self = this,
        r = this._processRequest(number);

      number = r.numberOfItems;
      if (!r.returnValue) {
        this.requestedCount = number;
        this.requestedDisposable = disposableCreate(function () {
          self.requestedCount = 0;
        });

        return this.requestedDisposable;
      } else {
        return disposableEmpty;
      }
    },
    /** Cancels the outstanding request, if any. */
    disposeCurrentRequest: function () {
      this.requestedDisposable.dispose();
      this.requestedDisposable = disposableEmpty;
    },
    /** Marks the subject disposed and releases the inner subject and request. */
    dispose: function () {
      this.isDisposed = true;
      this.error = null;
      this.subject.dispose();
      this.requestedDisposable.dispose();
    }
  });

  return ControlledSubject;
}(Observable));
return Rx;
}));