// darkfi/event_graph/proto.rs

/* This file is part of DarkFi (https://dark.fi)
 *
 * Copyright (C) 2020-2026 Dyne.org foundation
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

use std::{
    collections::{BTreeMap, HashSet, VecDeque},
    slice,
    sync::{
        atomic::{AtomicUsize, Ordering::SeqCst},
        Arc,
    },
};

use darkfi_serial::{async_trait, SerialDecodable, SerialEncodable};
use smol::Executor;
use tracing::{debug, error, trace, warn};

use super::{Event, EventGraphPtr, NULL_ID};
use crate::{
    impl_p2p_message,
    net::{
        metering::{MeteringConfiguration, DEFAULT_METERING_CONFIGURATION},
        ChannelPtr, Message, MessageSubscription, ProtocolBase, ProtocolBasePtr,
        ProtocolJobsManager, ProtocolJobsManagerPtr,
    },
    system::msleep,
    util::time::NanoTimestamp,
    Error, Result,
};

/// Malicious behaviour threshold. If the threshold is reached, we will
/// drop the peer from our P2P connection.
const MALICIOUS_THRESHOLD: usize = 5;

/// Global limit of messages per flood-detection window
const WINDOW_MAXSIZE: usize = 200;
/// Rolling length of the flood-detection window
const WINDOW_EXPIRY_TIME: NanoTimestamp = NanoTimestamp::from_secs(60);

/// Rolling length of the rate-limit window
const RATELIMIT_EXPIRY_TIME: NanoTimestamp = NanoTimestamp::from_secs(10);
/// The rate limit kicks in above this count
const RATELIMIT_MIN_COUNT: usize = 6;
/// Sample point used to calculate the sleep time when the rate limit is active
const RATELIMIT_SAMPLE_IDX: usize = 10;
/// Sleep for this many milliseconds when `count == RATELIMIT_SAMPLE_IDX`
const RATELIMIT_SAMPLE_SLEEP: usize = 1000;

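/// A sliding window of timestamps: entries older than `expiry_time` are
/// dropped, so `count()` reflects recent activity only. Used both for
/// flood detection and for broadcast rate limiting.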
struct MovingWindow {
    times: VecDeque<NanoTimestamp>,
    expiry_time: NanoTimestamp,
}

impl MovingWindow {
    fn new(expiry_time: NanoTimestamp) -> Self {
        Self { times: VecDeque::new(), expiry_time }
    }

    /// Clean out expired timestamps from the window.
    fn clean(&mut self) {
        while let Some(ts) = self.times.front() {
            let Ok(elapsed) = ts.elapsed() else {
                debug!(target: "event_graph::protocol::MovingWindow::clean", "Timestamp [{ts}] is in the future. Removing...");
                let _ = self.times.pop_front();
                continue
            };
            if elapsed < self.expiry_time {
                break
            }
            let _ = self.times.pop_front();
        }
    }

    /// Add a new timestamp, cleaning out expired ones first.
    fn ticktock(&mut self) {
        self.clean();
        self.times.push_back(NanoTimestamp::current_time());
    }

    #[inline]
    fn count(&self) -> usize {
        self.times.len()
    }
}

/// P2P protocol implementation for the Event Graph.
pub struct ProtocolEventGraph {
    /// Pointer to the connected peer
    channel: ChannelPtr,
    /// Pointer to the Event Graph instance
    event_graph: EventGraphPtr,
    /// `MessageSubscriber` for `EventPut`
    ev_put_sub: MessageSubscription<EventPut>,
    /// `MessageSubscriber` for `EventReq`
    ev_req_sub: MessageSubscription<EventReq>,
    /// `MessageSubscriber` for `EventRep`
    ev_rep_sub: MessageSubscription<EventRep>,
    /// `MessageSubscriber` for `TipReq`
    tip_req_sub: MessageSubscription<TipReq>,
    /// `MessageSubscriber` for `TipRep`
    _tip_rep_sub: MessageSubscription<TipRep>,
    /// Peer malicious message count
    malicious_count: AtomicUsize,
    /// P2P jobs manager pointer
    jobsman: ProtocolJobsManagerPtr,
    /// To apply the rate limit, we don't broadcast directly but instead send
    /// into the sending queue.
    broadcaster_push: smol::channel::Sender<EventPut>,
    /// Receive send requests and rate-limit broadcasting them.
    broadcaster_pull: smol::channel::Receiver<EventPut>,
}

/// A P2P message representing publishing an event on the network
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct EventPut(pub Event);
impl_p2p_message!(EventPut, "EventGraph::EventPut", 0, 0, DEFAULT_METERING_CONFIGURATION);

/// A P2P message representing an event request
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct EventReq(pub Vec<blake3::Hash>);
impl_p2p_message!(EventReq, "EventGraph::EventReq", 0, 0, DEFAULT_METERING_CONFIGURATION);

/// A P2P message representing an event reply
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct EventRep(pub Vec<Event>);
impl_p2p_message!(EventRep, "EventGraph::EventRep", 0, 0, DEFAULT_METERING_CONFIGURATION);

/// A P2P message representing a request for a peer's DAG tips
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct TipReq {}
impl_p2p_message!(TipReq, "EventGraph::TipReq", 0, 0, DEFAULT_METERING_CONFIGURATION);

/// A P2P message representing a reply for the peer's DAG tips
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct TipRep(pub BTreeMap<u64, HashSet<blake3::Hash>>);
impl_p2p_message!(TipRep, "EventGraph::TipRep", 0, 0, DEFAULT_METERING_CONFIGURATION);

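// A hedged sketch of the request/reply round trip these messages enable,
// using only calls that appear later in this file (`missing_id` and `timeout`
// are illustrative placeholders):
//
//     channel.send(&EventReq(vec![missing_id])).await?;
//     let reply = ev_rep_sub.receive_with_timeout(timeout).await?;
//     let events: Vec<Event> = reply.0.clone();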
#[async_trait]
impl ProtocolBase for ProtocolEventGraph {
    async fn start(self: Arc<Self>, ex: Arc<Executor<'_>>) -> Result<()> {
        self.jobsman.clone().start(ex.clone());
        self.jobsman.clone().spawn(self.clone().handle_event_put(), ex.clone()).await;
        self.jobsman.clone().spawn(self.clone().handle_event_req(), ex.clone()).await;
        self.jobsman.clone().spawn(self.clone().handle_tip_req(), ex.clone()).await;
        self.jobsman.clone().spawn(self.clone().broadcast_rate_limiter(), ex.clone()).await;
        Ok(())
    }

    fn name(&self) -> &'static str {
        "ProtocolEventGraph"
    }
}

impl ProtocolEventGraph {
    pub async fn init(event_graph: EventGraphPtr, channel: ChannelPtr) -> Result<ProtocolBasePtr> {
        let msg_subsystem = channel.message_subsystem();
        msg_subsystem.add_dispatch::<EventPut>().await;
        msg_subsystem.add_dispatch::<EventReq>().await;
        msg_subsystem.add_dispatch::<EventRep>().await;
        msg_subsystem.add_dispatch::<TipReq>().await;
        msg_subsystem.add_dispatch::<TipRep>().await;

        let ev_put_sub = channel.subscribe_msg::<EventPut>().await?;
        let ev_req_sub = channel.subscribe_msg::<EventReq>().await?;
        let ev_rep_sub = channel.subscribe_msg::<EventRep>().await?;
        let tip_req_sub = channel.subscribe_msg::<TipReq>().await?;
        let _tip_rep_sub = channel.subscribe_msg::<TipRep>().await?;

        let (broadcaster_push, broadcaster_pull) = smol::channel::unbounded();

        Ok(Arc::new(Self {
            channel: channel.clone(),
            event_graph,
            ev_put_sub,
            ev_req_sub,
            ev_rep_sub,
            tip_req_sub,
            _tip_rep_sub,
            malicious_count: AtomicUsize::new(0),
            jobsman: ProtocolJobsManager::new("ProtocolEventGraph", channel.clone()),
            broadcaster_push,
            broadcaster_pull,
        }))
    }
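
    // A hedged sketch of how a node would attach this protocol to its P2P
    // stack. The registry closure signature and the `SESSION_DEFAULT` flag are
    // assumptions based on darkfi::net conventions, not guaranteed by this file:
    //
    //     let registry = p2p.protocol_registry();
    //     registry
    //         .register(SESSION_DEFAULT, move |channel, _p2p| {
    //             let event_graph = event_graph.clone();
    //             async move { ProtocolEventGraph::init(event_graph, channel).await.unwrap() }
    //         })
    //         .await;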

    async fn increase_malicious_count(self: Arc<Self>) -> Result<()> {
        let malicious_count = self.malicious_count.fetch_add(1, SeqCst);
        if malicious_count + 1 == MALICIOUS_THRESHOLD {
            error!(
                target: "event_graph::protocol::increase_malicious_count",
                "[EVENTGRAPH] Peer {} reached malicious threshold. Dropping connection.",
                self.channel.display_address(),
            );
            self.channel.stop().await;
            return Err(Error::ChannelStopped)
        }

        warn!(
            target: "event_graph::protocol::increase_malicious_count",
            "[EVENTGRAPH] Peer {} sent us a malicious event", self.channel.display_address(),
        );

        Ok(())
    }

    /// Protocol function handling `EventPut`.
    /// This is triggered whenever someone broadcasts (or relays) a new
    /// event on the network.
    async fn handle_event_put(self: Arc<Self>) -> Result<()> {
        // Rolling window of event timestamps on this channel
        let mut bantimes = MovingWindow::new(WINDOW_EXPIRY_TIME);

        loop {
            let event = match self.ev_put_sub.receive().await {
                Ok(v) => v.0.clone(),
                Err(_) => continue,
            };
            trace!(
                target: "event_graph::protocol::handle_event_put",
                "Got EventPut: {} [{}]", event.id(), self.channel.display_address(),
            );

            // Check if node has finished syncing its DAG
            if !*self.event_graph.synced.read().await {
                debug!(
                    target: "event_graph::protocol::handle_event_put",
                    "DAG is still syncing, skipping..."
                );
                continue
            }

            // If we have already seen the event, we'll stay quiet.
            let event_id = event.id();
            if self.event_graph.dag.contains_key(event_id.as_bytes()).unwrap() {
                debug!(
                    target: "event_graph::protocol::handle_event_put",
                    "Event {event_id} is already known"
                );
                continue
            }

            // There's a new unique event.
            // Apply ban logic to stop network floods.
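            // Worked example with the constants above: the 201st unique event
            // seen within a rolling 60-second window trips the check below
            // and gets the channel banned.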
            bantimes.ticktock();
            if bantimes.count() > WINDOW_MAXSIZE {
                self.channel.ban().await;
                // This error is actually unused. We could return Ok here too.
                return Err(Error::MaliciousFlood)
            }

            // We received a new event. Check that it is not older than the
            // current genesis event timestamp, and check whether we have the
            // event's parents. If we do not have the parents, we'll request
            // them from the peer that sent us this event. If they do not
            // reply in time, we drop the event.

            // Check if the event is older than the genesis event. If so, we
            // should not include it in our DAG.
            // The genesis event marks the last time the DAG has been pruned of
            // old events. The pruning interval is defined by the days_rotation
            // field of [`EventGraph`].
            let genesis_timestamp = self.event_graph.current_genesis.read().await.timestamp;
            if event.timestamp < genesis_timestamp {
                debug!(
                    target: "event_graph::protocol::handle_event_put",
                    "Event {} is older than genesis. Event timestamp: `{}`. Genesis timestamp: `{genesis_timestamp}`",
                    event.id(), event.timestamp
                );
                continue
            }

            // Validate the new event first. If we do not consider it valid, we
            // will just drop it and stay quiet. If the malicious threshold
            // is reached, we will stop the connection.
            if !event.validate_new() {
                self.clone().increase_malicious_count().await?;
                continue
            }

            // At this point, this is a new event to us. Let's see if we
            // have all of its parents.
            debug!(
                target: "event_graph::protocol::handle_event_put",
                "Event {event_id} is new"
            );

            let mut missing_parents = HashSet::new();
            for parent_id in event.parents.iter() {
                // `event.validate_new()` should have already made sure that
                // not all parents are NULL, and that there are no duplicates.
                if parent_id == &NULL_ID {
                    continue
                }

                if !self.event_graph.dag.contains_key(parent_id.as_bytes()).unwrap() {
                    missing_parents.insert(*parent_id);
                }
            }

            // If we have missing parents, then we have to attempt to
            // fetch them from this peer. Do this recursively until we
            // find all of them.
            if !missing_parents.is_empty() {
                // We track the received events mapped by their layer.
                // If/when we get all of them, we need to insert them in order,
                // so the DAG state stays correct and the unreferenced tips are
                // what they actually should be. If we insert them out of order,
                // we might end up with wrong unreferenced tips.
                let mut received_events: BTreeMap<u64, Vec<Event>> = BTreeMap::new();
                let mut received_events_hashes = HashSet::new();

                debug!(
                    target: "event_graph::protocol::handle_event_put",
                    "Event has {} missing parents. Requesting...", missing_parents.len(),
                );

                while !missing_parents.is_empty() {
                    debug!(
                        target: "event_graph::protocol::handle_event_put",
                        "Requesting {missing_parents:?}..."
                    );

                    self.channel
                        .send(&EventReq(missing_parents.clone().into_iter().collect()))
                        .await?;

                    let outbound_connect_timeout = self
                        .event_graph
                        .p2p
                        .settings()
                        .read_arc()
                        .await
                        .outbound_connect_timeout(self.channel.address().scheme());
                    // Node waits for response
                    let Ok(parents) =
                        self.ev_rep_sub.receive_with_timeout(outbound_connect_timeout).await
                    else {
                        error!(
                            target: "event_graph::protocol::handle_event_put",
                            "[EVENTGRAPH] Timeout while waiting for parents {missing_parents:?} from {}",
                            self.channel.display_address(),
                        );
                        self.channel.stop().await;
                        return Err(Error::ChannelStopped)
                    };

                    let parents = parents.0.clone();

                    for parent in parents {
                        let parent_id = parent.id();
                        if !missing_parents.contains(&parent_id) {
                            error!(
                                target: "event_graph::protocol::handle_event_put",
                                "[EVENTGRAPH] Peer {} replied with a wrong event: {}",
                                self.channel.display_address(), parent.id(),
                            );
                            self.channel.stop().await;
                            return Err(Error::ChannelStopped)
                        }

                        debug!(
                            target: "event_graph::protocol::handle_event_put",
                            "Got correct parent event {}", parent.id(),
                        );

                        if let Some(layer_events) = received_events.get_mut(&parent.layer) {
                            layer_events.push(parent.clone());
                        } else {
                            let layer_events = vec![parent.clone()];
                            received_events.insert(parent.layer, layer_events);
                        }
                        received_events_hashes.insert(parent_id);

                        missing_parents.remove(&parent_id);

                        // See if we have the upper parents
                        for upper_parent in parent.parents.iter() {
                            if upper_parent == &NULL_ID {
                                continue
                            }

                            if !missing_parents.contains(upper_parent) &&
                                !received_events_hashes.contains(upper_parent) &&
                                !self
                                    .event_graph
                                    .dag
                                    .contains_key(upper_parent.as_bytes())
                                    .unwrap()
                            {
                                debug!(
                                    target: "event_graph::protocol::handle_event_put",
                                    "Found upper missing parent event {upper_parent}"
                                );
                                missing_parents.insert(*upper_parent);
                            }
                        }
                    }
                } // <-- while !missing_parents.is_empty()

                // At this point we should've got all the events, and we should
                // add them to the DAG. `BTreeMap` iterates in ascending layer
                // order, so the insertion order stays correct.
                let events: Vec<Event> = received_events.into_values().flatten().collect();
                if self.event_graph.dag_insert(&events).await.is_err() {
                    self.clone().increase_malicious_count().await?;
                    continue
                }
            } // <-- !missing_parents.is_empty()

            // If we're here, we have all the parents, and we can now
            // perform a full validation and add the actual event to
            // the DAG.
            debug!(
                target: "event_graph::protocol::handle_event_put",
                "Got all parents necessary for insertion",
            );
            if self.event_graph.dag_insert(slice::from_ref(&event)).await.is_err() {
                self.clone().increase_malicious_count().await?;
                continue
            }

            self.broadcaster_push.send(EventPut(event)).await.expect("push broadcaster closed");
        }
    }

    /// Protocol function handling `EventReq`.
    /// This is triggered whenever someone requests an event from us.
    async fn handle_event_req(self: Arc<Self>) -> Result<()> {
        loop {
            let event_ids = match self.ev_req_sub.receive().await {
                Ok(v) => v.0.clone(),
                Err(_) => continue,
            };
            trace!(
                target: "event_graph::protocol::handle_event_req",
                "Got EventReq: {event_ids:?} [{}]", self.channel.display_address(),
            );

            // Check if node has finished syncing its DAG
            if !*self.event_graph.synced.read().await {
                debug!(
                    target: "event_graph::protocol::handle_event_req",
                    "DAG is still syncing, skipping..."
                );
                continue
            }

            // We received an event request from somebody.
            // If we do have it, we will send it back to them as `EventRep`.
            // Otherwise, we'll stay quiet. An honest node should always have
            // something to reply with provided that the request is legitimate,
            // i.e. we've sent something to them and they did not have some of
            // the parents.

            // Check if we expected this request to come around.
            // This guards against malicious event requests that would have us
            // repeatedly read our db and waste our bandwidth.
            let mut events = vec![];
            for event_id in event_ids.iter() {
                if !self.event_graph.broadcasted_ids.read().await.contains(event_id) {
                    let malicious_count = self.malicious_count.fetch_add(1, SeqCst);
                    if malicious_count + 1 == MALICIOUS_THRESHOLD {
                        error!(
                            target: "event_graph::protocol::handle_event_req",
                            "[EVENTGRAPH] Peer {} reached malicious threshold. Dropping connection.",
                            self.channel.display_address(),
                        );
                        self.channel.stop().await;
                        return Err(Error::ChannelStopped)
                    }

                    warn!(
                        target: "event_graph::protocol::handle_event_req",
                        "[EVENTGRAPH] Peer {} requested an unexpected event {event_id:?}",
                        self.channel.display_address()
                    );
                    continue
                }

                // At this point we should have it in our DAG.
                // This code panics if that is not the case.
                debug!(
                    target: "event_graph::protocol::handle_event_req",
                    "Fetching event {event_id:?} from DAG"
                );
                events.push(self.event_graph.dag_get(event_id).await.unwrap().unwrap());
            }

            // Check if any outgoing event is older than the genesis event. If so,
            // something has gone wrong: the event should have been pruned during
            // the last rotation.
            let genesis_timestamp = self.event_graph.current_genesis.read().await.timestamp;
            let mut bcast_ids = self.event_graph.broadcasted_ids.write().await;

            for event in events.iter() {
                if event.timestamp < genesis_timestamp {
                    error!(
                        target: "event_graph::protocol::handle_event_req",
                        "Event {} requested by peer is older than the previous rotation period and should have been pruned. Event timestamp: `{}`. Genesis timestamp: `{genesis_timestamp}`",
                        event.id(), event.timestamp
                    );
                }

                // Now let's get the upper level of event IDs. When we reply, we could
                // get requests for those IDs as well.
                for parent_id in event.parents.iter() {
                    if parent_id != &NULL_ID {
                        bcast_ids.insert(*parent_id);
                    }
                }
            }
            // TODO: We should remove the reply from the bcast IDs for this specific channel.
            //       We can't remove them for everyone.
            //bcast_ids.remove(&event_id);
            drop(bcast_ids);

            // Reply with the events
            self.channel.send(&EventRep(events)).await?;
        }
    }

    /// Protocol function handling `TipReq`.
    /// This is triggered when someone requests the current unreferenced
    /// tips of our DAG.
    async fn handle_tip_req(self: Arc<Self>) -> Result<()> {
        loop {
            self.tip_req_sub.receive().await?;
            trace!(
                target: "event_graph::protocol::handle_tip_req",
                "Got TipReq [{}]", self.channel.display_address(),
            );

            // Check if node has finished syncing its DAG
            if !*self.event_graph.synced.read().await {
                debug!(
                    target: "event_graph::protocol::handle_tip_req",
                    "DAG is still syncing, skipping..."
                );
                continue
            }

            // TODO: Rate limit

            // We received a tip request. Find our unreferenced tips, add them
            // to our broadcasted IDs set, and reply with them.
            let layers = self.event_graph.unreferenced_tips.read().await.clone();
            let mut bcast_ids = self.event_graph.broadcasted_ids.write().await;
            for (_, tips) in layers.iter() {
                for tip in tips {
                    bcast_ids.insert(*tip);
                }
            }
            drop(bcast_ids);

            self.channel.send(&TipRep(layers)).await?;
        }
    }
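
    // A hedged sketch of the requesting side this handler answers (the DAG
    // sync task lives elsewhere in the event_graph module; `timeout` is
    // illustrative):
    //
    //     channel.send(&TipReq {}).await?;
    //     let tips = tip_rep_sub.receive_with_timeout(timeout).await?;
    //     // tips.0 is a BTreeMap<u64, HashSet<blake3::Hash>> keyed by layer.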

    /// We need to rate-limit message propagation so malicious nodes don't get us
    /// banned for flooding. We do that by aggregating messages here into a queue,
    /// then applying the rate-limit logic before broadcasting.
    ///
    /// The rate-limit logic is this:
    ///
    /// * If the count is at most RATELIMIT_MIN_COUNT, do nothing.
    /// * Otherwise, sleep for `sleep_time` ms.
    ///
    /// To calculate the sleep time, we use the RATELIMIT_SAMPLE_* values.
    /// For example, RATELIMIT_SAMPLE_IDX = 10 and RATELIMIT_SAMPLE_SLEEP = 1000
    /// mean that when the count is 10, we sleep for 1000 ms.
    ///
    /// Let RATELIMIT_MIN_COUNT = 6, then here's a table of sleep times:
    ///
    /// | Count | Sleep Time / ms |
    /// |-------|-----------------|
    /// | 0     | 0               |
    /// | 4     | 0               |
    /// | 6     | 0               |
    /// | 10    | 1000            |
    /// | 14    | 2000            |
    /// | 18    | 3000            |
    ///
    /// So the sample point defines a straight line that is zero at RATELIMIT_MIN_COUNT.
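    ///
    /// A minimal sketch of the computation performed below, using the
    /// constants defined at the top of this file:
    ///
    /// ```ignore
    /// // count = 14: (14 - 6) * 1000 / (10 - 6) = 2000 ms
    /// let sleep_time = (count - RATELIMIT_MIN_COUNT) * RATELIMIT_SAMPLE_SLEEP /
    ///     (RATELIMIT_SAMPLE_IDX - RATELIMIT_MIN_COUNT);
    /// ```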
    async fn broadcast_rate_limiter(self: Arc<Self>) -> Result<()> {
        let mut ratelimit = MovingWindow::new(RATELIMIT_EXPIRY_TIME);

        loop {
            let event_put = self.broadcaster_pull.recv().await.expect("pull broadcaster closed");

            ratelimit.ticktock();
            if ratelimit.count() > RATELIMIT_MIN_COUNT {
                let sleep_time =
                    ((ratelimit.count() - RATELIMIT_MIN_COUNT) * RATELIMIT_SAMPLE_SLEEP /
                        (RATELIMIT_SAMPLE_IDX - RATELIMIT_MIN_COUNT)) as u64;
                debug!(
                    target: "event_graph::protocol::broadcast_rate_limiter",
                    "Activated rate limit: sleeping {sleep_time} ms [count={}]",
                    ratelimit.count()
                );
                // Apply the ratelimit
                msleep(sleep_time).await;
            }

            // Relay the event to other peers.
            self.event_graph
                .p2p
                .broadcast_with_exclude(&event_put, &[self.channel.address().clone()])
                .await;
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use std::time::UNIX_EPOCH;

    #[test]
    fn test_eventgraph_moving_window_clean_future() {
        let mut window = MovingWindow::new(NanoTimestamp::from_secs(60));
        let future = UNIX_EPOCH.elapsed().unwrap().as_secs() + 100;
        window.times.push_back(NanoTimestamp::from_secs(future.into()));
        window.clean();
        assert_eq!(window.count(), 0);
    }
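
    // A minimal sketch exercising `ticktock`: each call records the current
    // time, and nothing expires within the 60-second window, so the count
    // simply grows.
    #[test]
    fn test_eventgraph_moving_window_ticktock() {
        let mut window = MovingWindow::new(NanoTimestamp::from_secs(60));
        for _ in 0..3 {
            window.ticktock();
        }
        assert_eq!(window.count(), 3);
    }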
}