1
0
mirror of https://github.com/livingcomputermuseum/IFS.git synced 2026-01-13 15:27:25 +00:00

357 lines
11 KiB
C#

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace IFS
{
public enum BSPState
{
Unconnected,
Connected
}
public class BSPChannel
{
public BSPChannel(PUP rfcPup, UInt32 socketID)
{
_inputLock = new ReaderWriterLockSlim();
_outputLock = new ReaderWriterLockSlim();
_inputWriteEvent = new AutoResetEvent(false);
_inputQueue = new Queue<byte>(65536);
// TODO: init IDs, etc. based on RFC PUP
_start_pos = _recv_pos = _send_pos = rfcPup.ID;
// Set up socket addresses.
// The client sends the connection port it prefers to use
// in the RFC pup.
_clientConnectionPort = new PUPPort(rfcPup.Contents, 0);
// We create our connection port using a unique socket address.
_serverConnectionPort = new PUPPort(DirectoryServices.Instance.LocalHostAddress, socketID);
}
public PUPPort ClientPort
{
get { return _clientConnectionPort; }
}
public PUPPort ServerPort
{
get { return _serverConnectionPort; }
}
/// <summary>
/// Reads data from the channel (i.e. from the client). Will block if not all the requested data is available.
/// </summary>
/// <returns></returns>
public int Read(ref byte[] data, int count)
{
// sanity check
if (count > data.Length)
{
throw new InvalidOperationException("count must be less than or equal to the length of the buffer being read into.");
}
int read = 0;
// Loop until the data we asked for arrives or until we time out waiting.
// TODO: handle partial transfers due to aborted BSPs.
while (true)
{
_inputLock.EnterUpgradeableReadLock();
if (_inputQueue.Count >= count)
{
_inputLock.EnterWriteLock();
// We have the data right now, read it and return.
// TODO: this is a really inefficient thing.
for (int i = 0; i < count; i++)
{
data[i] = _inputQueue.Dequeue();
}
_inputLock.ExitWriteLock();
_inputLock.ExitUpgradeableReadLock();
break;
}
else
{
_inputLock.ExitUpgradeableReadLock();
// Not enough data in the queue.
// Wait until we have received more data, then try again.
_inputWriteEvent.WaitOne(); // TODO: timeout and fail
}
}
return read;
}
/// <summary>
/// Appends data into the input queue (called from BSPManager to place new PUP data into the BSP stream)
/// </summary>
public void WriteQueue(PUP dataPUP)
{
// If we are over our high watermark, we will drop the data (and not send an ACK even if requested).
// Clients should be honoring the limits we set in the RFC packets.
_inputLock.EnterUpgradeableReadLock();
if (_inputQueue.Count + dataPUP.Contents.Length > MaxBytes)
{
_inputLock.ExitUpgradeableReadLock();
return;
}
// Sanity check on expected position from sender vs. received data on our end.
// If they don't match then we've lost a packet somewhere.
if (dataPUP.ID != _recv_pos)
{
// Current behavior is to simply drop all incoming PUPs (and not ACK them) until they are re-sent to us
// (in which case the above sanity check will pass). According to spec, AData requests that are not ACKed
// must eventually be resent. This is far simpler than accepting out-of-order data and keeping track
// of where it goes in the queue.
return;
}
// Prepare to add data to the queue
// Again, this is really inefficient
_inputLock.EnterWriteLock();
for (int i = 0; i < dataPUP.Contents.Length; i++)
{
_inputQueue.Enqueue(dataPUP.Contents[i]);
}
_recv_pos += (UInt32)dataPUP.Contents.Length;
_inputLock.ExitWriteLock();
_inputLock.ExitUpgradeableReadLock();
_inputWriteEvent.Set();
if ((PupType)dataPUP.Type == PupType.AData)
{
SendAck();
}
}
/// <summary>
/// Sends data to the channel (i.e. to the client). Will block if an ACK is requested.
/// </summary>
/// <param name="data">The data to be sent</param>
public void Send(byte[] data)
{
// Write data to the output stream
// Await an ack for the PUP we just sent
_outputAckEvent.WaitOne(); // TODO: timeout and fail
}
/// <summary>
/// Invoked when the client sends an ACK
/// </summary>
/// <param name="ackPUP"></param>
public void Ack(PUP ackPUP)
{
// Update receiving end stats (max PUPs, etc.)
// Ensure client's position matches ours
// Let any waiting threads continue
_outputAckEvent.Set();
}
/*
public void Mark(byte type);
public void Interrupt();
public void Abort(int code, string message);
public void Error(int code, string message);
public void End();
*/
// TODO:
// Events for:
// Abort, End, Mark, Interrupt (from client)
// Repositioning (due to lost packets) (perhaps not necessary)
//
private void SendAck()
{
}
private BSPState _state;
private UInt32 _recv_pos;
private UInt32 _send_pos;
private UInt32 _start_pos;
private PUPPort _clientConnectionPort; // the client port
private PUPPort _serverConnectionPort; // the server port we (the server) have established for communication
private ReaderWriterLockSlim _inputLock;
private System.Threading.AutoResetEvent _inputWriteEvent;
private ReaderWriterLockSlim _outputLock;
private System.Threading.AutoResetEvent _outputAckEvent;
// TODO: replace this with a more efficient structure for buffering data
private Queue<byte> _inputQueue;
// Constants
// For now, we work on one PUP at a time.
private const int MaxPups = 1;
private const int MaxPupSize = 532;
private const int MaxBytes = 1 * 532;
}
/// <summary>
///
/// </summary>
public static class BSPManager
{
static BSPManager()
{
//
// Initialize the socket ID counter; we start with a
// number beyond the range of well-defined sockets.
// For each new BSP channel that gets opened, we will
// increment this counter to ensure that each channel gets
// a unique ID. (Well, until we wrap around...)
//
_nextSocketID = _startingSocketID;
}
/// <summary>
/// Called when BSP-based protocols receive data.
/// </summary>
/// <returns>
/// null if no new channel is created due to the sent PUP (not an RFC)
/// a new BSPChannel if one has been created based on the PUP (new RFC)
/// </returns>
/// <param name="p"></param>
public static BSPChannel RecvData(PUP p)
{
PupType type = (PupType)p.Type;
switch (type)
{
case PupType.RFC:
{
BSPChannel newChannel = new BSPChannel(p, GetNextSocketID());
_activeChannels.Add(newChannel.ServerPort.Socket, newChannel);
// Send RFC response to complete the rendezvous.
PUP rfcResponse = new PUP(PupType.RFC, p.ID, newChannel.ClientPort, newChannel.ServerPort, newChannel.ServerPort.ToArray());
Dispatcher.Instance.SendPup(rfcResponse);
return newChannel;
}
break;
case PupType.Data:
case PupType.AData:
{
BSPChannel channel = FindChannelForPup(p);
if (channel != null)
{
channel.WriteQueue(p);
}
}
break;
case PupType.Ack:
{
BSPChannel channel = FindChannelForPup(p);
if (channel != null)
{
channel.Ack(p);
}
}
break;
case PupType.End:
{
BSPChannel channel = FindChannelForPup(p);
if (channel != null)
{
//channel.EndReply();
}
}
break;
case PupType.Abort:
{
// TODO: tear down the channel
}
break;
default:
throw new NotImplementedException(String.Format("Unhandled BSP PUP type {0}.", type));
}
return null;
}
public static void DestroyChannel(BSPChannel channel)
{
}
private static BSPChannel FindChannelForPup(PUP p)
{
return null;
}
private static UInt32 GetNextSocketID()
{
UInt32 next = _nextSocketID;
_nextSocketID++;
//
// Handle the wrap around case (which we're very unlikely to
// ever hit, but why not do the right thing).
// Start over at the initial ID. This is very unlikely to
// collide with any pending channels.
//
if(_nextSocketID < _startingSocketID)
{
_nextSocketID = _startingSocketID;
}
return next;
}
/// <summary>
/// Map from socket address to BSP channel
/// </summary>
private static Dictionary<UInt32, BSPChannel> _activeChannels;
private static UInt32 _nextSocketID;
private static readonly UInt32 _startingSocketID = 0x1000;
}
}