• 欢迎访问开心洋葱网站,在线教程,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站,欢迎加入开心洋葱 QQ群
  • 为方便开心洋葱网用户,开心洋葱官网已经开启复制功能!
  • 欢迎访问开心洋葱网站,手机也能访问哦~欢迎加入开心洋葱多维思维学习平台 QQ群
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏开心洋葱吧~~~~~~~~~~~~~!
  • 由于近期流量激增,小站的ECS没能经的起亲们的访问,本站依然没有盈利,如果各位看如果觉着文字不错,还请看官给小站打个赏~~~~~~~~~~~~~!

Spring中异步调用描述

程序人生 supingemail 2208次浏览 0个评论

.

目录

 

一、@EnabelAsync与@Asyn注解的使用

二、自动配置@EnableAsync的代理选择

三、ProxyAsyncConfiguration的自动配置

四、AsyncAnnotationBeanPostProcessor 初始化

五、@Asyn注解实现异步的过程

六、使用异常

一、@EnabelAsync与@Asyn注解的使用

1.@EnabelAsync注解的使用。如不指定自定义异步线程池直接使用@EnableAsync即可使用,若自定义线程池可以使用下面的方法进行自定义,这种方法我认为可读性比较好,当然你可以利用@Bean(“taskExecutor”)申明一个Executor类型的实例,至于为什么可以生效后面的文章会进行介绍。

 

@Configuration
@EnableAsync
public class SpringbootAsyncConfig implements AsyncConfigurer {
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
        threadPool.setCorePoolSize(1);
        threadPool.setMaxPoolSize(1);
        threadPool.setWaitForTasksToCompleteOnShutdown(true);
        threadPool.setAwaitTerminationSeconds(60 * 15);
        threadPool.setThreadNamePrefix("MyAsync-");
        threadPool.initialize();
        return threadPool;
    }

}

2.@Asyn注解的使用。在下方代码中在AsynMethodInvocation类中对使用@Asyn注解的异步方法进行了调用,再使用上其实很简单,只要在想要实现异步的方法上使用@Asyn注解即可,值得注意的是调用方法与目标方法不能在一个类中,在一个类中异步不会生效,这与代理模式有一定的关系。

 

@RestController
public class AsynMethodInvocation {

    @Autowired
    AsynServer server;

    @RequestMapping(value = "/test")
    public void test() {
        System.out.println("调用线程:"+Thread.currentThread().getName());
        server.test();
    }
}


@Component
public class AsynServer {

    @Async
    public void test() {
        System.out.println("异步线程:" + Thread.currentThread().getName());
    }
}

3.异步方法调用的验证。我们可以看到上面的代码执行结果如下,异步方法在一个新的线程中完成。

 

调用线程:http-nio-8080-exec-2
2019-05-15 17:19:51.120  INFO 23368 --- [nio-8080-exec-2] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService
异步线程:ThreadPoolTaskExecutor-1

二、自动配置@EnableAsync的代理选择

Spring中异步调用描述

@EnableAsync自动配置类关系图

1.从@EnableAsync注解入手,可以的得到@EnableAsync自动配置类之间的关系图,在@EnableAsync中除了一些属性以外(属性通过看源码的注释应该很清楚)还有一个注解@Import(AsyncConfigurationSelector.class),项目启动时通过读取注解可以将该类引入,AsyncConfigurationSelector主要是用来选择代理方式是由JDK还是AspectJ实现代理,默认使用JDK的代理。

 


@Import(AsyncConfigurationSelector.class)

public @interface EnableAsync {

Classannotation()default Annotation.class;

//当AdviceModemode为PROXY时,选择代理是基于接口实现还是cglib实现

boolean proxyTargetClass()default false;

//代理方式是由JDK实现还是AspectJ实现

AdviceModemode()default AdviceMode.PROXY;

int order()default Ordered.LOWEST_PRECEDENCE;

}

2.从关系图可以看出通过抽象类与实现类的方式(敲黑板:Java基础知识不经常使用容易忘)实现了selectImports方法,决定是返回ProxyAsyncConfiguration还是AspectJAsyncConfiguration。

 

public abstract class AdviceModeImportSelector<A extends Annotation> implements ImportSelector {
    @Override
    public final String[] selectImports(AnnotationMetadata importingClassMetadata) {
        Class<?> annType = GenericTypeResolver.resolveTypeArgument(getClass(), AdviceModeImportSelector.class);
        Assert.state(annType != null, "Unresolvable type argument for AdviceModeImportSelector");
        //获得注解类读取其中的配置,就是上图代码中的注解的属性
        AnnotationAttributes attributes = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
        if (attributes == null) {
            throw new IllegalArgumentException(String.format(
                    "@%s is not present on importing class '%s' as expected",
                    annType.getSimpleName(), importingClassMetadata.getClassName()));
        }

        AdviceMode adviceMode = attributes.getEnum(getAdviceModeAttributeName());
        //该方法由AsyncConfigurationSelector类实现,参数为注解的AdviceModemode属性
        String[] imports = selectImports(adviceMode);
        if (imports == null) {
            throw new IllegalArgumentException("Unknown AdviceMode: " + adviceMode);
        }
        return imports;
    }
}

public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
    private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
            "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
    @Override
    @Nullable
    public String[] selectImports(AdviceMode adviceMode) {
        switch (adviceMode) {
            case PROXY:
                return new String[] {ProxyAsyncConfiguration.class.getName()};
            case ASPECTJ:
                return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
            default:
                return null;
        }
    }
}

3.截止到现在即可实现ProxyAsyncConfiguration配置类的引入,这项工作其实是在org.springframework.context.annotation.ConfigurationClassParser#processImports中完成,从而实现ProxyAsyncConfiguration的自动配置

三、ProxyAsyncConfiguration的自动配置

在注解中定义了两种代理的方式后,有不同的自动配置类进行加载,我们这里只对JDK的代理方式进行说明,主要是理解一下整个原理与流程,有兴趣的同学也可以去了解AspectJ的具体做法,下面我们来看一下ProxyAsyncConfiguration。

 

Spring中异步调用描述

ProxyAsyncConfiguration自动配置类关系图

 

1.ProxyAsyncConfiguration 类主要就是通过自动配置,读取注解中的相关属性,从而实例化AsyncAnnotationBeanPostProcessor,代码中的注释设置bbp对象的过程(敲黑板:Spring代码的名字其实很有讲究,通过名字就可以大概知道对象的作用,如看到BeanPostProcessor大家应该就会想到Spring中的Bean生命周期中的后置处理器,如不熟悉请自行补课,其实我也有似乎有一些忘记了)。

 

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
    @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
        Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
        //整个方法就是在Spring上下文中实例化一个AsyncAnnotationBeanPostProcessor,这个Bean主要是实现方法或者类上使用@Async注解从而达到异步的效果
        AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
        //设置执行器与异常执行器
        bpp.configure(this.executor, this.exceptionHandler);
        //注解annotation是否指定了自定义的注解,如果没有指定默认@Async和 @javax.ejb.Asynchronous注解生效,若自定义需要加入到bbp中
        Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
        if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
            bpp.setAsyncAnnotationType(customAsyncAnnotation);
        }
        //注解ProxyTargetClass、setOrder属性的设置  
        bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
        bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
        return bpp;
    }
}

2.在ProxyAsyncConfiguration中我们可以看到它继承了抽象类AbstractAsyncConfiguration,它是一个基础的配置类提供了异步的一些公共功能,可以通过实现AsyncConfigurer接口或者继承AsyncConfigurerSupport类来实现自定义异步线程池执行器与异常执行器,如果自定义了则会设置到bpp对象中,若没有自定义Spring会找实现TaskExecutor接口或bean的名字为taskExecutor的执行器。

 

@Configuration
public abstract class AbstractAsyncConfiguration implements ImportAware {
    @Override
    public void setImportMetadata(AnnotationMetadata importMetadata) {
        //获得@EnableAsync注解及其属性
        this.enableAsync = AnnotationAttributes.fromMap(
                importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
        if (this.enableAsync == null) {
            throw new IllegalArgumentException(
                    "@EnableAsync is not present on importing class " + importMetadata.getClassName());
        }
    }
    @Autowired(required = false)
    void setConfigurers(Collection<AsyncConfigurer> configurers) {
        if (CollectionUtils.isEmpty(configurers)) {
            return;
        }
        if (configurers.size() > 1) {
            throw new IllegalStateException("Only one AsyncConfigurer may exist");
        }
        AsyncConfigurer configurer = configurers.iterator().next();
        //设置自定义执行器与异常执行器,若没有自定义则不会执行setConfigurers方法
        this.executor = configurer::getAsyncExecutor;
        this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
    }
}

四、AsyncAnnotationBeanPostProcessor 初始化

在第二节我们主要讲解了如何初始化AsyncAnnotationBeanPostProcessor对象,但是大家可能对这个对象是干什么的在什么时候使用存在一定的疑虑,这节我们就重点来说一下Spring是如何生成代理类,首先我们看一下AsyncAnnotationBeanPostProcessor类的关系图,看起来的确很可怕(从前的我看到这种图几乎也会放弃查看源码的奢望),但是我们我们不用过多的担心,主要看下一下蓝色箭头的类。

 

Spring中异步调用描述

AsyncAnnotationBeanPostProcessor类关系图

 

1.,由于AsyncAnnotationBeanPostProcessor类在父类AbstractBeanFactoryAwareAdvisingPostProcessor中实现了BeanFactoryAware接口,所以在初始化后,会执行setBeanFactory方法,在该方法中,实例化了AsyncAnnotationAdvisor。

 

public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
    @Override
    public void setBeanFactory(BeanFactory beanFactory) {
        super.setBeanFactory(beanFactory);
        //实例化AsyncAnnotationAdvisor对象,初始化异步切面,该对象通过注解实现AOP的通知,使其在使用注解时能够触发异步方法的执行
        AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
        if (this.asyncAnnotationType != null) {
            advisor.setAsyncAnnotationType(this.asyncAnnotationType);
        }
        advisor.setBeanFactory(beanFactory);
        this.advisor = advisor;
    }
}

2.AbstractAdvisingBeanPostProcessor类中实现了BeanPostProcessor方法,在AsyncAnnotationBeanPostProcessor类初始化完后,会执行postProcessAfterInitialization方法,在该方法中会生成目标类的代理类,从而实现在调用该方法时能后达到异步的效果。

 

public abstract class AbstractAdvisingBeanPostProcessor extends ProxyProcessorSupport implements BeanPostProcessor {
    private final Map<Class<?>, Boolean> eligibleBeans = new ConcurrentHashMap<>(256);

    public Object postProcessAfterInitialization(Object bean, String beanName) {
        if (this.advisor == null || bean instanceof AopInfrastructureBean) {
            // Ignore AOP infrastructure such as scoped proxies.
            return bean;
        }
        //第一次进入到这个方法时bean是注解了异步的那个原始类,因为那时候还没有生成代理类bean instanceof Advised为false,所以该端代码不会执行。还有可能进入到这个方法是生成代理类后实例化CglibAopProxy时会进入到这个类,bean为该类的代理类,这不做过多的介绍,感兴趣的可以去查看一下Spring Aop的源码部分
        if (bean instanceof Advised) {
            Advised advised = (Advised) bean;
            if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
                // Add our local Advisor to the existing proxy's Advisor chain...
                if (this.beforeExistingAdvisors) {
                    advised.addAdvisor(0, this.advisor);
                }
                else {
                    advised.addAdvisor(this.advisor);
                }
                return bean;
            }
        }
        //由于在初始化AsyncAnnotationBeanPostProcessor,该方法并没有调用本类中的,而是调用了下方类的isEligible方法,主要判断改类是否为原始类,而不是该类的代理类,如果是原始类则进入方法。
        if (isEligible(bean, beanName)) {
            //创建目标类的代理工厂,在AbstractBeanFactoryAwareAdvisingPostProcessor中重写了该方法
            ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
            //判断代理使用的是JDK代理还是使用的CGLIB代理,默认false,使用JDK创建代理
            if (!proxyFactory.isProxyTargetClass()) {
                //检查bean是接口还是类,如果是接口默认使用JDK创建代理,如果为类则修改为使用CGLIB来创建代理
                evaluateProxyInterfaces(bean.getClass(), proxyFactory);
            }
            //代理工厂应用自定义的Advisor(AsynAnnotationAdvisor)
            proxyFactory.addAdvisor(this.advisor);
            customizeProxyFactory(proxyFactory);
            //返回该类的代理类
            return proxyFactory.getProxy(getProxyClassLoader());
        }
        // No proxy needed.
        return bean;
    }
}

public abstract class AbstractBeanFactoryAwareAdvisingPostProcessor extends AbstractAdvisingBeanPostProcessor implements BeanFactoryAware {
@Override
    protected boolean isEligible(Object bean, String beanName) {
        return (!AutoProxyUtils.isOriginalInstance(beanName, bean.getClass()) &&
                super.isEligible(bean, beanName));
    }
}

3.在这里简单的介绍一下是proxyFactory是如何生成代理的,创建Aop代理的过程是在DefaultAopProxyFactory中完成,根据Config不同的属性条件,来决定不同类型的AOP代理(JDK动态代理、CGLIB代理),然后根据创建的代理getProxy方法生成代理类,如CglibAopProxy#getProxy。

 

public class DefaultAopProxyFactory implements AopProxyFactory, Serializable {
    @Override
    public AopProxy createAopProxy(AdvisedSupport config) throws AopConfigException {
        if (config.isOptimize() || config.isProxyTargetClass() || hasNoUserSuppliedProxyInterfaces(config)) {
            Class<?> targetClass = config.getTargetClass();
            if (targetClass == null) {
                throw new AopConfigException("TargetSource cannot determine target class: " +
                        "Either an interface or a target is required for proxy creation.");
            }
            if (targetClass.isInterface() || Proxy.isProxyClass(targetClass)) {
                return new JdkDynamicAopProxy(config);
            }
            return new ObjenesisCglibAopProxy(config);
        }
        else {
            return new JdkDynamicAopProxy(config);
        }
    }
}

总结:到这里我们应该对@EnableAsync自动配置有一个比较清晰的认识,回顾一下这部分的内容主要从两方面来入手。首先,@EnableAsync注解–>配置选择–>ProxyAsyncConfiguration自动配置–>初始化AsyncAnnotationBeanPostProcessor对象。其次,AsyncAnnotationBeanPostProcessor自定义Advisor–>AbstractAdvisingBeanPostProcessor实现代理类的生成。

五、@Asyn注解实现异步的过程

在之前的部分我们一笔带过了AsyncAnnotationAdvisor对象的初始化是因为这部分与Spring Aop有很大关系,所以这没有做重点的介绍(其实这部分我也没有去关注),但是它的类的关系图中我们可以注意到AsyncAnnotationAdvisor类中有一个buildAdvice方法,生成了AnnotationAsyncExecutionInterceptor对象,它的父类AsyncExecutionInterceptor重写了AsyncExecutionInterceptor接口的invoke方法,通过委托实现@Async异步方法的调用。

 

Spring中异步调用描述

AsyncAnnotationAdvisor类关系图

当我们使用@Async注解在方法或者类上面时,在进行方法调用时,代理类会进行拦截,如CglibAopProxy.DynamicAdvisedInterceptor的intercept方法,在这里我们不会仔细的去分析这个方法的源码,大家只要知道在该方法中retVal = new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy).proceed()这行代码的proceed方法时会被
AsyncExecutionInterceptor拦截器进行拦截,在该拦截器中完成了异步方法的调用。

 

org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke
public Object invoke(final MethodInvocation invocation) throws Throwable {
        //该方法的参数为异步要调用的方法
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
        //获得方法的详细信息,如参数、返回值等
        Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
        final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
        //确定异步执行器,在spring上下文中找到实现TaskExecutor的实例
        AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
        if (executor == null) {
            throw new IllegalStateException(
                    "No executor specified and no default executor set on AsyncExecutionInterceptor either");
        }
        //建立一个线程,用于执行目标异步方法,该段代码只是线程的生命,而没有直接的调用
        Callable<Object> task = () -> {
            try {
                Object result = invocation.proceed();
                if (result instanceof Future) {
                    return ((Future<?>) result).get();
                }
            }
            catch (ExecutionException ex) {
                handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
            }
            catch (Throwable ex) {
                handleError(ex, userDeclaredMethod, invocation.getArguments());
            }
            return null;
        };
        //结合目标方法、异步执行器对目标方法进行调用
        return doSubmit(task, executor, invocation.getMethod().getReturnType());
    }

在上述的代码中determineAsyncExecutor方法用于确定异步执行器,在执行defaultExecutor.get()方法时,最终会执行AsyncExecutionAspectSupport的getDefaultExecutor方法,该方法会查找实现TaskExecutor接口的实例作为执行器,在该文中没有特别实现TaskExecutor接口,故默认的执行器为ThreadPoolTaskExecutor。

 

org.springframework.aop.interceptor.AsyncExecutionAspectSupport#determineAsyncExecutor
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
        //executors的类型为Map<Method, AsyncTaskExecutor>,用于缓存方法的执行器,第一次加载时executor为null,调用过一次后对应关系缓存在Map中
        AsyncTaskExecutor executor = this.executors.get(method);
        if (executor == null) {
            Executor targetExecutor;
            String qualifier = getExecutorQualifier(method);
            if (StringUtils.hasLength(qualifier)) {
                targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
            }
            else {
                //指定执行器
                targetExecutor = this.defaultExecutor.get();
            }
            if (targetExecutor == null) {
                return null;
            }
            executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
                    (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
            this.executors.put(method, executor);
        }
        return executor;
    }

在invoke方法的最后一行是submit方法,最后的执行要落在线程池的执行,spring线程池的实现这里不做过多的介绍,大家只要知道在这里利用其他线程实现了异步方法的调用即可。

 

org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor#submit
public <T> Future<T> submit(Callable<T> task) {
        ExecutorService executor = getThreadPoolExecutor();
        try {
            return executor.submit(task);
        }
        catch (RejectedExecutionException ex) {
            throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
        }
    }

在了解了@EnableAsync注解,这一部分我们重点对@Asyn注解在调用时是如何实现异步的进行了分析,主要是两个方面。一是在方法调用是对其进行拦截,然后指定方法执行的执行器;二是在方法的调用过程中应用了线程池,由线程池执行task从而达到异步的作用。

从官方说明上可以看出返回值类型只支持4种类型:

  • void

  • Future

  • ListenableFuture

  • CompletableFuture

在来看看具体的源码

Spring中异步调用描述

六、使用异常

springcloud项目中做了个异步发短信的功能,需要从业务系统A 中,给用户发短信,用户信息在用户中心,但是不能串行执行。所以就使用了spring的异步任务调用方式。

使用方式如上“一”中所示的那样,唯一不同的是:需要业务系统A中,使用 @FeignClient 申明调用方法,在 @Async 修饰的方法中,用 @FeignClient  的实例 userClient 去用户中心的查询接口获取用户的手机号,但是结果却报错:401 认证异常 

但是在业务系统A中,非 @Async 修饰的方法却可以正常调用,不会报错 401 ,究其原因:
A.@FeignClient 的申明是在springboot应用初始化的时候,就被加载到了 BeanFactory 中去的;

B.@EnableAsync 修饰的异步任务,是启用之后通过 AsyncAnnotationBeanPostProcessor 进行的实例化和获得异步任务对象;

C.执行异步任务的方法是放在@Component修饰的组件中的,它随 springboot应用一起进行的初始化,但是 @Async 的却为被初始化;

综上所述:要在 @Async 异步调用的任务中,使用初始化实例的 userClient 就不能获得授权的信息,所以就会报错401;

解决方案 : A ; B 

A. 使用 userClient 调用的操作放在  @Async 修饰的方法前执行,即把 userClient 调用的结果传入到  @Async 修饰的方法里,亲测是可以使用的。

B.按照 401 的提示,给 springcloud 的 auth 模块配置访问限制,放开 userClient 调用的方法,这种改动比较大,不推荐。

其他参考:

https://xie.infoq.cn/article/2921fdbae94fee573e8cf9b91

https://juejin.im/post/6858854987280809997

https://cloud.tencent.com/developer/article/1497604

 


开心洋葱 , 版权所有丨如未注明 , 均为原创丨未经授权请勿修改 , 转载请注明Spring中异步调用描述
喜欢 (0)

您必须 登录 才能发表评论!

加载中……