java - 底层 Http2(H2) 客户端和服务器实现
问题描述
我正在寻找使用 JETTY EMBEDDED(Jetty 10/Jetty 11) 的详细配置实现 HTTP/2(Client-Server) 的所有功能的示例示例,例如 STREAMS、FRAMES PUSH_PROMISE、HPACK?它应该能够在本地主机上运行,并且如果需要 ssl 证书以实现低级别 https2 详细方法。文档中的示例并不十分清楚,并分部分进行了解释。我和他们一起尝试了很多。希望对这个问题的成功回答将帮助许多想要使用嵌入式码头实现低级 h2 的业余爱好者。任何人都可以帮我参考吗?
解决方案
代码供他人参考。
H2Server.java
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.UnaryOperator;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.resource.Resource;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import static java.lang.System.Logger.Level.INFO;
import java.io.OutputStream;
@SuppressWarnings("unused") public class H2Server {
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
// Create a Server instance.
QueuedThreadPool serverExecutor = new QueuedThreadPool();
serverExecutor.setName("server");
// server = new Server(serverExecutor);
Server server = new Server(serverExecutor);
// ServerSessionListener sessionListener = new ServerSessionListener.Adapter();
ServerSessionListener sessionListener = new ServerSessionListener.Adapter() {
@Override
public Map<Integer, Integer> onPreface(Session session) {
System.out.println("onPreface Called");
// Customize the settings, for example:
Map<Integer, Integer> settings = new HashMap<>();
// Tell the client that HTTP/2 push is disabled.
settings.put(SettingsFrame.ENABLE_PUSH, 0);
settings.put(SettingsFrame.ENABLE_CONNECT_PROTOCOL, 8);
return settings;
}
@Override
public void onAccept(Session session) {
System.out.println("onAccept Called");
InetSocketAddress remoteAddress = session.getRemoteAddress();
System.getLogger("http2").log(INFO, "Connection from {0}", remoteAddress);
}
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) {
System.out.println("onNewStream Called");
// This is the "new stream" event, so it's guaranteed to be a request.
MetaData.Request request = (MetaData.Request) frame.getMetaData();
if (frame.isEndStream()) {
respond(stream, request);
return null;
} else {
// Return a Stream.Listener to handle the request events,
// for example request content events or a request reset.
return new Stream.Listener.Adapter() {
@Override
public void onData(Stream stream, DataFrame frame, Callback callback) {
// Get the content buffer.
ByteBuffer buffer = frame.getData();
// Consume the buffer, here - as an example - just log it.
// System.getLogger("http2").log(INFO, "Consuming buffer {0}", buffer);
System.getLogger("http2").log(INFO, "Consuming buffer {0}", buffer);
System.out.println("Consuming buffer {0} " +StandardCharsets.UTF_8.decode(buffer).toString());
// Tell the implementation that the buffer has been consumed.
callback.succeeded();
// By returning from the method, implicitly tell the implementation
// to deliver to this method more DATA frames when they are available.
if (frame.isEndStream()) {
System.out.println("EndStream");
respond(stream, request);
}
}
};
}
}
private void respond(Stream stream, MetaData.Request request) {
// Prepare the response HEADERS frame.
System.out.println("respond Called");
// The response HTTP status and HTTP headers.
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200,
HttpFields.EMPTY);
if (HttpMethod.GET.is(request.getMethod())) {
// The response content.
ByteBuffer resourceBytes = getResourceBytes(request);
System.out.println("Request==GET resourceBytes== "+ StandardCharsets.UTF_8.decode(resourceBytes).toString());
// Send the HEADERS frame with the response status and headers,
// and a DATA frame with the response content bytes.
stream.headers(new HeadersFrame(stream.getId(), response, null, false))
.thenCompose(s -> s.data(new DataFrame(s.getId(), resourceBytes, true)));
} else {
// Send just the HEADERS frame with the response status and headers.
System.out.println("Request==POST response== "+ response);
String content1 = "{\"greet\": \"Welcome!!!\"}";
ByteBuffer buffer1 = StandardCharsets.UTF_8.encode(content1);
//stream.headers(new HeadersFrame(stream.getId(), response, null, true));
stream.headers(new HeadersFrame(stream.getId(), response, null, false))
.thenCompose(s -> s.data(new DataFrame(s.getId(), buffer1, true)));
}
}
private ByteBuffer getResourceBytes(MetaData.Request request)
{
return ByteBuffer.allocate(1024);
}
};
// HTTP Configuration
HttpConfiguration httpConfig = new HttpConfiguration();
// httpConfig.setSecureScheme("https");
httpConfig.setSecureScheme("https");
// httpConfig.setSecurePort(8443);
httpConfig.setSecurePort(8443);
httpConfig.setSendXPoweredBy(true);
httpConfig.setSendServerVersion(true);
// httpConfig.setRequestHeaderSize(16 * 1024);
// HTTPS Configuration
HttpConfiguration httpsConfig = new HttpConfiguration(httpConfig);
httpsConfig.addCustomizer(new SecureRequestCustomizer());
// Create a ServerConnector with RawHTTP2ServerConnectionFactory.
RawHTTP2ServerConnectionFactory http2 = new RawHTTP2ServerConnectionFactory(httpConfig, sessionListener);
// Configure RawHTTP2ServerConnectionFactory, for example:
// Configure the max number of concurrent requests.
http2.setMaxConcurrentStreams(128);
// Enable support for CONNECT.
http2.setConnectProtocolEnabled(true);
// Create the ServerConnector.
ServerConnector connector = new ServerConnector(server, http2);
// connector.setPort(8080);
connector.setPort(8443);
connector.setHost("localhost");
connector.setAcceptQueueSize(128);
// Add the Connector to the Server
server.addConnector(connector);
// Start the Server so it starts accepting connections from clients.
server.start();
// new H2Server().testNoPrefaceBytes();
}
}
H2Client.java
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
public class H2Client {
public static void main(String[] args) throws Exception {
HTTP2Client http2Client = new HTTP2Client();
http2Client.start();
ClientConnector connector = http2Client.getClientConnector();
// Address of the server's encrypted port.
SocketAddress serverAddress = new InetSocketAddress("localhost", 8443);
// SocketAddress serverAddress = new InetSocketAddress("http://www.google.com/", 8080);
// Address of the server's encrypted port.
// SocketAddress serverAddress = new InetSocketAddress("localhost", 8443);
// CompletableFuture<Session> sessionCF = http2Client.connect(connector.getSslContextFactory(), serverAddress, new Session.Listener.Adapter());
CompletableFuture<Session> sessionCF = http2Client.connect(serverAddress, new Session.Listener.Adapter()
{
@Override
public Map<Integer, Integer> onPreface(Session session)
{
System.out.println("onPreface Called");
Map<Integer, Integer> configuration = new HashMap<>();
// Disable push from the server.
configuration.put(SettingsFrame.ENABLE_PUSH, 0);
// Override HTTP2Client.initialStreamRecvWindow for this session.
configuration.put(SettingsFrame.INITIAL_WINDOW_SIZE, 1024 * 1024);
return configuration;
}
});
Session session = sessionCF.get();
/*
// Configure the request headers.
HttpFields requestHeaders = HttpFields.build()
.put(HttpHeader.USER_AGENT, "Jetty HTTP2Client {version}");
// The request metadata with method, URI and headers.
MetaData.Request request = new MetaData.Request("GET", HttpURI.from("http://localhost:61432"), HttpVersion.HTTP_2, requestHeaders);
// MetaData.Request request = new MetaData.Request("GET", HttpURI.from("https://www.google.com/"), HttpVersion.HTTP_2, requestHeaders);
// The HTTP/2 HEADERS frame, with endStream=true
// to signal that this request has no content.
HeadersFrame headersFrame = new HeadersFrame(request, null, true);
// Open a Stream by sending the HEADERS frame.
session.newStream(headersFrame,new Promise.Adapter<>(), new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
System.out.println("onHeaders Called");
MetaData metaData = frame.getMetaData();
// Is this HEADERS frame the response or the trailers?
if (metaData.isResponse())
{
MetaData.Response response = (MetaData.Response)metaData;
System.out.println( "Received response {0}== "+ response);
}
else
{
System.out.println("Received trailers {0}== " + metaData.getFields());
}
}
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
System.out.println("onData Called");
// Get the content buffer.
ByteBuffer buffer = frame.getData();
// Consume the buffer, here - as an example - just log it.
System.out.println("Consuming buffer {0}" +buffer);
// Tell the implementation that the buffer has been consumed.
callback.succeeded();
// By returning from the method, implicitly tell the implementation
// to deliver to this method more DATA frames when they are available.
}
});
*/
// Configure the request headers.
HttpFields requestHeaders = HttpFields.build()
.put(HttpHeader.CONTENT_TYPE, "application/json");
// The request metadata with method, URI and headers.
MetaData.Request request = new MetaData.Request("POST", HttpURI.from("http://localhost:8443/"), HttpVersion.HTTP_2, requestHeaders);
// MetaData.Request request = new MetaData.Request("POST", HttpURI.from("0.0.0.0:63780"), HttpVersion.HTTP_2, requestHeaders);
// The HTTP/2 HEADERS frame, with endStream=false to
// signal that there will be more frames in this stream.
HeadersFrame headersFrame = new HeadersFrame(request, null, false);
// Open a Stream by sending the HEADERS frame.
// CompletableFuture<Stream> streamCF = session.newStream(headersFrame, new Stream.Listener.Adapter());
// Open a Stream by sending the HEADERS frame.
CompletableFuture<Stream> streamCF = session.newStream(headersFrame, new Stream.Listener.Adapter()
{
public void onHeaders(Stream stream, HeadersFrame frame)
{
System.out.println("onHeaders Called");
MetaData metaData = frame.getMetaData();
// Is this HEADERS frame the response or the trailers?
if (metaData.isResponse())
{
MetaData.Response response = (MetaData.Response)metaData;
System.out.println( "Received response {0}== "+ response);
}
else
{
System.out.println("Received trailers {0}== " + metaData.getFields());
}
}
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
System.out.println("onData Called");
// Get the content buffer.
ByteBuffer buffer = frame.getData();
// Consume the buffer, here - as an example - just log it.
System.out.println("Consuming buffer {0}" +StandardCharsets.UTF_8.decode(buffer).toString());
// Tell the implementation that the buffer has been consumed.
callback.succeeded();
// By returning from the method, implicitly tell the implementation
// to deliver to this method more DATA frames when they are available.
}
});
// Block to obtain the Stream.
// Alternatively you can use the CompletableFuture APIs to avoid blocking.
Stream stream = streamCF.get();
// The request content, in two chunks.
String content1 = "{\"greet\": \"hello world\"}";
ByteBuffer buffer1 = StandardCharsets.UTF_8.encode(content1);
String content2 = "{\"user\": \"jetty\"}";
ByteBuffer buffer2 = StandardCharsets.UTF_8.encode(content2);
// Send the first DATA frame on the stream, with endStream=false
// to signal that there are more frames in this stream.
CompletableFuture<Stream> dataCF1 = stream.data(new DataFrame(stream.getId(), buffer1, false));
// Only when the first chunk has been sent we can send the second,
// with endStream=true to signal that there are no more frames.
dataCF1.thenCompose(s -> s.data(new DataFrame(s.getId(), buffer2, true)));
// end::newStreamWithData[]
System.out.println("EOF");
/*
*/
}
}
推荐阅读
- git - 在 git 存储库中的 .git 目录下提交和推送更改
- sql-server - TDE 与磁盘加密的建议
- php - 如何使用 vtwsclib 创建用户?
- python - 如何为一个表达式编写正则表达式?
- flutter - 将firebase添加到flutter时出现Gradle错误
- android - 应用默认值复制 SQLite 表
- r - 如何在构面中连接 ggplot2 的线图?
- android - 使用 AudioRecord API 的 android 录音机
- html - 引用文本的 Jsoup 选择
- java - 如何删除 ExecutorService 中所有正在运行和排队的任务并向其中添加新任务?