將其它種類的對(duì)象和數(shù)據(jù)類型轉(zhuǎn)換為Observable
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/from.c.png" alt="from" />
當(dāng)你使用Observable時(shí),如果你要處理的數(shù)據(jù)都可以轉(zhuǎn)換成展現(xiàn)為Observables,而不是需要混合使用Observables和其它類型的數(shù)據(jù),會(huì)非常方便。這讓你在數(shù)據(jù)流的整個(gè)生命周期中,可以使用一組統(tǒng)一的操作符來(lái)管理它們。
例如,Iterable可以看成是同步的Observable;Future,可以看成是總是只發(fā)射單個(gè)數(shù)據(jù)的Observable。通過(guò)顯式地將那些數(shù)據(jù)轉(zhuǎn)換為Observables,你可以像使用Observable一樣與它們交互。
因此,大部分ReactiveX實(shí)現(xiàn)都提供了將語(yǔ)言特定的對(duì)象和數(shù)據(jù)結(jié)構(gòu)轉(zhuǎn)換為Observables的方法。
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/from.png" alt="from" />
在RxJava中,from
操作符可以轉(zhuǎn)換Future、Iterable和數(shù)組。對(duì)于Iterable和數(shù)組,產(chǎn)生的Observable會(huì)發(fā)射Iterable或數(shù)組的每一項(xiàng)數(shù)據(jù)。
示例代碼
Integer[] items = { 0, 1, 2, 3, 4, 5 };
Observable myObservable = Observable.from(items);
myObservable.subscribe(
new Action1<Integer>() {
@Override
public void call(Integer item) {
System.out.println(item);
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable error) {
System.out.println("Error encountered: " + error.getMessage());
}
},
new Action0() {
@Override
public void call() {
System.out.println("Sequence complete");
}
}
);
輸出
0
1
2
3
4
5
Sequence complete
對(duì)于Future,它會(huì)發(fā)射Future.get()方法返回的單個(gè)數(shù)據(jù)。from
方法有一個(gè)可接受兩個(gè)可選參數(shù)的版本,分別指定超時(shí)時(shí)長(zhǎng)和時(shí)間單位。如果過(guò)了指定的時(shí)長(zhǎng)Future還沒(méi)有返回一個(gè)值,這個(gè)Observable會(huì)發(fā)射錯(cuò)誤通知并終止。
from
默認(rèn)不在任何特定的調(diào)度器上執(zhí)行。然而你可以將Scheduler作為可選的第二個(gè)參數(shù)傳遞給Observable,它會(huì)在那個(gè)調(diào)度器上管理這個(gè)Future。
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/fromFunc0.png" alt="from func)" />
此外,在可選包 RxJavaAsyncUtil
中,你還可以用下面這些操作符將actions,callables,functions和runnables轉(zhuǎn)換為發(fā)射這些動(dòng)作的執(zhí)行結(jié)果的Observable:
在這個(gè)頁(yè)面 Start 查看關(guān)于這些操作符的更多信息。
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/St.from.png" alt="from" />
注意:還有一個(gè)可選的StringObservable
類中也有一個(gè)from
方法,它將一個(gè)字符流或者一個(gè)REader轉(zhuǎn)換為一個(gè)發(fā)射字節(jié)數(shù)組或字符串的Observable。
注意:這里與后面start
操作符里的runAsync
說(shuō)明重復(fù)了
在單獨(dú)的RxJavaAsyncUtil
包中(默認(rèn)不包含在RxJava中),還有一個(gè)runAsync
函數(shù)。傳遞一個(gè)Action
和一個(gè)Scheduler
給runAsync
,它會(huì)返回一個(gè)StoppableObservable
,這個(gè)Observable使用Action
產(chǎn)生發(fā)射的數(shù)據(jù)項(xiàng)。
傳遞一個(gè)Action
和一個(gè)Scheduler
給runAsync
,它返回一個(gè)使用這個(gè)Action
產(chǎn)生數(shù)據(jù)的StoppableObservable
。這個(gè)Action
接受一個(gè)Observable
和一個(gè)Subscription
作為參數(shù),它使用Subscription
檢查unsubscribed
條件,一旦發(fā)現(xiàn)條件為真就立即停止發(fā)射數(shù)據(jù)。在任何時(shí)候你都可以使用unsubscribe
方法手動(dòng)停止一個(gè)StoppableObservable
(這會(huì)同時(shí)取消訂閱與這個(gè)StoppableObservable
關(guān)聯(lián)的Subscription
)。
由于runAsync
會(huì)立即調(diào)用Action
并開(kāi)始發(fā)射數(shù)據(jù),在你創(chuàng)建StoppableObservable之后到你的觀察者準(zhǔn)備好接受數(shù)據(jù)之前這段時(shí)間里,可能會(huì)有一部分?jǐn)?shù)據(jù)會(huì)丟失。如果這不符合你的要求,可以使用runAsync
的一個(gè)變體,它也接受一個(gè)Subject
參數(shù),傳遞一個(gè)ReplaySubject
給它,你可以獲取其它丟失的數(shù)據(jù)了。
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/St.decode.png" alt="decode" />
StringObservable
類不是默認(rèn)RxJava的一部分,包含一個(gè)decode
操作符,這個(gè)操作符將一個(gè)多字節(jié)字符流轉(zhuǎn)換為一個(gè)發(fā)射字節(jié)數(shù)組的Observable,這些字節(jié)數(shù)組按照字符的邊界劃分。