diff --git a/.eslintignore b/.eslintignore
new file mode 100644
index 0000000..22be1f9
--- /dev/null
+++ b/.eslintignore
@@ -0,0 +1,2 @@
+/coverage
+/dist
diff --git a/.eslintrc.js b/.eslintrc.js
index 25ad4d1..263f533 100644
--- a/.eslintrc.js
+++ b/.eslintrc.js
@@ -27,6 +27,7 @@ module.exports = {
"@typescript-eslint/ban-ts-ignore": "off",
"@typescript-eslint/no-this-alias": "off",
"@typescript-eslint/explicit-function-return-type": "off",
- "@typescript-eslint/no-empty-function": "off"
+ "@typescript-eslint/no-empty-function": "off",
+ "@typescript-eslint/no-non-null-assertion": "error"
}
};
diff --git a/src/behavior.ts b/src/behavior.ts
index c566388..7153b96 100644
--- a/src/behavior.ts
+++ b/src/behavior.ts
@@ -1,16 +1,11 @@
-import { cons, DoubleLinkedList, Node, fromArray, nil } from "./datastructures";
-import { combine, isPlaceholder } from "./index";
-import { State, Reactive, Time, BListener, Parent, SListener } from "./common";
-import { Future, BehaviorFuture } from "./future";
+import { Cons, cons, DoubleLinkedList, fromArray, nil, Node } from "./datastructures";
+import { __UNSAFE_GET_LAST_BEHAVIOR_VALUE, combine, isPlaceholder } from "./index";
+import { BListener, Parent, Reactive, SListener, State, Time } from "./common";
import * as F from "./future";
-import {
- Stream,
- FlatFuturesOrdered,
- FlatFuturesLatest,
- FlatFutures
-} from "./stream";
-import { tick, getTime } from "./clock";
-import { sample, Now } from "./now";
+import { BehaviorFuture, Future } from "./future";
+import { FlatFutures, FlatFuturesLatest, FlatFuturesOrdered, Stream } from "./stream";
+import { getTime, tick } from "./clock";
+import { Now, sample } from "./now";
export type MapBehaviorTuple = { [K in keyof A]: Behavior };
@@ -22,7 +17,7 @@ export type MapBehaviorTuple = { [K in keyof A]: Behavior };
export abstract class Behavior extends Reactive
implements Parent {
// Behaviors cache their last value in `last`.
- last: A;
+ last?: A;
children: DoubleLinkedList = new DoubleLinkedList();
pulledAt: number | undefined;
changedAt: number | undefined;
@@ -70,7 +65,7 @@ export abstract class Behavior extends Reactive
const time = t === undefined ? tick() : t;
this.pull(time);
}
- return this.last;
+ return __UNSAFE_GET_LAST_BEHAVIOR_VALUE(this);
}
abstract update(t: number): A;
pushB(t: number): void {
@@ -88,7 +83,11 @@ export abstract class Behavior extends Reactive
for (const parent of this.parents) {
if (isBehavior(parent)) {
parent.pull(t);
- shouldRefresh = shouldRefresh || this.changedAt < parent.changedAt;
+ shouldRefresh =
+ shouldRefresh ||
+ (this.changedAt !== undefined &&
+ parent.changedAt !== undefined &&
+ this.changedAt < parent.changedAt);
}
}
if (shouldRefresh) {
@@ -122,7 +121,7 @@ export abstract class Behavior extends Reactive
}
}
-export function pushToChildren(t: number, b: Behavior): void {
+export function pushToChildren<_>(t: number, b: Behavior<_>): void {
for (const child of b.children) {
child.pushB(t);
}
@@ -139,7 +138,7 @@ function refresh(b: Behavior, t: number) {
export function isBehavior(b: unknown): b is Behavior {
return (
- (typeof b === "object" && "at" in b && !isPlaceholder(b)) ||
+ (typeof b === "object" && b !== null && "at" in b && !isPlaceholder(b)) ||
(isPlaceholder(b) && (b.source === undefined || isBehavior(b.source)))
);
}
@@ -184,14 +183,16 @@ class ProducerBehaviorFromFunction extends ProducerBehavior {
) {
super();
}
- deactivateFn: () => void;
+ deactivateFn?: () => void;
activateProducer(): void {
this.state = State.Push;
this.deactivateFn = this.activateFn(this.newValue.bind(this));
}
deactivateProducer(): void {
this.state = State.Inactive;
- this.deactivateFn();
+ if (this.deactivateFn !== undefined) {
+ this.deactivateFn();
+ }
}
}
@@ -243,17 +244,19 @@ export class MapBehavior extends Behavior {
this.parents = cons(parent);
}
update(_t: number): B {
- return this.f(this.parent.last);
+ return this.f(__UNSAFE_GET_LAST_BEHAVIOR_VALUE(this.parent));
}
}
class ApBehavior extends Behavior {
constructor(private fn: Behavior<(a: A) => B>, private val: Behavior) {
super();
- this.parents = cons B) | A>>(fn, cons(val));
+ this.parents = cons B> | Behavior>(fn, cons(val));
}
update(_t: number): B {
- return this.fn.last(this.val.last);
+ return __UNSAFE_GET_LAST_BEHAVIOR_VALUE(this.fn)(
+ __UNSAFE_GET_LAST_BEHAVIOR_VALUE(this.val)
+ );
}
}
@@ -295,19 +298,22 @@ export class LiftBehavior extends Behavior {
class FlatMapBehavior extends Behavior {
// The last behavior returned by the chain function
- private innerB: Behavior;
+ private innerB?: Behavior | undefined;
private innerNode: Node = new Node(this);
constructor(private outer: Behavior, private fn: (a: A) => Behavior) {
super();
this.parents = cons(this.outer);
}
update(t: number): B {
- const outerChanged = this.outer.changedAt > this.changedAt;
+ const outerChanged =
+ this.outer.changedAt !== undefined &&
+ this.changedAt !== undefined &&
+ this.outer.changedAt > this.changedAt;
if (outerChanged || this.changedAt === undefined) {
if (this.innerB !== undefined) {
this.innerB.removeListener(this.innerNode);
}
- this.innerB = this.fn(this.outer.last);
+ this.innerB = this.fn(__UNSAFE_GET_LAST_BEHAVIOR_VALUE(this.outer));
this.innerB.addListener(this.innerNode, t);
if (this.state !== this.innerB.state) {
this.changeStateDown(this.innerB.state);
@@ -317,28 +323,32 @@ class FlatMapBehavior extends Behavior {
this.innerB.pull(t);
}
}
- return this.innerB.last;
+ if (this.innerB === undefined) {
+ // panic!
+ throw new Error("FlatMapBehavior#innerB should be defined");
+ }
+ return __UNSAFE_GET_LAST_BEHAVIOR_VALUE(this.innerB);
}
}
/** @private */
-class WhenBehavior extends Behavior> {
+class WhenBehavior extends Behavior> {
constructor(private parent: Behavior) {
super();
this.parents = cons(parent);
}
- update(_t: number): Future<{}> {
+ update(_t: number): Future {
return this.parent.last === true
- ? Future.of({})
+ ? Future.of(true)
: new BehaviorFuture(this.parent);
}
}
-export function whenFrom(b: Behavior): Behavior> {
+export function whenFrom(b: Behavior): Behavior> {
return new WhenBehavior(b);
}
-export function when(b: Behavior): Now> {
+export function when(b: Behavior): Now> {
return sample(whenFrom(b));
}
@@ -359,7 +369,7 @@ class SnapshotBehavior extends Behavior> implements SListener {
}
}
pushS(t: number, _val: A): void {
- this.last.resolve(this.parent.at(t), t);
+ __UNSAFE_GET_LAST_BEHAVIOR_VALUE(this).resolve(this.parent.at(t), t);
this.parents = cons(this.parent);
this.changeStateDown(this.state);
this.parent.addListener(this.node, t);
@@ -368,7 +378,7 @@ class SnapshotBehavior extends Behavior> implements SListener {
if (this.future.state === State.Done) {
return Future.of(this.parent.at(t));
} else {
- return this.last;
+ return __UNSAFE_GET_LAST_BEHAVIOR_VALUE(this);
}
}
}
@@ -440,7 +450,7 @@ class SwitcherBehavior extends ActiveBehavior
next.addListener(this.nNode, t);
}
update(_t: Time): A {
- return this.b.last;
+ return __UNSAFE_GET_LAST_BEHAVIOR_VALUE(this.b);
}
pushS(t: number, value: Behavior): void {
this.doSwitch(t, value);
@@ -486,7 +496,9 @@ export function switcherFrom(
init: Behavior,
stream: Stream>
): Behavior> {
- return fromFunction((t) => new SwitcherBehavior(init, stream, t));
+ return fromFunction>(
+ (t) => new SwitcherBehavior(init, stream, t)
+ );
}
export function switcher(
@@ -636,10 +648,10 @@ export function stepper(initial: B, steps: Stream): Now> {
* @param turnOn the streams that turn the behavior on
* @param turnOff the streams that turn the behavior off
*/
-export function toggleFrom(
+export function toggleFrom(
initial: boolean,
- turnOn: Stream,
- turnOff: Stream
+ turnOn: Stream,
+ turnOff: Stream
): Behavior> {
return stepperFrom(initial, turnOn.mapTo(true).combine(turnOff.mapTo(false)));
}
@@ -650,10 +662,10 @@ export function toggleFrom(
* @param turnOn the streams that turn the behavior on
* @param turnOff the streams that turn the behavior off
*/
-export function toggle(
+export function toggle(
initial: boolean,
- turnOn: Stream,
- turnOff: Stream
+ turnOn: Stream,
+ turnOff: Stream
): Now> {
return sample(toggleFrom(initial, turnOn, turnOff));
}
@@ -662,7 +674,7 @@ export type SampleAt = (b: Behavior) => B;
class MomentBehavior extends Behavior {
private sampleBound: SampleAt;
- private currentSampleTime: Time;
+ private currentSampleTime?: Time;
constructor(private f: (at: SampleAt) => A) {
super();
this.sampleBound = (b) => this.sample(b);
@@ -703,17 +715,18 @@ class MomentBehavior extends Behavior {
parent.removeListener(node);
}
}
- this.parents = undefined;
- const value = this.f(this.sampleBound);
- return value;
+ this.parents = nil;
+ return this.f(this.sampleBound);
}
sample(b: Behavior): B {
const node = new Node(this);
this.listenerNodes = cons({ node, parent: b }, this.listenerNodes);
- b.addListener(node, this.currentSampleTime);
- b.at(this.currentSampleTime);
+ if (this.currentSampleTime !== undefined) {
+ b.addListener(node, this.currentSampleTime);
+ b.at(this.currentSampleTime);
+ }
this.parents = cons(b, this.parents);
- return b.last;
+ return __UNSAFE_GET_LAST_BEHAVIOR_VALUE(b);
}
}
@@ -727,9 +740,9 @@ class FormatBehavior extends Behavior {
private behaviors: Array>
) {
super();
- let parents = undefined;
+ let parents: Cons> = nil;
for (const b of behaviors) {
- if (isBehavior(b)) {
+ if (isBehavior(b)) {
parents = cons(b, parents);
}
}
@@ -739,7 +752,9 @@ class FormatBehavior extends Behavior {
let resultString = this.strings[0];
for (let i = 0; i < this.behaviors.length; ++i) {
const b = this.behaviors[i];
- const value = isBehavior(b) ? b.last : b;
+ const value = isBehavior(b)
+ ? __UNSAFE_GET_LAST_BEHAVIOR_VALUE(b)
+ : b;
resultString += value.toString() + this.strings[i + 1];
}
return resultString;
@@ -763,7 +778,8 @@ export function flatFutures(stream: Stream>): Now> {
export const flatFuturesOrderedFrom = (
stream: Stream>
-): Behavior> => fromFunction(() => new FlatFuturesOrdered(stream));
+): Behavior> =>
+ fromFunction>(() => new FlatFuturesOrdered(stream));
export function flatFuturesOrdered(
stream: Stream>
@@ -773,7 +789,8 @@ export function flatFuturesOrdered(
export const flatFuturesLatestFrom = (
stream: Stream>
-): Behavior> => fromFunction(() => new FlatFuturesLatest(stream));
+): Behavior> =>
+ fromFunction>(() => new FlatFuturesLatest(stream));
export function flatFuturesLatest(
stream: Stream>
diff --git a/src/common.ts b/src/common.ts
index 072f087..c28a509 100644
--- a/src/common.ts
+++ b/src/common.ts
@@ -1,15 +1,28 @@
-import { Cons, cons, DoubleLinkedList, Node } from "./datastructures";
+import { Cons, cons, DoubleLinkedList, nil, Node } from "./datastructures";
import { Behavior } from "./behavior";
import { tick } from "./clock";
export type Time = number;
function isBehavior(b: unknown): b is Behavior {
- return typeof b === "object" && "at" in b;
+ return typeof b === "object" && b !== null && "at" in b;
}
export type PullHandler = (pull: (t?: number) => void) => () => void;
+/**
+ * @internal
+ * Do not use!
+ * @throws {Error}
+ */
+export const __UNSAFE_GET_LAST_BEHAVIOR_VALUE = (b: Behavior): A => {
+ if (b.last === undefined) {
+ // panic!
+ throw new Error("Behavior#last value should be defined");
+ }
+ return b.last;
+};
+
/**
* The various states that a reactive can be in. The order matters here: Done <
* Push < Pull < Inactive. The idea is that a reactive can calculate its current
@@ -56,7 +69,7 @@ export class PushOnlyObserver implements BListener, SListener {
}
}
pushB(_t: number): void {
- this.callback((this.source as Behavior).last);
+ this.callback(__UNSAFE_GET_LAST_BEHAVIOR_VALUE(this.source as Behavior));
}
pushS(_t: number, value: A): void {
this.callback(value);
@@ -73,13 +86,10 @@ export type NodeParentPair = {
};
export abstract class Reactive implements Child {
- state: State;
- parents: Cons>;
+ state: State = State.Inactive;
+ parents: Cons> = nil;
listenerNodes: Cons | undefined;
children: DoubleLinkedList = new DoubleLinkedList();
- constructor() {
- this.state = State.Inactive;
- }
addListener(node: Node, t: number): State {
const firstChild = this.children.head === undefined;
this.children.prepend(node);
@@ -135,18 +145,22 @@ export abstract class Reactive implements Child {
}
export class CbObserver implements BListener, SListener {
- private endPulling: () => void;
+ private endPulling?: () => void;
node: Node> = new Node(this);
constructor(
private callback: (a: A) => void,
readonly handlePulling: PullHandler,
- private time: Time,
+ private time: Time | undefined,
readonly source: ParentBehavior
) {
source.addListener(this.node, tick());
if (source.state === State.Pull) {
this.endPulling = handlePulling(this.pull.bind(this));
- } else if (isBehavior(source) && source.state === State.Push) {
+ } else if (
+ isBehavior(source) &&
+ source.state === State.Push &&
+ source.last !== undefined
+ ) {
callback(source.last);
}
this.time = undefined;
@@ -156,11 +170,13 @@ export class CbObserver implements BListener, SListener {
time !== undefined ? time : this.time !== undefined ? this.time : tick();
if (isBehavior(this.source) && this.source.state === State.Pull) {
this.source.pull(t);
- this.callback(this.source.last);
+ if (this.source.last !== undefined) {
+ this.callback(this.source.last);
+ }
}
}
pushB(_t: number): void {
- this.callback((this.source as Behavior).last);
+ this.callback(__UNSAFE_GET_LAST_BEHAVIOR_VALUE(this.source as Behavior));
}
pushS(_t: number, value: A): void {
this.callback(value);
diff --git a/src/datastructures.ts b/src/datastructures.ts
index 638cc47..068178e 100644
--- a/src/datastructures.ts
+++ b/src/datastructures.ts
@@ -1,25 +1,40 @@
-export class Cons {
- constructor(
- public readonly value: A,
- public readonly tail: Cons,
- public readonly isNil: boolean
- ) {}
- *[Symbol.iterator](): IterableIterator {
- let head: Cons = this;
- while (head.isNil === false) {
- const v = head.value;
- head = head.tail;
- yield v;
- }
- }
+export interface ConsNil {
+ readonly isNil: true;
+ [Symbol.iterator](): Generator;
}
-export const nil: Cons = new Cons(undefined, undefined, true);
+export interface ConsValue {
+ readonly isNil: false;
+ readonly value: A;
+ readonly tail: Cons;
+ [Symbol.iterator](): Generator;
+}
+
+export type Cons = ConsNil | ConsValue;
-export function cons(value: A, tail: Cons = nil): Cons {
- return new Cons(value, tail, false);
+function* generator(this: Cons) {
+ let head: Cons = this;
+ while (!head.isNil) {
+ const v = head.value;
+ head = head.tail;
+ yield v;
+ }
}
+export const nil: Cons = {
+ isNil: true,
+ [Symbol.iterator]: generator
+};
+
+export const cons = (value: A, tail: Cons = nil): Cons => {
+ return {
+ isNil: false,
+ value,
+ tail,
+ [Symbol.iterator]: generator
+ };
+};
+
export function fromArray(values: A[]): Cons {
let list = cons(values[0]);
for (let i = 1; i < values.length; ++i) {
diff --git a/src/dom.ts b/src/dom.ts
index eaaee37..9cae00b 100644
--- a/src/dom.ts
+++ b/src/dom.ts
@@ -12,7 +12,7 @@ class DomEventStream extends ProducerStream {
constructor(
private target: EventTarget,
private eventName: string,
- private extractor: Extractor
+ private extractor: Extractor
) {
super();
}
@@ -66,8 +66,8 @@ export function streamFromEvent(
export function streamFromEvent(
target: EventTarget,
eventName: string,
- extractor: Extractor = id
-): Stream {
+ extractor: Extractor = id
+): Stream {
return new DomEventStream(target, eventName, extractor);
}
diff --git a/src/future.ts b/src/future.ts
index ed5cfd1..3bafc1e 100644
--- a/src/future.ts
+++ b/src/future.ts
@@ -1,4 +1,11 @@
-import { State, SListener, Parent, BListener, Time } from "./common";
+import {
+ State,
+ SListener,
+ Parent,
+ BListener,
+ Time,
+ __UNSAFE_GET_LAST_BEHAVIOR_VALUE
+} from "./common";
import { Reactive } from "./common";
import { cons, fromArray, Node } from "./datastructures";
import { Behavior, FunctionBehavior } from "./behavior";
@@ -8,6 +15,13 @@ import { sample, Now } from "./now";
export type MapFutureTuple = { [K in keyof A]: Future };
+const __UNSAFE_GET_LAST_FUTURE_VALUE = (f: Future): A => {
+ if (f.value === undefined) {
+ // panic!
+ throw new Error("Future#value should be defined");
+ }
+ return f.value;
+};
/**
* A future is a thing that occurs at some point in time with a value.
* It can be understood as a pair consisting of the time the future
@@ -17,7 +31,7 @@ export type MapFutureTuple = { [K in keyof A]: Future };
export abstract class Future extends Reactive>
implements Parent> {
// The value of the future. Often `undefined` until occurrence.
- value: A;
+ value?: A;
constructor() {
super();
}
@@ -37,7 +51,7 @@ export abstract class Future extends Reactive>
}
addListener(node: Node>, t: number): State {
if (this.state === State.Done) {
- node.value.pushS(t, this.value);
+ node.value.pushS(t, __UNSAFE_GET_LAST_FUTURE_VALUE(this));
return State.Done;
} else {
return super.addListener(node, t);
@@ -52,7 +66,7 @@ export abstract class Future extends Reactive>
return new MapFuture(f, this);
}
mapTo(b: B): Future {
- return new MapToFuture(b, this);
+ return new MapToFuture(b, this);
}
// A future is an applicative. `of` gives a future that has always
// occurred at all points in time.
@@ -62,7 +76,7 @@ export abstract class Future extends Reactive>
of(b: B): Future {
return new OfFuture(b);
}
- ap: (f: Future<(a: A) => B>) => Future;
+ // ap: (f: Future<(a: A) => B>) => Future;
lift(
f: (...args: A) => R,
...args: MapFutureTuple
@@ -88,7 +102,7 @@ export abstract class Future extends Reactive>
}
export function isFuture(a: unknown): a is Future {
- return typeof a === "object" && "resolve" in a;
+ return typeof a === "object" && a !== null && "resolve" in a;
}
export class CombineFuture extends Future {
@@ -111,8 +125,8 @@ export class MapFuture extends Future {
}
}
-export class MapToFuture extends Future {
- constructor(public value: A, readonly parent: Future) {
+export class MapToFuture extends Future {
+ constructor(public value: A, readonly parent: Future<_>) {
super();
this.parents = cons(parent);
}
@@ -241,7 +255,7 @@ export class BehaviorFuture extends SinkFuture implements BListener {
}
pushB(t: number): void {
this.b.removeListener(this.node);
- this.resolve(this.b.last, t);
+ this.resolve(__UNSAFE_GET_LAST_BEHAVIOR_VALUE(this.b), t);
}
}
@@ -256,7 +270,9 @@ export class NextOccurrenceFuture extends Future implements SListener {
}
export function nextOccurrenceFrom(stream: Stream): Behavior> {
- return new FunctionBehavior((t: Time) => new NextOccurrenceFuture(stream, t));
+ return new FunctionBehavior>(
+ (t: Time) => new NextOccurrenceFuture(stream, t)
+ );
}
export function nextOccurrence(stream: Stream): Now> {
diff --git a/src/now.ts b/src/now.ts
index cb47dba..d0fef81 100644
--- a/src/now.ts
+++ b/src/now.ts
@@ -137,7 +137,7 @@ export class PerformMapNow extends Now | Future> {
super();
}
run(): Stream | Future {
- return isStream(this.s)
+ return isStream(this.s)
? mapCbStream((value, done) => done(this.cb(value)), this.s)
: mapCbFuture((value, done) => done(this.cb(value)), this.s);
}
@@ -153,7 +153,7 @@ export function performMap(
s: Stream | Future
): Now | Future> {
return perform(() =>
- isStream(s)
+ isStream(s)
? mapCbStream((value, done) => done(cb(value)), s)
: mapCbFuture((value, done) => done(cb(value)), s)
);
diff --git a/src/placeholder.ts b/src/placeholder.ts
index ecebcae..fc4fe59 100644
--- a/src/placeholder.ts
+++ b/src/placeholder.ts
@@ -1,20 +1,25 @@
-import { Reactive, State, SListener, BListener, Time } from "./common";
-import { Behavior, isBehavior, MapBehavior, pushToChildren } from "./behavior";
+import { Reactive, State, SListener, BListener, Time, __UNSAFE_GET_LAST_BEHAVIOR_VALUE } from "./common";
+import {
+ Behavior,
+ isBehavior,
+ MapBehavior,
+ pushToChildren
+} from "./behavior";
import { Node, cons } from "./datastructures";
import { Stream, MapToStream } from "./stream";
import { tick } from "./clock";
import { Future } from "./future";
-class SamplePlaceholderError {
+class SamplePlaceholderError {
message = "Attempt to sample non-replaced placeholder";
- constructor(public placeholder: Placeholder) {}
+ constructor(public placeholder: Placeholder) {}
toString(): string {
return this.message;
}
}
export class Placeholder extends Behavior {
- source: Reactive | BListener>;
+ source?: Reactive | BListener>;
private node: Node = new Node(this);
replaceWith(parent: Reactive | BListener>, t?: Time): void {
this.source = parent;
@@ -45,7 +50,7 @@ export class Placeholder extends Behavior {
}
}
update(_t: number): A {
- return (this.source as Behavior).last;
+ return __UNSAFE_GET_LAST_BEHAVIOR_VALUE(this.source as Behavior);
}
activate(t: number): void {
if (this.source !== undefined) {
@@ -73,7 +78,7 @@ export class Placeholder extends Behavior {
}
export function isPlaceholder(p: unknown): p is Placeholder {
- return typeof p === "object" && "replaceWith" in p;
+ return typeof p === "object" && p !== null && "replaceWith" in p;
}
class MapPlaceholder extends MapBehavior {
@@ -84,7 +89,7 @@ class MapPlaceholder extends MapBehavior {
}
class MapToPlaceholder extends MapToStream {
- changedAt: Time;
+ changedAt?: Time;
constructor(parent: Stream, public last: B) {
super(parent, last);
}
diff --git a/src/stream.ts b/src/stream.ts
index 5bcaeaf..28d22b0 100644
--- a/src/stream.ts
+++ b/src/stream.ts
@@ -1,4 +1,12 @@
-import { Reactive, State, Time, SListener, Parent, BListener } from "./common";
+import {
+ Reactive,
+ State,
+ Time,
+ SListener,
+ Parent,
+ BListener,
+ __UNSAFE_GET_LAST_BEHAVIOR_VALUE
+} from "./common";
import { cons, Node, DoubleLinkedList } from "./datastructures";
import {
Behavior,
@@ -19,11 +27,7 @@ import { Future } from ".";
*/
export abstract class Stream extends Reactive>
implements Parent> {
- constructor() {
- super();
- }
children: DoubleLinkedList> = new DoubleLinkedList();
- state: State;
combine(stream: Stream): Stream {
return new CombineStream(stream, this);
}
@@ -40,7 +44,9 @@ export abstract class Stream extends Reactive>
return scan(fn, startingValue, this);
}
scanFrom(fn: (a: A, b: B) => B, startingValue: B): Behavior> {
- return fromFunction((t) => new ScanStream(fn, startingValue, this, t));
+ return fromFunction>(
+ (t) => new ScanStream(fn, startingValue, this, t)
+ );
}
accum(fn: (a: A, b: B) => B, init: B): Now> {
return accum(fn, init, this);
@@ -201,23 +207,25 @@ export function scan(
class ShiftBehaviorStream extends Stream implements BListener {
private bNode: Node = new Node(this);
private sNode: Node = new Node(this);
- private currentSource: Stream;
+ private currentSource?: Stream;
constructor(private b: Behavior>) {
super();
}
activate(t: number): void {
this.b.addListener(this.bNode, t);
if (this.b.state !== State.Inactive) {
- this.currentSource = this.b.last;
+ this.currentSource = __UNSAFE_GET_LAST_BEHAVIOR_VALUE(this.b);
this.currentSource.addListener(this.sNode, t);
}
}
deactivate(): void {
this.b.removeListener(this.bNode);
- this.currentSource.removeListener(this.sNode);
+ if (this.currentSource !== undefined) {
+ this.currentSource.removeListener(this.sNode);
+ }
}
pushB(t: number): void {
- const newStream = this.b.last;
+ const newStream = __UNSAFE_GET_LAST_BEHAVIOR_VALUE(this.b);
if (this.currentSource !== undefined) {
this.currentSource.removeListener(this.sNode);
}
@@ -254,7 +262,7 @@ export function shiftFrom(s: Stream>): Behavior> {
}
class ChangesStream extends Stream implements BListener {
- last: A;
+ last?: A;
initialized: boolean;
constructor(
readonly parent: Behavior,
@@ -269,17 +277,20 @@ class ChangesStream extends Stream implements BListener {
// The parent may be an unreplaced placeholder and in that case
// we can't read its current value.
if (this.parent.state === State.Push) {
- this.last = this.parent.last;
+ this.last = __UNSAFE_GET_LAST_BEHAVIOR_VALUE(this.parent);
this.initialized = true;
}
}
pushB(t: number): void {
if (!this.initialized) {
this.initialized = true;
- this.last = this.parent.last;
- } else if (!this.comparator(this.last, this.parent.last)) {
- this.pushSToChildren(t, this.parent.last);
- this.last = this.parent.last;
+ this.last = __UNSAFE_GET_LAST_BEHAVIOR_VALUE(this.parent);
+ } else {
+ const parentLast = __UNSAFE_GET_LAST_BEHAVIOR_VALUE(this.parent);
+ if (this.last !== undefined && !this.comparator(this.last, parentLast)) {
+ this.pushSToChildren(t, parentLast);
+ this.last = parentLast;
+ }
}
}
pushS(_t: number, _a: A): void {}
@@ -300,7 +311,7 @@ export function changes(
export class CombineStream extends Stream {
constructor(readonly s1: Stream, readonly s2: Stream) {
super();
- this.parents = cons>(s1, cons(s2));
+ this.parents = cons | Stream>(s1, cons(s2));
}
pushS(t: number, a: A | B): void {
this.pushSToChildren(t, a);
@@ -325,7 +336,7 @@ class ProducerStreamFromFunction extends ProducerStream {
constructor(private activateFn: ProducerStreamFunction) {
super();
}
- deactivateFn: () => void;
+ deactivateFn?: () => void;
publish(a: A, t: number = tick()): void {
this.pushS(t, a);
}
@@ -335,7 +346,9 @@ class ProducerStreamFromFunction extends ProducerStream {
}
deactivate(): void {
this.state = State.Inactive;
- this.deactivateFn();
+ if (this.deactivateFn !== undefined) {
+ this.deactivateFn();
+ }
}
}
@@ -376,9 +389,9 @@ export function subscribe(fn: (a: A) => void, stream: Stream): void {
stream.subscribe(fn);
}
-export class SnapshotStream extends Stream {
+export class SnapshotStream extends Stream {
private node: Node = new Node(this);
- constructor(readonly target: Behavior, readonly trigger: Stream) {
+ constructor(readonly target: Behavior, readonly trigger: Stream<_>) {
super();
this.parents = cons(trigger);
}
@@ -394,9 +407,9 @@ export class SnapshotStream extends Stream {
}
}
-export function snapshot(
+export function snapshot(
target: Behavior,
- trigger: Stream
+ trigger: Stream<_>
): Stream {
return new SnapshotStream(target, trigger);
}
@@ -447,8 +460,8 @@ export function selfie(stream: Stream>): Stream {
return new SelfieStream(stream);
}
-export function isStream(s: unknown): s is Stream {
- return typeof s === "object" && "scanFrom" in s;
+export function isStream(s: unknown): s is Stream {
+ return typeof s === "object" && s !== null && "scanFrom" in s;
}
class PerformCbStream extends ActiveStream implements SListener {
@@ -509,11 +522,12 @@ export class FlatFuturesOrdered extends Stream {
});
}
pushFromBuffer(): void {
- while (this.buffer[0] !== undefined) {
+ let a = this.buffer.shift();
+ while (a !== undefined) {
const t = tick();
- const { value } = this.buffer.shift();
- this.pushSToChildren(t, value);
+ this.pushSToChildren(t, a.value);
this.next++;
+ a = this.buffer.shift();
}
}
}
diff --git a/src/testing.ts b/src/testing.ts
index da9f477..31bdfa2 100644
--- a/src/testing.ts
+++ b/src/testing.ts
@@ -44,31 +44,42 @@ import {
InstantRun
} from "./now";
import { time, DelayStream } from "./time";
+import { ConsValue } from "./datastructures";
// Future
-export type Occurrence = {
- time: Time;
- value: A;
-};
-
declare module "./future" {
interface Future {
model(): SemanticFuture;
}
}
-export const neverOccurringFuture = {
- time: "infinity" as "infinity",
- value: undefined as undefined
+export interface Occurrence {
+ tag: "Occurrence";
+ time: Time;
+ value: A;
+}
+
+export const occurrence = (time: Time, value: A): Occurrence => ({
+ tag: "Occurrence",
+ time,
+ value
+});
+
+export interface NeverOccurringFuture {
+ readonly tag: "NeverOccurringFuture";
+}
+
+export const neverOccurringFuture: NeverOccurringFuture = {
+ tag: "NeverOccurringFuture"
};
-export type SemanticFuture = Occurrence | typeof neverOccurringFuture;
+export type SemanticFuture = Occurrence | NeverOccurringFuture;
export function doesOccur(
future: SemanticFuture
): future is Occurrence {
- return future.time !== "infinity";
+ return future.tag === "Occurrence";
}
CombineFuture.prototype.model = function() {
@@ -80,30 +91,31 @@ CombineFuture.prototype.model = function() {
MapFuture.prototype.model = function() {
const p = this.parent.model();
return doesOccur(p)
- ? { time: p.time, value: this.f(p.value) }
+ ? occurrence(p.time, this.f(p.value))
: neverOccurringFuture;
};
MapToFuture.prototype.model = function() {
const p = this.parent.model();
- return doesOccur(p)
- ? { time: p.time, value: this.value }
- : neverOccurringFuture;
+ return doesOccur(p) ? occurrence(p.time, this.value) : neverOccurringFuture;
};
OfFuture.prototype.model = function() {
- return { time: -Infinity, value: this.value };
+ return occurrence(-Infinity, this.value);
};
NeverFuture.prototype.model = function() {
return neverOccurringFuture;
};
+const allOccurred = (fs: SemanticFuture[]): fs is Occurrence[] =>
+ fs.every((f) => f.tag === "Occurrence");
+
LiftFuture.prototype.model = function() {
const sems = (this.futures as Future[]).map((f) => f.model());
const time = Math.max(...sems.map((s) => (doesOccur(s) ? s.time : Infinity)));
- return time !== Infinity
- ? { time, value: this.f(...sems.map((s) => s.value)) }
+ return time !== Infinity && allOccurred(sems)
+ ? occurrence(time, this.f(...sems.map((s) => s.value)))
: neverOccurringFuture;
};
@@ -112,7 +124,7 @@ FlatMapFuture.prototype.model = function() {
if (doesOccur(a)) {
const b = this.f(a.value).model();
if (doesOccur(b)) {
- return { time: Math.max(a.time, b.time), value: b.value };
+ return occurrence(Math.max(a.time, b.time), b.value);
}
}
return neverOccurringFuture;
@@ -143,7 +155,7 @@ class TestFuture extends Future {
}
export function testFuture(time: number, value: A): Future {
- return new TestFuture({ time, value });
+ return new TestFuture(occurrence(time, value));
}
export function assertFutureEqual(
@@ -167,12 +179,12 @@ declare module "./stream" {
MapStream.prototype.model = function(this: MapStream) {
const s = this.parent.model();
- return s.map(({ time, value }) => ({ time, value: this.f(value) }));
+ return s.map(({ time, value }) => occurrence(time, this.f(value)));
};
MapToStream.prototype.model = function(this: MapToStream) {
- const s = (this.parents.value as Stream).model();
- return s.map(({ time }) => ({ time, value: this.b }));
+ const s = (this.parents as ConsValue>).value.model();
+ return s.map(({ time }) => occurrence(time, this.b));
};
FilterStream.prototype.model = function(this: FilterStream) {
@@ -189,7 +201,7 @@ ScanStream.prototype.model = function(this: ScanStream) {
.filter((o) => this.t < o.time)
.map(({ time, value }) => {
acc = this.f(value, acc);
- return { time, value: acc };
+ return occurrence(time, acc);
});
};
@@ -209,48 +221,51 @@ CombineStream.prototype.model = function(this: CombineStream) {
return result;
};
-SnapshotStream.prototype.model = function(this: SnapshotStream) {
+SnapshotStream.prototype.model = function(this: SnapshotStream) {
return this.trigger
.model()
- .map(({ time }) => ({ time, value: testAt(time, this.target) }));
+ .map(({ time }) => occurrence(time, testAt(time, this.target)));
};
DelayStream.prototype.model = function(this: DelayStream) {
- const s = (this.parents.value as Stream).model();
- return s.map(({ time, value }) => ({ time: time + this.ms, value }));
+ const s = (this.parents as ConsValue>).value.model();
+ return s.map(({ time, value }) => occurrence(time + this.ms, value));
};
-const flatFuture = (o: Occurrence>) => {
- const { time, value } = o.value.model();
- return time === "infinity" ? [] : [{ time: Math.max(o.time, time), value }];
+const flatFuture = (o: Occurrence>): Occurrence[] => {
+ const f = o.value.model();
+ if (f.tag === "NeverOccurringFuture") {
+ return [];
+ } else {
+ return [occurrence(Math.max(o.time, f.time), f.value)];
+ }
};
FlatFutures.prototype.model = function(this: FlatFutures) {
- return (this.parents.value as Stream>)
+ return (this.parents as ConsValue>>).value
.model()
.flatMap(flatFuture)
.sort((o, p) => o.time - p.time); // FIXME: Should use stable sort here
};
FlatFuturesOrdered.prototype.model = function(this: FlatFuturesOrdered) {
- return (this.parents.value as Stream>)
+ return (this.parents as ConsValue>>).value
.model()
.flatMap(flatFuture)
- .reduce((acc, o) => {
- const last = acc.length === 0 ? -Infinity : acc[acc.length - 1].time;
- return acc.concat([{ time: Math.max(last, o.time), value: o.value }]);
+ .reduce[]>((acc, o) => {
+ const last: Time =
+ acc.length === 0 ? -Infinity : acc[acc.length - 1].time;
+ return acc.concat([occurrence(Math.max(last, o.time), o.value)]);
}, []);
};
FlatFuturesLatest.prototype.model = function(this: FlatFuturesLatest) {
- return (this.parents.value as Stream>)
+ return (this.parents as ConsValue>>).value
.model()
.flatMap(flatFuture)
.reduceRight[]>((acc, o) => {
const last = acc.length === 0 ? Infinity : acc[0].time;
- return last < o.time
- ? acc
- : [{ time: o.time, value: o.value }].concat(acc);
+ return last < o.time ? acc : [occurrence(o.time, o.value)].concat(acc);
}, []);
};
@@ -276,31 +291,28 @@ class TestStream extends Stream {
}
export function testStreamFromArray(array: ([Time, A])[]): Stream {
- const semanticStream = array.map(([t, value]) => ({ value, time: t }));
+ const semanticStream = array.map(([t, value]) => occurrence(t, value));
return new TestStream(semanticStream);
}
export function testStreamFromObject(object: Record): Stream {
- const semanticStream = Object.keys(object).map((key) => ({
- time: parseFloat(key),
- value: object[key]
- }));
+ const semanticStream = Object.keys(object).map((key) =>
+ occurrence(parseFloat(key), object[key])
+ );
return new TestStream(semanticStream);
}
export function assertStreamEqual(s1: Stream, s2: Stream): void;
export function assertStreamEqual(
s1: Stream,
- s2: {
- [time: number]: A;
- }
+ s2: Record
): void;
export function assertStreamEqual(s1: Stream, s2: ([Time, A])[]): void;
export function assertStreamEqual(
s1: Stream,
- s2: Stream | ([Time, A])[]
+ s2: Stream | ([Time, A])[] | Record
): void {
- const s2_ = isStream(s2)
+ const s2_ = isStream(s2)
? s2
: Array.isArray(s2)
? testStreamFromArray(s2)
diff --git a/src/time.ts b/src/time.ts
index ba32c18..3ddb84c 100644
--- a/src/time.ts
+++ b/src/time.ts
@@ -1,7 +1,10 @@
-import { Time, State } from "./common";
+import { Time, State, __UNSAFE_GET_LAST_BEHAVIOR_VALUE } from "./common";
import { cons } from "./datastructures";
import { Stream } from "./stream";
-import { Behavior, fromFunction } from "./behavior";
+import {
+ Behavior,
+ fromFunction
+} from "./behavior";
import { sample, Now, perform } from "./now";
/*
@@ -51,7 +54,9 @@ class DebounceStream extends Stream {
}
private timer: NodeJS.Timeout | undefined = undefined;
pushS(t: number, a: A): void {
- clearTimeout(this.timer);
+ if (this.timer !== undefined) {
+ clearTimeout(this.timer);
+ }
this.timer = setTimeout(() => {
this.pushSToChildren(t, a);
}, this.ms);
@@ -81,19 +86,20 @@ export const measureTime = sample(measureTimeFrom);
class IntegrateBehavior extends Behavior {
private lastPullTime: Time;
+ last = 0;
+ state = State.Pull;
constructor(private parent: Behavior, t: number) {
super();
this.lastPullTime = time.at(t);
- this.state = State.Pull;
- this.last = 0;
this.pulledAt = t;
this.changedAt = t;
this.parents = cons(parent, cons(time));
}
update(_t: Time): number {
- const currentPullTime = time.last;
+ const currentPullTime = __UNSAFE_GET_LAST_BEHAVIOR_VALUE(time);
const deltaMs = currentPullTime - this.lastPullTime;
- const value = this.last + deltaMs * this.parent.last;
+ const value =
+ this.last + deltaMs * __UNSAFE_GET_LAST_BEHAVIOR_VALUE(this.parent);
this.lastPullTime = currentPullTime;
return value;
}
@@ -117,5 +123,5 @@ export function integrate(behavior: Behavior): Now> {
export function integrateFrom(
behavior: Behavior
): Behavior> {
- return fromFunction((t) => new IntegrateBehavior(behavior, t));
+ return fromFunction>((t) => new IntegrateBehavior(behavior, t));
}
diff --git a/test/behavior.ts b/test/behavior.ts
index 5bb2974..ee4c956 100644
--- a/test/behavior.ts
+++ b/test/behavior.ts
@@ -18,13 +18,22 @@ import {
Stream,
time,
runNow,
- Time
+ Time,
+ SinkBehavior
} from "../src";
import * as H from "../src";
import { subscribeSpy } from "./helpers";
import { placeholder } from "../src/placeholder";
+import { MonadDictionary } from "@funkia/jabz/dist/monad";
+
+declare module "@funkia/jabz" {
+ export function go(
+ gen: () => Generator,
+ monad?: MonadDictionary
+ ): any; // sorry
+}
function double(n: number): number {
return n * 2;
@@ -95,7 +104,7 @@ describe("behavior", () => {
});
it("can push and pull", () => {
let variable = 0;
- let push: (t: Time) => void;
+ let push: undefined | ((t: Time) => void);
const setVar = (n: number) => {
variable = n;
if (push !== undefined) {
@@ -257,20 +266,22 @@ describe("behavior", () => {
const numE = H.fromFunction(() => n);
const applied = H.ap(fnB, numE);
const cb = spy();
- let pull: () => void;
+ let pull: (() => void) | undefined;
applied.observe(cb, (pull_) => {
pull = pull_;
return () => {};
});
- pull();
- push(add(2), fnB);
- pull();
- n = 4;
- pull();
- push(double, fnB);
- pull();
- n = 8;
- pull();
+ if (pull !== undefined) {
+ pull();
+ push(add(2), fnB);
+ pull();
+ n = 4;
+ pull();
+ push(double, fnB);
+ pull();
+ n = 8;
+ pull();
+ }
assert.deepEqual(cb.args, [[6], [3], [6], [8], [16]]);
});
});
@@ -389,13 +400,12 @@ describe("behavior", () => {
const inner1 = sinkBehavior(1);
const inner2 = sinkBehavior(3);
const b = outer.flatMap((n) => {
- if (n === 0) {
- return Behavior.of(0);
- } else if (n === 1) {
+ if (n === 1) {
return inner1;
} else if (n === 2) {
return inner2;
}
+ return Behavior.of(0);
});
b.observe(() => {}, () => () => {});
assert.strictEqual(at(b), 0);
@@ -444,8 +454,12 @@ describe("behavior", () => {
});
it("works with go-notation", () => {
const a = H.sinkBehavior(1);
- const b = go(function*(): IterableIterator {
- const val: number = yield a;
+ const b: SinkBehavior = go(function*(): Generator<
+ SinkBehavior,
+ number,
+ number
+ > {
+ const val = yield a;
return val * 2;
});
const cb = spy();
@@ -624,7 +638,7 @@ describe("Behavior and Future", () => {
});
describe("snapshotAt", () => {
it("snapshots behavior at future occurring in future", () => {
- let result: number;
+ let result: number | undefined = undefined;
const bSink = sinkBehavior(1);
const futureSink = H.sinkFuture();
const mySnapshot = at(H.snapshotAt(bSink, futureSink));
@@ -636,7 +650,7 @@ describe("Behavior and Future", () => {
assert.strictEqual(result, 3);
});
it("uses current value when future occurred in the past", () => {
- let result: number;
+ let result: number | undefined = undefined;
const bSink = sinkBehavior(1);
const occurredFuture = H.Future.of({});
bSink.push(2);
@@ -935,7 +949,7 @@ describe("Behavior and Stream", () => {
const outer = sinkBehavior>(pushingB);
const flattened = H.flat(outer);
const pushSpy = spy();
- let pull: () => void;
+ let pull: undefined | (() => void);
const handlePulling = (p: () => void): (() => void) => {
pull = p;
return () => undefined;
@@ -943,11 +957,13 @@ describe("Behavior and Stream", () => {
flattened.observe(pushSpy, handlePulling);
outer.push(pullingB);
variable = 1;
- pull();
- variable = 2;
- pull();
- variable = 3;
- pull();
+ if (pull !== undefined) {
+ pull();
+ variable = 2;
+ pull();
+ variable = 3;
+ pull();
+ }
assert.deepEqual(pushSpy.args, [[0], [1], [2], [3]]);
});
});
diff --git a/test/future.ts b/test/future.ts
index 2b1331c..ce9da82 100644
--- a/test/future.ts
+++ b/test/future.ts
@@ -30,7 +30,7 @@ describe("Future", () => {
});
describe("sink", () => {
it("notifies subscriber", () => {
- let result: number;
+ let result: number | undefined;
const s = sinkFuture();
s.subscribe((x: number) => {
result = x;
@@ -40,7 +40,7 @@ describe("Future", () => {
assert.strictEqual(result, 2);
});
it("notifies subscriber several layers down", () => {
- let result: number;
+ let result: number | undefined;
const s = sinkFuture();
const s2 = s.map((n) => n + 2).mapTo(9);
s2.subscribe((x: number) => {
@@ -61,7 +61,7 @@ describe("Future", () => {
});
describe("Semigroup", () => {
it("returns the first future if it occurs first", () => {
- let result: number;
+ let result: number | undefined;
const future1 = sinkFuture();
const future2 = sinkFuture();
const combined = future1.combine(future2);
@@ -71,7 +71,7 @@ describe("Future", () => {
assert.strictEqual(result, 1);
});
it("returns the seconds future if it occurs first", () => {
- let result: number;
+ let result: number | undefined;
const future1 = sinkFuture();
const future2 = sinkFuture();
const combined = future1.combine(future2);
@@ -81,8 +81,8 @@ describe("Future", () => {
assert.strictEqual(result, 2);
});
it("returns when only one occurs", () => {
- let result1: number;
- let result2: number;
+ let result1: number | undefined;
+ let result2: number | undefined;
const future1 = sinkFuture();
const future2 = sinkFuture();
const combined = future1.combine(future2);
@@ -106,7 +106,7 @@ describe("Future", () => {
});
describe("Functor", () => {
it("maps over value", () => {
- let result: number;
+ let result: number | undefined;
const s = sinkFuture();
const mapped = s.map((x) => x * x);
mapped.subscribe((x: number) => {
@@ -117,7 +117,7 @@ describe("Future", () => {
assert.strictEqual(result, 16);
});
it("maps to constant", () => {
- let result: string;
+ let result: string | undefined;
const s = sinkFuture();
const mapped = s.mapTo("horse");
mapped.subscribe((x: string) => {
@@ -130,7 +130,7 @@ describe("Future", () => {
});
describe("Apply", () => {
it("lifts a function of one argument", () => {
- let result: string;
+ let result: string | undefined;
const fut = sinkFuture();
const lifted = H.lift((s: string) => s + "!", fut);
lifted.subscribe((s: string) => (result = s));
@@ -139,7 +139,7 @@ describe("Future", () => {
assert.strictEqual(result, "Hello!");
});
it("lifts a function of three arguments", () => {
- let result: string;
+ let result: string | undefined;
const fut1 = sinkFuture();
const fut2 = sinkFuture();
const fut3 = sinkFuture();
@@ -163,7 +163,7 @@ describe("Future", () => {
});
describe("Applicative", () => {
it("of gives future that has occurred", () => {
- let result: number;
+ let result: number | undefined;
const o = Future.of(12);
o.subscribe((x) => (result = x));
assert.strictEqual(result, 12);
@@ -203,19 +203,21 @@ describe("Future", () => {
});
});
it("can convert Promise to Future", async () => {
- let result: number;
- let resolve: (n: number) => void;
+ let result: number | undefined;
+ let resolve: ((n: number) => void) | undefined;
const promise = new Promise((res) => (resolve = res));
const future = fromPromise(promise);
future.subscribe((res: number) => (result = res));
assert.strictEqual(result, undefined);
- resolve(12);
+ if (resolve !== undefined) {
+ resolve(12);
+ }
await promise;
assert.strictEqual(result, 12);
});
describe("nextOccurence", () => {
it("resolves on next occurence", () => {
- let result: string;
+ let result: string | undefined;
const s = new SinkStream();
const next = nextOccurrenceFrom(s);
s.push("a");
@@ -230,8 +232,8 @@ describe("Future", () => {
it("resolves with result when done callback invoked", () => {
const fut = sinkFuture();
const cb = spy();
- let value: number;
- let done: (result: unknown) => void;
+ let value: number | undefined;
+ let done: ((result: unknown) => void) | undefined;
const fut2 = mapCbFuture((v, d) => {
value = v;
done = d;
@@ -240,7 +242,9 @@ describe("Future", () => {
fut.resolve(3);
assert.equal(value, 3);
assert.equal(cb.callCount, 0);
- done(value + 1);
+ if (done !== undefined && value !== undefined) {
+ done(value + 1);
+ }
assert.equal(cb.callCount, 1);
assert.deepEqual(cb.args, [[4]]);
});
diff --git a/test/now.ts b/test/now.ts
index 0b76c97..5f685bb 100644
--- a/test/now.ts
+++ b/test/now.ts
@@ -157,7 +157,9 @@ describe("Now", () => {
function loop(n: number): Now