Skip to content

Commit

Permalink
[Refactor] Shared queue using async/await
Browse files Browse the repository at this point in the history
As nested callbacks have proven to be annoying I switched to es6
async/await syntax.

References #59, #60, #61 and #62.
  • Loading branch information
angel-penchev committed Feb 1, 2021
1 parent d318960 commit 3d81baa
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 67 deletions.
23 changes: 10 additions & 13 deletions server/services/core/@types/global.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -265,11 +265,11 @@ interface HttpResponse {
* @interface SharedQueue
*/
interface SharedQueue {
emit(queueNames: Array<string>, message: object): void;
emit(queueNames: Array<string>, message: object): Promise<void>;
listen(
queueName: string,
callback: (message: string) => any,
): void;
callback: (message: object) => any,
): Promise<void>;
}

/**
Expand All @@ -278,10 +278,7 @@ interface SharedQueue {
* @interface QueueLibrary
*/
interface QueueLibrary {
connect(
url: string,
callback: (err: any, connection: QueueConnection) => void
): void,
connect: Function;
}

/**
Expand All @@ -290,8 +287,8 @@ interface QueueLibrary {
* @interface QueueConnection
*/
interface QueueConnection {
close(callback?: (err: any) => void): void;
createChannel(callback: (err: any, channel: QueueChannel) => void): void;
close(): Promise<void>;
createChannel(): Promise<QueueChannel>;
connection: {
serverProperties: {
host: string;
Expand All @@ -315,20 +312,20 @@ interface QueueChannel {
queue: string,
onMessage: (msg: QueueMessage | null) => void,
options?: object
): void;
): Promise<void>;

assertQueue(
queue?: string,
options?: object
): void;
): Promise<void>;

sendToQueue(
queue: string,
content: Buffer,
options?: object
): boolean;
): Promise<boolean>;

prefetch(count: number, global?: boolean): void;
prefetch(count: number, global?: boolean): Promise<any>;

close(callback: (err: any) => void): void;
}
Expand Down
2 changes: 1 addition & 1 deletion server/services/core/interfaces/shared-queue/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import amqp from 'amqplib/callback_api';
import amqp from 'amqplib';
import config from '../../config';
import makeSharedQueue from './shared-queue';

Expand Down
83 changes: 30 additions & 53 deletions server/services/core/interfaces/shared-queue/shared-queue.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import {
QueueChannel,
QueueConnection,
QueueLibrary,
QueueMessage,
SharedQueue,
Expand All @@ -25,67 +23,46 @@ export default function makeSharedQueue({
*
* @param {Array<string>} queueNames
* @param {object} message
* @return {Promise<void>}
*/
function emit(queueNames: Array<string>, message: object): void {
queueLibrary.connect(queueUrl, (e0: Error, connection: QueueConnection) => {
queueNames.map((queueName) => {
if (e0) {
throw e0;
}

connection.createChannel((
e1: Error,
channel: QueueChannel,
) => {
if (e1) {
throw e1;
}

channel.assertQueue(queueName, {durable: true});
channel.sendToQueue(
queueName,
Buffer.from(JSON.stringify(message)),
{persistent: true},
);
});
});

setTimeout(function() {
connection.close();
}, 500);
});
async function emit(
queueNames: Array<string>,
message: object,
): Promise<void> {
const connection = await queueLibrary.connect(queueUrl);
const channel = await connection.createChannel();
Promise.all(queueNames.map(async (queueName) => {
await channel.assertQueue(queueName, {durable: true});
await channel.sendToQueue(
queueName,
Buffer.from(JSON.stringify(message)),
{persistent: true},
);
})).then(async () => await connection.close());
}

/**
* Listens for messages from a shared queue.
*
* @param {string} queueName
* @param {Function} callback
* @return {Promise<void>}
*/
function listen(
async function listen(
queueName: string,
callback: (message: string) => any,
): void {
queueLibrary.connect(queueUrl, (e0: Error, connection: QueueConnection) => {
if (e0) {
throw e0;
}
connection.createChannel((e1:Error, channel: QueueChannel) => {
if (e1) {
throw e1;
}

channel.assertQueue(queueName, {durable: true});
channel.prefetch(1);
channel.consume(
queueName,
(msg: QueueMessage | null) => {
callback(JSON.stringify(msg?.content.toString()));
},
{noAck: false},
);
});
});
callback: (message: object) => any,
): Promise<void> {
const connection = await queueLibrary.connect(queueUrl);
const channel = await connection.createChannel();
await channel.assertQueue(queueName, {durable: true});
await channel.prefetch(1);
await channel.consume(
queueName,
(msg: QueueMessage | null) => {
callback(JSON.parse(String(msg?.content.toString())));
},
{noAck: false},
);
}

return Object.freeze({
Expand Down

0 comments on commit 3d81baa

Please sign in to comment.