Add TrackedRequest implementation in DTFluxCoreSubsystem and various Modifications and bugfixes.

This commit is contained in:
2025-07-11 23:45:23 +02:00
parent f1f300a351
commit d92ca63ea4
8 changed files with 199 additions and 142 deletions

View File

@ -120,6 +120,7 @@ void FDTFluxAsyncParser::ParseResponseAsync(
OnFailed.ExecuteIfBound(RequestId, TEXT("Empty JSON data"));
return;
}
UE_LOG(logDTFluxNetwork, Verbose, TEXT("Starting async parsing for request %s"), *RequestId.ToString());
// Créer la tâche de parsing
FGraphEventRef Task = FFunctionGraphTask::CreateAndDispatchWhenReady(

View File

@ -255,6 +255,7 @@ bool FDTFluxQueuedRequestManager::MarkRequestAsSent(const FGuid& RequestId)
bool FDTFluxQueuedRequestManager::CompleteRequest(const FGuid& RequestId, const FString& RawResponseData,
bool bUseAsyncParsing)
{
UE_LOG(logDTFluxNetwork, Log, TEXT("FDTFluxQueuedRequestManager::CompleteRequest() %s"), *RequestId.ToString());
TSharedPtr<FDTFluxTrackedRequest> Request;
{
@ -274,7 +275,8 @@ bool FDTFluxQueuedRequestManager::CompleteRequest(const FGuid& RequestId, const
// Stocker la réponse brute
Request->SetRawResponse(RawResponseData);
Request->CompletedAt = FDateTime::Now();
UE_LOG(logDTFluxNetwork, Log, TEXT("Request %s completed at %s"), *RequestId.ToString(),
*Request->CompletedAt.ToString());
// Décider du parsing selon les callbacks et la configuration
bool bHasCallbacks = false;
{
@ -284,6 +286,12 @@ bool FDTFluxQueuedRequestManager::CompleteRequest(const FGuid& RequestId, const
if (bHasCallbacks && bUseAsyncParsing && !RawResponseData.IsEmpty())
{
UE_LOG(logDTFluxNetwork, Log,
TEXT("Request %s [bHasCallbacks=%s], [bUseAsyncParsing=%s], [bIsRawResponseEmpty=%s]"),
*RequestId.ToString(),
bHasCallbacks ? TEXT("true") : TEXT("false"), bUseAsyncParsing ? TEXT("true") : TEXT("false"),
RawResponseData.IsEmpty() ? TEXT("true") : TEXT("false"));
// Parsing asynchrone pour les callbacks
FOnParsingCompleted OnCompleted = FOnParsingCompleted::CreateRaw(
this, &FDTFluxQueuedRequestManager::OnParsingCompleted
@ -294,12 +302,13 @@ bool FDTFluxQueuedRequestManager::CompleteRequest(const FGuid& RequestId, const
);
AsyncParser->ParseResponseAsync(RequestId, RawResponseData, OnCompleted, OnFailed);
// CleanupCallbacks(RequestId);
UE_LOG(logDTFluxNetwork, Verbose, TEXT("Started async parsing for request %s"), *RequestId.ToString());
return true;
}
else
{
UE_LOG(logDTFluxNetwork, Warning, TEXT("request %s completed without sync"), *RequestId.ToString());
// Compléter immédiatement sans parsing ou avec parsing sync
EDTFluxRequestState NewState = Request->Config.bEnableCache
? EDTFluxRequestState::Cached
@ -799,28 +808,55 @@ void FDTFluxQueuedRequestManager::RecordCacheMiss() const
void FDTFluxQueuedRequestManager::OnParsingCompleted(const FGuid& RequestId,
TSharedPtr<FDTFluxServerResponse> ParsedResponse, bool bSuccess)
{
FScopeLock Lock(&RequestsLock);
UE_LOG(logDTFluxNetwork, Log, TEXT("FDTFluxQueuedRequestManager::OnParsingCompleted() request %s"),
*RequestId.ToString())
TSharedPtr<FDTFluxTrackedRequest> Request;
{
FScopeLock Lock(&RequestsLock);
auto* RequestPtr = AllRequests.Find(RequestId);
if (!RequestPtr || !RequestPtr->IsValid())
return;
auto* RequestPtr = AllRequests.Find(RequestId);
if (!RequestPtr || !RequestPtr->IsValid())
{
UE_LOG(logDTFluxNetwork, Error,
TEXT(
"DTFluxQueuedRequestManager::OnParsingCompleted() RequestId%s [InvalidRequestId=%s], [RequestPtrValid=%s]"
),
*RequestId.ToString(), RequestPtr ? TEXT("true") : TEXT("false"),
RequestPtr->IsValid() ? TEXT("true") : TEXT("false"));
return;
}
auto Request = *RequestPtr;
Request = *RequestPtr;
}
if (bSuccess && ParsedResponse.IsValid())
{
Request->ParsedResponse = ParsedResponse;
Request->bIsResponseParsed = true;
EDTFluxRequestState NewState = Request->Config.bEnableCache
? EDTFluxRequestState::Cached
: EDTFluxRequestState::Completed;
UE_LOG(logDTFluxNetwork, VeryVerbose,
ChangeRequestState(Request, NewState);
if (Request->Config.bEnableCache)
{
FScopeLock Lock(&RequestsLock);
CacheKeyToRequestId.Add(Request->GetCacheKey(), RequestId);
}
UE_LOG(logDTFluxNetwork, Log,
TEXT("DTFluxQueuedRequestManager: Async parsing completed for request %s"),
*RequestId.ToString());
}
else
{
UE_LOG(logDTFluxNetwork, Warning, TEXT("DTFluxQueuedRequestManager: Async parsing failed for request %s"),
*RequestId.ToString());
Request->LastErrorMessage = TEXT("Async parsing failed");
ChangeRequestState(Request, EDTFluxRequestState::Failed);
UE_LOG(logDTFluxNetwork, Error, TEXT("Async parsing failed for request %s"), *RequestId.ToString());
}
// ✅ FIX: Déclencher les callbacks maintenant !
TriggerCallbacks(*Request);
CleanupCallbacks(RequestId);
}
void FDTFluxQueuedRequestManager::OnParsingFailed(const FGuid& RequestId, const FString& ErrorMessage)
@ -828,6 +864,7 @@ void FDTFluxQueuedRequestManager::OnParsingFailed(const FGuid& RequestId, const
UE_LOG(logDTFluxNetwork, Error, TEXT("DTFluxQueuedRequestManager: Async parsing failed for request %s: %s"),
*RequestId.ToString(),
*ErrorMessage);
FailRequest(RequestId, FString::Printf(TEXT("Parsing failed: %s"), *ErrorMessage));
}
FString FDTFluxQueuedRequestManager::GenerateCacheKey(EDTFluxApiDataType RequestType, int32 ContestId, int32 StageId,

View File

@ -1,6 +1,7 @@
#pragma once
#include "Struct/DTFluxServerResponseStruct.h"
#include "DTFluxNetworkModule.h"
#include "Types/Objects/UDTFluxParticipantFactory.h"
#include "Struct/DTFluxServerResponseStruct.h"
// === IMPLÉMENTATION DES CONSTRUCTEURS ===
@ -194,7 +195,7 @@ FString FDTFluxServerResponse::ToDebugString() const
*UEnum::GetValueAsString(ParsingStatus), *Type, Code, ContestID, StageID, SplitID, *ReceivedAt.ToString());
}
bool FDTFluxServerResponse::ParseTeamListResponse(FDTFluxTeamListDefinition& OutTeamList)
bool FDTFluxServerResponse::ParseTeamList(FDTFluxTeamListDefinition& OutTeamList)
{
ParsingStatus = EDTFluxResponseStatus::Unset;
if (!ValidateResponseType(TEXT("team-list")))
@ -256,9 +257,9 @@ bool FDTFluxServerResponse::ParseTeamListResponse(FDTFluxTeamListDefinition& Out
return false;
}
bool FDTFluxServerResponse::ParseTeamUpdateResponse(FDTFluxTeamListDefinition& OutTeamUpdate)
bool FDTFluxServerResponse::ParseTeamUpdate(FDTFluxTeamListDefinition& OutTeamUpdate)
{
return ParseTeamListResponse(OutTeamUpdate);
return ParseTeamList(OutTeamUpdate);
}
bool FDTFluxServerResponse::ParseRaceData(FDTFluxRaceData& OutRaceData)
@ -368,7 +369,7 @@ bool FDTFluxServerResponse::ParseContestRanking(FDTFluxContestRankings& OutConte
return true;
}
bool FDTFluxServerResponse::ParseStageRankingResponse(FDTFluxStageRankings& OutStageRankings)
bool FDTFluxServerResponse::ParseStageRanking(FDTFluxStageRankings& OutStageRankings)
{
// UE_LOG(logDTFluxNetwork, Log, TEXT("Response is stage-ranking type %s"), *RawMessage);
if (!ValidateResponseType(TEXT("stage-ranking")))
@ -396,7 +397,7 @@ bool FDTFluxServerResponse::ParseStageRankingResponse(FDTFluxStageRankings& OutS
return true;
}
bool FDTFluxServerResponse::ParseSplitRankingResponse(FDTFluxSplitRankings& OutSplitRankings)
bool FDTFluxServerResponse::ParseSplitRanking(FDTFluxSplitRankings& OutSplitRankings)
{
if (!ValidateResponseType(TEXT("stage-ranking")) || SplitID == -1)
{
@ -427,7 +428,7 @@ bool FDTFluxServerResponse::ParseSplitRankingResponse(FDTFluxSplitRankings& OutS
return true;
}
bool FDTFluxServerResponse::ParseStatusUpdateResponse(FDTFluxTeamStatusUpdate& OutStatusUpdate)
bool FDTFluxServerResponse::ParseStatusUpdate(FDTFluxTeamStatusUpdate& OutStatusUpdate)
{
if (!ValidateResponseType(TEXT("status-update")))
{
@ -448,7 +449,7 @@ bool FDTFluxServerResponse::ParseStatusUpdateResponse(FDTFluxTeamStatusUpdate& O
return true;
}
bool FDTFluxServerResponse::ParseSplitSensorResponse(TArray<FDTFluxSplitSensorInfo>& OutSplitSensorInfos)
bool FDTFluxServerResponse::ParseSplitSensor(TArray<FDTFluxSplitSensorInfo>& OutSplitSensorInfos)
{
if (!ValidateResponseType(TEXT("split-sensor")))
{

View File

@ -444,7 +444,9 @@ bool UDTFluxNetworkSubsystem::TryMatchResponseToQueuedRequest(const FString& Mes
// Utiliser le parsing asynchrone pour les réponses volumineuses
bool bUseAsyncParsing = ShouldUseAsyncParsing(MessageString);
RequestManager->CompleteRequest(FoundRequestId, MessageString, bUseAsyncParsing);
if (RequestManager->CompleteRequest(FoundRequestId, MessageString, bUseAsyncParsing))
{
}
UE_LOG(logDTFluxNetwork, Log, TEXT("Matched response to tracked request %s (async=%s)"),
*FoundRequestId.ToString(), bUseAsyncParsing ? TEXT("true") : TEXT("false"));
@ -521,7 +523,12 @@ void UDTFluxNetworkSubsystem::OnRequestCompleted_Internal(const FDTFluxTrackedRe
CompletedRequest.RawResponseData
);
UE_LOG(logDTFluxNetwork, Log, TEXT("Tracked request completed: %s"), *CompletedRequest.RequestId.ToString());
if (CompletedRequest.ParsedResponse.IsSet())
{
UE_LOG(logDTFluxNetwork, Log, TEXT("Tracked About to process : %s"), *CompletedRequest.RequestId.ToString());
TSharedPtr<FDTFluxServerResponse> Response = CompletedRequest.ParsedResponse.GetValue();
ProcessParsedResponse(Response);
}
}
void UDTFluxNetworkSubsystem::OnRequestFailed_Internal(const FDTFluxTrackedRequest& FailedRequest)
@ -571,7 +578,7 @@ void UDTFluxNetworkSubsystem::ReconnectWs(const FName WsClientId)
void UDTFluxNetworkSubsystem::ParseTeamListResponse(FDTFluxServerResponse& Response)
{
FDTFluxTeamListDefinition TeamListDefinition;
Response.ParseTeamListResponse(TeamListDefinition);
Response.ParseTeamList(TeamListDefinition);
if (Response.GetParsingStatus() == EDTFluxResponseStatus::Success)
{
@ -620,7 +627,7 @@ void UDTFluxNetworkSubsystem::ParseContestRanking(FDTFluxServerResponse& Respons
void UDTFluxNetworkSubsystem::ParseStageRankingResponse(FDTFluxServerResponse& Response)
{
FDTFluxStageRankings StageRankings;
Response.ParseStageRankingResponse(StageRankings);
Response.ParseStageRanking(StageRankings);
if (Response.GetParsingStatus() == EDTFluxResponseStatus::Success)
{
@ -637,7 +644,7 @@ void UDTFluxNetworkSubsystem::ParseStageRankingResponse(FDTFluxServerResponse& R
void UDTFluxNetworkSubsystem::ParseSplitRankingResponse(FDTFluxServerResponse& Response)
{
FDTFluxSplitRankings SplitRankings;
Response.ParseSplitRankingResponse(SplitRankings);
Response.ParseSplitRanking(SplitRankings);
if (Response.GetParsingStatus() == EDTFluxResponseStatus::Success)
{
@ -654,7 +661,7 @@ void UDTFluxNetworkSubsystem::ParseSplitRankingResponse(FDTFluxServerResponse& R
void UDTFluxNetworkSubsystem::ParseStatusUpdateResponse(FDTFluxServerResponse& Response)
{
FDTFluxTeamStatusUpdate StatusUpdate;
Response.ParseStatusUpdateResponse(StatusUpdate);
Response.ParseStatusUpdate(StatusUpdate);
if (Response.GetParsingStatus() == EDTFluxResponseStatus::Success)
{
@ -671,7 +678,7 @@ void UDTFluxNetworkSubsystem::ParseStatusUpdateResponse(FDTFluxServerResponse& R
void UDTFluxNetworkSubsystem::ParseSplitSensorResponse(FDTFluxServerResponse& Response)
{
TArray<FDTFluxSplitSensorInfo> SplitSensorInfos;
Response.ParseSplitSensorResponse(SplitSensorInfos);
Response.ParseSplitSensor(SplitSensorInfos);
if (Response.GetParsingStatus() == EDTFluxResponseStatus::Success)
{
@ -702,7 +709,7 @@ EDTFluxResponseStatus UDTFluxNetworkSubsystem::ProcessPushMessage(FDTFluxServerR
case EDTFluxApiDataType::SplitSensor:
{
TArray<FDTFluxSplitSensorInfo> SplitSensorInfos;
if (Response.ParseSplitSensorResponse(SplitSensorInfos))
if (Response.ParseSplitSensor(SplitSensorInfos))
{
for (const auto& SplitSensorInfo : SplitSensorInfos)
{
@ -715,7 +722,7 @@ EDTFluxResponseStatus UDTFluxNetworkSubsystem::ProcessPushMessage(FDTFluxServerR
case EDTFluxApiDataType::StatusUpdate:
{
FDTFluxTeamStatusUpdate StatusUpdate;
if (Response.ParseStatusUpdateResponse(StatusUpdate))
if (Response.ParseStatusUpdate(StatusUpdate))
{
OnTeamStatusUpdateReceived.ExecuteIfBound(StatusUpdate);
}
@ -725,7 +732,7 @@ EDTFluxResponseStatus UDTFluxNetworkSubsystem::ProcessPushMessage(FDTFluxServerR
case EDTFluxApiDataType::TeamUpdate:
{
FDTFluxTeamListDefinition TeamUpdateList;
if (Response.ParseTeamUpdateResponse(TeamUpdateList))
if (Response.ParseTeamUpdate(TeamUpdateList))
{
OnTeamUpdateReceived.ExecuteIfBound(TeamUpdateList);
}