network: Add NetDeviceQueue methods to implement flow control

This commit is contained in:
Stefano Avallone
2017-03-08 18:01:43 +01:00
parent 6fef34be47
commit 40abc6507a
5 changed files with 237 additions and 113 deletions

View File

@@ -72,30 +72,25 @@ class Channel;
* API has been optimized to make it easy to add new MAC protocols,
* not to add new layer 3 protocols.
*
* Devices aiming to be Traffic Control aware must implement a NotifyNewAggregate
* method to perform the following operations:
* - cache the pointer to the netdevice queue interface aggregated to the device
* - set the number of device transmission queues through the netdevice queue
* interface, if the device is multi-queue
* - set the select queue callback through the netdevice queue interface, if
* the device is multi-queue
* In order to support flow control, a Traffic Control aware device must:
* - stop a device queue when there is no room for another packet. This check
* is typically performed after successfully enqueuing a packet in the device
* queue. Failing to enqueue a packet because there is no room for the packet
* in the queue should be avoided. Should such a situation occur, the device
* queue should be immediately stopped
* - wake up the queue disc when the device queue is empty. This check is
* typically performed after a dequeue operation fails because the device
* queue is empty.
* - start a device queue when the queue is stopped and there is room for
* another packet. This check is typically performed after successfully
* dequeuing a packet from the device queue
* In order to support BQL, a Traffic Control aware device must:
* - call NotifyQueuedBytes after successfully enqueuing a packet in the
* device queue
* - call NotifyTransmittedBytes after successfully dequeuing a packet from
* the device queue
* Devices aiming to support flow control and dynamic queue limits must perform
* the following operations:
* - in the NotifyNewAggregate method
* + cache the pointer to the netdevice queue interface aggregated to the
* device
* + set the select queue callback through the netdevice queue interface,
* if the device is multi-queue
* - anytime before initialization
* + set the number of device transmission queues (and optionally create them)
* through the netdevice queue interface, if the device is multi-queue
* - when the device queues have been created, invoke
* NetDeviceQueueInterface::ConnectQueueTraces, which
* + connects the Enqueue traced callback of the device queues to the
* PacketEnqueued static method of the NetDeviceQueue class
* + connects the Dequeue and DropAfterDequeue traced callback of the device
* queues to the PacketDequeued static method of the NetDeviceQueue
* class
* + connects the DropBeforeEnqueue traced callback of the device queues to
* the PacketDiscarded static method of the NetDeviceQueue class
*/
class NetDevice : public Object
{

View File

@@ -145,6 +145,12 @@ NetDeviceQueue::GetQueueLimits ()
return m_queueLimits;
}
void
NetDeviceQueue::DoNsLog (const enum LogLevel level, std::string str)
{
NS_LOG (level, str);
}
NS_OBJECT_ENSURE_REGISTERED (NetDeviceQueueInterface);

View File

@@ -25,10 +25,20 @@
#include "ns3/object.h"
#include "ns3/ptr.h"
#include "ns3/queue-item.h"
#include "ns3/queue.h"
#include "ns3/net-device.h"
namespace ns3 {
class QueueLimits;
class NetDeviceQueueInterface;
// This header file is included by all the queue discs and all the netdevices
// using a Queue object. The following explicit template instantiation
// declarations enables them to suppress implicit template instantiations
extern template class Queue<Packet>;
extern template class Queue<QueueDiscItem>;
/**
* \ingroup network
@@ -125,7 +135,71 @@ public:
*/
Ptr<QueueLimits> GetQueueLimits ();
/**
* \brief Perform the actions required by flow control and dynamic queue
* limits when a packet is enqueued in the queue of a netdevice
*
* \param queue the device queue
* \param ndqi the NetDeviceQueueInterface object aggregated to the device
* \param txq the index of the transmission queue associated with the device queue
* \param item the enqueued packet
*
* This method must be connected to the "Enqueue" traced callback of a Queue
* object (through a bound callback) in order for a netdevice to support
* flow control and dynamic queue limits.
*/
template <typename Item>
static void PacketEnqueued (Ptr<Queue<Item> > queue,
Ptr<NetDeviceQueueInterface> ndqi,
uint8_t txq, Ptr<const Item> item);
/**
* \brief Perform the actions required by flow control and dynamic queue
* limits when a packet is dequeued (or dropped after dequeue) from
* the queue of a netdevice
*
* \param queue the device queue
* \param ndqi the NetDeviceQueueInterface object aggregated to the device
* \param txq the index of the transmission queue associated with the device queue
* \param item the dequeued (or dropped after dequeue) packet
*
* This method must be connected to the "Dequeue" and "DropAfterDequeue"
* traced callbacks of a Queue object (through a bound callback) in order for
* a netdevice to support flow control and dynamic queue limits.
*/
template <typename Item>
static void PacketDequeued (Ptr<Queue<Item> > queue,
Ptr<NetDeviceQueueInterface> ndqi,
uint8_t txq, Ptr<const Item> item);
/**
* \brief Perform the actions required by flow control and dynamic queue
* limits when a packet is dropped before being enqueued in the queue
* of a netdevice (which likely indicates that the queue is full)
*
* \param queue the device queue
* \param ndqi the NetDeviceQueueInterface object aggregated to the device
* \param txq the index of the transmission queue associated with the device queue
* \param item the dropped packet
*
* This method must be connected to the "DropBeforeEnqueue" traced callback
* of a Queue object (through a bound callback) in order for a netdevice to
* support flow control and dynamic queue limits.
*/
template <typename Item>
static void PacketDiscarded (Ptr<Queue<Item> > queue,
Ptr<NetDeviceQueueInterface> ndqi,
uint8_t txq, Ptr<const Item> item);
private:
/**
* \brief Pass messages to the ns-3 logging system
*
* \param level the log level
* \param str the message to log
*/
static void DoNsLog (const enum LogLevel level, std::string str);
bool m_stoppedByDevice; //!< True if the queue has been stopped by the device
bool m_stoppedByQueueLimits; //!< True if the queue has been stopped by a queue limits object
Ptr<QueueLimits> m_queueLimits; //!< Queue limits object
@@ -239,6 +313,15 @@ public:
*/
SelectQueueCallback GetSelectQueueCallback (void) const;
/**
* \brief Connect the traced callbacks of a queue to the static methods of the
* NetDeviceQueue class to support flow control and dynamic queue limits
* \param queue the queue
* \param txq the index of the tx queue
*/
template <typename Item>
void ConnectQueueTraces (Ptr<Queue<Item> > queue, uint8_t txq);
protected:
/**
* \brief Dispose of the object
@@ -252,6 +335,115 @@ private:
bool m_lateTxQueuesCreation; //!< True if a device wants to create the TX queues by itself
};
#define NDQI_LOG(level,params) \
{ \
std::stringstream ss; \
ss << params; \
DoNsLog (level, ss.str ()); \
}
/**
* Implementation of the templates declared above.
*/
template <typename Item>
void
NetDeviceQueueInterface::ConnectQueueTraces (Ptr<Queue<Item> > queue, uint8_t txq)
{
NS_ASSERT (queue != 0);
NS_ASSERT (txq < GetNTxQueues ());
queue->TraceConnectWithoutContext ("Enqueue",
MakeBoundCallback (&NetDeviceQueue::PacketEnqueued<Item>,
queue, this, txq));
queue->TraceConnectWithoutContext ("Dequeue",
MakeBoundCallback (&NetDeviceQueue::PacketDequeued<Item>,
queue, this, txq));
queue->TraceConnectWithoutContext ("DropAfterDequeue",
MakeBoundCallback (&NetDeviceQueue::PacketDequeued<Item>,
queue, this, txq));
queue->TraceConnectWithoutContext ("DropBeforeEnqueue",
MakeBoundCallback (&NetDeviceQueue::PacketDiscarded<Item>,
queue, this, txq));
}
template <typename Item>
void
NetDeviceQueue::PacketEnqueued (Ptr<Queue<Item> > queue,
Ptr<NetDeviceQueueInterface> ndqi,
uint8_t txq, Ptr<const Item> item)
{
NDQI_LOG (LOG_LOGIC, "NetDeviceQueue:PacketEnqueued(" << queue << ", " << ndqi
<< ", " << txq << ", " << item << ")");
// Inform BQL
ndqi->GetTxQueue (txq)->NotifyQueuedBytes (item->GetSize ());
uint16_t mtu = ndqi->GetObject<NetDevice> ()->GetMtu ();
// After enqueuing a packet, we need to check whether the queue is able to
// store another packet. If not, we stop the queue
if ((queue->GetMode () == QueueBase::QUEUE_MODE_PACKETS &&
queue->GetNPackets () >= queue->GetMaxPackets ()) ||
(queue->GetMode () == QueueBase::QUEUE_MODE_BYTES &&
queue->GetNBytes () + mtu > queue->GetMaxBytes ()))
{
NDQI_LOG (LOG_DEBUG, "The device queue is being stopped (" << queue->GetNPackets ()
<< " packets and " << queue->GetNBytes () << " bytes inside)");
ndqi->GetTxQueue (txq)->Stop ();
}
}
template <typename Item>
void
NetDeviceQueue::PacketDequeued (Ptr<Queue<Item> > queue,
Ptr<NetDeviceQueueInterface> ndqi,
uint8_t txq, Ptr<const Item> item)
{
NDQI_LOG (LOG_LOGIC, "NetDeviceQueue:PacketDequeued(" << queue << ", " << ndqi
<< ", " << txq << ", " << item << ")");
// Inform BQL
ndqi->GetTxQueue (txq)->NotifyTransmittedBytes (item->GetSize ());
uint16_t mtu = ndqi->GetObject<NetDevice> ()->GetMtu ();
// After dequeuing a packet, if there is room for another packet we
// call Wake () that ensures that the queue is not stopped and restarts
// the queue disc if the queue was stopped
if ((queue->GetMode () == QueueBase::QUEUE_MODE_PACKETS &&
queue->GetNPackets () < queue->GetMaxPackets ()) ||
(queue->GetMode () == QueueBase::QUEUE_MODE_BYTES &&
queue->GetNBytes () + mtu <= queue->GetMaxBytes ()))
{
ndqi->GetTxQueue (txq)->Wake ();
}
}
template <typename Item>
void
NetDeviceQueue::PacketDiscarded (Ptr<Queue<Item> > queue,
Ptr<NetDeviceQueueInterface> ndqi,
uint8_t txq, Ptr<const Item> item)
{
NDQI_LOG (LOG_LOGIC, "NetDeviceQueue:PacketDiscarded(" << queue << ", " << ndqi
<< ", " << txq << ", " << item << ")");
// This method is called when a packet is discarded before being enqueued in the
// device queue, likely because the queue is full. This should not happen if the
// device correctly stops the queue. Anyway, stop the tx queue, so that the upper
// layers do not send packets until there is room in the queue again.
NDQI_LOG (LOG_ERROR, "BUG! No room in the device queue for the received packet! ("
<< queue->GetNPackets () << " packets and " << queue->GetNBytes () << " bytes inside)");
ndqi->GetTxQueue (txq)->Stop ();
}
} // namespace ns3
#endif /* NET_DEVICE_QUEUE_INTERFACE_H */

View File

@@ -207,6 +207,23 @@ PointToPointNetDevice::ProcessHeader (Ptr<Packet> p, uint16_t& param)
return true;
}
void
PointToPointNetDevice::DoInitialize (void)
{
if (m_queueInterface)
{
NS_ASSERT_MSG (m_queue != 0, "A Queue object has not been attached to the device");
// connect the traced callbacks of m_queue to the static methods provided by
// the NetDeviceQueue class to support flow control and dynamic queue limits.
// This could not be done in NotifyNewAggregate because at that time we are
// not guaranteed that a queue has been attached to the netdevice
m_queueInterface->ConnectQueueTraces (m_queue, 0);
}
NetDevice::DoInitialize ();
}
void
PointToPointNetDevice::NotifyNewAggregate (void)
{
@@ -300,52 +317,19 @@ PointToPointNetDevice::TransmitComplete (void)
m_phyTxEndTrace (m_currentPkt);
m_currentPkt = 0;
Ptr<NetDeviceQueue> txq;
if (m_queueInterface)
{
txq = m_queueInterface->GetTxQueue (0);
}
Ptr<Packet> p = m_queue->Dequeue ();
if (p == 0)
{
NS_LOG_LOGIC ("No pending packets in device queue after tx complete");
if (txq)
{
NS_LOG_DEBUG ("The device queue is being woken up (" << m_queue->GetNPackets () <<
" packets and " << m_queue->GetNBytes () << " bytes inside)");
txq->Wake ();
}
return;
}
//
// Got another packet off of the queue, so start the transmit process again.
// If the queue was stopped, start it again if there is room for another packet.
// Note that we cannot wake the upper layers because otherwise a packet is sent
// to the device while the machine state is busy, thus causing the assert in
// TransmitStart to fail.
//
if (txq && txq->IsStopped ())
{
if ((m_queue->GetMode () == QueueBase::QUEUE_MODE_PACKETS &&
m_queue->GetNPackets () < m_queue->GetMaxPackets ()) ||
(m_queue->GetMode () == QueueBase::QUEUE_MODE_BYTES &&
m_queue->GetNBytes () + m_mtu <= m_queue->GetMaxBytes ()))
{
NS_LOG_DEBUG ("The device queue is being started (" << m_queue->GetNPackets () <<
" packets and " << m_queue->GetNBytes () << " bytes inside)");
txq->Start ();
}
}
m_snifferTrace (p);
m_promiscSnifferTrace (p);
TransmitStart (p);
if (txq)
{
// Inform BQL
txq->NotifyTransmittedBytes (m_currentPkt->GetSize ());
}
}
bool
@@ -561,14 +545,6 @@ PointToPointNetDevice::Send (
const Address &dest,
uint16_t protocolNumber)
{
Ptr<NetDeviceQueue> txq;
if (m_queueInterface)
{
txq = m_queueInterface->GetTxQueue (0);
}
NS_ASSERT_MSG (!txq || !txq->IsStopped (), "Send should not be called when the device is stopped");
NS_LOG_FUNCTION (this << packet << dest << protocolNumber);
NS_LOG_LOGIC ("p=" << packet << ", dest=" << &dest);
NS_LOG_LOGIC ("UID is " << packet->GetUid ());
@@ -596,69 +572,23 @@ PointToPointNetDevice::Send (
//
if (m_queue->Enqueue (packet))
{
// Inform BQL
if (txq)
{
txq->NotifyQueuedBytes (packet->GetSize ());
}
//
// If the channel is ready for transition we send the packet right now
//
if (m_txMachineState == READY)
{
packet = m_queue->Dequeue ();
// We have enqueued a packet and dequeued a (possibly different) packet. We
// need to check if there is still room for another packet only if the queue
// is in byte mode (the enqueued packet might be larger than the dequeued
// packet, thus leaving no room for another packet)
if (txq)
{
if (m_queue->GetMode () == QueueBase::QUEUE_MODE_BYTES &&
m_queue->GetNBytes () + m_mtu > m_queue->GetMaxBytes ())
{
NS_LOG_DEBUG ("The device queue is being stopped (" << m_queue->GetNPackets () <<
" packets and " << m_queue->GetNBytes () << " bytes inside)");
txq->Stop ();
}
}
m_snifferTrace (packet);
m_promiscSnifferTrace (packet);
bool ret = TransmitStart (packet);
if (txq)
{
// Inform BQL
txq->NotifyTransmittedBytes (m_currentPkt->GetSize ());
}
return ret;
}
// We have enqueued a packet but we have not dequeued any packet. Thus, we
// need to check whether the queue is able to store another packet. If not,
// we stop the queue
if (txq)
{
if ((m_queue->GetMode () == QueueBase::QUEUE_MODE_PACKETS &&
m_queue->GetNPackets () >= m_queue->GetMaxPackets ()) ||
(m_queue->GetMode () == QueueBase::QUEUE_MODE_BYTES &&
m_queue->GetNBytes () + m_mtu > m_queue->GetMaxBytes ()))
{
NS_LOG_DEBUG ("The device queue is being stopped (" << m_queue->GetNPackets () <<
" packets and " << m_queue->GetNBytes () << " bytes inside)");
txq->Stop ();
}
}
return true;
}
// Enqueue may fail (overflow). This should not happen if the traffic control
// module has been installed. Anyway, stop the tx queue, so that the upper layers
// do not send packets until there is room in the queue again.
// Enqueue may fail (overflow)
m_macTxDropTrace (packet);
if (txq)
{
NS_LOG_ERROR ("BUG! Device queue full when the queue is not stopped! (" << m_queue->GetNPackets () <<
" packets and " << m_queue->GetNBytes () << " bytes inside)");
txq->Stop ();
}
return false;
}

View File

@@ -199,6 +199,7 @@ protected:
*/
void DoMpiReceive (Ptr<Packet> p);
virtual void DoInitialize (void);
virtual void NotifyNewAggregate (void);
private: