diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs
index 20366a39bfa..cdf3688c574 100644
--- a/core/src/subgraph/instance_manager.rs
+++ b/core/src/subgraph/instance_manager.rs
@@ -60,7 +60,6 @@ impl<S: SubgraphStore> SubgraphInstanceManagerTrait for SubgraphInstanceManager<
     async fn start_subgraph(
         self: Arc<Self>,
         loc: DeploymentLocator,
-        manifest: serde_yaml::Mapping,
         stop_block: Option<BlockNumber>,
     ) {
         let runner_index = self.subgraph_start_counter.fetch_add(1, Ordering::SeqCst);
@@ -78,6 +77,19 @@ impl<S: SubgraphStore> SubgraphInstanceManagerTrait for SubgraphInstanceManager<
             let deployment_status_metric = deployment_status_metric.clone();
 
             async move {
+                let link_resolver = self
+                    .link_resolver
+                    .for_manifest(&loc.hash.to_string())
+                    .map_err(SubgraphAssignmentProviderError::ResolveError)?;
+
+                let file_bytes = link_resolver
+                    .cat(&logger, &loc.hash.to_ipfs_link())
+                    .await
+                    .map_err(SubgraphAssignmentProviderError::ResolveError)?;
+
+                let manifest: serde_yaml::Mapping = serde_yaml::from_slice(&file_bytes)
+                    .map_err(|e| SubgraphAssignmentProviderError::ResolveError(e.into()))?;
+
                 match BlockchainKind::from_manifest(&manifest)? {
                     BlockchainKind::Ethereum => {
                         let runner = instance_manager
diff --git a/core/src/subgraph/provider.rs b/core/src/subgraph/provider.rs
index 9ad50f43942..a7122442531 100644
--- a/core/src/subgraph/provider.rs
+++ b/core/src/subgraph/provider.rs
@@ -44,14 +44,12 @@ impl DeploymentRegistry {
 pub struct SubgraphAssignmentProvider<I> {
     logger_factory: LoggerFactory,
     deployment_registry: DeploymentRegistry,
-    link_resolver: Arc<dyn LinkResolver>,
     instance_manager: Arc<I>,
 }
 
 impl<I: SubgraphInstanceManager> SubgraphAssignmentProvider<I> {
     pub fn new(
         logger_factory: &LoggerFactory,
-        link_resolver: Arc<dyn LinkResolver>,
         instance_manager: I,
         subgraph_metrics: Arc<SubgraphCountMetric>,
     ) -> Self {
@@ -61,7 +59,6 @@ impl<I: SubgraphInstanceManager> SubgraphAssignmentProvider<I> {
         // Create the subgraph provider
         SubgraphAssignmentProvider {
             logger_factory,
-            link_resolver: link_resolver.with_retries().into(),
             instance_manager: Arc::new(instance_manager),
             deployment_registry: DeploymentRegistry::new(subgraph_metrics),
         }
@@ -86,22 +83,9 @@ impl<I: SubgraphInstanceManager> SubgraphAssignmentProviderTrait for SubgraphAssignmentProvider<I> {
             ));
         }
 
-        let link_resolver = self
-            .link_resolver
-            .for_manifest(&loc.hash.to_string())
-            .map_err(SubgraphAssignmentProviderError::ResolveError)?;
-
-        let file_bytes = link_resolver
-            .cat(&logger, &loc.hash.to_ipfs_link())
-            .await
-            .map_err(SubgraphAssignmentProviderError::ResolveError)?;
-
-        let raw: serde_yaml::Mapping = serde_yaml::from_slice(&file_bytes)
-            .map_err(|e| SubgraphAssignmentProviderError::ResolveError(e.into()))?;
-
         self.instance_manager
             .cheap_clone()
-            .start_subgraph(loc, raw, stop_block)
+            .start_subgraph(loc, stop_block)
             .await;
 
         Ok(())
diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs
index 7c56e5dd583..eea63b88128 100644
--- a/core/src/subgraph/registrar.rs
+++ b/core/src/subgraph/registrar.rs
@@ -149,9 +149,10 @@ where
         let store = self.store.clone();
         let node_id = self.node_id.clone();
        let logger = self.logger.clone();
+        let logger2 = logger.clone();
 
         self.subscription_manager
-            .subscribe()
+            .subscribe().inspect(move |x| debug!(logger2, "Received store event: {:?}", x; "tag" => "assignment"))
             .map_err(|()| anyhow!("Entity change stream failed"))
             .map(|event| {
                 let changes: Vec<_> = event.changes.iter().cloned().map(AssignmentChange::into_parts).collect();
@@ -160,9 +161,10 @@ where
             .flatten()
             .and_then(
                 move |(deployment, operation)| -> Result<Box<dyn Stream<Item = AssignmentEvent, Error = ()> + Send>, _> {
-                    trace!(logger, "Received assignment change";
+                    debug!(logger, "Received assignment change";
"deployment" => %deployment, "operation" => format!("{:?}", operation), + "tag" => "assignment" ); match operation { @@ -173,24 +175,25 @@ where anyhow!("Failed to get subgraph assignment entity: {}", e) }) .map(|assigned| -> Box + Send> { + let logger = logger.new(o!("subgraph_id" => deployment.hash.to_string(), "node_id" => node_id.to_string(), "tag" => "assignment")); if let Some((assigned,is_paused)) = assigned { if assigned == node_id { if is_paused{ // Subgraph is paused, so we don't start it - debug!(logger, "Deployment assignee is this node, but it is paused, so we don't start it"; "assigned_to" => assigned, "node_id" => &node_id,"paused" => is_paused); + debug!(logger, "Deployment assignee is this node"; "assigned_to" => assigned, "paused" => is_paused, "action" => "ignore"); return Box::new(stream::empty()); } // Start subgraph on this node - debug!(logger, "Deployment assignee is this node, broadcasting add event"; "assigned_to" => assigned, "node_id" => &node_id); + debug!(logger, "Deployment assignee is this node"; "assigned_to" => assigned, "action" => "add"); Box::new(stream::once(Ok(AssignmentEvent::Add { deployment, node_id: node_id.clone(), }))) } else { // Ensure it is removed from this node - debug!(logger, "Deployment assignee is not this node, broadcasting remove event"; "assigned_to" => assigned, "node_id" => &node_id); + debug!(logger, "Deployment assignee is not this node"; "assigned_to" => assigned, "action" => "remove"); Box::new(stream::once(Ok(AssignmentEvent::Remove { deployment, node_id: node_id.clone(), @@ -198,7 +201,7 @@ where } } else { // Was added/updated, but is now gone. - debug!(logger, "Deployment has not assignee, we will get a separate remove event later"; "node_id" => &node_id); + debug!(logger, "Deployment assignee not found in database"; "action" => "ignore"); Box::new(stream::empty()) } }) diff --git a/graph/Cargo.toml b/graph/Cargo.toml index bb4287b2c31..3c78221e7a8 100644 --- a/graph/Cargo.toml +++ b/graph/Cargo.toml @@ -61,7 +61,7 @@ sqlparser = { workspace = true } # stable-hash_legacy = { version = "0.3.3", package = "stable-hash" } # stable-hash = { version = "0.4.2" } stable-hash = { git = "https://github.com/graphprotocol/stable-hash", branch = "main" } -stable-hash_legacy = { git = "https://github.com/graphprotocol/stable-hash", branch = "old", package = "stable-hash", doc = false } +stable-hash_legacy = { git = "https://github.com/graphprotocol/stable-hash", branch = "old", package = "stable-hash" } strum_macros = "0.27.1" slog-async = "2.5.0" slog-envlogger = "2.1.0" diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs index 585df5945f1..d6a98267921 100644 --- a/graph/src/components/store/mod.rs +++ b/graph/src/components/store/mod.rs @@ -9,7 +9,7 @@ use diesel::serialize::{Output, ToSql}; use diesel::sql_types::Integer; use diesel_derives::{AsExpression, FromSqlRow}; pub use entity_cache::{EntityCache, EntityLfuCache, GetScope, ModificationsAndCache}; -use slog::Logger; +use slog::{debug, Logger}; pub use super::subgraph::Entity; pub use err::{StoreError, StoreResult}; @@ -636,6 +636,7 @@ impl PartialEq for StoreEvent { /// A `StoreEventStream` produces the `StoreEvents`. Various filters can be applied /// to it to reduce which and how many events are delivered by the stream. 
 pub struct StoreEventStream<S> {
+    logger: Logger,
     source: S,
 }
 
@@ -651,7 +652,9 @@ where
     type Error = ();
 
     fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
-        self.source.poll()
+        let res = self.source.poll();
+        debug!(self.logger, "Polled store event"; "result" => format!("{:?}", res));
+        res
     }
 }
 
@@ -660,8 +663,14 @@ where
     S: Stream<Item = Arc<StoreEvent>, Error = ()> + Send + 'static,
 {
     // Create a new `StoreEventStream` from another such stream
-    pub fn new(source: S) -> Self {
-        StoreEventStream { source }
+    pub fn new(logger: Logger, source: S) -> Self {
+        StoreEventStream { logger, source }
+    }
+}
+
+impl<S> Drop for StoreEventStream<S> {
+    fn drop(&mut self) {
+        debug!(self.logger, "Dropping store event stream");
     }
 }
diff --git a/graph/src/components/store/write.rs b/graph/src/components/store/write.rs
index b75d2912de2..fc0ebaea856 100644
--- a/graph/src/components/store/write.rs
+++ b/graph/src/components/store/write.rs
@@ -145,7 +145,7 @@ impl EntityModification {
 
     /// Return the details of the write if `self` is a write operation for a
     /// new or an existing entity
-    fn as_write(&self) -> Option<EntityWrite> {
+    fn as_write(&self) -> Option<EntityWrite<'_>> {
        EntityWrite::try_from(self).ok()
     }
 
@@ -823,7 +823,7 @@ impl Batch {
         &self,
         entity_type: &EntityType,
         at: BlockNumber,
-    ) -> impl Iterator<Item = EntityOp> {
+    ) -> impl Iterator<Item = EntityOp<'_>> {
         self.mods
             .group(entity_type)
             .map(|group| group.effective_ops(at))
diff --git a/graph/src/components/subgraph/instance_manager.rs b/graph/src/components/subgraph/instance_manager.rs
index c04fd5237b4..c9f076a2a36 100644
--- a/graph/src/components/subgraph/instance_manager.rs
+++ b/graph/src/components/subgraph/instance_manager.rs
@@ -13,7 +13,6 @@ pub trait SubgraphInstanceManager: Send + Sync + 'static {
     async fn start_subgraph(
         self: Arc<Self>,
         deployment: DeploymentLocator,
-        manifest: serde_yaml::Mapping,
         stop_block: Option<BlockNumber>,
     );
     async fn stop_subgraph(&self, deployment: DeploymentLocator);
diff --git a/graph/src/components/subgraph/proof_of_indexing/reference.rs b/graph/src/components/subgraph/proof_of_indexing/reference.rs
index 5c7d269d7a7..3a11a4db4e3 100644
--- a/graph/src/components/subgraph/proof_of_indexing/reference.rs
+++ b/graph/src/components/subgraph/proof_of_indexing/reference.rs
@@ -9,6 +9,7 @@ use web3::types::{Address, H256};
 /// well-implemented (without conflicting sequence numbers, or other oddities).
 /// It's just way easier to check that this works, and serves as a kind of
 /// documentation as a side-benefit.
+#[allow(dead_code)]
 pub struct PoI<'a> {
     pub causality_regions: HashMap<String, PoICausalityRegion<'a>>,
     pub subgraph_id: DeploymentHash,
@@ -16,10 +17,12 @@
     pub indexer: Option<Address>,
 }
 
+#[allow(dead_code)]
 fn h256_as_bytes(val: &H256) -> AsBytes<&[u8]> {
     AsBytes(val.as_bytes())
 }
 
+#[allow(dead_code)]
 fn indexer_opt_as_bytes(val: &Option<Address>) -> Option<AsBytes<&[u8]>> {
     val.as_ref().map(|v| AsBytes(v.as_bytes()))
 }
diff --git a/graph/src/util/timed_rw_lock.rs b/graph/src/util/timed_rw_lock.rs
index 4a52d531604..e8ff394be44 100644
--- a/graph/src/util/timed_rw_lock.rs
+++ b/graph/src/util/timed_rw_lock.rs
@@ -20,7 +20,7 @@ impl<T> TimedRwLock<T> {
         }
     }
 
-    pub fn write(&self, logger: &Logger) -> parking_lot::RwLockWriteGuard<T> {
+    pub fn write(&self, logger: &Logger) -> parking_lot::RwLockWriteGuard<'_, T> {
         loop {
             let mut elapsed = Duration::from_secs(0);
             match self.lock.try_write_for(self.log_threshold) {
@@ -36,11 +36,11 @@ impl<T> TimedRwLock<T> {
         }
     }
 
-    pub fn try_read(&self) -> Option<parking_lot::RwLockReadGuard<T>> {
+    pub fn try_read(&self) -> Option<parking_lot::RwLockReadGuard<'_, T>> {
         self.lock.try_read()
     }
 
-    pub fn read(&self, logger: &Logger) -> parking_lot::RwLockReadGuard<T> {
+    pub fn read(&self, logger: &Logger) -> parking_lot::RwLockReadGuard<'_, T> {
         loop {
             let mut elapsed = Duration::from_secs(0);
             match self.lock.try_read_for(self.log_threshold) {
@@ -73,7 +73,7 @@ impl<T> TimedMutex<T> {
         }
     }
 
-    pub fn lock(&self, logger: &Logger) -> parking_lot::MutexGuard<T> {
+    pub fn lock(&self, logger: &Logger) -> parking_lot::MutexGuard<'_, T> {
         let start = Instant::now();
         let guard = self.lock.lock();
         let elapsed = start.elapsed();
diff --git a/node/src/launcher.rs b/node/src/launcher.rs
index 4441131a52e..cd10d10b175 100644
--- a/node/src/launcher.rs
+++ b/node/src/launcher.rs
@@ -294,12 +294,8 @@ fn build_subgraph_registrar(
     );
 
     // Create IPFS-based subgraph provider
-    let subgraph_provider = IpfsSubgraphAssignmentProvider::new(
-        &logger_factory,
-        link_resolver.clone(),
-        subgraph_instance_manager,
-        sg_count,
-    );
+    let subgraph_provider =
+        IpfsSubgraphAssignmentProvider::new(&logger_factory, subgraph_instance_manager, sg_count);
 
     // Check version switching mode environment variable
     let version_switching_mode = ENV_VARS.subgraph_version_switching_mode;
@@ -399,7 +395,7 @@ pub async fn run(
     // Obtain metrics server port
     let metrics_port = opt.metrics_port;
 
-    info!(logger, "Starting up");
+    info!(logger, "Starting up"; "node_id" => &node_id);
 
     // Set up metrics
     let (prometheus_registry, metrics_registry) = setup_metrics(&logger);
diff --git a/node/src/manager/commands/run.rs b/node/src/manager/commands/run.rs
index 5a767eaa125..003fac69ae9 100644
--- a/node/src/manager/commands/run.rs
+++ b/node/src/manager/commands/run.rs
@@ -158,7 +158,6 @@ pub async fn run(
     // Create IPFS-based subgraph provider
     let subgraph_provider = Arc::new(IpfsSubgraphAssignmentProvider::new(
         &logger_factory,
-        link_resolver.cheap_clone(),
         subgraph_instance_manager,
         sg_metrics,
     ));
diff --git a/runtime/wasm/src/module/instance.rs b/runtime/wasm/src/module/instance.rs
index 63845e81c60..cddac22f9fc 100644
--- a/runtime/wasm/src/module/instance.rs
+++ b/runtime/wasm/src/module/instance.rs
@@ -141,7 +141,7 @@ impl WasmInstance {
         self.store.into_data()
     }
 
-    pub(crate) fn instance_ctx(&mut self) -> WasmInstanceContext {
+    pub(crate) fn instance_ctx(&mut self) -> WasmInstanceContext<'_> {
         WasmInstanceContext::new(&mut self.store)
     }
diff --git a/runtime/wasm/src/to_from/external.rs b/runtime/wasm/src/to_from/external.rs
index 6bb7122613f..3f19716f487 100644
--- a/runtime/wasm/src/to_from/external.rs
+++ b/runtime/wasm/src/to_from/external.rs
@@ -466,6 +466,7 @@ where
 }
 
 #[derive(Debug, Clone, Eq, PartialEq, AscType)]
+#[allow(dead_code)]
 pub enum AscSubgraphEntityOp {
     Create,
     Modify,
diff --git a/store/postgres/src/notification_listener.rs b/store/postgres/src/notification_listener.rs
index ecb7486daf2..9774868a20e 100644
--- a/store/postgres/src/notification_listener.rs
+++ b/store/postgres/src/notification_listener.rs
@@ -147,10 +147,19 @@ impl NotificationListener {
             }
         }
 
-        let logger = logger.new(o!(
-            "component" => "NotificationListener",
-            "channel" => channel_name.0.clone()
-        ));
+        let store_events = channel_name.0 == "store_events";
+        let logger = if store_events {
+            logger.new(o!(
+                "component" => "NotificationListener",
+                "channel" => channel_name.0.clone(),
+                "tag" => "assignment"
+            ))
+        } else {
+            logger.new(o!(
+                "component" => "NotificationListener",
+                "channel" => channel_name.0.clone()
+            ))
+        };
 
         debug!(
             logger,
@@ -234,7 +243,9 @@ impl NotificationListener {
                             })
                             .filter(|notification| notification.channel() == channel_name.0)
                             .collect();
-
+                        if store_events && notifications.len() > 0 {
+                            debug!(logger, "Received store events"; "num_events" => notifications.len(), "queue_size" => conn.notifications().len());
+                        }
                         // Read notifications until there hasn't been one for 500ms
                         for notification in notifications {
                             // Terminate the thread if desired
@@ -245,6 +256,9 @@ impl NotificationListener {
                             match JsonNotification::parse(&notification, &mut conn) {
                                 Ok(json_notification) => {
                                     let timeout = ENV_VARS.store.notification_broadcast_timeout;
+                                    if store_events {
+                                        debug!(logger, "Broadcasting store event"; "notification" => format!("{:?}", json_notification));
+                                    }
                                     match graph::block_on(
                                         sender.send_timeout(json_notification, timeout),
                                     ) {
@@ -284,6 +298,7 @@ impl NotificationListener {
                         }
                     }
                 }
+                warn!(logger, "Listener dropped. Terminating listener thread"; "tag" => "assignment");
             }))
             .unwrap_or_else(|_| std::process::exit(1))
         });
diff --git a/store/postgres/src/relational/dsl.rs b/store/postgres/src/relational/dsl.rs
index e804a4d06ca..13cab9dd9d0 100644
--- a/store/postgres/src/relational/dsl.rs
+++ b/store/postgres/src/relational/dsl.rs
@@ -176,7 +176,7 @@ impl<'a> Table<'a> {
     }
 
     /// Reference a column in this table and use the correct SQL type `ST`
-    fn bind<ST>(&self, name: &str) -> Option<BoundColumn<ST>> {
+    fn bind<ST>(&self, name: &str) -> Option<BoundColumn<'_, ST>> {
         self.column(name).map(|c| c.bind())
     }
 
diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs
index 533990c42b9..79d96edd30c 100644
--- a/store/postgres/src/relational_queries.rs
+++ b/store/postgres/src/relational_queries.rs
@@ -4964,6 +4964,7 @@ impl<'a, Conn> RunQueryDsl<Conn> for CountCurrentVersionsQuery<'a> {}
 
 /// Helper struct for returning the id's touched by the RevertRemove and
 /// RevertExtend queries
 #[derive(QueryableByName, PartialEq, Eq, Hash)]
+#[allow(dead_code)]
 pub struct CopyVid {
     #[diesel(sql_type = BigInt)]
     pub vid: i64,
diff --git a/store/postgres/src/store_events.rs b/store/postgres/src/store_events.rs
index 83b8e3b069b..fc83e364a8c 100644
--- a/store/postgres/src/store_events.rs
+++ b/store/postgres/src/store_events.rs
@@ -92,15 +92,21 @@ pub struct SubscriptionManager {
 
     /// Keep the notification listener alive
     listener: StoreEventListener,
+
+    logger: Logger,
 }
 
 impl SubscriptionManager {
     pub fn new(logger: Logger, postgres_url: String, registry: Arc<MetricsRegistry>) -> Self {
-        let (listener, store_events) = StoreEventListener::new(logger, postgres_url, registry);
+        let logger = logger.new(o!("component" => "StoreEventListener", "tag" => "assignment"));
+
+        let (listener, store_events) =
+            StoreEventListener::new(logger.cheap_clone(), postgres_url, registry);
 
         let mut manager = SubscriptionManager {
             subscriptions: Arc::new(RwLock::new(HashMap::new())),
             listener,
+            logger,
         };
 
         // Deal with store subscriptions
@@ -112,6 +118,34 @@ impl SubscriptionManager {
 
         manager
     }
 
+    async fn broadcast_event(
+        logger: &Logger,
+        subscriptions: &Arc<RwLock<HashMap<usize, Sender<Arc<StoreEvent>>>>>,
+        event: StoreEvent,
+    ) {
+        let event = Arc::new(event);
+
+        // Send to `subscriptions`.
+        {
+            let senders = subscriptions.read().unwrap().clone();
+            let ids = format!("{:?}", senders.keys().cloned().collect::<Vec<_>>());
+            debug!(logger, "Broadcasting store event"; "event" => format!("{:?}", event), "subscriptions" => ids);
+
+            // Write change to all matching subscription streams; remove subscriptions
+            // whose receiving end has been dropped
+            for (id, sender) in senders {
+                if let Err(e) = sender.send(event.cheap_clone()).await {
+                    error!(
+                        logger,
+                        "Failed to send store event to subscriber {}: {}", id, e
+                    );
+                    // Receiver was dropped
+                    subscriptions.write().unwrap().remove(&id);
+                }
+            }
+        }
+    }
+
     /// Receive store events from Postgres and send them to all active
     /// subscriptions. Detect stale subscriptions in the process and
     /// close them.
@@ -121,24 +155,23 @@ impl SubscriptionManager {
     ) {
         let subscriptions = self.subscriptions.cheap_clone();
         let mut store_events = store_events.compat();
+        let logger = self.logger.cheap_clone();
 
         // This channel is constantly receiving things and there are locks involved,
         // so it's best to use a blocking task.
         graph::spawn_blocking(async move {
-            while let Some(Ok(event)) = store_events.next().await {
-                let event = Arc::new(event);
-
-                // Send to `subscriptions`.
-                {
-                    let senders = subscriptions.read().unwrap().clone();
-
-                    // Write change to all matching subscription streams; remove subscriptions
-                    // whose receiving end has been dropped
-                    for (id, sender) in senders {
-                        if sender.send(event.cheap_clone()).await.is_err() {
-                            // Receiver was dropped
-                            subscriptions.write().unwrap().remove(&id);
-                        }
+            loop {
+                debug!(logger, "Waiting for store event");
+                match store_events.next().await {
+                    Some(Ok(event)) => {
+                        Self::broadcast_event(&logger, &subscriptions, event).await;
+                    }
+                    Some(Err(_)) => {
+                        error!(logger, "Error receiving store event");
+                    }
+                    None => {
+                        error!(logger, "Store event stream ended");
+                        break;
+                    }
                 }
             }
         }
@@ -147,6 +180,7 @@ impl SubscriptionManager {
 
     fn periodically_clean_up_stale_subscriptions(&self) {
         let subscriptions = self.subscriptions.cheap_clone();
+        let logger = self.logger.cheap_clone();
 
         // Clean up stale subscriptions every 5s
         graph::spawn(async move {
@@ -169,6 +203,7 @@ impl SubscriptionManager {
 
                 // Remove all stale subscriptions
                 for id in stale_ids {
+                    warn!(logger, "Removing stale subscription {}", id);
                     subscriptions.remove(&id);
                 }
             }
@@ -188,7 +223,11 @@ impl SubscriptionManagerTrait for SubscriptionManager {
 
         // Add the new subscription
         self.subscriptions.write().unwrap().insert(id, sender);
+        debug!(self.logger, "New subscription {}", id);
 
         // Return the subscription ID and entity change stream
-        StoreEventStream::new(Box::new(ReceiverStream::new(receiver).map(Ok).compat()))
+        StoreEventStream::new(
+            self.logger.new(o!("id" => id)),
+            Box::new(ReceiverStream::new(receiver).map(Ok).compat()),
+        )
     }
 }
diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs
index 28fe5bba514..2cb2df8a0d6 100644
--- a/store/postgres/src/subgraph_store.rs
+++ b/store/postgres/src/subgraph_store.rs
@@ -789,7 +789,7 @@ impl SubgraphStoreInner {
     /// connections can deadlock the entire process if the pool runs out
     /// of connections in between getting the first one and trying to get the
     /// second one.
-    pub(crate) fn primary_conn(&self) -> Result<primary::Connection, StoreError> {
+    pub(crate) fn primary_conn(&self) -> Result<primary::Connection<'_>, StoreError> {
         let conn = self.mirror.primary().get()?;
         Ok(primary::Connection::new(conn))
     }
diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs
index 74b516433b6..9c512e27ae7 100644
--- a/store/postgres/src/writable.rs
+++ b/store/postgres/src/writable.rs
@@ -49,7 +49,7 @@ use crate::{primary, primary::Site, relational::Layout, SubgraphStore};
 struct WritableSubgraphStore(SubgraphStore);
 
 impl WritableSubgraphStore {
-    fn primary_conn(&self) -> Result<primary::Connection, StoreError> {
+    fn primary_conn(&self) -> Result<primary::Connection<'_>, StoreError> {
         self.0.primary_conn()
     }
diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs
index 86b67918673..e7de6a59460 100644
--- a/tests/src/fixture/mod.rs
+++ b/tests/src/fixture/mod.rs
@@ -574,7 +574,6 @@ pub async fn setup_inner(
     // Create IPFS-based subgraph provider
     let subgraph_provider = Arc::new(IpfsSubgraphAssignmentProvider::new(
         &logger_factory,
-        link_resolver.cheap_clone(),
         subgraph_instance_manager.clone(),
         sg_count,
     ));