i've a ServerSocket that listen to connections on a particular address and port and for each connection(client) make a thread in order to implement a stateful protocol,now each thread(client) messages process in it's own thread,here is the problem:
1.How can i send data from one thread(client socket) to all(simply broadcasting a message.),i see some sort of BlockingQueues between two threads,here i've one to many threads for broadcasting and also have a one to one model for private messages.
2.I see a Synchronized block solution for share a data or resource between threads but here in my code i have no idea where should i implement this and why?
Entry Point to the server,Where the ServerSocket Is initialized and Listen:
public class ServerStarter {
//All Users that creates from threads adding here.
public static Hashtable<String,User> users = new Hashtable<String,User>();
//All Channels add here.
public static Hashtable<String,Channel> channels = new Hashtable<String,Channel>();
final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
public static void main(String args[]){
Config config = new Config();
ServerSocket Server = null;
try {
//server configs,from left to right is: PORT,BackLog,Address
Server = new ServerSocket(config.port, config.backlog,config.ServerIP);
} catch (IOException e) {
System.err.println(e);
}
while (true) {
Socket sock = null;
BufferedReader inFromClient = null;
try {
sock = Server.accept();
} catch (IOException e) {
if (Server != null && !Server.isClosed()) {
try {
Server.close();
} catch (IOException e1)
{
e1.printStackTrace(System.err);
}
}
System.err.println(e);
}
try {
inFromClient = new BufferedReader(new InputStreamReader(sock.getInputStream()));
} catch (IOException e) {
System.err.println(e);
}
//each clients run on it's own thread!
new SocketThread(sock,inFromClient).start();
}
}
}
Where The Client Socket Threads Create is Here:
public class SocketThread extends Thread {
Socket csocket;
BufferedReader inFromClient;
public SocketThread(Socket csocket, BufferedReader inFromClient) {
this.csocket = csocket;
this.inFromClient = inFromClient;
}
public void run() {
try {
String fromclient = inFromClient.readLine();
//some primary informations sent to server for further processing.
RequestHandler Reqhandler = new RequestHandler(this.getId(), fromclient);
System.out.println("=======================================");
while (true) {
fromclient = inFromClient.readLine();
IRCParser parser = new IRCParser(fromclient);
//if the primary info's are OK and nothing causes to kill the thread the clients go to the infinite loop for processing messages.
Reqhandler.Commandhandler(parser.getCommand(), parser.getParameters());
}
} catch (IOException e) {
//kill current thread!
currentThread().interrupt();
return;
}
}
}
if the classes and other data is not sufficient,tell me to add more code and commenting,tnx
If I understand your requirement correctly, you just need something which passes the message from one client thread to other client threads. I think you should be able to use observer pattern here.
Something like this - (please note that i have removed all other things from your code which were not required to show the message broadcasting concept. You might need to change it back as per your requirement).
public class ServerStarter {
private static final ServerStarter singleton = new ServerStarter();
private volatile boolean shutdown;
// thread pool executor
private final ExecutorService executorService = Executors.newCachedThreadPool();
// observable to notify client threads
private final Observable observable = new Observable();
// fair lock (can use unfair lock if message broadcasting order is not important)
private final Lock fairLock = new ReentrantLock(true);
private ServerStarter() {
}
public static ServerStarter getInstance() {
return singleton;
}
public static void main(String args[]) {
ServerSocket server = null;
try {
//server configs,from left to right is: PORT,BackLog,Address
server = new ServerSocket();
while (!ServerStarter.getInstance().isShutdown()) {
Socket sock = server.accept();
BufferedReader inFromClient = new BufferedReader(new InputStreamReader(sock.getInputStream()));
//each clients run on it's own thread!
SocketThread clientThread = new SocketThread(sock, inFromClient);
ServerStarter.getInstance().registerClientThread(clientThread);
ServerStarter.getInstance().startClientThread(clientThread);
}
} catch (IOException e) {
if (server != null) {
try {
server.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
e.printStackTrace();
}
}
public void shutdown() {
shutdown = true;
}
public boolean isShutdown() {
return shutdown;
}
public void startClientThread(SocketThread clientThread) {
executorService.submit(clientThread);
}
private void registerClientThread(SocketThread clientThread) {
observable.addObserver(clientThread);
}
public void notifyAllClients(final Object message) {
fairLock.lock();
try {
executorService.submit(new MessageBroadcaster(message));
} finally {
fairLock.unlock();
}
}
public void unregisterClientThread(SocketThread clientThread) {
fairLock.lock();
try {
observable.deleteObserver(clientThread);
} finally {
fairLock.unlock();
}
}
private class MessageBroadcaster implements Runnable {
private final Object message;
public MessageBroadcaster(Object message) {
this.message = message;
}
@Override
public void run() {
fairLock.lock();
try {
observable.notifyObservers(message);
} finally {
fairLock.unlock();
}
}
}
}
class SocketThread implements Runnable, Observer {
Socket clientSocket;
BufferedReader inFromClient;
public SocketThread(Socket clientSocket, BufferedReader inFromClient) {
this.clientSocket = clientSocket;
this.inFromClient = inFromClient;
}
public void run() {
try {
String fromClient;
while (!ServerStarter.getInstance().isShutdown() && (fromClient = inFromClient.readLine()) != null) {
// TODO...prepare message to broadcast
Object message = new Object();
ServerStarter.getInstance().notifyAllClients(message);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
ServerStarter.getInstance().unregisterClientThread(this);
}
}
@Override
public void update(Observable o, Object message) {
// TODO...handle the message
}
}
When client thread wants to notify other client threads then it asynchronously uses the observable to notify other threads. The observable will call update()
method of each client thread.