Feat: Added websocket comm. for DataGatherer and PredictorWebService

This commit is contained in:
Lorenzo Iovino 2019-04-11 16:13:54 +02:00
parent 9946f8c057
commit b3e47f098f
5 changed files with 245 additions and 174 deletions

View file

@ -12,14 +12,14 @@ function main() {
]);
gatherer.start();
startPrediction(gatherer);
//startPrediction(gatherer);
startGathering(gatherer);
}
function startPrediction(gatherer: Gatherer) {
const sender: Sender = new Sender(() => gatherer.getData(), 'localhost:4000/predict', 10000);
sender.start()
sender.start('http')
.subscribe(
val => {
console.log(val);
@ -29,8 +29,8 @@ function startPrediction(gatherer: Gatherer) {
function startGathering(gatherer: Gatherer) {
const sender: Sender = new Sender(() => gatherer.getData(), 'localhost:4000/trainData', 3000);
sender.start()
const sender: Sender = new Sender(() => gatherer.getData(), 'localhost:4100/', 3000);
sender.start('ws')
.subscribe(
val => {
console.log(val);

View file

@ -1,11 +1,13 @@
import { interval, Observable, of } from 'rxjs';
import { filter, flatMap, delay } from 'rxjs/operators';
import { Rxios } from 'rxios';
import {Data} from "../data/Data";
export class Sender {
private url: string;
private interval: number;
private dataSourceFn: any;
private wsConnect;
constructor(dataSourceFn: any, url: string, interval: number) {
this.url = url;
@ -13,22 +15,35 @@ export class Sender {
this.dataSourceFn = dataSourceFn;
}
public start() {
public start(protocol: string) {
if(protocol === 'ws') {
this.wsConnect = new WebSocket('ws://' + this.url);
}
return interval(this.interval)
.pipe(
flatMap(() => this.send(this.dataSourceFn()))
flatMap(() => this.send(this.dataSourceFn(), protocol))
)
}
private send(data: any) {
const http: Rxios = new Rxios();
/* var wsConnect = new WebSocket('ws://' + this.url, "protocolOne");
wsConnect.send(data);
wsConnect.onmessage((msg) => {
console.log(msg)
private send(data: Array<Data>, protocol: string) {
if(data.length > 0) {
if (protocol === 'http') {
const http: Rxios = new Rxios();
return http.post('http://' + this.url, data);
}
//gestire con rxjs o un wrapper di websocket client
)*/
return http.post('http://' + this.url, data);
if (protocol === 'ws') {
this.wsConnect.send(JSON.stringify(data));
return new Observable((observer) => {
this.wsConnect.onmessage = (msg) => {
console.log(msg);
observer.next(msg);
};
});
}
} else {
return of(null)
}
}
}

View file

@ -13,11 +13,10 @@ export class Screen extends Source {
if (document.body) {
Html2CanvasStatic(document.body, {logging: false})
.then((canvas) => {
console.log(canvas);
const imgData = canvas.toDataURL("image/png");
this.data.push(new Data('screen', imgData, {width: window.innerWidth, height: window.innerHeight}))
});
}
}, 3000);
}, 5000);
}
}