Skip to content

Commit

Permalink
feat: events: add lotus-shed indexes inspect-events health-check
Browse files Browse the repository at this point in the history
  • Loading branch information
rvagg committed Aug 6, 2024
1 parent 5069d8b commit 9707526
Show file tree
Hide file tree
Showing 2 changed files with 194 additions and 3 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
## New features

- feat: Add trace filter API supporting RPC method `trace_filter` ([filecoin-project/lotus#12123](https://github.com/filecoin-project/lotus/pull/12123)). Configuring `EthTraceFilterMaxResults` sets a limit on how many results are returned in any individual `trace_filter` RPC API call.

- feat: `FilecoinAddressToEthAddress` RPC can now return ETH addresses for all Filecoin address types ("f0"/"f1"/"f2"/"f3") based on client's re-org tolerance. This is a breaking change if you are using the API via the go-jsonrpc library or by using Lotus as a library, but is a non-breaking change when using the API via any other RPC method as it adds an optional second argument.
([filecoin-project/lotus#12324](https://github.com/filecoin-project/lotus/pull/12324)).
- feat: Added `lotus-shed indexes inspect-events` health-check command ([filecoin-project/lotus#12346](https://github.com/filecoin-project/lotus/pull/12346)).

# v1.28.1 / 2024-07-24

Expand Down
195 changes: 193 additions & 2 deletions cmd/lotus-shed/indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"math"
"path"
"path/filepath"
"strings"
Expand All @@ -27,9 +28,12 @@ import (
// SQL statements used by the events index commands in this file.
const (
// same as in chain/events/index.go
// eventExists selects the highest event id matching the full set of identifying columns (MAX yields NULL when no row matches).
eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?`
// eventCount counts the rows in the event table for a given tipset key CID.
eventCount = `SELECT COUNT(*) FROM event WHERE tipset_key_cid=?`
// entryCount counts the event_entry rows whose parent event belongs to a given tipset key CID.
entryCount = `SELECT COUNT(*) FROM event_entry JOIN event ON event_entry.event_id=event.id WHERE event.tipset_key_cid=?`
// insertEvent adds an event row; INSERT OR IGNORE skips the row when it conflicts with an existing one.
insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)`
// insertEntry adds a single entry for an event; INSERT OR IGNORE skips the row when it conflicts with an existing one.
insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)`
// upsertEventsSeen records a (height, tipset_key_cid) pair as seen and not reverted, clearing the reverted flag if the pair already exists.
upsertEventsSeen = `INSERT INTO events_seen(height, tipset_key_cid, reverted) VALUES(?, ?, false) ON CONFLICT(height, tipset_key_cid) DO UPDATE SET reverted=false`
// tipsetSeen fetches the recorded height and reverted flag from events_seen for a given tipset key CID.
tipsetSeen = `SELECT height,reverted FROM events_seen WHERE tipset_key_cid=?`
)

func withCategory(cat string, cmd *cli.Command) *cli.Command {
Expand All @@ -46,12 +50,13 @@ var indexesCmd = &cli.Command{
withCategory("msgindex", pruneMsgIndexCmd),
withCategory("txhash", backfillTxHashCmd),
withCategory("events", backfillEventsCmd),
withCategory("events", inspectEventsCmd),
},
}

var backfillEventsCmd = &cli.Command{
Name: "backfill-events",
Usage: "Backfill the events.db for a number of epochs starting from a specified height",
Usage: "Backfill the events.db for a number of epochs starting from a specified height and working backward",
Flags: []cli.Flag{
&cli.UintFlag{
Name: "from",
Expand All @@ -61,7 +66,7 @@ var backfillEventsCmd = &cli.Command{
&cli.IntFlag{
Name: "epochs",
Value: 2000,
Usage: "the number of epochs to backfill",
Usage: "the number of epochs to backfill (working backwards)",
},
&cli.BoolFlag{
Name: "temporary-index",
Expand Down Expand Up @@ -387,6 +392,192 @@ var backfillEventsCmd = &cli.Command{
},
}

// inspectEventsCmd implements `lotus-shed indexes inspect-events`, a health-check
// for events.db. Starting from the chain head (or the height given by --from) it
// walks the chain backward one tipset at a time and, for each tipset:
//
//   - counts the events and event entries the node reports (via ChainGetEvents)
//     for the successful message receipts that carry an events root,
//   - compares those counts with the rows stored in the event / event_entry
//     tables for the tipset key CID, and
//   - checks that events_seen has a non-reverted row with the matching height.
//
// Tipsets with discrepancies are written to the app writer; tipsets without
// problems are written only when --log-good is set. The walk ends after
// --epochs tipsets, when the context is cancelled, or when parent receipts can
// no longer be fetched. Failing to load a parent tipset or to query the
// database is returned as an error.
var inspectEventsCmd = &cli.Command{
	Name: "inspect-events",
	Usage: "Perform a health-check on the events.db for a number of epochs starting from a specified height and working backward. " +
		"Logs tipsets with problems and optionally logs tipsets without problems. Without specifying a fixed number of epochs, " +
		"the command will continue until it reaches a tipset that is not in the blockstore.",
	Flags: []cli.Flag{
		&cli.UintFlag{
			Name:  "from",
			Value: 0,
			Usage: "the tipset height to start inspecting from (0 is head of chain)",
		},
		&cli.IntFlag{
			Name:  "epochs",
			Value: 0,
			Usage: "the number of epochs to inspect (working backwards) [0 = until we reach a block we don't have]",
		},
		&cli.BoolFlag{
			Name:  "log-good",
			Usage: "log tipsets that have no detected problems",
			Value: false,
		},
	},
	Action: func(cctx *cli.Context) error {
		srv, err := lcli.GetFullNodeServices(cctx)
		if err != nil {
			return err
		}
		defer srv.Close() //nolint:errcheck

		api := srv.FullNodeAPI()
		ctx := lcli.ReqContext(cctx)

		// currTs will be the tipset where we start inspecting from
		currTs, err := api.ChainHead(ctx)
		if err != nil {
			return err
		}
		if cctx.IsSet("from") {
			// we need to fetch the tipset after the epoch being specified since we will need to advance currTs
			currTs, err = api.ChainGetTipSetAfterHeight(ctx, abi.ChainEpoch(cctx.Int("from")+1), currTs.Key())
			if err != nil {
				return err
			}
		}

		logGood := cctx.Bool("log-good")

		// advance currTs by one epoch and maintain prevTs as the previous tipset (this allows us to easily use the ChainGetParentMessages/Receipt API)
		prevTs := currTs
		currTs, err = api.ChainGetTipSet(ctx, currTs.Parents())
		if err != nil {
			return fmt.Errorf("failed to load tipset %s: %w", prevTs.Parents(), err)
		}

		// a non-positive epoch count means "no fixed limit"
		epochs := cctx.Int("epochs")
		if epochs <= 0 {
			epochs = math.MaxInt32
		}

		basePath, err := homedir.Expand(cctx.String("repo"))
		if err != nil {
			return err
		}

		// events.db lives on the local filesystem, so build the path with filepath
		dbPath := filepath.Join(basePath, "sqlite", "events.db")
		// NOTE(review): some sqlite3 drivers only honour URI parameters such as
		// mode=ro when the DSN starts with "file:" — confirm this really opens
		// the database read-only.
		db, err := sql.Open("sqlite3", dbPath+"?mode=ro")
		if err != nil {
			return err
		}

		defer func() {
			err := db.Close()
			if err != nil {
				fmt.Printf("ERROR: closing db: %s\n", err)
			}
		}()

		// Prepared statements are released before the deferred db.Close above
		// runs (defers execute in LIFO order).
		stmtEventCount, err := db.Prepare(eventCount)
		if err != nil {
			return err
		}
		defer stmtEventCount.Close() //nolint:errcheck

		stmtEntryCount, err := db.Prepare(entryCount)
		if err != nil {
			return err
		}
		defer stmtEntryCount.Close() //nolint:errcheck

		stmtTipsetSeen, err := db.Prepare(tipsetSeen)
		if err != nil {
			return err
		}
		defer stmtTipsetSeen.Close() //nolint:errcheck

		// processHeight checks a single tipset: receipts are the receipts of the
		// messages executed in ts (obtained from its child tipset). It logs any
		// discrepancies found and only returns an error for API/database failures.
		processHeight := func(ctx context.Context, ts *types.TipSet, receipts []*types.MessageReceipt) error {
			tsKeyCid, err := ts.Key().Cid()
			if err != nil {
				return fmt.Errorf("failed to get tipset key cid: %w", err)
			}

			var expectEvents int
			var expectEntries int

			// tally what the node says should be in the index for this tipset
			for _, receipt := range receipts {
				if receipt.ExitCode != exitcode.Ok || receipt.EventsRoot == nil {
					continue
				}
				events, err := api.ChainGetEvents(ctx, *receipt.EventsRoot)
				if err != nil {
					return fmt.Errorf("failed to load events for tipset %s: %w", ts, err)
				}
				expectEvents += len(events)
				for _, event := range events {
					expectEntries += len(event.Entries)
				}
			}

			var problems []string

			var seenHeight int
			var seenReverted int
			if err := stmtTipsetSeen.QueryRow(tsKeyCid.Bytes()).Scan(&seenHeight, &seenReverted); err != nil {
				if err == sql.ErrNoRows {
					// a missing events_seen row is reported either way, but the
					// message distinguishes tipsets that produced no events
					if expectEvents > 0 {
						problems = append(problems, "not in events_seen table")
					} else {
						problems = append(problems, "zero-event epoch not in events_seen table")
					}
				} else {
					return fmt.Errorf("failed to check if tipset is seen: %w", err)
				}
			} else {
				if seenHeight != int(ts.Height()) {
					problems = append(problems, fmt.Sprintf("events_seen height mismatch (%d)", seenHeight))
				}
				if seenReverted != 0 {
					problems = append(problems, "events_seen marked as reverted")
				}
			}

			var actualEvents int
			if err := stmtEventCount.QueryRow(tsKeyCid.Bytes()).Scan(&actualEvents); err != nil {
				return fmt.Errorf("failed to count events for epoch %d (tsk CID %s): %w", ts.Height(), tsKeyCid, err)
			}
			var actualEntries int
			if err := stmtEntryCount.QueryRow(tsKeyCid.Bytes()).Scan(&actualEntries); err != nil {
				return fmt.Errorf("failed to count entries for epoch %d (tsk CID %s): %w", ts.Height(), tsKeyCid, err)
			}

			if actualEvents != expectEvents {
				problems = append(problems, fmt.Sprintf("expected %d events, got %d", expectEvents, actualEvents))
			}
			if actualEntries != expectEntries {
				problems = append(problems, fmt.Sprintf("expected %d entries, got %d", expectEntries, actualEntries))
			}

			if len(problems) > 0 {
				_, _ = fmt.Fprintf(cctx.App.Writer, "✗ Epoch %d (%s): %s\n", ts.Height(), tsKeyCid, problems)
			} else if logGood {
				_, _ = fmt.Fprintf(cctx.App.Writer, "✓ Epoch %d (%s)\n", ts.Height(), tsKeyCid)
			}

			return nil
		}

		for i := 0; ctx.Err() == nil && i < epochs; i++ {
			// get receipts for the parent of the previous tipset (which will be currTs)
			receipts, err := api.ChainGetParentReceipts(ctx, prevTs.Blocks()[0].Cid())
			if err != nil {
				// treated as the end of locally available chain data, not as a failure
				_, _ = fmt.Fprintf(cctx.App.ErrWriter, "Missing parent receipts for epoch %d (checked %d epochs)\n", prevTs.Height(), i)
				break
			}

			err = processHeight(ctx, currTs, receipts)
			if err != nil {
				return err
			}

			// advance prevTs and currTs up the chain
			prevTs = currTs
			currTs, err = api.ChainGetTipSet(ctx, currTs.Parents())
			if err != nil {
				// currTs has just been overwritten by the failed call, so report
				// the key we tried to load via prevTs instead
				return fmt.Errorf("failed to load tipset %s: %w", prevTs.Parents(), err)
			}
		}

		return nil
	},
}

var backfillMsgIndexCmd = &cli.Command{
Name: "backfill-msgindex",
Usage: "Backfill the msgindex.db for a number of epochs starting from a specified height",
Expand Down

0 comments on commit 9707526

Please sign in to comment.