Skip to content

Commit

Permalink
Fix subscribing not getting old messages on join
Browse files Browse the repository at this point in the history
  • Loading branch information
islathehut committed Jan 28, 2025
1 parent 9a98b64 commit f0ece4e
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 27 deletions.
57 changes: 39 additions & 18 deletions packages/backend/src/nest/storage/channels/channel.store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import { CertificatesStore } from '../certificates/certificates.store'
@Injectable()
export class ChannelStore extends EventStoreBase<ChannelMessage> {
private channelData: PublicChannel
private initialized: boolean = false
private _subscribing: boolean = false

private logger: QuietLogger

constructor(
Expand All @@ -44,7 +45,7 @@ export class ChannelStore extends EventStoreBase<ChannelMessage> {
* @returns Initialized ChannelStore instance
*/
public async init(channelData: PublicChannel, options: DBOptions): Promise<ChannelStore> {
if (this.initialized) {
if (this.store != null) {
this.logger.warn(`Channel ${this.channelData.name} has already been initialized!`)
return this
}
Expand All @@ -61,7 +62,6 @@ export class ChannelStore extends EventStoreBase<ChannelMessage> {
})

this.logger.info('Initialized')
this.initialized = true
return this
}

Expand All @@ -72,6 +72,12 @@ export class ChannelStore extends EventStoreBase<ChannelMessage> {
await this.getStore().sync.start()
}

// Accessors

public get isSubscribing(): boolean {
return this._subscribing
}

/**
* Subscribe to new messages on this channel
*
Expand All @@ -81,6 +87,7 @@ export class ChannelStore extends EventStoreBase<ChannelMessage> {
*/
public async subscribe(): Promise<void> {
this.logger.info('Subscribing to channel ', this.channelData.id)
this._subscribing = true

this.getStore().events.on('update', async (entry: LogEntry<ChannelMessage>) => {
this.logger.info(`${this.channelData.id} database updated`, entry.hash, entry.payload.value?.channelId)
Expand All @@ -92,16 +99,7 @@ export class ChannelStore extends EventStoreBase<ChannelMessage> {
isVerified: message.verified,
})

const ids = (await this.getEntries()).map(msg => msg.id)
const community = await this.localDbService.getCurrentCommunity()

if (community) {
this.emit(StorageEvents.MESSAGE_IDS_STORED, {
ids,
channelId: this.channelData.id,
communityId: community.id,
})
}
await this.refreshMessageIds()

// FIXME: the 'update' event runs if we replicate entries and if we add
// entries ourselves. So we may want to check if the message is written
Expand Down Expand Up @@ -130,6 +128,8 @@ export class ChannelStore extends EventStoreBase<ChannelMessage> {
})

await this.startSync()
await this.refreshMessageIds()
this._subscribing = false

this.logger.info(`Subscribed to channel ${this.channelData.id}`)
}
Expand Down Expand Up @@ -163,7 +163,7 @@ export class ChannelStore extends EventStoreBase<ChannelMessage> {
}

/**
* Read messages from OrbitDB filtered by message ID
* Read messages from OrbitDB, optionally filtered by message ID
*
* @param ids Message IDs to read from this channel's OrbitDB database
* @returns Messages read from OrbitDB
Expand All @@ -176,6 +176,24 @@ export class ChannelStore extends EventStoreBase<ChannelMessage> {
}
}

/**
* Get the latest state of messages in OrbitDB and emit an event to trigger redux updates
*
* @emits StorageEvents.MESSAGE_IDS_STORED
*/
private async refreshMessageIds(): Promise<void> {
const ids = (await this.getEntries()).map(msg => msg.id)
const community = await this.localDbService.getCurrentCommunity()

if (community) {
this.emit(StorageEvents.MESSAGE_IDS_STORED, {
ids,
channelId: this.channelData.id,
communityId: community.id,
})
}
}

// Base Store Logic

/**
Expand All @@ -195,9 +213,10 @@ export class ChannelStore extends EventStoreBase<ChannelMessage> {
}

/**
* Read all entries on the OrbitDB event store
* Read a list of entries on the OrbitDB event store
*
* @returns All entries on the event store
* @param ids Optional list of message IDs to filter by
* @returns All matching entries on the event store
*/
public async getEntries(): Promise<ChannelMessage[]>
public async getEntries(ids: string[] | undefined): Promise<ChannelMessage[]>
Expand All @@ -206,7 +225,9 @@ export class ChannelStore extends EventStoreBase<ChannelMessage> {
const messages: ChannelMessage[] = []

for await (const x of this.getStore().iterator()) {
if (ids == null || ids?.includes(x.id)) {
if (ids == null || ids?.includes(x.value.id)) {
// NOTE: we skipped the verification process when reading many messages in the previous version
// so I'm skipping it here - is that really the correct behavior?
const processedMessage = await this.messagesService.onConsume(x.value, false)
messages.push(processedMessage)
}
Expand Down Expand Up @@ -256,6 +277,6 @@ export class ChannelStore extends EventStoreBase<ChannelMessage> {
public clean(): void {
this.logger.info(`Cleaning channel store`, this.channelData.id, this.channelData.name)
this.store = undefined
this.initialized = false
this._subscribing = false
}
}
17 changes: 8 additions & 9 deletions packages/backend/src/nest/storage/channels/channels.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ export class ChannelsService extends EventEmitter {
'Channels names:',
channels.map(x => x.name)
)
channels.forEach(channel => this.subscribeToChannel(channel))
for (const channel of channels.values()) {
await this.subscribeToChannel(channel)
}
}

/**
Expand Down Expand Up @@ -260,12 +262,12 @@ export class ChannelsService extends EventEmitter {
return
}
repo = this.publicChannelsRepos.get(channelData.id)
}

if (repo && !repo.eventsAttached) {
this.handleMessageEventsOnChannelStore(channelData.id, repo)
await repo.store.subscribe()
repo.eventsAttached = true
}
if (repo && !repo.eventsAttached && !repo.store.isSubscribing) {
this.handleMessageEventsOnChannelStore(channelData.id, repo)
await repo.store.subscribe()
repo.eventsAttached = true
}

this.logger.info(`Subscribed to channel ${channelData.id}`)
Expand All @@ -287,17 +289,14 @@ export class ChannelsService extends EventEmitter {
private handleMessageEventsOnChannelStore(channelId: string, repo: PublicChannelsRepo): void {
this.logger.info(`Subscribing to channel updates`, channelId)
repo.store.on(StorageEvents.MESSAGE_IDS_STORED, (payload: ChannelMessageIdsResponse) => {
this.logger.info(`Emitting ${StorageEvents.MESSAGE_IDS_STORED}`)
this.emit(StorageEvents.MESSAGE_IDS_STORED, payload)
})

repo.store.on(StorageEvents.MESSAGES_STORED, (payload: MessagesLoadedPayload) => {
this.logger.info(`Emitting ${StorageEvents.MESSAGES_STORED}`)
this.emit(StorageEvents.MESSAGES_STORED, payload)
})

repo.store.on(StorageEvents.SEND_PUSH_NOTIFICATION, (payload: PushNotificationPayload) => {
this.logger.info(`Emitting ${StorageEvents.SEND_PUSH_NOTIFICATION}`)
this.emit(StorageEvents.SEND_PUSH_NOTIFICATION, payload)
})
}
Expand Down

0 comments on commit f0ece4e

Please sign in to comment.