bufferToggle.ts 5.66 KB
Newer Older
jatuporn Tonggasem's avatar
jatuporn Tonggasem committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { Observable, SubscribableOrPromise } from '../Observable';
import { Subscription } from '../Subscription';
import { subscribeToResult } from '../util/subscribeToResult';
import { OuterSubscriber } from '../OuterSubscriber';
import { InnerSubscriber } from '../InnerSubscriber';
import { OperatorFunction } from '../interfaces';

/**
 * Buffers the source Observable values starting from an emission from
 * `openings` and ending when the output of `closingSelector` emits.
 *
 * <span class="informal">Collects values from the past as an array. Starts
 * collecting only when `opening` emits, and calls the `closingSelector`
 * function to get an Observable that tells when to close the buffer.</span>
 *
 * <img src="./img/bufferToggle.png" width="100%">
 *
 * Buffers values from the source by opening the buffer via signals from an
 * Observable provided to `openings`, and closing and sending the buffers when
 * a Subscribable or Promise returned by the `closingSelector` function emits.
 *
 * @example <caption>Every other second, emit the click events from the next 500ms</caption>
 * var clicks = Rx.Observable.fromEvent(document, 'click');
 * var openings = Rx.Observable.interval(1000);
 * var buffered = clicks.bufferToggle(openings, i =>
 *   i % 2 ? Rx.Observable.interval(500) : Rx.Observable.empty()
 * );
 * buffered.subscribe(x => console.log(x));
 *
 * @see {@link buffer}
 * @see {@link bufferCount}
 * @see {@link bufferTime}
 * @see {@link bufferWhen}
 * @see {@link windowToggle}
 *
 * @param {SubscribableOrPromise<O>} openings A Subscribable or Promise of notifications to start new
 * buffers.
 * @param {function(value: O): SubscribableOrPromise} closingSelector A function that takes
 * the value emitted by the `openings` observable and returns a Subscribable or Promise,
 * which, when it emits, signals that the associated buffer should be emitted
 * and cleared.
 * @return {Observable<T[]>} An observable of arrays of buffered values.
 * @method bufferToggle
 * @owner Observable
 */
export function bufferToggle<T, O>(
  openings: SubscribableOrPromise<O>,
  closingSelector: (value: O) => SubscribableOrPromise<any>
): OperatorFunction<T, T[]> {
  return function bufferToggleOperatorFunction(source: Observable<T>) {
    return source.lift(new BufferToggleOperator<T, O>(openings, closingSelector));
  };
}

class BufferToggleOperator<T, O> implements Operator<T, T[]> {

  constructor(private openings: SubscribableOrPromise<O>,
              private closingSelector: (value: O) => SubscribableOrPromise<any>) {
  }

  call(subscriber: Subscriber<T[]>, source: any): any {
    return source.subscribe(new BufferToggleSubscriber(subscriber, this.openings, this.closingSelector));
  }
}

interface BufferContext<T> {
  buffer: T[];
  subscription: Subscription;
}

/**
 * We need this JSDoc comment for affecting ESDoc.
 * @ignore
 * @extends {Ignored}
 */
class BufferToggleSubscriber<T, O> extends OuterSubscriber<T, O> {
  private contexts: Array<BufferContext<T>> = [];

  constructor(destination: Subscriber<T[]>,
              private openings: SubscribableOrPromise<O>,
              private closingSelector: (value: O) => SubscribableOrPromise<any> | void) {
    super(destination);
    this.add(subscribeToResult(this, openings));
  }

  protected _next(value: T): void {
    const contexts = this.contexts;
    const len = contexts.length;
    for (let i = 0; i < len; i++) {
      contexts[i].buffer.push(value);
    }
  }

  protected _error(err: any): void {
    const contexts = this.contexts;
    while (contexts.length > 0) {
      const context = contexts.shift();
      context.subscription.unsubscribe();
      context.buffer = null;
      context.subscription = null;
    }
    this.contexts = null;
    super._error(err);
  }

  protected _complete(): void {
    const contexts = this.contexts;
    while (contexts.length > 0) {
      const context = contexts.shift();
      this.destination.next(context.buffer);
      context.subscription.unsubscribe();
      context.buffer = null;
      context.subscription = null;
    }
    this.contexts = null;
    super._complete();
  }

  notifyNext(outerValue: any, innerValue: O,
             outerIndex: number, innerIndex: number,
             innerSub: InnerSubscriber<T, O>): void {
    outerValue ? this.closeBuffer(outerValue) : this.openBuffer(innerValue);
  }

  notifyComplete(innerSub: InnerSubscriber<T, O>): void {
    this.closeBuffer((<any> innerSub).context);
  }

  private openBuffer(value: O): void {
    try {
      const closingSelector = this.closingSelector;
      const closingNotifier = closingSelector.call(this, value);
      if (closingNotifier) {
        this.trySubscribe(closingNotifier);
      }
    } catch (err) {
      this._error(err);
    }
  }

  private closeBuffer(context: BufferContext<T>): void {
    const contexts = this.contexts;

    if (contexts && context) {
      const { buffer, subscription } = context;
      this.destination.next(buffer);
      contexts.splice(contexts.indexOf(context), 1);
      this.remove(subscription);
      subscription.unsubscribe();
    }
  }

  private trySubscribe(closingNotifier: any): void {
    const contexts = this.contexts;

    const buffer: Array<T> = [];
    const subscription = new Subscription();
    const context = { buffer, subscription };
    contexts.push(context);

    const innerSubscription = subscribeToResult(this, closingNotifier, <any>context);

    if (!innerSubscription || innerSubscription.closed) {
      this.closeBuffer(context);
    } else {
      (<any> innerSubscription).context = context;

      this.add(innerSubscription);
      subscription.add(innerSubscription);
    }
  }
}