Commit 30ce9ab6 authored by Barciś, Agata's avatar Barciś, Agata
Browse files

WIP

parent 129b9fe6
...@@ -11,6 +11,7 @@ from std_msgs.msg import Header ...@@ -11,6 +11,7 @@ from std_msgs.msg import Header
from sensor_msgs.msg import CompressedImage from sensor_msgs.msg import CompressedImage
import cv2 import cv2
import numpy as np
from builtin_interfaces.msg import Time from builtin_interfaces.msg import Time
import socket import socket
...@@ -29,7 +30,11 @@ from sniffing import sniffer ...@@ -29,7 +30,11 @@ from sniffing import sniffer
from mission_manager.client import MissionExecutor, MissionClient from mission_manager.client import MissionExecutor, MissionClient
from infdist.optimization.agent import EstimatingAgent from infdist.optimization.agent import EstimatingAgent
from infdist.robot.ros2_network import ROS2Network, ROS2NativeMessage from infdist.robot.ros2_network import ROS2Network, ROS2NativeMessage
from infdist.optimization.simplesim import (
create_msgnum_constraint_violations,
)
from mission import generate_mission_context from mission import generate_mission_context
from detect import detect_red
CUSTOM_QOS = QoSProfile( CUSTOM_QOS = QoSProfile(
history=QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_LAST, history=QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_LAST,
...@@ -45,14 +50,14 @@ class CameraStream(Node, MissionExecutor): ...@@ -45,14 +50,14 @@ class CameraStream(Node, MissionExecutor):
def __init__(self): def __init__(self):
super().__init__('camera_stream') super().__init__('camera_stream')
self.hosts = { self.hosts = {
'fdrones_t90': 0, 'fdrones_b01': 0,
'fdrones_t91': 1, 'fdrones_b02': 1,
'fdrones_t92': 2, 'fdrones_b03': 2,
'fdrones_t93': 3, 'fdrones_b05': 3,
'fdrones_t94': 4, 'fdrones_b06': 4,
'fdrones_t95': 5, 'fdrones_b07': 5,
'fdrones_t96': 6, 'fdrones_b08': 6,
'fdrones_t97': 7, 'fdrones_b09': 7,
} }
self.hostname = socket.gethostname() self.hostname = socket.gethostname()
self.publisher = self.create_publisher( self.publisher = self.create_publisher(
...@@ -61,11 +66,13 @@ class CameraStream(Node, MissionExecutor): ...@@ -61,11 +66,13 @@ class CameraStream(Node, MissionExecutor):
qos_profile=CUSTOM_QOS, qos_profile=CUSTOM_QOS,
# qos_profile=QoSPresetProfiles.get_from_short_key('SYSTEM_DEFAULT') # qos_profile=QoSPresetProfiles.get_from_short_key('SYSTEM_DEFAULT')
) )
self.timer_period = 0.5 self.timer_period = 0.25
self.timer = None self.timer = None
self.cam_fps = 60 self.cam_fps = 90
self.cam = cv2.VideoCapture(0) self.cam = cv2.VideoCapture(0)
self.cam.set(cv2.CAP_PROP_FPS, self.cam_fps) self.cam.set(cv2.CAP_PROP_FPS, self.cam_fps)
self.cam.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
self.cam.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
self.cam.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG')) self.cam.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG'))
self.cam.read() self.cam.read()
self.offset = self.timer_period/8 * int(self.hosts[self.hostname]) self.offset = self.timer_period/8 * int(self.hosts[self.hostname])
...@@ -85,30 +92,42 @@ class CameraStream(Node, MissionExecutor): ...@@ -85,30 +92,42 @@ class CameraStream(Node, MissionExecutor):
super().end_mission_at_timestamp(timestamp) super().end_mission_at_timestamp(timestamp)
self.t_end = timestamp.timestamp() - self.t_start self.t_end = timestamp.timestamp() - self.t_start
self.agent = EstimatingAgent( self.agent = EstimatingAgent(
ident=self.hostname, ident=self.hostname[-1],
net=ROS2Network(), net=ROS2Network(),
messages_context=generate_mission_context( messages_context=generate_mission_context(
agents=self.hosts.keys(), agents=[k[-1] for k in self.hosts.keys()],
data_types=['camera'], data_types=['v'],
t_end=self.t_end, t_end=self.t_end,
T=self.timer_period, T=self.timer_period,
), ),
now_func=lambda: time.time() - self.t_start, now_func=lambda: time.time() - self.t_start,
agents={'base_station': lambda t: set()}, agents={'base_station': lambda t: set()},
constraints={}, window_size=0.1,
constraints={
# create_throughput_constraint_violations(
# throughput,
# timeslot,
# message_size
# )
'msg_num': create_msgnum_constraint_violations(
msgnum=3,
timeslot=0.5,
)
},
limit_history=15,
) )
def camera_received_callback(self, msg_metadata): def camera_received_callback(self, msg_metadata):
if msg_metadata['sender'] == self.hostname: if msg_metadata['sender'] == self.hostname[-1]:
return return
native_message = ROS2NativeMessage( native_message = ROS2NativeMessage(
msg=None, msg=None,
sender=msg_metadata['sender'], sender=msg_metadata['sender'][-1],
receivers=['base_station'], receivers=['base_station'],
data_type_name=f'camera_{msg_metadata["sender"]}', data_type_name=f'v{msg_metadata["sender"][-1]}',
data={ data={
'battery_level': 1, 'red': msg_metadata['red'],
'max_depl_rate': 0.1,
}, },
stamp=msg_metadata['stamp'] stamp=msg_metadata['stamp']
) )
...@@ -129,6 +148,9 @@ class CameraStream(Node, MissionExecutor): ...@@ -129,6 +148,9 @@ class CameraStream(Node, MissionExecutor):
self.timer_period = 1 / float(params.get('fps', 2)) self.timer_period = 1 / float(params.get('fps', 2))
def timer_callback(self): def timer_callback(self):
if abs((datetime.now().timestamp() - self.t_start)%self.timer_period - self.offset) > 0.02:
print("skipping")
return
for i in range(16): for i in range(16):
start = datetime.now() start = datetime.now()
self.cam.grab() self.cam.grab()
...@@ -138,6 +160,8 @@ class CameraStream(Node, MissionExecutor): ...@@ -138,6 +160,8 @@ class CameraStream(Node, MissionExecutor):
frame = self.cam.retrieve()[1] frame = self.cam.retrieve()[1]
now = self.get_clock().now() now = self.get_clock().now()
frame = cv2.resize(frame, (320, 240)) frame = cv2.resize(frame, (320, 240))
mask = detect_red(frame)
red = np.round(np.sum(mask) / mask.size / 255, 5)
jpg = bytearray(cv2.imencode('.jpg', frame)[1]) jpg = bytearray(cv2.imencode('.jpg', frame)[1])
msg = CompressedImage() msg = CompressedImage()
nanosec, sec = map( nanosec, sec = map(
...@@ -146,7 +170,7 @@ class CameraStream(Node, MissionExecutor): ...@@ -146,7 +170,7 @@ class CameraStream(Node, MissionExecutor):
) )
metadata = { metadata = {
'stamp': (sec + nanosec * 1e-9) - self.t_start, 'stamp': (sec + nanosec * 1e-9) - self.t_start,
'red': 0, 'red': red,
'sender': self.hostname 'sender': self.hostname
} }
metadata_str = json.dumps(metadata) metadata_str = json.dumps(metadata)
...@@ -160,12 +184,11 @@ class CameraStream(Node, MissionExecutor): ...@@ -160,12 +184,11 @@ class CameraStream(Node, MissionExecutor):
msg.data = jpg msg.data = jpg
native_message = ROS2NativeMessage( native_message = ROS2NativeMessage(
msg=msg, msg=msg,
sender=self.hostname, sender=self.hostname[-1],
receivers=self.hosts, receivers=self.hosts,
data_type_name=f'camera_{self.hostname}', data_type_name=f'v{self.hostname[-1]}',
data={ data={
'battery_level': 1, 'red': red,
'max_depl_rate': 0.1,
}, },
publisher=self.publisher, publisher=self.publisher,
stamp=( stamp=(
......
...@@ -2,30 +2,31 @@ ...@@ -2,30 +2,31 @@
from infdist.optimization.models import MissionContext, InformationType from infdist.optimization.models import MissionContext, InformationType
from infdist.optimization.aggregations import AggregationMostRecent from infdist.optimization.aggregations import AggregationMostRecent
from infdist.optimization.utilities import UtilityBattery
from infdist.optimization.message_forecast import PeriodicTypeForecast from utility import PeriodicTypeForecast, UtilityVideo
# from infdist.optimization.message_forecast import PeriodicTypeForecast
def generate_mission_context(agents, data_types, t_end, T): def generate_mission_context(agents, data_types, t_end, T):
return MissionContext( return MissionContext(
set([ set([
InformationType( InformationType(
f'{data_type}_{agent}', f'{data_type}{agent}',
utility_cls=UtilityBattery, utility_cls=UtilityVideo,
aggregation_cls=AggregationMostRecent, aggregation_cls=AggregationMostRecent,
message_forecast_cls=PeriodicTypeForecast, message_forecast_cls=PeriodicTypeForecast,
message_forecast_kwargs={ message_forecast_kwargs={
't_end': t_end, 't_end': t_end,
'data_type_name': f'{data_type}_{agent}', 'data_type_name': f'{data_type}{agent}',
'max_depl_rate': 0.1, 'red': 0,
'battery_level': 1,
'receivers': ['base_station'], 'receivers': ['base_station'],
'T': T, 'T': T,
'sender': f'{agent}', 'sender': f'{agent}',
'initial_t_start': T/8 * i,
}, },
weight=1 weight=1
) )
for data_type in data_types for data_type in data_types
for agent in agents for i, agent in enumerate(agents)
]) ])
) )
...@@ -13,7 +13,7 @@ CMD = ( ...@@ -13,7 +13,7 @@ CMD = (
def sniff(callback): def sniff(callback):
p = sub.Popen(CMD, stdout=sub.PIPE) p = sub.Popen(CMD, stdout=sub.PIPE, stderr=sub.PIPE)
for row in iter(p.stdout.readline, b''): for row in iter(p.stdout.readline, b''):
packet = binascii.unhexlify(row.rstrip()) packet = binascii.unhexlify(row.rstrip())
......
import math
from datetime import timedelta
import numpy as np
from infdist.optimization.utilities import Utility
from infdist.optimization.message_forecast import BaseTypeForecast, Samples
from infdist.optimization.models import Message
class PeriodicTypeForecast(BaseTypeForecast):
def __init__(
self,
data_type_name,
t_end,
red,
receivers,
T,
sender,
initial_t_start=0,
):
self.data_type_name = data_type_name
self.t_end = t_end
self.red = red
self.receivers = receivers
self.T = T
self.t_start = initial_t_start
self.t_start_samples = Samples()
self.previous_message = None
self.sender = sender
def register_message(self, message):
assert self.sender == message.sender
if self.previous_message is not None:
t_start = message.t_gen % self.T
self.t_start_samples.register(t_start)
self.t_start = self.t_start_samples.average()
self.previous_message = message
def estimate_t_end(self):
return self.t_end
def create_message(self, t_gen):
return Message(
self.sender,
set(self.receivers) - set([self.sender]),
t_gen,
self.data_type_name,
{
'red': self.previous_message.data.red if self.previous_message else 0,
}
)
def message_generator(self, t_start, given_messages):
start = t_start + self.T - (t_start - self.t_start) % self.T
for t in np.arange(start, self.t_end, self.T):
message = self.create_message(t)
while (
given_messages and
(message.t_gen >= given_messages[0].t_gen or
message == given_messages[0])
):
m = given_messages.pop(0)
if m == message:
break
yield m
yield message
for m in given_messages:
yield m
class UtilityVideo(Utility):
def func(self, m, t, s):
length = (t-m.t_rcv) if m.t_rcv else (t-m.t_gen)
# if isinstance(length, timedelta):
# length = length.total_seconds()
fmax = 12
f1 = 6
S = 2
k = S / (1 - f1 / (2 * fmax * math.log(2)))
m = 1 + math.log(1 / fmax, 2) * k
return 0 if t < m.t_rcv else max(
0, min(
1, -math.log(length, 2) * k + m
)
)
def integrate(self, m, t_start, t_end):
t_rcv = m.t_rcv if m.t_rcv else m.t_gen
t_s = max(t_rcv, t_start)
t_l = t_rcv + 0.25
red = m.data.red
log_part = 0
if t_end < t_s or t_start > t_l:
return 0
if t_end > t_l:
# logarithmic part
t = t_l
t0 = t_l - t_rcv
t1 = t_end - t_rcv
fmax = 4
f1 = 2
S = 2
base = 2
ln2 = math.log(base)
k = S / (1 - f1 / (2 * fmax * ln2))
m = 1 + math.log(1 / fmax, base) * k
log_part = max(0, (
(t1 * (1 - math.log(t1)) - t0 * (1 - math.log(t0))) / ln2 * k +
(t1 - t0) * m
))
else:
t = t_end
result = (t - t_s + log_part) * (1 + 1 * red)
# print('!!!!', t_start, t_end, t_s, t_l, result)
return result
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment