Skip to content

Commit

Permalink
Merge pull request #131 from getamis/max-blocks-to-insert
Browse files Browse the repository at this point in the history
Max blocks to insert
  • Loading branch information
markya0616 authored Feb 13, 2019
2 parents b39ea73 + f7f03a0 commit e0e1357
Show file tree
Hide file tree
Showing 15 changed files with 448 additions and 99 deletions.
3 changes: 2 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"errors"
"math/big"

"github.com/ethereum/go-ethereum"
ethereum "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
Expand All @@ -41,6 +41,7 @@ var (
type EthClient interface {
Balancer

HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error)
BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error)
TransactionByHash(ctx context.Context, hash common.Hash) (tx *types.Transaction, isPending bool, err error)
Expand Down
23 changes: 23 additions & 0 deletions client/mocks/EthClient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
class AddReorgUniqueIndexFromAndToHash < ActiveRecord::Migration[5.2]
def change
add_index :reorgs, [:from_hash, :to_hash], :unique => true
end
end
3 changes: 2 additions & 1 deletion migration/db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema.define(version: 2018_12_18_072947) do
ActiveRecord::Schema.define(version: 2019_02_13_032740) do

create_table "accounts", options: "ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci", force: :cascade do |t|
t.binary "address", limit: 20, null: false
Expand Down Expand Up @@ -97,6 +97,7 @@
t.binary "to_hash", limit: 32, null: false
t.datetime "created_at", null: false
t.index ["from", "to"], name: "index_reorgs_on_from_and_to"
t.index ["from_hash", "to_hash"], name: "index_reorgs_on_from_hash_and_to_hash", unique: true
end

create_table "subscriptions", options: "ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci", force: :cascade do |t|
Expand Down
90 changes: 65 additions & 25 deletions service/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ import (
"github.com/getamis/sirius/metrics"
)

const (
maxBlocksToInsert = 50
)

var (
//ErrInvalidAddress returns if invalid ERC20 address is detected
ErrInvalidAddress = errors.New("invalid address")
Expand Down Expand Up @@ -144,9 +148,9 @@ func (idx *indexer) Listen(ctx context.Context, fromBlock int64) error {
idx.client = idx.newClientFunc(header.Client)
err := idx.sync(listenCtx, header.Header)
if err != nil {
// Other indexer exists because it's a duplicate error. Reload latest states again.
if common.DuplicateError(err) {
log.Info("Duplicate blocks exist", "number", header.Number, "err", err)
// Other indexer exists because the blockchain data is updated by others.
if common.DuplicateError(err) || err == store.ErrModifiedData {
log.Info("Blocks are modified by others", "number", header.Number, "err", err)
loadErr := idx.loadLocalState(ctx, fromBlock)
if loadErr != nil {
log.Error("Failed to load local states", "err", err)
Expand Down Expand Up @@ -182,7 +186,7 @@ func (idx *indexer) loadLocalState(ctx context.Context, from int64) error {
return err
}

block, td, err := idx.insertBlocks(ctx, []*types.Block{block}, nil)
block, td, err := idx.insertBlocks(ctx, []*types.Block{block})
if err != nil {
log.Error("Failed to insert from block from ethereum", "number", from, "err", err)
return err
Expand Down Expand Up @@ -211,16 +215,32 @@ func (idx *indexer) loadLocalState(ctx context.Context, from int64) error {
return nil
}

func (idx *indexer) sync(ctx context.Context, header *types.Header) error {
// Update existing blocks and TD from ethereum to db
block, td, err := idx.addBlockMaybeReorg(ctx, header)
if err != nil {
return err
}
// If the block is inserted, update current td and header
if block != nil {
idx.currentHeader = common.Header(block)
idx.currentTD = td
func (idx *indexer) sync(ctx context.Context, targetHeader *types.Header) error {
targetBlockNumber := targetHeader.Number.Int64()
for idx.currentHeader.Number < targetBlockNumber {
nextHeader := targetHeader
// If there are too many blocks, we sync them partially
if targetBlockNumber-idx.currentHeader.Number > maxBlocksToInsert {
nextBlockNumber := idx.currentHeader.Number + maxBlocksToInsert
header, err := idx.client.HeaderByNumber(ctx, big.NewInt(nextBlockNumber))
if err != nil {
log.Error("Failed to get block from ethereum", "number", nextBlockNumber, "err", err)
return err
}
log.Debug("Set next header to sync", "number", nextBlockNumber)
nextHeader = header
}

// Update existing blocks and TD from ethereum to db
block, td, err := idx.addBlockMaybeReorg(ctx, nextHeader)
if err != nil {
return err
}
// If the block is inserted, update current td and header
if block != nil {
idx.currentHeader = common.Header(block)
idx.currentTD = td
}
}
return nil
}
Expand Down Expand Up @@ -272,7 +292,7 @@ func (idx *indexer) getTd(ctx context.Context, hash []byte) (td *big.Int, err er
return common.ParseTd(ltd)
}

func (idx *indexer) insertBlocks(ctx context.Context, blocks []*types.Block, reorgEvent *model.Reorg) (*types.Block, *big.Int, error) {
func (idx *indexer) insertBlocks(ctx context.Context, blocks []*types.Block) (*types.Block, *big.Int, error) {
var lastTd *big.Int
// Insert td
for i := len(blocks) - 1; i >= 0; i-- {
Expand All @@ -298,7 +318,7 @@ func (idx *indexer) insertBlocks(ctx context.Context, blocks []*types.Block, reo
receipts = append(receipts, receipt)
events = append(events, event)
}
err := idx.manager.UpdateBlocks(ctx, idx.client, newBlocks, receipts, events, reorgEvent)
err := idx.manager.InsertBlocks(ctx, idx.client, newBlocks, receipts, events)
if err != nil {
log.Error("Failed to update blocks", "err", err)
return nil, nil, err
Expand Down Expand Up @@ -327,7 +347,7 @@ func (idx *indexer) addBlockMaybeReorg(ctx context.Context, header *types.Header
var blocksToInsert []*types.Block
if idx.currentHeader.Number+1 == header.Number.Int64() && bytes.Equal(block.ParentHash().Bytes(), idx.currentHeader.Hash) {
blocksToInsert = append(blocksToInsert, block)
return idx.insertBlocks(ctx, blocksToInsert, nil)
return idx.insertBlocks(ctx, blocksToInsert)
}

// Find targetTD to check whether we need to handle it
Expand All @@ -354,7 +374,6 @@ func (idx *indexer) addBlockMaybeReorg(ctx context.Context, header *types.Header
for {
// Get old blocks from db only if the number is equal or smaller than the current block number
if idx.currentHeader.Number == block.Number().Int64()-1 && bytes.Equal(idx.currentHeader.Hash, block.ParentHash().Bytes()) {
logger.Info("Reorg tracing: Not a reorg event", "number", idx.currentHeader.Number)
reorgEvent = nil
break
} else if idx.currentHeader.Number > block.Number().Int64()-1 {
Expand Down Expand Up @@ -384,19 +403,40 @@ func (idx *indexer) addBlockMaybeReorg(ctx context.Context, header *types.Header
}
blocks = append(blocks, block)
}
logger.Trace("Reorg tracing: Stop", "at", block.Number(), "hash", block.Hash().Hex())
branchBlock := block
blocksToInsert = append(blocksToInsert, blocks...)
logger = logger.New("branchNumber", branchBlock.Number(), "branchHash", branchBlock.Hash().Hex())

// Now delete the reorg'ed blocks and rely on top-level sync to insert new blocks
if reorgEvent == nil {
logger.Info("Reorg tracing: Not a reorg event", "number", idx.currentHeader.Number)
block, targetTD, err = idx.insertBlocks(ctx, blocksToInsert)
if err != nil {
logger.Error("Reorg tracing: Failed to insert blocks", "err", err)
return nil, nil, err
}
return block, targetTD, nil
}

// Now atomically update the reorg'ed blocks
logger.Trace("Reorg: Starting at", "branch", branchBlock.Number(), "hash", branchBlock.Hash().Hex())
block, targetTD, err = idx.insertBlocks(ctx, blocksToInsert, reorgEvent)
logger = logger.New("reorgFrom", reorgEvent.From, "reorgFromHash", common.BytesTo0xHex(reorgEvent.FromHash), "reorgTo", reorgEvent.To, "reorgToHash", common.BytesTo0xHex(reorgEvent.ToHash))
logger.Info("Reorg tracing: A reorg event", "number", idx.currentHeader.Number)
// Return the parent of branch block and its TD as our current block and TD
parentBranchTD, err := idx.getTd(ctx, branchBlock.ParentHash().Bytes())
if err != nil {
logger.Error("Reorg tracing: Failed to get parent td of branch block", "err", err)
return nil, nil, err
}
parentBranchBlock, err := idx.client.BlockByHash(ctx, branchBlock.ParentHash())
if err != nil {
logger.Error("Reorg tracing: Failed to get parent block of branch block", "err", err)
return nil, nil, err
}
err = idx.manager.ReorgBlocks(ctx, reorgEvent)
if err != nil {
logger.Error("Reorg: Failed to insert blocks", "err", err)
logger.Error("Reorg tracing: Failed to delete blocks", "err", err)
return nil, nil, err
}
logger.Trace("Reorg: Done", "at", block.Number(), "inserted", len(blocksToInsert), "hash", block.Hash().Hex())
return block, targetTD, nil
return parentBranchBlock, parentBranchTD, nil
}

// getBlockData returns the receipts generated in the given block, and state diff since last block
Expand Down
Loading

0 comments on commit e0e1357

Please sign in to comment.