diff --git a/camera_streaming/camera_stream.py b/camera_streaming/camera_stream.py index 9af84ab119563e6d06be1f55f28811ddf6d7d658..85d915d497a18c10a41b2bda44b912da7df48398 100644 --- a/camera_streaming/camera_stream.py +++ b/camera_streaming/camera_stream.py @@ -1,6 +1,8 @@ #! /usr/bin/env python import time +import json +import threading import rclpy from rclpy.node import Node @@ -20,6 +22,9 @@ from rclpy.qos import ( QoSHistoryPolicy, QoSReliabilityPolicy, ) +from rclpy.qos import QoSPresetProfiles + +from sniffing import sniffer from mission_manager.client import MissionExecutor, MissionClient from infdist.optimization.agent import EstimatingAgent @@ -54,6 +59,7 @@ class CameraStream(Node, MissionExecutor): CompressedImage, '/{}/camera'.format(self.hostname), qos_profile=CUSTOM_QOS, + # qos_profile=QoSPresetProfiles.get_from_short_key('SYSTEM_DEFAULT') ) self.timer_period = 0.5 self.timer = None @@ -63,7 +69,21 @@ class CameraStream(Node, MissionExecutor): self.cam.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG')) self.cam.read() self.offset = self.timer_period/8 * int(self.hosts[self.hostname]) + self.t_start = datetime.now().timestamp() self.t_end = 30 + self.sniffer = threading.Thread( + target=sniffer.sniff, + args=(self.camera_received_callback,) + ) + self.sniffer.start() + + def start_mission_at_timestamp(self, timestamp): + super().start_mission_at_timestamp(timestamp) + self.t_start = timestamp.timestamp() + + def end_mission_at_timestamp(self, timestamp): + super().end_mission_at_timestamp(timestamp) + self.t_end = timestamp.timestamp() - self.t_start self.agent = EstimatingAgent( ident=self.hostname, net=ROS2Network(), @@ -71,12 +91,29 @@ class CameraStream(Node, MissionExecutor): agents=self.hosts.keys(), data_types=['camera'], t_end=self.t_end, + T=self.timer_period, ), - now_func=datetime.now, - agents=['base_station'], + now_func=lambda: time.time() - self.t_start, + agents={'base_station': lambda t: set()}, constraints={}, ) + def camera_received_callback(self, msg_metadata): + if msg_metadata['sender'] == self.hostname: + return + native_message = ROS2NativeMessage( + msg=None, + sender=msg_metadata['sender'], + receivers=['base_station'], + data_type_name=f'camera_{msg_metadata["sender"]}', + data={ + 'battery_level': 1, + 'max_depl_rate': 0.1, + }, + stamp=msg_metadata['stamp'] + ) + self.agent.received(native_message) + def start_mission(self, timestamp): self.get_logger().info('Mission started') time.sleep(self.offset) @@ -107,8 +144,15 @@ class CameraStream(Node, MissionExecutor): int, (now.nanoseconds % 10**9, now.nanoseconds/10**9) ) + metadata = { + 'stamp': (sec + nanosec * 1e-9) - self.t_start, + 'red': 0, + 'sender': self.hostname + } + metadata_str = json.dumps(metadata) + frame_id = "MEEETADATA" + chr(len(metadata_str)) + metadata_str header = Header( - frame_id="", + frame_id=frame_id, stamp=Time(sec=sec, nanosec=nanosec) ) msg.header = header @@ -123,10 +167,15 @@ class CameraStream(Node, MissionExecutor): 'battery_level': 1, 'max_depl_rate': 0.1, }, - publisher=self.publisher + publisher=self.publisher, + stamp=( + msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9 + ) - self.t_start ) - self.agent.generated(native_message) - print("image sent") + try: + self.agent.generated(native_message) + except Exception as e: + print(e) def main(): diff --git a/camera_streaming/mission.py b/camera_streaming/mission.py index 2f0263a7a5d844112350babb9878aa2a482241b5..655764458f315f43237a7b8b618060dae086c20a 100644 --- a/camera_streaming/mission.py +++ b/camera_streaming/mission.py @@ -3,19 +3,25 @@ from infdist.optimization.models import MissionContext, InformationType from infdist.optimization.aggregations import AggregationMostRecent from infdist.optimization.utilities import UtilityBattery -from infdist.optimization.message_forecast import EmptyTypeForecast +from infdist.optimization.message_forecast import PeriodicTypeForecast -def generate_mission_context(agents, data_types, t_end): +def generate_mission_context(agents, data_types, t_end, T): return MissionContext( set([ InformationType( f'{data_type}_{agent}', utility_cls=UtilityBattery, aggregation_cls=AggregationMostRecent, - message_forecast_cls=EmptyTypeForecast, + message_forecast_cls=PeriodicTypeForecast, message_forecast_kwargs={ 't_end': t_end, + 'data_type_name': f'{data_type}_{agent}', + 'max_depl_rate': 0.1, + 'battery_level': 1, + 'receivers': ['base_station'], + 'T': T, + 'sender': f'{agent}', }, weight=1 ) diff --git a/camera_streaming/sniffing/__init__.py b/camera_streaming/sniffing/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/camera_streaming/sniffing/sniffer.py b/camera_streaming/sniffing/sniffer.py new file mode 100644 index 0000000000000000000000000000000000000000..068edc09e48c4672fc8c6ebb76a8f8ca67f258ea --- /dev/null +++ b/camera_streaming/sniffing/sniffer.py @@ -0,0 +1,31 @@ +#! /usr/bin/env python +# -*- coding: utf-8 -*- + +import binascii +import json +import subprocess as sub + +CMD = ( + 'sudo tshark -i wlan0 -Y'.split() + + ["frame contains \"MEEETADATA\""] + + '-T fields -e data'.split() +) + + +def sniff(callback): + p = sub.Popen(CMD, stdout=sub.PIPE) + for row in iter(p.stdout.readline, b''): + packet = binascii.unhexlify(row.rstrip()) + + FRAME_IDENT = b"MEEETADATA" + + s = packet.find(FRAME_IDENT) + if s == -1: + continue + + data_start = s+len(FRAME_IDENT)+1 + data_len = packet[data_start-1] + data_str = packet[data_start:data_start+data_len] + + data = json.loads(data_str) + callback(data)