P2P Stream ids (#630)

inbound stream id
This commit is contained in:
Oscar Beaumont 2023-05-18 14:54:20 +08:00 committed by GitHub
parent 9e15b25085
commit b5c94701b6
4 changed files with 18 additions and 6 deletions

6
Cargo.lock generated
View File

@ -577,7 +577,7 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bf9e4f4da24116dc0e3ffef4afbabadb7fba375bf32ccf9a1da81a667bababc"
dependencies = [
"crypto-bigint 0.5.0",
"crypto-bigint 0.5.1",
"digest 0.10.6",
"password-hash",
]
@ -1450,9 +1450,9 @@ dependencies = [
[[package]]
name = "crypto-bigint"
version = "0.5.0"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "071c0f5945634bc9ba7a452f492377dd6b1993665ddb58f28704119b32f07a9a"
checksum = "7c2538c4e68e52548bacb3e83ac549f903d44f011ac9d5abb5e132e67d0808f7"
dependencies = [
"generic-array",
"subtle",

View File

@ -38,6 +38,7 @@ pub enum Event<TMetadata: Metadata> {
#[derive(Debug)]
pub struct PeerMessageEvent<TMetadata: Metadata> {
pub stream_id: u64,
pub peer_id: PeerId,
pub manager: Arc<Manager<TMetadata>>,
pub stream: SpaceTimeStream,

View File

@ -1,7 +1,10 @@
use std::{
collections::HashSet,
net::SocketAddr,
sync::{atomic::AtomicBool, Arc},
sync::{
atomic::{AtomicBool, AtomicU64},
Arc,
},
};
use libp2p::{core::muxing::StreamMuxerBox, quic, Swarm, Transport};
@ -21,6 +24,7 @@ pub struct Manager<TMetadata: Metadata> {
pub(crate) mdns_state: Arc<MdnsState<TMetadata>>,
pub(crate) peer_id: PeerId,
pub(crate) application_name: &'static [u8],
pub(crate) stream_id: AtomicU64,
event_stream_tx: mpsc::Sender<ManagerStreamAction<TMetadata>>,
}
@ -51,6 +55,7 @@ impl<TMetadata: Metadata> Manager<TMetadata> {
.as_bytes()
.to_vec(),
)),
stream_id: AtomicU64::new(0),
peer_id,
event_stream_tx,
});

View File

@ -1,4 +1,8 @@
use std::{future::Future, pin::Pin, sync::Arc};
use std::{
future::Future,
pin::Pin,
sync::{atomic::Ordering, Arc},
};
use libp2p::{core::UpgradeInfo, swarm::NegotiatedSubstream, InboundUpgrade};
use tracing::debug;
@ -27,12 +31,13 @@ impl<TMetadata: Metadata> InboundUpgrade<NegotiatedSubstream> for InboundProtoco
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send + 'static>>;
fn upgrade_inbound(self, io: NegotiatedSubstream, _: Self::Info) -> Self::Future {
let id = self.manager.stream_id.fetch_add(1, Ordering::Relaxed);
Box::pin(async move {
let id = 1; // TODO
debug!(
"stream({}, {id}): accepting inbound connection",
self.peer_id
);
let stream = SpaceTimeStream::from_stream(io).await;
debug!(
"stream({}, {id}): stream of type {} accepted",
@ -42,6 +47,7 @@ impl<TMetadata: Metadata> InboundUpgrade<NegotiatedSubstream> for InboundProtoco
Ok(ManagerStreamAction::Event(
PeerMessageEvent {
stream_id: id,
peer_id: self.peer_id,
manager: self.manager.clone(),
stream,