Skip to content

Commit

Permalink
Disable hub streaming API for backfills and stuff onDbMessage process…
Browse files Browse the repository at this point in the history
…ing if callback is undefined
  • Loading branch information
tybook committed Dec 20, 2024
1 parent dbb0f87 commit 76bb62e
Showing 1 changed file with 17 additions and 14 deletions.
31 changes: 17 additions & 14 deletions packages/shuttle/src/shuttle/messageReconciliation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import { randomUUID } from "crypto";

const MAX_PAGE_SIZE = 500;

type DBMessage = {
export type DBMessage = {
hash: Uint8Array;
prunedAt: Date | null;
revokedAt: Date | null;
Expand All @@ -42,7 +42,7 @@ export class MessageReconciliation {
this.db = db;
this.log = log;
this.connectionTimeout = connectionTimeout;
this.establishStream();
//this.establishStream(); // Disable the hub streaming gRPC API because it seems unreliable
}

async establishStream() {
Expand All @@ -54,7 +54,7 @@ export class MessageReconciliation {
}
}

async close() {
close() {
if (this.stream) {
this.stream.cancel();
this.stream = undefined;
Expand All @@ -64,11 +64,12 @@ export class MessageReconciliation {
async reconcileMessagesForFid(
fid: number,
onHubMessage: (message: Message, missingInDb: boolean, prunedInDb: boolean, revokedInDb: boolean) => Promise<void>,
onDbMessage: (message: DBMessage, missingInHub: boolean) => Promise<void>,
onDbMessage?: (message: DBMessage, missingInHub: boolean) => Promise<void>,
startTimestamp?: number,
stopTimestamp?: number,
types?: MessageType[],
) {
for (const type of [
for (const type of types ?? [
MessageType.CAST_ADD,
MessageType.REACTION_ADD,
MessageType.LINK_ADD,
Expand All @@ -84,7 +85,7 @@ export class MessageReconciliation {
fid: number,
type: MessageType,
onHubMessage: (message: Message, missingInDb: boolean, prunedInDb: boolean, revokedInDb: boolean) => Promise<void>,
onDbMessage: (message: DBMessage, missingInHub: boolean) => Promise<void>,
onDbMessage?: (message: DBMessage, missingInHub: boolean) => Promise<void>,
startTimestamp?: number,
stopTimestamp?: number,
) {
Expand Down Expand Up @@ -134,15 +135,17 @@ export class MessageReconciliation {
}

// Next, reconcile messages that are in the database but not in the hub
const dbMessages = await this.allActiveDbMessagesOfTypeForFid(fid, type, startTimestamp, stopTimestamp);
if (dbMessages.isErr()) {
this.log.error({ startTimestamp, stopTimestamp }, "Invalid time range provided to reconciliation");
return;
}
if (onDbMessage) {
const dbMessages = await this.allActiveDbMessagesOfTypeForFid(fid, type, startTimestamp, stopTimestamp);
if (dbMessages.isErr()) {
this.log.error({ startTimestamp, stopTimestamp }, "Invalid time range provided to reconciliation");
return;
}

for (const dbMessage of dbMessages.value) {
const key = Buffer.from(dbMessage.hash).toString("hex");
await onDbMessage(dbMessage, !hubMessagesByHash[key]);
for (const dbMessage of dbMessages.value) {
const key = Buffer.from(dbMessage.hash).toString("hex");
await onDbMessage(dbMessage, !hubMessagesByHash[key]);
}
}
}

Expand Down

0 comments on commit 76bb62e

Please sign in to comment.