From d6096855b4eb37707a258b51677f8e78abdb12ef Mon Sep 17 00:00:00 2001
From: Chris
Date: Mon, 26 Aug 2019 14:46:14 -0400
Subject: [PATCH 1/2] Refactor SendOfflineMessage to publish to push nodes first.

The code currently publishes to the DHT first and then to the push nodes
once that finishes. This creates a window in which the node may be killed
while it is still publishing to the DHT, before it has had a chance to
publish to the push nodes.

This commit changes it to publish to the push nodes in parallel with the
DHT publish so as to avoid the potential problem described above.
---
 core/net.go | 31 ++++++++++++++++++++-----------
 1 file changed, 20 insertions(+), 11 deletions(-)

diff --git a/core/net.go b/core/net.go
index a605e6e4c8..590cd699b1 100644
--- a/core/net.go
+++ b/core/net.go
@@ -98,23 +98,32 @@ func (n *OpenBazaarNode) SendOfflineMessage(p peer.ID, k *libp2p.PubKey, m *pb.M
 		}
 	}
 	log.Debugf("Sending offline message to: %s, Message Type: %s, PointerID: %s, Location: %s", p.Pretty(), m.MessageType.String(), pointer.Cid.String(), pointer.Value.Addrs[0].String())
-	OfflineMessageWaitGroup.Add(2)
-	go func() {
-		ctx, cancel := context.WithCancel(context.Background())
-		defer cancel()
-		err := ipfs.PublishPointer(n.DHT, ctx, pointer)
-		if err != nil {
-			log.Error(err)
-		}
-		// Push provider to our push nodes for redundancy
-		for _, p := range n.PushNodes {
+	// We publish our pointers to three different locations:
+	// 1. The pushnodes
+	// 2. The DHT
+	// 3. Pubsub
+	// Each one is done in a separate goroutine so as to not block but we
+	// do increment the OfflineMessageWaitGroup which is used to block
+	// shutdown until all publishing is finished.
+	OfflineMessageWaitGroup.Add(2 + len(n.PushNodes))
+	for _, p := range n.PushNodes {
+		go func(pid peer.ID) {
 			ctx, cancel := context.WithCancel(context.Background())
 			defer cancel()
-			err := ipfs.PutPointerToPeer(n.DHT, ctx, p, pointer)
+			err := ipfs.PutPointerToPeer(n.DHT, ctx, pid, pointer)
 			if err != nil {
 				log.Error(err)
 			}
+			OfflineMessageWaitGroup.Done()
+		}(p)
+	}
+
+	go func() {
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+		err := ipfs.PublishPointer(n.DHT, ctx, pointer)
+		if err != nil {
+			log.Error(err)
 		}
 		OfflineMessageWaitGroup.Done()

From bd909fe222212f5af514634a8d76ce3c490430c4 Mon Sep 17 00:00:00 2001
From: Chris
Date: Mon, 26 Aug 2019 16:02:59 -0400
Subject: [PATCH 2/2] Update message retrieval to query push nodes in parallel

---
 net/retriever/retriever.go | 60 ++++++++++++++++++++------------------
 1 file changed, 32 insertions(+), 28 deletions(-)

diff --git a/net/retriever/retriever.go b/net/retriever/retriever.go
index b66125c59e..022cae79bc 100644
--- a/net/retriever/retriever.go
+++ b/net/retriever/retriever.go
@@ -96,7 +96,7 @@ func NewMessageRetriever(cfg MRConfig) *MessageRetriever {
 		WaitGroup: new(sync.WaitGroup),
 	}
 
-	mr.Add(1)
+	mr.Add(2)
 	return &mr
 }
 
@@ -105,15 +105,16 @@ func (m *MessageRetriever) Run() {
 	peers := time.NewTicker(time.Minute * 10)
 	defer dht.Stop()
 	defer peers.Stop()
-	go m.fetchPointers(true)
+	go m.fetchPointersFromDHT()
+	go m.fetchPointersFromPushNodes()
 	for {
 		select {
 		case <-dht.C:
 			m.Add(1)
-			go m.fetchPointers(true)
+			go m.fetchPointersFromDHT()
 		case <-peers.C:
 			m.Add(1)
-			go m.fetchPointers(false)
+			go m.fetchPointersFromPushNodes()
 		}
 	}
 }
@@ -121,41 +122,44 @@
 // RunOnce - used to fetch messages only once
 func (m *MessageRetriever) RunOnce() {
 	m.Add(1)
-	go m.fetchPointers(true)
+	go m.fetchPointersFromDHT()
 	m.Add(1)
-	go m.fetchPointers(false)
+	go m.fetchPointersFromPushNodes()
 }
 
-func (m *MessageRetriever) fetchPointers(useDHT bool) {
+func (m *MessageRetriever) fetchPointersFromDHT() {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-	wg := new(sync.WaitGroup)
-	downloaded := 0
 	mh, _ := multihash.FromB58String(m.node.Identity.Pretty())
 	peerOut := make(chan ps.PeerInfo)
 	go func(c chan ps.PeerInfo) {
-		pwg := new(sync.WaitGroup)
-		pwg.Add(1)
-		go func(c chan ps.PeerInfo) {
-			out := m.getPointersDataPeers()
-			for p := range out {
-				c <- p
-			}
-			pwg.Done()
-		}(c)
-		if useDHT {
-			pwg.Add(1)
-			go func(c chan ps.PeerInfo) {
-				iout := ipfs.FindPointersAsync(m.routing, ctx, mh, m.prefixLen)
-				for p := range iout {
-					c <- p
-				}
-				pwg.Done()
-			}(c)
+		iout := ipfs.FindPointersAsync(m.routing, ctx, mh, m.prefixLen)
+		for p := range iout {
+			c <- p
+		}
+		close(c)
+
+	}(peerOut)
+
+	m.downloadMessages(peerOut)
+}
+
+func (m *MessageRetriever) fetchPointersFromPushNodes() {
+	peerOut := make(chan ps.PeerInfo)
+	go func(c chan ps.PeerInfo) {
+		out := m.getPointersDataPeers()
+		for p := range out {
+			c <- p
 		}
-		pwg.Wait()
 		close(c)
+	}(peerOut)
+	m.downloadMessages(peerOut)
+}
+
+func (m *MessageRetriever) downloadMessages(peerOut chan ps.PeerInfo) {
+	wg := new(sync.WaitGroup)
+	downloaded := 0
 	inFlight := make(map[string]bool)
 
 	// Iterate over the pointers, adding 1 to the waitgroup for each pointer found