pub struct Channel {
reader: Mutex<ReadHalf<Box<dyn PtStream>>>,
writer: Mutex<WriteHalf<Box<dyn PtStream>>>,
message_subsystem: MessageSubsystem,
stop_publisher: PublisherPtr<Error>,
receive_task: StoppableTaskPtr,
stopped: AtomicBool,
pub(super) session: SessionWeakPtr,
pub version: OnceCell<Arc<VersionMessage>>,
pub info: ChannelInfo,
metering_map: Mutex<HashMap<String, MeteringQueue>>,
}Expand description
Async channel for communication between nodes.
Fields§
§reader: Mutex<ReadHalf<Box<dyn PtStream>>>The reading half of the transport stream
writer: Mutex<WriteHalf<Box<dyn PtStream>>>The writing half of the transport stream
message_subsystem: MessageSubsystemThe message subsystem instance for this channel
stop_publisher: PublisherPtr<Error>Publisher listening for stop signal for closing this channel
receive_task: StoppableTaskPtrTask that is listening for the stop signal
stopped: AtomicBoolA boolean marking if this channel is stopped
session: SessionWeakPtrWeak pointer to respective session
version: OnceCell<Arc<VersionMessage>>The version message of the node we are connected to. Some if the version exchange has already occurred, None otherwise.
info: ChannelInfoChannel debug info
metering_map: Mutex<HashMap<String, MeteringQueue>>Map holding a MeteringQueue for each crate::net::Message
to perform rate limiting of propagation towards the stream.
Implementations§
Source§impl Channel
impl Channel
Sourcepub async fn new(
stream: Box<dyn PtStream>,
resolve_addr: Option<Url>,
connect_addr: Url,
session: SessionWeakPtr,
transport_mixed: bool,
) -> Arc<Self>
pub async fn new( stream: Box<dyn PtStream>, resolve_addr: Option<Url>, connect_addr: Url, session: SessionWeakPtr, transport_mixed: bool, ) -> Arc<Self>
Sets up a new channel. Creates a reader and writer PtStream and
the message publisher subsystem. Performs a network handshake on the
subsystem dispatchers.
Sourceasync fn setup_dispatchers(subsystem: &MessageSubsystem)
async fn setup_dispatchers(subsystem: &MessageSubsystem)
Perform network handshake for message subsystem dispatchers.
Sourcepub fn start(self: Arc<Self>, executor: Arc<Executor<'_>>)
pub fn start(self: Arc<Self>, executor: Arc<Executor<'_>>)
Starts the channel. Runs a receive loop to start receiving messages or handles a network failure.
Sourcepub async fn stop(&self)
pub async fn stop(&self)
Stops the channel.
Notifies all publishers that the channel has been closed in handle_stop().
Sourcepub async fn subscribe_stop(&self) -> Result<Subscription<Error>>
pub async fn subscribe_stop(&self) -> Result<Subscription<Error>>
Creates a subscription to a stopped signal. If the channel is stopped then this will return a ChannelStopped error.
pub fn is_stopped(&self) -> bool
Sourcepub async fn send<M: Message>(&self, message: &M) -> Result<()>
pub async fn send<M: Message>(&self, message: &M) -> Result<()>
Sends a message across a channel. First it converts the message
into a SerializedMessage and then calls send_serialized to send it.
Returns an error if something goes wrong.
Sourcepub async fn send_serialized(
&self,
message: &SerializedMessage,
metering_score: &u64,
metering_config: &MeteringConfiguration,
) -> Result<()>
pub async fn send_serialized( &self, message: &SerializedMessage, metering_score: &u64, metering_config: &MeteringConfiguration, ) -> Result<()>
Sends the encoded payload of provided SerializedMessage across the channel.
We first check if we should apply some throttling, based on the provided
Message configuration. We always sleep 2x times more than the expected one,
so we don’t flood the peer.
Then, calls send_message that creates a new payload and sends it over the
network transport as a packet.
Returns an error if something goes wrong.
Sourceasync fn send_message(&self, message: &SerializedMessage) -> Result<()>
async fn send_message(&self, message: &SerializedMessage) -> Result<()>
Sends the encoded payload of provided SerializedMessage by writing
the data to the channel async stream.
Sourcepub async fn read_command<R: AsyncRead + Unpin + Send + Sized>(
&self,
stream: &mut R,
) -> Result<String>
pub async fn read_command<R: AsyncRead + Unpin + Send + Sized>( &self, stream: &mut R, ) -> Result<String>
Returns a decoded Message command. We start by extracting the length from the stream, then allocate the precise buffer for this length using stream.take(). This manual deserialization provides a basic DDOS protection, since it prevents nodes from sending an arbitarily large payload.
Sourcepub async fn subscribe_msg<M: Message>(&self) -> Result<MessageSubscription<M>>
pub async fn subscribe_msg<M: Message>(&self) -> Result<MessageSubscription<M>>
Subscribe to a message on the message subsystem.
Sourceasync fn handle_stop(self: Arc<Self>, result: Result<()>)
async fn handle_stop(self: Arc<Self>, result: Result<()>)
Handle network errors. Panic if error passes silently, otherwise broadcast the error.
Sourceasync fn main_receive_loop(self: Arc<Self>) -> Result<()>
async fn main_receive_loop(self: Arc<Self>) -> Result<()>
Run the receive loop. Start receiving messages or handle network failure.
Sourcepub fn address(&self) -> &Url
pub fn address(&self) -> &Url
Returns the relevant socket address for this connection. If this is an outbound connection, the transport-processed resolve_addr will be returned except for transport mixed connections, to make sure mixed hosts don’t enter hostlist. Otherwise for inbound connections it will default to connect_addr.
Sourcepub fn display_address(&self) -> &Url
pub fn display_address(&self) -> &Url
Returns the address used for UI purposes like in logging or tools like dnet. For transport_mixed connection shows the mixed address.
Sourcepub fn resolve_addr(&self) -> Option<Url>
pub fn resolve_addr(&self) -> Option<Url>
Returns the socket address that has undergone transport processing, if it exists. Returns None otherwise.
Sourcepub fn connect_addr(&self) -> &Url
pub fn connect_addr(&self) -> &Url
Return the socket address without transport processing.
Sourcepub(crate) async fn set_version(&self, version: Arc<VersionMessage>)
pub(crate) async fn set_version(&self, version: Arc<VersionMessage>)
Set the VersionMessage of the node this channel is connected
to. Called on receiving a version message in ProtocolVersion.
Sourcepub fn get_version(&self) -> Arc<VersionMessage>
pub fn get_version(&self) -> Arc<VersionMessage>
Should only be called after the version exchange has been completed.
Sourcepub fn message_subsystem(&self) -> &MessageSubsystem
pub fn message_subsystem(&self) -> &MessageSubsystem
Returns the inner MessageSubsystem reference
fn session(&self) -> Arc<dyn Session>
pub fn session_type_id(&self) -> SessionBitFlag
pub fn p2p(&self) -> P2pPtr
pub fn hosts(&self) -> HostsPtr
fn is_eof_error(err: &Error) -> bool
Trait Implementations§
Auto Trait Implementations§
impl !Freeze for Channel
impl !RefUnwindSafe for Channel
impl Send for Channel
impl Sync for Channel
impl Unpin for Channel
impl !UnwindSafe for Channel
Blanket Implementations§
§impl<T> ArchivePointee for T
impl<T> ArchivePointee for T
§type ArchivedMetadata = ()
type ArchivedMetadata = ()
§fn pointer_metadata(
_: &<T as ArchivePointee>::ArchivedMetadata,
) -> <T as Pointee>::Metadata
fn pointer_metadata( _: &<T as ArchivePointee>::ArchivedMetadata, ) -> <T as Pointee>::Metadata
§impl<'a, T, E> AsTaggedExplicit<'a, E> for Twhere
T: 'a,
impl<'a, T, E> AsTaggedExplicit<'a, E> for Twhere
T: 'a,
§impl<'a, T, E> AsTaggedImplicit<'a, E> for Twhere
T: 'a,
impl<'a, T, E> AsTaggedImplicit<'a, E> for Twhere
T: 'a,
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
§impl<T> Conv for T
impl<T> Conv for T
§impl<T> Downcast for Twhere
T: Any,
impl<T> Downcast for Twhere
T: Any,
§fn into_any(self: Box<T>) -> Box<dyn Any>
fn into_any(self: Box<T>) -> Box<dyn Any>
Box<dyn Trait> (where Trait: Downcast) to Box<dyn Any>, which can then be
downcast into Box<dyn ConcreteType> where ConcreteType implements Trait.§fn into_any_rc(self: Rc<T>) -> Rc<dyn Any>
fn into_any_rc(self: Rc<T>) -> Rc<dyn Any>
Rc<Trait> (where Trait: Downcast) to Rc<Any>, which can then be further
downcast into Rc<ConcreteType> where ConcreteType implements Trait.§fn as_any(&self) -> &(dyn Any + 'static)
fn as_any(&self) -> &(dyn Any + 'static)
&Trait (where Trait: Downcast) to &Any. This is needed since Rust cannot
generate &Any’s vtable from &Trait’s.§fn as_any_mut(&mut self) -> &mut (dyn Any + 'static)
fn as_any_mut(&mut self) -> &mut (dyn Any + 'static)
&mut Trait (where Trait: Downcast) to &Any. This is needed since Rust cannot
generate &mut Any’s vtable from &mut Trait’s.§impl<T> DowncastSend for T
impl<T> DowncastSend for T
§impl<T> DowncastSync for T
impl<T> DowncastSync for T
§impl<T> FmtForward for T
impl<T> FmtForward for T
§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
self to use its Binary implementation when Debug-formatted.§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
self to use its Display implementation when
Debug-formatted.§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
self to use its LowerExp implementation when
Debug-formatted.§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
self to use its LowerHex implementation when
Debug-formatted.§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
self to use its Octal implementation when Debug-formatted.§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
self to use its Pointer implementation when
Debug-formatted.§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
self to use its UpperExp implementation when
Debug-formatted.§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
self to use its UpperHex implementation when
Debug-formatted.§fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
§impl<T> Instrument for T
impl<T> Instrument for T
§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more§impl<T> LayoutRaw for T
impl<T> LayoutRaw for T
§fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>
fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>
§impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2where
T: SharedNiching<N1, N2>,
N1: Niching<T>,
N2: Niching<T>,
impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2where
T: SharedNiching<N1, N2>,
N1: Niching<T>,
N2: Niching<T>,
§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere
T: ?Sized,
§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
self and passes that borrow into the pipe function. Read more§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
self and passes that borrow into the pipe function. Read more§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
self, then passes self.as_ref() into the pipe function.§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
self, then passes self.as_mut() into the pipe
function.§fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
self, then passes self.deref() into the pipe function.§impl<T> Pointable for T
impl<T> Pointable for T
§impl<T> Pointee for T
impl<T> Pointee for T
§impl<T> Tap for T
impl<T> Tap for T
§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Borrow<B> of a value. Read more§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
BorrowMut<B> of a value. Read more§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
AsRef<R> view of a value. Read more§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
AsMut<R> view of a value. Read more§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Deref::Target of a value. Read more§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Deref::Target of a value. Read more§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
.tap() only in debug builds, and is erased in release builds.§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
.tap_mut() only in debug builds, and is erased in release
builds.§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
.tap_borrow() only in debug builds, and is erased in release
builds.§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
.tap_borrow_mut() only in debug builds, and is erased in release
builds.§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
.tap_ref() only in debug builds, and is erased in release
builds.§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
.tap_ref_mut() only in debug builds, and is erased in release
builds.§fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
.tap_deref() only in debug builds, and is erased in release
builds.