Subject

subject 是一個很特別的東西,他同時間是 observable 也是 observer。可以管理多個 observable ,也可以同時發送資料到多個 observe 。主要的用途是 multicast

subject種類

Subject 有 SubjectBehaviorSubjectReplaySubjectAsyncSubject ,每一個 subject 都有各自的特性,以下一一介紹

Multicasted Observables

這小節會用 subject 來解釋什麼是 multicast

我們都知道RxJS的Observeable會在subscribe的時候才會執行,所以每一次的subscribe都會執行一次,但是,某些情況下我們並不想要那樣子做,而 MultiCasting的觀念是避免處理一個Observeable多個Observer的情況時而不重複執行Observable。

假設情境

var source = Observable.interval(1000).take(5)
var ObserverA = {
  next: function(value){ console.log('A next '+ value);    },
  error: function(error){ console.error('A error '+ error);   },
  complete: function(){ console.log('A Complete');}
}
source.subscribe(ObserverA);
var ObserverB = {
  next: function(value){ console.log('B next '+ value);    },
  error: function(error){ console.error('B error '+ error);   },
  complete: function(){ console.log('B Complete');}
}
setTimeout(function(){
      source.subscribe(ObserverB);
},2000);

執行結果

為了要讓兩個 Observer 共用同樣的資料流,這時候就需要透過 subject 的幫助,讓 subject 當中間人,幫忙我們去監聽 observable 回傳的結果,而 observer 就訂閱 subject 時,就可以取得 observable 的資料了

var source = Observable.interval(1000).take(5)
var subject = Subject.create();
var ObserverA = {
  next: function(value){ console.log('A next '+ value);    },
  error: function(error){ console.error('A error '+ error);   },
  complete: function(){ console.log('A Complete');}
}
subject.subscribe(ObserverA);
source.subscribe(subject);
var ObserverB = {
  next: function(value){ console.log('B next '+ value);    },
  error: function(error){ console.error('B error '+ error);   },
  complete: function(){ console.log('B Complete');}
}
setTimeout(function(){
      subject.subscribe(ObserverB);
},2000);

接下來延伸出來的問題是,這樣子的寫法很複雜,所以 RxJS 提供了 operator 來簡化寫法,我們可以使用 multicast,將 Subject.create() 給包起來。

var source = Observable.interval(1000).take(5)
               .multicast(new Subject())
var ObserverA = {
  next: function(value){ console.log('A next '+ value);    },
  error: function(error){ console.error('A error '+ error);   },
  complete: function(){ console.log('A Complete');}
}
source.connect();
source.subscribe(ObserverA);
var ObserverB = {
  next: function(value){ console.log('B next '+ value);    },
  error: function(error){ console.error('B error '+ error);   },
  complete: function(){ console.log('B Complete');}
}
setTimeout(function(){
      source.subscribe(ObserverB);
},2000);
  • 使用 multicast 就必須搭配 connect() 來執行 observeable。

RxJS 又很好心的提供 operator 來簡化 .multicast(Rx.Subject.create()) 的寫法

mutlicast publish operator
multicast(new Subject()) publsih()
multicast(new BehaviorSubject()) publishBehavior(n)
multicast(new ReplaySubject()) publishReplay(n)
multicast(new AsyncSubject()) publishLast()

回到 multicast 需搭配 connect() 的問題,這樣的方式,可以說是全手動,那是否有全自動的寫法,意思是當有人訂閱 subject 時,subject 自動啟動呢? 答案是有的,透過 refCount() 的幫忙,就可以達到上述的效果

refCount 的起動/關閉條件是,訂閱數是否大於 0 ,如果訂閱者大於 0 時,則啟動 observable,反之則停止 observable。

var source = Observable.interval(1000)
               .do(x => console.log('souce '+ x))
               .publish()
               .refCount()
var ObserverA = {
  next: function(value){ console.log('A next '+ value);    },
  error: function(error){ console.error('A error '+ error);   },
  complete: function(){ console.log('A Complete');}
}
var subA = source.subscribe(ObserverA);
var ObserverB = {
  next: function(value){ console.log('B next '+ value);    },
  error: function(error){ console.error('B error '+ error);   },
  complete: function(){ console.log('B Complete');}
}
var subB
setTimeout(function(){
      subB = source.subscribe(ObserverB);
},2000);
setTimeout(function(){
  subA.unsubscribe();
  console.log('unsubscribe A');
},5000)
setTimeout(function(){
  subB.unsubscribe();
  console.log('unsubscribe B');
},7000)

執行結果

在更近一步的簡化,就是 share()share是 publish() + refCount() 的縮寫。

var source = Observable.interval(1000)
               .do(x => console.log('souce '+ x))
               .share()
var ObserverA = {
  next: function(value){ console.log('A next '+ value);    },
  error: function(error){ console.error('A error '+ error);   },
  complete: function(){ console.log('A Complete');}
}
var subA = source.subscribe(ObserverA);
var ObserverB = {
  next: function(value){ console.log('B next '+ value);    },
  error: function(error){ console.error('B error '+ error);   },
  complete: function(){ console.log('B Complete');}
}
var subB
setTimeout(function(){
      subB = source.subscribe(ObserverB);
},2000);
setTimeout(function(){
  subA.unsubscribe();
  console.log('unsubscribe A');
},5000)
setTimeout(function(){
  subB.unsubscribe();
  console.log('unsubscribe B');
},7000)

執行結果是一樣的

BehaviorSubject

BehaviorSubject會記錄最後一次的值,當後來註冊進來的subscriber可以知道當下的值是什麼

所以BehaviorSubject在建立時,需要指定一個初始值,程式碼如下

var source = Observable.interval(1000).take(5)
               .do(function(value){ console.log('source ' + value);})
var subject = new BehaviorSubject(0); // 0 is the initial value
var ObserverA = {
  next: function(value){ console.log('A next '+ value);    },
  error: function(error){ console.error('A error '+ error);   },
  complete: function(){ console.log('A Complete');}
}
var ObserverB = {
  next: function(value){ console.log('B next '+ value);    },
  error: function(error){ console.error('B error '+ error);   },
  complete: function(){ console.log('B Complete');}
}
source.subscribe(subject);
subject.subscribe(ObserverA);
 console.log('ObserverA subscribed');
setTimeout(function(){
  subject.subscribe(ObserverB);
  console.log('ObserverB subscribed');
},2000)

執行結果

ReplaySubject

ReplaySubject有點類似於BehaviorSubject,可以取得subscribe之前的值,只是可以取不只一個。類似回播的功能

ReplaySubject在建立時有幾個參數可以設定

var subject = new ReplaySubject(bufferSize, windowTime);
  • bufferSize: ReplaySubject可以儲存 x 數量的值
  • windowTime: ReplaySubject取最後 x milliseconds 期間的值

程式碼 (with no windowTime參數)

var source = Observable.interval(500).take(7)
               .do(function(value){ console.log('source ' + value);})
var subject = new ReplaySubject(3); // 0 is the initial value
var ObserverA = {
  next: function(value){ console.log('A next '+ value);    },
  error: function(error){ console.error('A error '+ error);   },
  complete: function(){ console.log('A Complete');}
}
var ObserverB = {
  next: function(value){ console.log('B next '+ value);    },
  error: function(error){ console.error('B error '+ error);   },
  complete: function(){ console.log('B Complete');}
}
source.subscribe(subject);
subject.subscribe(ObserverA);
 console.log('ObserverA subscribed');
setTimeout(function(){
  subject.subscribe(ObserverB);
  console.log('ObserverB subscribed');
},2000)

執行結果

程式碼 (with windowTime參數)

var source = Observable.interval(500).take(7)
               .do(function(value){ console.log('source ' + value);})
// source: 0--1--2--3--4--5--6-----
//      A: 0--1--2--3--4--5--6-----
//              |---|: windowTime 
//      B:         23--4--5--6-----
var subject = new ReplaySubject(3, 700); // 0 is the initial value
var ObserverA = {
  next: function(value){ console.log('A next '+ value);    },
  error: function(error){ console.error('A error '+ error);   },
  complete: function(){ console.log('A Complete');}
}
var ObserverB = {
  next: function(value){ console.log('B next '+ value);    },
  error: function(error){ console.error('B error '+ error);   },
  complete: function(){ console.log('B Complete');}
}
source.subscribe(subject);
subject.subscribe(ObserverA);
 console.log('ObserverA subscribed');
setTimeout(function(){
  subject.subscribe(ObserverB);
  console.log('ObserverB subscribed');
},2000)

執行結果

AsyncSubject

AsyncSubject只會記錄 Observable 完成後的值。

var source = Observable.interval(500).take(7)
               .do(function(value){ console.log('source ' + value);})
var subject = new AsyncSubject(); // 0 is the initial value
var ObserverA = {
  next: function(value){ console.log('A next '+ value);    },
  error: function(error){ console.error('A error '+ error);   },
  complete: function(){ console.log('A Complete');}
}
var ObserverB = {
  next: function(value){ console.log('B next '+ value);    },
  error: function(error){ console.error('B error '+ error);   },
  complete: function(){ console.log('B Complete');}
}
source.subscribe(subject);
subject.subscribe(ObserverA);
 console.log('ObserverA subscribed');
setTimeout(function(){
  subject.subscribe(ObserverB);
  console.log('ObserverB subscribed');
},2000)

執行結果

Multicasting Operators

multicast

要達到 multicast 的效果,所以會回傳一個 subject (任何類型的 subject)

使用介面

multicast(subjectOrSubjectFactory: Function | Subject, selector: Function): Observable

使用範例

let multicasted = source.multicast(()=> new Subject());
multicasted.connect(); // 開始運作

publish

publish 是 multicast 輸出 subject 的縮寫

使用介面

 publish(selector: Function): *

使用範例

let multicasted = source.publish();
multicasted.connect(); // 開始運作

publishBehavior

publish 是 multicast 輸出 BehaviorSubject 的縮寫

publishLast

publish 是 multicast 輸出 AsyncSubject 的縮寫

publishReplay

publish 是 multicast 輸出 ReplaySubject 的縮寫

share

share 是 publish 加上 refCount 的縮寫

使用介面

share(): Observable<T>

使用範例

let source = Observable.ajax(url).share();

shareReplay

share 是 publishReplay 加上 refCount 的縮寫

使用介面

shareReplay<T>(this: Observable<T>, bufferSize?: number, windowTime?: number, scheduler?: IScheduler): Observable<T>;

使用範例

let source = Observable.ajax(url).shareReplay(3,2);

results matching ""

    No results matching ""