Files
DTFluxAPI/Source/DTFluxNetwork/Private/Subsystems/DTFluxNetworkSubsystem.cpp

793 lines
26 KiB
C++

// Fill out your copyright notice in the Description page of Project Settings.
#include "Subsystems/DTFluxNetworkSubsystem.h"
#include "DTFluxCoreModule.h"
#include "DTFluxNetworkModule.h"
#include "DTFluxNetworkSettings.h"
#include "DTFluxQueuedManager.h"
#include "DTFluxQueuedManager.h"
#include "JsonObjectConverter.h"
#include "Clients/DTFluxHttpClient.h"
#include "Clients/DTFluxWebSocketClient.h"
#include "Struct/DTFluxServerResponseStruct.h"
#include "Struct/DTFluxRequestStructs.h"
#include "Struct/DTFluxRaceDataServerResponse.h"
#include "Struct/DTFluxRankingServerResponse.h"
#include "Struct/DTFluxSplitSensorServerResponse.h"
#include "Struct/DTFluxTeamListServerResponse.h"
#include "Types/Objects/UDTFluxParticipantFactory.h"
#include "Types/Struct/DTFluxRaceDataStructs.h"
#include "Types/Struct/DTFluxSplitSensor.h"
// === WEBSOCKET CONNECTION ===
void UDTFluxNetworkSubsystem::Connect()
{
WsClient->SetAddress(ConstructWsAddress(WsSettings.Address, WsSettings.Path, WsSettings.Port));
WsClient->Connect();
}
// Forwards the disconnect request to the WebSocket client.
// NOTE(review): WsClient is dereferenced without a validity check here.
void UDTFluxNetworkSubsystem::Disconnect()
{
WsClient->Disconnect();
}
// Reconnects the WebSocket client by delegating to ReconnectWs() with the
// hard-coded client id "Ws_Client_0".
void UDTFluxNetworkSubsystem::Reconnect()
{
ReconnectWs(FName("Ws_Client_0"));
}
// === TRACKED REQUESTS ===
/**
 * Registers a request with the queue manager and immediately tries to send it.
 *
 * @param RequestType    Kind of data being requested.
 * @param ContestId      Contest identifier forwarded to the queue manager.
 * @param StageId        Stage identifier forwarded to the queue manager.
 * @param SplitId        Split identifier forwarded to the queue manager.
 * @param TimeoutSeconds Not referenced by this function body.
 * @return The id returned by the queue manager, or a default-constructed FGuid when
 *         no queue manager is available.
 */
FGuid UDTFluxNetworkSubsystem::SendTrackedRequest(
	EDTFluxApiDataType RequestType,
	int32 ContestId,
	int32 StageId,
	int32 SplitId,
	float TimeoutSeconds)
{
	if (!QueueManager)
	{
		UE_LOG(logDTFluxNetwork, Error, TEXT("QueueManager is not initialized"));
		return FGuid();
	}

	const int32 RequestTypeAsInt = static_cast<int32>(RequestType);

	// A duplicate in flight is only reported; the new request is queued regardless.
	const bool bDuplicateInFlight = IsRequestPending(RequestType, ContestId, StageId, SplitId);
	if (bDuplicateInFlight)
	{
		UE_LOG(logDTFluxNetwork, Warning,
		       TEXT("Similar request already pending: Type=%d, Contest=%d, Stage=%d, Split=%d"),
		       RequestTypeAsInt, ContestId, StageId, SplitId);
	}

	// Queue the request, then send it right away if the manager can hand it back.
	const FGuid NewRequestId = QueueManager->QueueRequest(RequestType, ContestId, StageId, SplitId);
	const FDTFluxQueuedRequest* const Queued = QueueManager->GetRequest(NewRequestId);
	if (Queued != nullptr)
	{
		SendQueuedRequest(*Queued);
	}

	UE_LOG(logDTFluxNetwork, Log, TEXT("Queued tracked request %s: Type=%d, Contest=%d, Stage=%d, Split=%d"),
	       *NewRequestId.ToString(), RequestTypeAsInt, ContestId, StageId, SplitId);
	return NewRequestId;
}
/**
 * Sends a tracked request and remembers the caller's delegates under the resulting request id.
 *
 * @param RequestType    Kind of data being requested.
 * @param ContestId      Contest identifier of the request.
 * @param StageId        Stage identifier of the request.
 * @param SplitId        Split identifier of the request.
 * @param OnCompleted    Stored in PendingCallbacks when bound.
 * @param OnTimeout      Stored in PendingTimeoutCallbacks when bound.
 * @param OnError        Stored in PendingErrorCallbacks when set and bound.
 * @param TimeoutSeconds Passed through to SendTrackedRequest().
 * @return The id produced by SendTrackedRequest(); no delegate is stored when that id is invalid.
 */
FGuid UDTFluxNetworkSubsystem::SendTrackedRequestWithCallback(
	EDTFluxApiDataType RequestType,
	int32 ContestId,
	int32 StageId,
	int32 SplitId,
	FOnDTFluxTrackedRequestResponse OnCompleted,
	FOnDTFluxTrackedRequestTimeout OnTimeout,
	TOptional<FOnDTFluxRequestResponseError> OnError,
	float TimeoutSeconds)
{
	const FGuid TrackedId = SendTrackedRequest(RequestType, ContestId, StageId, SplitId, TimeoutSeconds);
	if (!TrackedId.IsValid())
	{
		// Nothing was queued, so there is nothing to attach the delegates to.
		return TrackedId;
	}

	// Only bound delegates are worth remembering.
	if (OnCompleted.IsBound())
	{
		PendingCallbacks.Add(TrackedId, OnCompleted);
	}
	if (OnTimeout.IsBound())
	{
		PendingTimeoutCallbacks.Add(TrackedId, OnTimeout);
	}
	if (OnError.IsSet())
	{
		const FOnDTFluxRequestResponseError& ErrorDelegate = OnError.GetValue();
		if (ErrorDelegate.IsBound())
		{
			PendingErrorCallbacks.Add(TrackedId, ErrorDelegate);
		}
	}
	return TrackedId;
}
bool UDTFluxNetworkSubsystem::GetTrackedRequest(const FGuid& RequestId, FDTFluxQueuedRequest& OutRequest) const
{
if (!QueueManager)
{
return false;
}
const FDTFluxQueuedRequest* Request = QueueManager->GetRequest(RequestId);
if (Request)
{
OutRequest = *Request;
return true;
}
return false;
}
/**
 * Returns the queue manager's pointer to the request identified by RequestId.
 *
 * @param RequestId Id of the request to look up.
 * @return Whatever QueueManager->GetRequest() returns, or nullptr when there is no queue manager.
 */
const FDTFluxQueuedRequest* UDTFluxNetworkSubsystem::GetTrackedRequestPtr(const FGuid& RequestId) const
{
	return QueueManager ? QueueManager->GetRequest(RequestId) : nullptr;
}
bool UDTFluxNetworkSubsystem::HasRequestReceivedResponse(const FGuid& RequestId) const
{
FDTFluxQueuedRequest Request;
if (GetTrackedRequest(RequestId, Request))
{
return Request.bHasReceivedResponse;
}
return false;
}
/**
 * Returns the raw response stored on a tracked request.
 *
 * Only the response string is copied; the rest of the queued request is read in place.
 *
 * @param RequestId Id of the request to inspect.
 * @return A copy of the request's RawResponse, or an empty string when the request is unknown.
 */
FString UDTFluxNetworkSubsystem::GetRequestResponseData(const FGuid& RequestId) const
{
	if (const FDTFluxQueuedRequest* Request = GetTrackedRequestPtr(RequestId))
	{
		return Request->RawResponse;
	}
	return FString();
}
/**
 * Asks the queue manager whether a request with the same type and identifiers is pending.
 *
 * @param RequestType Type of the request to look for.
 * @param ContestId   Contest identifier to match.
 * @param StageId     Stage identifier to match.
 * @param SplitId     Split identifier to match.
 * @return The queue manager's answer, or false when there is no queue manager.
 */
bool UDTFluxNetworkSubsystem::IsRequestPending(EDTFluxRequestType RequestType, int32 ContestId, int32 StageId,
                                               int32 SplitId) const
{
	// The manager reports the matching id through an out-parameter; it is not needed here.
	FGuid IgnoredMatchId;
	return QueueManager
		&& QueueManager->IsRequestPending(IgnoredMatchId, RequestType, ContestId, StageId, SplitId);
}
/**
 * Reports how many requests the queue manager considers pending.
 *
 * @return The queue manager's pending count, or 0 when there is no queue manager.
 */
int32 UDTFluxNetworkSubsystem::GetPendingRequestCount() const
{
	return QueueManager ? QueueManager->GetPendingRequestCount() : 0;
}
// Accessor for the queue manager member; returns it as-is (it may be null,
// as the other methods of this class check before using it).
UDTFluxQueuedManager* UDTFluxNetworkSubsystem::GetQueueManager() const
{
return QueueManager;
}
void UDTFluxNetworkSubsystem::SendRequest(const EDTFluxRequestType RequestType, int InContestId, int InStageId,
int InSplitId)
{
FString Message;
switch (RequestType)
{
case EDTFluxRequestType::ContestRanking:
FJsonObjectConverter::UStructToJsonObjectString(FDTFluxContestRankingRequest(InContestId), Message);
break;
case EDTFluxRequestType::StageRanking:
FJsonObjectConverter::UStructToJsonObjectString(FDTFluxStageRankingRequest(InContestId, InStageId), Message);
break;
case EDTFluxRequestType::SplitRanking:
FJsonObjectConverter::UStructToJsonObjectString(FDTFluxSplitRankingRequest(InContestId, InStageId, InSplitId),
Message);
break;
case EDTFluxRequestType::TeamList:
FJsonObjectConverter::UStructToJsonObjectString(FDTFluxTeamListRequest(), Message);
break;
case EDTFluxRequestType::RaceData:
FJsonObjectConverter::UStructToJsonObjectString(FDTFluxRaceDataRequest(), Message);
break;
default:
return;
}
//Dirty trick to fix Case
Message = Message.Replace(TEXT("Id"),TEXT("ID"), ESearchCase::CaseSensitive);
UE_LOG(logDTFluxCore, Warning, TEXT("Sending Request %s"), *Message);
SendMessage(Message);
}
/**
 * Sends a raw text message through the WebSocket client.
 *
 * The message is sent only when the client pointer is valid and the client reports
 * it can send; otherwise an error is logged and the message is dropped.
 *
 * @param Message Text to send.
 */
void UDTFluxNetworkSubsystem::SendMessage(const FString& Message)
{
	UE_LOG(logDTFluxCore, Warning, TEXT("Sending Message %s"), *Message);

	const bool bClientReady = WsClient.IsValid() && WsClient->CanSend();
	if (!bClientReady)
	{
		UE_LOG(logDTFluxNetwork, Error, TEXT("[Websocket Not Connected]. Connect before sending requests..."));
		return;
	}

	WsClient->Send(Message);
	UE_LOG(logDTFluxNetwork, Log, TEXT("Can send request"));
}
// Subsystem start-up: reads the network settings, creates the WebSocket and HTTP clients,
// registers their events, optionally connects, then creates the request queue manager.
void UDTFluxNetworkSubsystem::Initialize(FSubsystemCollectionBase& Collection)
{
Super::Initialize(Collection);
// Load the DTFluxCore module; the returned reference is not used further in this function.
FDTFluxCoreModule& DTFluxCore = FModuleManager::Get().LoadModuleChecked<FDTFluxCoreModule>("DTFluxCore");
// Copy the WebSocket and HTTP settings from the settings object into the local members.
UDTFluxNetworkSettings* NetworkSettings = GetMutableDefault<UDTFluxNetworkSettings>();
UDTFluxNetworkSettings::GetWebSocketSettings(NetworkSettings, WsSettings);
UDTFluxNetworkSettings::GetHTTPSettings(NetworkSettings, HttpSettings);
// Create the clients before registering events: registration dereferences WsClient.
WsClient = MakeShareable<FDTFluxWebSocketClient>(new FDTFluxWebSocketClient());
HttpClient = MakeShareable<FDTFluxHttpClient>(new FDTFluxHttpClient());
RegisterWebSocketEvents();
RegisterHttpEvents();
#if WITH_EDITOR
// Editor builds only: get notified when the settings change, bound by function name.
NetworkSettings->OnDTFluxWebSocketSettingsChanged.AddUFunction(this, FName("WsSettingsChanged"));
NetworkSettings->OnDTFluxHttpSettingsChanged.AddUFunction(this, FName("HttpSettingsChanged"));
#endif
// Same two steps as Connect(), performed only when the settings request a startup connection.
if (WsSettings.bShouldConnectAtStartup)
{
WsClient->SetAddress(ConstructWsAddress(WsSettings.Address, WsSettings.Path, WsSettings.Port));
WsClient->Connect();
}
// Create and initialize the queue manager.
QueueManager = NewObject<UDTFluxQueuedManager>(this);
QueueManager->Initialize();
// Subscribe to the queue manager's timeout delegate.
QueueManager->OnRequestTimedOut.AddDynamic(this, &UDTFluxNetworkSubsystem::OnRequestTimedOut_Internal);
}
void UDTFluxNetworkSubsystem::Deinitialize()
{
Super::Deinitialize();
// Nettoyer le Queue Manager
if (QueueManager)
{
QueueManager->OnRequestTimedOut.RemoveDynamic(this, &UDTFluxNetworkSubsystem::OnRequestTimedOut_Internal);
QueueManager->ClearAllRequests();
}
// Nettoyer les callbacks
PendingCallbacks.Empty();
PendingTimeoutCallbacks.Empty();
// Déconnexion des clients
UnregisterWebSocketEvents();
UnregisterHttpEvents();
}
void UDTFluxNetworkSubsystem::WsSettingsChanged(const FDTFluxWsSettings& NewWsSettings)
{
// TODO Implement a ClientSelector To retrieve impacted WsClients and populate changes or maybe create a delegate
bool bNeedsReload = WsSettings != NewWsSettings;
WsSettings = NewWsSettings;
if (bNeedsReload || WsSettings.bShouldConnectAtStartup)
{
UE_LOG(logDTFluxNetwork, Warning, TEXT("WSocket Settings needs Reloding client"))
ReconnectWs(FName("Ws_Client_0"));
}
}
// Stores the new HTTP settings in HttpSettings. Nothing else is updated here.
void UDTFluxNetworkSubsystem::HttpSettingsChanged(const FDTFluxHttpSettings& NewHttpSettings)
{
// TODO Implement a ClientSelector To retrieve impacted HttpClients and populate changes or maybe create a delegate
HttpSettings = NewHttpSettings;
}
void UDTFluxNetworkSubsystem::ReconnectWs(const FName WsClientId)
{
FString NewAddress = ConstructWsAddress(WsSettings.Address, WsSettings.Path, WsSettings.Port);
WsClient->SetAddress(NewAddress);
WsClient->Reconnect();
}
// Intentionally empty: no HTTP reconnect logic is implemented here.
// The parameter is named WsClientId and is not used.
void UDTFluxNetworkSubsystem::ReconnectHttp(const FName WsClientId)
{
}
// Binds this subsystem's handlers to the WebSocket client's events and keeps the
// returned delegate handles so they can be removed later.
// NOTE(review): WsClient is dereferenced without a validity check; Initialize()
// creates it right before calling this function.
void UDTFluxNetworkSubsystem::RegisterWebSocketEvents()
{
// Connection established.
OnWsConnectedEventDelegateHandle =
WsClient->RegisterConnectedEvent().AddUObject(this, &UDTFluxNetworkSubsystem::OnWebSocketConnected_Subsystem);
// Connection error.
OnWsConnectionErrorEventDelegateHandle =
WsClient->RegisterConnectionError()
.AddUObject(this, &UDTFluxNetworkSubsystem::OnWebSocketConnectionError_Subsystem);
// Connection closed.
OnWsClosedEventDelegateHandle =
WsClient->RegisterClosedEvent()
.AddUObject(this, &UDTFluxNetworkSubsystem::OnWebSocketClosedEvent_Subsystem);
// Incoming message.
OnWsMessageEventDelegateHandle =
WsClient->RegisterMessageEvent()
.AddUObject(this, &UDTFluxNetworkSubsystem::OnWebSocketMessageEvent_Subsystem);
// Outgoing message sent.
OnWsMessageSentEventDelegateHandle =
WsClient->RegisterMessageSentEvent()
.AddUObject(this, &UDTFluxNetworkSubsystem::OnWebSocketMessageSentEvent_Subsystem);
}
// Intentionally empty: no HTTP client events are registered here.
void UDTFluxNetworkSubsystem::RegisterHttpEvents()
{
}
/**
 * Removes the handlers bound by RegisterWebSocketEvents() from the WebSocket client.
 *
 * Does nothing when the client pointer is not valid. Each handle is reset after
 * removal so a second call does not try to remove it again.
 */
void UDTFluxNetworkSubsystem::UnregisterWebSocketEvents()
{
	if (!WsClient.IsValid())
	{
		return;
	}

	if (OnWsConnectedEventDelegateHandle.IsValid())
	{
		WsClient->UnregisterConnectedEvent().Remove(OnWsConnectedEventDelegateHandle);
		OnWsConnectedEventDelegateHandle.Reset();
	}
	if (OnWsConnectionErrorEventDelegateHandle.IsValid())
	{
		WsClient->UnregisterConnectionError().Remove(OnWsConnectionErrorEventDelegateHandle);
		OnWsConnectionErrorEventDelegateHandle.Reset();
	}
	if (OnWsClosedEventDelegateHandle.IsValid())
	{
		WsClient->UnregisterClosedEvent().Remove(OnWsClosedEventDelegateHandle);
		OnWsClosedEventDelegateHandle.Reset();
	}
	if (OnWsMessageEventDelegateHandle.IsValid())
	{
		WsClient->UnregisterMessageEvent().Remove(OnWsMessageEventDelegateHandle);
		OnWsMessageEventDelegateHandle.Reset();
	}
	if (OnWsMessageSentEventDelegateHandle.IsValid())
	{
		// This handle was obtained from RegisterMessageSentEvent(), so it must be removed
		// from that same delegate. The previous code removed it from the raw-message
		// delegate, which is not the delegate the handler was added to.
		WsClient->RegisterMessageSentEvent().Remove(OnWsMessageSentEventDelegateHandle);
		OnWsMessageSentEventDelegateHandle.Reset();
	}
}
// Intentionally empty: there are no HTTP client events to unregister.
void UDTFluxNetworkSubsystem::UnregisterHttpEvents()
{
}
/**
 * Handler for the client's "connected" event: records the Connected status,
 * notifies OnWebSocketConnected listeners and logs the client address.
 */
void UDTFluxNetworkSubsystem::OnWebSocketConnected_Subsystem()
{
	WsStatus = EDTFluxConnectionStatus::Connected;
	OnWebSocketConnected.Broadcast();

	// Read the address after the broadcast, as the log statement did originally.
	const auto& ConnectedAddress = WsClient->GetAddress();
	UE_LOG(logDTFluxNetwork, Warning, TEXT("Ws Is Connected with %s"), *ConnectedAddress);
}
void UDTFluxNetworkSubsystem::OnWebSocketConnectionError_Subsystem(const FString& Error)
{
UE_LOG(logDTFluxNetwork, Warning, TEXT("Ws Error with %s : %s"), *WsClient->GetAddress(), *Error);
WsStatus = EDTFluxConnectionStatus::Error;
if (WsSettings.bShouldAutoReconnectOnError)
{
WsClient->Reconnect();
}
}
/**
 * Handler for the client's "closed" event: logs the close details and records the Closed status.
 *
 * @param StatusCode Close status code reported by the client.
 * @param Reason     Close reason reported by the client.
 * @param bWasClean  Whether the client reported a clean close.
 */
void UDTFluxNetworkSubsystem::OnWebSocketClosedEvent_Subsystem(int32 StatusCode, const FString& Reason, bool bWasClean)
{
	const TCHAR* const CleanLabel = bWasClean ? TEXT("True") : TEXT("False");
	const auto& ClosedAddress = WsClient->GetAddress();
	UE_LOG(logDTFluxNetwork, Warning, TEXT("Ws Error with %s :\n Reason : %s \tStatusCode : %i, bWasClean : %s"),
	       *ClosedAddress, *Reason, StatusCode, CleanLabel);

	WsStatus = EDTFluxConnectionStatus::Closed;
}
/**
 * Parses a team-list response and hands the result to OnTeamListReceived.
 *
 * Nothing is dispatched when the response reports a parsing status other than Success.
 *
 * @param Response Server response to parse.
 */
void UDTFluxNetworkSubsystem::ParseTeamListResponse(FDTFluxServerResponse& Response)
{
	FDTFluxTeamListDefinition ParsedTeamList;
	Response.ParseTeamListResponse(ParsedTeamList);
	UE_LOG(logDTFluxNetwork, Warning, TEXT("Parsing Team List Response"));

	const bool bParsed = (Response.GetParsingStatus() == EDTFluxResponseStatus::Success);
	if (!bParsed)
	{
		UE_LOG(logDTFluxNetwork, Error, TEXT("ParseTeamListResponse() for JSON Response : %s"), *Response.RawMessage);
		return;
	}

	UE_LOG(logDTFluxNetwork, Warning, TEXT("PArsing OK. Sending to Core..."));
	const TCHAR* const DispatchLabel = OnTeamListReceived.ExecuteIfBound(ParsedTeamList)
		                                   ? TEXT("SUCCESS_SEND")
		                                   : TEXT("NOT_BOUNDED");
	UE_LOG(logDTFluxNetwork, Warning, TEXT("Inserting %i Participants [%s]"), ParsedTeamList.Participants.Num(),
	       DispatchLabel);
}
/**
 * Parses a race-data response and hands the result to OnRaceDataReceived.
 *
 * Nothing is dispatched when the response reports a parsing status other than Success.
 *
 * @param Response Server response to parse.
 */
void UDTFluxNetworkSubsystem::ParseRaceData(FDTFluxServerResponse& Response)
{
	FDTFluxRaceData ParsedRaceData;
	Response.ParseRaceData(ParsedRaceData);

	const bool bParsed = (Response.GetParsingStatus() == EDTFluxResponseStatus::Success);
	if (!bParsed)
	{
		UE_LOG(logDTFluxNetwork, Error, TEXT("ParseRaceData() for JSON Response : %s"), *Response.RawMessage);
		return;
	}

	const TCHAR* const DispatchLabel = OnRaceDataReceived.ExecuteIfBound(ParsedRaceData)
		                                   ? TEXT("SUCCESS_SEND")
		                                   : TEXT("NOT_BOUNDED");
	UE_LOG(logDTFluxNetwork, Warning, TEXT("Sending %i Contests, [%s]"), ParsedRaceData.Datas.Num(),
	       DispatchLabel);
}
/**
 * Parses a contest-ranking response and hands the result to OnContestRankingReceived.
 *
 * Nothing is dispatched when the response reports a parsing status other than Success.
 *
 * @param Response Server response to parse.
 */
void UDTFluxNetworkSubsystem::ParseContestRanking(FDTFluxServerResponse& Response)
{
	FDTFluxContestRankings ParsedRankings;
	Response.ParseContestRanking(ParsedRankings);

	const bool bParsed = (Response.GetParsingStatus() == EDTFluxResponseStatus::Success);
	if (!bParsed)
	{
		UE_LOG(logDTFluxNetwork, Error, TEXT("ParseContestRanking() for JSON Response : %s"), *Response.RawMessage);
		return;
	}

	const TCHAR* const DispatchLabel = OnContestRankingReceived.ExecuteIfBound(ParsedRankings)
		                                   ? TEXT("SUCCESS_SEND")
		                                   : TEXT("NOT_BOUNDED");
	UE_LOG(logDTFluxNetwork, Warning, TEXT("Ws ContestRanking Data Sent for Contest %i, [%s]"),
	       ParsedRankings.ContestId,
	       DispatchLabel);
}
/**
 * Parses a stage-ranking response and hands the result to OnStageRankingReceived.
 *
 * When the response reports a parsing status other than Success, the error is logged
 * and nothing is dispatched.
 *
 * @param Response Server response to parse.
 */
void UDTFluxNetworkSubsystem::ParseStageRankingResponse(FDTFluxServerResponse& Response)
{
	FDTFluxStageRankings StageRankings;
	Response.ParseStageRankingResponse(StageRankings);
	if (Response.GetParsingStatus() != EDTFluxResponseStatus::Success)
	{
		UE_LOG(logDTFluxNetwork, Error, TEXT("ParseStageRankingResponse() for JSON Response : %s"),
		       *Response.RawMessage);
		// Do not forward rankings that failed to parse.
		return;
	}
	const bool bIsSuccessfullyBounded = OnStageRankingReceived.ExecuteIfBound(StageRankings);
	UE_LOG(logDTFluxNetwork, Warning, TEXT("StageRanking Data Sent for Contest %i, Stage %i\n[Result] : %s"),
	       StageRankings.ContestId, StageRankings.StageId,
	       bIsSuccessfullyBounded ? TEXT("SUCCESS_SEND") : TEXT("NOT_BOUNDED"));
}
/**
 * Parses a split-ranking response and hands the result to OnSplitRankingReceived.
 *
 * When the response reports a parsing status other than Success, the error is logged
 * and nothing is dispatched.
 *
 * @param Response Server response to parse.
 */
void UDTFluxNetworkSubsystem::ParseSplitRankingResponse(FDTFluxServerResponse& Response)
{
	FDTFluxSplitRankings SplitRankings;
	Response.ParseSplitRankingResponse(SplitRankings);
	if (Response.GetParsingStatus() != EDTFluxResponseStatus::Success)
	{
		UE_LOG(logDTFluxNetwork, Error, TEXT("ParseSplitRankingResponse() for JSON Response : %s"),
		       *Response.RawMessage);
		// Do not forward rankings that failed to parse.
		return;
	}
	const bool bIsSuccessfullyBounded = OnSplitRankingReceived.ExecuteIfBound(SplitRankings);
	UE_LOG(logDTFluxNetwork, Warning, TEXT("SplitRanking Data Sent for Contest %i, Stage %i, Split %i\n[Result] : %s"),
	       SplitRankings.ContestId, SplitRankings.StageId, SplitRankings.SplitId,
	       bIsSuccessfullyBounded ? TEXT("SUCCESS_SEND") : TEXT("NOT_BOUNDED"));
}
/**
 * Parses a team status update and hands the result to OnTeamStatusUpdateReceived.
 *
 * When the response reports a parsing status other than Success, the error is logged
 * and nothing is dispatched.
 *
 * @param Response Server response to parse.
 */
void UDTFluxNetworkSubsystem::ParseStatusUpdateResponse(FDTFluxServerResponse& Response)
{
	FDTFluxTeamStatusUpdate StatusUpdate;
	Response.ParseStatusUpdateResponse(StatusUpdate);
	if (Response.GetParsingStatus() != EDTFluxResponseStatus::Success)
	{
		UE_LOG(logDTFluxNetwork, Error, TEXT("ParseStatusUpdateResponse() for JSON Response : %s"),
		       *Response.RawMessage);
		// Do not forward a status update that failed to parse.
		return;
	}
	const bool bIsSuccessfullyBounded = OnTeamStatusUpdateReceived.ExecuteIfBound(StatusUpdate);
	UE_LOG(logDTFluxNetwork, Warning, TEXT("StatusUpdate Data Sent for Bib %i with new status %s\n[Result] : %s"),
	       StatusUpdate.Bib, *UEnum::GetValueAsString(StatusUpdate.Status),
	       bIsSuccessfullyBounded ? TEXT("SUCCESS_SEND") : TEXT("NOT_BOUNDED"));
}
/**
 * Parses a split-sensor response and hands each entry to OnSplitSensorReceived.
 *
 * When the response reports a parsing status other than Success, the error is logged
 * and nothing is dispatched.
 *
 * @param Response Server response to parse.
 */
void UDTFluxNetworkSubsystem::ParseSplitSensorResponse(FDTFluxServerResponse& Response)
{
	TArray<FDTFluxSplitSensorInfo> SplitSensorInfos;
	Response.ParseSplitSensorResponse(SplitSensorInfos);
	if (Response.GetParsingStatus() != EDTFluxResponseStatus::Success)
	{
		UE_LOG(logDTFluxNetwork, Error, TEXT("ParseSplitSensorResponse() for JSON Response : %s"),
		       *Response.RawMessage);
		// Do not forward entries from a response that failed to parse.
		return;
	}
	for (const auto& SplitSensorInfo : SplitSensorInfos)
	{
		const bool bIsSuccessfullyBounded = OnSplitSensorReceived.ExecuteIfBound(SplitSensorInfo);
		UE_LOG(logDTFluxNetwork, Warning,
		       TEXT("SplitSensor Data Sent for Bib %i on [Split %i] of [Stage %i] in [Contest %i]\n[Result] : %s"),
		       SplitSensorInfo.Bib, SplitSensorInfo.SplitId, SplitSensorInfo.StageId, SplitSensorInfo.ContestId,
		       bIsSuccessfullyBounded ? TEXT("SUCCESS_SEND") : TEXT("NOT_BOUNDED"));
	}
}
/**
 * Handles responses whose type is push-only (split sensor, status update, team update).
 *
 * The matching delegate is executed only when the corresponding parse call succeeds.
 *
 * @param Response Server response to process.
 * @return The response's parsing status for a handled push type; UnknownError when the
 *         type is not push-only or is not one of the handled push types.
 */
EDTFluxResponseStatus UDTFluxNetworkSubsystem::ProcessPushMessage(FDTFluxServerResponse& Response)
{
	const auto PushType = Response.GetResponseType();
	if (!DTFluxDataTypeUtils::IsPushOnly(PushType))
	{
		return EDTFluxResponseStatus::UnknownError;
	}

	switch (PushType)
	{
	case EDTFluxApiDataType::SplitSensor:
		{
			TArray<FDTFluxSplitSensorInfo> SensorEntries;
			if (Response.ParseSplitSensorResponse(SensorEntries))
			{
				// One delegate call per parsed sensor entry.
				for (const auto& Entry : SensorEntries)
				{
					OnSplitSensorReceived.ExecuteIfBound(Entry);
				}
			}
			return Response.GetParsingStatus();
		}
	case EDTFluxApiDataType::StatusUpdate:
		{
			FDTFluxTeamStatusUpdate TeamStatus;
			if (Response.ParseStatusUpdateResponse(TeamStatus))
			{
				OnTeamStatusUpdateReceived.ExecuteIfBound(TeamStatus);
			}
			return Response.GetParsingStatus();
		}
	case EDTFluxApiDataType::TeamUpdate:
		{
			FDTFluxTeamListDefinition UpdatedTeams;
			if (Response.ParseTeamUpdateResponse(UpdatedTeams))
			{
				OnTeamUpdateReceived.ExecuteIfBound(UpdatedTeams);
			}
			return Response.GetParsingStatus();
		}
	default:
		return EDTFluxResponseStatus::UnknownError;
	}
}
/**
 * Legacy dispatch: routes a response to the Parse* method matching its type and
 * logs a warning when the outcome is not Success.
 *
 * @param Response Server response to route.
 */
void UDTFluxNetworkSubsystem::Parse(FDTFluxServerResponse& Response)
{
	bool bTypeHandled = true;
	switch (Response.GetResponseType())
	{
	case EDTFluxApiDataType::RaceData:
		UE_LOG(logDTFluxNetwork, Warning, TEXT("Legacy Parsing RaceData"));
		ParseRaceData(Response);
		break;
	case EDTFluxApiDataType::TeamList:
		UE_LOG(logDTFluxNetwork, Warning, TEXT("Legacy Parsing TeamList"));
		ParseTeamListResponse(Response);
		break;
	case EDTFluxApiDataType::ContestRanking:
		ParseContestRanking(Response);
		break;
	case EDTFluxApiDataType::StageRanking:
		ParseStageRankingResponse(Response);
		break;
	case EDTFluxApiDataType::SplitRanking:
		ParseSplitRankingResponse(Response);
		break;
	default:
		UE_LOG(logDTFluxNetwork, Error, TEXT("Legacy Parsing Unknown"));
		bTypeHandled = false;
		break;
	}

	// Handled types report the response's own parsing status; anything else is an unknown error.
	const EDTFluxResponseStatus Outcome = bTypeHandled
		                                      ? Response.GetParsingStatus()
		                                      : EDTFluxResponseStatus::UnknownError;
	if (Outcome != EDTFluxResponseStatus::Success)
	{
		UE_LOG(logDTFluxNetwork, Warning, TEXT("UDTFluxNetworkSubsystem::Parse() Parsing failed"));
	}
}
/**
 * Handler for incoming WebSocket text messages.
 *
 * The message is wrapped in a server response, then offered in turn to: the tracked
 * request matcher, the push-message handler, and finally the legacy parser.
 *
 * @param MessageString Raw text received from the server.
 */
void UDTFluxNetworkSubsystem::OnWebSocketMessageEvent_Subsystem(const FString& MessageString)
{
	// The construction status is filled by the response constructor and is not inspected here.
	EDTFluxResponseStatus ConstructionStatus;
	FDTFluxServerResponse Response(MessageString, ConstructionStatus);

	// 1) Tracked request: nothing more to do when a queued request claims the response.
	if (TryMatchResponseToQueuedRequest(Response))
	{
		return;
	}
	UE_LOG(logDTFluxNetwork, Warning, TEXT("Response %s does not match any queued request"),
	       *UEnum::GetValueAsString(Response.GetResponseType()));

	// 2) Push message.
	if (ProcessPushMessage(Response) == EDTFluxResponseStatus::Success)
	{
		return;
	}
	UE_LOG(logDTFluxNetwork, Warning, TEXT("Not a push message"));

	// 3) Legacy parsing as the last resort.
	Parse(Response);
}
// Handler for the client's "message sent" event: only logs the client address and the sent text.
void UDTFluxNetworkSubsystem::OnWebSocketMessageSentEvent_Subsystem(const FString& MessageSent)
{
UE_LOG(logDTFluxNetwork, Warning, TEXT("Ws %s :\nMessage Sent: %s"), *WsClient->GetAddress(), *MessageSent);
}
bool UDTFluxNetworkSubsystem::CleanRequestCallbacks(const FGuid& RequestId)
{
bool bCbSuppressSuccess = false;
bool bErrorCbSuppressSuccess = false;
bool bTimeoutCbSuppressSuccess = false;
if (PendingCallbacks.Contains(RequestId))
{
PendingCallbacks.Remove(RequestId);
bCbSuppressSuccess = true;
}
if (PendingTimeoutCallbacks.Contains(RequestId))
{
PendingTimeoutCallbacks.Remove(RequestId);
bTimeoutCbSuppressSuccess = true;
}
if (PendingTimeoutCallbacks.Contains(RequestId))
{
PendingTimeoutCallbacks.Remove(RequestId);
bErrorCbSuppressSuccess = true;
}
return bCbSuppressSuccess && bErrorCbSuppressSuccess && bTimeoutCbSuppressSuccess;
}
void UDTFluxNetworkSubsystem::OnRequestTimedOut_Internal(const FDTFluxQueuedRequest& TimedOutRequest)
{
UE_LOG(logDTFluxNetwork, Warning, TEXT("Request %s timed out: Type=%d, Contest=%d, Stage=%d, Split=%d"),
*TimedOutRequest.RequestId.ToString(),
(int32)TimedOutRequest.RequestType,
TimedOutRequest.ContestId,
TimedOutRequest.StageId,
TimedOutRequest.SplitId);
// Appeler le callback de timeout si présent
if (FOnDTFluxTrackedRequestTimeout* TimeoutCallback = PendingTimeoutCallbacks.Find(TimedOutRequest.RequestId))
{
if (TimeoutCallback->IsBound())
{
TimeoutCallback->Execute(TimedOutRequest.RequestId, TEXT("Request timeout"));
}
PendingTimeoutCallbacks.Remove(TimedOutRequest.RequestId);
}
// Nettoyer les callbacks de succès aussi
PendingCallbacks.Remove(TimedOutRequest.RequestId);
// Broadcaster l'événement Blueprint
OnTrackedRequestFailed.Broadcast(TimedOutRequest.RequestId, TimedOutRequest.RequestType, TEXT("Request timeout"));
}
/**
 * Tries to pair an incoming response with a pending tracked request.
 *
 * When a pending request matches and a success callback was stored for it, the callback
 * is executed, the request's remaining callbacks are dropped, the request is marked as
 * responded and true is returned. Previously this path returned the result of the
 * callback cleanup and never marked the request as responded.
 *
 * @param Response Response to match; it is passed to the success callback.
 * @return true when the response was consumed by a stored callback; the result of
 *         MarkRequestAsResponded() when a request matched without a callback; false
 *         when nothing matched or there is no queue manager.
 */
bool UDTFluxNetworkSubsystem::TryMatchResponseToQueuedRequest(FDTFluxServerResponse& Response)
{
	if (!QueueManager)
	{
		return false;
	}

	FGuid FoundRequestId;
	if (!QueueManager->IsRequestPending(FoundRequestId, Response.GetResponseType(), Response.ContestID,
	                                    Response.StageID, Response.SplitID))
	{
		return false;
	}

	UE_LOG(logDTFluxNetwork, Log,
	       TEXT("Matched response to queued request: Type=%s, Contest=%d, Stage=%d, Split=%d"),
	       *UEnum::GetValueAsString(Response.GetResponseType()), Response.ContestID, Response.StageID,
	       Response.SplitID);

	// Take the callback out of the map before running it: the callback may queue new
	// requests, which would modify the map while a pointer into it is still in use.
	FOnDTFluxTrackedRequestResponse SuccessCallback;
	if (PendingCallbacks.RemoveAndCopyValue(FoundRequestId, SuccessCallback))
	{
		SuccessCallback.ExecuteIfBound(FoundRequestId, Response);
		// Drop the remaining callbacks. The helper's return value only says whether all
		// three kinds were present, which does not matter here.
		CleanRequestCallbacks(FoundRequestId);
		QueueManager->MarkRequestAsResponded(FoundRequestId);
		return true;
	}

	return QueueManager->MarkRequestAsResponded(FoundRequestId);
}
void UDTFluxNetworkSubsystem::CompleteTrackedRequest(const FGuid& RequestId, const FString& ResponseData,
EDTFluxRequestType RequestType)
{
// Marquer la requête comme ayant reçu une réponse
if (QueueManager)
{
QueueManager->MarkRequestAsResponded(RequestId);
}
// Appeler le callback de succès si présent
if (FOnDTFluxTrackedRequestResponse* SuccessCallback = PendingCallbacks.Find(RequestId))
{
if (SuccessCallback->IsBound())
{
EDTFluxResponseStatus ResponseStatus;
FDTFluxServerResponse Response(ResponseData, ResponseStatus);
if (ResponseStatus == EDTFluxResponseStatus::Success)
{
SuccessCallback->Execute(RequestId, Response);
QueueManager->MarkRequestAsResponded(RequestId);
PendingCallbacks.Remove(RequestId);
PendingTimeoutCallbacks.Remove(RequestId);
}
else
{
QueueManager->MarkRequestAsError(RequestId);
// Fail
// FailTrackedRequest()
}
}
}
// Nettoyer le callback de timeout
PendingTimeoutCallbacks.Remove(RequestId);
// Broadcaster l'événement Blueprint
OnTrackedRequestCompleted.Broadcast(RequestId, RequestType, ResponseData);
UE_LOG(logDTFluxNetwork, Log, TEXT("Completed tracked request %s"), *RequestId.ToString());
}
void UDTFluxNetworkSubsystem::FailTrackedRequest(const FGuid& RequestId, const FString& ErrorMessage,
EDTFluxRequestType RequestType)
{
// Appeler le callback d'erreur si présent
if (FOnDTFluxTrackedRequestTimeout* ErrorCallback = PendingTimeoutCallbacks.Find(RequestId))
{
if (ErrorCallback->IsBound())
{
ErrorCallback->ExecuteIfBound(RequestId, ErrorMessage);
}
PendingTimeoutCallbacks.Remove(RequestId);
}
// Nettoyer les callbacks
PendingCallbacks.Remove(RequestId);
// Broadcaster l'événement Blueprint
OnTrackedRequestFailed.Broadcast(RequestId, RequestType, ErrorMessage);
UE_LOG(logDTFluxNetwork, Error, TEXT("Failed tracked request %s: %s"), *RequestId.ToString(), *ErrorMessage);
}
void UDTFluxNetworkSubsystem::SendQueuedRequest(const FDTFluxQueuedRequest& QueuedRequest)
{
// Générer le message JSON à partir de la requête
FString Message = QueuedRequest.Serialize();
if (Message.IsEmpty())
{
UE_LOG(logDTFluxNetwork, Error, TEXT("Failed to serialize queued request %s"),
*QueuedRequest.RequestId.ToString());
FailTrackedRequest(QueuedRequest.RequestId, TEXT("Serialization failed"), QueuedRequest.RequestType);
return;
}
// Dirty trick to fix Case (comme dans l'original)
Message = Message.Replace(TEXT("Id"), TEXT("ID"), ESearchCase::CaseSensitive);
UE_LOG(logDTFluxNetwork, Log, TEXT("Sending queued request %s: %s"), *QueuedRequest.RequestId.ToString(), *Message);
// Envoyer via WebSocket
SendMessage(Message);
}
/**
 * Builds a WebSocket URL of the form [ws://]Address:Port[/]Path.
 *
 * "ws://" is prepended only when Address does not already start with "ws://" or
 * "wss://" (the check is a prefix test, so a scheme-like substring elsewhere in the
 * address does not suppress it). A non-empty Path that lacks a leading '/' gets one,
 * so the port and the path are never glued together.
 *
 * @param Address Host, optionally already prefixed with ws:// or wss://.
 * @param Path    Path appended after the port; may be empty.
 * @param Port    Port number, always appended after the address.
 * @return The assembled address.
 */
FString UDTFluxNetworkSubsystem::ConstructWsAddress(const FString& Address, const FString& Path, const int& Port)
{
	const bool bHasScheme = Address.StartsWith(TEXT("ws://")) || Address.StartsWith(TEXT("wss://"));

	FString NewAddress;
	if (!bHasScheme)
	{
		NewAddress += TEXT("ws://");
	}
	NewAddress += Address;
	NewAddress += TEXT(":");
	NewAddress += FString::FromInt(Port);
	if (!Path.IsEmpty() && !Path.StartsWith(TEXT("/")))
	{
		NewAddress += TEXT("/");
	}
	NewAddress += Path;
	return NewAddress;
}