I have a MinIO/S3 object store with lambda notifications going into CockroachDB (Postgres DB). I am trying to monitor these events with the Go code below.
package main
import (
"database/sql"
"encoding/json"
"fmt"
"github.com/lib/pq"
"time"
)
// Connection settings shared by the database handle and the notification
// listener.
const (
	// dbDriver is the database/sql driver name passed to sql.Open.
	dbDriver = "postgres"

	// crdbConnectStr is the connection URL for the alerts database.
	// Equivalent key/value form, kept for reference:
	//   dbname=alerts user=crdbuser1 host=localhost port=26257 sslmode=disable connect_timeout=5
	crdbConnectStr = "postgres://crdbuser1@localhost:26257/alerts?sslmode=disable"
)
// monitorEvents opens a database handle with the configured driver and
// connection string, subscribes to the "monitor" notification channel through
// a pq.Listener, and then hands the listener to waitForAlertEvents in an
// endless loop.
//
// It prints the error and returns early when the handle cannot be opened or
// the Listen call fails. The server must implement LISTEN/NOTIFY for the
// subscription to succeed.
func monitorEvents() {
	db, err := sql.Open(dbDriver, crdbConnectStr)
	if err != nil {
		fmt.Printf("connection open to crdb failed - %v\n", err.Error())
		// Do not report success or continue after a failed open.
		return
	}
	// Release the handle when monitoring stops instead of discarding it.
	defer db.Close()
	fmt.Printf("sql open on crdb OK\n")

	// reportProblem is invoked by the listener on connection state changes;
	// only events that carry an error are printed.
	reportProblem := func(ev pq.ListenerEventType, err error) {
		if err != nil {
			fmt.Printf("NewListener - event : %v, err - %v\n", ev, err.Error())
		}
	}

	minReconnect := 2 * time.Second
	maxReconnect := 20 * time.Second
	listener := pq.NewListener(crdbConnectStr, minReconnect, maxReconnect, reportProblem)
	// Ensure the listener's connection is torn down on every exit path.
	defer listener.Close()

	if err := listener.Listen("monitor"); err != nil {
		fmt.Printf("Listen error - %v\n", err.Error())
		return
	}

	fmt.Printf("begin monitoring events in CRDB\n")
	for {
		waitForAlertEvents(listener)
	}
}
// Record models the JSON document decoded from a notification payload. Only
// the path data.value.Records[].s3 is mapped, which yields the bucket name
// and the object key of each entry.
type Record struct {
	Data struct {
		Value struct {
			// Records holds one entry per reported object event.
			Records []struct {
				S3 struct {
					Bucket struct {
						// Name is the bucket the event refers to.
						Name string `json:"name"`
					} `json:"bucket"`
					Object struct {
						// Key is the object's key inside the bucket.
						Key string `json:"key"`
					} `json:"object"`
				} `json:"s3"`
			} `json:"Records"`
		} `json:"value"`
	} `json:"data"`
}
// waitForAlertEvents blocks until either one notification arrives on the
// listener or 60 seconds pass without one, handles that single occurrence and
// returns. It is meant to be called repeatedly in a loop.
//
// On a notification it prints the channel name and the raw payload, decodes
// the payload into a Record and prints the bucket name and object key of the
// first entry. Payloads that are not valid JSON or that contain no entries
// are reported and skipped. On a timeout it pings the connection in the
// background and reports a ping failure.
func waitForAlertEvents(l *pq.Listener) {
	const idleTimeout = 60 * time.Second

	select {
	case n := <-l.Notify:
		// lib/pq delivers a nil notification after it re-establishes a lost
		// connection (and a closed channel also yields nil); there is no
		// payload to process in that case.
		if n == nil {
			fmt.Println("Received empty notification, connection was re-established or closed")
			return
		}

		fmt.Printf("Received data from channel [%v]\n", n.Channel)
		// Print the raw payload before decoding it.
		fmt.Println(n.Extra)

		var record Record
		if err := json.Unmarshal([]byte(n.Extra), &record); err != nil {
			fmt.Println("Error processing JSON: ", err)
			return
		}

		// Guard the [0] access: a well-formed payload may still carry no
		// entries.
		records := record.Data.Value.Records
		if len(records) == 0 {
			fmt.Println("Error processing JSON: payload contains no records")
			return
		}

		bucket := records[0].S3.Bucket.Name
		object := records[0].S3.Object.Key
		fmt.Printf("received event on bucket: %v, object: %v\n", bucket, object)

	case <-time.After(idleTimeout):
		// The message is derived from the timeout so the two cannot drift
		// apart.
		fmt.Printf("Received no events for %d seconds, checking connection\n", int(idleTimeout.Seconds()))
		go func() {
			if err := l.Ping(); err != nil {
				fmt.Printf("Ping error - %v\n", err.Error())
			}
		}()
	}
}
func main() {
monitorAlerts()
}
When I run this program, I see the errors below and it gets stuck.
[root]# ./alerts
sql open on crdb OK
NewListener - event : 3, err - pq: syntax error at or near "listen"
NewListener - event : 3, err - pq: syntax error at or near "listen"
NewListener - event : 3, err - pq: syntax error at or near "listen"
Connection to cockroachdb manually works OK.
[root]# cockroach sql --insecure --user=crdbuser1
crdbuser1@:26257/defaultdb> show databases; database_name
+---------------+
alerts
(1 row)
Time: 1.22359ms
crdbuser1@:26257/defaultdb> set database=alerts;
SET
Time: 363.994µs
crdbuser1@:26257/alerts> show tables;
table_name
+------------+
alertstab
(1 row)
Time: 1.399014ms
crdbuser1@:26257/alerts>
Any thoughts as to why I get the error pq: syntax error at or near "listen"? Also, looking at the pq sources, the error is most likely related to notify.go#L756.
The error indicates that CockroachDB does not support the LISTEN and NOTIFY statements.
You will need to find a different way of doing this. The closest thing in CRDB would be Change Data Capture, but that is more about data streaming than custom notifications.
You can find some discussion about LISTEN / NOTIFY for CRDB in this issue, but there is no firm plan to date.