Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
AAU NAV
development
camera_streaming
Commits
6dd217a8
Commit
6dd217a8
authored
Mar 01, 2020
by
Barciś, Agata
Browse files
Merge branch 'master' of gitlab.aau.at:aau-nav/development/camera_streaming
parents
d875af8b
30ce9ab6
Changes
4
Hide whitespace changes
Inline
Side-by-side
camera_streaming/camera_stream.py
View file @
6dd217a8
...
...
@@ -11,6 +11,7 @@ from std_msgs.msg import Header
from
sensor_msgs.msg
import
CompressedImage
import
cv2
import
numpy
as
np
from
builtin_interfaces.msg
import
Time
import
socket
...
...
@@ -29,7 +30,11 @@ from sniffing import sniffer
from
mission_manager.client
import
MissionExecutor
,
MissionClient
from
infdist.optimization.agent
import
EstimatingAgent
from
infdist.robot.ros2_network
import
ROS2Network
,
ROS2NativeMessage
from
infdist.optimization.simplesim
import
(
create_msgnum_constraint_violations
,
)
from
mission
import
generate_mission_context
from
detect
import
detect_red
CUSTOM_QOS
=
QoSProfile
(
history
=
QoSHistoryPolicy
.
RMW_QOS_POLICY_HISTORY_KEEP_LAST
,
...
...
@@ -45,14 +50,14 @@ class CameraStream(Node, MissionExecutor):
def
__init__
(
self
):
super
().
__init__
(
'camera_stream'
)
self
.
hosts
=
{
'fdrones_
t90
'
:
0
,
'fdrones_
t91
'
:
1
,
'fdrones_
t92
'
:
2
,
'fdrones_
t93
'
:
3
,
'fdrones_
t94
'
:
4
,
'fdrones_
t95
'
:
5
,
'fdrones_
t96
'
:
6
,
'fdrones_
t97
'
:
7
,
'fdrones_
b01
'
:
0
,
'fdrones_
b02
'
:
1
,
'fdrones_
b03
'
:
2
,
'fdrones_
b05
'
:
3
,
'fdrones_
b06
'
:
4
,
'fdrones_
b07
'
:
5
,
'fdrones_
b08
'
:
6
,
'fdrones_
b09
'
:
7
,
}
self
.
hostname
=
socket
.
gethostname
()
self
.
publisher
=
self
.
create_publisher
(
...
...
@@ -61,11 +66,13 @@ class CameraStream(Node, MissionExecutor):
qos_profile
=
CUSTOM_QOS
,
# qos_profile=QoSPresetProfiles.get_from_short_key('SYSTEM_DEFAULT')
)
self
.
timer_period
=
0.5
self
.
timer_period
=
0.
2
5
self
.
timer
=
None
self
.
cam_fps
=
6
0
self
.
cam_fps
=
9
0
self
.
cam
=
cv2
.
VideoCapture
(
0
)
self
.
cam
.
set
(
cv2
.
CAP_PROP_FPS
,
self
.
cam_fps
)
self
.
cam
.
set
(
cv2
.
CAP_PROP_FRAME_WIDTH
,
640
)
self
.
cam
.
set
(
cv2
.
CAP_PROP_FRAME_HEIGHT
,
480
)
self
.
cam
.
set
(
cv2
.
CAP_PROP_FOURCC
,
cv2
.
VideoWriter_fourcc
(
*
'MJPG'
))
self
.
cam
.
read
()
self
.
offset
=
self
.
timer_period
/
8
*
int
(
self
.
hosts
[
self
.
hostname
])
...
...
@@ -85,30 +92,42 @@ class CameraStream(Node, MissionExecutor):
super
().
end_mission_at_timestamp
(
timestamp
)
self
.
t_end
=
timestamp
.
timestamp
()
-
self
.
t_start
self
.
agent
=
EstimatingAgent
(
ident
=
self
.
hostname
,
ident
=
self
.
hostname
[
-
1
]
,
net
=
ROS2Network
(),
messages_context
=
generate_mission_context
(
agents
=
self
.
hosts
.
keys
(),
data_types
=
[
'
camera
'
],
agents
=
[
k
[
-
1
]
for
k
in
self
.
hosts
.
keys
()
]
,
data_types
=
[
'
v
'
],
t_end
=
self
.
t_end
,
T
=
self
.
timer_period
,
),
now_func
=
lambda
:
time
.
time
()
-
self
.
t_start
,
agents
=
{
'base_station'
:
lambda
t
:
set
()},
constraints
=
{},
window_size
=
0.1
,
constraints
=
{
# create_throughput_constraint_violations(
# throughput,
# timeslot,
# message_size
# )
'msg_num'
:
create_msgnum_constraint_violations
(
msgnum
=
3
,
timeslot
=
0.5
,
)
},
limit_history
=
15
,
)
def
camera_received_callback
(
self
,
msg_metadata
):
if
msg_metadata
[
'sender'
]
==
self
.
hostname
:
if
msg_metadata
[
'sender'
]
==
self
.
hostname
[
-
1
]
:
return
native_message
=
ROS2NativeMessage
(
msg
=
None
,
sender
=
msg_metadata
[
'sender'
],
sender
=
msg_metadata
[
'sender'
]
[
-
1
]
,
receivers
=
[
'base_station'
],
data_type_name
=
f
'
camera_
{
msg_metadata
[
"sender"
]
}
'
,
data_type_name
=
f
'
v
{
msg_metadata
[
"sender"
]
[
-
1
]
}
'
,
data
=
{
'battery_level'
:
1
,
'max_depl_rate'
:
0.1
,
'red'
:
msg_metadata
[
'red'
],
},
stamp
=
msg_metadata
[
'stamp'
]
)
...
...
@@ -129,6 +148,9 @@ class CameraStream(Node, MissionExecutor):
self
.
timer_period
=
1
/
float
(
params
.
get
(
'fps'
,
2
))
def
timer_callback
(
self
):
if
abs
((
datetime
.
now
().
timestamp
()
-
self
.
t_start
)
%
self
.
timer_period
-
self
.
offset
)
>
0.02
:
print
(
"skipping"
)
return
for
i
in
range
(
16
):
start
=
datetime
.
now
()
self
.
cam
.
grab
()
...
...
@@ -138,6 +160,8 @@ class CameraStream(Node, MissionExecutor):
frame
=
self
.
cam
.
retrieve
()[
1
]
now
=
self
.
get_clock
().
now
()
frame
=
cv2
.
resize
(
frame
,
(
320
,
240
))
mask
=
detect_red
(
frame
)
red
=
np
.
round
(
np
.
sum
(
mask
)
/
mask
.
size
/
255
,
5
)
jpg
=
bytearray
(
cv2
.
imencode
(
'.jpg'
,
frame
)[
1
])
msg
=
CompressedImage
()
nanosec
,
sec
=
map
(
...
...
@@ -146,7 +170,7 @@ class CameraStream(Node, MissionExecutor):
)
metadata
=
{
'stamp'
:
(
sec
+
nanosec
*
1e-9
)
-
self
.
t_start
,
'red'
:
0
,
'red'
:
red
,
'sender'
:
self
.
hostname
}
metadata_str
=
json
.
dumps
(
metadata
)
...
...
@@ -160,12 +184,11 @@ class CameraStream(Node, MissionExecutor):
msg
.
data
=
jpg
native_message
=
ROS2NativeMessage
(
msg
=
msg
,
sender
=
self
.
hostname
,
sender
=
self
.
hostname
[
-
1
]
,
receivers
=
self
.
hosts
,
data_type_name
=
f
'
camera_
{
self
.
hostname
}
'
,
data_type_name
=
f
'
v
{
self
.
hostname
[
-
1
]
}
'
,
data
=
{
'battery_level'
:
1
,
'max_depl_rate'
:
0.1
,
'red'
:
red
,
},
publisher
=
self
.
publisher
,
stamp
=
(
...
...
camera_streaming/mission.py
View file @
6dd217a8
...
...
@@ -2,30 +2,31 @@
from
infdist.optimization.models
import
MissionContext
,
InformationType
from
infdist.optimization.aggregations
import
AggregationMostRecent
from
infdist.optimization.utilities
import
UtilityBattery
from
infdist.optimization.message_forecast
import
PeriodicTypeForecast
from
utility
import
PeriodicTypeForecast
,
UtilityVideo
# from infdist.optimization.message_forecast import PeriodicTypeForecast
def
generate_mission_context
(
agents
,
data_types
,
t_end
,
T
):
return
MissionContext
(
set
([
InformationType
(
f
'
{
data_type
}
_
{
agent
}
'
,
utility_cls
=
Utility
Battery
,
f
'
{
data_type
}{
agent
}
'
,
utility_cls
=
Utility
Video
,
aggregation_cls
=
AggregationMostRecent
,
message_forecast_cls
=
PeriodicTypeForecast
,
message_forecast_kwargs
=
{
't_end'
:
t_end
,
'data_type_name'
:
f
'
{
data_type
}
_
{
agent
}
'
,
'max_depl_rate'
:
0.1
,
'battery_level'
:
1
,
'data_type_name'
:
f
'
{
data_type
}{
agent
}
'
,
'red'
:
0
,
'receivers'
:
[
'base_station'
],
'T'
:
T
,
'sender'
:
f
'
{
agent
}
'
,
'initial_t_start'
:
T
/
8
*
i
,
},
weight
=
1
)
for
data_type
in
data_types
for
agent
in
agents
for
i
,
agent
in
enumerate
(
agents
)
])
)
camera_streaming/sniffing/sniffer.py
View file @
6dd217a8
...
...
@@ -13,7 +13,7 @@ CMD = (
def
sniff
(
callback
):
p
=
sub
.
Popen
(
CMD
,
stdout
=
sub
.
PIPE
)
p
=
sub
.
Popen
(
CMD
,
stdout
=
sub
.
PIPE
,
stderr
=
sub
.
PIPE
)
for
row
in
iter
(
p
.
stdout
.
readline
,
b
''
):
packet
=
binascii
.
unhexlify
(
row
.
rstrip
())
...
...
camera_streaming/utility.py
0 → 100644
View file @
6dd217a8
import
math
from
datetime
import
timedelta
import
numpy
as
np
from
infdist.optimization.utilities
import
Utility
from
infdist.optimization.message_forecast
import
BaseTypeForecast
,
Samples
from
infdist.optimization.models
import
Message
class
PeriodicTypeForecast
(
BaseTypeForecast
):
def
__init__
(
self
,
data_type_name
,
t_end
,
red
,
receivers
,
T
,
sender
,
initial_t_start
=
0
,
):
self
.
data_type_name
=
data_type_name
self
.
t_end
=
t_end
self
.
red
=
red
self
.
receivers
=
receivers
self
.
T
=
T
self
.
t_start
=
initial_t_start
self
.
t_start_samples
=
Samples
()
self
.
previous_message
=
None
self
.
sender
=
sender
def
register_message
(
self
,
message
):
assert
self
.
sender
==
message
.
sender
if
self
.
previous_message
is
not
None
:
t_start
=
message
.
t_gen
%
self
.
T
self
.
t_start_samples
.
register
(
t_start
)
self
.
t_start
=
self
.
t_start_samples
.
average
()
self
.
previous_message
=
message
def
estimate_t_end
(
self
):
return
self
.
t_end
def
create_message
(
self
,
t_gen
):
return
Message
(
self
.
sender
,
set
(
self
.
receivers
)
-
set
([
self
.
sender
]),
t_gen
,
self
.
data_type_name
,
{
'red'
:
self
.
previous_message
.
data
.
red
if
self
.
previous_message
else
0
,
}
)
def
message_generator
(
self
,
t_start
,
given_messages
):
start
=
t_start
+
self
.
T
-
(
t_start
-
self
.
t_start
)
%
self
.
T
for
t
in
np
.
arange
(
start
,
self
.
t_end
,
self
.
T
):
message
=
self
.
create_message
(
t
)
while
(
given_messages
and
(
message
.
t_gen
>=
given_messages
[
0
].
t_gen
or
message
==
given_messages
[
0
])
):
m
=
given_messages
.
pop
(
0
)
if
m
==
message
:
break
yield
m
yield
message
for
m
in
given_messages
:
yield
m
class
UtilityVideo
(
Utility
):
def
func
(
self
,
m
,
t
,
s
):
length
=
(
t
-
m
.
t_rcv
)
if
m
.
t_rcv
else
(
t
-
m
.
t_gen
)
# if isinstance(length, timedelta):
# length = length.total_seconds()
fmax
=
12
f1
=
6
S
=
2
k
=
S
/
(
1
-
f1
/
(
2
*
fmax
*
math
.
log
(
2
)))
m
=
1
+
math
.
log
(
1
/
fmax
,
2
)
*
k
return
0
if
t
<
m
.
t_rcv
else
max
(
0
,
min
(
1
,
-
math
.
log
(
length
,
2
)
*
k
+
m
)
)
def
integrate
(
self
,
m
,
t_start
,
t_end
):
t_rcv
=
m
.
t_rcv
if
m
.
t_rcv
else
m
.
t_gen
t_s
=
max
(
t_rcv
,
t_start
)
t_l
=
t_rcv
+
0.25
red
=
m
.
data
.
red
log_part
=
0
if
t_end
<
t_s
or
t_start
>
t_l
:
return
0
if
t_end
>
t_l
:
# logarithmic part
t
=
t_l
t0
=
t_l
-
t_rcv
t1
=
t_end
-
t_rcv
fmax
=
4
f1
=
2
S
=
2
base
=
2
ln2
=
math
.
log
(
base
)
k
=
S
/
(
1
-
f1
/
(
2
*
fmax
*
ln2
))
m
=
1
+
math
.
log
(
1
/
fmax
,
base
)
*
k
log_part
=
max
(
0
,
(
(
t1
*
(
1
-
math
.
log
(
t1
))
-
t0
*
(
1
-
math
.
log
(
t0
)))
/
ln2
*
k
+
(
t1
-
t0
)
*
m
))
else
:
t
=
t_end
result
=
(
t
-
t_s
+
log_part
)
*
(
1
+
1
*
red
)
# print('!!!!', t_start, t_end, t_s, t_l, result)
return
result
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment