本文出自:【InTheWorld的博客】 (欢迎留言、交流)
Spring Cloud“全家桶”风头正劲,Hystrix作为服务容错保护组件也是挺有名气。最近我有在看一些Spring Cloud的内容,其中就包括Hystrix。这里我打算从宏观理论和微观实现两个部分来分析Hystrix。
1. Hystrix的宏观知识
首先是宏观理论了,先抛出两个问题。Hystrix的设计目的是什么?应该怎么完成这些目标?针对第一个问题,我们首先需要明确的是微服务架构应该是有一定的容错性的,而服务不可用的问题是客观存在的。而且这些服务错误常常会恶化和扩散,结果造成更严重的负面影响。所以在无法绝对保证服务可用性的前提下,我们需要一种机制来保护服务错误。
Hystrix的作用主要体现在一下几个方面,
- 保护和控制底层服务的高延迟和失效对上层服务的影响。
- 避免复杂分布式中服务失效的雪崩效应。在大型的分布式系统中,存在各种复杂的依赖关系。如果某个服务失效,很可能会对其他服务造成影响,形成连锁反应。
- 快速失效和迅速恢复。以Spring为例,一般在实现controller的时候,都会以同步的逻辑调用依赖的服务。如果服务时效,而且没有客户端失效机制,就好导致请求长时间的阻塞。如果不能快速的发现失效,而就很难通过高可用机制或者负载均衡实现迅速的恢复。
- 优雅地实现服务降级。个人理解,这一点是从用户体验来考虑的,一个预定义默认返回会比请求卡死或者500好很多。
- 实现了服务监控、报警和运维控制。Hystrix Dashboard和Turbine可以配合Hystrix完成这些功能。
Hystrix是怎么完成这些需求呢?个人觉得Hystrix的实现有以下几个关键点;
- 为服务调用设置超时时间;这一点非常容易理解,超时返回可以实现服务快速失效。
- 使用线程池来完成服务请求;我在之前的博客中也讲到过,线程池可以实现资源控制的作用。不同的依赖服务使用不同的线程池来完成请求,就可以实现服务依赖之间的隔离。这样就是Hystrix中比较常提起的“仓壁模式”。这种方式可以一定程度的抑制失效扩散。比如,服务A依赖于服务B和服务C,服务B宕机了,导致服务A中依赖于B的请求大量积压。如果没有隔离,依赖于服务C的请求的CPU时间将被严重压缩,甚至导致服务A中的依赖于B和C的请求都失败。
- 基于对请求的统计,Hystrix实现了服务监控。并且在服务失效达到一定的比例之后,直接熔断服务。因为,即使有超时,仍然有可能形成请求积压。在这种极端情况下,熔断机制就能发挥作用了,它会让请求直接失败。
2. Hystrix的实现
Hystrix使用了“命令模式”,每一个依赖请求都是HystrixCommand或者HystrixObservableCommand。Hystrix的命令执行流程图如下:
HystrixCommand和HystrixObservableCommand的区别主要体现在异步执行的返回值句柄不同。它们分别对应传统型的Future、和RxJava中的Observable。虽然看似用法略有不同,但是内部实现都是通过RxJava的形式完成的。HystrixCommand的简单使用方法如下:
@HystrixCommand(fallbackMethod = "findByIdFallback") @GetMapping("/user/{id}") public User findById(@PathVariable Long id) { return this.restTemplate.getForObject("http://microservice-provider-user/" + id, User.class); } User findByIdFallback(Long id, Throwable throwable) { LOGGER.error("进入回退方法,异常:", throwable); User user = new User(); user.setId(-1L); user.setName("默认用户"); return user; }
HystrixCommand这个注解相当于把findById这个函数包装成了一个HystrixCommand。对这个api端点的请求都会引发一个对应的HystrixCommand执行。HystrixCommand注解除了fallbackMethod,还有很多其他的参数,这些参数会用来构造HystrixCommand命令。
HystrixCommand命令是如何执行的呢?接下来,我们就通过对Hystrix源代码的分析来理解它!无论是HystrixCommand还是HystrixObservableCommand,都是AbstractCommand的子类。所以它包含了很多Hystrix命令执行的细节。以HystrixCommand类为例,它的执行其实是通过调用execute()方法完成的。这个方法的实现如下:
public R execute() { try { return this.queue().get(); } catch (Exception var2) { throw this.decomposeException(var2); } } public Futurequeue() { final Future delegate = this.toObservable().toBlocking().toFuture(); Future f = new Future () { public R get() throws InterruptedException, ExecutionException { return delegate.get(); } public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return delegate.get(timeout, unit); } }; if(f.isDone()) { /* */ } else { return f; } }
可以看到execute()方法实际上是通过调用queue()方法实现的,而queue()方法的目的就是构造出一个Future f。这个f其实仅仅是封装了delegate这个Futrue。所以一番分析之后,我们知道了问题的关键所在,其实就是这行this.toObservable().toBlocking().toFuture()。那就从toObservable()开始分析!
public ObservabletoObservable() { final Action0 terminateCommandCleanup = new Action0() { /* */ }; final Action0 unsubscribeCommandCleanup = new Action0() { /* */ }; final Func0 > applyHystrixSemantics = new Func0 >() { public Observable call() { return AbstractCommand.this.applyHystrixSemantics(AbstractCommand.this); } }; final Func1 wrapWithAllOnNextHooks = new Func1 () { /* */ }; final Action0 fireOnCompletedHook = new Action0() { /* */ }; return Observable.defer(new Func0 >() { public Observable call() { if(!AbstractCommand.this.commandState.compareAndSet(AbstractCommand.CommandState.NOT_STARTED, AbstractCommand.CommandState.OBSERVABLE_CHAIN_CREATED)) { } else { Observable hystrixObservable = Observable.defer(applyHystrixSemantics).map(wrapWithAllOnNextHooks); Observable afterCache; if(requestCacheEnabled && cacheKey != null) { /* */ afterCache = toCache.toObservable(); } else { afterCache = hystrixObservable; } return afterCache.doOnTerminate(terminateCommandCleanup).doOnUnsubscribe(unsubscribeCommandCleanup).doOnCompleted(fireOnCompletedHook); } } }); }
虽然这个方法的代码不少,但还是可以很快的定位到关键点——applyHystrixSemantics(); 这个函数完成了Hystrix的基本语义。首先是熔断器的语义,如果熔断器允许请求就去执行请求,反之则直接执行fallback函数。在正常的请求通路下,最终会调用executeCommandWithSpecifiedIsolation()方法来完成请求的执行。从这个函数名就可以看出,这个方法完成了Hystrix隔离的语义。由于这个方法代码比较长,就不贴完整代码了。但是方法中的一个subscribeOn()运算符非常显眼,如下所示:
subscribeOn(this.threadPool.getScheduler(new Func0() { public Boolean call() { return Boolean.valueOf(((Boolean)AbstractCommand.this.properties.executionIsolationThreadInterruptOnTimeout().get()).booleanValue() && _cmd.isCommandTimedOut.get() == AbstractCommand.TimedOutStatus.TIMED_OUT); } }));
这个threadPool就是构造HystrixCommand的时候设定的线程池,现在大家应该可以理解Hystrix如何实现隔离的。回到那行代码this.toObservable().toBlocking().toFuture()。toBlocking()仅仅是为了构造一个BlockingObservable,然后通过toFuture()构造出一个Future。其实构造这个future的过程也很简单,只是订阅上游的Observable,然后通知和更新Future,具体过程如下:
public final class BlockingOperatorToFuture { public staticFuture toFuture(Observable extends T> that) { final CountDownLatch finished = new CountDownLatch(1); final AtomicReference value = new AtomicReference(); final AtomicReference error = new AtomicReference(); final Subscription s = that.single().subscribe(new Subscriber () { public void onCompleted() { finished.countDown(); } public void onError(Throwable e) { error.compareAndSet((Object)null, e); finished.countDown(); } public void onNext(T v) { value.set(v); } }); return new Future () { private volatile boolean cancelled; public boolean isDone() { return finished.getCount() == 0L; } public T get() throws InterruptedException, ExecutionException { finished.await(); return this.getValue(); } public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if(finished.await(timeout, unit)) { return this.getValue(); } else { throw new TimeoutException("Timed out after " + unit.toMillis(timeout) + "ms waiting for underlying Observable."); } } private T getValue() throws ExecutionException { Throwable throwable = (Throwable)error.get(); if(throwable != null) { throw new ExecutionException("Observable onError", throwable); } else if(this.cancelled) { throw new CancellationException("Subscription unsubscribed"); } else { return value.get(); } } }; } }
这个返回的Future就是前面的delegate。讲到这里,Hystrix command的大致回路,算是跑马观花的看了一遍。具体的一些细节实现,这里就先略过了!
对于Hystrix的具体使用,可以参考这个https://github.com/eacdy/spring-cloud-study。我也有看这个demo项目学习Spring Cloud。
发表评论