
/*
 *  CONTROL.C
 *
 *	DNET (c)Copyright 1988, Matthew Dillon, All Rights Reserved
 *
 *  -handle various actions:
 *	RTO	- read timeout
 *	WTO	- write timeout (a packet not acked)
 *	RNET	- state machine for raw receive packet
 *	WNET	- starts timeout sequence if Write occured
 *	WUPDATE - update write windows, send packets
 *	RUPDATE - execute received packets in correct sequence
 *	RECCMD	- execute decoded SCMD commands obtained from received
 *			packets.
 */

#include "dnet.h"
#include <stdio.h>

extern void do_cmd();

/*
 *  RTO:    read timeout.   Timeout occured while waiting for the rest of
 *	    a packet.  Reset state and restart read.
 */

do_rto()
{
    RState = 0;
    RcvData = 0;
    if (DDebug)
	fprintf(stderr, "RTO\n");
}

/*
 *  WTO:    Write timeout (unresolved output windows exist).  Resend a CHECK
 *	    command for all unresolved (READY) output windows
 */

void
do_wto()
{
    register short i;
    register PKT *pkt;
    register uword len = 0;

    if (DDebug)
	fprintf(stderr, "WTO\n");
    if (Restart) {
	WCBuf[0] = SYNC;
	WCBuf[1] = PKCMD_RESTART;
	WCBuf[2] = (WCBuf[0]<<1)^WCBuf[1];
	NetWrite(WCBuf, 3, 1);
	WTimeout(WTIME);
	return;
    }
    for (i = 0; i < WPUsed; ++i) {
	pkt = WPak[i];
	if (pkt->state == READY) {  /*  send check  */
	    WCBuf[len+0] = SYNC;
	    WCBuf[len+1] = PKCMD_CHECK | ((WPStart + i) << 5);
	    WCBuf[len+2] = (WCBuf[len+0] << 1) ^ WCBuf[len+1];
	    len += 3;
	}
    }
    if (len) {
	NetWrite(WCBuf, len, 1);
	WTimeout(WTIME);
    }
}

/*
 *  RNET:   Receved data ready from network.  The RNet request will
 *	    automagically be reissued on return.
 */

do_rnet()
{
    register ubyte *ptr = RcvBuf;
    register long len  = RcvData;
    long lrd = len;
    long mark;
    static uword dlen;
    static uword dblen;
    static ubyte dctl;

    if (DDebug)
	fprintf(stderr, "NETREAD %08lx %ld\n", RcvBuf, RcvData);
    while (len) {
	switch(RState) {
	case RS_SYNC:
	    --len;
	    if (*ptr++ == SYNC)
		RState = RS_CTL;
	    break;
	case RS_CTL:
	    --len;
	    dctl = *ptr++;
	    RState = RS_CCHK;
	    break;
	case RS_CCHK:
	    /* warning: (ubyte) cast not implemented properly for some
	     * compilers
	     */
	    if ((((SYNC<<1)^dctl) & 0xFF) == *ptr) {
		++ptr;
		--len;
		if (dctl & PKF_DATA) {
		    RState = RS_LEN1;
		    break;
		}
		RState = RS_SYNC;
		if (DDebug)
		    fprintf(stderr, "DO COMMAND (0DATA) 0x%02lx\n", dctl);
		do_cmd(dctl, NULL, 0);
	    } else {
		if (DDebug)
		    fprintf(stderr, "RS_CCHK FAILED\n");
		if (len) {      /*  Resync at earliest point    */
		    ++len;
		    --ptr;
		}
	    }
	    RState = RS_SYNC;
	    break;
	case RS_LEN1:
	    dlen = *ptr;
	    ++ptr;
	    --len;
	    RState = RS_LEN2;
	    break;
	case RS_LEN2:
	    dlen = (dlen << 8) + *ptr + 2;
	    ++ptr;
	    --len;
	    if (dlen > MAXPKT+2) {
		if (DDebug)
		    fprintf(stderr,"RAW DATA: LENGTH FAILURE (cmd %02x) %ld\n",
			dctl, dlen
		    );
		RState = RS_SYNC;
		break;
	    }
	    RState = RS_DATA;
	    break;
	case RS_DATA:
	    if (DDebug)
		fprintf(stderr, "RS_DATA->");
	    if (dlen <= len) {
		register uword chk = chkbuf(ptr, dlen - 2);
		if ((chk >> 8) == ptr[dlen-2] && (chk & 0xFF) == ptr[dlen-1]) {
		    if (DDebug)
			fprintf(stderr, "DO MPX (%ld DATA) 0x%02lx\n",
			    dlen-2, dctl
			);
		    do_cmd(dctl, ptr, dlen-2);
		    len -= dlen;
		    ptr += dlen;
		} else {
		    if (DDebug)
			fprintf(stderr, "RS_DATA: FAILED\n");
		}
		RState = RS_SYNC;
	    } else {
		if (DDebug)
		    fprintf(stderr, "RS_DATA: INCOMPLETE READ\n");
		goto break2;	/*  incomplete read  */
	    }
	    break;
	}
    }
break2:
    switch(RState) {
    case RS_DATA:
	RExpect = dlen - len;
	if (RExpect < 0) {
	    fprintf(stderr, "panic, RExpect = %ld\n", RExpect);
	    exit(1);
	}
	break;
    default:
	RExpect = 0;
	break;
    }
    do_cmd(-1, NULL, 0);
    mark = len;
    {
	if (len && ptr != RcvBuf)
	    bcopy(ptr, RcvBuf, len);
	RcvData = len;
    }
    if (RState == RS_DATA) {
	;
	/*
	 *RDisable();
	 *RTimeout(RTIME/1000);
	 */
    }
    /*	REMOVED 21 Sept 1988
     * do_wupdate();
     */
    return(mark);
}

/*
 *  WNET:   The last data packet has been sent to the network...  Start a
 *	    timeout sequence (1 second).  If this times out we will send
 *	    a CHECK packet for all unresolved transmission windows.
 */

do_wnet()
{
    if (DidWrite) {
	DidWrite = 0;
	if (DDebug)
	    fprintf(stderr, "WNET-STARTTO\n");
	WTimeout(WTIME);
    } else {
	if (DDebug)
	    fprintf(stderr, "WNET-NOP\n");
    }
}

/*
 *  DO_WUPDATE()
 *
 *  (1) Remove EMPTY windows at head of transmit queue.
 *  (2) Fill up transmit queue with pending requests, if any.
 *
 *  First two bits determine CMD as follows:
 *	    0bbbbbbb		DATA	    0-127 bytes of DATA
 *	    10bbbccc		DATA	    0-7 bytes of DATA, ccc=channel
 *								  command.
 *	    11bbbbbb bbbbbbbb	DATA	    0-1023 bytes of DATA
 */

do_wupdate()
{
    static short XPri;
    int maxpktsize;
    register XIOR *ior;
    register PKT *pkt;
    register long len;

    while (WPUsed && WPak[0]->state == EMPTY) {
	pkt = WPak[0];
	WPak[0] = WPak[1];
	WPak[1] = WPak[2];
	WPak[2] = WPak[3];
	WPak[3] = pkt;
	--WPUsed;
	++WPStart;
    }
    if (Restart)
	return(0);

    while (WPUsed != 4 && (ior = (XIOR *)RemHead(&TxList))) {
	register long offset = 0;

	{
	    short npri;

	    if (ior->io_Command == SCMD_DATA)
		npri = ior->io_Pri << 2;
	    else
		npri = XPri;
	    if (npri >= XPri)  {
		XPri = npri;
	    } else {
		if (XPri - npri > 100)
		    XPri -= 10;
		else if (XPri - npri > 50)
		    XPri -= 5;
		else
		    --XPri;
	    }
	    maxpktsize = MAXPKT - (XPri - npri);
	    if (DDebug)
		fprintf(stderr, "**MAXPKTSIZE = %ld  XPri %ld npri %ld\n",
		    maxpktsize, XPri, npri
		);
	    if (maxpktsize < MINPKT)
		maxpktsize = MINPKT;
	}

	pkt = WPak[WPUsed];
	pkt->state = READY;
	pkt->sync  = SYNC;
	pkt->ctl   = PKCMD_WRITE | PKF_DATA | ((WPStart + WPUsed)<<5);
	pkt->cchk = (pkt->sync << 1) ^ pkt->ctl;
	for (;;) {
	    if (offset > (maxpktsize-8))                /*  not enough room */
		break;
	    if (ior->io_Command == SCMD_DATA && ior->io_Channel != WChan) {
		/*  CSWITCH */
		WChan = ior->io_Channel;
		pkt->data[offset+0] = 0x80|SCMD_SWITCH|(2<<3);
		pkt->data[offset+1] = WChan >> 8;
		pkt->data[offset+2] = WChan;
		offset += 3;
		if (DDebug)
		    fprintf(stderr, "SEND SWITCH %ld\n", WChan);
	    }
	    len = ior->io_Length - ior->io_Actual;
	    if (ior->io_Command == SCMD_DATA) {     /*  DATA    */
		if (DDebug)
		    fprintf(stderr, "SEND DATA %ld bytes\n", len);
		if (offset + len > (maxpktsize-4))
		    len = (maxpktsize-4) - offset;
		if (len < 128) {
		    pkt->data[offset] = len;
		    ++offset;
		} else {
		    pkt->data[offset+0] = (len>>8)|0xC0;
		    pkt->data[offset+1] = len;
		    offset += 2;
		}
	    } else {				    /*	COMMAND */
		pkt->data[offset] = 0x80|ior->io_Command|(len<<3);
		++offset;
	    }
	    bcopy((char *)ior->io_Data+ior->io_Actual,pkt->data+offset,len);
	    offset += len;
	    ior->io_Actual += len;
	    if (ior->io_Actual == ior->io_Length) {
		free(ior->io_Data);
		free(ior);
		ior = (XIOR *)RemHead(&TxList);          /* Next packet      */
		if (ior == NULL)
		    break;
	    }
	}
	pkt->iolength = offset + OVERHEAD;
	pkt->lenh = offset >> 8;
	pkt->lenl = offset;
	{
	    register uword chksum = chkbuf(pkt->data, offset);
	    pkt->data[offset+0] = chksum >> 8;
	    pkt->data[offset+1] = chksum;
	}
	NetWrite(&pkt->sync, pkt->iolength, 1);
	if (ior) {
	    ++ior->io_Pri;
	    Enqueue(&TxList, ior);
	    --ior->io_Pri;
	}
	++WPUsed;
    }
}

void
dumpcheck(ptr)
register ubyte *ptr;
{
    register short i;
    for (i = 0;i < 8; ++i) {
	if (ptr[i])
	    replywindow(i);
	ptr[i] = 0;
    }
}

void
do_cmd(ctl, buf, bytes)
short ctl;	/* actually a ubyte */
ubyte *buf;
{
    ubyte window = ctl >> 5;
    ubyte rwindow;
    static ubyte Chk, Chkwin[8];	/* remember window checks */

    if (ctl == -1)  {                           /* end of sequence */
	dumpcheck(Chkwin);
	Chk = 0;
	return;
    }
    if ((ctl & PKF_MASK) == PKCMD_CHECK) {      /* CHECK packet    */
	Chkwin[window] = 1;
	Chk = 1;
	return;
    }
    if (Chk) {                                  /* NON-CHECK packet*/
	dumpcheck(Chkwin);
	Chk = 0;
    }
    switch(ctl & PKF_MASK) {
    case PKCMD_WRITE:
	rwindow = (window - RPStart) & 7;
	if (rwindow < 4) {
	    bcopy(buf, RPak[rwindow]->data, bytes);
	    RPak[rwindow]->iolength = bytes;
	    RPak[rwindow]->state = READY;
	    if (rwindow == 0)
		do_rupdate();
	}
	replywindow(window);
	break;
    case PKCMD_ACK:
	rwindow = (window - WPStart) & 7;
	if (rwindow < WPUsed)       /*  mark as sent    */
	    WPak[rwindow]->state = EMPTY;
	break;
    case PKCMD_NAK:		    /*	resend		*/
	rwindow = (window - WPStart) & 7;
	if (rwindow < WPUsed) {     /*  resend          */
	    NetWrite(&WPak[rwindow]->sync, WPak[rwindow]->iolength, 0);
	} else {
	    fprintf(stderr, "Soft Error: Illegal NAK\n");
	}
	break;
    case PKCMD_ACKRSTART:
    case PKCMD_RESTART:
	{
	    uword chan;
	    uword chksum;
	    int len;
	    int fd;

	    if ((ctl & PKF_MASK) == PKCMD_ACKRSTART)
		Restart = 0;
	    do_netreset();
	    if ((ctl & PKF_MASK) == PKCMD_RESTART) {
		gethostname(WCBuf+5, sizeof(WCBuf)-5-3);
		len = strlen(WCBuf+5)+1;
		WCBuf[0] = SYNC;
		WCBuf[1] = PKCMD_ACKRSTART | PKF_DATA;
		WCBuf[2] = (SYNC << 1) ^ WCBuf[1];
		WCBuf[3] = 0;
		WCBuf[4] = len;
		chksum = chkbuf(WCBuf+5, len);
		WCBuf[5+len] = chksum >> 8;
		WCBuf[6+len] = chksum;
		NetWrite(WCBuf, 7+len, 1);
	    }
	    if (bytes)
		setlistenport(buf);
	    else
		setlistenport("");
	    do_wupdate();
	    WTimeout(WTIME);
	}
	break;
    }
    do_rupdate();
}

do_rupdate()
{
    while (RPak[0]->state == READY) {
	register PKT *pkt = RPak[0];
	register ubyte *ptr = pkt->data;
	register uword len;
	uword iolen = pkt->iolength;
	ubyte cmd;

	if (DDebug)
	    fprintf(stderr, "Interpret Received Commands\n");
	while (iolen) {
	    cmd = SCMD_DATA;
	    len = ptr[0];
	    ++ptr;
	    --iolen;
	    if (len >= 128) {
		if (len < 0xC0) {
		    cmd = len & 7;
		    len = (len >> 3) & 7;
		} else {
		    len = ((len << 8) | *ptr) & 0x3FFF;
		    ++ptr;
		    --iolen;
		}
	    }
	    iolen -= len;
	    if (DDebug)
		fprintf(stderr, " MPXCMD %ld (%ld bytes)\n", cmd, len);
	    do_reccmd(cmd, ptr, len);
	    ptr += len;
	}
	RPak[0] = RPak[1];
	RPak[1] = RPak[2];
	RPak[2] = RPak[3];
	RPak[3] = pkt;
	pkt->state = EMPTY;
	++RPStart;
    }
}

do_reccmd(cmd, ptr, len)
ubyte *ptr;
{
    switch(cmd) {
    case SCMD_DATA:
	if (RChan < MAXCHAN && (Chan[RChan].flags & CHANF_ROK))
	    gwrite(Chan[RChan].fd, ptr, len);
	break;
    case SCMD_SWITCH:
	RChan = (ptr[0]<<8)|ptr[1];
	break;
    case SCMD_OPEN:
	{
	    register COPEN *cop = (COPEN *)ptr;
	    CACKCMD ack;
	    uword chan = (cop->chanh << 8) | cop->chanl;

	    ack.chanh = cop->chanh;
	    ack.chanl = cop->chanl;
	    ack.error = 0;

	    if (chan >= MAXCHAN || Chan[chan].state) {
		ack.error = 33;     /* magic */
		WriteStream(SCMD_ACKCMD, &ack, sizeof(CACKCMD), chan);
		break;
	    }
	    {
		int error;
		int s;
		uword port = (cop->porth<<8)|cop->portl;

		if (isinternalport(port)) {
		    error = iconnect(&s, port);
		} else {
		    struct sockaddr sa;
		    s = socket(PF_UNIX, SOCK_STREAM, 0);
		    if (DDebug)
			fprintf(stderr, " REC OPEN, CONNECTING ch%d po%d\n",
			    chan, port
			);
		    sa.sa_family = AF_INET;
		    sprintf(sa.sa_data,".PORT.%ld", port);
		    error = connect(s, &sa, sizeof(sa));
		    if (error < 0) {
			startserver(port);
			error = connect(s, &sa, sizeof(sa));
		    }
		    if (DDebug)
			fprintf(stderr, " CONNECTED err=%ld\n", error);
		}
		if (error < 0) {
		    ack.error = 2;
		} else {
		    extern void do_open();
		    fcntl(s, F_SETFL, FNDELAY);
		    Chan[chan].state = CHAN_OPEN;
		    Chan[chan].flags = CHANF_ROK|CHANF_WOK;
		    Chan[chan].fd = s;
		    Chan[chan].pri= cop->pri;
		    FD_SET(s, &Fdread);
		    FD_SET(s, &Fdexcept);
		    FdChan[s] = chan;
		    Fdstate[s] = do_open;
		}
		WriteStream(SCMD_ACKCMD, &ack, sizeof(CACKCMD), -1);
	    }
	}
	break;
    case SCMD_CLOSE:	/*  receive close   */
	{
	    extern void nop();
	    register CCLOSE *clo = (CCLOSE *)ptr;
	    uword chan = (clo->chanh<<8)|clo->chanl;
	    int fd = Chan[chan].fd;

	    if (DDebug)
		fprintf(stderr, " SCMD_CLOSE\n");
	    if (chan >= MAXCHAN || Chan[chan].state == CHAN_FREE)
		break;
	    /*
	    Chan[chan].state = CHAN_CLOSE;
	    Chan[chan].flags |= CHANF_RCLOSE;
	    */
	    Chan[chan].flags &= ~(CHANF_ROK|CHANF_WOK);
	    FD_CLR(fd, &Fdread);
	    FD_CLR(fd, &Fdexcept);
	    Chan[chan].state = CHAN_FREE;
	    Chan[chan].fd = -1;
	    Fdstate[fd] = nop;
	    close(fd);
	    ClearChan(&TxList, chan, 0);

	    if (Chan[chan].flags & CHANF_LCLOSE) {
		if (DDebug)
		    fprintf(stderr," REMOTE CLOSE %ld, LOCAL ALREADY CLOSED\n",
			fd
		    );
	    } else {
		CCLOSE cc;
		char dummy;
		cc.chanh = chan >> 8;
		cc.chanl = chan;
		WriteStream(SCMD_CLOSE, &cc, sizeof(CCLOSE), chan);
		/*
		shutdown(Chan[chan].fd, 2);
		write(Chan[chan].fd, &dummy, 0);
		*/
		if (DDebug)
		    fprintf(stderr," REMOTE CLOSE %ld, LOCAL NOT YET CLOSED\n",
			fd
		    );
	    }
	}
	break;
    case SCMD_ACKCMD:	/*  acknowledge of my open	*/
	{
	    register CACKCMD *cack = (CACKCMD *)ptr;
	    uword chan = (cack->chanh<<8)|cack->chanl;
	    if (chan >= MAXCHAN || Chan[chan].state != CHAN_LOPEN)
		break;
	    if (DDebug)
		fprintf(stderr, "ackerr = %ld\n", cack->error);
	    if (cack->error == 33) {
		uword newchan = alloc_channel();
		COPEN co;
		if (newchan < MAXCHAN) {
		    Chan[newchan] = Chan[chan];
		    Chan[chan].state = CHAN_FREE;
		    Chan[chan].fd = -1;
		    co.chanh = newchan >> 8;
		    co.chanl = newchan;
		    co.porth = Chan[chan].port >> 8;
		    co.portl = Chan[chan].port;
		    co.error = 0;
		    co.pri   = Chan[chan].pri;
		    WriteStream(SCMD_OPEN, &co, sizeof(COPEN), newchan);
		    break;
		}
	    }
	    if (cack->error) {
		extern void nop();
		ubyte error = cack->error;
		int fd = Chan[chan].fd;

		gwrite(fd, &error, 1);
		Fdstate[fd] = nop;
		Chan[chan].fd = -1;
		Chan[chan].state = CHAN_FREE;
		FD_CLR(fd, &Fdread);
		FD_CLR(fd, &Fdexcept);
		close(fd);
	    } else {
		ubyte error = 0;
		extern void do_open();
		gwrite(Chan[chan].fd, &error, 1);
		Chan[chan].state = CHAN_OPEN;
		Chan[chan].flags = CHANF_ROK|CHANF_WOK;
		Fdstate[Chan[chan].fd] = do_open;
	    }
	}
	break;
    case SCMD_EOFCMD:	/*  EOF on channel		*/
	{
	    register CEOFCMD *eof = (CEOFCMD *)ptr;
	    uword chan = (eof->chanh<<8)|eof->chanl;

	    if (chan < MAXCHAN && Chan[chan].state == CHAN_OPEN) {
		Chan[chan].flags &= ~eof->flags;
		if (eof->flags & CHANF_ROK) {
		    char dummy;
		    shutdown(Chan[chan].fd, 1);
		    write(Chan[chan].fd, &dummy, 0);
		}
	    }
	}
	break;
    case SCMD_QUIT:
	dneterror("QUIT");
	break;
    case SCMD_IOCTL:
	{
	    register CIOCTL *cio = (CIOCTL *)ptr;
	    uword chan = (cio->chanh<<8)|cio->chanl;

	    if (chan < MAXCHAN && Chan[chan].state == CHAN_OPEN) {
		switch(cio->cmd) {
		case CIO_SETROWS:
		    isetrows(Chan[chan].fd, (cio->valh<<8)|cio->vall);
		    break;
		case CIO_SETCOLS:
		    isetcols(Chan[chan].fd, (cio->valh<<8)|cio->vall);
		    break;
		case CIO_STOP:
		    break;
		case CIO_START:
		    break;
		case CIO_FLUSH:
		    ClearChan(&TxList, chan, 0);
		    break;
		}
	    }
	}
	break;
    default:
	break;
    }
}

replywindow(window)
{
    ubyte rwindow = (window - RPStart) & 7;
    WCBuf[0] = SYNC;

    if (DDebug) {
	if (rwindow >= 4 || RPak[rwindow]->state == READY)
	    fprintf(stderr, " ACK WINDOW %ld\n", window);
	else
	    fprintf(stderr, " NAK WINDOW %ld\n", window);
    }
    if (rwindow >= 4 || RPak[rwindow]->state == READY)  /* data ready */
	WCBuf[1] = PKCMD_ACK | (window << 5);           /* ack it     */
    else
	WCBuf[1] = PKCMD_NAK | (window << 5);           /* nack it    */
    WCBuf[2] = (SYNC << 1) ^ WCBuf[1];
    NetWrite(WCBuf, 3, 0);
}

