format to linux file (\r\n -> \n) (#320)
This commit is contained in:
@@ -1,131 +1,131 @@
|
||||
/*
|
||||
* // Copyright 2022-2023 by zhaomingwork
|
||||
*/
|
||||
// java AsrWebsocketClient
|
||||
// usage: AsrWebsocketClient soPath srvIp srvPort wavPath numThreads
|
||||
package websocketsrv;
|
||||
|
||||
import com.k2fsa.sherpa.onnx.OnlineRecognizer;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.*;
|
||||
import java.util.Map;
|
||||
import org.java_websocket.client.WebSocketClient;
|
||||
import org.java_websocket.drafts.Draft;
|
||||
import org.java_websocket.handshake.ServerHandshake;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/** This example demonstrates how to connect to websocket server. */
|
||||
public class AsrWebsocketClient extends WebSocketClient {
|
||||
private static final Logger logger = LoggerFactory.getLogger(AsrWebsocketClient.class);
|
||||
|
||||
public AsrWebsocketClient(URI serverUri, Draft draft) {
|
||||
super(serverUri, draft);
|
||||
}
|
||||
|
||||
public AsrWebsocketClient(URI serverURI) {
|
||||
super(serverURI);
|
||||
}
|
||||
|
||||
public AsrWebsocketClient(URI serverUri, Map<String, String> httpHeaders) {
|
||||
super(serverUri, httpHeaders);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onOpen(ServerHandshake handshakedata) {
|
||||
|
||||
float[] floats = OnlineRecognizer.readWavFile(AsrWebsocketClient.wavPath);
|
||||
ByteBuffer buffer =
|
||||
ByteBuffer.allocate(4 * floats.length)
|
||||
.order(ByteOrder.LITTLE_ENDIAN); // float is sizeof 4. allocate enough buffer
|
||||
|
||||
for (float f : floats) {
|
||||
buffer.putFloat(f);
|
||||
}
|
||||
buffer.rewind();
|
||||
buffer.flip();
|
||||
buffer.order(ByteOrder.LITTLE_ENDIAN);
|
||||
|
||||
send(buffer.array()); // send buf to server
|
||||
send("Done"); // send 'Done' means finished
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(String message) {
|
||||
|
||||
logger.info("received: " + message);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClose(int code, String reason, boolean remote) {
|
||||
|
||||
logger.info(
|
||||
"Connection closed by "
|
||||
+ (remote ? "remote peer" : "us")
|
||||
+ " Code: "
|
||||
+ code
|
||||
+ " Reason: "
|
||||
+ reason);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Exception ex) {
|
||||
ex.printStackTrace();
|
||||
// if the error is fatal then onClose will be called additionally
|
||||
}
|
||||
|
||||
public static OnlineRecognizer rcgobj;
|
||||
public static String wavPath;
|
||||
|
||||
public static void main(String[] args) throws URISyntaxException {
|
||||
|
||||
if (args.length != 5) {
|
||||
System.out.println("usage: AsrWebsocketClient soPath srvIp srvPort wavPath numThreads");
|
||||
return;
|
||||
}
|
||||
|
||||
String soPath = args[0];
|
||||
String srvIp = args[1];
|
||||
String srvPort = args[2];
|
||||
String wavPath = args[3];
|
||||
int numThreads = Integer.parseInt(args[4]);
|
||||
System.out.println("serIp=" + srvIp + ",srvPort=" + srvPort + ",wavPath=" + wavPath);
|
||||
|
||||
class ClientThread implements Runnable {
|
||||
|
||||
String soPath;
|
||||
String srvIp;
|
||||
String srvPort;
|
||||
String wavPath;
|
||||
|
||||
ClientThread(String soPath, String srvIp, String srvPort, String wavPath) {
|
||||
this.soPath = soPath;
|
||||
this.srvIp = srvIp;
|
||||
this.srvPort = srvPort;
|
||||
this.wavPath = wavPath;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
try {
|
||||
|
||||
OnlineRecognizer.setSoPath(soPath);
|
||||
|
||||
AsrWebsocketClient.wavPath = wavPath;
|
||||
|
||||
String wsAddress = "ws://" + srvIp + ":" + srvPort;
|
||||
AsrWebsocketClient c = new AsrWebsocketClient(new URI(wsAddress));
|
||||
|
||||
c.connect();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
for (int i = 0; i < numThreads; i++) {
|
||||
System.out.println("Thread1 is running...");
|
||||
Thread t = new Thread(new ClientThread(soPath, srvIp, srvPort, wavPath));
|
||||
t.start();
|
||||
}
|
||||
}
|
||||
}
|
||||
/*
|
||||
* // Copyright 2022-2023 by zhaomingwork
|
||||
*/
|
||||
// java AsrWebsocketClient
|
||||
// usage: AsrWebsocketClient soPath srvIp srvPort wavPath numThreads
|
||||
package websocketsrv;
|
||||
|
||||
import com.k2fsa.sherpa.onnx.OnlineRecognizer;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.*;
|
||||
import java.util.Map;
|
||||
import org.java_websocket.client.WebSocketClient;
|
||||
import org.java_websocket.drafts.Draft;
|
||||
import org.java_websocket.handshake.ServerHandshake;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/** This example demonstrates how to connect to websocket server. */
|
||||
public class AsrWebsocketClient extends WebSocketClient {
|
||||
private static final Logger logger = LoggerFactory.getLogger(AsrWebsocketClient.class);
|
||||
|
||||
public AsrWebsocketClient(URI serverUri, Draft draft) {
|
||||
super(serverUri, draft);
|
||||
}
|
||||
|
||||
public AsrWebsocketClient(URI serverURI) {
|
||||
super(serverURI);
|
||||
}
|
||||
|
||||
public AsrWebsocketClient(URI serverUri, Map<String, String> httpHeaders) {
|
||||
super(serverUri, httpHeaders);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onOpen(ServerHandshake handshakedata) {
|
||||
|
||||
float[] floats = OnlineRecognizer.readWavFile(AsrWebsocketClient.wavPath);
|
||||
ByteBuffer buffer =
|
||||
ByteBuffer.allocate(4 * floats.length)
|
||||
.order(ByteOrder.LITTLE_ENDIAN); // float is sizeof 4. allocate enough buffer
|
||||
|
||||
for (float f : floats) {
|
||||
buffer.putFloat(f);
|
||||
}
|
||||
buffer.rewind();
|
||||
buffer.flip();
|
||||
buffer.order(ByteOrder.LITTLE_ENDIAN);
|
||||
|
||||
send(buffer.array()); // send buf to server
|
||||
send("Done"); // send 'Done' means finished
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(String message) {
|
||||
|
||||
logger.info("received: " + message);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClose(int code, String reason, boolean remote) {
|
||||
|
||||
logger.info(
|
||||
"Connection closed by "
|
||||
+ (remote ? "remote peer" : "us")
|
||||
+ " Code: "
|
||||
+ code
|
||||
+ " Reason: "
|
||||
+ reason);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Exception ex) {
|
||||
ex.printStackTrace();
|
||||
// if the error is fatal then onClose will be called additionally
|
||||
}
|
||||
|
||||
public static OnlineRecognizer rcgobj;
|
||||
public static String wavPath;
|
||||
|
||||
public static void main(String[] args) throws URISyntaxException {
|
||||
|
||||
if (args.length != 5) {
|
||||
System.out.println("usage: AsrWebsocketClient soPath srvIp srvPort wavPath numThreads");
|
||||
return;
|
||||
}
|
||||
|
||||
String soPath = args[0];
|
||||
String srvIp = args[1];
|
||||
String srvPort = args[2];
|
||||
String wavPath = args[3];
|
||||
int numThreads = Integer.parseInt(args[4]);
|
||||
System.out.println("serIp=" + srvIp + ",srvPort=" + srvPort + ",wavPath=" + wavPath);
|
||||
|
||||
class ClientThread implements Runnable {
|
||||
|
||||
String soPath;
|
||||
String srvIp;
|
||||
String srvPort;
|
||||
String wavPath;
|
||||
|
||||
ClientThread(String soPath, String srvIp, String srvPort, String wavPath) {
|
||||
this.soPath = soPath;
|
||||
this.srvIp = srvIp;
|
||||
this.srvPort = srvPort;
|
||||
this.wavPath = wavPath;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
try {
|
||||
|
||||
OnlineRecognizer.setSoPath(soPath);
|
||||
|
||||
AsrWebsocketClient.wavPath = wavPath;
|
||||
|
||||
String wsAddress = "ws://" + srvIp + ":" + srvPort;
|
||||
AsrWebsocketClient c = new AsrWebsocketClient(new URI(wsAddress));
|
||||
|
||||
c.connect();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
for (int i = 0; i < numThreads; i++) {
|
||||
System.out.println("Thread1 is running...");
|
||||
Thread t = new Thread(new ClientThread(soPath, srvIp, srvPort, wavPath));
|
||||
t.start();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,173 +1,173 @@
|
||||
/*
|
||||
* // Copyright 2022-2023 by zhaoming
|
||||
*/
|
||||
// java DecoderThreadHandler
|
||||
package websocketsrv;
|
||||
|
||||
import com.k2fsa.sherpa.onnx.OnlineRecognizer;
|
||||
import com.k2fsa.sherpa.onnx.OnlineStream;
|
||||
import java.nio.*;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.*;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import org.java_websocket.WebSocket;
|
||||
import org.java_websocket.drafts.Draft;
|
||||
import org.java_websocket.framing.Framedata;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class DecoderThreadHandler extends Thread {
|
||||
private static final Logger logger = LoggerFactory.getLogger(DecoderThreadHandler.class);
|
||||
// Websocket Queue that waiting for decoding
|
||||
private LinkedBlockingQueue<WebSocket> decoderQueue;
|
||||
// the mapping between websocket and connection data
|
||||
private ConcurrentHashMap<WebSocket, ConnectionData> connMap;
|
||||
|
||||
private OnlineRecognizer rcgOjb = null; // recgnizer object
|
||||
|
||||
// connection data list for this thread to decode in parallel
|
||||
private List<ConnectionData> connDataList = new ArrayList<ConnectionData>();
|
||||
|
||||
private int parallelDecoderNum = 10; // parallel decoding number
|
||||
private int deocderTimeIdle = 10; // idle time(ms) when no job
|
||||
private int deocderTimeOut = 3000; // if it is timeout(ms), the connection data will be removed
|
||||
|
||||
public DecoderThreadHandler(
|
||||
LinkedBlockingQueue<WebSocket> decoderQueue,
|
||||
ConcurrentHashMap<WebSocket, ConnectionData> connMap,
|
||||
OnlineRecognizer rcgOjb,
|
||||
int deocderTimeIdle,
|
||||
int parallelDecoderNum,
|
||||
int deocderTimeOut) {
|
||||
this.decoderQueue = decoderQueue;
|
||||
this.connMap = connMap;
|
||||
this.rcgOjb = rcgOjb;
|
||||
this.deocderTimeIdle = deocderTimeIdle;
|
||||
this.parallelDecoderNum = parallelDecoderNum;
|
||||
this.deocderTimeOut = deocderTimeOut;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
while (true) {
|
||||
try {
|
||||
// time(ms) idle if there is no job
|
||||
|
||||
Thread.sleep(deocderTimeIdle);
|
||||
// clear data list for this threads
|
||||
connDataList.clear();
|
||||
if (rcgOjb == null) continue;
|
||||
|
||||
// loop for total decoder Queue
|
||||
while (!decoderQueue.isEmpty()) {
|
||||
|
||||
// get websocket
|
||||
WebSocket conn = decoderQueue.take();
|
||||
// get connection data according to websocket
|
||||
ConnectionData connData = connMap.get(conn);
|
||||
|
||||
// if the websocket closed, continue
|
||||
if (connData == null) continue;
|
||||
// get the stream
|
||||
OnlineStream stream = connData.getStream();
|
||||
|
||||
// put to decoder list if 1) stream is ready; 2) and
|
||||
// size not > parallelDecoderNum
|
||||
if ((rcgOjb.isReady(stream) && connDataList.size() < parallelDecoderNum)) {
|
||||
|
||||
// add to this thread's decoder list
|
||||
connDataList.add(connData);
|
||||
// change the handled time for this connection data
|
||||
connData.setLastHandleTime(LocalDateTime.now());
|
||||
}
|
||||
// break when decoder list size >= parallelDecoderNum
|
||||
if (connDataList.size() >= parallelDecoderNum) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// if decoder data list for this thread >0
|
||||
if (connDataList.size() > 0) {
|
||||
|
||||
// create a stream array for parallel decoding
|
||||
OnlineStream[] arr = new OnlineStream[connDataList.size()];
|
||||
for (int i = 0; i < connDataList.size(); i++) {
|
||||
|
||||
arr[i] = connDataList.get(i).getStream();
|
||||
}
|
||||
|
||||
// parallel decoding
|
||||
rcgOjb.decodeStreams(arr);
|
||||
}
|
||||
|
||||
// get result for each connection
|
||||
for (ConnectionData connData : connDataList) {
|
||||
|
||||
OnlineStream stream = connData.getStream();
|
||||
WebSocket webSocket = connData.getWebSocket();
|
||||
|
||||
String txtResult = rcgOjb.getResult(stream);
|
||||
|
||||
// decode text in utf-8
|
||||
byte[] utf8Data = txtResult.getBytes(StandardCharsets.UTF_8);
|
||||
|
||||
boolean isEof = (connData.getEof() == true && !rcgOjb.isReady(stream));
|
||||
// result
|
||||
if (utf8Data.length > 0) {
|
||||
|
||||
String jsonResult =
|
||||
"{\"text\":\"" + txtResult + "\",\"eof\":" + String.valueOf(isEof) + "\"}";
|
||||
|
||||
if (webSocket.isOpen()) {
|
||||
// create a TEXT Frame for send back json result
|
||||
Draft draft = webSocket.getDraft();
|
||||
List<Framedata> frames = null;
|
||||
frames = draft.createFrames(jsonResult, false);
|
||||
// send to client
|
||||
webSocket.sendFrame(frames);
|
||||
}
|
||||
}
|
||||
}
|
||||
// loop for each connection data in this thread
|
||||
for (ConnectionData connData : connDataList) {
|
||||
OnlineStream stream = connData.getStream();
|
||||
WebSocket webSocket = connData.getWebSocket();
|
||||
// if the stream is still ready, put it to decoder Queue again for next decoding
|
||||
if (rcgOjb.isReady(stream)) {
|
||||
decoderQueue.put(webSocket);
|
||||
}
|
||||
// the duration between last handled time and now
|
||||
java.time.Duration duration =
|
||||
java.time.Duration.between(connData.getLastHandleTime(), LocalDateTime.now());
|
||||
// close the websocket if 1) data is done and stream not ready; 2) or data is time out;
|
||||
// 3) or
|
||||
// connection is closed
|
||||
if ((connData.getEof() == true
|
||||
&& !rcgOjb.isReady(stream)
|
||||
&& connData.getQueueSamples().isEmpty())
|
||||
|| duration.toMillis() > deocderTimeOut
|
||||
|| !connData.getWebSocket().isOpen()) {
|
||||
|
||||
logger.info("close websocket!!!");
|
||||
|
||||
// delay close web socket as data may still in processing
|
||||
Timer timer = new Timer();
|
||||
timer.schedule(
|
||||
new TimerTask() {
|
||||
public void run() {
|
||||
|
||||
webSocket.close();
|
||||
}
|
||||
},
|
||||
5000); // 5 seconds
|
||||
}
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
/*
|
||||
* // Copyright 2022-2023 by zhaoming
|
||||
*/
|
||||
// java DecoderThreadHandler
|
||||
package websocketsrv;
|
||||
|
||||
import com.k2fsa.sherpa.onnx.OnlineRecognizer;
|
||||
import com.k2fsa.sherpa.onnx.OnlineStream;
|
||||
import java.nio.*;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.*;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import org.java_websocket.WebSocket;
|
||||
import org.java_websocket.drafts.Draft;
|
||||
import org.java_websocket.framing.Framedata;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class DecoderThreadHandler extends Thread {
|
||||
private static final Logger logger = LoggerFactory.getLogger(DecoderThreadHandler.class);
|
||||
// Websocket Queue that waiting for decoding
|
||||
private LinkedBlockingQueue<WebSocket> decoderQueue;
|
||||
// the mapping between websocket and connection data
|
||||
private ConcurrentHashMap<WebSocket, ConnectionData> connMap;
|
||||
|
||||
private OnlineRecognizer rcgOjb = null; // recgnizer object
|
||||
|
||||
// connection data list for this thread to decode in parallel
|
||||
private List<ConnectionData> connDataList = new ArrayList<ConnectionData>();
|
||||
|
||||
private int parallelDecoderNum = 10; // parallel decoding number
|
||||
private int deocderTimeIdle = 10; // idle time(ms) when no job
|
||||
private int deocderTimeOut = 3000; // if it is timeout(ms), the connection data will be removed
|
||||
|
||||
public DecoderThreadHandler(
|
||||
LinkedBlockingQueue<WebSocket> decoderQueue,
|
||||
ConcurrentHashMap<WebSocket, ConnectionData> connMap,
|
||||
OnlineRecognizer rcgOjb,
|
||||
int deocderTimeIdle,
|
||||
int parallelDecoderNum,
|
||||
int deocderTimeOut) {
|
||||
this.decoderQueue = decoderQueue;
|
||||
this.connMap = connMap;
|
||||
this.rcgOjb = rcgOjb;
|
||||
this.deocderTimeIdle = deocderTimeIdle;
|
||||
this.parallelDecoderNum = parallelDecoderNum;
|
||||
this.deocderTimeOut = deocderTimeOut;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
while (true) {
|
||||
try {
|
||||
// time(ms) idle if there is no job
|
||||
|
||||
Thread.sleep(deocderTimeIdle);
|
||||
// clear data list for this threads
|
||||
connDataList.clear();
|
||||
if (rcgOjb == null) continue;
|
||||
|
||||
// loop for total decoder Queue
|
||||
while (!decoderQueue.isEmpty()) {
|
||||
|
||||
// get websocket
|
||||
WebSocket conn = decoderQueue.take();
|
||||
// get connection data according to websocket
|
||||
ConnectionData connData = connMap.get(conn);
|
||||
|
||||
// if the websocket closed, continue
|
||||
if (connData == null) continue;
|
||||
// get the stream
|
||||
OnlineStream stream = connData.getStream();
|
||||
|
||||
// put to decoder list if 1) stream is ready; 2) and
|
||||
// size not > parallelDecoderNum
|
||||
if ((rcgOjb.isReady(stream) && connDataList.size() < parallelDecoderNum)) {
|
||||
|
||||
// add to this thread's decoder list
|
||||
connDataList.add(connData);
|
||||
// change the handled time for this connection data
|
||||
connData.setLastHandleTime(LocalDateTime.now());
|
||||
}
|
||||
// break when decoder list size >= parallelDecoderNum
|
||||
if (connDataList.size() >= parallelDecoderNum) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// if decoder data list for this thread >0
|
||||
if (connDataList.size() > 0) {
|
||||
|
||||
// create a stream array for parallel decoding
|
||||
OnlineStream[] arr = new OnlineStream[connDataList.size()];
|
||||
for (int i = 0; i < connDataList.size(); i++) {
|
||||
|
||||
arr[i] = connDataList.get(i).getStream();
|
||||
}
|
||||
|
||||
// parallel decoding
|
||||
rcgOjb.decodeStreams(arr);
|
||||
}
|
||||
|
||||
// get result for each connection
|
||||
for (ConnectionData connData : connDataList) {
|
||||
|
||||
OnlineStream stream = connData.getStream();
|
||||
WebSocket webSocket = connData.getWebSocket();
|
||||
|
||||
String txtResult = rcgOjb.getResult(stream);
|
||||
|
||||
// decode text in utf-8
|
||||
byte[] utf8Data = txtResult.getBytes(StandardCharsets.UTF_8);
|
||||
|
||||
boolean isEof = (connData.getEof() == true && !rcgOjb.isReady(stream));
|
||||
// result
|
||||
if (utf8Data.length > 0) {
|
||||
|
||||
String jsonResult =
|
||||
"{\"text\":\"" + txtResult + "\",\"eof\":" + String.valueOf(isEof) + "\"}";
|
||||
|
||||
if (webSocket.isOpen()) {
|
||||
// create a TEXT Frame for send back json result
|
||||
Draft draft = webSocket.getDraft();
|
||||
List<Framedata> frames = null;
|
||||
frames = draft.createFrames(jsonResult, false);
|
||||
// send to client
|
||||
webSocket.sendFrame(frames);
|
||||
}
|
||||
}
|
||||
}
|
||||
// loop for each connection data in this thread
|
||||
for (ConnectionData connData : connDataList) {
|
||||
OnlineStream stream = connData.getStream();
|
||||
WebSocket webSocket = connData.getWebSocket();
|
||||
// if the stream is still ready, put it to decoder Queue again for next decoding
|
||||
if (rcgOjb.isReady(stream)) {
|
||||
decoderQueue.put(webSocket);
|
||||
}
|
||||
// the duration between last handled time and now
|
||||
java.time.Duration duration =
|
||||
java.time.Duration.between(connData.getLastHandleTime(), LocalDateTime.now());
|
||||
// close the websocket if 1) data is done and stream not ready; 2) or data is time out;
|
||||
// 3) or
|
||||
// connection is closed
|
||||
if ((connData.getEof() == true
|
||||
&& !rcgOjb.isReady(stream)
|
||||
&& connData.getQueueSamples().isEmpty())
|
||||
|| duration.toMillis() > deocderTimeOut
|
||||
|| !connData.getWebSocket().isOpen()) {
|
||||
|
||||
logger.info("close websocket!!!");
|
||||
|
||||
// delay close web socket as data may still in processing
|
||||
Timer timer = new Timer();
|
||||
timer.schedule(
|
||||
new TimerTask() {
|
||||
public void run() {
|
||||
|
||||
webSocket.close();
|
||||
}
|
||||
},
|
||||
5000); // 5 seconds
|
||||
}
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,67 +1,67 @@
|
||||
/*
|
||||
* // Copyright 2022-2023 by zhaoming
|
||||
*/
|
||||
// java StreamThreadHandler
|
||||
package websocketsrv;
|
||||
|
||||
import com.k2fsa.sherpa.onnx.OnlineStream;
|
||||
import java.nio.*;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import org.java_websocket.WebSocket;
|
||||
// thread for processing stream
|
||||
|
||||
public class StreamThreadHandler extends Thread {
|
||||
// Queue between io network io thread pool and stream thread pool, use websocket as the key
|
||||
private LinkedBlockingQueue<WebSocket> streamQueue;
|
||||
// Queue waiting for deocdeing, use websocket as the key
|
||||
private LinkedBlockingQueue<WebSocket> decoderQueue;
|
||||
// mapping between websocket connection and connection data
|
||||
private ConcurrentHashMap<WebSocket, ConnectionData> connMap;
|
||||
|
||||
public StreamThreadHandler(
|
||||
LinkedBlockingQueue<WebSocket> streamQueue,
|
||||
LinkedBlockingQueue<WebSocket> decoderQueue,
|
||||
ConcurrentHashMap<WebSocket, ConnectionData> connMap) {
|
||||
this.streamQueue = streamQueue;
|
||||
this.decoderQueue = decoderQueue;
|
||||
this.connMap = connMap;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
while (true) {
|
||||
try {
|
||||
// fetch one websocket from queue
|
||||
WebSocket conn = (WebSocket) this.streamQueue.take();
|
||||
// get the connection data according to websocket
|
||||
ConnectionData connData = connMap.get(conn);
|
||||
OnlineStream stream = connData.getStream();
|
||||
|
||||
// handle received binary data
|
||||
if (!connData.getQueueSamples().isEmpty()) {
|
||||
// loop to put all received binary data to stream
|
||||
while (!connData.getQueueSamples().isEmpty()) {
|
||||
|
||||
float[] samples = connData.getQueueSamples().poll();
|
||||
|
||||
stream.acceptWaveform(samples);
|
||||
}
|
||||
// if data is finished
|
||||
if (connData.getEof() == true) {
|
||||
|
||||
stream.inputFinished();
|
||||
}
|
||||
// add this websocket to decoder Queue if not in the Queue
|
||||
if (!decoderQueue.contains(conn)) {
|
||||
|
||||
decoderQueue.put(conn);
|
||||
}
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
/*
|
||||
* // Copyright 2022-2023 by zhaoming
|
||||
*/
|
||||
// java StreamThreadHandler
|
||||
package websocketsrv;
|
||||
|
||||
import com.k2fsa.sherpa.onnx.OnlineStream;
|
||||
import java.nio.*;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import org.java_websocket.WebSocket;
|
||||
// thread for processing stream
|
||||
|
||||
public class StreamThreadHandler extends Thread {
|
||||
// Queue between io network io thread pool and stream thread pool, use websocket as the key
|
||||
private LinkedBlockingQueue<WebSocket> streamQueue;
|
||||
// Queue waiting for deocdeing, use websocket as the key
|
||||
private LinkedBlockingQueue<WebSocket> decoderQueue;
|
||||
// mapping between websocket connection and connection data
|
||||
private ConcurrentHashMap<WebSocket, ConnectionData> connMap;
|
||||
|
||||
public StreamThreadHandler(
|
||||
LinkedBlockingQueue<WebSocket> streamQueue,
|
||||
LinkedBlockingQueue<WebSocket> decoderQueue,
|
||||
ConcurrentHashMap<WebSocket, ConnectionData> connMap) {
|
||||
this.streamQueue = streamQueue;
|
||||
this.decoderQueue = decoderQueue;
|
||||
this.connMap = connMap;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
while (true) {
|
||||
try {
|
||||
// fetch one websocket from queue
|
||||
WebSocket conn = (WebSocket) this.streamQueue.take();
|
||||
// get the connection data according to websocket
|
||||
ConnectionData connData = connMap.get(conn);
|
||||
OnlineStream stream = connData.getStream();
|
||||
|
||||
// handle received binary data
|
||||
if (!connData.getQueueSamples().isEmpty()) {
|
||||
// loop to put all received binary data to stream
|
||||
while (!connData.getQueueSamples().isEmpty()) {
|
||||
|
||||
float[] samples = connData.getQueueSamples().poll();
|
||||
|
||||
stream.acceptWaveform(samples);
|
||||
}
|
||||
// if data is finished
|
||||
if (connData.getEof() == true) {
|
||||
|
||||
stream.inputFinished();
|
||||
}
|
||||
// add this websocket to decoder Queue if not in the Queue
|
||||
if (!decoderQueue.contains(conn)) {
|
||||
|
||||
decoderQueue.put(conn);
|
||||
}
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user