I've written two junit methods to test my Jersey resource with Atmosphere & webSockets.
The problem is both when I call Suspend and Broacast only the method onOpen of my WebSocketTextListener is called. Neither OnError, OnMessage, OnClose are called :(
Any idea why the OnMessage method is not called?
Atmosphere Jersey resources:
@Path("/websocket")
@Suspend
@GET
@Produces({MediaType.APPLICATION_JSON})
public String suspend() {
return "";
}
@Path("/websocket")
@Broadcast(writeEntity = false)
@POST
@Produces({MediaType.APPLICATION_JSON})
public String broadcast(String message) {
return "BROADCASTTT";
}
TEST SUSPEND WEBSOCKET CALL:
@Test
public void testAddMealSubscriber() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
String restaurantId = "SalernoNapoliBarcelona";
String mealId = "14b74bddc68d6f1b4c22e7f7b200067f";
String url = "ws://localhost:8080/rest/" + "restaurants/" + restaurantId + "/meals/" + mealId + "/websocket/";
AsyncHttpClient client = new AsyncHttpClient();
try {
final AtomicReference response = new AtomicReference(null);
WebSocket websocket = client.prepareGet(url)
.execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(
new WebSocketTextListener() {
@Override
public void onMessage(String message) {
System.out.println("WebSocketTextListener onMessage:" + message);
response.set(message);
latch.countDown();
}
@Override
public void onFragment(String fragment, boolean last) {
System.out.println("WebSocketTextListener onFragment:" + fragment);
}
@Override
public void onOpen(WebSocket websocket) {
System.out.println("WebSocketTextListener onOpen");
}
@Override
public void onClose(WebSocket websocket) {
System.out.println("WebSocketTextListener onClose");
latch.countDown();
}
@Override
public void onError(Throwable t) {
System.out.println("WebSocketTextListener onError");
t.printStackTrace();
}
}).build()).get();
try {
latch.await(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
assertNotNull(response.get());
assertEquals(response.get(), "echo");
} catch (Exception e) {
e.printStackTrace();
}
client.close();
}
TEST BROADCAST WEBSOCKET CALL:
@Test
public void testAddMealPublisher() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
String restaurantId = "SalernoNapoliBarcelona";
String mealId = "14b74bddc68d6f1b4c22e7f7b200067f";
String url = "ws://localhost:8080/rest/" + "restaurants/" + restaurantId + "/meals/" + mealId + "/websocket/";
AsyncHttpClient c = new AsyncHttpClient();
try {
final AtomicReference response = new AtomicReference(null);
WebSocket websocket = c.prepareGet(url)
.execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(
new WebSocketTextListener() {
@Override
public void onMessage(String message) {
response.set(message);
latch.countDown();
}
@Override
public void onFragment(String fragment, boolean last) {
System.out.println("WebSocketTextListener onFragment:" + fragment);
}
@Override
public void onOpen(WebSocket websocket) {
System.out.println("WebSocketTextListener onOpen");
}
@Override
public void onClose(WebSocket websocket) {
System.out.println("WebSocketTextListener onClose");
latch.countDown();
}
@Override
public void onError(Throwable t) {
System.out.println("WebSocketTextListener onError");
t.printStackTrace();
}
}).build()).get().sendTextMessage("MESSSAGGGEEEE");
try {
latch.await(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
assertNotNull(response.get());
assertEquals(response.get(), "echo");
} catch (Exception e) {
e.printStackTrace();
}
c.close();
}
Jersey log when executing first SUSPEND CALL and then BRODCAST call:
Jul 20, 2012 1:54:10 PM com.sun.jersey.api.container.filter.LoggingFilter filter INFO: 1 * Server in-bound request 1 > GET http://localhost:8080/rest/restaurants/SalernoNapoliBarcelona/meals/14b74bddc68d6f1b4c22e7f7b200067f/websocket/ 1 > Sec-WebSocket-Version: 13 1 > Upgrade: WebSocket 1 > Sec-WebSocket-Key: Wf7vyIGCD3Sa8StcdsGIkg== 1 > Host: localhost:8080 1 > Accept: */* 1 > User-Agent: NING/1.0 1 > Connection: Upgrade 1 > Origin: http://localhost:8080 1 > X-Atmosphere-Transport: websocket 1 > Jul 20, 2012 1:54:31 PM com.sun.jersey.api.container.filter.LoggingFilter filter INFO: 2 * Server in-bound request 2 > GET http://localhost:8080/rest/restaurants/SalernoNapoliBarcelona/meals/14b74bddc68d6f1b4c22e7f7b200067f/websocket/ 2 > Sec-WebSocket-Version: 13 2 > Upgrade: WebSocket 2 > Sec-WebSocket-Key: RH/DbdkwQK1xBwhyhXLkAQ== 2 > Host: localhost:8080 2 > Accept: */* 2 > User-Agent: NING/1.0 2 > Connection: Upgrade 2 > Origin: http://localhost:8080 2 > X-Atmosphere-Transport: websocket 2 > Jul 20, 2012 1:54:34 PM com.sun.jersey.api.container.filter.LoggingFilter filter INFO: 3 * Server in-bound request 3 > POST http://localhost:8080/rest/restaurants/SalernoNapoliBarcelona/meals/14b74bddc68d6f1b4c22e7f7b200067f/websocket/ 3 > X-Atmosphere-Transport: websocket 3 > X-Atmosphere-Transport: websocket 3 > Content-Type: application/json 3 > Jul 20, 2012 1:54:34 PM com.sun.jersey.api.container.filter.LoggingFilter$Adapter finish INFO: 3 * Server out-bound response 3
The problem is caused by the fact that two different AsyncHttpClient are used for the SUBSCRIBE and BROADCAST calls.
To solve this create JUST ONE AsyncHttpClient in the SetUp method and use it in both the tests methods:
private AsyncHttpClient client;
@Before
public void setUp() throws Exception {
client = new AsyncHttpClient();
}