We are currently setting up a Spring Boot project using Apache Camel as the integration framework.
At the end of our route we need to send messages over the secure WebSocket (WSS) protocol. Our application acts as the client and sends the messages to a remote server that exposes a websocket endpoint.
However, the remote server first requires authentication during the WSS handshake: it expects specific HTTP headers, including the Authorization: Bearer {jwt-bearer-token} header. The token is only required during the handshake. Once the websocket has been opened, no further authentication is needed.
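To make clear where the token travels, here is a minimal sketch of what a WebSocket opening handshake (RFC 6455) with an Authorization header looks like on the wire. HandshakeRequestSketch and its build() method are hypothetical names used only for illustration. They are not part of Camel or AsyncHttpClient, and the real remote server may require further headers.

```java
import java.security.SecureRandom;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not a Camel or AsyncHttpClient class): it only renders
// the text of a WebSocket opening handshake request for illustration.
class HandshakeRequestSketch {

    static String build(String host, String path, String jwt) {
        // Sec-WebSocket-Key is 16 random bytes, base64-encoded.
        byte[] nonce = new byte[16];
        new SecureRandom().nextBytes(nonce);

        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("Host", host);
        headers.put("Upgrade", "websocket");
        headers.put("Connection", "Upgrade");
        headers.put("Sec-WebSocket-Key", Base64.getEncoder().encodeToString(nonce));
        headers.put("Sec-WebSocket-Version", "13");
        // The token is part of the HTTP upgrade request, not of any later websocket frame.
        headers.put("Authorization", "Bearer " + jwt);

        StringBuilder request = new StringBuilder("GET " + path + " HTTP/1.1\r\n");
        headers.forEach((name, value) ->
                request.append(name).append(": ").append(value).append("\r\n"));
        // An empty line terminates the header section.
        return request.append("\r\n").toString();
    }
}
```

Calling HandshakeRequestSketch.build(host, path, token) returns the request text. The point of the sketch is that the Authorization header belongs to this single HTTP request and never to the messages sent once the socket is open.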
Since it seems to be the only websocket client component for Apache Camel, we intended to use the AHC-WS component, as shown in the POM below:
...
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-spring-boot-starter</artifactId>
    <version>3.0.0-M2</version>
</dependency>
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-ahc-ws</artifactId>
    <version>3.0.0-M2</version>
</dependency>
...
In our route definition we then append the required headers to the message (e.g. the Authorization header with the JWT bearer token). Additionally, we log the token to check that it really gets added as a header of the message (which it does).
<?xml version="1.0" encoding="UTF-8"?>
<routes xmlns="http://camel.apache.org/schema/spring">
    <route id="distributorRoute">
        <from uri="seda:distributorEntryPoint"/>
        <setHeader headerName="Authorization">
            <simple>Bearer {jwt-bearer-token}</simple>
        </setHeader>
        <log message="${header.Authorization}"/>
        <to uri="ahc-wss://{remote-server-wss-endpoint}"/>
    </route>
</routes>
Simply put, the messages cannot be sent via WSS, and it seems that the connection is never established.
The application throws the following NullPointerException:
2019-06-07 15:52:00.304 INFO 17164 --- [butorEntryPoint] o.a.camel.component.ahc.ws.WsEndpoint : Reconnecting websocket: wss://{remote-server-wss-endpoint}
...
java.lang.NullPointerException: null
at org.apache.camel.component.ahc.ws.WsProducer.sendMessage(WsProducer.java:76) ~[camel-ahc-ws-3.0.0-M2.jar:3.0.0-M2]
at org.apache.camel.component.ahc.ws.WsProducer.process(WsProducer.java:51) ~[camel-ahc-ws-3.0.0-M2.jar:3.0.0-M2]
at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66) ~[camel-support-3.0.0-M2.jar:3.0.0-M2]
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:130) ~[camel-core-3.0.0-M2.jar:3.0.0-M2]
at org.apache.camel.processor.RedeliveryErrorHandler$RedeliveryState.run(RedeliveryErrorHandler.java:482) ~[camel-core-3.0.0-M2.jar:3.0.0-M2]
at org.apache.camel.support.ReactiveHelper$Worker.schedule(ReactiveHelper.java:130) [camel-support-3.0.0-M2.jar:3.0.0-M2]
at org.apache.camel.support.ReactiveHelper.scheduleMain(ReactiveHelper.java:43) [camel-support-3.0.0-M2.jar:3.0.0-M2]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:88) [camel-core-3.0.0-M2.jar:3.0.0-M2]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:221) [camel-core-3.0.0-M2.jar:3.0.0-M2]
at org.apache.camel.component.seda.SedaConsumer.sendToConsumers(SedaConsumer.java:289) [camel-seda-3.0.0-M2.jar:3.0.0-M2]
at org.apache.camel.component.seda.SedaConsumer.doRun(SedaConsumer.java:203) [camel-seda-3.0.0-M2.jar:3.0.0-M2]
at org.apache.camel.component.seda.SedaConsumer.run(SedaConsumer.java:148) [camel-seda-3.0.0-M2.jar:3.0.0-M2]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135) [na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [na:na]
at java.base/java.lang.Thread.run(Thread.java:844) [na:na]
While debugging we found that org.apache.camel.component.ahc.ws.WsProducer tries to send the message with an org.apache.camel.component.ahc.ws.WebSocket object which, in our case, is null.
However, the component seems to be wired in properly, because sending to a non-secured WS endpoint for test purposes worked as expected. In that case it also produces the "Reconnecting websocket" log only once.
Has anyone faced a similar problem while authenticating during the WS handshake?
We already looked through the configuration options of the AHC-WS component, but none of them seems to fit this case.
Additionally, we need to generate and add the token dynamically, so hard-coding it wouldn't help.
The comment from @paizo led us to the idea of overriding the prepareGet() method in a custom DefaultAsyncHttpClient implementation:
@Override
public BoundRequestBuilder prepareGet(String url) {
    // OAuth token
    String token = "Bearer my-jwt-token";
    // Headers for the handshake request
    HttpHeaders httpHeaders = new DefaultHttpHeaders();
    httpHeaders.set("Authorization", token);
    // Prepare the websocket upgrade handshake
    BoundRequestBuilder boundRequestBuilder = super.prepareGet(url);
    boundRequestBuilder.setHeaders(httpHeaders);
    return boundRequestBuilder;
}
This client must now be used by the AHC-WS component. It is invoked by the WsEndpoint class in the connect() method, where getClient() must return the custom client from above:
public void connect() throws Exception {
    String uri = getHttpUri().toASCIIString();
    log.debug("Connecting to {}", uri);
    websocket = getClient().prepareGet(uri).execute(
        new WebSocketUpgradeHandler.Builder()
            .addWebSocketListener(listener).build()).get();
}
Unfortunately, the AHC-WS components are not available as Spring beans, but the custom client can be referenced from within the application.yml:
camel:
  component:
    ahc-ws:
      client: myCustomAsyncHttpClientImpl
This way, the websocket upgrade handshake can be customized.
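Since the token has to be generated dynamically, the hard-coded string in prepareGet() could be replaced by a lookup that runs on every handshake. The following is only a sketch in plain Java under that assumption: BearerTokenHeaders, its current() method, and the token supplier are hypothetical names and not part of Camel or AsyncHttpClient.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical helper: builds the handshake headers on demand, so that each
// (re)connect picks up whatever token the supplier returns at that moment.
class BearerTokenHeaders {

    private final Supplier<String> tokenSupplier;

    BearerTokenHeaders(Supplier<String> tokenSupplier) {
        this.tokenSupplier = tokenSupplier;
    }

    // Called once per handshake; the token is never cached here.
    Map<String, String> current() {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("Authorization", "Bearer " + tokenSupplier.get());
        return headers;
    }
}
```

Inside the overridden prepareGet(), each entry returned by current() could then be copied into the HttpHeaders object with httpHeaders.set(name, value) in place of the fixed token. Because prepareGet() runs on every connect(), a reconnect would then use a freshly generated token.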