Commit a3dd5707 authored by Barciś, Agata's avatar Barciś, Agata
Browse files

sniffing and periodic forcaster

parent f638d57c
#! /usr/bin/env python #! /usr/bin/env python
import time import time
import json
import threading
import rclpy import rclpy
from rclpy.node import Node from rclpy.node import Node
...@@ -20,6 +22,9 @@ from rclpy.qos import ( ...@@ -20,6 +22,9 @@ from rclpy.qos import (
QoSHistoryPolicy, QoSHistoryPolicy,
QoSReliabilityPolicy, QoSReliabilityPolicy,
) )
from rclpy.qos import QoSPresetProfiles
from sniffing import sniffer
from mission_manager.client import MissionExecutor, MissionClient from mission_manager.client import MissionExecutor, MissionClient
from infdist.optimization.agent import EstimatingAgent from infdist.optimization.agent import EstimatingAgent
...@@ -54,6 +59,7 @@ class CameraStream(Node, MissionExecutor): ...@@ -54,6 +59,7 @@ class CameraStream(Node, MissionExecutor):
CompressedImage, CompressedImage,
'/{}/camera'.format(self.hostname), '/{}/camera'.format(self.hostname),
qos_profile=CUSTOM_QOS, qos_profile=CUSTOM_QOS,
# qos_profile=QoSPresetProfiles.get_from_short_key('SYSTEM_DEFAULT')
) )
self.timer_period = 0.5 self.timer_period = 0.5
self.timer = None self.timer = None
...@@ -63,7 +69,21 @@ class CameraStream(Node, MissionExecutor): ...@@ -63,7 +69,21 @@ class CameraStream(Node, MissionExecutor):
self.cam.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG')) self.cam.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG'))
self.cam.read() self.cam.read()
self.offset = self.timer_period/8 * int(self.hosts[self.hostname]) self.offset = self.timer_period/8 * int(self.hosts[self.hostname])
self.t_start = datetime.now().timestamp()
self.t_end = 30 self.t_end = 30
self.sniffer = threading.Thread(
target=sniffer.sniff,
args=(self.camera_received_callback,)
)
self.sniffer.start()
def start_mission_at_timestamp(self, timestamp):
super().start_mission_at_timestamp(timestamp)
self.t_start = timestamp.timestamp()
def end_mission_at_timestamp(self, timestamp):
super().end_mission_at_timestamp(timestamp)
self.t_end = timestamp.timestamp() - self.t_start
self.agent = EstimatingAgent( self.agent = EstimatingAgent(
ident=self.hostname, ident=self.hostname,
net=ROS2Network(), net=ROS2Network(),
...@@ -71,12 +91,29 @@ class CameraStream(Node, MissionExecutor): ...@@ -71,12 +91,29 @@ class CameraStream(Node, MissionExecutor):
agents=self.hosts.keys(), agents=self.hosts.keys(),
data_types=['camera'], data_types=['camera'],
t_end=self.t_end, t_end=self.t_end,
T=self.timer_period,
), ),
now_func=datetime.now, now_func=lambda: time.time() - self.t_start,
agents=['base_station'], agents={'base_station': lambda t: set()},
constraints={}, constraints={},
) )
def camera_received_callback(self, msg_metadata):
if msg_metadata['sender'] == self.hostname:
return
native_message = ROS2NativeMessage(
msg=None,
sender=msg_metadata['sender'],
receivers=['base_station'],
data_type_name=f'camera_{msg_metadata["sender"]}',
data={
'battery_level': 1,
'max_depl_rate': 0.1,
},
stamp=msg_metadata['stamp']
)
self.agent.received(native_message)
def start_mission(self, timestamp): def start_mission(self, timestamp):
self.get_logger().info('Mission started') self.get_logger().info('Mission started')
time.sleep(self.offset) time.sleep(self.offset)
...@@ -107,8 +144,15 @@ class CameraStream(Node, MissionExecutor): ...@@ -107,8 +144,15 @@ class CameraStream(Node, MissionExecutor):
int, int,
(now.nanoseconds % 10**9, now.nanoseconds/10**9) (now.nanoseconds % 10**9, now.nanoseconds/10**9)
) )
metadata = {
'stamp': (sec + nanosec * 1e-9) - self.t_start,
'red': 0,
'sender': self.hostname
}
metadata_str = json.dumps(metadata)
frame_id = "MEEETADATA" + chr(len(metadata_str)) + metadata_str
header = Header( header = Header(
frame_id="", frame_id=frame_id,
stamp=Time(sec=sec, nanosec=nanosec) stamp=Time(sec=sec, nanosec=nanosec)
) )
msg.header = header msg.header = header
...@@ -123,10 +167,15 @@ class CameraStream(Node, MissionExecutor): ...@@ -123,10 +167,15 @@ class CameraStream(Node, MissionExecutor):
'battery_level': 1, 'battery_level': 1,
'max_depl_rate': 0.1, 'max_depl_rate': 0.1,
}, },
publisher=self.publisher publisher=self.publisher,
stamp=(
msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
) - self.t_start
) )
self.agent.generated(native_message) try:
print("image sent") self.agent.generated(native_message)
except Exception as e:
print(e)
def main(): def main():
......
...@@ -3,19 +3,25 @@ ...@@ -3,19 +3,25 @@
from infdist.optimization.models import MissionContext, InformationType from infdist.optimization.models import MissionContext, InformationType
from infdist.optimization.aggregations import AggregationMostRecent from infdist.optimization.aggregations import AggregationMostRecent
from infdist.optimization.utilities import UtilityBattery from infdist.optimization.utilities import UtilityBattery
from infdist.optimization.message_forecast import EmptyTypeForecast from infdist.optimization.message_forecast import PeriodicTypeForecast
def generate_mission_context(agents, data_types, t_end): def generate_mission_context(agents, data_types, t_end, T):
return MissionContext( return MissionContext(
set([ set([
InformationType( InformationType(
f'{data_type}_{agent}', f'{data_type}_{agent}',
utility_cls=UtilityBattery, utility_cls=UtilityBattery,
aggregation_cls=AggregationMostRecent, aggregation_cls=AggregationMostRecent,
message_forecast_cls=EmptyTypeForecast, message_forecast_cls=PeriodicTypeForecast,
message_forecast_kwargs={ message_forecast_kwargs={
't_end': t_end, 't_end': t_end,
'data_type_name': f'{data_type}_{agent}',
'max_depl_rate': 0.1,
'battery_level': 1,
'receivers': ['base_station'],
'T': T,
'sender': f'{agent}',
}, },
weight=1 weight=1
) )
......
...@@ -11,19 +11,21 @@ CMD = ( ...@@ -11,19 +11,21 @@ CMD = (
'-T fields -e data'.split() '-T fields -e data'.split()
) )
p = sub.Popen(CMD, stdout=sub.PIPE)
for row in iter(p.stdout.readline, b''):
packet = binascii.unhexlify(row.rstrip())
FRAME_IDENT = b"MEEETADATA" def sniff(callback):
p = sub.Popen(CMD, stdout=sub.PIPE)
for row in iter(p.stdout.readline, b''):
packet = binascii.unhexlify(row.rstrip())
s = packet.find(FRAME_IDENT) FRAME_IDENT = b"MEEETADATA"
if s == -1:
continue
data_start = s+len(FRAME_IDENT)+1 s = packet.find(FRAME_IDENT)
data_len = packet[data_start-1] if s == -1:
data_str = packet[data_start:data_start+data_len] continue
data = json.loads(data_str) data_start = s+len(FRAME_IDENT)+1
print("!", data) data_len = packet[data_start-1]
data_str = packet[data_start:data_start+data_len]
data = json.loads(data_str)
callback(data)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment