理解Spring Cloud Hystrix

本文出自:【InTheWorld的博客】 (欢迎留言、交流)

hystrix-logo-tagline-640

Spring Cloud“全家桶”风头正劲,Hystrix作为服务容错保护组件也是挺有名气。最近我有在看一些Spring Cloud的内容,其中就包括Hystrix。这里我打算从宏观理论和微观实现两个部分来分析Hystrix。

1. Hystrix的宏观知识

首先是宏观理论了,先抛出两个问题。Hystrix的设计目的是什么?应该怎么完成这些目标?针对第一个问题,我们首先需要明确的是微服务架构应该是有一定的容错性的,而服务不可用的问题是客观存在的。而且这些服务错误常常会恶化和扩散,结果造成更严重的负面影响。所以在无法绝对保证服务可用性的前提下,我们需要一种机制来保护服务错误。

Hystrix的作用主要体现在一下几个方面,

  • 保护和控制底层服务的高延迟和失效对上层服务的影响。
  • 避免复杂分布式中服务失效的雪崩效应。在大型的分布式系统中,存在各种复杂的依赖关系。如果某个服务失效,很可能会对其他服务造成影响,形成连锁反应。
  • 快速失效和迅速恢复。以Spring为例,一般在实现controller的时候,都会以同步的逻辑调用依赖的服务。如果服务时效,而且没有客户端失效机制,就好导致请求长时间的阻塞。如果不能快速的发现失效,而就很难通过高可用机制或者负载均衡实现迅速的恢复。
  • 优雅地实现服务降级。个人理解,这一点是从用户体验来考虑的,一个预定义默认返回会比请求卡死或者500好很多。
  • 实现了服务监控、报警和运维控制。Hystrix Dashboard和Turbine可以配合Hystrix完成这些功能。

soa-2-640

Hystrix是怎么完成这些需求呢?个人觉得Hystrix的实现有以下几个关键点;

  • 为服务调用设置超时时间;这一点非常容易理解,超时返回可以实现服务快速失效。
  • 使用线程池来完成服务请求;我在之前的博客中也讲到过,线程池可以实现资源控制的作用。不同的依赖服务使用不同的线程池来完成请求,就可以实现服务依赖之间的隔离。这样就是Hystrix中比较常提起的“仓壁模式”。这种方式可以一定程度的抑制失效扩散。比如,服务A依赖于服务B和服务C,服务B宕机了,导致服务A中依赖于B的请求大量积压。如果没有隔离,依赖于服务C的请求的CPU时间将被严重压缩,甚至导致服务A中的依赖于B和C的请求都失败。
  • 基于对请求的统计,Hystrix实现了服务监控。并且在服务失效达到一定的比例之后,直接熔断服务。因为,即使有超时,仍然有可能形成请求积压。在这种极端情况下,熔断机制就能发挥作用了,它会让请求直接失败。
2. Hystrix的实现

Hystrix使用了“命令模式”,每一个依赖请求都是HystrixCommand或者HystrixObservableCommand。Hystrix的命令执行流程图如下:

hystrix-command-flow-chart

HystrixCommand和HystrixObservableCommand的区别主要体现在异步执行的返回值句柄不同。它们分别对应传统型的Future、和RxJava中的Observable。虽然看似用法略有不同,但是内部实现都是通过RxJava的形式完成的。HystrixCommand的简单使用方法如下:

  @HystrixCommand(fallbackMethod = "findByIdFallback")
  @GetMapping("/user/{id}")
  public User findById(@PathVariable Long id) {
    return this.restTemplate.getForObject("http://microservice-provider-user/" + id, User.class);
  }

  User findByIdFallback(Long id, Throwable throwable) {
    LOGGER.error("进入回退方法,异常:", throwable);
    User user = new User();
    user.setId(-1L);
    user.setName("默认用户");
    return user;
  }

HystrixCommand这个注解相当于把findById这个函数包装成了一个HystrixCommand。对这个api端点的请求都会引发一个对应的HystrixCommand执行。HystrixCommand注解除了fallbackMethod,还有很多其他的参数,这些参数会用来构造HystrixCommand命令。

HystrixCommand命令是如何执行的呢?接下来,我们就通过对Hystrix源代码的分析来理解它!无论是HystrixCommand还是HystrixObservableCommand,都是AbstractCommand的子类。所以它包含了很多Hystrix命令执行的细节。以HystrixCommand类为例,它的执行其实是通过调用execute()方法完成的。这个方法的实现如下:

    public R execute() {
        try {
            return this.queue().get();
        } catch (Exception var2) {
            throw this.decomposeException(var2);
        }
    }

    public Future queue() {
        final Future delegate = this.toObservable().toBlocking().toFuture();
        Future f = new Future() {
            public R get() throws InterruptedException, ExecutionException {
                return delegate.get();
            }
            public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                return delegate.get(timeout, unit);
            }
        };
        if(f.isDone()) {
        /* */
        } else {
            return f;
        }
    }

可以看到execute()方法实际上是通过调用queue()方法实现的,而queue()方法的目的就是构造出一个Future f。这个f其实仅仅是封装了delegate这个Futrue。所以一番分析之后,我们知道了问题的关键所在,其实就是这行this.toObservable().toBlocking().toFuture()。那就从toObservable()开始分析!

    public Observable toObservable() {
        final Action0 terminateCommandCleanup = new Action0() {
        /* */
        };
        final Action0 unsubscribeCommandCleanup = new Action0() {
        /* */             
        };
        final Func0> applyHystrixSemantics = new Func0>() {
            public Observable call() {
                return AbstractCommand.this.applyHystrixSemantics(AbstractCommand.this);
            }
        };
        final Func1 wrapWithAllOnNextHooks = new Func1() {
            /* */
        };
        final Action0 fireOnCompletedHook = new Action0() {
        /* */
        };
        return Observable.defer(new Func0>() {
            public Observable call() {
                if(!AbstractCommand.this.commandState.compareAndSet(AbstractCommand.CommandState.NOT_STARTED, AbstractCommand.CommandState.OBSERVABLE_CHAIN_CREATED)) {
                } else {
                    Observable hystrixObservable = Observable.defer(applyHystrixSemantics).map(wrapWithAllOnNextHooks);
                    Observable afterCache;
                    if(requestCacheEnabled && cacheKey != null) {
            /* */
                        afterCache = toCache.toObservable();
                    } else {
                        afterCache = hystrixObservable;
                    }
                    return afterCache.doOnTerminate(terminateCommandCleanup).doOnUnsubscribe(unsubscribeCommandCleanup).doOnCompleted(fireOnCompletedHook);
                }
            }
        });
    }

虽然这个方法的代码不少,但还是可以很快的定位到关键点——applyHystrixSemantics(); 这个函数完成了Hystrix的基本语义。首先是熔断器的语义,如果熔断器允许请求就去执行请求,反之则直接执行fallback函数。在正常的请求通路下,最终会调用executeCommandWithSpecifiedIsolation()方法来完成请求的执行。从这个函数名就可以看出,这个方法完成了Hystrix隔离的语义。由于这个方法代码比较长,就不贴完整代码了。但是方法中的一个subscribeOn()运算符非常显眼,如下所示:

subscribeOn(this.threadPool.getScheduler(new Func0() {
            public Boolean call() {
                return Boolean.valueOf(((Boolean)AbstractCommand.this.properties.executionIsolationThreadInterruptOnTimeout().get()).booleanValue() && _cmd.isCommandTimedOut.get() == AbstractCommand.TimedOutStatus.TIMED_OUT);
            }
        }));

这个threadPool就是构造HystrixCommand的时候设定的线程池,现在大家应该可以理解Hystrix如何实现隔离的。回到那行代码this.toObservable().toBlocking().toFuture()。toBlocking()仅仅是为了构造一个BlockingObservable,然后通过toFuture()构造出一个Future。其实构造这个future的过程也很简单,只是订阅上游的Observable,然后通知和更新Future,具体过程如下:

public final class BlockingOperatorToFuture {
    public static  Future toFuture(Observable that) {
        final CountDownLatch finished = new CountDownLatch(1);
        final AtomicReference value = new AtomicReference();
        final AtomicReference error = new AtomicReference();
        final Subscription s = that.single().subscribe(new Subscriber() {
            public void onCompleted() {
                finished.countDown();
            }
            public void onError(Throwable e) {
                error.compareAndSet((Object)null, e);
                finished.countDown();
            }
            public void onNext(T v) {
                value.set(v);
            }
        });
        return new Future() {
            private volatile boolean cancelled;
            public boolean isDone() {
                return finished.getCount() == 0L;
            }
            public T get() throws InterruptedException, ExecutionException {
                finished.await();
                return this.getValue();
            }
            public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                if(finished.await(timeout, unit)) {
                    return this.getValue();
                } else {
                    throw new TimeoutException("Timed out after " + unit.toMillis(timeout) + "ms waiting for underlying Observable.");
                }
            }
            private T getValue() throws ExecutionException {
                Throwable throwable = (Throwable)error.get();
                if(throwable != null) {
                    throw new ExecutionException("Observable onError", throwable);
                } else if(this.cancelled) {
                    throw new CancellationException("Subscription unsubscribed");
                } else {
                    return value.get();
                }
            }
        };
    }
}

这个返回的Future就是前面的delegate。讲到这里,Hystrix command的大致回路,算是跑马观花的看了一遍。具体的一些细节实现,这里就先略过了!

对于Hystrix的具体使用,可以参考这个https://github.com/eacdy/spring-cloud-study。我也有看这个demo项目学习Spring Cloud。

发表评论