异步编程神器RxJava

定义

RxJava extends the observer pattern to support sequences of data/events and adds operations that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronisation, thread-safety and concurrent data structures.

RxJava简单来说是建立在观察者模式上面的编程范式,它一定程度抽象和屏蔽了低层的线程创建管理和多线程并发安全问题,从而提供更好的编程体验,降低了开发者使用门槛。

一些概念

  • 一切皆是Stream,那么什么是流呢? Stream可以简单理解为按时间排序的Events序列, 其中Observable 就是一个Stream, 它是不可变的,任何操作都会返回一个全新的Stream, 这种Immutability性,很好的规避了多线程并发问题。
  • 另外一个重要的是Subscriber,订阅者或者观察者, 他们是事件的消费者,重要的三个方法有onNext, onCompleted, onError
  • 在使用RxJava过程中,你会发现它体现了很多函数式编程的风格,e.g:
    函数作为一等公民, 其定义如下:

    The ability to store a function as a variable and pass that function as a parameter.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import java.util.function.Function;

public class FirstClassCitizenParameterIllustration {

public static void main(String[] args) {

Function<String, String> transformToLower = (s) -> {
return s.toLowerCase();
};
System.out.println(concatAndTransform("Hello ", "World", transformToLower));
}

public static String concatAndTransform(String a, String b, Function<String, String> stringTransform) {

if (stringTransform != null) {
a = stringTransform.apply(a);
b = stringTransform.apply(b);
}

return a + b;
}
}

高阶函数, 其定义如下:

A high order function is a function that can return a function.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import java.util.function.Function;
import java.util.function.Supplier;

public class HighOrderFunctionIllustration {

public static void main(String[] args) {

Supplier<String> xformOperation = createCombineAndTransform("Hello ", "World", (a) -> {
return a.toUpperCase();
});
System.out.println(xformOperation.get());
}

public static Supplier<String> createCombineAndTransform(
final String a, final String b,
final Function<String, String> transformer
) {
return () -> {

String aa = a;
String bb = b;

if (transformer != null) {
aa = transformer.apply(a);
bb = transformer.apply(b);
}

return aa + bb;
};
}
}

  • Scheduler指定工作线程池

常见的操作

  • Creating Observables:Create, From, Just
  • Transforming Observables: Map, Flatmap, groupBy
  • Filtering Observables: Filter, First, Last, Skip
  • Combining Observables: Zip, Join, Merge
  • Error Handling Operators: Catch, Retry

rxjava.png

响应时间的优化

在进行多个API调用编排的时候,如果采用同步阻塞的编排方式,其响应时间约等于所有API执行时间之和:
同步编排.png

如果是采用异步并发的方式,则响应时间会明显缩短:
异步编排1.png

在我们真实的项目中,大量采用Rxjava的响应式编程,用于服务异步编排,减少服务响应时间,提供用户体验。

举个例子

需求说明:

一个订单服务,它的数据需要聚合其他三个服务的数据,服务调用没有先后之分,没有相互依赖,调用过程中,如果任何一个服务出现问题,需要及时在响应体中加入对应的错误信息并返回。

在采用RxJava进行编程的时候,首选应该考虑Stream, 每个服务的返回可以是一个Stream(Observable),但是每个服务的返回都不一样, 不能将所有的三个服务直接通过Stream连起来。

所以第一步是提取并封装公共的返回对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public final class AsyncResult<V> {
private final V value;
private final Exception exception;

private AsyncResult(V value, Exception exception) {
this.value = value;
this.exception = exception;
}

public static <V> AsyncResult<V> success(V value) {
return new AsyncResult<>(value, null);
}

public static <V> AsyncResult<V> failed(Exception exception) {
return new AsyncResult<>(null, exception);
}

public V getValue() throws Exception {
if (exception != null) {
throw exception;
}
return value;
}

public Exception getException() {
return exception;
}

public boolean hasException() {
return exception != null;
}
}

第二步,为每个服务调用创建对应的Observable Stream

1
2
3
4
5
6
7
8
9
10
11
private Observable<AsyncResult<UserModel>> getUserAsyncStream(String userId) {
return Observable.create((ObservableOnSubscribe<AsyncResult<UserModel>>) emitter -> {
try {
UserModel user = userDao.getUser(userId);
emitter.onNext(AsyncResult.success(user));
} catch (Exception exception) {
emitter.onNext(AsyncResult.failed(exception));
}
emitter.onComplete();
}).subscribeOn(Schedulers.io());
}

第三步,通过Stream将所有的服务进行聚合:

1
2
3
4
5
6
7
Observable<OrderContainer> orderContainer = Observable.just(new OrderContainer())
.zipWith(userStream, new UserAssembler())
.zipWith(logisticsStream, new LogisticsAssembler())
.zipWith(productStream, new ProductAssembler())
.subscribeOn(Schedulers.io());

return orderContainer.blockingGet();

其中,每个Assember作为具体的聚合逻辑处理器, 分别针对正常情况和异常情况进行处理。

1
2
3
4
5
6
7
8
9
10
11
public class UserAssembler implements BiFunction<OrderContainer, AsyncResult<UserModel>, OrderContainer> {
@Override
public OrderContainer apply(OrderContainer orderContainer, AsyncResult<UserModel> userModel) throws Exception {
if (userModel.hasException()) {
orderContainer.addErrors(ErrorBuilder.buildServiceError(userModel.getException().getMessage()));
} else {
orderContainer.setUser(userModel.getValue());
}
return orderContainer;
}
}

整体流程如下:
image.png

其他

  • 异常也可以作为返回的一部分,而不是直接throw出去,可以在服务层(更高层次)而不是资源层(更低层次)进行处理,e.g: AsyncResult封装了异常。
  • 尽量保持对象不可变特性,避免线程并发安全。
  • 在下游服务出现不稳定、异常、挂掉的时候,如果你发现调用时间很长,很可能是底层的Httpclient timeout时间并没有被覆写,默认为20~30s,可以配合hystrix做熔断和避免级联错误的产生,在hystrix中,也需要设置正确的timeout时间和线程池大小。