Skip to content

Commit f9571be

Browse files
committed
Merge branch 'syncfix'
2 parents 4568206 + fb95dd0 commit f9571be

File tree

5 files changed

+54
-219
lines changed

5 files changed

+54
-219
lines changed

backend/coins/btc/account.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,10 @@ func (account *Account) Info() *accounts.Info {
448448
}
449449

450450
func (account *Account) onNewHeader(header *blockchain.Header) error {
451+
if account.isClosed() {
452+
account.log.Debug("Ignoring new header after the account was closed")
453+
return nil
454+
}
451455
account.log.WithField("block-height", header.BlockHeight).Debug("Received new header")
452456
// Fee estimates change with each block.
453457
account.updateFeeTargets()

backend/coins/btc/transactions/transactions.go

Lines changed: 17 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,9 @@ func getScriptHashHex(txOut *wire.TxOut) blockchain.ScriptHashHex {
5454
type Transactions struct {
5555
locker.Locker
5656

57-
net *chaincfg.Params
58-
db DBInterface
59-
headers headers.Interface
60-
requestedTXs map[chainhash.Hash][]func(DBTxInterface, *wire.MsgTx)
57+
net *chaincfg.Params
58+
db DBInterface
59+
headers headers.Interface
6160

6261
// headersTipHeight is the current chain tip height, so we can compute the number of
6362
// confirmations of a transaction.
@@ -85,10 +84,9 @@ func NewTransactions(
8584
log *logrus.Entry,
8685
) *Transactions {
8786
transactions := &Transactions{
88-
net: net,
89-
db: db,
90-
headers: headers,
91-
requestedTXs: map[chainhash.Hash][]func(DBTxInterface, *wire.MsgTx){},
87+
net: net,
88+
db: db,
89+
headers: headers,
9290

9391
headersTipHeight: headers.TipHeight(),
9492

@@ -117,29 +115,8 @@ func (transactions *Transactions) isClosed() bool {
117115
return transactions.closed
118116
}
119117

120-
func (transactions *Transactions) txInHistory(
121-
dbTx DBTxInterface, scriptHashHex blockchain.ScriptHashHex, txHash chainhash.Hash) bool {
122-
history, err := dbTx.AddressHistory(scriptHashHex)
123-
if err != nil {
124-
// TODO
125-
panic(err)
126-
}
127-
for _, entry := range history {
128-
if txHash == entry.TXHash.Hash() {
129-
return true
130-
}
131-
}
132-
return false
133-
}
134-
135118
func (transactions *Transactions) processTxForAddress(
136119
dbTx DBTxInterface, scriptHashHex blockchain.ScriptHashHex, txHash chainhash.Hash, tx *wire.MsgTx, height int) {
137-
// Don't process the tx if it is not found in the address history. It could have been removed
138-
// from the history before this function was called.
139-
if !transactions.txInHistory(dbTx, scriptHashHex, txHash) {
140-
return
141-
}
142-
143120
_, _, previousHeight, _, err := dbTx.TxInfo(txHash)
144121
if err != nil {
145122
transactions.log.WithError(err).Panic("Failed to retrieve tx info")
@@ -341,70 +318,47 @@ func (transactions *Transactions) UpdateAddressHistory(scriptHashHex blockchain.
341318
if err := dbTx.PutAddressHistory(scriptHashHex, txs); err != nil {
342319
transactions.log.WithError(err).Panic("Failed to store address history")
343320
}
344-
345321
for _, txInfo := range txs {
346-
func(txHash chainhash.Hash, height int) {
347-
transactions.doForTransaction(dbTx, txHash, func(innerDBTx DBTxInterface, tx *wire.MsgTx) {
348-
transactions.processTxForAddress(innerDBTx, scriptHashHex, txHash, tx, height)
349-
})
350-
}(txInfo.TXHash.Hash(), txInfo.Height)
322+
txHash := txInfo.TXHash.Hash()
323+
height := txInfo.Height
324+
tx := transactions.getTransactionCached(dbTx, txHash)
325+
transactions.processTxForAddress(dbTx, scriptHashHex, txHash, tx, height)
351326
}
352327
if err := dbTx.Commit(); err != nil {
353328
transactions.log.WithError(err).Panic("Failed to commit transaction")
354329
}
355330
}
356331

357332
// requires transactions lock
358-
func (transactions *Transactions) doForTransaction(
333+
func (transactions *Transactions) getTransactionCached(
359334
dbTx DBTxInterface,
360335
txHash chainhash.Hash,
361-
callback func(DBTxInterface, *wire.MsgTx),
362-
) {
336+
) *wire.MsgTx {
363337
tx, _, _, _, err := dbTx.TxInfo(txHash)
364338
if err != nil {
365339
transactions.log.WithError(err).Panic("Failed to retrieve transaction info")
366340
}
367341
if tx != nil {
368-
callback(dbTx, tx)
369-
return
370-
}
371-
if transactions.requestedTXs[txHash] == nil {
372-
transactions.requestedTXs[txHash] = []func(DBTxInterface, *wire.MsgTx){}
373-
}
374-
alreadyDownloading := len(transactions.requestedTXs[txHash]) != 0
375-
transactions.requestedTXs[txHash] = append(transactions.requestedTXs[txHash], callback)
376-
if alreadyDownloading {
377-
return
342+
return tx
378343
}
379-
done := transactions.synchronizer.IncRequestsCounter()
344+
txChan := make(chan *wire.MsgTx)
380345
transactions.blockchain.TransactionGet(
381346
txHash,
382347
func(tx *wire.MsgTx) error {
383348
if transactions.isClosed() {
384349
transactions.log.Debug("TransactionGet result ignored after the instance was closed")
385350
return nil
386351
}
387-
388-
defer transactions.Lock()()
389-
dbTx, err := transactions.db.Begin()
390-
if err != nil {
391-
transactions.log.WithError(err).Panic("Failed to begin transaction")
392-
}
393-
defer dbTx.Rollback()
394-
395-
for _, callback := range transactions.requestedTXs[txHash] {
396-
callback(dbTx, tx)
397-
}
398-
delete(transactions.requestedTXs, txHash)
399-
return dbTx.Commit()
352+
txChan <- tx
353+
return nil
400354
},
401355
func(err error) {
402-
done()
403356
if err != nil {
404357
panic(err)
405358
}
406359
},
407360
)
361+
return <-txChan
408362
}
409363

410364
// Balance computes the confirmed and unconfirmed balance of the account.

backend/coins/btc/transactions/transactions_test.go

Lines changed: 9 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package transactions_test
1616

1717
import (
18+
"os"
1819
"testing"
1920

2021
"github.com/btcsuite/btcd/chaincfg"
@@ -40,34 +41,23 @@ import (
4041
"github.com/stretchr/testify/suite"
4142
)
4243

44+
func TestMain(m *testing.M) {
45+
test.TstSetupLogging()
46+
os.Exit(m.Run())
47+
}
48+
4349
type BlockchainMock struct {
4450
blockchainMock.Interface
45-
transactions map[chainhash.Hash]*wire.MsgTx
46-
transactionGetCallbacks map[chainhash.Hash][]func()
51+
transactions map[chainhash.Hash]*wire.MsgTx
4752
}
4853

4954
func NewBlockchainMock() *BlockchainMock {
5055
blockchainMock := &BlockchainMock{
51-
transactions: map[chainhash.Hash]*wire.MsgTx{},
52-
transactionGetCallbacks: map[chainhash.Hash][]func(){},
56+
transactions: map[chainhash.Hash]*wire.MsgTx{},
5357
}
5458
return blockchainMock
5559
}
5660

57-
func (blockchain *BlockchainMock) CallTransactionGetCallbacks(txHash chainhash.Hash) {
58-
callbacks := blockchain.transactionGetCallbacks[txHash]
59-
delete(blockchain.transactionGetCallbacks, txHash)
60-
for _, callback := range callbacks {
61-
callback()
62-
}
63-
}
64-
65-
func (blockchain *BlockchainMock) CallAllTransactionGetCallbacks() {
66-
for txHash := range blockchain.transactionGetCallbacks {
67-
blockchain.CallTransactionGetCallbacks(txHash)
68-
}
69-
}
70-
7161
func (blockchain *BlockchainMock) RegisterTxs(txs ...*wire.MsgTx) {
7262
for _, tx := range txs {
7363
blockchain.transactions[tx.TxHash()] = tx
@@ -84,17 +74,7 @@ func (blockchain *BlockchainMock) TransactionGet(
8474
if !ok {
8575
panic("you need to first register the transaction with the mock backend")
8676
}
87-
callbacks, ok := blockchain.transactionGetCallbacks[txHash]
88-
if !ok {
89-
callbacks = []func(){}
90-
}
91-
blockchain.transactionGetCallbacks[txHash] = append(callbacks,
92-
func() {
93-
defer cleanup(nil)
94-
if err := success(tx); err != nil {
95-
panic(err)
96-
}
97-
})
77+
go func() { _ = success(tx) }()
9878
}
9979

10080
func (blockchain *BlockchainMock) ConnectionStatus() blockchainpkg.Status {
@@ -152,7 +132,6 @@ func (s *transactionsSuite) updateAddressHistory(
152132
}
153133

154134
s.transactions.UpdateAddressHistory(address.PubkeyScriptHashHex(), txs)
155-
s.blockchainMock.CallAllTransactionGetCallbacks()
156135
}
157136

158137
func newTx(
@@ -170,50 +149,6 @@ func newTx(
170149
}
171150
}
172151

173-
// TestUpdateAddressHistorySyncStatus checks that the synchronizer is calling the
174-
// syncStart/syncFinished callbacks once in the beginning and once after all transactions have
175-
// processed.
176-
func (s *transactionsSuite) TestUpdateAddressHistorySyncStatus() {
177-
addresses := s.addressChain.EnsureAddresses()
178-
address := addresses[0]
179-
expectedAmount := btcutil.Amount(123)
180-
tx1 := newTx(chainhash.HashH(nil), 0, address, expectedAmount)
181-
tx1Hash := tx1.TxHash()
182-
tx2 := newTx(chainhash.HashH(nil), 1, address, expectedAmount)
183-
tx2Hash := tx2.TxHash()
184-
s.blockchainMock.RegisterTxs(tx1, tx2)
185-
var syncStarted, syncFinished bool
186-
onSyncStarted := func() {
187-
if syncStarted {
188-
require.FailNow(s.T(), "sync started twice")
189-
}
190-
syncStarted = true
191-
}
192-
onSyncFinished := func() {
193-
if syncFinished {
194-
require.FailNow(s.T(), "sync finished twice")
195-
}
196-
syncFinished = true
197-
}
198-
*s.synchronizer = *synchronizer.NewSynchronizer(onSyncStarted, onSyncFinished, s.log)
199-
s.notifierMock.On("Put", tx1Hash[:]).Return(nil).Once()
200-
s.notifierMock.On("Put", tx2Hash[:]).Return(nil).Once()
201-
s.transactions.UpdateAddressHistory(address.PubkeyScriptHashHex(), []*blockchainpkg.TxInfo{
202-
{TXHash: blockchainpkg.TXHash(tx1Hash), Height: 10},
203-
{TXHash: blockchainpkg.TXHash(tx2Hash), Height: 10},
204-
})
205-
require.True(s.T(), syncStarted)
206-
require.False(s.T(), syncFinished)
207-
s.headersMock.On("HeaderByHeight", 10).Return(nil, nil).Once()
208-
s.blockchainMock.CallTransactionGetCallbacks(tx1.TxHash())
209-
require.True(s.T(), syncStarted)
210-
require.False(s.T(), syncFinished)
211-
s.headersMock.On("HeaderByHeight", 10).Return(nil, nil).Once()
212-
s.blockchainMock.CallTransactionGetCallbacks(tx2.TxHash())
213-
require.True(s.T(), syncStarted)
214-
require.True(s.T(), syncFinished)
215-
}
216-
217152
func newBalance(available, incoming btcutil.Amount) *accounts.Balance {
218153
return accounts.NewBalance(
219154
coin.NewAmountFromInt64(int64(available)),
@@ -253,35 +188,6 @@ func (s *transactionsSuite) TestUpdateAddressHistorySingleTxReceive() {
253188
require.Equal(s.T(), expectedHeight, transactions[0].Height)
254189
}
255190

256-
// TestUpdateAddressHistoryOppositeOrder checks that a spend is correctly recognized even if the
257-
// transactions in the history of an address are processed in the wrong order. If the spending tx is
258-
// processed before the funding tx, the output is unknown when processing the funds, but after the
259-
// output has been added, the input spending it needs to be indexed correctly.
260-
func (s *transactionsSuite) TestUpdateAddressHistoryOppositeOrder() {
261-
addresses := s.addressChain.EnsureAddresses()
262-
address := addresses[0]
263-
address2 := addresses[1]
264-
tx1 := newTx(chainhash.HashH(nil), 0, address, 123)
265-
tx1Hash := tx1.TxHash()
266-
tx2 := newTx(tx1.TxHash(), 0, address2, 123)
267-
tx2Hash := tx2.TxHash()
268-
s.blockchainMock.RegisterTxs(tx1, tx2)
269-
s.notifierMock.On("Put", tx1Hash[:]).Return(nil).Once()
270-
s.notifierMock.On("Put", tx2Hash[:]).Return(nil).Once()
271-
s.transactions.UpdateAddressHistory(address.PubkeyScriptHashHex(), []*blockchainpkg.TxInfo{
272-
{TXHash: blockchainpkg.TXHash(tx1Hash), Height: 0},
273-
{TXHash: blockchainpkg.TXHash(tx2Hash), Height: 0},
274-
})
275-
// Process tx2 (the spend) before tx1 (the funding). This should result in a zero balance, as
276-
// the received funds are spent.
277-
s.blockchainMock.CallTransactionGetCallbacks(tx2.TxHash())
278-
s.blockchainMock.CallTransactionGetCallbacks(tx1.TxHash())
279-
require.Equal(s.T(),
280-
newBalance(0, 0),
281-
s.transactions.Balance(),
282-
)
283-
}
284-
285191
// TestSpendableOutputs checks that the utxo set is correct. Only confirmed (or unconfirmed outputs
286192
// we own) outputs can be spent.
287193
func (s *transactionsSuite) TestSpendableOutputs() {
@@ -453,23 +359,3 @@ func (s *transactionsSuite) TestRemoveTransaction() {
453359
s.transactions.Transactions(func(blockchainpkg.ScriptHashHex) bool { return false }),
454360
2)
455361
}
456-
457-
// TestRemoveTransactionPendingDownload tests that a tx can be removed from the address history
458-
// while it is still pending to be indexed.
459-
func (s *transactionsSuite) TestRemoveTransactionPendingDownload() {
460-
address := s.addressChain.EnsureAddresses()[0]
461-
tx := newTx(chainhash.HashH(nil), 0, address, 123)
462-
s.blockchainMock.RegisterTxs(tx)
463-
s.transactions.UpdateAddressHistory(address.PubkeyScriptHashHex(), []*blockchainpkg.TxInfo{
464-
{TXHash: blockchainpkg.TXHash(tx.TxHash()), Height: 0},
465-
})
466-
// Callback for processing the tx is not called yet. We remove the tx.
467-
s.transactions.UpdateAddressHistory(address.PubkeyScriptHashHex(), []*blockchainpkg.TxInfo{})
468-
// Process the tx now. It should not be indexed anymore.
469-
s.blockchainMock.CallAllTransactionGetCallbacks()
470-
require.Equal(s.T(),
471-
newBalance(0, 0),
472-
s.transactions.Balance())
473-
require.Empty(s.T(),
474-
s.transactions.Transactions(func(blockchainpkg.ScriptHashHex) bool { return false }))
475-
}

0 commit comments

Comments
 (0)