Search code examples
liferayliferay-7liferay-dxp

How to configure Message Bus In Liferay 7?


I want to use Liferay Message bus in DXP. I have written the following code.

DemoSender.java

package demo.sender.portlet;

import demo.sender.constants.DemoSenderPortletKeys;

import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.messaging.Message;
import com.liferay.portal.kernel.messaging.MessageBus;
import com.liferay.portal.kernel.messaging.MessageBusUtil;
import com.liferay.portal.kernel.portlet.bridges.mvc.MVCPortlet;

import javax.portlet.ActionRequest;
import javax.portlet.ActionResponse;
import javax.portlet.Portlet;

import org.osgi.framework.BundleContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;

/**
 * MVC portlet whose {@code sendMessage} action publishes a fixed greeting to
 * the {@code MyEchoDestination} destination through the injected
 * {@link MessageBus}.
 *
 * @author parth.ghiya
 */
@Component(
    immediate = true,
    property = {
        "com.liferay.portlet.display-category=category.sample",
        "com.liferay.portlet.instanceable=true",
        "javax.portlet.display-name=demo-sender Portlet",
        "javax.portlet.init-param.template-path=/",
        "javax.portlet.init-param.view-template=/view.jsp",
        "javax.portlet.name=" + DemoSenderPortletKeys.DemoSender,
        "javax.portlet.resource-bundle=content.Language",
        "javax.portlet.security-role-ref=power-user,user"
    },
    service = Portlet.class
)
public class DemoSenderPortlet extends MVCPortlet {

    /**
     * Builds a message addressed to {@code MyEchoDestination}, names
     * {@code MyEchoResponse} as its response destination, and hands it to the
     * message bus.
     *
     * @param actionRequest  the portlet action request (not read here)
     * @param actionResponse the portlet action response (not written here)
     */
    public void sendMessage(
        ActionRequest actionRequest, ActionResponse actionResponse) {

        if (_log.isInfoEnabled()) {
            _log.info("Sending message to DE Echo service");
        }

        Message echoRequest = new Message();

        echoRequest.setDestinationName("MyEchoDestination");
        echoRequest.setPayload("Hello World!");
        echoRequest.setResponseDestinationName("MyEchoResponse");

        // Route on the name stored in the message itself, so the address is
        // read back from the message rather than repeated as a literal.
        String targetDestinationName = echoRequest.getDestinationName();

        _messageBus.sendMessage(targetDestinationName, echoRequest);
    }

    /**
     * Component activation callback; remembers the bundle context handed in
     * by the OSGi runtime.
     *
     * @param bundleContext the context of the bundle hosting this component
     */
    @Activate
    protected void activate(BundleContext bundleContext) {
        _bundleContext = bundleContext;
    }

    private static final Log _log = LogFactoryUtil.getLog(
        DemoSenderPortlet.class);

    // Stored on activation; nothing in this class reads it afterwards.
    private BundleContext _bundleContext;

    @Reference
    private MessageBus _messageBus;

}

DemoReceiver.java

package demo.receiver.portlet;

import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;

import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.messaging.BaseMessageListener;
import com.liferay.portal.kernel.messaging.Message;
import com.liferay.portal.kernel.messaging.MessageBus;
import com.liferay.portal.kernel.messaging.MessageListener;

/**
 * Message listener registered with the {@code destination.name} property set
 * to {@code MyEchoDestination}; it logs every message it is handed together
 * with the message's string payload.
 */
@Component(
    immediate = true,
    property = {"destination.name=MyEchoDestination"},
    service = MessageListener.class
)
public class DemoReceiverPortlet extends BaseMessageListener {

    /**
     * Logs the incoming message and then its payload at info level.
     *
     * @param  message the message delivered to this listener
     * @throws Exception declared by the overridden method; a payload that is
     *         not a {@code String} makes the cast below fail
     */
    @Override
    protected void doReceive(Message message) throws Exception {
        if (_log.isInfoEnabled()) {
            _log.info("Received: " + message);
        }

        // The cast happens regardless of the log level, exactly as before.
        String body = (String)message.getPayload();

        if (_log.isInfoEnabled()) {
            _log.info("Message payload: " + body);
        }

        /*
        Reply path, currently disabled:

        String responseDestinationName = message.getResponseDestinationName();

        if ((responseDestinationName != null) &&
            (responseDestinationName.length() > 0)) {

            Message responseMessage = new Message();

            responseMessage.setDestinationName(responseDestinationName);
            responseMessage.setResponseId(message.getResponseId());

            //This is just for demo purposes

            responseMessage.setPayload(payload);

            _messageBus.sendMessage(
                message.getResponseDestinationName(), responseMessage);
        }
        */
    }

    private static final Log _log = LogFactoryUtil.getLog(
        DemoReceiverPortlet.class);

    // Only referenced by the disabled reply path above.
    @Reference
    private volatile MessageBus _messageBus;

}

The problem is that my doReceive method is never called. What additional configuration is needed?

Regards

P.S.: In DemoSender, I send a message when a button is clicked.

Edit # 1

I added the configurator code as follows.

package demo.receiver.portlet;

import java.util.Dictionary;

import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;

import com.liferay.portal.kernel.concurrent.DiscardOldestPolicy;
import com.liferay.portal.kernel.concurrent.RejectedExecutionHandler;
import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.messaging.Destination;
import com.liferay.portal.kernel.messaging.DestinationConfiguration;
import com.liferay.portal.kernel.messaging.DestinationFactory;
import com.liferay.portal.kernel.messaging.MessageBus;
import com.liferay.portal.kernel.util.HashMapDictionary;

/**
 * Creates the {@code MyEchoDestination} message bus destination and publishes
 * it as an OSGi {@link Destination} service when this component activates,
 * then unregisters and destroys it on deactivation.
 *
 * <p>
 * The component must not be declared with {@code enabled = false}: a disabled
 * Declarative Services component is never activated unless something enables
 * it programmatically, so {@link #activate(ComponentContext)} would never run
 * and the destination would never be registered.
 * </p>
 */
@Component(immediate = true, service = DemoReceiverConfigurator.class)
public class DemoReceiverConfigurator {

    /**
     * Builds the destination from a parallel-type configuration and registers
     * it in the service registry under its {@code destination.name}.
     *
     * @param componentContext the component context supplying the bundle
     *        context used for the service registration
     */
    @Activate
    protected void activate(ComponentContext componentContext) {
        _bundleContext = componentContext.getBundleContext();

        if (_log.isInfoEnabled()) {
            _log.info("Registering destination " + _DESTINATION_NAME);
        }

        Destination destination = _destinationFactory.createDestination(
            _createDestinationConfiguration());

        Dictionary<String, Object> destinationProperties =
            new HashMapDictionary<>();

        destinationProperties.put("destination.name", destination.getName());

        _destinationServiceRegistration = _bundleContext.registerService(
            Destination.class, destination, destinationProperties);
    }

    /**
     * Unregisters the destination service, if one was registered, and
     * destroys the underlying destination.
     */
    @Deactivate
    protected void deactivate() {
        ServiceRegistration<Destination> serviceRegistration =
            _destinationServiceRegistration;

        if (serviceRegistration != null) {

            // Look the destination up before unregistering; the reference is
            // no longer usable once the registration is gone.
            Destination destination = _bundleContext.getService(
                serviceRegistration.getReference());

            serviceRegistration.unregister();

            if (destination != null) {
                destination.destroy();
            }

            _destinationServiceRegistration = null;
        }

        _bundleContext = null;
    }

    /**
     * Intentionally empty. Declaring the reference makes the component wait
     * for a {@link MessageBus} service before it can be activated.
     *
     * @param messageBus the message bus service (unused)
     */
    @Reference(unbind = "-")
    protected void setMessageBus(MessageBus messageBus) {
    }

    /**
     * Describes the destination: parallel type, a maximum queue size of 200
     * and a discard-oldest rejection handler that logs a warning first.
     */
    private DestinationConfiguration _createDestinationConfiguration() {
        DestinationConfiguration destinationConfiguration =
            new DestinationConfiguration(
                DestinationConfiguration.DESTINATION_TYPE_PARALLEL,
                _DESTINATION_NAME);

        destinationConfiguration.setMaximumQueueSize(200);

        RejectedExecutionHandler rejectedExecutionHandler =
            new DiscardOldestPolicy() {

                @Override
                public void rejectedExecution(
                    Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {

                    if (_log.isWarnEnabled()) {
                        _log.warn(
                            "The " + _DESTINATION_NAME + " task queue is at " +
                                "its maximum capacity; applying the " +
                                    "discard-oldest policy");
                    }

                    super.rejectedExecution(runnable, threadPoolExecutor);
                }

            };

        destinationConfiguration.setRejectedExecutionHandler(
            rejectedExecutionHandler);

        return destinationConfiguration;
    }

    // Must match the destination.name used by the sender and the listener.
    private static final String _DESTINATION_NAME = "MyEchoDestination";

    private static final Log _log = LogFactoryUtil.getLog(
        DemoReceiverConfigurator.class);

    private volatile BundleContext _bundleContext;

    @Reference
    private DestinationFactory _destinationFactory;

    private volatile ServiceRegistration<Destination>
        _destinationServiceRegistration;

}

But my activate method isn't getting called. I have enabled = false in my message listener class, and enabled = false, immediate = true in my Configurator class.

I don't know what I am missing.


Solution

  • Often in OSGi, this seemingly obvious configuration is enough. In this case it isn't: Liferay now knows about the message you're sending and about the listener that wants to receive it, but the message bus has not been told to create the destination itself.

    It seems obvious - if there is a listener to a particular message, there probably needs to be a destination. But what type will it be? Parallel processing? How many parallel handlers? Synchronous? Queued? This is what you'll need to do.

    While a quick search didn't turn up any documentation on how to do this, you can use this configurator as an example for creating the missing link.