
Connecting to the AMQP 1.0 Azure EventHub with Qpid and the golang wrapper Electron


I would like to use the Electron golang wrapper for the Qpid proton-c library to connect to the Azure EventHub.

I am setting the SASL details below, together with the host, port, namespace and path needed to build the connection string, but I keep getting the error message: connection reset by peer.

package main

import (
    "fmt"
    "os"
    "strings"
    "qpid.apache.org/amqp"
    "qpid.apache.org/electron"
)

var (
    eventHubNamespaceName = "<MY_CUSTOM_NAMESPACE>"
    eventHubName = "<MY_CUSTOM_NAME>"
    eventHubSasKeyName = "<MY_CUSTOM_SAS_KEY_NAME>"
    eventHubSasKey = "<MY_CUSTOM_SAS_KEY>" // this is the base64 encoded stuff
)

func main() {

    sentChan := make(chan electron.Outcome) // Channel to receive acknowledgements.
    container := electron.NewContainer(fmt.Sprintf("send[%v]", os.Getpid()))

    urlStr := fmt.Sprintf("amqp://%s.servicebus.windows.net:5671/%s", eventHubNamespaceName, eventHubName)
    fmt.Printf("The URL connection string: '%v'\n", urlStr)

    // parse URL
    url, err := amqp.ParseURL(urlStr)
    if err != nil {
        panic(err)
    }
    fmt.Printf("The AMQP parsed URL: %v\n", url)

    // TCP dial
    amqpHost := url.Host
    fmt.Printf("The AMQP host used in the connection is: '%v'\n", amqpHost)
    c, err := container.Dial(
        "tcp", amqpHost, 
        electron.SASLEnable(), 
        electron.Password([]byte(eventHubSasKey)), 
        electron.User(eventHubSasKeyName),
    )
    if err != nil {
        panic(err)
    }
    defer c.Close(nil)

    // AMQP send
    addr := strings.TrimPrefix(url.Path, "/")
    s, err := c.Sender(electron.Target(addr))
    if err != nil {
        panic(err)
    }
    m := amqp.NewMessage()
    body := fmt.Sprintf("bla bla bla %v", 42)
    m.Marshal(body)
    fmt.Printf("The AMQP message body: '%v'\n", m.Body())

    go s.SendAsync(m, sentChan, body) // Outcome will be sent to sentChan

    // AMQP ACK receive
    fmt.Printf("Waiting for ACKs...\n")
    for {
        fmt.Printf("Waiting for an ACK coming out of the channel...\n")
        out := <-sentChan // Outcome of async sends.
        fmt.Printf("Received something: '%v'\n", out)
    }   
}

When compiling, then running the code, this is the output:

The URL connection string: 'amqp://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net:5671/<MY_CUSTOM_NAME>'
The AMQP parsed URL: 'amqp://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net:5671/<MY_CUSTOM_NAME>'
The AMQP host used in the connection is: '<MY_CUSTOM_NAMESPACE>.servicebus.windows.net:5671'
The AMQP message body: 'bla bla bla 42'
Waiting for ACKs...
Waiting for an ACK coming out of the channel...
Received something: '{unsent : read tcp <MY_PRIVATE_IP_IN_LAN>:<SOME_PORT>-><THE_NSLOOKUP_IP_OF_THE_AZURE_EVENTHUB>:5671: read: connection reset by peer bla bla bla 42}'
Waiting for an ACK coming out of the channel...

To me, the received message saying connection reset by peer does not look like a valid ACK, and I am not sure what is wrong with the connection attempt.

  • The compiled version of proton-c is 0.18.0, I am using go1.7.4 linux/amd64.
  • If I add electron.SASLAllowedMechs("EXTERNAL") to the connection options then I get the same error message.
  • If I change the port to 5672, then I get a connection refused panic error after the attempt at dialing via TCP.
  • If I decode the base64 password field with base64.StdEncoding.DecodeString(eventHubSasKey) and pass the bytes to the connection options I keep getting the same error connection reset by peer.
  • If I add this connection option electron.SASLAllowedMechs("ANONYMOUS"), then I still get the same error message connection reset by peer. The reason for doing this is that I am not using any SSL certificate, and the Java wrapper to AMQP that Microsoft provides seems to use this "anonymous" thing instead of the certificate (in fact no certificate is needed to connect to the EventHub using the Java connector).

I am not sure how to proceed here, as I am stuck at the connection step. I believe the SASL details are passed correctly according to the docs here: https://godoc.org/qpid.apache.org/electron#ConnectionOption

I cannot rule out that the failure is due to SSL certificates. If that is the case, I am struggling to see how to include them in the process.

Edit:

I later found out that I had to establish a TLS connection over TCP, even though I am not providing any private/public key pair, and that I also had to specify a "virtual host" (otherwise AMQP complained about not recognising the host):

    // TLS connection details
    tlsConfig := &tls.Config{}
    eventHubDomainPort := fmt.Sprintf("%s.servicebus.windows.net:5671", eventHubNamespaceName)
    tlsConn, err := tls.Dial("tcp", eventHubDomainPort, tlsConfig)
    if err != nil {
        panic(err)
    }

    // AMQP container connection on top of TLS via TCP
    eventHubDomain := fmt.Sprintf("%s.servicebus.windows.net", eventHubNamespaceName)
    amqpConn, err := container.Connection(
        tlsConn, 
        electron.SASLEnable(),
        electron.User(eventHubSasKeyName), 
        electron.Password([]byte(eventHubSasKey)),
        electron.VirtualHost(eventHubDomain),
        // electron.SASLAllowedMechs(<SOME_MECHANISM>),
    )
    if err != nil {
        panic(err)
    }
    defer amqpConn.Close(nil)

    // AMQP sender (an AMQP link whose target is the name defined on the Azure portal)
    s, err := amqpConn.Sender(electron.Target(eventHubName))
    if err != nil {
        panic(err)
    }

However, when running the app with the environment variable PN_TRACE_FRM=true (which enables verbose frame-level logging in proton-c), the error is now:

[handle=0, closed=true, error=@error(29) [condition=:"amqp:unauthorized-access", description="Unauthorized access. 'Send' claim(s) are required to perform this operation. Resource: 'sb://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net/<MY_CUSTOM_NAME>'. TrackingId:<SOME_UUID-ISH_HERE>, SystemTracker:<A_LABEL_HERE>, Timestamp:10/25/2017 4:02:58 PM"]]

As far as I know, this means the credentials (username/password) must belong to a policy with the "Send" claim, because I am trying to send something to the Event Hub. I double-checked them on the Azure portal ("Shared access policies", then the policy whose claim is "Send") and they are correct, so I am not sure why I am getting this error.

I tried shared access policies defined at both levels on the Azure portal, the namespace <MY_CUSTOM_NAMESPACE> and the Event Hub <MY_CUSTOM_NAME>, but always got the same error message.

I also tried forcing specific SASL mechanisms. For example, with electron.SASLAllowedMechs("PLAIN") I get this error: no mechanism available: No worthy mechs found (Authentication failed [mech=none]).
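A note on the first attempt: port 5671 is the port registered for AMQP over TLS, while 5672 is the plain-text AMQP port, which is consistent with the connection reset seen when dialing 5671 over plain TCP. The following is only a minimal sketch of how the scheme and port could decide whether the socket needs TLS. The helper name dialTarget and the example host are hypothetical and are not part of Electron.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
)

// dialTarget (hypothetical helper) returns the host:port to dial and
// whether the socket should be wrapped in TLS, based on the URL scheme
// ("amqp" or "amqps") and the port.
func dialTarget(rawURL string) (addr string, useTLS bool, err error) {
	u, err := url.Parse(rawURL)
	if err != nil {
		return "", false, err
	}
	port := u.Port()
	switch u.Scheme {
	case "amqps":
		useTLS = true
		if port == "" {
			port = "5671" // default port for AMQP over TLS
		}
	case "amqp":
		if port == "" {
			port = "5672" // default port for plain AMQP
		}
		// Assumption: an explicit 5671 means TLS even if the scheme
		// says plain "amqp", as in the URL built in the question.
		useTLS = port == "5671"
	default:
		return "", false, fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
	return net.JoinHostPort(u.Hostname(), port), useTLS, nil
}

func main() {
	addr, useTLS, err := dialTarget("amqp://example-namespace.servicebus.windows.net:5671/example-hub")
	if err != nil {
		panic(err)
	}
	fmt.Println(addr, useTLS)
}
```

When useTLS is true, the address would go to tls.Dial and the resulting connection to container.Connection, as in the Edit above. Otherwise container.Dial over plain TCP would be enough.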


Solution

  • I managed to establish a connection using claims-based security (CBS) on top of AMQP. It seems to be specific to Microsoft. Some details can be found at the bottom of this page: https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-amqp-protocol-guide

    Basically this is the list of steps:

    • TLS connection with electron.VirtualHost(eventHubDomain) and the ANONYMOUS SASL mechanism electron.SASLAllowedMechs("ANONYMOUS") (no need to specify a SASL username and password). See the Edit part of my question above for the details.
    • AMQP link targeting the special $cbs node: cbsLink, err := amqpConnection.Sender(electron.Target("$cbs"))
    • Prepare an AMQP message with the Microsoft requirements for the CBS handshake:

    The message application properties (compare with this C# code: https://github.com/Azure/amqpnetlite/blob/master/Examples/ServiceBus/Scenarios/CbsAsyncExample.cs):

    appProps := make(map[string]interface{})
    appProps["operation"] = "put-token"
    appProps["type"] = "servicebus.windows.net:sastoken"
    appProps["name"] = "amqp://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net/<MY_CUSTOM_NAME>"
    

    The SAS token, formatted the way Microsoft expects. I adapted this piece of code: https://github.com/michaelbironneau/asbclient/blob/master/azure.go like this:

    aqClient := newClient(Queue, "<MY_CUSTOM_NAMESPACE>", "<MY_CUSTOM_SAS_KEY_NAME>", "<MY_CUSTOM_SAS_KEY>")
    sasToken := aqClient.authHeader("amqp://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net/<MY_CUSTOM_NAME>", aqClient.signatureExpiry(time.Now()))
    

    That piece of code is based on the Python SDK here: https://github.com/Azure/azure-sdk-for-python/blob/master/azure-servicebus/azure/servicebus/servicebusservice.py, which combines URL encoding (with its upper/lower-case quirks), an expiry timestamp, and the SAS key name and key.

    Build the AMQP message (this needs the import "qpid.apache.org/amqp"):

    cbsHandshakeMsg := amqp.NewMessage()
    cbsHandshakeMsg.SetApplicationProperties(appProps)
    cbsHandshakeMsg.Marshal(sasToken)
    
    • Send this AMQP message with outcome := cbsLink.SendSync(cbsHandshakeMsg). After that you should be authenticated to the Event Hub for as long as the token is valid.
    • Set up the AMQP link to the Event Hub you wanted to connect to in the first place: msgSender, err := amqpConnection.Sender(electron.Target("<MY_CUSTOM_NAME>"))

    Now you can send your message over this last AMQP link:

    m := amqp.NewMessage()
    m.Marshal("my message: bla bla bla, foo bar baz!")
    outcome := msgSender.SendSync(m)
    

    Done :)

    Running this code with the environment variable PN_TRACE_FRM=true helps a lot in troubleshooting AMQP because the proton-c library logs lots of useful debug messages.

    For some reason the SASL PLAIN mechanism, which passes the username and password directly during the connection attempt, does not work with the Event Hub. It may be an issue on their side or with the Electron/Qpid libraries, I am not sure, but at least it is now possible to send messages from golang using the CBS protocol Microsoft made available.