Skip to content

Commit

Permalink
Merge pull request #7 from vodyani/feature/0.0.2
Browse files Browse the repository at this point in the history
Feature/0.0.2
  • Loading branch information
ChoGathK authored Mar 22, 2022
2 parents 061972b + c47b2c4 commit 3c6d5df
Show file tree
Hide file tree
Showing 8 changed files with 151 additions and 40 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@vodyani/dust",
"license": "MIT",
"version": "0.0.1",
"version": "0.0.2",
"author": "ChoGathK",
"description": "🌟 dust is a hybrid thread pool of different workers that can self-manage, self-assemble and interact externally.",
"homepage": "https://github.com/vodyani/dust#readme",
Expand Down
66 changes: 33 additions & 33 deletions src/common/base/dust-container.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,7 @@
import {
toRetry,
isValid,
FixedContext,
isValidArray,
isValidObject,
} from '@vodyani/core';
import { cloneDeep, fill } from 'lodash';
import { cloneDeep } from 'lodash';
import { Pool, Thread, spawn } from 'threads';
import { FixedContext, isValidObject } from '@vodyani/core';

import { DustWorkflow } from '../type';
import { DustOptions } from '../interface';

import { DustWorker } from './dust-worker';
Expand All @@ -18,59 +11,66 @@ export class DustContainer<KEY = any> {

@FixedContext
public create(key: KEY, path: string, options?: DustOptions) {
let dustThreadPool: Pool<Thread> = null;
let dust: Pool<Thread> = null;

if (!this.store.has(key)) {
const worker = new DustWorker(path, options?.worker);

dustThreadPool = Pool(
dust = Pool(
() => spawn(worker),
// Prevents contamination of incoming configuration parameters
isValidObject(options) && isValidObject(options.pool) ? cloneDeep(options.pool) : {},
isValidObject(options) && isValidObject(options.pool)
? cloneDeep(options.pool)
: {},
);

this.store.set(key, dustThreadPool);
this.store.set(key, dust);
}

return dustThreadPool;
}

@FixedContext
public async close(key: KEY, isForce = false) {
if (this.store.has(key)) {
await toRetry(3, 1000, this.store.get(key).terminate, isForce);

this.store.get(key).terminate(isForce);
this.store.delete(key);
}
}

@FixedContext
public async execute<T = any>(key: KEY, workflow: DustWorkflow<T>, count = 1): Promise<T> {
if (this.store.has(key) && isValid(workflow)) {
const dustThreadPool = this.store.get(key);
public async push(key: KEY, ...args: any[]) {
if (this.store.has(key)) {
const dust = this.store.get(key);

// push to thread queue
fill(Array(count), workflow).forEach(task => dustThreadPool.queue(task));
dust.queue(
async (threadHandler: any) => this.workflow(threadHandler, ...args),
);

const result = await dustThreadPool.completed();
return result;
} else {
return null;
await dust.settled();
}
}

@FixedContext
public async hybridExecute<T = any>(key: KEY, workflows: DustWorkflow<T>[]): Promise<T> {
if (this.store.has(key) && isValidArray(workflows)) {
const dustThreadPool = this.store.get(key);
public async execute<T = any>(key: KEY, ...args: any[]) {
if (this.store.has(key)) {
const dust = this.store.get(key);

// push to thread queue
workflows.forEach(task => dustThreadPool.queue(task));
const result = await dust.queue(
async (threadHandler: any) => this.workflow(threadHandler, ...args),
);

const result = await dustThreadPool.completed();
return result;
return result as T;
} else {
return null;
}
}

@FixedContext
private async workflow(threadHandler: any, ...args: any[]) {
try {
const result = await threadHandler(...args);
return result;
} catch (error) {
return null;
}
}
}
4 changes: 2 additions & 2 deletions src/common/base/dust-thread.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { cloneDeep } from 'lodash';
import { spawn, Thread } from 'threads';
import { FixedContext, isValidObject, toRetry } from '@vodyani/core';
import { FixedContext, isValidObject } from '@vodyani/core';

import { DustWorkerOptions } from '../interface';

Expand All @@ -19,7 +19,7 @@ export class DustThread {

@FixedContext
public async close(dustThread: Thread): Promise<void> {
await toRetry(3, 1000, Thread.terminate, dustThread);
await Thread.terminate(dustThread);
this.worker = null;
}

Expand Down
2 changes: 1 addition & 1 deletion src/common/base/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
export * from './dust-container';
export * from './dust-worker';
export * from './dust-thread';
export * from './dust-worker';
9 changes: 9 additions & 0 deletions src/common/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ import { ThreadsWorkerOptions } from 'threads/dist/types/master';
export { ThreadsWorkerOptions };

export interface DustWorkerOptions extends ThreadsWorkerOptions {
/**
* Whether to use absolute paths
*/
useAbsolute?: boolean;
}

Expand All @@ -26,6 +29,12 @@ export interface DustPoolOptions {
}

export interface DustOptions {
/**
* Dust worker creation parameters
*/
worker?: DustWorkerOptions;
/**
* Dust pool initialization parameters
*/
pool?: DustPoolOptions;
}
104 changes: 103 additions & 1 deletion test/provider.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { isMainThread } from 'worker_threads';

import { describe, it, expect } from '@jest/globals';

import { DustThread } from '../src/common/base';
import { DustThread, DustContainer } from '../src/common/base';

const workers = {
'task': resolve(__dirname, './worker/task.js'),
Expand Down Expand Up @@ -50,3 +50,105 @@ describe('Dust', () => {
}
});
});

describe('DustContainer execute', () => {
const container = new DustContainer();

it('Test create task and execute with pool', async () => {
container.create(
'task',
workers.task,
{
worker: { useAbsolute: true },
pool: { maxQueuedJobs: 1, size: 1 },
},
);

const result = await container.execute(
'task',
1,
2,
);

const result2 = await container.execute(
'task2',
1,
2,
);

expect(result2).toBe(null);
expect(result.count).toBe(3);
expect(result.isMainThread).toBe(false);
await container.close('task');
});

it('Test create async task and execute with pool', async () => {
container.create(
'async-task',
workers['async-task'],
{
worker: { useAbsolute: true },
pool: { maxQueuedJobs: 1, size: 1 },
},
);

const result = await container.execute(
'async-task',
1,
2,
);

expect(result.count).toBe(3);
expect(result.isMainThread).toBe(false);
await container.close('async-task');
});

it('Test create error task and execute with pool', async () => {
container.create(
'error',
workers.error,
{
worker: { useAbsolute: true },
pool: { maxQueuedJobs: 1, size: 1 },
},
);

const result = await container.execute('error');

expect(result).toEqual((null));
await container.close('error');
});
});

describe('DustContainer Push', () => {
const container = new DustContainer();

it('Test create task and push with pool but use error path', async () => {
try {
container.create(
'error-task',
workers.task,
);
} catch (error) {
expect(!!error).toBe(true);
}
});

it('Test create task and push with pool', async () => {
container.create(
'push-task',
workers.task,
{
worker: { useAbsolute: true },
},
);

container.push(
'push-task',
1,
2,
);

await container.close('push-task');
});
});
Empty file removed test/worker/transferable-task.js
Empty file.

0 comments on commit 3c6d5df

Please sign in to comment.