Added Pursuit functionality (Untested and not fully implemented) + Global TrackedRequestSending check

This commit is contained in:
2025-07-09 03:27:23 +02:00
parent 8f884f6224
commit 03eb1132ef
22 changed files with 636 additions and 294 deletions

View File

@ -90,6 +90,38 @@ FGuid UDTFluxQueuedManager::QueueRequest(EDTFluxRequestType RequestType, int32 C
return NewRequest.RequestId;
}
bool UDTFluxQueuedManager::MarkRequestAsError(const FGuid& TargetRequestGuid)
{
// TODO: Implement a retry mechanism
// For now we simply suppress the request and log a message
bool bFoundMatch = false;
FDTFluxQueuedRequest Request;
TQueue<FDTFluxQueuedRequest, EQueueMode::Mpsc> TempQueue;
while (PendingRequestsQueue.Dequeue(Request))
{
if (Request.RequestId == TargetRequestGuid)
{
UE_LOG(logDTFluxNetwork, Error,
TEXT("Marked request %s as error: Type=%d, ContestId=%d, StageId=%d, SplitId=%d"),
*Request.RequestId.ToString(), (int32)Request.RequestType, Request.ContestId, Request.StageId,
Request.SplitId);
}
else
{
TempQueue.Enqueue(Request);
}
}
while (TempQueue.Dequeue(Request))
{
PendingRequestsQueue.Enqueue(Request);
}
if (bFoundMatch)
{
UE_LOG(logDTFluxNetwork, Error, TEXT("No Request Found with GUID %s"), *TargetRequestGuid.ToString());
}
return true;
}
bool UDTFluxQueuedManager::MarkRequestAsResponded(const FGuid& TargetRequestGuid)
{
TQueue<FDTFluxQueuedRequest, EQueueMode::Mpsc> TempQueue;
@ -134,7 +166,8 @@ bool UDTFluxQueuedManager::MarkRequestAsResponded(const FDTFluxQueuedRequest& Ta
return MarkRequestAsResponded(TargetRequest.RequestId);
}
bool UDTFluxQueuedManager::IsRequestPending(EDTFluxApiDataType RequestType, int32 ContestId, int32 StageId,
bool UDTFluxQueuedManager::IsRequestPending(FGuid& OutRequestId, EDTFluxApiDataType RequestType, int32 ContestId,
int32 StageId,
int32 SplitId)
{
TQueue<FDTFluxQueuedRequest, EQueueMode::Mpsc> TempQueue;
@ -148,6 +181,11 @@ bool UDTFluxQueuedManager::IsRequestPending(EDTFluxApiDataType RequestType, int3
if (!bFoundMatch && Request.Matches(RequestType, ContestId, StageId, SplitId))
{
bFoundMatch = true;
OutRequestId = Request.RequestId;
UE_LOG(logDTFluxNetwork, Verbose,
TEXT("Found pending request %s: Type=%d, ContestId=%d, StageId=%d, SplitId=%d"),
*Request.RequestId.ToString(), (int32)Request.RequestType, Request.ContestId, Request.StageId,
Request.SplitId);
}
// Remettre dans la queue temporaire

View File

@ -370,13 +370,13 @@ bool FDTFluxServerResponse::ParseContestRanking(FDTFluxContestRankings& OutConte
bool FDTFluxServerResponse::ParseStageRankingResponse(FDTFluxStageRankings& OutStageRankings)
{
// UE_LOG(logDTFluxNetwork, Log, TEXT("Response is stage-ranking type %s"), *RawMessage);
if (!ValidateResponseType(TEXT("stage-ranking")))
{
UE_LOG(logDTFluxNetwork, Error, TEXT("Response is not a stage-ranking type"));
ParsingStatus = EDTFluxResponseStatus::InvalidType;
return false;
}
FDTFluxStageRankingResponse RankingResponse;
if (!FJsonObjectConverter::JsonObjectStringToUStruct<FDTFluxStageRankingResponse>(RawMessage, &RankingResponse))
{
@ -384,6 +384,7 @@ bool FDTFluxServerResponse::ParseStageRankingResponse(FDTFluxStageRankings& OutS
ParsingStatus = EDTFluxResponseStatus::JsonParseError;
return false;
}
UE_LOG(logDTFluxNetwork, Log, TEXT("Reponse Update"));
OutStageRankings.ContestId = ContestID;
OutStageRankings.StageId = StageID;

View File

@ -81,8 +81,9 @@ FGuid UDTFluxNetworkSubsystem::SendTrackedRequestWithCallback(
int32 ContestId,
int32 StageId,
int32 SplitId,
FOnDTFluxRequestResponse OnCompleted,
FOnDTFluxRequestTimeout OnTimeout,
FOnDTFluxTrackedRequestResponse OnCompleted,
FOnDTFluxTrackedRequestTimeout OnTimeout,
TOptional<FOnDTFluxRequestResponseError> OnError,
float TimeoutSeconds)
{
FGuid RequestId = SendTrackedRequest(RequestType, ContestId, StageId, SplitId, TimeoutSeconds);
@ -98,11 +99,16 @@ FGuid UDTFluxNetworkSubsystem::SendTrackedRequestWithCallback(
{
PendingTimeoutCallbacks.Add(RequestId, OnTimeout);
}
if (OnError.IsSet() && OnError.GetValue().IsBound())
{
PendingErrorCallbacks.Add(RequestId, OnError.GetValue());
}
}
return RequestId;
}
bool UDTFluxNetworkSubsystem::GetTrackedRequest(const FGuid& RequestId, FDTFluxQueuedRequest& OutRequest) const
{
if (!QueueManager)
@ -155,7 +161,8 @@ bool UDTFluxNetworkSubsystem::IsRequestPending(EDTFluxRequestType RequestType, i
{
return false;
}
return QueueManager->IsRequestPending(RequestType, ContestId, StageId, SplitId);
FGuid OutRequestId;
return QueueManager->IsRequestPending(OutRequestId, RequestType, ContestId, StageId, SplitId);
}
int32 UDTFluxNetworkSubsystem::GetPendingRequestCount() const
@ -583,8 +590,6 @@ void UDTFluxNetworkSubsystem::Parse(FDTFluxServerResponse& Response)
}
}
//TODO reforge API to keep track of Requests
void UDTFluxNetworkSubsystem::OnWebSocketMessageEvent_Subsystem(const FString& MessageString)
{
// UE_LOG(logDTFluxNetwork, Warning, TEXT("Client %s :\nMessage Received : %s"), *WsClient->GetAddress(), *MessageString);
@ -600,8 +605,11 @@ void UDTFluxNetworkSubsystem::OnWebSocketMessageEvent_Subsystem(const FString& M
UE_LOG(logDTFluxNetwork, Warning, TEXT("Not a push message"));
// Legacy
Parse(Response);
return;
}
}
// // if we are here we have a tracked Message
// QueueManager->MarkRequestAsResponded()
}
void UDTFluxNetworkSubsystem::OnWebSocketMessageSentEvent_Subsystem(const FString& MessageSent)
@ -609,6 +617,29 @@ void UDTFluxNetworkSubsystem::OnWebSocketMessageSentEvent_Subsystem(const FStrin
UE_LOG(logDTFluxNetwork, Warning, TEXT("Ws %s :\nMessage Sent: %s"), *WsClient->GetAddress(), *MessageSent);
}
bool UDTFluxNetworkSubsystem::CleanRequestCallbacks(const FGuid& RequestId)
{
bool bCbSuppressSuccess = false;
bool bErrorCbSuppressSuccess = false;
bool bTimeoutCbSuppressSuccess = false;
if (PendingCallbacks.Contains(RequestId))
{
PendingCallbacks.Remove(RequestId);
bCbSuppressSuccess = true;
}
if (PendingTimeoutCallbacks.Contains(RequestId))
{
PendingTimeoutCallbacks.Remove(RequestId);
bTimeoutCbSuppressSuccess = true;
}
if (PendingTimeoutCallbacks.Contains(RequestId))
{
PendingTimeoutCallbacks.Remove(RequestId);
bErrorCbSuppressSuccess = true;
}
return bCbSuppressSuccess && bErrorCbSuppressSuccess && bTimeoutCbSuppressSuccess;
}
void UDTFluxNetworkSubsystem::OnRequestTimedOut_Internal(const FDTFluxQueuedRequest& TimedOutRequest)
{
UE_LOG(logDTFluxNetwork, Warning, TEXT("Request %s timed out: Type=%d, Contest=%d, Stage=%d, Split=%d"),
@ -619,7 +650,7 @@ void UDTFluxNetworkSubsystem::OnRequestTimedOut_Internal(const FDTFluxQueuedRequ
TimedOutRequest.SplitId);
// Appeler le callback de timeout si présent
if (FOnDTFluxRequestTimeout* TimeoutCallback = PendingTimeoutCallbacks.Find(TimedOutRequest.RequestId))
if (FOnDTFluxTrackedRequestTimeout* TimeoutCallback = PendingTimeoutCallbacks.Find(TimedOutRequest.RequestId))
{
if (TimeoutCallback->IsBound())
{
@ -635,32 +666,28 @@ void UDTFluxNetworkSubsystem::OnRequestTimedOut_Internal(const FDTFluxQueuedRequ
OnTrackedRequestFailed.Broadcast(TimedOutRequest.RequestId, TimedOutRequest.RequestType, TEXT("Request timeout"));
}
bool UDTFluxNetworkSubsystem::TryMatchResponseToQueuedRequest(const FDTFluxServerResponse& Response)
bool UDTFluxNetworkSubsystem::TryMatchResponseToQueuedRequest(FDTFluxServerResponse& Response)
{
if (!QueueManager)
{
return false;
}
// Essayer de trouver une requête correspondante
// Note: Cette méthode nécessiterait une modification de UDTFluxQueuedManager pour supporter le matching par type et paramètres
// Pour l'instant, on utilise une approche simple : chercher la première requête du bon type
// Vous devrez probablement modifier UDTFluxQueuedManager pour ajouter une méthode comme FindMatchingRequest()
// Implémentation temporaire : on assume qu'il n'y a qu'une requête de chaque type en cours
if (QueueManager->IsRequestPending(Response.GetResponseType(), Response.ContestID, Response.StageID,
FGuid FoundRequestId;
if (QueueManager->IsRequestPending(FoundRequestId, Response.GetResponseType(), Response.ContestID, Response.StageID,
Response.SplitID))
{
// Marquer comme répondu - vous devrez adapter cette méthode selon votre logique de matching
// Pour l'instant, on va faire un workaround simple
UE_LOG(logDTFluxNetwork, Log,
TEXT("Matched response to queued request: Type=%s, Contest=%d, Stage=%d, Split=%d"),
*UEnum::GetValueAsString(Response.GetResponseType()), Response.ContestID, Response.StageID,
Response.SplitID);
return true;
if (PendingCallbacks.Contains(FoundRequestId))
{
FOnDTFluxTrackedRequestResponse* SuccessCallback = PendingCallbacks.Find(FoundRequestId);
SuccessCallback->ExecuteIfBound(FoundRequestId, Response);
//Suppress Callback;
return CleanRequestCallbacks(FoundRequestId);
}
return QueueManager->MarkRequestAsResponded(FoundRequestId);
}
return false;
@ -676,13 +703,26 @@ void UDTFluxNetworkSubsystem::CompleteTrackedRequest(const FGuid& RequestId, con
}
// Appeler le callback de succès si présent
if (FOnDTFluxRequestResponse* SuccessCallback = PendingCallbacks.Find(RequestId))
if (FOnDTFluxTrackedRequestResponse* SuccessCallback = PendingCallbacks.Find(RequestId))
{
if (SuccessCallback->IsBound())
{
SuccessCallback->Execute(RequestId, ResponseData);
EDTFluxResponseStatus ResponseStatus;
FDTFluxServerResponse Response(ResponseData, ResponseStatus);
if (ResponseStatus == EDTFluxResponseStatus::Success)
{
SuccessCallback->Execute(RequestId, Response);
QueueManager->MarkRequestAsResponded(RequestId);
PendingCallbacks.Remove(RequestId);
PendingTimeoutCallbacks.Remove(RequestId);
}
else
{
QueueManager->MarkRequestAsError(RequestId);
// Fail
// FailTrackedRequest()
}
}
PendingCallbacks.Remove(RequestId);
}
// Nettoyer le callback de timeout
@ -698,11 +738,11 @@ void UDTFluxNetworkSubsystem::FailTrackedRequest(const FGuid& RequestId, const F
EDTFluxRequestType RequestType)
{
// Appeler le callback d'erreur si présent
if (FOnDTFluxRequestTimeout* ErrorCallback = PendingTimeoutCallbacks.Find(RequestId))
if (FOnDTFluxTrackedRequestTimeout* ErrorCallback = PendingTimeoutCallbacks.Find(RequestId))
{
if (ErrorCallback->IsBound())
{
ErrorCallback->Execute(RequestId, ErrorMessage);
ErrorCallback->ExecuteIfBound(RequestId, ErrorMessage);
}
PendingTimeoutCallbacks.Remove(RequestId);
}