首页 > 技术文章 > Spring中AsyncRestTemplate的应用

niugang0920 2020-04-15 08:44 原文

Spring中AsyncRestTemplate的应用

Web应用程序通常需要查询外部REST服务。 在为满足这些需求扩展应用程序时,HTTP和同步调用的本质会带来挑战:可能会阻塞多个线程,等待远程HTTP响应。

AsyncRestTemplate类,在开发REST客户端时允许非阻塞异步支持。

Spring的中心类,用于异步客户端HTTP访问。 公开与RestTemplate相似的方法,但返回ListenableFuture包装器,而不是具体的结果。AsyncRestTemplate通过getRestOperations()方法公开一个同步RestTemplate,并与该RestTemplate共享其错误处理程序和消息转换器。

注意:默认情况下,AsyncRestTemplate依靠标准JDK工具建立HTTP连接。 您可以通过使用接受AsyncClientHttpRequestFactory的构造函数来切换使用其他HTTP库,例如Apache HttpComponents,Netty和OkHttp。

默认的AsyncRestTemplate构造函数注册一个SimpleAsyncTaskExecutor来执行HTTP请求。 当处理大量短期请求时,像ThreadPoolTaskExecutor这样的线程池TaskExecutor实现可能是一个不错的选择。

即支持HTTPS调用也支持HTTP调用

import org.springframework.http.client.SimpleClientHttpRequestFactory;

import javax.net.ssl.*;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.Socket;
import java.security.cert.X509Certificate;

/**
 *
 */
public class HttpsClientRequestFactory extends SimpleClientHttpRequestFactory {

    @Override
    protected void prepareConnection(HttpURLConnection connection, String httpMethod) {
        try {
            if (!(connection instanceof HttpsURLConnection)) {
                //这样即支持http  也支持 https
                return;
             //   throw new RuntimeException("An instance of HttpsURLConnection is expected");
            }

            HttpsURLConnection httpsConnection = (HttpsURLConnection) connection;

            TrustManager[] trustAllCerts = new TrustManager[]{
                    new X509TrustManager() {
                        @Override
                        public java.security.cert.X509Certificate[] getAcceptedIssuers() {
                            return null;
                        }
                        @Override
                        public void checkClientTrusted(X509Certificate[] certs, String authType) {
                        }
                        @Override
                        public void checkServerTrusted(X509Certificate[] certs, String authType) {
                        }

                    }
            };
            SSLContext sslContext = SSLContext.getInstance("TLS");
            sslContext.init(null, trustAllCerts, new java.security.SecureRandom());
            httpsConnection.setSSLSocketFactory(new MyCustomSSLSocketFactory(sslContext.getSocketFactory()));

            httpsConnection.setHostnameVerifier(new HostnameVerifier() {
                @Override
                public boolean verify(String s, SSLSession sslSession) {
                    return true;
                }
            });

            super.prepareConnection(httpsConnection, httpMethod);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * We need to invoke sslSocket.setEnabledProtocols(new String[] {"SSLv3"});
     * see http://www.oracle.com/technetwork/java/javase/documentation/cve-2014-3566-2342133.html (Java 8 section)
     */
    // SSLSocketFactory用于创建 SSLSockets
    private static class MyCustomSSLSocketFactory extends SSLSocketFactory {

        private final SSLSocketFactory delegate;

        public MyCustomSSLSocketFactory(SSLSocketFactory delegate) {
            this.delegate = delegate;
        }

        // 返回默认启用的密码套件。除非一个列表启用,对SSL连接的握手会使用这些密码套件。
        // 这些默认的服务的最低质量要求保密保护和服务器身份验证
        @Override
        public String[] getDefaultCipherSuites() {
            return delegate.getDefaultCipherSuites();
        }

        // 返回的密码套件可用于SSL连接启用的名字
        @Override
        public String[] getSupportedCipherSuites() {
            return delegate.getSupportedCipherSuites();
        }


        @Override
        public Socket createSocket(final Socket socket, final String host, final int port,
                                   final boolean autoClose) throws IOException {
            final Socket underlyingSocket = delegate.createSocket(socket, host, port, autoClose);
            return overrideProtocol(underlyingSocket);
        }


        @Override
        public Socket createSocket(final String host, final int port) throws IOException {
            final Socket underlyingSocket = delegate.createSocket(host, port);
            return overrideProtocol(underlyingSocket);
        }

        @Override
        public Socket createSocket(final String host, final int port, final InetAddress localAddress,
                                   final int localPort) throws
                IOException {
            final Socket underlyingSocket = delegate.createSocket(host, port, localAddress, localPort);
            return overrideProtocol(underlyingSocket);
        }

        @Override
        public Socket createSocket(final InetAddress host, final int port) throws IOException {
            final Socket underlyingSocket = delegate.createSocket(host, port);
            return overrideProtocol(underlyingSocket);
        }

        @Override
        public Socket createSocket(final InetAddress host, final int port, final InetAddress localAddress,
                                   final int localPort) throws
                IOException {
            final Socket underlyingSocket = delegate.createSocket(host, port, localAddress, localPort);
            return overrideProtocol(underlyingSocket);
        }

        private Socket overrideProtocol(final Socket socket) {
            if (!(socket instanceof SSLSocket)) {
                throw new RuntimeException("An instance of SSLSocket is expected");
            }
            ((SSLSocket) socket).setEnabledProtocols(new String[]{"TLSv1"});
            return socket;
        }
    }
}

RestTemplate

  RestTemplate restTemplate = new RestTemplate(new HttpsClientRequestFactory());
   //要使用Apache HttpComponents代替本机java.net功能,请按以下方式构造RestTemplate:
  RestTemplate template = new RestTemplate(new HttpComponentsClientHttpRequestFactory());

AsyncRestTemplate

import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.http.ResponseEntity;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.client.AsyncRestTemplate;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * @author Created by niugang on 2020/4/10/16:47
 */
public class AsyncRestTemplateTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //即支持http  也 支持 https
        AsyncRestTemplate asyncRestTemplate = new AsyncRestTemplate();
        HttpsClientRequestFactory httpsClientRequestFactory = new HttpsClientRequestFactory();
        httpsClientRequestFactory.setTaskExecutor(new SimpleAsyncTaskExecutor());
        asyncRestTemplate.setAsyncRequestFactory(httpsClientRequestFactory);

        // async call
        Future<ResponseEntity<String>> futureEntity = asyncRestTemplate.getForEntity(
                "https://11.12.115.104/api/serverStatus/serverInfo", String.class);
        ResponseEntity<String> stringResponseEntity = futureEntity.get();
        System.out.println(stringResponseEntity.getBody());


        ListenableFuture<ResponseEntity<String>> futureEntity1 = asyncRestTemplate.getForEntity(
                "http://localhost:8088/boot/dateTest", String.class);

        // register a callback
        futureEntity1.addCallback(new ListenableFutureCallback<ResponseEntity<String>>() {
            @Override
            public void onSuccess(ResponseEntity<String> entity) {
                System.out.println("success:"+entity.getBody());
            }

            @Override
            public void onFailure(Throwable t) {
                System.out.println("error");
            }
        });

        System.out.println("主线程");
    }
}

要使用Apache HttpComponents代替本机java.net功能

     <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpasyncclient</artifactId>
            <version>4.1.4</version>
        </dependency>

        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.10</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpmime</artifactId>
            <version>4.5.10</version>
        </dependency>
import org.apache.http.Consts;
import org.apache.http.client.config.CookieSpecs;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.config.ConnectionConfig;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.DnsResolver;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.conn.SystemDefaultDnsResolver;
import org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory;
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.nio.client.HttpAsyncClient;
import org.apache.http.nio.conn.ManagedNHttpClientConnection;
import org.apache.http.nio.conn.NHttpConnectionFactory;
import org.apache.http.nio.conn.NoopIOSessionStrategy;
import org.apache.http.nio.conn.SchemeIOSessionStrategy;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOReactorException;
import org.apache.http.nio.util.HeapByteBufferAllocator;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.HttpComponentsAsyncClientHttpRequestFactory;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.client.AsyncRestTemplate;

import java.nio.charset.CodingErrorAction;
import java.util.concurrent.ExecutionException;
import org.apache.http.conn.ssl.TrustStrategy;

import org.apache.http.impl.nio.codecs.DefaultHttpResponseParserFactory;


import org.apache.http.ssl.SSLContextBuilder;

import javax.net.ssl.SSLContext;

import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.concurrent.Future;


/**
 * @author Created by niugang on 2020/4/10/16:47
 */
public class AsyncRestTemplateTest2 {

    public static void main(String[] args) throws ExecutionException, InterruptedException, IOReactorException, KeyStoreException, NoSuchAlgorithmException, KeyManagementException {
        //即支持http  也 支持 https
        AsyncRestTemplate asyncRestTemplate = new AsyncRestTemplate();

        // HTTPConnection工厂 :配置请求/解析响应
        NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory =
                new ManagedNHttpClientConnectionFactory(
                DefaultHttpRequestWriterFactory.INSTANCE,
                        DefaultHttpResponseParserFactory.INSTANCE, HeapByteBufferAllocator.INSTANCE);


        //ssl 连接设置 无须证书也能访问 https
        //使用 loadTrustMaterial() 方法实现一个信任策略,信任所有证书
        SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, new TrustStrategy() {
            // 信任所有
            @Override
            public boolean isTrusted(X509Certificate[] chain, String authType) throws CertificateException {
                return true;
            }
        }).build();

        // 为支持的协议方案创建自定义连接套接字工厂的注册表。
        Registry<SchemeIOSessionStrategy> sessionStrategyRegistry = RegistryBuilder.<SchemeIOSessionStrategy>create()
                .register("http", NoopIOSessionStrategy.INSTANCE)
                .register("https", new SSLIOSessionStrategy(sslContext, NoopHostnameVerifier.INSTANCE))
                .build();

        //DNS解析器
        DnsResolver dnsResolver = SystemDefaultDnsResolver.INSTANCE;

        // Create I/O reactor configuration
        IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
                .setIoThreadCount(Runtime.getRuntime().availableProcessors())
                .setConnectTimeout(30000)
                .setSoTimeout(30000)
                .build();

        // 创建一个定制的I/O reactort
        ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);

        // 使用自定义配置创建连接管理器。
        PoolingNHttpClientConnectionManager connManager = new PoolingNHttpClientConnectionManager(
                ioReactor, connFactory, sessionStrategyRegistry, dnsResolver);

        //创建连接配置
        ConnectionConfig connectionConfig = ConnectionConfig.custom()
                .setMalformedInputAction(CodingErrorAction.IGNORE)
                .setUnmappableInputAction(CodingErrorAction.IGNORE)
                .setCharset(Consts.UTF_8)
                .build();
        // 将连接管理器配置为默认使用或针对特定主机使用连接配置。
        connManager.setDefaultConnectionConfig(connectionConfig);

        // 配置永久连接的最大总数或每个路由限制
        // 可以保留在池中或由连接管理器租用。
        //每个路由的默认最大连接,每个路由实际最大连接为默认为DefaultMaxPreRoute控制,而MaxTotal是控制整个池子最大数
        connManager.setMaxTotal(100);
        connManager.setDefaultMaxPerRoute(10);


        // 创建全局请求配置
        RequestConfig defaultRequestConfig = RequestConfig.custom()
                .setCookieSpec(CookieSpecs.DEFAULT)
                .setSocketTimeout(5 * 1000)
                .setConnectTimeout(5 * 1000)
                .setExpectContinueEnabled(true)
                .build();

        // Create an HttpClientUtils with the given custom dependencies and configuration.
        HttpAsyncClient httpclient = HttpAsyncClients.custom()
                .setConnectionManager(connManager)
                .setDefaultRequestConfig(defaultRequestConfig)
                .build();



        asyncRestTemplate.setAsyncRequestFactory(new HttpComponentsAsyncClientHttpRequestFactory(httpclient));

        // async call
        Future<ResponseEntity<String>> futureEntity = asyncRestTemplate.getForEntity(
                "https://11.12.115.104/api/serverStatus/serverInfo", String.class);
        ResponseEntity<String> stringResponseEntity = futureEntity.get();
        System.out.println(stringResponseEntity.getBody());


        ListenableFuture<ResponseEntity<String>> futureEntity1 = asyncRestTemplate.getForEntity(
                "http://localhost:8088/boot/dateTest", String.class);

        // register a callback
        futureEntity1.addCallback(new ListenableFutureCallback<ResponseEntity<String>>() {
            @Override
            public void onSuccess(ResponseEntity<String> entity) {
                System.out.println("success:"+entity.getBody());
            }

            @Override
            public void onFailure(Throwable t) {
                System.out.println("error");
            }
        });

        System.out.println("主线程");
    }
}

在这里插入图片描述

推荐阅读