Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ pub struct Cli {
#[clap(flatten)]
pub run: RunCmd,

/// Choose sealing method.
#[arg(long, value_enum, ignore_case = true)]
/// Choose sealing method: manual, instant, or interval=<ms>.
#[arg(long)]
pub sealing: Option<Sealing>,

/// Whether to try Aura or Babe consensus on first start.
Expand Down Expand Up @@ -170,13 +170,32 @@ impl fmt::Display for HistoryBackfill {
}

/// Available Sealing methods.
#[derive(Copy, Clone, Debug, Default, clap::ValueEnum)]
#[derive(Copy, Clone, Debug, Default)]
pub enum Sealing {
/// Seal using rpc method.
#[default]
Manual,
/// Seal when transaction is executed.
Instant,
/// Seal on a fixed timer interval. Value is the period in milliseconds.
Interval(u64),
}

impl std::str::FromStr for Sealing {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"manual" => Ok(Sealing::Manual),
"instant" => Ok(Sealing::Instant),
s => s
.parse::<u64>()
.map(Sealing::Interval)
.map_err(|_| format!(
"unknown sealing mode '{s}': expected 'manual', 'instant', or a number of milliseconds"
)),
}
}
}

/// Supported consensus mechanisms.
Expand Down
84 changes: 56 additions & 28 deletions node/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Service and ServiceFactory implementation. Specialized wrapper over substrate service.

use crate::consensus::ConsensusMechanism;
use futures::{FutureExt, channel::mpsc, future};
use futures::{FutureExt, StreamExt as _, channel::mpsc};
use node_subtensor_runtime::{RuntimeApi, TransactionConverter, opaque::Block};
use sc_chain_spec::ChainType;
use sc_client_api::{Backend as BackendT, BlockBackend};
Expand All @@ -25,6 +25,7 @@ use stc_shield::{self, MemoryShieldKeystore};
use std::collections::HashSet;
use std::str::FromStr;
use std::sync::atomic::AtomicBool;
use sc_transaction_pool_api::TransactionPool as _;
use std::{cell::RefCell, path::Path};
use std::{sync::Arc, time::Duration};
use stp_shield::ShieldKeystorePtr;
Expand Down Expand Up @@ -756,9 +757,12 @@ fn run_manual_seal_authorship(
inherent_data: &mut sp_inherents::InherentData,
) -> Result<(), sp_inherents::Error> {
TIMESTAMP.with(|x| {
let mut x_ref = x.borrow_mut();
*x_ref = x_ref.saturating_add(subtensor_runtime_common::time::SLOT_DURATION);
inherent_data.put_data(sp_timestamp::INHERENT_IDENTIFIER, &*x_ref)
let ts = {
let mut x_ref = x.borrow_mut();
*x_ref = x_ref.saturating_add(subtensor_runtime_common::time::SLOT_DURATION);
*x_ref
};
inherent_data.put_data(sp_timestamp::INHERENT_IDENTIFIER, &ts)
})
}

Expand All @@ -778,32 +782,56 @@ fn run_manual_seal_authorship(
let aura_data_provider =
sc_consensus_manual_seal::consensus::aura::AuraConsensusDataProvider::new(client.clone());

let manual_seal = match sealing {
Sealing::Manual => future::Either::Left(sc_consensus_manual_seal::run_manual_seal(
sc_consensus_manual_seal::ManualSealParams {
block_import,
env: proposer_factory,
client,
pool: transaction_pool,
commands_stream,
select_chain,
consensus_data_provider: Some(Box::new(aura_data_provider)),
create_inherent_data_providers,
},
)),
Sealing::Instant => future::Either::Right(sc_consensus_manual_seal::run_instant_seal(
sc_consensus_manual_seal::InstantSealParams {
block_import,
env: proposer_factory,
client,
pool: transaction_pool,
select_chain,
consensus_data_provider: None,
create_inherent_data_providers,
},
)),
type SealStream = std::pin::Pin<
Box<
dyn futures::Stream<
Item = sc_consensus_manual_seal::rpc::EngineCommand<<Block as BlockT>::Hash>,
> + Send,
>,
>;

let seal_stream: SealStream = match sealing {
Sealing::Manual => Box::pin(commands_stream),
Sealing::Instant => Box::pin(
transaction_pool
.import_notification_stream()
.map(|_| sc_consensus_manual_seal::rpc::EngineCommand::SealNewBlock {
create_empty: false,
finalize: false,
parent_hash: None,
sender: None,
}),
),
Sealing::Interval(millis) => Box::pin(
futures::stream::unfold(
tokio::time::interval(std::time::Duration::from_millis(millis)),
|mut interval| async move {
interval.tick().await;
Some(((), interval))
},
)
.map(|_| sc_consensus_manual_seal::rpc::EngineCommand::SealNewBlock {
create_empty: true,
finalize: true,
parent_hash: None,
sender: None,
}),
),
};

let manual_seal = sc_consensus_manual_seal::run_manual_seal(
sc_consensus_manual_seal::ManualSealParams {
block_import,
env: proposer_factory,
client,
pool: transaction_pool,
commands_stream: seal_stream,
select_chain,
consensus_data_provider: Some(Box::new(aura_data_provider)),
create_inherent_data_providers,
},
);

// we spawn the future on a background thread managed by service.
task_manager
.spawn_essential_handle()
Expand Down
Loading