Skip to content
This repository was archived by the owner on Jan 21, 2020. It is now read-only.

Commit 1513c8e

Browse files
committed
add application plugin
Signed-off-by: Yuji Oshima <yuji.oshima0x3fd@gmail.com>
1 parent 6ecb296 commit 1513c8e

File tree

12 files changed

+673
-0
lines changed

12 files changed

+673
-0
lines changed

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ endif
133133
$(call build_binary,infrakit-instance-maas,github.com/docker/infrakit/examples/instance/maas)
134134
$(call build_binary,infrakit-instance-docker,github.com/docker/infrakit/examples/instance/docker)
135135
$(call build_binary,infrakit-event-time,github.com/docker/infrakit/examples/event/time)
136+
$(call build_binary,infrakit-application-repeater,github.com/docker/infrakit/examples/application/event-repeater)
136137

137138
cli: build-cli
138139
build-cli:

cmd/cli/application/application.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package application
2+
3+
import (
4+
"fmt"
5+
"github.com/docker/infrakit/cmd/cli/base"
6+
"github.com/docker/infrakit/pkg/cli"
7+
"github.com/docker/infrakit/pkg/discovery"
8+
logutil "github.com/docker/infrakit/pkg/log"
9+
"github.com/docker/infrakit/pkg/plugin"
10+
application_rpc "github.com/docker/infrakit/pkg/rpc/application"
11+
// "github.com/docker/infrakit/pkg/rpc/client"
12+
"github.com/docker/infrakit/pkg/spi/application"
13+
"github.com/docker/infrakit/pkg/types"
14+
"github.com/spf13/cobra"
15+
)
16+
17+
var log = logutil.New("module", "cli/event")
18+
19+
var OPERATIONS = map[int]string{1: "Add", 2: "Delete", 3: "Update", 4: "Read"}
20+
21+
func init() {
22+
base.Register(Command)
23+
}
24+
25+
// Command is the entry point of the module
26+
func Command(plugins func() discovery.Plugins) *cobra.Command {
27+
var applicationPlugin application.Plugin
28+
29+
cmd := &cobra.Command{
30+
Use: "application",
31+
Short: "Access application plugins",
32+
}
33+
name := cmd.PersistentFlags().String("name", "", "Name of plugin")
34+
cmd.PersistentPreRunE = func(c *cobra.Command, args []string) error {
35+
if err := cli.EnsurePersistentPreRunE(c); err != nil {
36+
return err
37+
}
38+
39+
endpoint, err := plugins().Find(plugin.Name(*name))
40+
if err != nil {
41+
return err
42+
}
43+
44+
p, err := application_rpc.NewClient(plugin.Name(*name), endpoint.Address)
45+
if err != nil {
46+
return err
47+
}
48+
applicationPlugin = p
49+
50+
cli.MustNotNil(applicationPlugin, "application plugin not found", "name", *name)
51+
return nil
52+
}
53+
54+
operation := 3
55+
resource := ""
56+
value := ""
57+
update := &cobra.Command{
58+
Use: "update",
59+
Short: "Update application's resouce",
60+
RunE: func(c *cobra.Command, args []string) error {
61+
fmt.Printf("send update message plugin=%v, op=%v, resource=%v, value=%v.\n", args, OPERATIONS[operation], resource, value)
62+
err := applicationPlugin.Update(
63+
&application.Message{
64+
Op: application.Operation(operation),
65+
Resource: resource,
66+
Data: types.AnyString(value),
67+
},
68+
)
69+
if err != nil {
70+
return err
71+
}
72+
73+
return nil
74+
},
75+
}
76+
update.Flags().IntVar(&operation, "op", operation, "update operation 1: Add, 2: Delete, 3: Update, 4: Read(default)")
77+
update.Flags().StringVar(&resource, "resource", resource, "target resource")
78+
update.Flags().StringVar(&value, "value", value, "update value")
79+
80+
cmd.AddCommand(update)
81+
82+
return cmd
83+
}

cmd/cli/main/init.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
_ "github.com/docker/infrakit/cmd/cli/application"
45
_ "github.com/docker/infrakit/cmd/cli/event"
56
_ "github.com/docker/infrakit/cmd/cli/flavor"
67
_ "github.com/docker/infrakit/cmd/cli/group"

cmd/cli/main/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
logutil "github.com/docker/infrakit/pkg/log"
1717
"github.com/spf13/cobra"
1818

19+
_ "github.com/docker/infrakit/cmd/cli/application"
1920
_ "github.com/docker/infrakit/cmd/cli/event"
2021
_ "github.com/docker/infrakit/cmd/cli/flavor"
2122
_ "github.com/docker/infrakit/cmd/cli/group"
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
package main
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
log "github.com/Sirupsen/logrus"
7+
event_rpc "github.com/docker/infrakit/pkg/rpc/event"
8+
"github.com/docker/infrakit/pkg/spi/application"
9+
"github.com/docker/infrakit/pkg/spi/event"
10+
"github.com/docker/infrakit/pkg/template"
11+
"github.com/docker/infrakit/pkg/types"
12+
MQTT "github.com/eclipse/paho.mqtt.golang"
13+
"sync"
14+
)
15+
16+
func NewEventRepeater(eSource string, eSink string, protocol string, allowall bool) application.Plugin {
17+
sub, err := event_rpc.NewClient(eSource)
18+
if err != nil {
19+
log.Errorf("Cannot Create Event Client :%v", err)
20+
return nil
21+
}
22+
pub, err := newSinkClient(protocol, eSink)
23+
if err != nil {
24+
log.Errorf("Cannot Create Sink Client protocol :%v Server :%v :%v", protocol, eSink, err)
25+
return nil
26+
}
27+
e := &eventRepeater{
28+
EventSource: eSource,
29+
EventSink: eSink,
30+
Events: make(map[string]*RepeatRule),
31+
Protocol: protocol,
32+
stop: make(chan bool),
33+
sourceEClient: sub.(event.Subscriber),
34+
sinkEClient: pub,
35+
allowAll: allowall,
36+
eventListMt: new(sync.Mutex),
37+
}
38+
go e.serve()
39+
return e
40+
}
41+
42+
func newSinkClient(protocol string, broker string) (interface{}, error) {
43+
switch protocol {
44+
case "mqtt":
45+
opts := MQTT.NewClientOptions()
46+
opts.AddBroker(broker)
47+
c := MQTT.NewClient(opts)
48+
if token := c.Connect(); token.Wait() && token.Error() != nil {
49+
return nil, token.Error()
50+
}
51+
return c, nil
52+
case "stderr":
53+
log.Info("Print Event messages to stderr. This is for debug. ")
54+
return nil, nil
55+
default:
56+
return nil, fmt.Errorf("Unkown sink protocol %s", protocol)
57+
}
58+
}
59+
60+
type eventRepeater struct {
61+
EventSource string
62+
EventSink string
63+
Events map[string]*RepeatRule
64+
Protocol string
65+
stop chan bool
66+
eventAdd chan bool
67+
eventDel chan bool
68+
sourceEClient event.Subscriber
69+
sinkEClient interface{}
70+
allowAll bool
71+
eventListMt *sync.Mutex
72+
}
73+
74+
type RepeatRule struct {
75+
SourceTopic string
76+
SinkTopic string
77+
SourceStopCh chan<- struct{}
78+
SinkStopCh chan bool
79+
SourceStream <-chan *event.Event
80+
}
81+
82+
type messageData struct {
83+
SourceTopic string `json:"sourcetopic",omitempty"`
84+
SinkTopic string `json:"sinktopic",omitempty"`
85+
}
86+
87+
func (e eventRepeater) Validate(applicationProperties *types.Any) error {
88+
return nil
89+
}
90+
91+
func (e eventRepeater) Healthy(applicationProperties *types.Any) (application.Health, error) {
92+
return application.Healthy, nil
93+
}
94+
95+
func (e eventRepeater) AddEvent(sourcesTopic string, sinkTopic string) error {
96+
if sourcesTopic == "" {
97+
return fmt.Errorf("Error: %s", "You must have a topic of source for add repeat event.")
98+
}
99+
if sinkTopic == "" {
100+
sinkTopic = sourcesTopic
101+
}
102+
log.Debugf("Add event %s as %s", sourcesTopic, sinkTopic)
103+
e.eventListMt.Lock()
104+
defer e.eventListMt.Unlock()
105+
if _, ok := e.Events[sourcesTopic]; ok {
106+
return fmt.Errorf("Error: %s %s", "Topic already exist. :", sourcesTopic)
107+
}
108+
stream, stop, err := e.sourceEClient.SubscribeOn(types.PathFromString(sourcesTopic))
109+
if err != nil {
110+
return err
111+
}
112+
e.Events[sourcesTopic] = &RepeatRule{
113+
SourceTopic: sourcesTopic,
114+
SinkTopic: sinkTopic,
115+
SourceStopCh: stop,
116+
SinkStopCh: make(chan bool),
117+
SourceStream: stream,
118+
}
119+
go e.publishToSink(e.Events[sourcesTopic])
120+
return nil
121+
}
122+
123+
func (e eventRepeater) DelEvent(sourcesTopic string) error {
124+
if sourcesTopic == "" {
125+
return fmt.Errorf("Error: %s", "You must have a topic of source for delete repeat event.")
126+
}
127+
log.Debugf("Delete event %s", sourcesTopic)
128+
e.eventListMt.Lock()
129+
defer e.eventListMt.Unlock()
130+
if _, ok := e.Events[sourcesTopic]; !ok {
131+
return fmt.Errorf("Error: %s %s", "There is no registerd topic. :", sourcesTopic)
132+
}
133+
e.Events[sourcesTopic].SinkStopCh <- true
134+
close(e.Events[sourcesTopic].SourceStopCh)
135+
delete(e.Events, sourcesTopic)
136+
return nil
137+
}
138+
139+
func (e eventRepeater) Stop() {
140+
e.stop <- true
141+
}
142+
143+
func (e eventRepeater) publishToSink(rr *RepeatRule) error {
144+
templateURL := "str://{{.}}"
145+
engine, err := template.NewTemplate(templateURL, template.Options{})
146+
if err != nil {
147+
return err
148+
}
149+
for {
150+
select {
151+
case <-rr.SinkStopCh:
152+
return nil
153+
case s, ok := <-rr.SourceStream:
154+
if !ok {
155+
log.Info("Server disconnected", "topic", rr.SourceTopic)
156+
return nil
157+
}
158+
buff, err := engine.Render(s)
159+
if err != nil {
160+
return err
161+
}
162+
switch e.Protocol {
163+
case "mqtt":
164+
e.sinkEClient.(MQTT.Client).Publish(rr.SinkTopic, 0, false, buff)
165+
case "stderr":
166+
log.Infof("Publish subtopic %s gettopic %v pubtopic %v message %s\n", rr.SourceTopic, s.Topic, rr.SinkTopic, buff)
167+
}
168+
}
169+
}
170+
return nil
171+
}
172+
173+
func (e eventRepeater) Update(message *application.Message) error {
174+
var dataStruct []messageData
175+
err := json.Unmarshal([]byte(*message.Data), &dataStruct)
176+
if err != nil {
177+
return err
178+
}
179+
switch message.Resource {
180+
case "event":
181+
log.Debugf("Update message op %v Resource %v Data %v \n", message.Op, message.Resource, dataStruct)
182+
switch message.Op {
183+
case application.ADD:
184+
for _, d := range dataStruct {
185+
log.Debugf("Add message %v \n", d)
186+
err := e.AddEvent(d.SourceTopic, d.SinkTopic)
187+
if err != nil {
188+
return err
189+
}
190+
}
191+
case application.DELETE:
192+
for _, d := range dataStruct {
193+
err := e.DelEvent(d.SourceTopic)
194+
if err != nil {
195+
return err
196+
}
197+
}
198+
case application.UPDATE:
199+
for _, d := range dataStruct {
200+
err := e.DelEvent(d.SourceTopic)
201+
if err != nil {
202+
return err
203+
}
204+
err = e.AddEvent(d.SourceTopic, d.SinkTopic)
205+
if err != nil {
206+
return err
207+
}
208+
}
209+
case application.GET:
210+
default:
211+
log.Warnf("Unknown operation\n")
212+
}
213+
default:
214+
log.Warnf("Unknown resouces\n")
215+
}
216+
return nil
217+
}
218+
219+
func (e eventRepeater) serve() error {
220+
if e.allowAll {
221+
e.AddEvent(".", "")
222+
}
223+
for {
224+
select {
225+
case <-e.stop:
226+
return nil
227+
}
228+
}
229+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package main
2+
3+
import (
4+
"os"
5+
6+
log "github.com/Sirupsen/logrus"
7+
"github.com/docker/infrakit/pkg/cli"
8+
application "github.com/docker/infrakit/pkg/rpc/application"
9+
"github.com/spf13/cobra"
10+
)
11+
12+
func main() {
13+
14+
cmd := &cobra.Command{
15+
Use: os.Args[0],
16+
Short: "Event Repeater Application plugin",
17+
}
18+
name := cmd.Flags().String("name", "app-event-repeater", "Application name to advertise for discovery")
19+
logLevel := cmd.Flags().Int("log", cli.DefaultLogLevel, "Logging level. 0 is least verbose. Max is 5")
20+
source := cmd.Flags().String("source", "event-plugin", "Event sourve address.")
21+
sink := cmd.Flags().String("sink", "localhost:1883", "Event sink address. default: localhost:1883")
22+
sinkProtocol := cmd.Flags().String("sinkprotocol", "mqtt", "Event sink protocol. Now only mqtt and stderr is implemented.")
23+
allowall := cmd.Flags().Bool("allowall", false, "Allow all event from source and repeat the event to sink as same topic name. default: false")
24+
cmd.RunE = func(c *cobra.Command, args []string) error {
25+
cli.SetLogLevel(*logLevel)
26+
cli.RunPlugin(*name, application.PluginServer(NewEventRepeater(*source, *sink, *sinkProtocol, *allowall)))
27+
return nil
28+
}
29+
30+
// cmd.AddCommand(cli.VersionCommand())
31+
32+
if err := cmd.Execute(); err != nil {
33+
log.Error(err)
34+
os.Exit(1)
35+
}
36+
}

0 commit comments

Comments
 (0)