Search code examples
ejbjmswildfly-10

How To send messages and notification To Particular User AND All Users Using JMS and MDB?


I developed a web application using Wildfly 10.0.0 final standalone deployment,EJB,Hibernate . I referred and created the MDB, MessageProducerBean (@RequestScoped) ,Queue and ConnectionFactory .

When I send message from MessageProducerBean I can receive it in MDB.

My problem is my application can access by many users. I need to send some messages to particular user (ex: User-A) and some messages to all users.

I read some article with that knowledge if we use Queue we can send to one client , and if we use Topic we can send to many clients , But I didn't get any clear idea about that. And My next problem is this messages work only if users in active.

I need to send message in all time whether they are in online or not. If they in offline after they log in the massages should show. Can anyone help on this ? I am going to summarize my problem below.

1) Need To send Messages and Notification by users.

2) Need to keep all messages even users in offline.

Here is my codes

standalone-full.xml

<subsystem xmlns="urn:jboss:domain:messaging-activemq:1.0">
        <server name="default">
            <security-setting name="#">
                <role name="guest" delete-non-durable-queue="true" create-non-durable-queue="true" consume="true" send="true"/>
            </security-setting>
            <address-setting name="#" message-counter-history-day-limit="10" page-size-bytes="2097152" max-size-bytes="10485760" expiry-address="jms.queue.ExpiryQueue" dead-letter-address="jms.queue.DLQ"/>
            <http-connector name="http-connector" endpoint="http-acceptor" socket-binding="http"/>
            <http-connector name="http-connector-throughput" endpoint="http-acceptor-throughput" socket-binding="http">
                <param name="batch-delay" value="50"/>
            </http-connector>
            <in-vm-connector name="in-vm" server-id="0"/>
            <http-acceptor name="http-acceptor" http-listener="default"/>
            <http-acceptor name="http-acceptor-throughput" http-listener="default">
                <param name="batch-delay" value="50"/>
                <param name="direct-deliver" value="false"/>
            </http-acceptor>
            <in-vm-acceptor name="in-vm" server-id="0"/>
            <jms-queue name="ExpiryQueue" entries="java:/jms/queue/ExpiryQueue"/>
            <jms-queue name="DLQ" entries="java:/jms/queue/DLQ"/>
            <jms-queue name="testQueue" entries="queue/testQueue java:jboss/exported/jms/queue/testQueue"/>
            <connection-factory name="InVmConnectionFactory" entries="java:/ConnectionFactory" connectors="in-vm"/>
            <connection-factory name="RemoteConnectionFactory" entries="java:jboss/exported/jms/RemoteConnectionFactory" connectors="http-connector"/>
            <pooled-connection-factory name="activemq-ra" transaction="xa" entries="java:/JmsXA java:jboss/DefaultJMSConnectionFactory" connectors="in-vm"/>
        </server>
    </subsystem>

MDB

@MessageDriven(mappedName = "queue/testQueue", activationConfig = {
@ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "queue/testQueue"),
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
@ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge") })
public class MessageDrivenBean implements MessageListener {

private final static Logger LOGGER = Logger.getLogger(MessageDrivenBean.class.toString());

public MessageDrivenBean() {
    System.out.println("TextMDB.ctor, this=" + hashCode());
}

@Override
public void onMessage(Message rcvMessage) {
    TextMessage msg = null;
    try {
        if (rcvMessage instanceof TextMessage) {
            msg = (TextMessage) rcvMessage;
            LOGGER.log(Level.INFO, "Received Message from topic: {0}", msg.getText());
        } else {
            LOGGER.log(Level.WARNING, "Message of wrong type: {0}", rcvMessage.getClass().getName());
        }
    } catch (JMSException e) {
        throw new RuntimeException(e);
    }
}}

MessageProducerBean

@JMSDestinationDefinition(name = "queue/testQueue",
    interfaceName = "javax.jms.Queue",
    destinationName = "queue/testQueue")
@Named("messageProducerBean")
@RequestScopedpublic class MessageProducerBean {

@Inject
private JMSContext context;    
@Resource(mappedName = "queue/testQueue")
private Queue queue;
private final static Logger LOGGER = Logger.getLogger(MessageProducerBean.class.toString());
@Resource(mappedName = "ConnectionFactory")
private ConnectionFactory factory;

private String message;

public String getMessage() {
    return message;
}

public void setMessage(String message) {
    this.message = message;
}

public void sendMessage() {
    try {
        String text = "Message from producer: " + message;
        context.createProducer().send(queue, text);

        FacesMessage facesMessage
                = new FacesMessage("Sent message: " + text);
        FacesContext.getCurrentInstance().addMessage(null, facesMessage);
    } catch (Throwable t) {
        LOGGER.log(Level.SEVERE, "SenderBean.sendMessage: Exception: {0}", t.toString());

    }
} }

Thanks in advance!


Solution

  • Finally I found a way. I am posting here for it may help for someone.

    If I summarized what I did.

    First we need a application user in wildfly . For that go to {wildflyHome}/bin and run the ./add-user.sh .

    Give user type as application (option b). And provide a user name and password.

    For reference : link 1

    In above link take step 1 and step 2.

    Modify the standalone-full.xml as

    <subsystem xmlns="urn:jboss:domain:messaging-activemq:1.0">
            <server name="default">
                <security-setting name="#">
                    <role name="guest" delete-non-durable-queue="true" create-non-durable-queue="true" delete-durable-queue="true" create-durable-queue="true" consume="true" send="true"/>
                </security-setting>
                <address-setting name="#" message-counter-history-day-limit="10" page-size-bytes="2097152" max-size-bytes="10485760" expiry-address="jms.queue.ExpiryQueue" dead-letter-address="jms.queue.DLQ"/>
                <http-connector name="http-connector" endpoint="http-acceptor" socket-binding="http"/>
                <http-connector name="http-connector-throughput" endpoint="http-acceptor-throughput" socket-binding="http">
                    <param name="batch-delay" value="50"/>
                </http-connector>
                <in-vm-connector name="in-vm" server-id="0"/>
                <http-acceptor name="http-acceptor" http-listener="default"/>
                <http-acceptor name="http-acceptor-throughput" http-listener="default">
                    <param name="batch-delay" value="50"/>
                    <param name="direct-deliver" value="false"/>
                </http-acceptor>
                <in-vm-acceptor name="in-vm" server-id="0"/>
                <jms-queue name="ExpiryQueue" entries="java:/jms/queue/ExpiryQueue"/>
                <jms-queue name="DLQ" entries="java:/jms/queue/DLQ"/>
                <jms-queue name="JMSQueue" entries="java:/jboss/exported/jms/queue/JMSQueue jms/queue/JMSQueue"/>
                <jms-topic name="JMSTopic" entries="jms/topic/JMSTopic java:jboss/exported/jms/topic/JMSTopic"/>
                <connection-factory name="InVmConnectionFactory" entries="java:/ConnectionFactory" connectors="in-vm"/>
                <connection-factory name="RemoteConnectionFactory" entries="java:jboss/exported/jms/RemoteConnectionFactory jms/RemoteConnectionFactory" connectors="http-connector"/>
                <connection-factory name="TopicConnectionFactory" entries="java:jboss/exported/jms/TopicConnectionFactory jms/TopicConnectionFactory" connectors="http-connector"/>
                <pooled-connection-factory name="activemq-ra" transaction="xa" entries="java:/JmsXA java:jboss/DefaultJMSConnectionFactory" connectors="in-vm"/>
            </server>
        </subsystem>
    

    In this subsystem under security-setting we allowed delete and create durable queue.

    And my java class for sending and receiving message as follows ,

    First inject the resource as

    @Resource(mappedName = "jms/topic/EMSTopic")
    private Topic topic;
    
    @Resource(mappedName = "jms/TopicConnectionFactory")
    private TopicConnectionFactory connectionFactory;
    

    Message send method as follow ,

    public void send() throws JMSException {
        TopicConnection connection = connectionFactory.createTopicConnection("JMSUser", "jmsuser@123");
        connection.setClientID("userA");
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer messageProducer = session.createProducer(topic);
        TextMessage outMessage = session.createTextMessage();
        outMessage.setText("Hi User");
        outMessage.setStringProperty("name", "usera");
        messageProducer.send(outMessage);
        System.out.println("message sent ");
        connection.close();
    }
    

    In this method I create the TopicConnection by providing my application user name and password.

    For the connection I set clientId and for message I set a stringProperty for when users receive message they need to receive their message only .

    My receive method as follow ,

    public void recieve() throws JMSException {
        System.out.println("message reday to recieve ");
    
        String selector = "name = 'usera'";
    
        try (TopicConnection connection = connectionFactory.createTopicConnection("JMSUser", "jmsuser@123")) {
            connection.setClientID("userA");
            TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            TopicSubscriber subscriber = session.createDurableSubscriber(topic, "usera", selector, true);
            connection.start();
            TextMessage message = (TextMessage) subscriber.receive(2000);
            if (message != null) {
                System.out.println("printing ==> " + message.getText());
            } else {
                System.out.println("no messages");
            }
            connection.close();
        }
    }
    

    In this method I created a durableTopicConsumer for keep all message when user inactive too.

    I passed a parameter value as selector for every user getting their message only.

    If you need further knowledge on this refer link 2

    Hope this will help.