diff --git a/emu/Makefile b/emu/Makefile index 1f5a7b0..13c0c11 100644 --- a/emu/Makefile +++ b/emu/Makefile @@ -1,5 +1,5 @@ -SRC=emu.c util.c cmd.c apr.c mem.c tty.c pt.c dc.c dt.c netmem.c ../tools/pdp6common.c -H=pdp6.h ../tools/pdp6common.h +SRC=emu.c util.c threading.c cmd.c apr.c mem.c tty.c pt.c dc.c dt.c netmem.c ../tools/pdp6common.c +H=pdp6.h ../tools/pdp6common.h threading.h # clang #CFLAGS= -Wno-shift-op-parentheses -Wno-logical-op-parentheses \ # -Wno-bitwise-op-parentheses diff --git a/emu/apr.c b/emu/apr.c index 9328def..7d1a6c1 100644 --- a/emu/apr.c +++ b/emu/apr.c @@ -224,7 +224,7 @@ Device* makeapr(int argc, char *argv[]) { Apr *apr; - Thread th; + Task t; apr = malloc(sizeof(Apr)); memset(apr, 0, sizeof(Apr)); @@ -237,8 +237,8 @@ makeapr(int argc, char *argv[]) apr->iobus.dev[CPA] = (Busdev){ apr, wake_cpa, 0 }; apr->iobus.dev[PI] = (Busdev){ apr, wake_pi, 0 }; - th = (Thread){ nil, aprcycle, apr, 1, 0 }; - addthread(th); + t = (Task){ nil, aprcycle, apr, 1, 0 }; + addtask(t); return &apr->dev; } diff --git a/emu/args.h b/emu/args.h index a73082f..830e3a1 100644 --- a/emu/args.h +++ b/emu/args.h @@ -1,4 +1,6 @@ +#ifndef USED #define USED(x) ((void)x) +#endif #define SET(x) ((x)=0) extern char *argv0; diff --git a/emu/cmd.c b/emu/cmd.c index dcd9205..d04f131 100644 --- a/emu/cmd.c +++ b/emu/cmd.c @@ -726,8 +726,11 @@ commandline(char *line) } } +/* If there is no channel, execute immediately. + * Otherwise send line through channel so it can + * be executed synchronously in the simulation thread. */ void -cli(FILE *in) +cli(FILE *in, Channel **c) { size_t len; char *line; @@ -740,15 +743,20 @@ cli(FILE *in) printf("> "); if(getline(&line, &len, in) < 0) return; - commandline(line); - free(line); + if(c){ + chansend(c[0], &line); + chanrecv(c[1], &line); // wait for completion + }else{ + commandline(line); + free(line); + } } } void* cmdthread(void *p) { - cli(stdin); + cli(stdin, p); quit(0); return nil; } diff --git a/emu/dt.c b/emu/dt.c index 04eedfc..4560d22 100644 --- a/emu/dt.c +++ b/emu/dt.c @@ -776,7 +776,7 @@ Device* makedt(int argc, char *argv[]) { Dt551 *dt; - Thread th; + Task t; dt = malloc(sizeof(Dt551)); memset(dt, 0, sizeof(Dt551)); @@ -789,8 +789,8 @@ makedt(int argc, char *argv[]) // should have 30000 cycles per second, so one every 33μs // APR at 1 has an approximate cycle time of 200-300ns - th = (Thread){ nil, dtcycle, dt, 150, 0 }; - addthread(th); + t = (Task){ nil, dtcycle, dt, 150, 0 }; + addtask(t); return &dt->dev; } diff --git a/emu/emu.c b/emu/emu.c index fc591be..9d55f79 100644 --- a/emu/emu.c +++ b/emu/emu.c @@ -39,30 +39,30 @@ err(char *fmt, ...) } -Thread *threads; +Task *tasks; void -addthread(Thread th) +addtask(Task t) { - Thread *p; - p = malloc(sizeof(Thread)); - *p = th; - p->next = threads; - threads = p; + Task *p; + p = malloc(sizeof(Task)); + *p = t; + p->next = tasks; + tasks = p; } void* simthread(void *p) { - Thread *th; + Task *t; printf("[simthread] start\n"); for(;;) - for(th = threads; th; th = th->next){ - th->cnt++; - if(th->cnt == th->freq){ - th->cnt = 0; - th->f(th->arg); + for(t = tasks; t; t = t->next){ + t->cnt++; + if(t->cnt == t->freq){ + t->cnt = 0; + t->f(t->arg); } } err("can't happen"); @@ -104,10 +104,26 @@ dofile(const char *path) printf("Couldn't open file %s\n", path); return; } - cli(f); + cli(f, nil); fclose(f); } +/* Task for simulation. + * Execute commands synchronously. */ +void +readcmdchan(void *p) +{ + Channel **c; + char *line; + + c = p; + if(channbrecv(c[0], &line) == 1){ + commandline(line); + free(line); + chansend(c[1], &line); + } +} + void quit(int code) { diff --git a/emu/main_panel.c b/emu/main_panel.c index 167a4c8..0407e8f 100644 --- a/emu/main_panel.c +++ b/emu/main_panel.c @@ -2,7 +2,6 @@ #include #include #include -#include #include "args.h" typedef struct Point Point; @@ -516,7 +515,7 @@ usage(void) } int -main(int argc, char *argv[]) +threadmain(int argc, char *argv[]) { SDL_Surface *screen; SDL_Event ev; @@ -525,8 +524,9 @@ main(int argc, char *argv[]) Element *e; int i; int w, h; - pthread_t cmd_thread, sim_thread; const char *outfile; + Channel *cmdchans[2]; + Task t; Apr *apr; Ptr *ptr; @@ -644,8 +644,12 @@ main(int argc, char *argv[]) ptr = (Ptr*)getdevice("ptr"); ptp = (Ptp*)getdevice("ptp"); - pthread_create(&sim_thread, nil, simthread, apr); - pthread_create(&cmd_thread, nil, cmdthread, nil); + cmdchans[0] = chancreate(sizeof(char*), 1); + cmdchans[1] = chancreate(sizeof(void*), 1); + t = (Task){ nil, readcmdchan, cmdchans, 10, 0 }; + addtask(t); + threadcreate(simthread, nil); + threadcreate(cmdthread, cmdchans); for(;;){ while(SDL_PollEvent(&ev)) diff --git a/emu/main_serial.c b/emu/main_serial.c index 99dc302..f63262a 100644 --- a/emu/main_serial.c +++ b/emu/main_serial.c @@ -1,6 +1,5 @@ #include "pdp6.h" #include -#include #include "args.h" #include @@ -154,12 +153,13 @@ usage(void) } int -main(int argc, char *argv[]) +threadmain(int argc, char *argv[]) { - pthread_t cmd_thread, sim_thread; const char *outfile; const char *panelfile; int fd; + Channel *cmdchans[2]; + Task t; Apr *apr; Ptr *ptr; @@ -189,8 +189,12 @@ main(int argc, char *argv[]) apr = (Apr*)getdevice("apr"); ptr = (Ptr*)getdevice("ptr"); - pthread_create(&sim_thread, nil, simthread, apr); - pthread_create(&cmd_thread, nil, cmdthread, nil); + cmdchans[0] = chancreate(sizeof(char*), 1); + cmdchans[1] = chancreate(sizeof(void*), 1); + t = (Task){ nil, readcmdchan, cmdchans, 10, 0 }; + addtask(t); + threadcreate(simthread, nil); + threadcreate(cmdthread, cmdchans); if(panelfile){ fd = open(panelfile, O_RDWR); diff --git a/emu/mem_0 b/emu/mem_0 index cce9b0b..a5ed455 100644 --- a/emu/mem_0 +++ b/emu/mem_0 @@ -2157,7 +2157,7 @@ 721200000000 321600037451 542200037447 -322600037066 +322600037402 254000037402 563000033777 254000034000 @@ -2212,7 +2212,7 @@ 253400037524 321400037541 323500037470 -201400000055 +201400037536 332010037177 254000037470 336000000012 diff --git a/emu/netmem.c b/emu/netmem.c index 9e8f2c9..a2e7d42 100644 --- a/emu/netmem.c +++ b/emu/netmem.c @@ -91,7 +91,7 @@ makenetmem(int argc, char *argv[]) int port; Netmem *nm; Device *apr; - Thread th; + Task t; nm = malloc(sizeof(Netmem)); memset(nm, 0, sizeof(Netmem)); @@ -120,8 +120,8 @@ makenetmem(int argc, char *argv[]) printf("couldn't connect\n"); printf("netmem fd: %d\n", nm->fd); - th = (Thread){ nil, netmemcycle, nm, 50, 0 }; - addthread(th); + t = (Task){ nil, netmemcycle, nm, 50, 0 }; + addtask(t); return &nm->dev; } diff --git a/emu/pdp6.h b/emu/pdp6.h index 4ed3a65..fc23476 100644 --- a/emu/pdp6.h +++ b/emu/pdp6.h @@ -6,6 +6,7 @@ #include #include #include "../tools/pdp6common.h" +#include "threading.h" #define nelem(a) (sizeof(a)/sizeof(a[0])) #define nil NULL @@ -33,7 +34,9 @@ int readn(int fd, void *data, int n); int dial(const char *host, int port); void quit(int code); -void cli(FILE *f); +void commandline(char *line); +void cli(FILE *f, Channel **c); +void readcmdchan(void *p); void *cmdthread(void *); void *simthread(void *p); void dofile(const char *path); @@ -84,18 +87,18 @@ typedef struct IOBus IOBus; typedef struct Apr Apr; -typedef struct Thread Thread; -struct Thread +typedef struct Task Task; +struct Task { - Thread *next; /* link to next thread */ + Task *next; /* link to next task */ void (*f)(void*); void *arg; - int freq; /* how often the thread is serviced */ + int freq; /* how often the task is serviced */ int cnt; }; -extern Thread *threads; -void addthread(Thread th); +extern Task *tasks; +void addtask(Task t); typedef struct Device Device; struct Device diff --git a/emu/pt.c b/emu/pt.c index 025461a..2bcd015 100644 --- a/emu/pt.c +++ b/emu/pt.c @@ -240,7 +240,7 @@ Device* makeptp(int argc, char *argv[]) { Ptp *ptp; - Thread th; + Task t; ptp = malloc(sizeof(Ptp)); memset(ptp, 0, sizeof(Ptp)); @@ -251,8 +251,8 @@ makeptp(int argc, char *argv[]) ptp->dev.ioconnect = ptpioconnect; ptp->fd = -1; - th = (Thread){ nil, ptpcycle, ptp, 1000, 0 }; - addthread(th); + t = (Task){ nil, ptpcycle, ptp, 1000, 0 }; + addtask(t); return &ptp->dev; } @@ -260,7 +260,7 @@ Device* makeptr(int argc, char *argv[]) { Ptr *ptr; - Thread th; + Task t; ptr = malloc(sizeof(Ptr)); memset(ptr, 0, sizeof(Ptr)); @@ -271,7 +271,7 @@ makeptr(int argc, char *argv[]) ptr->dev.ioconnect = ptrioconnect; ptr->fd = -1; - th = (Thread){ nil, ptrcycle, ptr, 1000, 0 }; - addthread(th); + t = (Task){ nil, ptrcycle, ptr, 1000, 0 }; + addtask(t); return &ptr->dev; } diff --git a/emu/test_dt.c b/emu/test_dt.c index 90c399d..d0a698b 100644 --- a/emu/test_dt.c +++ b/emu/test_dt.c @@ -6,7 +6,7 @@ Dx555 *dx; IOBus iobus; IOBus *bus = &iobus; int pireq; -Thread dtthread; +Task dttask; void setreq(IOBus *bus, int dev, u8 pia) @@ -16,9 +16,9 @@ setreq(IOBus *bus, int dev, u8 pia) } void -addthread(Thread th) +addtask(Task t) { - dtthread = th; + dttask = t; } @@ -203,7 +203,7 @@ fwdtest_w(void) cono(bus, DC, DARQ|DBRQ|OUT|CHMOD_6|DEV1|7); cono(bus, UTC, SEL|GO|FWD|DLY3|FN_WBN|DEV1|0); while(dt->ut_go){ - dtthread.f(dt); + dttask.f(dt); if(pireq == 7){ if(bufsz > 0){ w = bufp[--bufsz]; @@ -228,7 +228,7 @@ revtest_w(void) cono(bus, DC, DARQ|DBRQ|OUT|CHMOD_6|DEV1|7); cono(bus, UTC, SEL|GO|REV|DLY3|FN_WD|DEV1|0); while(dt->ut_go){ - dtthread.f(dt); + dttask.f(dt); if(pireq == 7){ if(bufsz > 0){ w = bufp[--bufsz]; @@ -251,7 +251,7 @@ fwdtest_r(void) cono(bus, DC, DBDAMOVE|IN|CHMOD_6|DEV1|7); cono(bus, UTC, SEL|GO|FWD|DLY3|FN_RD|DEV1|0); while(dt->ut_go){ - dtthread.f(dt); + dttask.f(dt); if(pireq == 7){ w = datai(bus, DC); printf("DC got: %012lo\n", w); @@ -271,7 +271,7 @@ revtest_r(void) cono(bus, DC, DBDAMOVE|IN|CHMOD_6|DEV1|7); cono(bus, UTC, SEL|GO|REV|DLY3|FN_RD|DEV1|0); while(dt->ut_go){ - dtthread.f(dt); + dttask.f(dt); if(pireq == 7){ w = datai(bus, DC); printf("DC got: %012lo\n", w); @@ -293,7 +293,7 @@ fmttest(void) cono(bus, DC, DARQ|DBRQ|OUT|CHMOD_6|DEV1|7); cono(bus, UTC, SEL|GO|FWD|DLY3|FN_WTM|DEV1|0); while(dt->ut_go){ - dtthread.f(dt); + dttask.f(dt); if(pireq == 7){ if(bufsz > 0){ --bufsz; diff --git a/emu/threading.c b/emu/threading.c new file mode 100644 index 0000000..3cb9f0f --- /dev/null +++ b/emu/threading.c @@ -0,0 +1,357 @@ +#include "threading.h" +#include + +#define USED(x) (void)(x) + +/* + * Locks + */ + +static pthread_mutex_t initmutex = PTHREAD_MUTEX_INITIALIZER; + +static void +lockinit(Lock *lk) +{ + pthread_mutexattr_t attr; + + pthread_mutex_lock(&initmutex); + if(lk->init == 0){ + pthread_mutexattr_init(&attr); + pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL); + pthread_mutex_init(&lk->mutex, &attr); + pthread_mutexattr_destroy(&attr); + lk->init = 1; + } + pthread_mutex_unlock(&initmutex); +} + +void +lock(Lock *lk) +{ + if(!lk->init) + lockinit(lk); + if(pthread_mutex_lock(&lk->mutex) != 0) + abort(); +} + +int +canlock(Lock *lk) +{ + int r; + + if(!lk->init) + lockinit(lk); + r = pthread_mutex_trylock(&lk->mutex); + if(r == 0) + return 1; + if(r == EBUSY) + return 0; + abort(); +} + +void +unlock(Lock *lk) +{ + if(pthread_mutex_unlock(&lk->mutex) != 0) + abort(); +} + + + +static void +rendinit(Rendez *r) +{ + pthread_condattr_t attr; + + pthread_mutex_lock(&initmutex); + if(r->init == 0){ + pthread_condattr_init(&attr); + pthread_cond_init(&r->cond, &attr); + pthread_condattr_destroy(&attr); + r->init = 1; + } + pthread_mutex_unlock(&initmutex); +} + +void +rsleep(Rendez *r) +{ + if(!r->init) + rendinit(r); + if(pthread_cond_wait(&r->cond, &r->l->mutex) != 0) + abort(); +} + +void +rwakeup(Rendez *r) +{ + if(pthread_cond_signal(&r->cond) != 0) + abort(); +} + +void +rwakeupall(Rendez *r) +{ + if(pthread_cond_broadcast(&r->cond) != 0) + abort(); +} + + +/* + * Threads + */ + + +static Lock idlock; +static pthread_t *threads; +static int numthreads; +static int maxthreads; + +static __thread int myid; +static __thread void *mypointer; + +static int +addthreadid(void) +{ + int id; + + if(numthreads >= maxthreads){ + maxthreads += 64; + threads = realloc(threads, maxthreads*sizeof(*threads)); + } + id = numthreads++; + threads[id] = 0; + return id; +} + +static pthread_t +gethandle(int id) +{ + pthread_t th; + lock(&idlock); + th = threads[id]; + unlock(&idlock); + return th; +} + +struct ThreadArg +{ + int id; + void *(*f)(void*); + void *arg; +}; + +static void* +trampoline(void *p) +{ + struct ThreadArg *args; + void *(*f)(void*); + void *arg; + + args = p; + myid = args->id; + f = args->f; + arg = args->arg; + free(args); + return f(arg); +} + +int +threadcreate(void *(*f)(void*), void *arg) +{ + struct ThreadArg *args; + int id; + + lock(&idlock); + id = addthreadid(); + args = malloc(sizeof(*args)); + args->id = id; + args->f = f; + args->arg = arg; + pthread_create(&threads[id], nil, trampoline, args); + unlock(&idlock); + return id; +} + +void +threadexits(void *ret) +{ + pthread_exit(ret); +} + +int +threadid(void) +{ + return myid; +} + +void** +threaddata(void) +{ + return &mypointer; +} + +void +threadkill(int id) +{ + assert(id >= 0 && id < numthreads); + pthread_cancel(gethandle(id)); +} + +void +threadwait(int id) +{ + pthread_join(gethandle(id), nil); +} + +int +main(int argc, char *argv[]) +{ + int id; + + id = addthreadid(); + threads[id] = pthread_self(); + return threadmain(argc, argv); +} + + +/* + * Channels + */ + + +Channel* +chancreate(int elemsize, int bufsize) +{ + Channel *c; + + c = malloc(sizeof(Channel) + bufsize*elemsize); + if(c == nil) + return nil; + memset(c, 0, sizeof(Channel)); + c->elemsize = elemsize; + c->bufsize = bufsize; + c->nbuf = 0; + c->buf = (uchar*)(c+1); + c->full.l = &c->lock; + c->empty.l = &c->lock; + return c; +} + +void +chanclose(Channel *c) +{ + lock(&c->lock); + c->closed = 1; + rwakeupall(&c->full); + rwakeupall(&c->empty); + unlock(&c->lock); +} + +void +chanfree(Channel *c) +{ + // TODO: don't free chans still in use + free(c); +} + +static int cansend(Channel *c) { return c->nbuf < c->bufsize; } +static int canrecv(Channel *c) { return c->nbuf > 0; } + +static void +chansend_(Channel *c, void *p) +{ + uchar *pp; + + assert(cansend(c)); + pp = c->buf + (c->off+c->nbuf)%c->bufsize * c->elemsize; + memmove(pp, p, c->elemsize); + c->nbuf++; +} + +static void +chanrecv_(Channel *c, void *p) +{ + uchar *pp; + + assert(canrecv(c)); + pp = c->buf + c->off*c->elemsize; + memmove(p, pp, c->elemsize); + c->nbuf--; + if(++c->off == c->bufsize) + c->off = 0; +} + +int +chansend(Channel *c, void *p) +{ + lock(&c->lock); + while(!(c->closed || cansend(c))) + rsleep(&c->full); + /* closed or can send */ + if(c->closed){ + /* can never send to closed chan */ + unlock(&c->lock); + return -1; + } + chansend_(c, p); + rwakeup(&c->empty); + unlock(&c->lock); + return 1; +} + +int +chanrecv(Channel *c, void *p) +{ + lock(&c->lock); + while(!(c->closed || canrecv(c))) + rsleep(&c->empty); + /* closed or can receive */ + if(canrecv(c)){ + /* can still receive from closed chan */ + chanrecv_(c, p); + rwakeup(&c->full); + unlock(&c->lock); + return 1; + } + unlock(&c->lock); + return -1; +} + +int +channbsend(Channel *c, void *p) +{ + lock(&c->lock); + if(c->closed){ + /* can never send to closed chan */ + unlock(&c->lock); + return -1; + } + if(cansend(c)){ + chansend_(c, p); + rwakeup(&c->empty); + unlock(&c->lock); + return 1; + } + unlock(&c->lock); + return 0; +} + +int +channbrecv(Channel *c, void *p) +{ + lock(&c->lock); + if(canrecv(c)){ + /* can still receive from closed chan */ + chanrecv_(c, p); + rwakeup(&c->full); + unlock(&c->lock); + return 1; + } + if(c->closed){ + unlock(&c->lock); + return -1; + } + unlock(&c->lock); + return 0; +} diff --git a/emu/threading.h b/emu/threading.h new file mode 100644 index 0000000..b753eec --- /dev/null +++ b/emu/threading.h @@ -0,0 +1,60 @@ +#include +#include +#include +#include + +#define nil NULL +typedef unsigned char uchar; +typedef unsigned int uint; + +typedef struct Lock Lock; +struct Lock +{ + int init; + pthread_mutex_t mutex; +}; +void lock(Lock *lk); +int canlock(Lock *lk); +void unlock(Lock *lk); + + +typedef struct Rendez Rendez; +struct Rendez +{ + int init; + Lock *l; + pthread_cond_t cond; +}; +void rsleep(Rendez *r); +void rwakeup(Rendez *r); +void rwakeupall(Rendez *r); + + +typedef struct Channel Channel; +struct Channel +{ + int bufsize; + int elemsize; + uchar *buf; + int nbuf; + int off; + + int closed; + Lock lock; + Rendez full, empty; +}; +Channel *chancreate(int elemsize, int bufsize); +void chanclose(Channel *chan); +void chanfree(Channel *c); +int chansend(Channel *c, void *p); +int chanrecv(Channel *c, void *p); +int channbsend(Channel *c, void *p); +int channbrecv(Channel *c, void *p); + +int threadmain(int argc, char *argv[]); +int threadcreate(void *(*f)(void*), void *arg); +void threadexits(void *ret); +int threadid(void); +void **threaddata(void); +void threadkill(int id); +void threadwait(int id); diff --git a/emu/tty.c b/emu/tty.c index 6f92773..c58cc79 100644 --- a/emu/tty.c +++ b/emu/tty.c @@ -136,7 +136,7 @@ Device* maketty(int argc, char *argv[]) { Tty *tty; - Thread th; + Task t; tty = malloc(sizeof(Tty)); memset(tty, 0, sizeof(Tty)); @@ -148,8 +148,8 @@ maketty(int argc, char *argv[]) tty->fd = -1; - th = (Thread){ nil, ttycycle, tty, 1, 0 }; - addthread(th); + t = (Task){ nil, ttycycle, tty, 1, 0 }; + addtask(t); return &tty->dev; }