Skip to content

Commit 893058a

Browse files
authored
Revert PR #1736 due to critical WebSocket subscription bug (#1750)
1 parent 4fde042 commit 893058a

File tree

14 files changed

+73
-795
lines changed

14 files changed

+73
-795
lines changed

Cargo.lock

Lines changed: 0 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@ members = [
55
"apps/freenet-ping/app",
66
"apps/freenet-ping/types",
77
"apps/freenet-ping/contracts/ping",
8-
"tests/test-contract-integration",
9-
"tests/test-contract-update-nochange"
8+
"tests/test-contract-integration"
109
]
1110

1211
[workspace.dependencies]

crates/core/src/client_events/mod.rs

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -434,28 +434,26 @@ async fn process_open_request(
434434
?data,
435435
"Starting update op",
436436
);
437-
let update_response = op_manager
437+
let new_state = match op_manager
438438
.notify_contract_handler(ContractHandlerEvent::UpdateQuery {
439439
key,
440440
data,
441441
related_contracts: related_contracts.clone(),
442442
})
443-
.await?;
444-
445-
let new_state = match update_response {
446-
ContractHandlerEvent::UpdateResponse {
443+
.await
444+
{
445+
Ok(ContractHandlerEvent::UpdateResponse {
447446
new_value: Ok(new_val),
448-
} => Ok(new_val),
449-
ContractHandlerEvent::UpdateResponse {
447+
}) => Ok(new_val),
448+
Ok(ContractHandlerEvent::UpdateResponse {
450449
new_value: Err(err),
451-
} => Err(OpError::from(err)),
452-
ContractHandlerEvent::UpdateNoChange { key } => {
453-
// This should not happen anymore since we now return UpdateResponse
454-
// from the contract handler even for NoChange cases
455-
tracing::warn!(%key, "Unexpected UpdateNoChange event - this should have been converted to UpdateResponse");
456-
return Err(OpError::UnexpectedOpState.into());
450+
}) => Err(OpError::from(err)),
451+
Ok(ContractHandlerEvent::UpdateNoChange { key }) => {
452+
tracing::debug!(%key, "update with no change, do not start op");
453+
return Ok(None);
457454
}
458-
_ => return Err(OpError::UnexpectedOpState.into()),
455+
Err(err) => Err(err.into()),
456+
Ok(_) => Err(OpError::UnexpectedOpState),
459457
}
460458
.inspect_err(|err| tracing::error!(%key, "update query failed: {}", err))?;
461459

crates/core/src/client_events/websocket.rs

Lines changed: 4 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -552,25 +552,7 @@ async fn process_host_response(
552552
HostResponse::Ok => "HostResponse::Ok",
553553
_ => "Unknown",
554554
};
555-
556-
// Enhanced logging for UPDATE responses
557-
match &res {
558-
HostResponse::ContractResponse(ContractResponse::UpdateResponse {
559-
key,
560-
summary,
561-
}) => {
562-
tracing::info!(
563-
"[UPDATE_DEBUG] Processing UpdateResponse for WebSocket delivery - client: {}, key: {}, summary length: {}",
564-
id,
565-
key,
566-
summary.size()
567-
);
568-
}
569-
_ => {
570-
tracing::debug!(response = %res, response_type, cli_id = %id, "sending response");
571-
}
572-
}
573-
555+
tracing::debug!(response = %res, response_type, cli_id = %id, "sending response");
574556
match res {
575557
HostResponse::ContractResponse(ContractResponse::GetResponse {
576558
key,
@@ -590,64 +572,14 @@ async fn process_host_response(
590572
Err(err)
591573
}
592574
};
593-
// Log when UPDATE response is about to be sent over WebSocket
594-
let is_update_response = match &result {
595-
Ok(HostResponse::ContractResponse(ContractResponse::UpdateResponse {
596-
key,
597-
..
598-
})) => {
599-
tracing::info!(
600-
"[UPDATE_DEBUG] About to serialize UpdateResponse for WebSocket delivery - client: {}, key: {}",
601-
client_id,
602-
key
603-
);
604-
Some(*key)
605-
}
606-
_ => None,
607-
};
608-
609575
let serialized_res = match encoding_protoc {
610576
EncodingProtocol::Flatbuffers => match result {
611577
Ok(res) => res.into_fbs_bytes()?,
612578
Err(err) => err.into_fbs_bytes()?,
613579
},
614580
EncodingProtocol::Native => bincode::serialize(&result)?,
615581
};
616-
617-
// Log serialization completion for UPDATE responses
618-
if let Some(key) = is_update_response {
619-
tracing::info!(
620-
"[UPDATE_DEBUG] Serialized UpdateResponse for WebSocket delivery - client: {}, key: {}, size: {} bytes",
621-
client_id,
622-
key,
623-
serialized_res.len()
624-
);
625-
}
626-
627-
let send_result = tx.send(Message::Binary(serialized_res)).await;
628-
629-
// Log WebSocket send result for UPDATE responses
630-
if let Some(key) = is_update_response {
631-
match &send_result {
632-
Ok(()) => {
633-
tracing::info!(
634-
"[UPDATE_DEBUG] Successfully sent UpdateResponse over WebSocket to client {} for key {}",
635-
client_id,
636-
key
637-
);
638-
}
639-
Err(err) => {
640-
tracing::error!(
641-
"[UPDATE_DEBUG] Failed to send UpdateResponse over WebSocket to client {} for key {}: {:?}",
642-
client_id,
643-
key,
644-
err
645-
);
646-
}
647-
}
648-
}
649-
650-
send_result?;
582+
tx.send(Message::Binary(serialized_res)).await?;
651583
Ok(None)
652584
}
653585
Some(HostCallbackResult::SubscriptionChannel { key, id, callback }) => {
@@ -695,88 +627,20 @@ impl ClientEventsProxy for WebSocketProxy {
695627
result: Result<HostResponse, ClientError>,
696628
) -> BoxFuture<Result<(), ClientError>> {
697629
async move {
698-
// Log UPDATE responses specifically
699-
match &result {
700-
Ok(HostResponse::ContractResponse(freenet_stdlib::client_api::ContractResponse::UpdateResponse { key, summary })) => {
701-
tracing::info!(
702-
"[UPDATE_DEBUG] WebSocket send() called with UpdateResponse for client {} - key: {}, summary length: {}",
703-
id,
704-
key,
705-
summary.size()
706-
);
707-
}
708-
Ok(other_response) => {
709-
tracing::debug!("WebSocket send() called with response for client {}: {:?}", id, other_response);
710-
}
711-
Err(error) => {
712-
tracing::debug!("WebSocket send() called with error for client {}: {:?}", id, error);
713-
}
714-
}
715-
716630
if let Some(ch) = self.response_channels.remove(&id) {
717-
// Log success/failure of sending UPDATE responses
718-
if let Ok(HostResponse::ContractResponse(freenet_stdlib::client_api::ContractResponse::UpdateResponse { key, .. })) = &result {
719-
tracing::info!(
720-
"[UPDATE_DEBUG] Found WebSocket channel for client {}, sending UpdateResponse for key {}",
721-
id,
722-
key
723-
);
724-
}
725-
726-
// Check if this is an UPDATE response and extract key before moving result
727-
let update_key = match &result {
728-
Ok(HostResponse::ContractResponse(freenet_stdlib::client_api::ContractResponse::UpdateResponse { key, .. })) => Some(*key),
729-
_ => None
730-
};
731-
732631
let should_rm = result
733632
.as_ref()
734633
.map_err(|err| matches!(err.kind(), ErrorKind::Disconnect))
735634
.err()
736635
.unwrap_or(false);
737-
738-
let send_result = ch.send(HostCallbackResult::Result { id, result });
739-
740-
// Log UPDATE response send result
741-
if let Some(key) = update_key {
742-
match send_result.is_ok() {
743-
true => {
744-
tracing::info!(
745-
"[UPDATE_DEBUG] Successfully sent UpdateResponse to client {} for key {}",
746-
id,
747-
key
748-
);
749-
}
750-
false => {
751-
tracing::error!(
752-
"[UPDATE_DEBUG] Failed to send UpdateResponse to client {} for key {} - channel send failed",
753-
id,
754-
key
755-
);
756-
}
757-
}
758-
}
759-
760-
if send_result.is_ok() && !should_rm {
636+
if ch.send(HostCallbackResult::Result { id, result }).is_ok() && !should_rm {
761637
// still alive connection, keep it
762638
self.response_channels.insert(id, ch);
763639
} else {
764640
tracing::info!("dropped connection to client #{id}");
765641
}
766642
} else {
767-
// Log when client is not found for UPDATE responses
768-
match &result {
769-
Ok(HostResponse::ContractResponse(freenet_stdlib::client_api::ContractResponse::UpdateResponse { key, .. })) => {
770-
tracing::error!(
771-
"[UPDATE_DEBUG] Client {} not found in WebSocket response channels when trying to send UpdateResponse for key {}",
772-
id,
773-
key
774-
);
775-
}
776-
_ => {
777-
tracing::warn!("client: {id} not found");
778-
}
779-
}
643+
tracing::warn!("client: {id} not found");
780644
}
781645
Ok(())
782646
}

crates/core/src/contract/mod.rs

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -144,29 +144,7 @@ where
144144
.await;
145145

146146
let event_result = match update_result {
147-
Ok(UpsertResult::NoChange) => {
148-
tracing::debug!(%key, "UPDATE resulted in NoChange, fetching current state to return UpdateResponse");
149-
// When there's no change, we still need to return the current state
150-
// so the client gets a proper response
151-
match contract_handler.executor().fetch_contract(key, false).await {
152-
Ok((Some(current_state), _)) => {
153-
tracing::debug!(%key, "Successfully fetched current state for NoChange update");
154-
ContractHandlerEvent::UpdateResponse {
155-
new_value: Ok(current_state),
156-
}
157-
}
158-
Ok((None, _)) => {
159-
tracing::warn!(%key, "No state found when fetching for NoChange update");
160-
// Fallback to the old behavior if we can't fetch the state
161-
ContractHandlerEvent::UpdateNoChange { key }
162-
}
163-
Err(err) => {
164-
tracing::error!(%key, %err, "Error fetching state for NoChange update");
165-
// Fallback to the old behavior if we can't fetch the state
166-
ContractHandlerEvent::UpdateNoChange { key }
167-
}
168-
}
169-
}
147+
Ok(UpsertResult::NoChange) => ContractHandlerEvent::UpdateNoChange { key },
170148
Ok(UpsertResult::Updated(state)) => ContractHandlerEvent::UpdateResponse {
171149
new_value: Ok(state),
172150
},

crates/core/src/node/mod.rs

Lines changed: 2 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -387,81 +387,12 @@ async fn report_result(
387387
client_req_handler_callback: Option<(Vec<ClientId>, ClientResponsesSender)>,
388388
event_listener: &mut dyn NetEventRegister,
389389
) {
390-
// Add UPDATE-specific debug logging at the start
391-
if let Some(tx_id) = tx {
392-
if tx_id.transaction_type().to_string().contains("Update") {
393-
tracing::info!(
394-
"[UPDATE_DEBUG] report_result called for UPDATE transaction {}",
395-
tx_id
396-
);
397-
}
398-
}
399-
400390
match op_result {
401391
Ok(Some(op_res)) => {
402-
// Log specifically for UPDATE operations
403-
if let crate::operations::OpEnum::Update(ref update_op) = op_res {
404-
tracing::info!(
405-
"[UPDATE_DEBUG] UPDATE operation {} completed, finalized: {}",
406-
update_op.id,
407-
update_op.finalized()
408-
);
409-
}
410-
411392
if let Some((client_ids, cb)) = client_req_handler_callback {
412393
for client_id in client_ids {
413-
// Enhanced logging for UPDATE operations
414-
if let crate::operations::OpEnum::Update(ref update_op) = op_res {
415-
tracing::info!(
416-
"[UPDATE_DEBUG] Sending UPDATE response to client {} for transaction {}",
417-
client_id,
418-
update_op.id
419-
);
420-
421-
// Log the result being sent
422-
let host_result = op_res.to_host_result();
423-
match &host_result {
424-
Ok(response) => {
425-
tracing::info!(
426-
"[UPDATE_DEBUG] Client {} callback found, sending successful UPDATE response: {:?}",
427-
client_id,
428-
response
429-
);
430-
}
431-
Err(error) => {
432-
tracing::error!(
433-
"[UPDATE_DEBUG] Client {} callback found, sending UPDATE error: {:?}",
434-
client_id,
435-
error
436-
);
437-
}
438-
}
439-
} else {
440-
tracing::debug!(?tx, %client_id, "Sending response to client");
441-
}
442-
443-
// Send the response
444-
if let Err(e) = cb.send((client_id, op_res.to_host_result())) {
445-
tracing::error!(
446-
?tx, %client_id,
447-
"[UPDATE_RACE_FIX] Failed to send response to client: {:?}",
448-
e
449-
);
450-
} else if let crate::operations::OpEnum::Update(ref update_op) = op_res {
451-
tracing::debug!(
452-
"[UPDATE_RACE_FIX] Successfully queued UPDATE response for client {} (tx: {})",
453-
client_id,
454-
update_op.id
455-
);
456-
}
457-
}
458-
} else {
459-
// Log when no client callback is found for UPDATE operations
460-
if let crate::operations::OpEnum::Update(ref update_op) = op_res {
461-
tracing::warn!(
462-
"[UPDATE_DEBUG] No client callback found for UPDATE transaction {} - this may indicate a missing client subscription",
463-
update_op.id
464-
);
394+
tracing::debug!(?tx, %client_id, "Sending response to client");
395+
let _ = cb.send((client_id, op_res.to_host_result()));
465396
}
466397
}
467398
// check operations.rs:handle_op_result to see what's the meaning of each state

0 commit comments

Comments
 (0)