Search code examples
goactivemq-classicstomp

I am trying to connect to the darwin pushport service hosted by network rail using go-stom/stomp


I am trying to connect to the Darwin Push port feed located here. When I try to connect following the instructions from wiki I get 1 of 2 errors.

If I leave out the subscription options I get an authentication error, and if I include them it says I cannot create a durable subscription to a queue.

If someone could just tell me which error is closer to being connected that would be a great help.

package main

import (
    "fmt"

    "github.com/Showmax/go-fqdn"
    "github.com/go-stomp/stomp"
    "github.com/go-stomp/stomp/frame"
)

var serverAddr = "darwin-dist-44ae45.nationalrail.co.uk:61613"
var messageCount = 10
var topic = "topic/darwin.pushport-v16"
var username = "####"
var password = "###"
var stop = make(chan bool)

    fqdn, err := fqdn.FqdnHostname()
    if err != nil {
        panic(err)
    }
    var connOptions []func(*stomp.Conn) error = []func(*stomp.Conn) error{
        stomp.ConnOpt.Login(username, password),
        stomp.ConnOpt.Host(serverAddr),
        stomp.ConnOpt.Header("client-id", fmt.Sprintf("%v-%v", username, fqdn)),
    }
    conn, err := stomp.Dial("tcp", serverAddr, connOptions...)

    if err != nil {
        println("cannot connect to server", err.Error())
        return
    }
    subOptions := []func(*frame.Frame) error{
        stomp.SubscribeOpt.Header("activemq.subscriptionName", fqdn),
        stomp.SubscribeOpt.Header("durable-subscription-name", fqdn),
    }
    sub, err := conn.Subscribe(topic, stomp.AckClient, subOptions...)
    if err != nil {
        println("cannot subscribe to", topic, err.Error())
        return
    }
    

    for i := 1; i <= messageCount; i++ {
        msg := <-sub.C
        fmt.PrintLn(msg)
        }
    }
    println("receiver finished")

}

All I get is:

2020/09/04 16:55:35 received ERROR; Closing underlying connection
2020/09/04 16:55:35 Subscription 1: topic/darwin.pushport-v16: ERROR message:Invalid Subscription: cannot durably subscribe to a Queue destination!
Expected: Message #1
Actual: org.apache.activemq.transport.stomp.ProtocolException: Invalid Subscription: cannot durably subscribe to a Queue destination!
        at org.apache.activemq.transport.stomp.ProtocolConverter.onStompSubscribe(ProtocolConverter.java:581)
        at ...

If I remove the subscription headers:

subOptions := []func(*frame.Frame) error{
        // stomp.SubscribeOpt.Header("activemq.subscriptionName", fqdn),
        // stomp.SubscribeOpt.Header("durable-subscription-name", fqdn),
    }

I get this:

2020/09/04 17:00:13 received ERROR; Closing underlying connection
2020/09/04 17:00:13 Subscription 1: topic/darwin.pushport-v16: ERROR message:User REDACTED is not authorized to read from: queue://topic/darwin.pushport-v16
Expected: Message #1
Actual: java.lang.SecurityException: User REDACTED is not authorized to read from: queue://topic/darwin.pushport-v16
        at org.apache.activemq.security.AuthorizationBroker.addConsumer(AuthorizationBroker.java:159)
        at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:104)
        at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:104)
        at 

I have tried both of these approaches with and without the topic/ proceeding the topic.

Any help would be greatly appreciated.


Solution

  • The destination prefix in ActiveMQ for STOMP clients for Queues and Topics are /queue/ or /topic/ respectively and the default when there is no prefix or in your case an invalid one is to use a Queue.