I would like to use the Electron golang wrapper for the Qpid proton-c library to connect to the Azure EventHub.
I am setting the following SASL details combined to the host/port/namespace/path required to build the connection string but for some reason I keep getting the error message: connection reset by peer
.
package main
import (
"fmt"
"os"
"strings"
"qpid.apache.org/amqp"
"qpid.apache.org/electron"
)
var (
eventHubNamespaceName = "<MY_CUSTOM_NAMESPACE>"
eventHubName = "<MY_CUSTOM_NAME>"
eventHubSasKeyName = "<MY_CUSTOM_SAS_KEY_NAME>"
eventHubSasKey = "<MY_CUSTOM_SAS_KEY>" // this is the base64 encoded stuff
)
func main() {
sentChan := make(chan electron.Outcome) // Channel to receive acknowledgements.
container := electron.NewContainer(fmt.Sprintf("send[%v]", os.Getpid()))
urlStr := fmt.Sprintf("amqp://%s.servicebus.windows.net:5671/%s", eventHubNamespaceName, eventHubName)
fmt.Printf("The URL connection string: '%v'\n", urlStr)
// parse URL
url, err := amqp.ParseURL(urlStr)
if err != nil {
panic(err)
}
fmt.Printf("The AMQP parsed URL: %v\n", url)
// TCP dial
amqpHost := url.Host
fmt.Printf("The AMQP host used in the connection is: '%v'\n", amqpHost)
c, err := container.Dial(
"tcp", amqpHost,
electron.SASLEnable(),
electron.Password([]byte(eventHubSasKey)),
electron.User(eventHubSasKeyName),
)
if err != nil {
panic(err)
}
defer c.Close(nil)
// AMQP send
addr := strings.TrimPrefix(url.Path, "/")
s, err := c.Sender(electron.Target(addr))
if err != nil {
panic(err)
}
m := amqp.NewMessage()
body := fmt.Sprintf("bla bla bla %v", 42)
m.Marshal(body)
fmt.Printf("The AMQP message body: '%v'\n", m.Body())
go s.SendAsync(m, sentChan, body) // Outcome will be sent to sentChan
// AMQP ACK receive
fmt.Printf("Waiting for ACKs...\n")
for {
fmt.Printf("Waiting for an ACK coming out of the channel...\n")
out := <-sentChan // Outcome of async sends.
fmt.Printf("Received something: '%v'\n", out)
}
}
When compiling, then running the code, this is the output:
The URL connection string: 'amqp://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net:5671/<MY_CUSTOM_NAME>'
The AMQP parsed URL: 'amqp://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net:5671/<MY_CUSTOM_NAME>'
The AMQP host used in the connection is: '<MY_CUSTOM_NAMESPACE>.servicebus.windows.net:5671'
The AMQP message body: 'bla bla bla 42'
Waiting for ACKs...
Waiting for an ACK coming out of the channel...
Received something: '{unsent : read tcp <MY_PRIVATE_IP_IN_LAN>:<SOME_PORT>-><THE_NSLOOKUP_IP_OF_THE_AZURE_EVENTHUB>:5671: read: connection reset by peer bla bla bla 42}'
Waiting for an ACK coming out of the channel...
To me that received message saying connection reset by peer
does not look like a valid ACK and I am not sure what is wrong with the connection attempt?
proton-c
is 0.18.0, I am using go1.7.4 linux/amd64
.electron.SASLAllowedMechs("EXTERNAL")
to the connection options then I get the same error message.5672
, then I get a connection refused
panic error after the attempt at dialing via TCP.base64.StdEncoding.DecodeString(eventHubSasKey)
and pass the bytes to the connection options I keep getting the same error connection reset by peer
.electron.SASLAllowedMechs("ANONYMOUS")
, then I still get the same error message connection reset by peer
. The reason for doing this is that I am not using any SSL certificate, and the Java wrapper to AMQP that Microsoft provides seems to use this "anonymous" thing instead of the certificate (in fact no certificate is needed to connect to the EventHub using the Java connector).I am not sure how to proceed here as I am stuck in the connection part and I believe the SASL details are passed in the correct way according to the docs here: https://godoc.org/qpid.apache.org/electron#ConnectionOption
I am still not sure the reason of the failure is not due to SSL certificates, if that's the case I am struggling to see how to include them in the process.
Edit:
I later found out I had to establish a TLS connection over TCP even if I am not providing any private/public pair of keys, also specifying a "virtual host" (otherwise AMQP was complaining about not recognising the host):
// TLS connection details
tlsConfig := &tls.Config{}
eventHubDomainPort := fmt.Sprintf("%s.servicebus.windows.net:5671", eventHubNamespaceName)
tlsConn, err := tls.Dial("tcp", eventHubDomainPort, tlsConfig)
if err != nil {
panic(err)
}
// AMPQ container connection on top of TLS via TCP
eventHubDomain := fmt.Sprintf("%s.servicebus.windows.net", eventHubNamespaceName)
amqpConn, err := container.Connection(
tlsConn,
electron.SASLEnable(),
electron.User(eventHubSasKeyName),
electron.Password([]byte(eventHubSasKey)),
electron.VirtualHost(eventHubDomain),
// electron.SASLAllowedMechs(<SOME_MECHANISM>),
)
if err != nil {
panic(err)
}
defer amqpConn.Close(nil)
// AMQP sender (a AMQP link with target the name defined on the Azure portal)
s, err := amqpConn.Sender(electron.Target(eventHubName))
if err != nil {
panic(err)
}
However when running the app with the environment variable PN_TRACE_FRM=true
(which is giving me some verbose logging at the proton-c
level) now the error is:
[handle=0, closed=true, error=@error(29) [condition=:"amqp:unauthorized-access", description="Unauthorized access. 'Send' claim(s) are required to perform this operation. Resource: 'sb://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net/<MY_CUSTOM_NAME>'. TrackingId:<SOME_UUID-ISH_HERE>, SystemTracker:<A_LABEL_HERE>, Timestamp:10/25/2017 4:02:58 PM"]]
This afaik means the SASL details (username/password) must be of type "sender" because I am trying to send something to the Event Hub. I double checked those details on the Azure portal (click on "Shared access policies" > then using the policy with "claim" specified as "Send") and they are correct. So I am not sure why I am getting this error.
I actually tried these SASL policies defined on the Azure portal at different levels, both <MY_CUSTOM_NAMESPACE>
and <MY_CUSTOM_NAME>
, but always the same error message.
I also tried including various SASL mechanisms e.g. when using electron.SASLAllowedMechs("PLAIN")
then I get this error: no mechanism available: No worthy mechs found (Authentication failed [mech=none])
.
I managed to establish a connection using the "Claims-based authorization" (CBS) on top of AMQP. It seems something specific to Microsoft. Some details can be found at the bottom of this page: https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-amqp-protocol-guide
Basically this is the list of steps:
electron.VirtualHost(eventHubDomain)
and the ANONYMOUS
SASL mechanism electron.SASLAllowedMechs("ANONYMOUS")
(no need to specify SASL username and password). Check the details in the Edit part of my question above here ^.$cbs
Event Hub name: cbsLink, err := amqpConnection.Sender(electron.Target("$cbs"))
The message properties (check this C# code to compare https://github.com/Azure/amqpnetlite/blob/master/Examples/ServiceBus/Scenarios/CbsAsyncExample.cs):
appProps := make(map[string]interface{})
appProps["operation"] = "put-token"
appProps["type"] = "servicebus.windows.net:sastoken"
appProps["name"] = "amqp://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net/<MY_CUSTOM_NAME>"
The SAS token formatted in the way Microsoft wants, I've adapted this piece of code: https://github.com/michaelbironneau/asbclient/blob/master/azure.go this way:
aqClient := newClient(Queue, "<MY_CUSTOM_NAMESPACE>", "<MY_CUSTOM_SAS_KEY_NAME>", "<MY_CUSTOM_SAS_KEY>")
sasToken := aqClient.authHeader("amqp://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net/<MY_CUSTOM_NAME>", aqClient.signatureExpiry(time.Now()))
That piece of code ^ is based on the python SDK here: https://github.com/Azure/azure-sdk-for-python/blob/master/azure-servicebus/azure/servicebus/servicebusservice.py containing lots of things like upper/lower case URL encodings, mixed with timestamps for expiration purposes and the SASL username and password.
Build the AMQP message importing "qpid.apache.org/amqp"
:
cbsHandshakeMsg := amqp.NewMessage()
cbsHandshakeMsg.SetApplicationProperties(appProps)
cbsHandshakeMsg.Marshal(sasToken)
outcome := cbsLink.SendSync(cbsHandshakeMsg)
and then magically you should be authenticated to the Event Hub for a while now.msgSender, err := amqpConnection.Sender(electron.Target("<MY_CUSTOM_NAME>"))
Now you can send the message you want to send using this last AMQP link this way:
m := amqp.NewMessage()
m.Marshal("my message: bla bla bla, foo bar baz!")
outcome := msgSender.SendSync(m)
Done :)
Running this code with the environment variable PN_TRACE_FRM=true
helps a lot in troubleshooting AMQP because the proton-c
library logs lots of useful debug messages.
For some reason the AMQP PLAIN
mechanism passing the SASL username and password directly during the connection attempt does not work with the Event Hub. It may be an issue with them or with the Electron/Qpid libraries, I am not sure, but now at least someone is able to send messages using golang and that CBS Microsoft protocol they made available.