单个网络请求数据并更新UI
这个比较简单,整个流程大致是:
通过
Obsrvable.create方法,调用OkHttp网络请求通过
map方法结合gson,将response转换为bean类通过
onNext,解析bean中数据,并进行数据库存储调度线程
通过
subscribe,根据请求成功或异常来更新UI
Observable.create(new ObservableOnSubscribe<Response>() { @Override
public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception {
Request.Builder builder = new Request.Builder()
.url("url")
.get();
Request request = builder.build();
Call call = new OkHttpClient().newCall(request);
Response response = call.execute();
e.onNext(response);
}
}).map(new Function<Response, Bean>() { @Override
public Bean apply(@NonNull Response response) throws Exception { //Gson
}
}).doOnNext(new Consumer<Bean>() { @Override
public void accept(@NonNull Bean bean) throws Exception { //saveData
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Bean>() { @Override
public void accept(@NonNull Bean bean) throws Exception { //refresh UI
}
}, new Consumer<Throwable>() { @Override
public void accept(@NonNull Throwable throwable) throws Exception { //get ERROR
}
});多个网络请求依次依赖
这里主要是依赖于flatMap关键字,FlatMap可以将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable。
利用这个特性,我们可以将Observable转成另一个Observable
Observable.create(new ObservableOnSubscribe<Response>() { @Override
public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception {
Request.Builder builder = new Request.Builder()
.url("url")
.get();
Request request = builder.build();
Call call = new OkHttpClient().newCall(request);
Response response = call.execute();
e.onNext(response);
}
}).map(new Function<Response, FirstBean>() { @Override
public FirstBean apply(@NonNull Response response) throws Exception { //Gson
}
}).flatMap(new Function<FirstBean, ObservableSource<Response>>() { @Override
public ObservableSource<Response> apply(@NonNull FirstBean bean) throws Exception { final String s = bean.getData(); return Observable.create(new ObservableOnSubscribe<Response>() { @Override
public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception {
Request.Builder builder = new Request.Builder()
.url("url/" + s)
.get();
Request request = builder.build();
Call call = new OkHttpClient().newCall(request);
Response response = call.execute();
e.onNext(response);
}
});
}
}).map(new Function<Response, SecondBean>() { @Override
public SecondBean apply(@NonNull Response response) throws Exception { //Gson
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<SecondBean>() { @Override
public void accept(@NonNull SecondBean secondBean) throws Exception { //refresh UI
}
});先读取缓存数据并展示UI再获取网络数据刷新UI
这里需要依赖另一个操作符:Concatconcat可以做到不交错的发射两个或多个Observable的发射物,并且只有前一个Observable终止(onComleted)才会订阅下一个Obervable
利用这个特性,我们就可以依次的读取缓存数据展示UI,然后再获取网络数据刷新UI
首先创建一个从cache获取数据的observable
再创建一个从网络获取数据的Observable(可以通过map等方法转换数据类型)
通过concat方法将多个observable结合起来
通过subscribe订阅每一个observable
Observable<List<String>> cache = Observable.create(new ObservableOnSubscribe<List<String>>() { @Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
CacheManager manager = CacheManager.getInstance();
List<String> data = manager.query();
e.onNext(data); //一定要有onComplete,不然不会执行第二个Observale
e.onComplete();
}
});
Observable<List<String>> network = Observable.create(new ObservableOnSubscribe<Response>() { @Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
Request.Builder builder = new Request.Builder()
.url("url")
.get();
Request request = builder.build();
Call call = new OkHttpClient().newCall(request);
Response response = call.execute();
e.onNext(response);
e.onComplete();
}
}).map(new Function<Response, List<String>>() { @Override
public List<String> apply(@NonNull Response response) throws Exception { //解析数据
}
});//两个observable的泛型应该保持一致Observable.concat(cache, network)
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<List<String>>() { @Override
public void accept(@NonNull List<String> strings) throws Exception { //refresh ui
}
}, new Consumer<Throwable>() { @Override
public void accept(@NonNull Throwable throwable) throws Exception { //get error
}
});获取网络数据前先读取缓存
其实和上面的那种类似,只需要稍微修改一下逻辑即可:
当缓存的Observable获取到数据时,只执行onNext,获取不到则只执行onComplete
Observable<String> cache = Observable.create(new ObservableOnSubscribe<String>() { @Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
CacheManager manager = CacheManager.getInstance();
String data = manager.queryForPosition(0); if (data != null) {
e.onNext(data);
} else { //调用onComplete之后会执行下一个Observable
//如果缓存为空,那么直接结束,进行网络请求
e.onComplete();
}
}
});
Observable<String> network = Observable.create(new ObservableOnSubscribe<Response>() { @Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
Request.Builder builder = new Request.Builder()
.url("url")
.get();
Request request = builder.build();
Call call = new OkHttpClient().newCall(request);
Response response = call.execute();
e.onNext(response);
e.onComplete();
}
}).map(new Function<Response, String>() { @Override
public String apply(@NonNull Response response) throws Exception { //解析数据
}
});//两个observable的泛型应该保持一致Observable.concat(cache, network)
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<String>() { @Override
public void accept(@NonNull String strings) throws Exception { //refresh ui
}
}, new Consumer<Throwable>() { @Override
public void accept(@NonNull Throwable throwable) throws Exception { //get error
}
});当然,有的时候我们的缓存可能还会分为memory和disk,无差,只需要多写一个Observable然后一样通过concat合并即可。
结合多个接口的数据再更新UI
这个时候就需要靠zip方法啦,zip方法可以将多个Observable的数据结合为一个数据源再发射出去。
Observable<FirstBean> firstRequest = Observable.create(new ObservableOnSubscribe<Response>() { @Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
Request.Builder builder = new Request.Builder()
.url("firstUrl")
.get();
Request request = builder.build();
Call call = new OkHttpClient().newCall(request);
Response response = call.execute();
e.onNext(response);
e.onComplete();
}
}).map(new Function<Response, FirstBean>() { @Override
public FirstBean apply(@NonNull Response response) throws Exception { //解析数据
}
});
Observable<SecondBean> secondRequest = Observable.create(new ObservableOnSubscribe<Response>() { @Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
Request.Builder builder = new Request.Builder()
.url("secondUrl")
.get();
Request request = builder.build();
Call call = new OkHttpClient().newCall(request);
Response response = call.execute();
e.onNext(response);
e.onComplete();
}
}).map(new Function<Response, SecondBean>() { @Override
public SecondBean apply(@NonNull Response response) throws Exception { //解析数据
}
});
Observable.zip(firstRequest, secondRequest, new BiFunction<FirstBean, SecondBean, WholeBean>() { @Override
public WholeBean apply(@NonNull FirstBean firstBean, @NonNull SecondBean secondBean) throws Exception { //结合数据为一体
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<WholeBean>() { @Override
public void accept(@NonNull WholeBean strings) throws Exception { //refresh ui
}
}, new Consumer<Throwable>() { @Override
public void accept(@NonNull Throwable throwable) throws Exception { //get error
}
});当然,如果你的两个api返回的是相同类型的数据,那么可以直接使用merge将数据合并,而不需要实现回调。
减少频繁的网络请求
设想一种场景:点击一次button就进行一次网络请求,或者当输入框数据变化时进行网络请求,那么这样就会在一下子产生大量的网络请求,但实际上又没有必要,这个时候就可以通过debounce方法来处理,debounce操作符会过滤掉发射速率过快的数据项:
为了方便处理点击事件和Observable的关系,我们引入RxBinding处理:
RxView.clicks(mButton)
.debounce(2, TimeUnit.SECONDS)
.subscribe(new Consumer<Object>() { @Override
public void accept(@NonNull Object o) throws Exception { // refresh ui
}
}, new Consumer<Throwable>() { @Override
public void accept(@NonNull Throwable throwable) throws Exception { // get error
}
});







评论