From a5d881174f0f07d0c337bbcda561b7cbbf34afa3 Mon Sep 17 00:00:00 2001 From: Sebastian Lorenz Date: Thu, 14 Aug 2025 18:58:41 +0200 Subject: [PATCH 01/10] all: fix all cargo check warnings (#6102) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes all 16 cargo check warnings across the codebase: **Cargo.toml fixes:** - Remove invalid `doc = false` key from git dependency **Dead code warnings:** - Add #[allow(dead_code)] to reference implementations and test utilities - PoI struct and helper functions (proof_of_indexing/reference.rs) - AscSubgraphEntityOp enum (runtime/wasm) - CopyVid struct (store/postgres) **Lifetime syntax warnings:** - Add explicit '_ lifetime parameters to fix confusing syntax - Fix 11 lifetime warnings across store, runtime, and utility modules All tests pass and cargo check now produces zero warnings. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-authored-by: Claude --- graph/Cargo.toml | 2 +- graph/src/components/store/write.rs | 4 ++-- .../components/subgraph/proof_of_indexing/reference.rs | 3 +++ graph/src/util/timed_rw_lock.rs | 8 ++++---- runtime/wasm/src/module/instance.rs | 2 +- runtime/wasm/src/to_from/external.rs | 1 + store/postgres/src/relational/dsl.rs | 2 +- store/postgres/src/relational_queries.rs | 1 + store/postgres/src/subgraph_store.rs | 2 +- store/postgres/src/writable.rs | 2 +- 10 files changed, 16 insertions(+), 11 deletions(-) diff --git a/graph/Cargo.toml b/graph/Cargo.toml index bb4287b2c31..3c78221e7a8 100644 --- a/graph/Cargo.toml +++ b/graph/Cargo.toml @@ -61,7 +61,7 @@ sqlparser = { workspace = true } # stable-hash_legacy = { version = "0.3.3", package = "stable-hash" } # stable-hash = { version = "0.4.2" } stable-hash = { git = "https://github.com/graphprotocol/stable-hash", branch = "main" } -stable-hash_legacy = { git = "https://github.com/graphprotocol/stable-hash", branch = "old", package = "stable-hash", doc = false } +stable-hash_legacy = { git = "https://github.com/graphprotocol/stable-hash", branch = "old", package = "stable-hash" } strum_macros = "0.27.1" slog-async = "2.5.0" slog-envlogger = "2.1.0" diff --git a/graph/src/components/store/write.rs b/graph/src/components/store/write.rs index b75d2912de2..fc0ebaea856 100644 --- a/graph/src/components/store/write.rs +++ b/graph/src/components/store/write.rs @@ -145,7 +145,7 @@ impl EntityModification { /// Return the details of the write if `self` is a write operation for a /// new or an existing entity - fn as_write(&self) -> Option { + fn as_write(&self) -> Option> { EntityWrite::try_from(self).ok() } @@ -823,7 +823,7 @@ impl Batch { &self, entity_type: &EntityType, at: BlockNumber, - ) -> impl Iterator { + ) -> impl Iterator> { self.mods .group(entity_type) .map(|group| group.effective_ops(at)) diff --git a/graph/src/components/subgraph/proof_of_indexing/reference.rs b/graph/src/components/subgraph/proof_of_indexing/reference.rs index 5c7d269d7a7..3a11a4db4e3 100644 --- a/graph/src/components/subgraph/proof_of_indexing/reference.rs +++ b/graph/src/components/subgraph/proof_of_indexing/reference.rs @@ -9,6 +9,7 @@ use web3::types::{Address, H256}; /// well-implemented (without conflicting sequence numbers, or other oddities). /// It's just way easier to check that this works, and serves as a kind of /// documentation as a side-benefit. +#[allow(dead_code)] pub struct PoI<'a> { pub causality_regions: HashMap>, pub subgraph_id: DeploymentHash, @@ -16,10 +17,12 @@ pub struct PoI<'a> { pub indexer: Option
, } +#[allow(dead_code)] fn h256_as_bytes(val: &H256) -> AsBytes<&[u8]> { AsBytes(val.as_bytes()) } +#[allow(dead_code)] fn indexer_opt_as_bytes(val: &Option
) -> Option> { val.as_ref().map(|v| AsBytes(v.as_bytes())) } diff --git a/graph/src/util/timed_rw_lock.rs b/graph/src/util/timed_rw_lock.rs index 4a52d531604..e8ff394be44 100644 --- a/graph/src/util/timed_rw_lock.rs +++ b/graph/src/util/timed_rw_lock.rs @@ -20,7 +20,7 @@ impl TimedRwLock { } } - pub fn write(&self, logger: &Logger) -> parking_lot::RwLockWriteGuard { + pub fn write(&self, logger: &Logger) -> parking_lot::RwLockWriteGuard<'_, T> { loop { let mut elapsed = Duration::from_secs(0); match self.lock.try_write_for(self.log_threshold) { @@ -36,11 +36,11 @@ impl TimedRwLock { } } - pub fn try_read(&self) -> Option> { + pub fn try_read(&self) -> Option> { self.lock.try_read() } - pub fn read(&self, logger: &Logger) -> parking_lot::RwLockReadGuard { + pub fn read(&self, logger: &Logger) -> parking_lot::RwLockReadGuard<'_, T> { loop { let mut elapsed = Duration::from_secs(0); match self.lock.try_read_for(self.log_threshold) { @@ -73,7 +73,7 @@ impl TimedMutex { } } - pub fn lock(&self, logger: &Logger) -> parking_lot::MutexGuard { + pub fn lock(&self, logger: &Logger) -> parking_lot::MutexGuard<'_, T> { let start = Instant::now(); let guard = self.lock.lock(); let elapsed = start.elapsed(); diff --git a/runtime/wasm/src/module/instance.rs b/runtime/wasm/src/module/instance.rs index 63845e81c60..cddac22f9fc 100644 --- a/runtime/wasm/src/module/instance.rs +++ b/runtime/wasm/src/module/instance.rs @@ -141,7 +141,7 @@ impl WasmInstance { self.store.into_data() } - pub(crate) fn instance_ctx(&mut self) -> WasmInstanceContext { + pub(crate) fn instance_ctx(&mut self) -> WasmInstanceContext<'_> { WasmInstanceContext::new(&mut self.store) } diff --git a/runtime/wasm/src/to_from/external.rs b/runtime/wasm/src/to_from/external.rs index 6bb7122613f..3f19716f487 100644 --- a/runtime/wasm/src/to_from/external.rs +++ b/runtime/wasm/src/to_from/external.rs @@ -466,6 +466,7 @@ where } #[derive(Debug, Clone, Eq, PartialEq, AscType)] +#[allow(dead_code)] pub enum AscSubgraphEntityOp { Create, Modify, diff --git a/store/postgres/src/relational/dsl.rs b/store/postgres/src/relational/dsl.rs index e804a4d06ca..13cab9dd9d0 100644 --- a/store/postgres/src/relational/dsl.rs +++ b/store/postgres/src/relational/dsl.rs @@ -176,7 +176,7 @@ impl<'a> Table<'a> { } /// Reference a column in this table and use the correct SQL type `ST` - fn bind(&self, name: &str) -> Option> { + fn bind(&self, name: &str) -> Option> { self.column(name).map(|c| c.bind()) } diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs index 533990c42b9..79d96edd30c 100644 --- a/store/postgres/src/relational_queries.rs +++ b/store/postgres/src/relational_queries.rs @@ -4964,6 +4964,7 @@ impl<'a, Conn> RunQueryDsl for CountCurrentVersionsQuery<'a> {} /// Helper struct for returning the id's touched by the RevertRemove and /// RevertExtend queries #[derive(QueryableByName, PartialEq, Eq, Hash)] +#[allow(dead_code)] pub struct CopyVid { #[diesel(sql_type = BigInt)] pub vid: i64, diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index 28fe5bba514..2cb2df8a0d6 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -789,7 +789,7 @@ impl SubgraphStoreInner { /// connections can deadlock the entire process if the pool runs out /// of connections in between getting the first one and trying to get the /// second one. - pub(crate) fn primary_conn(&self) -> Result { + pub(crate) fn primary_conn(&self) -> Result, StoreError> { let conn = self.mirror.primary().get()?; Ok(primary::Connection::new(conn)) } diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index 74b516433b6..9c512e27ae7 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -49,7 +49,7 @@ use crate::{primary, primary::Site, relational::Layout, SubgraphStore}; struct WritableSubgraphStore(SubgraphStore); impl WritableSubgraphStore { - fn primary_conn(&self) -> Result { + fn primary_conn(&self) -> Result, StoreError> { self.0.primary_conn() } From 388d23089b6d1c1b76b34448666f96c23c468d63 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Tue, 16 Sep 2025 10:45:12 -0700 Subject: [PATCH 02/10] core: Better logging of assignment events In particular, include the deployment hash --- core/src/subgraph/registrar.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index 7c56e5dd583..7fd7bf7c0f2 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -173,24 +173,25 @@ where anyhow!("Failed to get subgraph assignment entity: {}", e) }) .map(|assigned| -> Box + Send> { + let logger = logger.new(o!("subgraph_id" => deployment.hash.to_string(), "node_id" => node_id.to_string())); if let Some((assigned,is_paused)) = assigned { if assigned == node_id { if is_paused{ // Subgraph is paused, so we don't start it - debug!(logger, "Deployment assignee is this node, but it is paused, so we don't start it"; "assigned_to" => assigned, "node_id" => &node_id,"paused" => is_paused); + debug!(logger, "Deployment assignee is this node"; "assigned_to" => assigned, "paused" => is_paused, "action" => "ignore"); return Box::new(stream::empty()); } // Start subgraph on this node - debug!(logger, "Deployment assignee is this node, broadcasting add event"; "assigned_to" => assigned, "node_id" => &node_id); + debug!(logger, "Deployment assignee is this node"; "assigned_to" => assigned, "action" => "add"); Box::new(stream::once(Ok(AssignmentEvent::Add { deployment, node_id: node_id.clone(), }))) } else { // Ensure it is removed from this node - debug!(logger, "Deployment assignee is not this node, broadcasting remove event"; "assigned_to" => assigned, "node_id" => &node_id); + debug!(logger, "Deployment assignee is not this node"; "assigned_to" => assigned, "action" => "remove"); Box::new(stream::once(Ok(AssignmentEvent::Remove { deployment, node_id: node_id.clone(), @@ -198,7 +199,7 @@ where } } else { // Was added/updated, but is now gone. - debug!(logger, "Deployment has not assignee, we will get a separate remove event later"; "node_id" => &node_id); + debug!(logger, "Deployment assignee not found in database"; "action" => "ignore"); Box::new(stream::empty()) } }) From 2bd144b5946c89da07432e7c31e4f22154e6c15a Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Tue, 16 Sep 2025 11:42:38 -0700 Subject: [PATCH 03/10] node: Log the node_id when starting up --- node/src/launcher.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/src/launcher.rs b/node/src/launcher.rs index 4441131a52e..550da346861 100644 --- a/node/src/launcher.rs +++ b/node/src/launcher.rs @@ -399,7 +399,7 @@ pub async fn run( // Obtain metrics server port let metrics_port = opt.metrics_port; - info!(logger, "Starting up"); + info!(logger, "Starting up"; "node_id" => &node_id); // Set up metrics let (prometheus_registry, metrics_registry) = setup_metrics(&logger); From d2f40bdfa6bdc6f2d8662ac4a06d1cb8e75ab581 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Tue, 16 Sep 2025 12:13:44 -0700 Subject: [PATCH 04/10] store: Log something when a notification listener is shut down --- store/postgres/src/notification_listener.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/store/postgres/src/notification_listener.rs b/store/postgres/src/notification_listener.rs index ecb7486daf2..583ef91479e 100644 --- a/store/postgres/src/notification_listener.rs +++ b/store/postgres/src/notification_listener.rs @@ -284,6 +284,7 @@ impl NotificationListener { } } } + warn!(logger, "Listener dropped. Terminating listener thread"); })) .unwrap_or_else(|_| std::process::exit(1)) }); From 260f13f890d85849bc157fd09585ca70ca242f49 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Tue, 16 Sep 2025 12:29:16 -0700 Subject: [PATCH 05/10] store: Log more details when SubscriptionManager encounters problems --- store/postgres/src/store_events.rs | 62 ++++++++++++++++++++++-------- 1 file changed, 47 insertions(+), 15 deletions(-) diff --git a/store/postgres/src/store_events.rs b/store/postgres/src/store_events.rs index 83b8e3b069b..b9da04f30d7 100644 --- a/store/postgres/src/store_events.rs +++ b/store/postgres/src/store_events.rs @@ -92,15 +92,21 @@ pub struct SubscriptionManager { /// Keep the notification listener alive listener: StoreEventListener, + + logger: Logger, } impl SubscriptionManager { pub fn new(logger: Logger, postgres_url: String, registry: Arc) -> Self { - let (listener, store_events) = StoreEventListener::new(logger, postgres_url, registry); + let logger = logger.new(o!("component" => "StoreEventListener")); + + let (listener, store_events) = + StoreEventListener::new(logger.cheap_clone(), postgres_url, registry); let mut manager = SubscriptionManager { subscriptions: Arc::new(RwLock::new(HashMap::new())), listener, + logger, }; // Deal with store subscriptions @@ -112,6 +118,32 @@ impl SubscriptionManager { manager } + async fn broadcast_event( + logger: &Logger, + subscriptions: &Arc>>>>, + event: StoreEvent, + ) { + let event = Arc::new(event); + + // Send to `subscriptions`. + { + let senders = subscriptions.read().unwrap().clone(); + + // Write change to all matching subscription streams; remove subscriptions + // whose receiving end has been dropped + for (id, sender) in senders { + if let Err(e) = sender.send(event.cheap_clone()).await { + error!( + logger, + "Failed to send store event to subscriber {}: {}", id, e + ); + // Receiver was dropped + subscriptions.write().unwrap().remove(&id); + } + } + } + } + /// Receive store events from Postgres and send them to all active /// subscriptions. Detect stale subscriptions in the process and /// close them. @@ -121,24 +153,22 @@ impl SubscriptionManager { ) { let subscriptions = self.subscriptions.cheap_clone(); let mut store_events = store_events.compat(); + let logger = self.logger.cheap_clone(); // This channel is constantly receiving things and there are locks involved, // so it's best to use a blocking task. graph::spawn_blocking(async move { - while let Some(Ok(event)) = store_events.next().await { - let event = Arc::new(event); - - // Send to `subscriptions`. - { - let senders = subscriptions.read().unwrap().clone(); - - // Write change to all matching subscription streams; remove subscriptions - // whose receiving end has been dropped - for (id, sender) in senders { - if sender.send(event.cheap_clone()).await.is_err() { - // Receiver was dropped - subscriptions.write().unwrap().remove(&id); - } + loop { + match store_events.next().await { + Some(Ok(event)) => { + Self::broadcast_event(&logger, &subscriptions, event).await; + } + Some(Err(_)) => { + error!(logger, "Error receiving store event"); + } + None => { + error!(logger, "Store event stream ended"); + break; } } } @@ -147,6 +177,7 @@ impl SubscriptionManager { fn periodically_clean_up_stale_subscriptions(&self) { let subscriptions = self.subscriptions.cheap_clone(); + let logger = self.logger.cheap_clone(); // Clean up stale subscriptions every 5s graph::spawn(async move { @@ -169,6 +200,7 @@ impl SubscriptionManager { // Remove all stale subscriptions for id in stale_ids { + warn!(logger, "Removing stale subscription {}", id); subscriptions.remove(&id); } } From 35554d06f636c74b6daffea369c26d9c0b986f01 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Tue, 16 Sep 2025 15:46:06 -0700 Subject: [PATCH 06/10] core: Log assignment events very verbosely --- core/src/subgraph/registrar.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index 7fd7bf7c0f2..427ba194b47 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -149,9 +149,10 @@ where let store = self.store.clone(); let node_id = self.node_id.clone(); let logger = self.logger.clone(); + let logger2 = logger.clone(); self.subscription_manager - .subscribe() + .subscribe().inspect(move |x| debug!(logger2, "Received store event: {:?}", x)) .map_err(|()| anyhow!("Entity change stream failed")) .map(|event| { let changes: Vec<_> = event.changes.iter().cloned().map(AssignmentChange::into_parts).collect(); @@ -160,7 +161,7 @@ where .flatten() .and_then( move |(deployment, operation)| -> Result + Send>, _> { - trace!(logger, "Received assignment change"; + debug!(logger, "Received assignment change"; "deployment" => %deployment, "operation" => format!("{:?}", operation), ); From e734af7da47e41b8132f047d0e30441aac8d18f7 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Wed, 17 Sep 2025 09:38:12 -0700 Subject: [PATCH 07/10] Make it easier to find assignment log messages --- core/src/subgraph/registrar.rs | 5 +++-- store/postgres/src/notification_listener.rs | 19 ++++++++++++++----- store/postgres/src/store_events.rs | 6 +++++- 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index 427ba194b47..eea63b88128 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -152,7 +152,7 @@ where let logger2 = logger.clone(); self.subscription_manager - .subscribe().inspect(move |x| debug!(logger2, "Received store event: {:?}", x)) + .subscribe().inspect(move |x| debug!(logger2, "Received store event: {:?}", x; "tag" => "assignment")) .map_err(|()| anyhow!("Entity change stream failed")) .map(|event| { let changes: Vec<_> = event.changes.iter().cloned().map(AssignmentChange::into_parts).collect(); @@ -164,6 +164,7 @@ where debug!(logger, "Received assignment change"; "deployment" => %deployment, "operation" => format!("{:?}", operation), + "tag" => "assignment" ); match operation { @@ -174,7 +175,7 @@ where anyhow!("Failed to get subgraph assignment entity: {}", e) }) .map(|assigned| -> Box + Send> { - let logger = logger.new(o!("subgraph_id" => deployment.hash.to_string(), "node_id" => node_id.to_string())); + let logger = logger.new(o!("subgraph_id" => deployment.hash.to_string(), "node_id" => node_id.to_string(), "tag" => "assignment")); if let Some((assigned,is_paused)) = assigned { if assigned == node_id { diff --git a/store/postgres/src/notification_listener.rs b/store/postgres/src/notification_listener.rs index 583ef91479e..822c02d1798 100644 --- a/store/postgres/src/notification_listener.rs +++ b/store/postgres/src/notification_listener.rs @@ -147,10 +147,19 @@ impl NotificationListener { } } - let logger = logger.new(o!( - "component" => "NotificationListener", - "channel" => channel_name.0.clone() - )); + let store_events = channel_name.0 == "store_events"; + let logger = if store_events { + logger.new(o!( + "component" => "NotificationListener", + "channel" => channel_name.0.clone(), + "tag" => "assignment" + )) + } else { + logger.new(o!( + "component" => "NotificationListener", + "channel" => channel_name.0.clone() + )) + }; debug!( logger, @@ -284,7 +293,7 @@ impl NotificationListener { } } } - warn!(logger, "Listener dropped. Terminating listener thread"); + warn!(logger, "Listener dropped. Terminating listener thread"; "tag" => "assignment"); })) .unwrap_or_else(|_| std::process::exit(1)) }); diff --git a/store/postgres/src/store_events.rs b/store/postgres/src/store_events.rs index b9da04f30d7..eea32690f57 100644 --- a/store/postgres/src/store_events.rs +++ b/store/postgres/src/store_events.rs @@ -98,7 +98,7 @@ pub struct SubscriptionManager { impl SubscriptionManager { pub fn new(logger: Logger, postgres_url: String, registry: Arc) -> Self { - let logger = logger.new(o!("component" => "StoreEventListener")); + let logger = logger.new(o!("component" => "StoreEventListener", "tag" => "assignment")); let (listener, store_events) = StoreEventListener::new(logger.cheap_clone(), postgres_url, registry); @@ -128,6 +128,8 @@ impl SubscriptionManager { // Send to `subscriptions`. { let senders = subscriptions.read().unwrap().clone(); + let ids = format!("{:?}", senders.keys().cloned().collect::>()); + debug!(logger, "Broadcasting store event"; "event" => format!("{:?}", event), "subscriptions" => ids); // Write change to all matching subscription streams; remove subscriptions // whose receiving end has been dropped @@ -159,6 +161,7 @@ impl SubscriptionManager { // so it's best to use a blocking task. graph::spawn_blocking(async move { loop { + debug!(logger, "Waiting for store event"); match store_events.next().await { Some(Ok(event)) => { Self::broadcast_event(&logger, &subscriptions, event).await; @@ -220,6 +223,7 @@ impl SubscriptionManagerTrait for SubscriptionManager { // Add the new subscription self.subscriptions.write().unwrap().insert(id, sender); + debug!(self.logger, "New subscription {}", id); // Return the subscription ID and entity change stream StoreEventStream::new(Box::new(ReceiverStream::new(receiver).map(Ok).compat())) } From 25caea7cd44edeadef1515f41eb3d2fefea2bdda Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Wed, 17 Sep 2025 09:43:55 -0700 Subject: [PATCH 08/10] store: Log more details about notifications --- store/postgres/src/notification_listener.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/store/postgres/src/notification_listener.rs b/store/postgres/src/notification_listener.rs index 822c02d1798..ff9102ff03a 100644 --- a/store/postgres/src/notification_listener.rs +++ b/store/postgres/src/notification_listener.rs @@ -222,6 +222,9 @@ impl NotificationListener { // longer than 500ms for new notifications to arrive, // but limit the size of each batch to 128 to guarantee // progress on a busy system + if store_events { + debug!(logger, "Waiting for store event"; "queue_size" => queue_size); + } let notifications: Vec<_> = conn .notifications() .timeout_iter(Duration::from_millis(500)) @@ -243,7 +246,9 @@ impl NotificationListener { }) .filter(|notification| notification.channel() == channel_name.0) .collect(); - + if store_events { + debug!(logger, "Received store events"; "num_events" => notifications.len(), "queue_size" => conn.notifications().len()); + } // Read notifications until there hasn't been one for 500ms for notification in notifications { // Terminate the thread if desired @@ -254,6 +259,9 @@ impl NotificationListener { match JsonNotification::parse(¬ification, &mut conn) { Ok(json_notification) => { let timeout = ENV_VARS.store.notification_broadcast_timeout; + if store_events { + debug!(logger, "Broadcasting store event"; "notification" => format!("{:?}", json_notification)); + } match graph::block_on( sender.send_timeout(json_notification, timeout), ) { From 98dea2e9c3695e188ada91d8c1472edba8c379f6 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Wed, 17 Sep 2025 11:54:18 -0700 Subject: [PATCH 09/10] graph, store: Moar logging --- graph/src/components/store/mod.rs | 17 +++++++++++++---- store/postgres/src/notification_listener.rs | 5 +---- store/postgres/src/store_events.rs | 5 ++++- 3 files changed, 18 insertions(+), 9 deletions(-) diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs index 585df5945f1..d6a98267921 100644 --- a/graph/src/components/store/mod.rs +++ b/graph/src/components/store/mod.rs @@ -9,7 +9,7 @@ use diesel::serialize::{Output, ToSql}; use diesel::sql_types::Integer; use diesel_derives::{AsExpression, FromSqlRow}; pub use entity_cache::{EntityCache, EntityLfuCache, GetScope, ModificationsAndCache}; -use slog::Logger; +use slog::{debug, Logger}; pub use super::subgraph::Entity; pub use err::{StoreError, StoreResult}; @@ -636,6 +636,7 @@ impl PartialEq for StoreEvent { /// A `StoreEventStream` produces the `StoreEvents`. Various filters can be applied /// to it to reduce which and how many events are delivered by the stream. pub struct StoreEventStream { + logger: Logger, source: S, } @@ -651,7 +652,9 @@ where type Error = (); fn poll(&mut self) -> Result>, Self::Error> { - self.source.poll() + let res = self.source.poll(); + debug!(self.logger, "Polled store event"; "result" => format!("{:?}", res)); + res } } @@ -660,8 +663,14 @@ where S: Stream, Error = ()> + Send + 'static, { // Create a new `StoreEventStream` from another such stream - pub fn new(source: S) -> Self { - StoreEventStream { source } + pub fn new(logger: Logger, source: S) -> Self { + StoreEventStream { logger, source } + } +} + +impl Drop for StoreEventStream { + fn drop(&mut self) { + debug!(self.logger, "Dropping store event stream"); } } diff --git a/store/postgres/src/notification_listener.rs b/store/postgres/src/notification_listener.rs index ff9102ff03a..9774868a20e 100644 --- a/store/postgres/src/notification_listener.rs +++ b/store/postgres/src/notification_listener.rs @@ -222,9 +222,6 @@ impl NotificationListener { // longer than 500ms for new notifications to arrive, // but limit the size of each batch to 128 to guarantee // progress on a busy system - if store_events { - debug!(logger, "Waiting for store event"; "queue_size" => queue_size); - } let notifications: Vec<_> = conn .notifications() .timeout_iter(Duration::from_millis(500)) @@ -246,7 +243,7 @@ impl NotificationListener { }) .filter(|notification| notification.channel() == channel_name.0) .collect(); - if store_events { + if store_events && notifications.len() > 0 { debug!(logger, "Received store events"; "num_events" => notifications.len(), "queue_size" => conn.notifications().len()); } // Read notifications until there hasn't been one for 500ms diff --git a/store/postgres/src/store_events.rs b/store/postgres/src/store_events.rs index eea32690f57..fc83e364a8c 100644 --- a/store/postgres/src/store_events.rs +++ b/store/postgres/src/store_events.rs @@ -225,6 +225,9 @@ impl SubscriptionManagerTrait for SubscriptionManager { debug!(self.logger, "New subscription {}", id); // Return the subscription ID and entity change stream - StoreEventStream::new(Box::new(ReceiverStream::new(receiver).map(Ok).compat())) + StoreEventStream::new( + self.logger.new(o!("id" => id)), + Box::new(ReceiverStream::new(receiver).map(Ok).compat()), + ) } } From 0106f2691f29cb6eda78d79f88e57a8df36f6293 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Wed, 17 Sep 2025 12:42:06 -0700 Subject: [PATCH 10/10] all: Defer getting manifest from IPFS when starting subgraph The current code in `SubgraphAssignmentProvider.start` fetched the manifest from IPFS before starting a subgraph. But the code calling `start`, ultimately `SubgraphRegistrar.start_assigned_subgraphs` waited for all subgraphs to start successfully before processing assignment events. That could lead to a situation where a slow IPFS server, even if it was slow for just one subgraph, could keep a node from processing assignment events. With these changes, interacting with IPFS is deferred to the future that is spawned for running the subgraph so that slow IPFS can slow how long it takes for a subgraph to start, but not the system overall. --- core/src/subgraph/instance_manager.rs | 14 +++++++++++++- core/src/subgraph/provider.rs | 18 +----------------- .../components/subgraph/instance_manager.rs | 1 - node/src/launcher.rs | 8 ++------ node/src/manager/commands/run.rs | 1 - tests/src/fixture/mod.rs | 1 - 6 files changed, 16 insertions(+), 27 deletions(-) diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index 20366a39bfa..cdf3688c574 100644 --- a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -60,7 +60,6 @@ impl SubgraphInstanceManagerTrait for SubgraphInstanceManager< async fn start_subgraph( self: Arc, loc: DeploymentLocator, - manifest: serde_yaml::Mapping, stop_block: Option, ) { let runner_index = self.subgraph_start_counter.fetch_add(1, Ordering::SeqCst); @@ -78,6 +77,19 @@ impl SubgraphInstanceManagerTrait for SubgraphInstanceManager< let deployment_status_metric = deployment_status_metric.clone(); async move { + let link_resolver = self + .link_resolver + .for_manifest(&loc.hash.to_string()) + .map_err(SubgraphAssignmentProviderError::ResolveError)?; + + let file_bytes = link_resolver + .cat(&logger, &loc.hash.to_ipfs_link()) + .await + .map_err(SubgraphAssignmentProviderError::ResolveError)?; + + let manifest: serde_yaml::Mapping = serde_yaml::from_slice(&file_bytes) + .map_err(|e| SubgraphAssignmentProviderError::ResolveError(e.into()))?; + match BlockchainKind::from_manifest(&manifest)? { BlockchainKind::Ethereum => { let runner = instance_manager diff --git a/core/src/subgraph/provider.rs b/core/src/subgraph/provider.rs index 9ad50f43942..a7122442531 100644 --- a/core/src/subgraph/provider.rs +++ b/core/src/subgraph/provider.rs @@ -44,14 +44,12 @@ impl DeploymentRegistry { pub struct SubgraphAssignmentProvider { logger_factory: LoggerFactory, deployment_registry: DeploymentRegistry, - link_resolver: Arc, instance_manager: Arc, } impl SubgraphAssignmentProvider { pub fn new( logger_factory: &LoggerFactory, - link_resolver: Arc, instance_manager: I, subgraph_metrics: Arc, ) -> Self { @@ -61,7 +59,6 @@ impl SubgraphAssignmentProvider { // Create the subgraph provider SubgraphAssignmentProvider { logger_factory, - link_resolver: link_resolver.with_retries().into(), instance_manager: Arc::new(instance_manager), deployment_registry: DeploymentRegistry::new(subgraph_metrics), } @@ -86,22 +83,9 @@ impl SubgraphAssignmentProviderTrait for SubgraphAss )); } - let link_resolver = self - .link_resolver - .for_manifest(&loc.hash.to_string()) - .map_err(SubgraphAssignmentProviderError::ResolveError)?; - - let file_bytes = link_resolver - .cat(&logger, &loc.hash.to_ipfs_link()) - .await - .map_err(SubgraphAssignmentProviderError::ResolveError)?; - - let raw: serde_yaml::Mapping = serde_yaml::from_slice(&file_bytes) - .map_err(|e| SubgraphAssignmentProviderError::ResolveError(e.into()))?; - self.instance_manager .cheap_clone() - .start_subgraph(loc, raw, stop_block) + .start_subgraph(loc, stop_block) .await; Ok(()) diff --git a/graph/src/components/subgraph/instance_manager.rs b/graph/src/components/subgraph/instance_manager.rs index c04fd5237b4..c9f076a2a36 100644 --- a/graph/src/components/subgraph/instance_manager.rs +++ b/graph/src/components/subgraph/instance_manager.rs @@ -13,7 +13,6 @@ pub trait SubgraphInstanceManager: Send + Sync + 'static { async fn start_subgraph( self: Arc, deployment: DeploymentLocator, - manifest: serde_yaml::Mapping, stop_block: Option, ); async fn stop_subgraph(&self, deployment: DeploymentLocator); diff --git a/node/src/launcher.rs b/node/src/launcher.rs index 550da346861..cd10d10b175 100644 --- a/node/src/launcher.rs +++ b/node/src/launcher.rs @@ -294,12 +294,8 @@ fn build_subgraph_registrar( ); // Create IPFS-based subgraph provider - let subgraph_provider = IpfsSubgraphAssignmentProvider::new( - &logger_factory, - link_resolver.clone(), - subgraph_instance_manager, - sg_count, - ); + let subgraph_provider = + IpfsSubgraphAssignmentProvider::new(&logger_factory, subgraph_instance_manager, sg_count); // Check version switching mode environment variable let version_switching_mode = ENV_VARS.subgraph_version_switching_mode; diff --git a/node/src/manager/commands/run.rs b/node/src/manager/commands/run.rs index 5a767eaa125..003fac69ae9 100644 --- a/node/src/manager/commands/run.rs +++ b/node/src/manager/commands/run.rs @@ -158,7 +158,6 @@ pub async fn run( // Create IPFS-based subgraph provider let subgraph_provider = Arc::new(IpfsSubgraphAssignmentProvider::new( &logger_factory, - link_resolver.cheap_clone(), subgraph_instance_manager, sg_metrics, )); diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index 86b67918673..e7de6a59460 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -574,7 +574,6 @@ pub async fn setup_inner( // Create IPFS-based subgraph provider let subgraph_provider = Arc::new(IpfsSubgraphAssignmentProvider::new( &logger_factory, - link_resolver.cheap_clone(), subgraph_instance_manager.clone(), sg_count, ));