diff --git a/packages/backend/src/nest/storage/channels/channel.store.ts b/packages/backend/src/nest/storage/channels/channel.store.ts index ec38fbca3..842e49bb4 100644 --- a/packages/backend/src/nest/storage/channels/channel.store.ts +++ b/packages/backend/src/nest/storage/channels/channel.store.ts @@ -22,7 +22,8 @@ import { CertificatesStore } from '../certificates/certificates.store' @Injectable() export class ChannelStore extends EventStoreBase { private channelData: PublicChannel - private initialized: boolean = false + private _subscribing: boolean = false + private logger: QuietLogger constructor( @@ -44,7 +45,7 @@ export class ChannelStore extends EventStoreBase { * @returns Initialized ChannelStore instance */ public async init(channelData: PublicChannel, options: DBOptions): Promise { - if (this.initialized) { + if (this.store != null) { this.logger.warn(`Channel ${this.channelData.name} has already been initialized!`) return this } @@ -61,7 +62,6 @@ export class ChannelStore extends EventStoreBase { }) this.logger.info('Initialized') - this.initialized = true return this } @@ -72,6 +72,12 @@ export class ChannelStore extends EventStoreBase { await this.getStore().sync.start() } + // Accessors + + public get isSubscribing(): boolean { + return this._subscribing + } + /** * Subscribe to new messages on this channel * @@ -81,6 +87,7 @@ export class ChannelStore extends EventStoreBase { */ public async subscribe(): Promise { this.logger.info('Subscribing to channel ', this.channelData.id) + this._subscribing = true this.getStore().events.on('update', async (entry: LogEntry) => { this.logger.info(`${this.channelData.id} database updated`, entry.hash, entry.payload.value?.channelId) @@ -92,16 +99,7 @@ export class ChannelStore extends EventStoreBase { isVerified: message.verified, }) - const ids = (await this.getEntries()).map(msg => msg.id) - const community = await this.localDbService.getCurrentCommunity() - - if (community) { - this.emit(StorageEvents.MESSAGE_IDS_STORED, { - ids, - channelId: this.channelData.id, - communityId: community.id, - }) - } + await this.refreshMessageIds() // FIXME: the 'update' event runs if we replicate entries and if we add // entries ourselves. So we may want to check if the message is written @@ -130,6 +128,8 @@ export class ChannelStore extends EventStoreBase { }) await this.startSync() + await this.refreshMessageIds() + this._subscribing = false this.logger.info(`Subscribed to channel ${this.channelData.id}`) } @@ -163,7 +163,7 @@ export class ChannelStore extends EventStoreBase { } /** - * Read messages from OrbitDB filtered by message ID + * Read messages from OrbitDB, optionally filtered by message ID * * @param ids Message IDs to read from this channel's OrbitDB database * @returns Messages read from OrbitDB @@ -176,6 +176,24 @@ export class ChannelStore extends EventStoreBase { } } + /** + * Get the latest state of messages in OrbitDB and emit an event to trigger redux updates + * + * @emits StorageEvents.MESSAGE_IDS_STORED + */ + private async refreshMessageIds(): Promise { + const ids = (await this.getEntries()).map(msg => msg.id) + const community = await this.localDbService.getCurrentCommunity() + + if (community) { + this.emit(StorageEvents.MESSAGE_IDS_STORED, { + ids, + channelId: this.channelData.id, + communityId: community.id, + }) + } + } + // Base Store Logic /** @@ -195,9 +213,10 @@ export class ChannelStore extends EventStoreBase { } /** - * Read all entries on the OrbitDB event store + * Read a list of entries on the OrbitDB event store * - * @returns All entries on the event store + * @param ids Optional list of message IDs to filter by + * @returns All matching entries on the event store */ public async getEntries(): Promise public async getEntries(ids: string[] | undefined): Promise @@ -206,7 +225,9 @@ export class ChannelStore extends EventStoreBase { const messages: ChannelMessage[] = [] for await (const x of this.getStore().iterator()) { - if (ids == null || ids?.includes(x.id)) { + if (ids == null || ids?.includes(x.value.id)) { + // NOTE: we skipped the verification process when reading many messages in the previous version + // so I'm skipping it here - is that really the correct behavior? const processedMessage = await this.messagesService.onConsume(x.value, false) messages.push(processedMessage) } @@ -256,6 +277,6 @@ export class ChannelStore extends EventStoreBase { public clean(): void { this.logger.info(`Cleaning channel store`, this.channelData.id, this.channelData.name) this.store = undefined - this.initialized = false + this._subscribing = false } } diff --git a/packages/backend/src/nest/storage/channels/channels.service.ts b/packages/backend/src/nest/storage/channels/channels.service.ts index 8ec9eba15..824742f6f 100644 --- a/packages/backend/src/nest/storage/channels/channels.service.ts +++ b/packages/backend/src/nest/storage/channels/channels.service.ts @@ -130,7 +130,9 @@ export class ChannelsService extends EventEmitter { 'Channels names:', channels.map(x => x.name) ) - channels.forEach(channel => this.subscribeToChannel(channel)) + for (const channel of channels.values()) { + await this.subscribeToChannel(channel) + } } /** @@ -260,12 +262,12 @@ export class ChannelsService extends EventEmitter { return } repo = this.publicChannelsRepos.get(channelData.id) + } - if (repo && !repo.eventsAttached) { - this.handleMessageEventsOnChannelStore(channelData.id, repo) - await repo.store.subscribe() - repo.eventsAttached = true - } + if (repo && !repo.eventsAttached && !repo.store.isSubscribing) { + this.handleMessageEventsOnChannelStore(channelData.id, repo) + await repo.store.subscribe() + repo.eventsAttached = true } this.logger.info(`Subscribed to channel ${channelData.id}`) @@ -287,17 +289,14 @@ export class ChannelsService extends EventEmitter { private handleMessageEventsOnChannelStore(channelId: string, repo: PublicChannelsRepo): void { this.logger.info(`Subscribing to channel updates`, channelId) repo.store.on(StorageEvents.MESSAGE_IDS_STORED, (payload: ChannelMessageIdsResponse) => { - this.logger.info(`Emitting ${StorageEvents.MESSAGE_IDS_STORED}`) this.emit(StorageEvents.MESSAGE_IDS_STORED, payload) }) repo.store.on(StorageEvents.MESSAGES_STORED, (payload: MessagesLoadedPayload) => { - this.logger.info(`Emitting ${StorageEvents.MESSAGES_STORED}`) this.emit(StorageEvents.MESSAGES_STORED, payload) }) repo.store.on(StorageEvents.SEND_PUSH_NOTIFICATION, (payload: PushNotificationPayload) => { - this.logger.info(`Emitting ${StorageEvents.SEND_PUSH_NOTIFICATION}`) this.emit(StorageEvents.SEND_PUSH_NOTIFICATION, payload) }) }