I would like to consume an Event hub with Rust.
I've found a some promising crates, but nothing quite satisfactory:
I haven't found any real world application using these crates.
I have created the-hub-namespace
, the-hub
, the-consumer-group
and a the-policy
with listen permission and got the-key
.
So far I've made the most success with ntex-amqp. I had to hotfix the rustls connector as it was failing with Invalid DNS - the host name included the port number (name:port) when only the DNS name is expected.
Cargo.toml:
[package]
name = "tmp"
version = "0.1.0"
authors = ["me"]
edition = "2018"
[dependencies]
env_logger = "0.8"
ntex-amqp = { version="0.4", git="https://github.com/BrightOpen/ntex-amqp", branch="master" }
ntex = { version="0.3", features=["rustls"], git="https://github.com/BrightOpen/ntex", branch="master" }
rustls = "0.19"
futures = "0.3"
src/main.rs:
use ntex::connect::rustls::RustlsConnector;
use ntex_amqp::client::{self, SaslAuth};
use ntex_amqp::codec::types::{Descriptor, Symbol, Variant};
use rustls::ClientConfig;
use std::sync::Arc;
use futures::StreamExt;
#[ntex::main]
async fn main() -> std::io::Result<()> {
std::env::set_var("RUST_LOG", "ntex=trace,ntex_amqp=trace,basic=trace");
env_logger::init();
let mut tlsconfig = ClientConfig::new();
let certs = tlsconfig
.root_store
.add_pem_file(
// This is the Azure CA cert
&mut "-----BEGIN CERTIFICATE-----
MIIF8zCCBNugAwIBAgIQCq+mxcpjxFFB6jvh98dTFzANBgkqhkiG9w0BAQwFADBh
MQswCQYDVQQGEwJVUzEVMBMGA1UEChMMRGlnaUNlcnQgSW5jMRkwFwYDVQQLExB3
d3cuZGlnaWNlcnQuY29tMSAwHgYDVQQDExdEaWdpQ2VydCBHbG9iYWwgUm9vdCBH
MjAeFw0yMDA3MjkxMjMwMDBaFw0yNDA2MjcyMzU5NTlaMFkxCzAJBgNVBAYTAlVT
MR4wHAYDVQQKExVNaWNyb3NvZnQgQ29ycG9yYXRpb24xKjAoBgNVBAMTIU1pY3Jv
c29mdCBBenVyZSBUTFMgSXNzdWluZyBDQSAwMTCCAiIwDQYJKoZIhvcNAQEBBQAD
ggIPADCCAgoCggIBAMedcDrkXufP7pxVm1FHLDNA9IjwHaMoaY8arqqZ4Gff4xyr
RygnavXL7g12MPAx8Q6Dd9hfBzrfWxkF0Br2wIvlvkzW01naNVSkHp+OS3hL3W6n
l/jYvZnVeJXjtsKYcXIf/6WtspcF5awlQ9LZJcjwaH7KoZuK+THpXCMtzD8XNVdm
GW/JI0C/7U/E7evXn9XDio8SYkGSM63aLO5BtLCv092+1d4GGBSQYolRq+7Pd1kR
EkWBPm0ywZ2Vb8GIS5DLrjelEkBnKCyy3B0yQud9dpVsiUeE7F5sY8Me96WVxQcb
OyYdEY/j/9UpDlOG+vA+YgOvBhkKEjiqygVpP8EZoMMijephzg43b5Qi9r5UrvYo
o19oR/8pf4HJNDPF0/FJwFVMW8PmCBLGstin3NE1+NeWTkGt0TzpHjgKyfaDP2tO
4bCk1G7pP2kDFT7SYfc8xbgCkFQ2UCEXsaH/f5YmpLn4YPiNFCeeIida7xnfTvc4
7IxyVccHHq1FzGygOqemrxEETKh8hvDR6eBdrBwmCHVgZrnAqnn93JtGyPLi6+cj
WGVGtMZHwzVvX1HvSFG771sskcEjJxiQNQDQRWHEh3NxvNb7kFlAXnVdRkkvhjpR
GchFhTAzqmwltdWhWDEyCMKC2x/mSZvZtlZGY+g37Y72qHzidwtyW7rBetZJAgMB
AAGjggGtMIIBqTAdBgNVHQ4EFgQUDyBd16FXlduSzyvQx8J3BM5ygHYwHwYDVR0j
BBgwFoAUTiJUIBiV5uNu5g/6+rkS7QYXjzkwDgYDVR0PAQH/BAQDAgGGMB0GA1Ud
JQQWMBQGCCsGAQUFBwMBBggrBgEFBQcDAjASBgNVHRMBAf8ECDAGAQH/AgEAMHYG
CCsGAQUFBwEBBGowaDAkBggrBgEFBQcwAYYYaHR0cDovL29jc3AuZGlnaWNlcnQu
Y29tMEAGCCsGAQUFBzAChjRodHRwOi8vY2FjZXJ0cy5kaWdpY2VydC5jb20vRGln
aUNlcnRHbG9iYWxSb290RzIuY3J0MHsGA1UdHwR0MHIwN6A1oDOGMWh0dHA6Ly9j
cmwzLmRpZ2ljZXJ0LmNvbS9EaWdpQ2VydEdsb2JhbFJvb3RHMi5jcmwwN6A1oDOG
MWh0dHA6Ly9jcmw0LmRpZ2ljZXJ0LmNvbS9EaWdpQ2VydEdsb2JhbFJvb3RHMi5j
cmwwHQYDVR0gBBYwFDAIBgZngQwBAgEwCAYGZ4EMAQICMBAGCSsGAQQBgjcVAQQD
AgEAMA0GCSqGSIb3DQEBDAUAA4IBAQAlFvNh7QgXVLAZSsNR2XRmIn9iS8OHFCBA
WxKJoi8YYQafpMTkMqeuzoL3HWb1pYEipsDkhiMnrpfeYZEA7Lz7yqEEtfgHcEBs
K9KcStQGGZRfmWU07hPXHnFz+5gTXqzCE2PBMlRgVUYJiA25mJPXfB00gDvGhtYa
+mENwM9Bq1B9YYLyLjRtUz8cyGsdyTIG/bBM/Q9jcV8JGqMU/UjAdh1pFyTnnHEl
Y59Npi7F87ZqYYJEHJM2LGD+le8VsHjgeWX2CJQko7klXvcizuZvUEDTjHaQcs2J
+kPgfyMIOY1DMJ21NxOJ2xPRC/wAh/hzSBRVtoAnyuxtkZ4VjIOh
-----END CERTIFICATE-----"
.as_bytes(),
)
.unwrap();
assert_eq!(certs, (1, 0));
let driver = client::Connector::new()
.connector(RustlsConnector::new(Arc::new(tlsconfig)))
.hostname("...the-hub-namespace...servicebus.windows.net")
.connect_sasl(
"...the-hub-namespace....servicebus.windows.net:5671",
SaslAuth {
authz_id: "".into(),
authn_id: "...the-policy...".into(),
password: "...the-key...".into(),
},
)
.await
.unwrap();
let sink = driver.sink();
ntex::rt::spawn(driver.start_default());
let mut session = sink.open_session().await.unwrap();
let mut links = vec![];
// I have 32 partitions
for i in 0..32u8 {
let link = session
.build_receiver_link(
format!("mylink{}", i),
format!("...the-hub.../ConsumerGroups/...the-consumer-group.../Partitions/{}", i),
)
.max_message_size(65535)
.property(
Symbol::from_slice("com.microsoft:entity-type"),
Some("8".into()),
)
.property(
Symbol::from_slice("apache.org:selector-filter:string"),
Some(Variant::Described((
Descriptor::Symbol(Symbol::from_slice("apache.org:selector-filter:string")),
Box::new(Variant::from("amqp.annotation.x-opt-offset > '@latest'")),
))),
)
.open()
.await
.unwrap();
link.set_link_credit(20);
links.push(link);
}
let mut links = futures::stream::select_all(links);
use futures::StreamExt;
while let Some(msg) = links.next().await {
eprintln!("Message: {:#?}", msg);
}
Ok(())
}
I have no clue where to set the-hub and the-consumer-group, let alone the partition number. And how do I actually receive, or react to messages? How do I handle my offset?
Update! @JesseSquire helped figure out the appropriate resource path. A little more research finally revealed the Stream implementation on ReceiverLink - where was I looking? So now I actually get some messages. Some of the above questions out of the way.
When I run this code, I get the output:
[2021-06-22T18:21:49Z TRACE ntex::connect::resolve] DNS resolver: resolving host "...the-hub-namespace....servicebus.windows.net:5671"
[2021-06-22T18:21:49Z TRACE ntex::connect::resolve] DNS resolver: host "...the-hub-namespace....servicebus.windows.net:5671" resolved to [13.69.64.2:5671]
[2021-06-22T18:21:49Z TRACE ntex::connect::service] TCP connector - connecting to "...the-hub-namespace....servicebus.windows.net:5671" port:5671
[2021-06-22T18:21:49Z TRACE ntex::connect::service] TCP connector - successfully connected to connecting to "...the-hub-namespace....servicebus.windows.net:5671" - Ok(13.69.64.2:5671)
[2021-06-22T18:21:49Z TRACE ntex::connect::rustls] SSL Handshake start for: "...the-hub-namespace....servicebus.windows.net"
[2021-06-22T18:21:49Z TRACE ntex::connect::rustls] SSL Handshake success: DNSNameRef("...the-hub-namespace....servicebus.windows.net")
[2021-06-22T18:21:49Z TRACE ntex_amqp::client::connector] Negotiation client protocol id: AmqpSasl
[2021-06-22T18:21:49Z TRACE ntex_amqp::client::connector] Negotiation client protocol id: Amqp
[2021-06-22T18:21:50Z TRACE ntex_amqp::client::connector] Open client amqp connection: Open { container_id: "42b4f39ce51f46bf85cf8631f64d8cca", hostname: Some("...the-hub-namespace....servicebus.windows.net"), max_frame_size: 65535, channel_max: 1024, idle_time_out: Some(120000), outgoing_locales: None, incoming_locales: None, offered_capabilities: None, desired_capabilities: None, properties: None }
[2021-06-22T18:21:50Z TRACE ntex_amqp::client::connector] Open confirmed: Open { container_id: "f11d7639f6454b819bbf7b1e1150c101_G6", hostname: None, max_frame_size: 65535, channel_max: 1024, idle_time_out: Some(120000), outgoing_locales: None, incoming_locales: None, offered_capabilities: None, desired_capabilities: None, properties: None }
[2021-06-22T18:21:50Z TRACE ntex_amqp::connection] Session opened: local 0 remote 0
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex_amqp::session] Receiver link opened: "mylink0" 0 -> 0
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex_amqp::session] Receiver link opened: "mylink1" 1 -> 1
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex_amqp::session] Receiver link opened: "mylink2" 2 -> 2
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex_amqp::session] Receiver link opened: "mylink3" 3 -> 3
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex_amqp::session] Receiver link opened: "mylink4" 4 -> 4
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex_amqp::session] Receiver link opened: "mylink5" 5 -> 5
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex_amqp::session] Receiver link opened: "mylink6" 6 -> 6
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex_amqp::session] Receiver link opened: "mylink7" 7 -> 7
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex_amqp::session] Receiver link opened: "mylink8" 8 -> 8
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex_amqp::session] Receiver link opened: "mylink9" 9 -> 9
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex_amqp::session] Receiver link opened: "mylink10" 10 -> 10
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink11" 11 -> 11
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink12" 12 -> 12
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink13" 13 -> 13
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink14" 14 -> 14
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink15" 15 -> 15
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink16" 16 -> 16
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink17" 17 -> 17
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink18" 18 -> 18
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink19" 19 -> 19
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink20" 20 -> 20
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink21" 21 -> 21
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink22" 22 -> 22
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink23" 23 -> 23
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink24" 24 -> 24
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:52Z TRACE ntex_amqp::session] Receiver link opened: "mylink25" 25 -> 25
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:52Z TRACE ntex_amqp::session] Receiver link opened: "mylink26" 26 -> 26
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:52Z TRACE ntex_amqp::session] Receiver link opened: "mylink27" 27 -> 27
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:52Z TRACE ntex_amqp::session] Receiver link opened: "mylink28" 28 -> 28
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:52Z TRACE ntex_amqp::session] Receiver link opened: "mylink29" 29 -> 29
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:52Z TRACE ntex_amqp::session] Receiver link opened: "mylink30" 30 -> 30
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:52Z TRACE ntex_amqp::session] Receiver link opened: "mylink31" 31 -> 31
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
Message: Ok(
Transfer {
handle: 0,
delivery_id: Some(
0,
),
delivery_tag: Some(
b"",
),
message_format: Some(
0,
),
settled: Some(
true,
),
more: false,
rcv_settle_mode: None,
state: None,
resume: false,
aborted: false,
batchable: true,
body: Some(
Data(
b"\0Sr\xc1I\x06\xa3\x15x-opt-sequence-numberU\0\xa3\x0cx-opt-offset\xa1\x010\xa3\x13x-opt-enqueued-time\x83\0\0\x01z4\xcd\xfc \0Su\xa0\x14s6df54s65df4s6d5f4sf",
),
),
},
)
...
[2021-06-22T18:23:37Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:24:09Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
Message: Ok(
Transfer {
handle: 13,
delivery_id: Some(
27,
),
delivery_tag: Some(
b"",
),
message_format: Some(
0,
),
settled: Some(
true,
),
more: false,
rcv_settle_mode: None,
state: None,
resume: false,
aborted: false,
batchable: true,
body: Some(
Data(
b"\0Sr\xc1J\x06\xa3\x15x-opt-sequence-numberU\x01\xa3\x0cx-opt-offset\xa1\x0256\xa3\x13x-opt-enqueued-time\x83\0\0\x01z4\xf7\"\xfb\0Su\xa0\x15999999999999999999999",
),
),
},
)
[2021-06-22T18:24:50Z TRACE ntex_amqp::session] Session received credit None. window: 4294967295, pending: 0
[2021-06-22T18:24:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:26:35Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:27:50Z TRACE ntex_amqp::session] Session received credit None. window: 4294967295, pending: 0
[2021-06-22T18:27:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
Ok, the main objective fulfilled: I can receive Event Hub messages in Rust with AMQP. It leaves a lot to desire though:
This might be a little late for you. For future viewers on this question, I have made an AMQP 1.0 implementation in rust with serde and tokio, and I have already added an example with event hubs, including one example that uses filter.