Feat: implemented mongo store/retrieve datas for webservice and imagecreator; Feat: parallel execution of Puppets with random flow

This commit is contained in:
Lorenzo Iovino 2019-05-03 01:33:31 +02:00
parent 1c8645039e
commit ae477fc3df
11 changed files with 506 additions and 223 deletions

View file

@ -1,8 +1,25 @@
import {Data} from "../../../../DataGatherer/src/shared/Data";
import {createCanvas, Image} from "canvas";
import fs from "fs";
import mongoose from "mongoose";
import {DataSchema} from "../predictor-web-service/models/Data";
export class ImageCreatorService {
private mongoEndpoint = 'mongodb://localhost:27017/predictorDatas';
constructor() {
mongoose.Promise = global.Promise;
mongoose.connect(this.mongoEndpoint, { useNewUrlParser: true },
err => {
if (err) {
console.log(err);
}
else {
console.log('Connected to MongoDb');
this.startProcessing(6, 200);
}
});
}
createImageFromData(data: Array<Data>, name: string) {
return new Promise((resolve, reject) => {
@ -124,35 +141,40 @@ export class ImageCreatorService {
}
public startProcessing(srcPath: string, destPath: string) {
public startProcessing(flows: number, agents: number, threshold: number = 5) {
const that = this;
fs.readdir(srcPath, function(err, items) {
for (var i=0; i<items.length; i++) {
const data = fs.readFileSync(srcPath + '/' + items[i]);
try {
const jsonData = JSON.parse(data.toString('utf8'));
fs.mkdirSync('./trainingImages/virtual/' + destPath, {recursive: true});
that.createImageFromData(jsonData, items[i].split('.')[0])
.then(res => {
if(res) {
console.log('virtual image ' + ' processed');
that.saveImage(res['data'], './trainingImages/virtual/' + destPath + '/' + res['name'] + '.png');
}
});
that.applyDataToImage(jsonData, items[i].split('.')[0])
.then(res => {
if(res) {
console.log('real image ' + ' processed');
that.saveImage(res['data'], './trainingImages/real/' + destPath + '/' + res['name'] + '.png');
}
});
} catch (e) {
console.log('file not valid');
}
const orderedByFlows = [];
const promises = [];
for(let i=0; i<flows; i++) {
for (let j = 0; j < agents; j++) {
promises.push(DataSchema.find({
flowName: 'flow'+(i + 1).toString(),
agentName: 'bot_'+(j+1).toString()
}, function (error, ids) {
orderedByFlows.push(ids);
}));
}
});
}
Promise.all(promises)
.then(() => {
for(let i = 0; i< flows; i++){
fs.mkdirSync('./trainingImages/virtual/' + 'flow'+(i+1), {recursive: true});
}
for(let i = 0; i < orderedByFlows.length; i++) {
for (let j of orderedByFlows[i]) {
if(j.data.length > threshold) {
that.createImageFromData(j.data, j.agentName + '_' + j.counter)
.then(res => {
if (res) {
console.log('virtual image ' + ' processed');
that.saveImage(res['data'], './trainingImages/virtual/' + j.flowName + '/' + res['name'] + '.png');
}
});
}
}
}
})
}
}

View file

@ -2,10 +2,6 @@ import {ImageCreatorService} from "./ImageCreatorService";
function main() {
const imageCreatorService = new ImageCreatorService();
imageCreatorService.startProcessing('./trainingDatas/flow1', 'flow1');
imageCreatorService.startProcessing('./trainingDatas/flow2', 'flow2');
imageCreatorService.startProcessing('./trainingDatas/flow3', 'flow3');
}
main();

View file

@ -3,9 +3,10 @@ import * as WebSocket from 'ws';
import * as http from 'http';
import cors from 'cors';
import * as bodyParser from "body-parser";
import fs from 'fs';
import {Data} from "../../../../DataGatherer/src/shared/Data";
import parser from 'query-string-parser';
import mongoose from "mongoose";
import {DataSchema} from './models/Data';
export class PredictorWebService {
@ -15,6 +16,7 @@ export class PredictorWebService {
private app: express.Application;
private httpServer: http.Server;
private wss: WebSocket.Server;
private mongoEndpoint = 'mongodb://localhost:27017/predictorDatas';
private counter: number = 0;
@ -22,6 +24,18 @@ export class PredictorWebService {
this.url = url;
this.portApi = portApi;
this.portWebSocket = portWebSocket;
mongoose.Promise = global.Promise;
mongoose.connect(this.mongoEndpoint, { useNewUrlParser: true },
err => {
if (err) {
console.log(err);
}
else {
console.log('Connected to MongoDb');
this.startPredictor();
this.startTrainer();
}
});
}
public startTrainer() {
@ -38,9 +52,7 @@ export class PredictorWebService {
that.counter++;
that.saveData(data, queryObj.flowName, queryObj.agentName)
.then( (msg) => {
if(msg === 'ok') {
ws.send('trainer - Data saved');
}
ws.send('trainer - Data saved');
})
.catch( (err) => {
ws.send('trainer - Error while saving data');
@ -71,26 +83,8 @@ export class PredictorWebService {
saveData(data: Array<Data>, flowName: string, agentName: string) {
return new Promise((resolve, reject) => {
console.log(flowName);
console.log(agentName);
if(flowName && agentName) {
fs.mkdirSync('./trainingDatas/' + flowName + '/', { recursive: true });
} else {
fs.mkdirSync('./trainingDatas/undefined', { recursive: true });
}
if(!flowName && !agentName) {
fs.writeFile('./trainingDatas/undefined/' + this.counter + '.json', JSON.stringify(data), 'utf8', function (err) {
if (err) reject('err');
console.log('Data saved');
resolve('ok');
});
} else {
fs.writeFile('./trainingDatas/' + flowName + '/' + agentName + '_' + this.counter + '.json', JSON.stringify(data), 'utf8', function (err) {
if (err) reject('err');
console.log('Data saved');
resolve('ok');
});
}
const dataSchema = new DataSchema({flowName: flowName, agentName: agentName, counter: this.counter, data: data});
resolve(dataSchema.save());
});
}
}

View file

@ -2,8 +2,7 @@ import { PredictorWebService } from "./PredictorWebService";
function main() {
const predictorWebService = new PredictorWebService('/', 4000, 4100);
predictorWebService.startPredictor();
predictorWebService.startTrainer();
}
main();

View file

@ -0,0 +1,14 @@
import mongoose from "mongoose";
import {Data} from "../../../../../DataGatherer/src/shared/Data";
import {Schema} from "mongoose";
const DataSchema_:Schema = new Schema(
{
flowName: String,
agentName: String,
counter: Number,
data: Array<Data>()
}
);
export const DataSchema: any = mongoose.model('Data', DataSchema_, 'datas');