Skip to content

Commit 3182325

Browse files
committed
[server] code refactor and add naming changes
1 parent 96fe877 commit 3182325

File tree

14 files changed

+262
-274
lines changed

14 files changed

+262
-274
lines changed

examples/sample/main.go

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -18,57 +18,57 @@ func main() {
1818
defer db.Close()
1919

2020
// Use Entry.WithPayload() method to bulk store messages as topic is parsed one time on first request.
21-
topic := []byte("teams.alpha.ch1.r1")
21+
topic := []byte("teams.private.sales.ch1.message")
2222
entry := &unitdb.Entry{Topic: topic}
2323
for j := 0; j < 50; j++ {
24-
db.PutEntry(entry.WithPayload([]byte(fmt.Sprintf("msg for team alpha channel1 recipient1 #%2d", j))))
24+
db.PutEntry(entry.WithPayload([]byte(fmt.Sprintf("msg for sales team channel1 #%2d", j))))
2525
}
2626

27-
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.alpha.ch1.r1?last=1h")).WithLimit(100)); err == nil {
27+
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.private.sales.ch1.message?last=1h")).WithLimit(100)); err == nil {
2828
for _, msg := range msgs {
2929
log.Printf("%s ", msg)
3030
}
3131
}
3232

3333
// Writing to single topic in a batch
3434
err = db.Batch(func(b *unitdb.Batch, completed <-chan struct{}) error {
35-
topic := []byte("teams.alpha.ch1.*?ttl=1h")
36-
b.Put(topic, []byte("msg for team alpha channel1 all recipients"))
35+
topic := []byte("teams.private.sales.*.message?ttl=1h")
36+
b.Put(topic, []byte("msg for sales team all channels"))
3737
return nil
3838
})
3939
if err != nil {
4040
log.Fatal(err)
4141
return
4242
}
4343

44-
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.alpha.ch1.r2?last=1h")).WithLimit(10)); err == nil {
44+
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.private.sales.ch2.message?last=1h")).WithLimit(10)); err == nil {
4545
for _, msg := range msgs {
4646
log.Printf("%s ", msg)
4747
}
4848
}
49-
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.alpha.ch1.r3?last=1h")).WithLimit(10)); err == nil {
49+
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.private.sales.ch3.message?last=1h")).WithLimit(10)); err == nil {
5050
for _, msg := range msgs {
5151
log.Printf("%s ", msg)
5252
}
5353
}
5454

5555
// Writing to multiple topics in a batch
5656
err = db.Batch(func(b *unitdb.Batch, completed <-chan struct{}) error {
57-
b.PutEntry(unitdb.NewEntry([]byte("teams.alpha.ch1.r2"), []byte("msg for team alpha channel1 recipient2")))
58-
b.PutEntry(unitdb.NewEntry([]byte("teams.alpha.ch1.r3"), []byte("msg for team alpha channel1 recipient3")))
57+
b.PutEntry(unitdb.NewEntry([]byte("teams.private.sales.ch2.message"), []byte("msg for sales team channel2")))
58+
b.PutEntry(unitdb.NewEntry([]byte("teams.private.sales.ch3.message"), []byte("msg for sales team channel3")))
5959
return nil
6060
})
6161
if err != nil {
6262
log.Fatal(err)
6363
return
6464
}
6565

66-
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.alpha.ch1.r2?last=1h")).WithLimit(10)); err == nil {
66+
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.private.sales.ch2.message?last=1h")).WithLimit(10)); err == nil {
6767
for _, msg := range msgs {
6868
log.Printf("%s ", msg)
6969
}
7070
}
71-
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.alpha.ch1.r3?last=1h")).WithLimit(10)); err == nil {
71+
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.private.sales.ch3.message?last=1h")).WithLimit(10)); err == nil {
7272
for _, msg := range msgs {
7373
log.Printf("%s ", msg)
7474
}
@@ -81,43 +81,43 @@ func main() {
8181
// Writing to single topic in a batch
8282
err = db.Batch(func(b *unitdb.Batch, completed <-chan struct{}) error {
8383
b.SetOptions(unitdb.WithBatchContract(contract))
84-
topic := []byte("teams.alpha.ch1.*?ttl=1h")
85-
b.Put(topic, []byte("msg for team alpha channel1 all recipients #1"))
86-
b.Put(topic, []byte("msg for team alpha channel1 all recipients #2"))
87-
b.Put(topic, []byte("msg for team alpha channel1 all recipients #3"))
84+
topic := []byte("teams.private.sales.*.message?ttl=1h")
85+
b.Put(topic, []byte("msg #1 for sales team all channels"))
86+
b.Put(topic, []byte("msg #2 for sales team all channels"))
87+
b.Put(topic, []byte("msg #3 for sales team all channels"))
8888
return nil
8989
})
9090

9191
// Writing to multiple topics in a batch
9292
err = db.Batch(func(b *unitdb.Batch, completed <-chan struct{}) error {
9393
b.SetOptions(unitdb.WithBatchContract(contract))
94-
b.PutEntry(unitdb.NewEntry([]byte("teams.*.ch1"), []byte("msg for channel1 in any teams")))
95-
b.PutEntry(unitdb.NewEntry([]byte("teams.alpha.*"), []byte("msg for all channels in team alpha")))
96-
b.PutEntry(unitdb.NewEntry([]byte("teams..."), []byte("msg for all teams and all channels")))
97-
b.PutEntry(unitdb.NewEntry([]byte("..."), []byte("msg broadcast to all recipients of all channels in all teams")))
94+
b.PutEntry(unitdb.NewEntry([]byte("teams.private.*.ch1.message"), []byte("msg for channel1 in any teams")))
95+
b.PutEntry(unitdb.NewEntry([]byte("teams.private.sales.*.message"), []byte("msg for all channels in sales team")))
96+
b.PutEntry(unitdb.NewEntry([]byte("teams.private..."), []byte("msg for all private teams and all channels")))
97+
b.PutEntry(unitdb.NewEntry([]byte("..."), []byte("msg broadcast to all channels in all teams")))
9898
return nil
9999
})
100100
if err != nil {
101101
log.Fatal(err)
102102
return
103103
}
104104

105-
// Get message for team alpha channel1
106-
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.alpha.ch1?last=1h")).WithLimit(10)); err == nil {
105+
// Get message for sales team channel1
106+
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.private.sales.ch1.message?last=1h")).WithLimit(10)); err == nil {
107107
for _, msg := range msgs {
108108
log.Printf("%s ", msg)
109109
}
110110
}
111111

112-
// Get message for team beta channel1
113-
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.beta.ch1?last=1h")).WithLimit(10)); err == nil {
112+
// Get message for customer support team channel1
113+
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.public.customersupport.ch1.message?last=1h")).WithLimit(10)); err == nil {
114114
for _, msg := range msgs {
115115
log.Printf("%s ", msg)
116116
}
117117
}
118118

119-
// Get message for team beta channel2 recipient11
120-
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.beta.ch2.r1?last=1h")).WithLimit(10)); err == nil {
119+
// Get message for customer support team channel2
120+
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.public.customersupport.ch2.message?last=1h")).WithLimit(10)); err == nil {
121121
for _, msg := range msgs {
122122
log.Printf("%s ", msg)
123123
}
@@ -127,8 +127,8 @@ func main() {
127127
// Note, encryption can also be set on entire database using DB.Open() and set encryption flag in options parameter.
128128
err = db.Batch(func(b *unitdb.Batch, completed <-chan struct{}) error {
129129
b.SetOptions(unitdb.WithBatchEncryption())
130-
topic := []byte("teams.alpha.ch1.r1?ttl=1h")
131-
b.Put(topic, []byte("msg for team alpha channel1 recipient1"))
130+
topic := []byte("teams.private.sales.ch1.message?ttl=1h")
131+
b.Put(topic, []byte("msg for sales team channel1"))
132132
return nil
133133
})
134134

examples/server/main.go renamed to examples/server_client/main.go

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,26 @@ import (
77
"os"
88
"time"
99

10-
unitdb "github.com/unit-io/unitdb-go"
10+
udb "github.com/unit-io/unitdb-go"
1111
)
1212

1313
func main() {
1414
recv := make(chan [2][]byte)
1515

16-
client, err := unitdb.NewClient(
16+
client, err := udb.NewClient(
1717
"grpc://localhost:6080",
1818
"UCBFDONCNJLaKMCAIeJBaOVfbAXUZHNPLDKKLDKLHZHKYIZLCDPQ",
19-
unitdb.WithInsecure(),
20-
unitdb.WithConnectionLostHandler(func(client unitdb.Client, err error) {
19+
udb.WithInsecure(),
20+
udb.WithConnectionLostHandler(func(client udb.Client, err error) {
2121
if err != nil {
2222
log.Fatal(err)
2323
}
2424
close(recv)
2525
}),
26-
unitdb.WithDefaultMessageHandler(func(client unitdb.Client, msg unitdb.Message) {
27-
recv <- [2][]byte{[]byte(msg.Topic()), msg.Payload()}
26+
udb.WithDefaultMessageHandler(func(client udb.Client, pubMsg udb.PubMessage) {
27+
for _, msg := range pubMsg.Messages() {
28+
recv <- [2][]byte{[]byte(msg.Topic), msg.Payload}
29+
}
2830
}),
2931
)
3032
if err != nil {
@@ -36,33 +38,33 @@ func main() {
3638
log.Fatalf("err: %s", err)
3739
}
3840

39-
var r unitdb.Result
41+
var r udb.Result
4042

4143
for i := 0; i < 3; i++ {
42-
msg := fmt.Sprintf("Hi mygroup1 #%d time!", i)
43-
r = client.Publish("groups.private.mygroup1.message", []byte(msg), unitdb.WithTTL("1m"), unitdb.WithPubDeliveryMode(1))
44+
msg := fmt.Sprintf("Hi sales channel #%d time!", i)
45+
r = client.Publish("teams.private.sales.saleschannel.message", []byte(msg), udb.WithTTL("1m"), udb.WithPubDeliveryMode(1))
4446
if _, err := r.Get(ctx, 1*time.Second); err != nil {
4547
log.Fatalf("err: %s", err)
4648
}
4749
}
4850

4951
for i := 0; i < 3; i++ {
50-
msg := fmt.Sprintf("Hi mygroup2 #%d time!", i)
51-
r = client.Publish("groups.private.mygroup2.message", []byte(msg), unitdb.WithTTL("1m"), unitdb.WithPubDeliveryMode(1))
52+
msg := fmt.Sprintf("Hi customer connect channel #%d time!", i)
53+
r = client.Publish("teams.public.customersupport.connectchannel.message", []byte(msg), udb.WithTTL("1m"), udb.WithPubDeliveryMode(1))
5254
if _, err := r.Get(ctx, 1*time.Second); err != nil {
5355
log.Fatalf("err: %s", err)
5456
}
5557
}
5658

5759
for i := 0; i < 3; i++ {
58-
msg := fmt.Sprintf("Hi all private groups #%d time!", i)
59-
r = client.Publish("groups.private.*.message", []byte(msg), unitdb.WithTTL("1m"), unitdb.WithPubDeliveryMode(1))
60+
msg := fmt.Sprintf("Hi all sales channels #%d time!", i)
61+
r = client.Publish("teams.private.sales.*.message", []byte(msg), udb.WithTTL("1m"), udb.WithPubDeliveryMode(1))
6062
if _, err := r.Get(ctx, 1*time.Second); err != nil {
6163
log.Fatalf("err: %s", err)
6264
}
6365
}
6466

65-
r = client.Relay("groups.private.mygroup1.message", unitdb.WithLast("1m"))
67+
r = client.Relay("teams.private.sales.saleschannel.message", udb.WithLast("1m"))
6668
if _, err := r.Get(ctx, 1*time.Second); err != nil {
6769
fmt.Println(err)
6870
os.Exit(1)

examples/simple/main.go

Lines changed: 15 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -16,61 +16,42 @@ func main() {
1616
}
1717
defer db.Close()
1818

19-
topic := []byte("teams.alpha.ch1")
20-
msg := []byte("msg for team alpha channel1")
19+
topic := []byte("teams.private.sales.saleschannel.message")
20+
msg := []byte("msg for sales channel")
2121
db.Put(topic, msg)
2222

23-
// Send message to all recipients of channel1 in team alpha
24-
topic = []byte("teams.alpha.ch1.*")
23+
// Send message to all channels in team sales
24+
topic = []byte("teams.private.sales.*.message")
2525
msg = []byte("msg for team alpha channel1 all recipients")
2626
db.Put(topic, msg)
2727

28-
// Send message to all channels in team alpha
29-
topic = []byte("teams.alpha...")
28+
// Send message to all private teams
29+
topic = []byte("teams.private...")
3030
msg = []byte("msg for team alpha all channels")
3131
db.Put(topic, msg)
3232

33-
// Get message for team alpha channel1
34-
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.alpha.ch1?last=1h")).WithLimit(10)); err == nil {
33+
// Get message for sales channel
34+
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.private.sales.saleschannel.message?last=1h")).WithLimit(10)); err == nil {
3535
for _, msg := range msgs {
3636
log.Printf("%s ", msg)
3737
}
3838
}
3939

40-
// Get message for team alpha channel1 recipient1
41-
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.alpha.ch1.r1?last=1h")).WithLimit(10)); err == nil {
42-
for _, msg := range msgs {
43-
log.Printf("%s ", msg)
44-
}
45-
}
46-
47-
// Get message for team alpha channel2
48-
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.alpha.ch2?last=1h")).WithLimit(10)); err == nil {
49-
for _, msg := range msgs {
50-
log.Printf("%s ", msg)
51-
}
52-
}
53-
54-
// Specify time to live so if you run the program again after 1 min you will not receive this message
55-
topic = []byte("teams.alpha.ch1.r1?ttl=1m")
56-
msg = []byte("msg with 1m ttl for team alpha channel1 recipient1")
57-
db.Put(topic, msg)
58-
5940
// Delete message
6041
messageId := db.NewID()
61-
entry := unitdb.NewEntry([]byte("teams.alpha.ch1.r1"), []byte("msg for team alpha channel1 recipient1")).WithID(messageId)
42+
entry := unitdb.NewEntry([]byte("teams.public.customersupport.connectchannel.message"), []byte("msg for customer connect channel")).WithID(messageId)
6243
db.PutEntry(entry)
6344

6445
db.Sync()
6546

66-
err = db.DeleteEntry(unitdb.NewEntry([]byte("teams.alpha.ch1.r1"), nil).WithID(messageId))
47+
err = db.DeleteEntry(unitdb.NewEntry([]byte("teams.public.customersupport.connectchannel.message"), nil).WithID(messageId))
6748
if err != nil {
6849
log.Fatal(err)
6950
return
7051
}
7152

72-
// Get message for team alpha channel1 recipient1
73-
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.alpha.ch1.r1?last=1h")).WithLimit(10)); err == nil {
53+
// Get message for customer connect channel
54+
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.public.customersupport.connectchannel.message?last=1h")).WithLimit(10)); err == nil {
7455
for _, msg := range msgs {
7556
log.Printf("%s ", msg)
7657
}
@@ -79,10 +60,10 @@ func main() {
7960
// Topic isolation using contract
8061
contract, err := db.NewContract()
8162

82-
db.PutEntry(unitdb.NewEntry([]byte("teams.alpha.ch1.r1"), []byte("msg for team alpha channel1 recipient1")).WithContract(contract))
63+
db.PutEntry(unitdb.NewEntry([]byte("teams.public.customersupport.connectchannel.message"), []byte("msg for customer connect channel")).WithContract(contract))
8364

84-
// Get message for team alpha channel1 recipient1 with new contract
85-
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.alpha.ch1.r1?last=1h")).WithContract(contract).WithLimit(10)); err == nil {
65+
// Get message for customer connect chanel with new contract
66+
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.public.customersupport.connectchannel.message?last=1h")).WithContract(contract).WithLimit(10)); err == nil {
8667
for _, msg := range msgs {
8768
log.Printf("%s ", msg)
8869
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,4 @@ require (
1515
google.golang.org/grpc v1.39.0
1616
)
1717

18-
replace github.com/unit-io/unitdb-go => /src/github.com/unit-io/unitdb-go
18+
// replace github.com/unit-io/unitdb-go => /src/github.com/unit-io/unitdb-go

server/internal/batch.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@ type (
2727
batchByteThreshold int
2828
}
2929
batch struct {
30-
count int
31-
size int
32-
msgs []*utp.PublishMessage
30+
count int
31+
size int
32+
pubMessages []*utp.PublishMessage
3333
}
3434
batchManager struct {
3535
mu sync.RWMutex
@@ -45,7 +45,7 @@ type (
4545

4646
func (m *batchManager) newBatch(timeID timeID) *batch {
4747
b := &batch{
48-
msgs: make([]*utp.PublishMessage, 0),
48+
pubMessages: make([]*utp.PublishMessage, 0),
4949
}
5050
m.batchGroup[timeID] = b
5151

@@ -90,27 +90,27 @@ func (m *batchManager) close() {
9090
}
9191

9292
// add adds a publish message to a batch in the batch group.
93-
func (m *batchManager) add(delay int32, p *utp.PublishMessage) {
93+
func (m *batchManager) add(delay int32, pubMsg *utp.PublishMessage) {
9494
m.mu.Lock()
9595
defer m.mu.Unlock()
9696
timeID := m.TimeID(delay)
9797
b, ok := m.batchGroup[timeID]
9898
if !ok {
9999
b = m.newBatch(timeID)
100100
}
101-
b.msgs = append(b.msgs, p)
101+
b.pubMessages = append(b.pubMessages, pubMsg)
102102
b.count++
103-
b.size += len(p.Payload)
103+
b.size += len(pubMsg.Payload)
104104
if b.count > m.opts.batchCountThreshold || b.size > m.opts.batchByteThreshold {
105105
m.push(b)
106106
delete(m.batchGroup, timeID)
107107
}
108108
}
109109

110110
// push enqueues a batch to publish.
111-
func (m *batchManager) push(p *batch) {
112-
if len(p.msgs) != 0 {
113-
m.publishQueue <- p
111+
func (m *batchManager) push(b *batch) {
112+
if len(b.pubMessages) != 0 {
113+
m.publishQueue <- b
114114
}
115115
}
116116

@@ -188,7 +188,7 @@ func (m *batchManager) publish(c *_Conn, publishWaitTimeout time.Duration) {
188188
m.stopWg.Done()
189189
return
190190
}
191-
pub := &utp.Publish{Messages: b.msgs}
191+
pub := &utp.Publish{Messages: b.pubMessages}
192192
pub.MessageID = uint16(c.MessageIds.NextID(utp.PUBLISH))
193193

194194
// persist outbound
@@ -201,7 +201,7 @@ func (m *batchManager) publish(c *_Conn, publishWaitTimeout time.Duration) {
201201
}
202202
case b := <-m.send:
203203
if b != nil {
204-
pub := &utp.Publish{Messages: b.msgs}
204+
pub := &utp.Publish{Messages: b.pubMessages}
205205
pub.MessageID = uint16(c.MessageIds.NextID(utp.PUBLISH))
206206

207207
// persist outbound

0 commit comments

Comments
 (0)