Commit b8e9d3aa authored by Ishankha K.C's avatar Ishankha K.C

handle multiple websocket sessions in backend

parent ce6986dd
...@@ -4,69 +4,65 @@ import com.fasterxml.jackson.databind.ObjectMapper; ...@@ -4,69 +4,65 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.kaluwa.enterprises.babycarebackendservice.dto.ActivityLogDto; import com.kaluwa.enterprises.babycarebackendservice.dto.ActivityLogDto;
import com.kaluwa.enterprises.babycarebackendservice.service.ActivityLogService; import com.kaluwa.enterprises.babycarebackendservice.service.ActivityLogService;
import com.kaluwa.enterprises.babycarebackendservice.socketHandlers.EmotionPrediction; import com.kaluwa.enterprises.babycarebackendservice.socketHandlers.EmotionPrediction;
import org.springframework.beans.factory.annotation.Autowired; import lombok.Setter;
import org.springframework.scheduling.annotation.Scheduled; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.BinaryMessage;
import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.client.standard.StandardWebSocketClient; import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.handler.TextWebSocketHandler; import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.util.concurrent.Executors; import java.io.IOException;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference;
import static com.kaluwa.enterprises.babycarebackendservice.config.WebSocketConfig.VideoFrameHandler.sendTextMessageToClient;
import static com.kaluwa.enterprises.babycarebackendservice.constants.Configs.WEBSOCKET_URL; import static com.kaluwa.enterprises.babycarebackendservice.constants.Configs.WEBSOCKET_URL;
import static com.kaluwa.enterprises.babycarebackendservice.constants.LogTypes.EMOTION; import static com.kaluwa.enterprises.babycarebackendservice.constants.LogTypes.EMOTION;
@Slf4j
public class WebSocketClient { public class WebSocketClient {
private WebSocketSession session; private ConcurrentHashMap<String, WebSocketSession> activeSessions = new ConcurrentHashMap<>();
private static final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private ActivityLogService activityLogService; private ActivityLogService activityLogService;
// Method to set the VideoFrameHandler reference
@Setter
private WebSocketConfig.VideoFrameHandler videoFrameHandler; // Reference to VideoFrameHandler
public WebSocketClient(ActivityLogService activityLogService) { public WebSocketClient(ActivityLogService activityLogService) {
this.activityLogService = activityLogService; this.activityLogService = activityLogService;
connectToWebSocket();
} }
private void connectToWebSocket() { public void connectToWebSocket(String deviceId) {
try { try {
this.session = new StandardWebSocketClient() WEBSOCKET_URL = WEBSOCKET_URL.replace("{device_id}", deviceId);
WebSocketSession session = new StandardWebSocketClient()
.doHandshake(new MyWebSocketHandler(this), WEBSOCKET_URL) .doHandshake(new MyWebSocketHandler(this), WEBSOCKET_URL)
.get(); .get();
System.out.println("Connected to WebSocket!"); activeSessions.put(deviceId, session); // Store the session
log.info("Connected to WebSocket for device: {}", deviceId);
} catch (Exception e) { } catch (Exception e) {
System.out.println("Failed to connect to WebSocket, scheduling reconnection..."); log.error("Failed to connect to WebSocket for device: {}", deviceId);
scheduleReconnection(); // e.printStackTrace();
} }
} }
public void sendBytesToWebSocket(byte[] bytes) { public void disconnectWebSocket(String deviceId) {
WebSocketSession session = activeSessions.remove(deviceId); // Remove session from the map
if (session != null && session.isOpen()) {
try { try {
session.sendMessage(new BinaryMessage(bytes)); session.close(); // Gracefully close the session
} catch (Exception e) { log.info("WebSocket connection closed successfully for device: {}", deviceId);
e.printStackTrace(); } catch (IOException e) {
} log.error("Error while closing WebSocket session for device: {}, Error: {}", deviceId, e.getMessage());
} }
} else {
private void scheduleReconnection() { log.error("No active WebSocket connection to close for device: {}", deviceId);
scheduler.schedule(this::connectToWebSocket, 2, TimeUnit.MINUTES);
}
@Scheduled(fixedDelay = 60000) // Check every 1 minutes
private void checkConnection() {
if (session == null || !session.isOpen()) {
System.out.println("WebSocket connection is closed. Reconnecting...");
connectToWebSocket();
} }
} }
static ObjectMapper objectMapper = new ObjectMapper(); static ObjectMapper objectMapper = new ObjectMapper();
public static EmotionPrediction[] predictions; public static EmotionPrediction[] predictions;
@Slf4j
static class MyWebSocketHandler extends TextWebSocketHandler { static class MyWebSocketHandler extends TextWebSocketHandler {
private final WebSocketClient webSocketClient; private final WebSocketClient webSocketClient;
...@@ -77,6 +73,13 @@ public class WebSocketClient { ...@@ -77,6 +73,13 @@ public class WebSocketClient {
@Override @Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
String deviceId = null;
for (String key : webSocketClient.activeSessions.keySet()) {
if (webSocketClient.activeSessions.get(key).getId().equals(session.getId())) {
deviceId = key;
break;
}
}
// This method will be called when the server sends a text message // This method will be called when the server sends a text message
// Deserialize JSON array of objects into an array of Java objects // Deserialize JSON array of objects into an array of Java objects
predictions = objectMapper.readValue(message.getPayload(), EmotionPrediction[].class); predictions = objectMapper.readValue(message.getPayload(), EmotionPrediction[].class);
...@@ -86,22 +89,28 @@ public class WebSocketClient { ...@@ -86,22 +89,28 @@ public class WebSocketClient {
System.out.println("Last emotion prediction: " + lastPrediction); System.out.println("Last emotion prediction: " + lastPrediction);
// Save the last prediction to the database // Save the last prediction to the database
if (lastPrediction.getEmotion() != null && !lastPrediction.getEmotion().isEmpty()) { if (lastPrediction.getEmotion() != null && !lastPrediction.getEmotion().isEmpty() && !lastPrediction.getError()) {
ActivityLogDto activityLogDto = new ActivityLogDto(); ActivityLogDto activityLogDto = new ActivityLogDto();
activityLogDto.setActivityLogType(EMOTION); activityLogDto.setActivityLogType(EMOTION);
activityLogDto.setActivityLogDescription(lastPrediction.getEmotion()); activityLogDto.setActivityLogDescription(lastPrediction.getEmotion());
webSocketClient.activityLogService.saveActivityLog(activityLogDto); webSocketClient.activityLogService.saveActivityLog(activityLogDto);
} }
// Send the last prediction to the client // Send the last prediction to the client using VideoFrameHandler
sendTextMessageToClient(lastPrediction); if (webSocketClient.videoFrameHandler != null && deviceId != null) {
webSocketClient.videoFrameHandler.sendTextMessageToClient(deviceId, lastPrediction);
}
} }
} }
private String getDeviceIdFromSession(WebSocketSession session) {
String uri = session.getUri().toString();
return uri.substring(uri.lastIndexOf('/') + 1); // Extract device_id from the URI path
}
@Override @Override
public void afterConnectionClosed(WebSocketSession session, org.springframework.web.socket.CloseStatus status) { public void afterConnectionClosed(WebSocketSession session, org.springframework.web.socket.CloseStatus status) {
System.out.println("WebSocket connection closed. Status: " + status); log.info("WebSocket connection closed. Status: {}", status);
webSocketClient.scheduleReconnection();
} }
} }
} }
...@@ -24,6 +24,9 @@ import org.springframework.web.socket.handler.BinaryWebSocketHandler; ...@@ -24,6 +24,9 @@ import org.springframework.web.socket.handler.BinaryWebSocketHandler;
import springfox.documentation.spring.web.json.Json; import springfox.documentation.spring.web.json.Json;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import static com.kaluwa.enterprises.babycarebackendservice.constants.Configs.SERVER_WS_PATH_TO_ANDROID;
@Configuration @Configuration
@Slf4j @Slf4j
...@@ -37,7 +40,9 @@ public class WebSocketConfig implements WebSocketConfigurer { ...@@ -37,7 +40,9 @@ public class WebSocketConfig implements WebSocketConfigurer {
@Override @Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(new VideoFrameHandler(), "/emotional/video-process").setAllowedOrigins("*"); VideoFrameHandler videoFrameHandler = new VideoFrameHandler(customWebSocketClient());
customWebSocketClient().setVideoFrameHandler(videoFrameHandler);
registry.addHandler(videoFrameHandler, SERVER_WS_PATH_TO_ANDROID);
} }
@Bean @Bean
...@@ -48,12 +53,20 @@ public class WebSocketConfig implements WebSocketConfigurer { ...@@ -48,12 +53,20 @@ public class WebSocketConfig implements WebSocketConfigurer {
@Component @Component
class VideoFrameHandler extends BinaryWebSocketHandler { class VideoFrameHandler extends BinaryWebSocketHandler {
private static WebSocketSession currentSession; private ConcurrentHashMap<String, WebSocketSession> activeSessions = new ConcurrentHashMap<>();
private final WebSocketClient webSocketClient;
@Autowired
public VideoFrameHandler(WebSocketClient webSocketClient) {
this.webSocketClient = webSocketClient;
}
@Override @Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception { public void afterConnectionEstablished(WebSocketSession session) throws Exception {
currentSession = session; // Store the current session String deviceId = getDeviceIdFromSession(session);
log.info("Connection established with client: {}", session); activeSessions.put(deviceId, session); // Store the session in the map
webSocketClient.connectToWebSocket(deviceId);
log.info("Connection established with client for device: {}", deviceId);
super.afterConnectionEstablished(session); super.afterConnectionEstablished(session);
} }
...@@ -63,22 +76,29 @@ public class WebSocketConfig implements WebSocketConfigurer { ...@@ -63,22 +76,29 @@ public class WebSocketConfig implements WebSocketConfigurer {
// Process the binary data as needed // Process the binary data as needed
// Send binary data back to the client // Send binary data back to the client
sendBinaryMessageToClient(binaryData); sendBinaryMessageToClient(session, binaryData);
// Optionally, send a text message // Optionally, send a text message
EmotionPrediction emotionPrediction = new EmotionPrediction(); // Empty object EmotionPrediction emotionPrediction = new EmotionPrediction(); // Empty object
sendTextMessageToClient(emotionPrediction);
String deviceId = getDeviceIdFromSession(session);
sendTextMessageToClient(deviceId, emotionPrediction);
} }
@Override @Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
if (currentSession == session) { String deviceId = getDeviceIdFromSession(session);
currentSession = null; // Reset session variable activeSessions.remove(deviceId); // Remove session from the map
} webSocketClient.disconnectWebSocket(deviceId);
log.info("Connection closed with client: {}", session); log.info("Connection closed with client: {}", session);
super.afterConnectionClosed(session, status); super.afterConnectionClosed(session, status);
} }
private String getDeviceIdFromSession(WebSocketSession session) {
String uri = session.getUri().toString();
return uri.substring(uri.lastIndexOf('/') + 1); // Extract device_id from the URI path
}
@Override @Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) { protected void handleTextMessage(WebSocketSession session, TextMessage message) {
super.handleTextMessage(session, message); super.handleTextMessage(session, message);
...@@ -90,28 +110,29 @@ public class WebSocketConfig implements WebSocketConfigurer { ...@@ -90,28 +110,29 @@ public class WebSocketConfig implements WebSocketConfigurer {
} }
// Method to send a binary message back to the client // Method to send a binary message back to the client
public void sendBinaryMessageToClient(byte[] bytes) { public void sendBinaryMessageToClient(WebSocketSession session, byte[] bytes) {
try { try {
if (currentSession != null && currentSession.isOpen()) { if (session.isOpen()) {
currentSession.sendMessage(new BinaryMessage(bytes)); session.sendMessage(new BinaryMessage(bytes));
System.out.println("Sent binary message to client: " + bytes.length); log.info("Sent binary message to client: {}", bytes.length);
} }
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); log.error("Error sending binary message: {}", e.getMessage());
} }
} }
// Method to send a text message back to the client // Method to send a text message back to the client
public static void sendTextMessageToClient(EmotionPrediction emotionPrediction) { public void sendTextMessageToClient(String deviceId, EmotionPrediction emotionPrediction) {
try { try {
if (currentSession != null && currentSession.isOpen()) { WebSocketSession session = activeSessions.get(deviceId);
if (session.isOpen()) {
ObjectMapper objectMapper = new ObjectMapper(); ObjectMapper objectMapper = new ObjectMapper();
String json = objectMapper.writeValueAsString(emotionPrediction); String json = objectMapper.writeValueAsString(emotionPrediction);
currentSession.sendMessage(new TextMessage(json)); session.sendMessage(new TextMessage(json));
System.out.println("Sent text message to client: " + json); log.info("Sent text message to client: {}", json);
} }
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); log.error("Error sending text message: {}", e.getMessage());
} }
} }
} }
......
package com.kaluwa.enterprises.babycarebackendservice.constants; package com.kaluwa.enterprises.babycarebackendservice.constants;
public class Configs { public class Configs {
public static final String WEBSOCKET_URL = "ws://localhost:8000/ws/emotion"; public static String WEBSOCKET_URL = "ws://localhost:8000/ws/emotion/{device_id}";
public static final String SERVER_WS_PATH_TO_ANDROID = "/emotional/video-process/{device_id}";
} }
...@@ -7,5 +7,6 @@ public class TableNames { ...@@ -7,5 +7,6 @@ public class TableNames {
public final static String DOCUMENT_TABLE = "documents"; public final static String DOCUMENT_TABLE = "documents";
public final static String ACTIVITY_LOG_TABLE = "activity_logs"; public final static String ACTIVITY_LOG_TABLE = "activity_logs";
public final static String CONTACT_INFO_TABLE = "contact_info"; public final static String CONTACT_INFO_TABLE = "contact_info";
public final static String DEVICES_TABLE = "devices";
} }
...@@ -6,4 +6,7 @@ import org.springframework.stereotype.Repository; ...@@ -6,4 +6,7 @@ import org.springframework.stereotype.Repository;
@Repository @Repository
public interface BabyDao extends JpaRepository<Baby, Long> { public interface BabyDao extends JpaRepository<Baby, Long> {
boolean existsByDeviceUidAndBabyIdNot(String deviceUid, Long babyId);
boolean existsByDeviceUid(String deviceUid);
} }
package com.kaluwa.enterprises.babycarebackendservice.dao;
import com.kaluwa.enterprises.babycarebackendservice.model.Devices;
import org.springframework.data.jpa.repository.JpaRepository;
public interface DevicesDao extends JpaRepository<Devices, Long> {
boolean existsByDeviceUid(String deviceUid);
}
\ No newline at end of file
...@@ -26,6 +26,7 @@ public class BabyDto { ...@@ -26,6 +26,7 @@ public class BabyDto {
@NotNull(message = "Baby's gender is required") @NotNull(message = "Baby's gender is required")
@NotEmpty(message = "Baby's gender is required") @NotEmpty(message = "Baby's gender is required")
private String sex; private String sex;
private String deviceUid;
private String status = STATUS_NEW; private String status = STATUS_NEW;
private Boolean isActive; private Boolean isActive;
private Float weight; private Float weight;
......
...@@ -22,6 +22,7 @@ public class Baby { ...@@ -22,6 +22,7 @@ public class Baby {
private String lastName; private String lastName;
private LocalDate dob; private LocalDate dob;
private String sex; private String sex;
private String deviceUid;
private String status; private String status;
private Boolean isActive; private Boolean isActive;
private Float weight; private Float weight;
......
package com.kaluwa.enterprises.babycarebackendservice.model;
import jakarta.persistence.*;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import static com.kaluwa.enterprises.babycarebackendservice.constants.TableNames.DEVICES_TABLE;
@Data
@AllArgsConstructor
@NoArgsConstructor
@Entity
@Table(name = DEVICES_TABLE)
public class Devices {
@Id
@GeneratedValue(strategy= GenerationType.AUTO)
private Long deviceId;
private String deviceUid;
}
package com.kaluwa.enterprises.babycarebackendservice.service.impl; package com.kaluwa.enterprises.babycarebackendservice.service.impl;
import com.kaluwa.enterprises.babycarebackendservice.dao.BabyDao; import com.kaluwa.enterprises.babycarebackendservice.dao.BabyDao;
import com.kaluwa.enterprises.babycarebackendservice.dao.DevicesDao;
import com.kaluwa.enterprises.babycarebackendservice.dao.DocumentDao; import com.kaluwa.enterprises.babycarebackendservice.dao.DocumentDao;
import com.kaluwa.enterprises.babycarebackendservice.dto.BabyDto; import com.kaluwa.enterprises.babycarebackendservice.dto.BabyDto;
import com.kaluwa.enterprises.babycarebackendservice.dto.ResponseDto; import com.kaluwa.enterprises.babycarebackendservice.dto.ResponseDto;
...@@ -32,12 +33,14 @@ public class BabyServiceImpl implements BabyService { ...@@ -32,12 +33,14 @@ public class BabyServiceImpl implements BabyService {
private final BabyMapper babyMapper; private final BabyMapper babyMapper;
private final DocumentDao documentDao; private final DocumentDao documentDao;
private final DocumentService documentService; private final DocumentService documentService;
private final DevicesDao devicesDao;
public BabyServiceImpl(BabyDao babyDao, BabyMapper babyMapper, DocumentDao documentDao, DocumentService documentService) { public BabyServiceImpl(BabyDao babyDao, BabyMapper babyMapper, DocumentDao documentDao, DocumentService documentService, DevicesDao devicesDao) {
this.babyDao = babyDao; this.babyDao = babyDao;
this.babyMapper = babyMapper; this.babyMapper = babyMapper;
this.documentDao = documentDao; this.documentDao = documentDao;
this.documentService = documentService; this.documentService = documentService;
this.devicesDao = devicesDao;
} }
@Override @Override
...@@ -45,6 +48,26 @@ public class BabyServiceImpl implements BabyService { ...@@ -45,6 +48,26 @@ public class BabyServiceImpl implements BabyService {
log.info("Inside baby service createBaby method"); log.info("Inside baby service createBaby method");
try { try {
babyDto.setUniqKey(uniqKeyGenerator()); babyDto.setUniqKey(uniqKeyGenerator());
if (!babyDto.getDeviceUid().isEmpty() && !devicesDao.existsByDeviceUid(babyDto.getDeviceUid())) {
throw new BadRequestAlertException("Device is not registered in the system by uid " + babyDto.getDeviceUid(), "baby", "baby.error");
}
if (!babyDto.getDeviceUid().isEmpty() && devicesDao.existsByDeviceUid(babyDto.getDeviceUid())) {
if (babyDao.existsByDeviceUid(babyDto.getDeviceUid())) {
throw new BadRequestAlertException("Device is already assigned to another baby", "baby", "baby.error");
}
}
if (babyDto.getFirstName().isEmpty()) {
throw new BadRequestAlertException("Baby's first name is required", "baby", "baby.error");
}
if (babyDto.getDob() == null) {
throw new BadRequestAlertException("Baby's date of birth is required", "baby", "baby.error");
}
if (babyDto.getSex().isEmpty()) {
throw new BadRequestAlertException("Baby's gender is required", "baby", "baby.error");
}
if (babyDto.getUserId() == null) {
throw new BadRequestAlertException("User id is required", "baby", "baby.error");
}
return babyMapper.toDto(babyDao.save(babyMapper.toEntity(babyDto))); return babyMapper.toDto(babyDao.save(babyMapper.toEntity(babyDto)));
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
...@@ -98,13 +121,34 @@ public class BabyServiceImpl implements BabyService { ...@@ -98,13 +121,34 @@ public class BabyServiceImpl implements BabyService {
Optional<Baby> babyOp = babyDao.findById(babyId); Optional<Baby> babyOp = babyDao.findById(babyId);
if (babyDto.getBabyId() == null) { if (babyDto.getBabyId() == null) {
throw new BadRequestAlertException("Baby id is required", "baby", "baby.error"); throw new BadRequestAlertException("Baby id is required", "baby", "baby.error");
} else if (!babyId.equals(babyDto.getBabyId())) { }
if (!babyId.equals(babyDto.getBabyId())) {
throw new BadRequestAlertException("Baby id mismatch", "baby", "baby.error"); throw new BadRequestAlertException("Baby id mismatch", "baby", "baby.error");
} else if (babyOp.isEmpty()) { }
if (babyOp.isEmpty()) {
throw new BadRequestAlertException("Baby not found", "baby", "baby.error"); throw new BadRequestAlertException("Baby not found", "baby", "baby.error");
} else {
return babyMapper.toDto(babyDao.save(babyMapper.toEntity(babyDto)));
} }
if (!babyDto.getDeviceUid().isEmpty() && !devicesDao.existsByDeviceUid(babyDto.getDeviceUid())) {
throw new BadRequestAlertException("Device is not registered in the system by uid " + babyDto.getDeviceUid(), "baby", "baby.error");
}
if (!babyDto.getDeviceUid().isEmpty() && devicesDao.existsByDeviceUid(babyDto.getDeviceUid())) {
if (babyDao.existsByDeviceUidAndBabyIdNot(babyDto.getDeviceUid(), babyId)) {
throw new BadRequestAlertException("Device is already assigned to another baby", "baby", "baby.error");
}
}
if (babyDto.getFirstName().isEmpty()) {
throw new BadRequestAlertException("Baby's first name is required", "baby", "baby.error");
}
if (babyDto.getDob() == null) {
throw new BadRequestAlertException("Baby's date of birth is required", "baby", "baby.error");
}
if (babyDto.getSex().isEmpty()) {
throw new BadRequestAlertException("Baby's gender is required", "baby", "baby.error");
}
if (babyDto.getUserId() == null) {
throw new BadRequestAlertException("User id is required", "baby", "baby.error");
}
return babyMapper.toDto(babyDao.save(babyMapper.toEntity(babyDto)));
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
throw new BadRequestAlertException(e.getMessage(), "baby", "baby.error"); throw new BadRequestAlertException(e.getMessage(), "baby", "baby.error");
......
...@@ -10,4 +10,5 @@ import lombok.NoArgsConstructor; ...@@ -10,4 +10,5 @@ import lombok.NoArgsConstructor;
public class EmotionPrediction { public class EmotionPrediction {
private String emotion; private String emotion;
private BoundingBox bounding_box; private BoundingBox bounding_box;
private Boolean error;
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment