Commit 1395fbb6 authored by Barciś, Agata's avatar Barciś, Agata
Browse files

infdist agent added to streaming node

parent a7178889
#! /usr/bin/env python
import time
import rclpy
from rclpy.node import Node
from rclpy.executors import SingleThreadedExecutor
from std_msgs.msg import Header
from sensor_msgs.msg import CompressedImage
......@@ -9,7 +12,7 @@ import cv2
from builtin_interfaces.msg import Time
import socket
# from datetime import datetime
from datetime import datetime
from rclpy.qos import (
QoSProfile,
......@@ -18,6 +21,11 @@ from rclpy.qos import (
QoSReliabilityPolicy,
)
from mission_manager.client import MissionExecutor, MissionClient
from infdist.optimization.agent import EstimatingAgent
from infdist.robot.ros2_network import ROS2Network, ROS2NativeMessage
from mission import generate_mission_context
CUSTOM_QOS = QoSProfile(
history=QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_LAST,
reliability=QoSReliabilityPolicy.
......@@ -28,29 +36,70 @@ CUSTOM_QOS = QoSProfile(
)
class CameraStream():
class CameraStream(Node, MissionExecutor):
def __init__(self):
self.node = Node('camera_stream')
hostname = socket.gethostname()
self.publisher = self.node.create_publisher(
super().__init__('camera_stream')
self.hosts = {
'fdrones_t90': 0,
'fdrones_t91': 1,
'fdrones_t92': 2,
'fdrones_t93': 3,
'fdrones_t94': 4,
'fdrones_t95': 5,
'fdrones_t96': 6,
'fdrones_t97': 7,
}
self.hostname = socket.gethostname()
self.publisher = self.create_publisher(
CompressedImage,
'/{}/camera'.format(hostname),
'/{}/camera'.format(self.hostname),
qos_profile=CUSTOM_QOS,
)
timer_period = 0.5
self.timer = self.node.create_timer(timer_period, self.timer_callback)
self.timer_period = 0.5
self.timer = None
self.cam_fps = 60
self.cam = cv2.VideoCapture(0)
self.cam.set(cv2.CAP_PROP_FPS, 60)
print(cv2.CAP_PROP_BUFFERSIZE)
self.cam.set(cv2.CAP_PROP_BUFFERSIZE, 1)
self.cam.set(cv2.CAP_PROP_FPS, self.cam_fps)
self.cam.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG'))
self.cam.read()
self.offset = self.timer_period/8 * int(self.hosts[self.hostname])
self.t_end = 30
self.agent = EstimatingAgent(
ident=self.hostname,
net=ROS2Network(),
messages_context=generate_mission_context(
agents=self.hosts.keys(),
data_types=['camera'],
t_end=self.t_end,
),
now_func=datetime.now,
agents=['base_station'],
constraints={},
)
def start_mission(self, timestamp):
self.get_logger().info('Mission started')
time.sleep(self.offset)
self.timer = self.create_timer(self.timer_period, self.timer_callback)
def end_mission(self, timestamp):
self.get_logger().info('Mission ended')
if self.timer:
self.timer.cancel()
def change_params(self, params, timestamp):
self.get_logger().info('Params change received')
self.timer_period = 1 / float(params.get('fps', 2))
def timer_callback(self):
for i in range(16):
start = datetime.now()
self.cam.grab()
end = datetime.now()
if (end - start).total_seconds() >= 1 / self.cam_fps: # buffer is empty
break
frame = self.cam.retrieve()[1]
now = self.node.get_clock().now()
now = self.get_clock().now()
frame = cv2.resize(frame, (320, 240))
jpg = bytearray(cv2.imencode('.jpg', frame)[1])
msg = CompressedImage()
......@@ -65,38 +114,34 @@ class CameraStream():
msg.header = header
msg.format = 'jpeg'
msg.data = jpg
self.publisher.publish(msg)
native_message = ROS2NativeMessage(
msg=msg,
sender=self.hostname,
receivers=self.hosts,
data_type_name=f'camera_{self.hostname}',
data={
'battery_level': 1,
'max_depl_rate': 0.1,
},
publisher=self.publisher
)
self.agent.generated(native_message)
print("image sent")
# reading = 0
# resizing = 0
# encoding = 0
#
# frame = cam.read()[1]
# for i in range(100):
# start_time = datetime.now()
# frame = cam.read()[1]
# read_time = datetime.now()
# frame = resize(frame, (320, 240))
# resize_time = datetime.now()
# jpg = imencode('.jpg', frame)[1].tostring()
# end_time = datetime.now()
# reading += (read_time - start_time).total_seconds()
# resizing += (resize_time - read_time).total_seconds()
# encoding += (end_time - resize_time).total_seconds()
#
# print('reading', reading/100)
# print('resizing', resizing/100)
# print('encoding', encoding/100)
# print('total', reading + resizing + encoding)
def main():
rclpy.init()
camera_stream = CameraStream()
rclpy.spin(camera_stream.node)
mission_client = MissionClient()
mission_client.add_mission_executor(camera_stream)
executor = SingleThreadedExecutor()
executor.add_node(camera_stream)
executor.add_node(mission_client)
mission_client.get_logger().info(
'Node initialized, waiting for events.'
)
executor.spin()
camera_stream.node.destroy_node()
rclpy.shutdown()
......
#! /usr/bin/env python
from infdist.optimization.models import MissionContext, InformationType
from infdist.optimization.aggregations import AggregationMostRecent
from infdist.optimization.utilities import UtilityBattery
from infdist.optimization.message_forecast import EmptyTypeForecast
def generate_mission_context(agents, data_types, t_end):
return MissionContext(
set([
InformationType(
f'{data_type}_{agent}',
utility_cls=UtilityBattery,
aggregation_cls=AggregationMostRecent,
message_forecast_cls=EmptyTypeForecast,
message_forecast_kwargs={
't_end': t_end,
},
weight=1
)
for data_type in data_types
for agent in agents
])
)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment