Agera引入的代碼風(fēng)格也許適合從零開始的新建app項目。這篇包括一些提示,來幫助想在遺留代碼中使用Agera的開發(fā)者,如此往下做。
觀察者模式有很多種實現(xiàn)方式,但不是所有的都可以通過簡單的放入遷入到Agera中。下面是一個例子:演示將一個監(jiān)聽(listenable)類添加到Observable
接口的一種方法。
MyListenable
類可以增加(addListener
)和刪除(removeListener
)Listener
,作為額外完整的演示,它繼承了SomeBaseClass
。
該實例使用UpdateDispatcher
來解決Java的單繼承約束,使用一個內(nèi)部類(Bridge
)來做橋接, 保持其完整的原始API,同時也使Agera可見。
public final class MyListenable extends SomeBaseClass implements Observable {
private final UpdateDispatcher updateDispatcher;
public MyListenable() {
// Original constructor code here...
updateDispatcher = Observables.updateDispatcher(new Bridge());
}
// Original class body here... including:
public void addListener(Listener listener) { … }
public void removeListener(Listener listener) { … }
@Override
public void addUpdatable(Updatable updatable) {
updateDispatcher.addUpdatable(updatable);
}
@Override
public void removeUpdatable(Updatable updatable) {
updateDispatcher.removeUpdatable(updatable);
}
private final class Bridge implements ActivationHandler, Listener {
@Override
public void observableActivated(UpdateDispatcher caller) {
addListener(this);
}
@Override
public void observableDeactivated(UpdateDispatcher caller) {
removeListener(this);
}
@Override
public void onEvent() { // Listener implementation
updateDispatcher.update();
}
}
}
Java本質(zhì)是一種同步語言,如:在Java中最低級別的操作都是同步方法。 當(dāng)操作可能會花一些時間才能返回(耗時操作),這種方法通常稱為阻塞方法,而且禁止開發(fā)者在主線程(UI Thread)調(diào)用。
假設(shè)app的UI需要從阻塞的方法獲得數(shù)據(jù)。Agera可以很容易的通過后臺線程完成調(diào)用,然后UI可以在主線程中接收數(shù)據(jù)。首先,這個阻塞操作需要封裝成一個Agera操作,像這樣:
public class NetworkCallingSupplier implements Supplier<Result<ResponseBlob>> {
private final RequestBlob request = …;
@Override
public Result<ResponseBlob> get() {
try {
ResponseBlob blob = networkStack.execute(request); // blocking call
return Result.success(blob);
} catch (Throwable e) {
return Result.failure(e);
}
}
}
Supplier<Result<ResponseBlob>> networkCall = new NetworkCallingSupplier();
Repository<Result<ResponseBlob>> responseRepository =
Repositories.repositoryWithInitialValue(Result.<ResponseBlob>absent())
.observe() // no event source; works on activation
.onUpdatesPerLoop() // but this line is still needed to compile
.goTo(networkingExecutor)
.thenGetFrom(networkCall)
.compile();
上面的代碼段假定了,在Repository.compile()之前這個請求是已知且永遠(yuǎn)不變的。
這個很容易升級成為一個動態(tài)請求,甚至在repository同樣的激活周期期間。
要可以修改請求,簡單的方式是使用MutableRepository
。
此外,為了在第一次請求為完成之前就可以提供數(shù)據(jù),可以在Result
中一個提供初始值:absent()
。
MutableRepository這種用法類似于是一個可變的變量(可為null),故命名為requestVariable
。
// MutableRepository<RequestBlob> requestVariable =
// mutableRepository(firstRequest);
// OR:
MutableRepository<Result<RequestBlob>> requestVariable =
mutableRepository(Result.<RequestBlob>absent());
然后, 不是在supplier中封裝阻塞方法,使用function實現(xiàn)動態(tài)請求:
public class NetworkCallingFunction
implements Function<RequestBlob, Result<ResponseBlob>> {
@Override
public Result<ResponseBlob> apply(RequestBlob request) {
try {
ResponseBlob blob = networkStack.execute(request);
return Result.success(blob);
} catch (Throwable e) {
return Result.failure(e);
}
}
}
Function<RequestBlob, Result<ResponseBlob>> networkCallingFunction =
new NetworkCallingFunction();
升級后的repository可以像這樣compiled:
Result<ResponseBlob> noResponse = Result.absent();
Function<Throwable, Result<ResponseBlob>> withNoResponse =
Functions.staticFunction(noResponse);
Repository<Result<ResponseBlob>> responseRepository =
Repositories.repositoryWithInitialValue(noResponse)
.observe(requestVariable)
.onUpdatesPerLoop()
// .getFrom(requestVariable) if it does not supply Result, OR:
.attemptGetFrom(requestVariable).orEnd(withNoResponse)
.goTo(networkingExecutor)
.thenTransform(networkCallingFunction)
.compile();
這部分代碼段還演示了一點:通過給操作一個特殊的名字,讓repository的編譯表達式更易讀。
現(xiàn)在的很多Library都有異步API和內(nèi)置的線程,但是客戶端不能控制或禁用。
app中有這樣的Library的話,引入Agera將是一個具有挑戰(zhàn)性的工作。 一個直接的辦法就是找到Library中同步選擇的點,采用[[如上段所述|Incrementally-Agerifying-legacy-code#exposing-synchronous-operations-as-repositories]]方法。
另一個方式(反模式):切換后臺線程執(zhí)行異步調(diào)用并等待結(jié)果,然后同步拿結(jié)果。上面方式不可行時,這一節(jié)討論一個合適的解決方法。
異步調(diào)用的一個循環(huán)模式是請求-響應(yīng) 結(jié)構(gòu)。下面的示例假定這樣結(jié)構(gòu):未完成的工作可以被取消,但是不指定回調(diào)的線程。
interface AsyncOperator<P, R> {
Cancellable request(P param, Callback<R> callback);
}
interface Callback<R> {
void onResponse(R response); // Can be called from any thread
}
interface Cancellable {
void cancel();
}
下面repository例子,使用AsyncOperator
提供數(shù)據(jù), 完成響應(yīng)式請求(一個抽象的supplier類)。
這段代碼假定AsyncOperator
已經(jīng)有足夠的緩存,因此重復(fù)的請求不會影響性能。
public class AsyncOperatorRepository<P, R> extends BaseObservable
implements Repository<Result<R>>, Callback<R> {
private final AsyncOperator<P, R> asyncOperator;
private final Supplier<P> paramSupplier;
private Result<R> result;
private Cancellable cancellable;
public AsyncOperatorRepository(AsyncOperator<P, R> asyncOperator,
Supplier<P> paramSupplier) {
this.asyncOperator = asyncOperator;
this.paramSupplier = paramSupplier;
this.result = Result.absent();
}
@Override
protected synchronized void observableActivated() {
cancellable = asyncOperator.request(paramSupplier.get(), this);
}
@Override
protected synchronized void observableDeactivated() {
if (cancellable != null) {
cancellable.cancel();
cancellable = null;
}
}
@Override
public synchronized void onResponse(R response) {
cancellable = null;
result = Result.absentIfNull(response);
dispatchUpdate();
}
@Override
public synchronized Result<R> get() {
return result;
}
}
這個類可以很容易地升級到可以修改請求參數(shù),而這個過程就類似于前面的討論:讓repository提供請求參數(shù),并讓AsyncOperatorRepository
觀察請求參數(shù)變化。
在激活期間,觀察請求參數(shù)的變化,取消任何正在進行的請求,并發(fā)出新的請求,如下所示:
public class AsyncOperatorRepository<P, R> extends BaseObservable
implements Repository<Result<R>>, Callback<R>, Updatable {
private final AsyncOperator<P, R> asyncOperator;
private final Repository<P> paramRepository;
private Result<R> result;
private Cancellable cancellable;
public AsyncOperatorRepository(AsyncOperator<P, R> asyncOperator,
Repository<P> paramRepository) {
this.asyncOperator = asyncOperator;
this.paramRepository = paramRepository;
this.result = Result.absent();
}
@Override
protected void observableActivated() {
paramRepository.addUpdatable(this);
update();
}
@Override
protected synchronized void observableDeactivated() {
paramRepository.removeUpdatable(this);
cancelOngoingRequestLocked();
}
@Override
public synchronized void update() {
cancelOngoingRequestLocked();
// Adapt accordingly if paramRepository supplies a Result.
cancellable = asyncOperator.request(paramRepository.get(), this);
}
private void cancelOngoingRequestLocked() {
if (cancellable != null) {
cancellable.cancel();
cancellable = null;
}
}
@Override
public synchronized void onResponse(R response) {
cancellable = null;
result = Result.absentIfNull(response);
dispatchUpdate();
}
// Similar process for fallible requests (typically with an
// onError(Throwable) callback): wrap the failure in a Result and
// dispatchUpdate().
@Override
public synchronized Result<R> get() {
return result;
}
}