更新时间:2016-05-16
原文链接:Crunching RxAndroid
该系列现已更新到 Part 8,本文包含 Part 0~8
举一个 ‘Hello World’ 的例子,要是有一点了解的话就跳过吧。
首先,添加依赖(例子使用的版本号不是最新)
compile 'io.reactivex:rxandroid:0.24.0'
主要角色为 Observable 和 Subscriber。前者负责抛出数据,后者负责接收数据并处理。两者数量为 1:n
//创建一个抛出 “Hello, World!” String 的 Observable
Observable.OnSubscribe observableAction = new Observable.OnSubscribe<String>() {
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext(“Hello, World!”);
subscriber.onCompleted();
}
};
Observable<String> observable = Observable.create(observableAction);
下一步是创建多个 Subscriber,这个例子中将包含两个 Subscriber 以不同的方式展示收到的 问候
Subscriber<String> textViewSubscriber = new Subscriber<String>() {
public void onCompleted() {}
public void onError(Throwable e) {}
public void onNext(String s) {
txtPart1.setText(s);
}
};
Subscriber<String> toastSubscriber = new Subscriber<String>() {
public void onCompleted() {}
public void onError(Throwable e) {}
public void onNext(String s) {
Toast.makeText(context, s, Toast.LENGTH_SHORT).show();
}
};
两个主角都有了之后,然后使用 subscribe
就是将他们联系起来,Observable 只有在被订阅的情况下才会产生数据。不知道大家是不是和我一样觉得如果是 subscribeBy 这样更容易理解
// 指定 observable 在主线程运行
observable.observeOn(AndroidSchedulers.mainThread());
// 实现订阅关系
observable.subscribe(textViewSubscriber);
observable.subscribe(toastSubscriber);
前面的代码实际上可以写的更简略点。RxJava 提供了 Action 和 Func 接口可以使代码更简略一点。Action 可以用于包装无返回值的,而 Func 是用来包装有返回值的
Action1<String> textViewOnNextAction = new Action1<String>() {
@Override
public void call(String s) {
textPart1.setText(s);
}
}
Func1<String, String> toUpperCaseMap = new Func1<String, String>() {
@Override
public String call(String s) {
return s.toUpperCase();
}
}
前面定义了定义了一个 Func 将 Observable 的数据在发送给 Subscriber 前进行转换,但还需要 map
这一关键的一步。另外,Observable 只发送一个字符串,所以可以使用 just
进一步简化。
Observable<String> singleObservable = Observable.just("Hello,World");
将 Observable 和 Subscriber 连起来后应该像这样
singleObservable.observeOn(AndroidSchedulers.mainThread())
.map(toUpperCaseMap)
.subscribe(textViewOnNextAction); //subscribe(onNext, onError, onCompleted);
这里我们又引入了一个新的部分:Operator
, map 是很常用的一个。前面讲的例子只是一个字符串,在实际使用中显然不够用,比如我们要处理包含多个字符串的数组,其实只用 from
就可以搞定了
Observable<String> oneByOneObservable = Observable.from(manyWords);
处理像数组这类,如果不直接用 from
在源头上解决,还可以像 map 那样进行转换,此时需要的是 flatMap
这个 Operator,它同样能将原先的 Observable 发出的数据转换成另一种,而且更灵活。
Func1<List<String>, Observable<String>> getUrls = new
Func1<List<String>, Observable<String>>() {
@Override
public Observable<String> call(List<String> strings) {
return Observable.from(strings);
}
}
现在又有新的需求,需要将所有的字符串合并成一个字符串,并以一个空格作为间隔:此时我们需要用到另一种 Operator : reduce
,它将在 Observable 结束发送后,将发送的数据合并
Func2<String, String, String> mergeRoutine = new Func2<String, String, String>(){
@Override
public String call(String s, String s1) {
return String.format("%s %s",s, s1);
}
}
最后将所有的连起来
Observable.just(manyWordList)
.observeOn(AndroidSchedulers.mainThread())
.flatMap(getUrls)
.reduce(mergeRoutine)
.subscribe(toastOnNextAction);
什么是 Lambdas ?维基上是这么解释的
在计算机编程中,匿名函数(英语:anonymous function)是指一类无需定义标识符(函数名)的函数或子程序,普遍存在于多种编程语言中。
匿名函数确实能使代码看上去简洁不少,但在学习中也会让你忽略 RxJava 的一些技术细节。因为 Java 直到 Java 8 才支持,所以需要借助 Retrolambda,同时 Retrolambda 作为非官方兼容方案,其向后兼容性和稳定性是无法保障的
parameter -> functionThatWillReturnSomethingUsingThe(parameter)
首先请将 Java 版本升级到 1.8.0+,然后添加依赖,引入 Retrolambda
buildscript {
repositories {
//...
mavenCentral()
}
dependencies {
//...
classpath 'me.tatarka:gradle-retrolambda:3.1.0'
}
}
// Required because retrolambda is on maven central
repositories {
mavenCentral()
}
apply plugin: 'com.android.application' //or apply plugin: 'java'
apply plugin: 'me.tatarka.retrolambda'
我们来看下实际效果吧
Func1<List<String>, Observable<String>> getUrls = new
Func1<List<String>, Observable<String>>() {
@Override
public Observable<String> call(List<String> strings) {
return Observable.from(strings);
}
}
strings -> Observable.from(strings);
// 什么? 你还嫌长? 好好好,满足你
Observable::from
再补充一个长点的
Observable.just("Hello, World!")
.observeOn(AndroidSchedulers.mainThread())
.map(String::toUpperCase)
.subscribe(txtPart1::setText);
Observable.from(manyWords)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(message -> Toast.makeText(context, message, Toast.LENGTH_SHORT).show());
Observable.just(manyWordList)
.observeOn(AndroidSchedulers.mainThread())
.flatMap(Observable::from)
.reduce((s, s1) -> String.format("%s %s", s, s1))
.subscribe(message -> Snackbar.make(rootView, message, Snackbar.LENGTH_LONG).show());
这个部分包含作者的软广部分,我才不会贴链接。 所涉及的的实现功能有:
用 Rx 的方式实现的话,首先获得所有音乐所能识别的 packages
Observable.from(softwarePackages);
然后创建另一个 Observable 用于包含 software names,用于创建 PackageData 类
softwareNamesObservable = Observable.from(softwareNames);
现在我们有两个 Observables 分别用于产生 packages 列表和名字,使用一个 Operator:zipWith
将它们链接起来,将后者作为参数。
Observable.from(softwarePackages)
.zipWith(softwareNamesObservable, PackageData::new)
这样就从一个 Observable 那得到了 PackageData,但我们现在还需要筛选出符合的结果。此时需要另一个 Operator:filter
,设定判定条件,筛选出符合条件的
private static boolean isAppInstalled(final Context context, String packageName) {
boolean isAppInstalled = true;
try {
context.getPackageManager().getPackageInfo
(packageName, PackageManager.GET_ACTIVITIES);
} catch (Exception e) {
isAppInstalled = false;
}
return isAppInstalled;
}
这样我们就能得到从结果中筛选出本机已安装的
Observable.from(softwarePackages)
.zipWith(softwareNamesObservable, PackageData::new)
.filter(PackageData ->
isAppInstalled(context, PackageData.getPackageName()))
剩下的部分就是选择在那个线程上执行,这里需要将 Observable 的 subscribe 运行在 I/O 线程, Subscriber 的处理结果在 UI 线程上
Observable.from(softwarePackages)
.zipWith(softwareNamesObservable, PackageData::new)
.filter(PackageData ->
isAppInstalled(context, PackageData.getPackageName()))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
现在我们成功筛选本机上已安装的所支持的 providers 在后台线程中,并将结果传递到主线程上。最后我还需要创建一个 Subscriber 在主线程上处理传递过来的数据。
//接上面部分
.subscribe(Subscribers.create(
(installedApps::add),
(thowable -> { /* on error */ }),
(this::convertInstalledAppsListToVector) ));
// TODO
RxJava 很好用,但如果不注意使用也会有风险,比如当 App 生命周期发生变化时,未完成的 subscriptions 可能会造成内存泄露。不过在 RxLifecycle 的帮助下,这个将不再是问题。使用 RxLifecycle 需要继承 RxActivity, RxAppCompatActivity, RxFragment 这三者之一或选择提供一个 Observable<ActivityEvent>
或 Observable<FragmentEvent>
无论你选择何种,最后实现起来是类似的。有了 RxLifecycle,我们的 Observable 的生命周期可以和 Activity 或 Fragment 的生命周期联系在一起。换句话说,如果我们在 onResume 阶段启动 Observable, RxLifecycle 将会保证这个 Observable 在 onPause 阶段取消 subscribe。如果还不太理解的话看下下面的栗子,反之就看下一节吧。
现在有这么一段程序,在 Activity 运行期间会一直打 Log,同时在 Activity 进入到 onPause 阶段时停止,另一方面,我们想办法让 Observable 在 Activity 处于活动时一直运行(废话讲太多了,还是直接看图吧)。可以在这里找到全部代码
Observable.interval(1, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this::logOnNext,this::logOnError,this::logOnCompleted);
『The sun is rising!』表示进入 onPause 阶段,最后程序跑起来的样子:
结果很明显, Observable 并没有及时地停止,这就是前面提到的我们在使用时需要注意的。
下面自然是需要给出使用 RxLifecycle 后的结果,RxLifecycle 会在适当时机调用 onCompleted 方法来停止 Observable。
// 首先要继承
public class Part5Activity extends RxAppCompatActivity {}
为了绑定到生命周期,可以自动的或使用静态方法 bindUntilActivityEvent
,这里我们就使用前者。
Observable.interval(1, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.compose(bindToLifecycle())
.subscribe(this::logOnNext, this::logOnError,this::logOnCompleted);
最后补张图
RxBinding是 Jake Wharton 大神的作品。这部分讲的是绑定,道理我有点讲不通,差不多的内容为 Android 官方的 Data Binding实现绑定后就不用再写什么 findViewById 了。
这个库背后的原理是提供了一个 Observable 当我们所感兴趣的事件发生时被启动。例如,我们对 FAB 的点击事件比较感兴趣:
RxView.clicks(fab).subscribe(aVoid -> onFabClicked());
类似的还有 RxToolbar,itemClicks()
,RxView.navigationClicks()
。随着库的更新,所支持的也会越来越多。
最后我们来个更常用的例子,也就是监听 EditText 的输入,并对每一次输入前,时,后做出相应的处理。
usualApproachEditText.addTextChangedListener(new TextWatcher() {
@Override
public void beforeTextChanged(CharSequence s, int start, int count, int after) {}
@Override
public void onTextChanged(CharSequence s, int start, int before, int count) {
onNewTextChanged(s);
}
@Override
public void afterTextChanged(Editable s) {}
});
RxTextView.textChanges(reactiveApproachEditText).subscribe(this::onNewTextChanged);
这部分又回到 RxJava 了。在 RxJava 里总共有两类:一种称之为 Sequence Operators,影响原始 Observable 发送的数据,通过使用 lift()
来实现。另一种则是像 RxLifecycle 这类改变 Observable 自身,我们称之为 Transformational Operators,通过使用 compose()
来实现。自定义 Operators 有比较大的风险,所以这部分只是为了理解 Operators。
看下 Operator 接口的定义,我们会发现实际上是输入一个 subscriber
自定义 Sequence Operators 的第一步要继承 Observable.Operator 类,在这个类的 call()
方法返回一个新的 Subscriber。
public class SequenceOperator implements Observable.Operator<Integer, Integer> {
@Override
public Subscriber<? super Integer> call(Subscriber<? super Integer> subscriber) {
return new Subscriber<Integer>(subscriber) {
@Override
public void onCompleted() {
subscriber.onCompleted();
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
@Override
public void onNext(Integer integer) {
int roundedSqrt = (int) Math.round(Math.sqrt(integer));
subscriber.onNext(roundedSqrt);
}
};
}
}
自定义 Transformational Operators,将影响 Observable 整个部分。需实现 Transformer 接口,运行到我们想要的线程上,最后返回结果。
public class ObservableTransformer<T> implements Observable.Transformer<T, T> {
@Override
public Observable<T> call(Observable<T> observable) {
return observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread());
}
}
最后,检验下我们实现的。
Observable.just(txtNumber.getText())
.delay(5, TimeUnit.SECONDS)
.map(editable -> Integer.parseInt(editable.toString()))
.lift(new SequenceOperator())
.map(sqrt -> String.format(“SQRT is %d”, sqrt))
.compose(new ObservableTransformer<>())
.subscribe(s -> txtResponse.setText(s),
throwable -> txtResponse.setText(throwable.getMessage()));
总算是有点内容了。到目前为止,我们已经知道一个 Subscriber 可以监听一个 Observable,并对其发出的数据接收和处理。但是,但是如果我们想让他们绑在一起呢?像一条管道,一端为 Observable,另一端为 Subscriber,这个就是我们将要讲的 Subject。
使用 Subject 可以很好的解决之前提到的 subscription 当 Activity 的生命周期发生变化时无法即使保存,也就是,我们可以使用一个 Subject 在 onCreate()
阶段 subscribe 我们的 Observable,然后安全的存储这个 Subject,以至于能在设备发生旋转的过程中存活下来,然后能在 onResume()
阶段重新接上正确的 Subscriber,这样就能得到结果。
@Override
public void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setRetainInstance(true);
Observable<Integer> source = Observable.interval(1, TimeUnit.SECONDS)
.map(Long::intValue)
.take(20);
subscription = source.subscribe(subject);
}
@Override
public void onResume() {
super.onResume();
listener.onObservableRetrieved(subject.asObservable());
}
下面介绍几种 subject,在不同的应用场景下使用相应的 subject。
AsyncSubject 它将发送原始 Observable 最后一次的结果(再强调一边:只是最后一次的值),并且是在原始 Observable 结束后。如果 Observable 没有发送任何数据, AsyncSubject 也能正常结束。也就是,这个 Subject 不在意接收到多少数据,在意的只是在结束前发送的最后一次数据。当我们之是在意最后一次的结果的话可以使用。
ReplaySubject 是一种最简单的 Subject,它将 Observable 的数据所有发送给所有 Subscriber 无论先后。不会错过任何信息!
使用 BehaviorSubject,每个 Subscriber 能接收到完成 subscribe 前最近的一次数据。
PublishSubject 和 BehaviorSubject 挺相似的,不过只能接收完成 subscribe 后的数据。
如果你对 RxJava 想进一步了解,推荐看下