
#include <stdio.h>
#include <string.h>
#include <stdlib.h>

#include <dos/dos.h>
#include <clib/dos_protos.h>
#include <clib/exec_protos.h>

#include <clib/AMarquee_protos.h>

#include <pragmas/AMarquee_pragmas.h>

/* UNLESS(x) expands to "if (!(x))": the guarded statement runs when x is false. */
#define UNLESS(x) if (!(x))

/* Library base for amarquee.library; opened in main(), closed in CleanExit(). */
struct Library * AMarqueeBase = NULL;
/* Connection created by QNewSession() in main(); freed in CleanExit(). */
struct QSession * session     = NULL;

/* Exit handler (registered with atexit() in main): releases whatever global
 * resources are still held, then prints a farewell line. */
void CleanExit(void)
{
  /* Order matters: the session MUST be freed while the library is still open. */
  if (session != NULL)
  {
    QFreeSession(session);
  }

  if (AMarqueeBase != NULL)
  {
    CloseLibrary(AMarqueeBase);
  }

  printf("All done.\n");
}

/* Main program--simply increments our counter using a stream. */
int main(int argc, char ** argv)
{
  char * connectTo, * progName;
  int port = 2957;
  int count = 0;
  
  atexit(CleanExit);
    
  printf("StreamCheck: This program is designed to be used with StreamGen.\n");
  printf("             It will watch StreamGen's output and note any streaming\n");
  printf("             errors (skipped or duplicated numbers).\n");
  printf("             Note that only one StreamGen should be running!\n");
  
  if (argc < 2) 
  {
    printf("Usage:  StreamCheck hostname [myname]\n");
    exit(RETURN_WARN);
  }
  connectTo = argv[1] ? argv[1] : "localhost";
  progName  = argv[2] ? argv[2] : "streamcheck";

  if ((AMarqueeBase = OpenLibrary("amarquee.library",37L)) == NULL)
  {
    printf("Couldn't open amarquee.library v37!\n");
    exit(RETURN_ERROR);
  }
  printf("Connecting to %s:%i...\n",connectTo, port);
  if ((session = QNewSession(connectTo, port, progName)) == NULL)
  {
    printf("Couldn't connect to server %s:%i\n",connectTo, port);
    CloseLibrary(AMarqueeBase);
    exit(RETURN_WARN);
  }
  printf("StreamCheck connected to server %s:%i\n",connectTo, port);

  UNLESS((QSubscribeOp(session, "/#?/#?/streamcount", sizeof(count))) &&
         (QGo(session,0L)))
  {
    printf("Setup error.\n");
    exit(RETURN_ERROR);
  }
  
  /* Setup */
  while(1)
  {
    ULONG signals = Wait(SIGBREAKF_CTRL_C | (1L<<session->qMsgPort->mp_SigBit));
    
    if (signals & SIGBREAKF_CTRL_C)
    {
      printf("Aborting...\n");
      break;
    }
    if (signals & (1L<<session->qMsgPort->mp_SigBit))
    {
      struct QMessage * qmsg;
      BOOL BDie = FALSE;
      
      while((BDie == FALSE)&&(qmsg = GetMsg(session->qMsgPort)))
      {
        if (qmsg->qm_Status != QERROR_NO_ERROR)
        {
          printf("Received an error, aborting!\n");
          BDie = TRUE;
        }
        else
        {
          if (qmsg->qm_Data) 
          {
            if (count != 0)
            {
              int newVal = *((int *)qmsg->qm_Data);
              
              if (newVal != count+1) printf("Stream Error:  count was %i, newVal was %i.\n",count, newVal);
              count = newVal;
              printf("Message %p: count = %i\r", qmsg, count); fflush(stdout);
            }
            else count = *((int *)qmsg->qm_Data);
          }
          else
          {
            printf("StreamGenerator terminated, exiting!\n");
            BDie = TRUE;
          }
        }
        FreeQMessage(session, qmsg);
      }
      
      if (BDie) break;
    }
  }
  /* CleanExit called here via the atexit() feature */
}
