Commit 5040dda3 authored by AndreR's avatar AndreR Committed by TIGERs GitLab
Browse files

Resolve "Run multicast receiver for vision on individual thread to avoid skipped messages"

Closes #1626

See merge request main/Sumatra!1379

sumatra-commit: 41979847cff5f2362d2b48370b42a5fecfa1cc6d
parent a65ffff3
Pipeline #16672 passed with stage
in 4 minutes and 11 seconds
package edu.tigers.sumatra.network;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
/**
* This interface declares a object that is capable of receiving something via network.
* Together with its counterpart {@link ITransmitter} and their different implementations, it
* represents a small, passive, flexible network framework.
*
* @author Gero
*/
public interface IReceiver
{
/**
* Receives a {@link DatagramPacket} from the network
*
* @param store
* @return The given packet {@code store} with filled buffer (or <code>null</code> if the receiver
* <code>!isReady()</code>)
* @throws IOException
*/
DatagramPacket receive(DatagramPacket store) throws IOException;
/**
* May throw a {@link IOException}, as implementations often call {@link DatagramSocket#close()}
*
* @throws IOException
*/
void cleanup() throws IOException;
/**
* @return Whether the implementation is ready to receive content or not
*/
boolean isReady();
}
/*
* *********************************************************
* Copyright (c) 2009 - 2015, DHBW Mannheim - Tigers Mannheim
* Project: TIGERS - Sumatra
* Date: Oct 7, 2015
* Author(s): Nicolai Ommer <nicolai.ommer@gmail.com>
* *********************************************************
* Copyright (c) 2009 - 2021, DHBW Mannheim - TIGERs Mannheim
*/
package edu.tigers.sumatra.network;
/**
* @author Nicolai Ommer <nicolai.ommer@gmail.com>
* Observer for network receivers.
*/
public interface IReceiverObserver
{
/**
* @author Nicolai Ommer <nicolai.ommer@gmail.com>
* The interface timed out.
*/
void onInterfaceTimedOut();
void onSocketTimedOut();
}
package edu.tigers.sumatra.network;
import java.io.Closeable;
/**
* This interface declares objects that are capable of sending something via network.
* Together with its counterpart {@link IReceiver} and their different implementations, it
* represents a small, passive, flexible network framework.
*
* @author Gero
* @param <D> The data type this transmitter is able to process
*/
public interface ITransmitter<D> extends Closeable
{
/**
* Sends data of type {@code <D>} to the network
*
* @param data
* @return success?
*/
boolean send(D data);
}
/*
* Copyright (c) 2009 - 2019, DHBW Mannheim - TIGERs Mannheim
* Copyright (c) 2009 - 2021, DHBW Mannheim - TIGERs Mannheim
*/
package edu.tigers.sumatra.network;
import lombok.extern.log4j.Log4j2;
import java.io.EOFException;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* This class should be able to register to a multicast group, and receive {@link DatagramPacket}s
* on it
*
* @author Gero
* Connect to a multicast group on all reasonable network interfaces, and receive {@link DatagramPacket}s on it.
*/
public class MulticastUDPReceiver implements IReceiver
@Log4j2
public class MulticastUDPReceiver implements AutoCloseable
{
private static final Logger log = LogManager.getLogger(MulticastUDPReceiver.class.getName());
private static final int SO_TIMEOUT = 500;
private static final String[] USELESS_PREFIXES = { "tap", "tun", "ham", "WAN" };
private static final String[] PREFERRED_PREFIXES = { "eth", "enp" };
// Connection
private final List<MulticastSocket> sockets = new ArrayList<>();
private final Set<MulticastSocket> socketsTimedOut = new HashSet<>();
private final List<IReceiverObserver> observers = new CopyOnWriteArrayList<>();
private MulticastSocket currentSocket = null;
private InetAddress group = null;
/** The internal state-switch of this transmitter */
private boolean readyToReceive;
private MulticastSocket currentSocket;
/**
* @param port
* @param groupStr
*/
public MulticastUDPReceiver(final int port, final String groupStr)
public MulticastUDPReceiver(String host, int port)
{
List<NetworkInterface> ifaces = getNetworkInterfaces();
for (NetworkInterface iface : ifaces)
{
try
{
if (isUselessInterface(iface))
{
log.debug("Filtered network interface: " + iface.getDisplayName());
continue;
}
log.debug("Using network interface: " + iface.getDisplayName() + " with MTU " + iface.getMTU());
addSocket(port, groupStr, iface);
} catch (SocketException err)
{
log.error("Weird. Could not determine if network interface is p2p", err);
}
}
setSocketTimeouts();
readyToReceive = true;
currentSocket = connect(port);
addAllNetworkInterfaces(host, port);
}
/**
* Creates a MultiCastUDPReceiver with only the given nif
*
* @param host
* @param port
* @param groupStr
* @param iface
*/
public MulticastUDPReceiver(final int port, final String groupStr, final NetworkInterface iface)
public MulticastUDPReceiver(final String host, final int port, final NetworkInterface iface)
{
addSocket(port, groupStr, iface);
currentSocket = connect(port);
joinOnInterface(port, host, iface);
}
readyToReceive = true;
private void addAllNetworkInterfaces(String host, int port)
{
for (NetworkInterface iface : getNetworkInterfaces())
{
if (isUselessInterface(iface))
{
log.debug("Filtered network interface: " + iface.getDisplayName());
continue;
}
joinOnInterface(port, host, iface);
}
}
/**
* @param observer
*/
public void addObserver(final IReceiverObserver observer)
{
observers.add(observer);
}
/**
* @param observer
*/
public void removeObserver(final IReceiverObserver observer)
{
observers.remove(observer);
}
private List<NetworkInterface> getNetworkInterfaces()
private MulticastSocket connect(int port)
{
List<NetworkInterface> ifaces = new LinkedList<>();
try
{
ifaces = Collections.list(NetworkInterface.getNetworkInterfaces());
log.debug("Found " + ifaces.size() + " network interfaces");
} catch (SocketException err)
return new MulticastSocket(new InetSocketAddress(port));
} catch (IOException err)
{
log.error("Unable to get a list of network interfaces", err);
log.error("Could not create new multicast socket", err);
}
return ifaces;
return null;
}
private void setSocketTimeouts()
private List<NetworkInterface> getNetworkInterfaces()
{
if (sockets.size() > 1)
try
{
for (MulticastSocket socket : sockets)
{
try
{
socket.setSoTimeout(SO_TIMEOUT);
} catch (SocketException err)
{
log.error("Could not set SO_TIMEOUT", err);
}
}
var ifaces = Collections.list(NetworkInterface.getNetworkInterfaces());
log.debug("Found " + ifaces.size() + " network interfaces");
return ifaces;
} catch (SocketException err)
{
log.error("Unable to get a list of network interfaces", err);
}
return List.of();
}
......@@ -170,16 +131,13 @@ public class MulticastUDPReceiver implements IReceiver
}
@SuppressWarnings("squid:S2095") // we do not want to close the multicast socket here
private void addSocket(final int port, final String groupStr, final NetworkInterface iface)
private void joinOnInterface(final int port, final String groupStr, final NetworkInterface iface)
{
try
{
MulticastSocket socket = new MulticastSocket(new InetSocketAddress(port));
socket.setNetworkInterface(iface);
int i = findSocketIndex(socket);
sockets.add(i, socket);
joinNetworkGroup(socket, groupStr);
log.debug("Using network interface '{}' with MTU {}", iface.getDisplayName(), iface.getMTU());
currentSocket.joinGroup(new InetSocketAddress(groupStr, port), iface);
log.debug("Multicast group {}:{} joined on nif {}", groupStr, port, iface.getDisplayName());
} catch (IOException err)
{
log.info("Could not create multicast socket on iface " + iface.getDisplayName() + " and port " + port, err);
......@@ -187,160 +145,41 @@ public class MulticastUDPReceiver implements IReceiver
}
private int findSocketIndex(final MulticastSocket socket) throws SocketException
{
for (int i = 0; i < sockets.size(); i++)
{
for (String prefix : PREFERRED_PREFIXES)
{
if (socket.getNetworkInterface().getDisplayName().contains(prefix))
{
return i;
}
if (sockets.get(i).getNetworkInterface().getDisplayName().contains(prefix))
{
break;
}
}
}
return sockets.size();
}
private void joinNetworkGroup(final MulticastSocket socket, final String groupStr)
{
// Parse group
try
{
group = InetAddress.getByName(groupStr);
} catch (UnknownHostException err)
{
log.error("Unable to read multicast group address!", err);
}
// Join group
try
{
socket.setReuseAddress(true);
socket.joinGroup(group);
log.debug("Multicast group " + group + "joined");
} catch (IOException err)
{
log.error("Could not resolve address: " + group, err);
}
}
@SuppressWarnings("squid:S2583")
@Override
public DatagramPacket receive(final DatagramPacket store) throws IOException
public void receive(final DatagramPacket store) throws IOException
{
if (!isReady())
if (currentSocket == null)
{
log.error("Receiver is not ready to receive!");
return null;
throw new IOException("Connection is closed");
}
// first try current socket for performance reasons
if (currentSocket != null)
while (currentSocket != null)
{
try
{
currentSocket.receive(store);
return store;
currentSocket.setSoTimeout(SO_TIMEOUT);
return;
} catch (EOFException eof)
{
log.error("EOF error, buffer may be too small!", eof);
log.error("EOF error, buffer may be too small", eof);
} catch (SocketTimeoutException e)
{
log.debug("Timed out on primary socket.", e);
// go on below
for (IReceiverObserver obs : observers)
{
obs.onInterfaceTimedOut();
}
}
}
while (isReady())
{
for (MulticastSocket socket : sockets)
{
try
{
tryReceiving(store, socket);
return store;
} catch (SocketTimeoutException e)
{
if (!socketsTimedOut.contains(socket))
{
log.debug("Socket timed out on iface " + socket.getNetworkInterface().getDisplayName(), e);
socketsTimedOut.add(socket);
}
}
log.debug("No data received for {} ms", SO_TIMEOUT, e);
observers.forEach(IReceiverObserver::onSocketTimedOut);
currentSocket.setSoTimeout(0);
}
}
return store;
}
private void tryReceiving(final DatagramPacket store, final MulticastSocket socket) throws IOException
{
// blocking receive with a timeout of SO_TIMEOUT
socket.receive(store);
// we have received something
socketsTimedOut.remove(socket);
if (socket != currentSocket)
{
sockets.remove(socket);
sockets.add(0, socket);
currentSocket = socket;
log.info("MulticastSocket changed to " + socket.getNetworkInterface().getDisplayName() + " "
+ socket.getLocalPort());
}
throw new IOException("No data received");
}
@Override
public void cleanup()
public void close()
{
// No-working state after this line...
readyToReceive = false;
for (MulticastSocket socket : sockets)
if (currentSocket != null)
{
if (!socket.isClosed())
{
leaveMulticastGroup(socket);
socket.close();
}
currentSocket.close();
}
observers.clear();
}
private void leaveMulticastGroup(final MulticastSocket socket)
{
try
{
if (group != null)
{
socket.leaveGroup(group);
log.debug("Multicast group left");
}
} catch (IOException err)
{
log.error("Error while leaving multicast group '" + group + "'!", err);
}
}
@Override
public boolean isReady()
{
return readyToReceive;
}
}
......@@ -5,16 +5,16 @@
package edu.tigers.sumatra.network;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.net.NetworkInterface;
import java.net.NoRouteToHostException;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
......@@ -22,17 +22,13 @@ import java.util.stream.Collectors;
/**
* This class is an {@link ITransmitter} implementation capable of sending some {@code byte[]}-data via UDP to a
* multicast-group.
* Transmit data to a multicast group on one or more network interfaces.
*/
@Log4j2
public class MulticastUDPTransmitter implements ITransmitter<byte[]>
public class MulticastUDPTransmitter implements AutoCloseable
{
private final int targetPort;
private final InetAddress targetAddr;
private final List<MulticastSocket> sockets = new ArrayList<>();
private boolean lastSendFailed = false;
private final List<TargetSocket> sockets = new ArrayList<>();
private final SocketAddress targetAddr;
/**
......@@ -41,8 +37,7 @@ public class MulticastUDPTransmitter implements ITransmitter<byte[]>
*/
public MulticastUDPTransmitter(final String targetAddr, final int targetPort)
{
this.targetPort = targetPort;
this.targetAddr = addressByName(targetAddr);
this.targetAddr = new InetSocketAddress(targetAddr, targetPort);
}
......@@ -61,11 +56,7 @@ public class MulticastUDPTransmitter implements ITransmitter<byte[]>
public void connectToAllInterfaces()
{
var networkInterfaces = getNetworkInterfaces();
for (var nif : networkInterfaces)
{
connectTo(nif);
}
getNetworkInterfaces().forEach(this::connectTo);
}
......@@ -97,7 +88,7 @@ public class MulticastUDPTransmitter implements ITransmitter<byte[]>
@SuppressWarnings("squid:S2095") // closing resources: can not close resource here
var socket = new MulticastSocket();
socket.setNetworkInterface(nif);
sockets.add(socket);
sockets.add(new TargetSocket(socket));
}
} catch (IOException e)
{
......@@ -106,65 +97,46 @@ public class MulticastUDPTransmitter implements ITransmitter<byte[]>
}
private InetAddress addressByName(final String targetAddr)
public synchronized void send(final byte[] data)
{
try
{
return InetAddress.getByName(targetAddr);
} catch (UnknownHostException err)
{
log.error("The Host could not be found!", err);
}
return null;
}
DatagramPacket tempPacket = new DatagramPacket(data, data.length, targetAddr);
@Override
public synchronized boolean send(final byte[] data)
{
if (sockets.isEmpty())
for (var targetSocket : sockets)
{
if (!lastSendFailed)
{
log.warn("Transmitter is not ready to send!");
lastSendFailed = true;
}
return false;
}
DatagramPacket tempPacket = new DatagramPacket(data, data.length, targetAddr, targetPort);
for (var socket : sockets)