8
Threads and channels
Threads
Inferno only supports threads, i.e., processes with a shared address space and shared resources. Using pctl() a thread can
| FORKNS | NEWNS | split its namespace from its creator's or start with an empty one. |
| FORKFD | NEWFD | control its file connections alone or start over with only those passed explicitly. |
| NEWPGRP | | start a new process group. |
Threads in the same process group are terminated together if killgrp is written into the ctl file of one of the threads. sh issues FORKNS upon startup and FORKFD when starting descendants.
The notions of process and thread are mixed up. Limbo only refers to threads, which are created using spawn and either terminate themselves with exit or return, or are terminated by kill, i.e., by input into the ctl file. Inferno only refers to processes. I do not see a difference, however.
An address space consists of modules with reference semantics. They are loaded explicitly with load and disappear once there are no more references. Because modules contain global variables, threads must coordinate their accesses, see Monitor.
Channels
A channel is a storage space for one value with a defined type that can, however, only be used for send and receive operations. Channels use reference semantics. There are the following operations:
The punchline is that sending and receiving must take place in different threads because both operations block until the transfer really takes place, i.e., until a sender and a receiver meet at the channel.
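A minimal sketch of the basic operations, assuming nothing beyond the Sys module: chan of creates a channel, <-= sends and <- receives. The module name Ping and the function producer are hypothetical names chosen for this illustration.

```limbo
implement Ping;

include "sys.m"; sys: Sys;
include "draw.m";

Ping: module {
  init: fn (ctxt: ref Draw->Context, argv: list of string);
};

init (nil: ref Draw->Context, nil: list of string) {
  sys = load Sys Sys->PATH;

  c := chan of int;               # create a channel for int values
  spawn producer(c);              # the sender must run in another thread
  i: int;
  for (i = 0; i < 3; i++)
    sys->print("%d\n", <- c);     # receive: blocks until a sender arrives
}

producer (c: chan of int) {
  i: int;
  for (i = 0; i < 3; i++)
    c <-= i * i;                  # send: blocks until a receiver arrives
}
```

If init tried to send and receive on c by itself, the first operation would block forever, because no partner could ever arrive at the channel.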
demo illustrates the various operations:
{08/demo.b}
implement Demo;

include "sys.m"; sys: Sys;
include "draw.m"; Context: import Draw;

Demo: module {
  init: fn (ctxt: ref Context, argv: list of string);
};

typ: type (int, string);

init (nil: ref Context, nil: list of string) {
  sys = load Sys Sys->PATH;

  a := array[3] of { * => chan of typ };
  spawn rcv(a);
  a[2] <-= (1, "axel");
  a[1] <-= (2, "was");
  a[0] <-= (3, "here");
}

rcv (a: array of chan of typ) {
  (i, s) := <- a[2];              # receive from one channel
  sys->print("%d %s\n", i, s);

  n: int;
  waiting: while ()
    alt {
    (n, (i, s)) = <- a =>         # receive from any channel in the array
      sys->print("%d: %d %s\n", n, i, s);
      break waiting;
    * =>                          # nothing can be received right now
      sys->print("%d: nope\n", n);
    }

  (n, (i, s)) = <- a;             # block until any channel delivers
  sys->print("%d: %d %s\n", n, i, s);
}
{}
The syntax array[] of { ... } creates values, i.e., here we get an array with 3 channels. The receiver then gets 3 messages by way of different channels.
perky$ demo
1 axel
0: nope
1: 2 was
0: 3 here
alt { ... } expects channel operations in its tags, checks the first one in each tag, and selects a block whose tag would not block. If * is specified, alt does not block when all tags would block, but instead selects the block for *.
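As a small illustration of the * tag, the following sketch polls a channel without ever blocking in alt. The module name Poll, the function late, and the delays are assumptions made up for this example.

```limbo
implement Poll;

include "sys.m"; sys: Sys;
include "draw.m";

Poll: module {
  init: fn (ctxt: ref Draw->Context, argv: list of string);
};

init (nil: ref Draw->Context, nil: list of string) {
  sys = load Sys Sys->PATH;

  c := chan of string;
  spawn late(c);
  for (;;) alt {
  s := <- c =>                    # selected once a sender is waiting
    sys->print("got %s\n", s);
    return;
  * =>                            # selected if the receive would block
    sys->print("nothing yet\n");
    sys->sleep(100);
  }
}

late (c: chan of string) {
  sys->sleep(250);                # let the poller miss a few times
  c <-= "done";
}
```

How often "nothing yet" appears depends on scheduling; without the * tag the loop body would simply block in alt until late() sends.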
Channels can be used to synchronize threads which compete for global variables. The following idea originated in the Limbo language definition:
{08/mon.m}
# mon := Monitor.new()
# mon.lock()      # first call succeeds, subsequent block
# mon.test()      # returns true if attempt to lock successful
# mon.unlock()    # releases lock established by lock/test
# mon.destroy()   # terminates partner thread

Mon: module {
  PATH: con "mon.dis";

  Monitor: adt {
    ch: chan of int;      # synchronizing channel
    ctl: chan of int;     # to abort monitor thread
    fix: chan of int;     ## repair a bug

    new: fn (): ref Monitor;
    lock: fn (m: self ref Monitor);
    test: fn (m: self ref Monitor): int;
    unlock: fn (m: self ref Monitor);
    destroy: fn (m: self ref Monitor);
  };
};
{}
For synchronization one creates an instance of Monitor:
{08/mon.b}
implement Mon;

include "mon.m";

Monitor.new (): ref Monitor {
  result := ref Monitor(chan of int, nil, nil);  ## create instance
  spawn main(result);                            # create partner thread
  return result;
}
{}
A thread is created which can take turns to act as a partner for lock(), test(), and unlock(), and which is terminated in destroy().
There is, however, an internal bug (##): If two alt statements attempt to send to the same channel and one alt, as in test(), can avoid sending, the other alt subsequently no longer tries to send.
{08/mon.b}
Monitor.lock (m: self ref Monitor) {
  m.ch <-= 0;   # blocks until <- in unlock
}
Monitor.unlock (m: self ref Monitor) {
  <- m.ch;      # blocks until <-= in lock
}
{}
test() works like lock() but uses an alt statement to avoid blocking:
{08/mon.b}
Monitor.test (m: self ref Monitor): int {
  alt {
  m.ch <-= 0 => return 1;   # could lock
  * => m.fix <-= 0;         ## reset main, if necessary
    return 0;               # failed to lock
  }
}
{}
main() alternately executes unlock() and lock() but additionally monitors ctl:
{08/mon.b}
main (m: ref Monitor) {
  m.ctl = chan of int; m.fix = chan of int;
  loop: while () {
    alt {
    <- m.ch => ;                  # unlock
    <- m.ctl => break loop;
    <- m.fix => continue loop;    ## reset, if necessary
    }
    while () alt {                ## loop for reset
    m.ch <-= 0 => continue loop;  # lock
    <- m.ctl => break loop;
    <- m.fix => ;                 ## reset, if necessary
    }
  }
  m.ctl = nil;
}
{}
Thanks to unlock() in main() the first call to lock() in a client thread will be successful. Further calls to lock() block until unlock() in a client meets lock() either in main() or in another client. Basically, these are two cogwheels slowly turning against each other.
destroy() sends to ctl and thus terminates the other thread in which main() is executed:
{08/mon.b}
Monitor.destroy (m: self ref Monitor) {
  if (m.ctl != nil)
    m.ctl <-= 0;
}
{}
Monitor assumes that the participating processes cooperate. If a thread that successfully executed lock() is aborted, the other threads waiting in lock() will block forever. If destroy() is not called, a thread remains behind, as is the case with pipeline problems in the shell. (Inferno 2.0 has a device for pipes.)
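A possible client of the monitor might look as follows. This is only a sketch: it assumes that mon.dis has been built from the code above and can be found by load, and the names Count, worker, total, and done are hypothetical. The increment stands in for a longer critical section.

```limbo
implement Count;

include "sys.m"; sys: Sys;
include "draw.m";
include "mon.m"; mon: Mon; Monitor: import mon;

Count: module {
  init: fn (ctxt: ref Draw->Context, argv: list of string);
};

total := 0;                       # global variable shared by all threads

init (nil: ref Draw->Context, nil: list of string) {
  sys = load Sys Sys->PATH;
  mon = load Mon Mon->PATH;
  if (mon == nil) {
    sys->print("count: cannot load Mon: %r\n");
    return;
  }

  m := Monitor.new();
  done := chan of int;            # workers report here when finished
  i: int;
  for (i = 0; i < 4; i++)
    spawn worker(m, done);
  for (i = 0; i < 4; i++)
    <- done;
  m.destroy();                    # do not leave the partner thread behind
  sys->print("total %d\n", total);
}

worker (m: ref Monitor, done: chan of int) {
  i: int;
  for (i = 0; i < 1000; i++) {
    m.lock();                     # enter critical section
    total++;
    m.unlock();                   # leave critical section
  }
  done <-= 0;
}
```

The done channel is used instead of the wait file only to keep the sketch short; every lock() is paired with an unlock() in the same thread, which is the cooperation the monitor relies on.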
A classic (if somewhat unhygienic) synchronization problem concerns some philosophers who think and eat at a round table, where for eating each requires the two forks to the right and left of his place at the table:
One can employ a monitor as a butler who manages admission to the table but that does not lead to efficient spaghetti consumption.
If a monitor is used for each fork the system can deadlock: each philosopher reaches for a fork and blocks reaching for the second one.
If a fork can be obtained conditionally, the following strategy has promise:
grab left fork
if right fork is available:
  eat
  release right fork
release left fork
Thus, the system won't deadlock, but a philosopher can still die of starvation (while heavily banging his left fork). If the thinkers occasionally confuse left and right, their risk diminishes...
Philosopher is an adt that alternates between thinking and enjoying a multi-course meal using two forks modeled as a Monitor each. If asked, he happily comments on his activities:
{08/phils.b adt}
Philosopher: adt {
  me: int;                        # identity
  fork: array of ref Monitor;     # left and right fork

  enjoy: fn (p: self ref Philosopher, ncourses, talk: int);
};

Philosopher.enjoy (p: self ref Philosopher, ncourses, talk: int) {
  course: for (n := 1; n <= ncourses; ++ n) {
    if (talk) sys->print("%d thinks\n", p.me);
    sleep();
    for ((l, r) := (p.me % 2, !(p.me % 2)); ; (l,r) = (r,l)) {
      p.fork[l].lock();
      if (p.fork[r].test()) {
        if (talk) sys->print("%d eats\n", p.me);
        sleep();
        p.fork[r].unlock();
        p.fork[l].unlock();
        continue course;
      } else if (talk) sys->print("%d missed\n", p.me);
      p.fork[l].unlock();
    }
  }
  if (talk) sys->print("%d leaves\n", p.me);
}

sleep() {
  sys->sleep(random->randomint() & 1023);
}
{}
Whether a Philosopher assumes fork[0] or fork[1] to be his left fork, depends on his serial number me. He changes his mind whenever he is not successful.
sleep() demonstrates how the Random module and, ultimately, #c/random can be used to delay for a randomly chosen number of milliseconds.
include "sys.m"; sys: Sys; stderr: ref Sys->FD;
include "draw.m"; Context: import Draw;
include "mon.m"; mon: Mon; Monitor: import mon;
include "keyring.m";
include "security.m"; random: Random;

Phils: module {
  init: fn (ctxt: ref Context, argv: list of string);
};

usage () {
  sys->fprint(stderr, "usage: phils [-tw] #phils #courses\n");
  exit;
}
{08/phils.b init}
init (nil: ref Context, argv: list of string) {
  sys = load Sys Sys->PATH;
  stderr = sys->fildes(2);

  mon = load Mon Mon->PATH;
  if (mon == nil) {
    sys->fprint(stderr, "phils: cannot load Mon: %r\n");
    exit;
  }
  random = load Random Random->PATH;
  if (random == nil) {
    sys->fprint(stderr, "phils: cannot load Random: %r\n");
    exit;
  }

  topt, wopt: int = 0;
  while ((argv = tl argv) != nil) {
    arg := hd argv;
    if (arg[0] != '-') break;
    for (i := 1; i < len arg; i ++)
      case arg[i] {
      't' => ++ topt;             # -t: talk
      'w' => ++ wopt;             # -w: wait
      * => usage();
      }
  }
  if (argv == nil || len argv < 2) usage();
  n := int hd argv;               # number of philosophers
  if (n <= 1) usage();
  nc := int hd tl argv;           # number of courses
  if (nc < 1) usage();

  wait: ref Sys->FD;
  if (wopt) {
    pid := string sys->pctl(0, nil);
    wait = sys->open("/prog/"+pid+"/wait", sys->OREAD);
    if (wait == nil) {
      sys->fprint(stderr, "phils: %s: cannot open: %r\n",
        "/prog/"+pid+"/wait");
      exit;
    }
  }

  # forks and philosophers; this setup is reconstructed from the
  # description that follows the listing
  fork := array[n] of ref Monitor;
  phil := array[n] of ref Philosopher;
  p: int;
  for (p = 0; p < n; ++ p)
    fork[p] = Monitor.new();
  for (p = 0; p < n; ++ p)
    phil[p] = ref Philosopher(p, array[] of { fork[p], fork[(p+1) % n] });

  for (p = 0; p < n; ++ p)
    spawn phil[p].enjoy(nc, topt);

  if (wopt) {
    buf := array[512] of byte;
    for (p = 0; p < n; ++ p) {
      l := sys->read(wait, buf, len buf);
      sys->print("%s\n", string buf[0:l]);
    }
    for (f := 0; f < n; ++ f)
      fork[f].destroy();
  }
}
{}
First the necessary modules are loaded and the numbers of philosophers and courses are read from the command line. Then the forks are set up and passed to the philosophers, and enjoy() is run in a separate thread for each philosopher.
If one wants to wait until threads are terminated, the creator first has to get read access to the wait file. Every thread created afterwards leaves exactly one message in this file upon exit, which can be retrieved with read().
In this case the forks' threads can be removed. As an alternative, all threads, philosophers and forks, could have been collected in a separate process group and eliminated altogether by writing killgrp into any of the ctl files.
file2chan("path", "file", flags) creates a file #sfile/file and then binds the directory #sfile according to the usual flags at path into the namespace. As a result there is a new file path/file which apparently can only be eliminated using unmount(nil, "path"). /chan for path and a number for file are typical values; this is used to make pipes.
The punchline is the result of file2chan(): One gets two channels to manage read, write, and close operations for the file. Open operations cannot be recognized as such.
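A minimal sketch of such a file server follows. It assumes the three-argument file2chan() described above (later Inferno editions are said to drop the flags) and the four-element request tuples used in the listings of this section. The module name Hello, the function serve, and the file name /chan/hello are hypothetical.

```limbo
implement Hello;

include "sys.m"; sys: Sys;
include "draw.m";

Hello: module {
  init: fn (ctxt: ref Draw->Context, argv: list of string);
};

init (nil: ref Draw->Context, nil: list of string) {
  sys = load Sys Sys->PATH;
  io := sys->file2chan("/chan", "hello", sys->MBEFORE);
  if (io == nil) {
    sys->print("hello: cannot create /chan/hello: %r\n");
    return;
  }
  spawn serve(io);                # answer requests in the background
}

serve (io: ref Sys->FileIO) {
  msg := array of byte "hello\n";
  for (;;) alt {
  (offset, count, nil, rc) := <- io.read =>
    if (rc == nil) continue;      # a client closed the file
    if (offset >= len msg) {
      rc <-= (nil, nil);          # nothing left: end of file
      continue;
    }
    stop := offset + count;
    if (stop > len msg) stop = len msg;
    rc <-= (msg[offset:stop], nil);
  (nil, data, nil, wc) := <- io.write =>
    if (wc == nil) continue;      # a client closed the file
    wc <-= (len data, nil);       # accept and discard the data
  }
}
```

Every request carries its own reply channel, so the server decides per request what a read returns and how many bytes a write claims to have consumed. Unlike the demon below, this sketch replies with a blocking send and therefore can hang if a client vanishes before the reply is taken.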
The following arrangement installs a file for an echo server, i.e., whatever is written into the file can be read back:
reader() and writer() are two threads that send data written to path/file over a channel data and make it available for reading from path/file.
server() is another thread that terminates its process group if it receives something from the channel ctl. The threads executing reader(), writer(), and server() all belong to the same process group. If writer() notices that the last client has disconnected from path/file, it sends on ctl and thus terminates the threads.
The arrangement is useful enough to be packaged as a module:
{08/demon.m}
Demon: module {
  PATH: con "demon.dis";

  demon: fn (srv: string, mount: int, ctl: chan of int,
    first, last: chan of array of byte): string;
  dbg: int;
};
{}
srv and mount are the information for file2chan(). ctl may be nil; then server() does not exist and reader() and writer() are not terminated. first and last represent data, but in such a fashion that more threads can be inserted in between.
echod shows how easily a service can be set up with demon():
{08/echod.b}
# provide echo service on a local path/file
# echod path/file

implement Echod;

include "sys.m"; sys: Sys;
include "draw.m"; Context: import Draw;
include "demon.m";

Echod: module {
  init: fn (ctxt: ref Context, argv: list of string);
};

init (nil: ref Context, argv: list of string) {
  sys = load Sys Sys->PATH;
  demon := load Demon Demon->PATH;
  if (demon == nil)
    exits("cannot load Demon module: "+sys->sprint("%r"));

  demon->dbg = len argv > 2;        # extra argument for debugging
  if (argv == nil || len argv < 2)
    exits("usage: echod path/file [-d]");

  sys->pctl(sys->NEWPGRP, nil);     # new process group
  data := chan of array of byte;    # channel for echo service
  s := demon->demon(hd tl argv, sys->MBEFORE, chan of int, data, data);
  if (s != nil) exits(s);
}                                   # server in background...
{}
Demon
{08/demon.b}
implement Demon;

include "sys.m"; sys: Sys; FD, FileIO: import Sys; stderr: ref FD;
include "regex.m";
include "demon.m";

demon (srv: string, mount: int, ctl: chan of int,
    first, last: chan of array of byte): string {
  if (sys == nil) sys = load Sys Sys->PATH;
  if (dbg) stderr = sys->fildes(2);
  regex := load Regex Regex->PATH;
  if (regex == nil)
    return "cannot load Regex module: "+sys->sprint("%r");

  # path/file
  (beg, nil) := regex->execute(regex->compile("[^/]/[^/]+$"), srv);
  if (beg < 0)
    return "unsuitable path: "+srv;

  io := sys->file2chan(srv[0:beg+1], srv[beg+2:], mount);
  if (io == nil)
    return srv+": cannot create: "+sys->sprint("%r");

  if (ctl != nil)
    spawn server(io, first, last, ctl);
  else {
    spawn reader(io, first);
    spawn writer(last, io, nil);
  }
  return nil;
}
{}
A regular expression is used to split srv into dirname and basename which have to be passed separately to file2chan(); in reality #sfilename/filename is bound to dirname.
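{08/demon.b}
server (io: ref FileIO, first, last: chan of array of byte,
    ctl: chan of int) {
  # header and pgrp are reconstructed from the call in demon() and
  # from the use of pgrp below
  pgrp := "/prog/"+string sys->pctl(0, nil)+"/ctl";
  spawn reader(io, first);
  spawn writer(last, io, ctl);
  if (dbg) sys->fprint(stderr, "demon: serving\n");
  <- ctl;                               # wait for writer's demise
  fd := sys->open(pgrp, sys->OWRITE);
  if (fd != nil)
    sys->fprint(fd, "killgrp");         # terminate process group
  if (dbg) sys->fprint(stderr, "demon: killed\n");
}
{}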
reader() waits for input and sends it into the channel first. Once the last client closes its input connection, nil must definitely be sent as a last message to first; otherwise the service might wait forever for first.
{08/demon.b}
reader (io: ref FileIO, first: chan of array of byte) {
  loop: while () {
    if (dbg) sys->fprint(stderr, "demon reader: recv\n");
    (nil, data, nil, wc) := <- io.write;
    if (wc == nil) break;               # close on srv
    retry: for (retry := 0;;) alt {
    wc <-= (len data, nil) =>           # reply successful write...
      if (dbg) sys->fprint(stderr,
        "demon reader: %d\n", len data);
      break retry;
    * =>                                # reply could be invalid
      if (retry++ < 1) sys->sleep(500);
      else {
        if (dbg) sys->fprint(stderr,
          "demon reader: %d: fail\n", len data);
        break loop;
      }
    }
    first <-= data;                     # ...and pass data on
  }
  if (dbg) sys->fprint(stderr, "demon reader: eof\n");
  first <-= nil;                        # eof
  if (dbg) sys->fprint(stderr, "demon reader: exit\n");
}
{}
alt is supposed to keep reader() from getting permanently blocked if the last client is terminated during a write operation.
setup() receives a FIFO list of queries from the client and possibly information about bytes that are already available. It ensures that either bytes and at least one query are available, or that it is clear that the last client has closed the connection:
{08/demon.b}
reply: type (array of byte, string);                  # rc <-=
tupel: type (chan of reply, int);                     # rc, count
transfer: type (list of tupel, int, array of byte);   # queue, bytes, data
close: transfer = (nil, 0, nil);                      # preemptive

setup (queue: list of tupel, bytes: int, data: array of byte,
    last: chan of array of byte, io: ref FileIO): transfer {
  while (bytes < 0) {                       # need data and request
    if (dbg) sys->fprint(stderr, "demon writer: wait\n");
    alt {
    (nil, count, nil, rc) := <- io.read =>  # got request
      if (rc == nil) return close;          # closed srv
      if (dbg) sys->fprint(stderr,
        "demon writer: req %x/%d\n", rc, count);
      queue = rev((rc, count)::rev(queue));
    data = <- last =>                       # got data
      if (data == nil) bytes = 0;           # got eof
      else bytes = len data;                # got buffer
      if (dbg) sys->fprint(stderr,
        "demon writer: data %d\n", bytes);
    }
  }
  if (len queue > 0)                        # can dequeue request
    return (queue, bytes, data);
  if (dbg) sys->fprint(stderr, "demon writer: recv\n");
  (nil, count, nil, rc) := <- io.read;      # block for request
  if (rc == nil) return close;              # closed srv
  if (dbg) sys->fprint(stderr, "demon writer: req %x/%d\n", rc, count);
  return ((rc, count)::nil, bytes, data);
}
{}
{08/demon.b}
writer (last: chan of array of byte, io: ref FileIO, ctl: chan of int) {
  # header and initial state are reconstructed from the calls in
  # demon()/server() and from setup()
  queue: list of tupel;                 # pending read requests
  bytes := -1;                          # no data available yet
  data: array of byte;
  loop: while () {
    (queue, bytes, data) = setup(queue, bytes, data, last, io);
    if (queue == nil) break;
    (rc, count) := hd queue; queue = tl queue;
    if (dbg) sys->fprint(stderr,
      "demon writer: reply %x/%d %d\n", rc, count, bytes);
    if (bytes > count)                  # must save leftover bytes
      for (retry := 0;;) alt {
      rc <-= (data[0:count], nil) =>
        data = data[count:]; bytes -= count;
        continue loop;
      * =>                              # reply could be invalid
        if (retry++ < 1) sys->sleep(500);
        else {
          if (dbg) sys->fprint(stderr,
            "demon writer: failed\n");
          continue loop;
        }
      }
    else                                # can send all
      for (retry = 0;;) alt {
      rc <-= (data, nil) =>
        if (bytes == 0)
          break loop;                   # send eof just once
        bytes = -1;
        continue loop;
      * =>                              # reply could be invalid
        if (retry++ < 1) sys->sleep(500);
        else {
          if (dbg) sys->fprint(stderr,
            "demon writer: failed\n");
          continue loop;
        }
      }
  }
  if (dbg) sys->fprint(stderr, "demon writer: exit\n");
  if (ctl != nil) ctl <-= 0;            # report demise
}
{}
echod trace
perky$ echod /tmp/x -d
demon: serving
demon writer: wait
demon reader: recv
perky$ cat /tmp/x & cat /tmp/x &
demon writer: req 812b86c/8192
demon writer: wait
demon writer: req 812b92c/8192
demon writer: wait
perky$ echo hello, world > /tmp/x
demon reader: 7
demon reader: recv
demon writer: data 7
demon writer: reply 812b86c/8192 7
demon writer: wait
hello,
demon writer: req 812b8ec/8192
demon writer: wait
demon reader: 6
demon reader: recv
demon writer: data 6
demon writer: reply 812b92c/8192 6
demon writer: wait
world
demon writer: req 812b86c/8192
demon writer: wait
One notices that reader() and writer() eventually find out that the last client closes the connection.
demon() manages a somewhat larger structure:
As a service the following threads can be inserted:
twriter() transports messages from the channel first to a network connection, treader() transports input from the network connection to the channel last. Thus, a network connection is available as a local file.
However, the network connection has to be constructed by reader(), i.e., during the first input to the local file, and a connection to the local file must continue to exist, otherwise writer() in demon() will terminate it all.
perky$ tcp
tcp: usage: tcp [-adrv] machine!port path/file
perky$ tcp -v penny!13 /tmp/daytime
tcp: serving /tmp/daytime
perky$ cat /tmp/daytime & ps
...
5 4 inferno recv 4K Demon
6 4 inferno recv 4K Tcp
7 4 inferno recv 4K Demon
8 4 inferno alt 4K Demon
11 9 inferno release 1K Cat[$Sys]
perky$ echo x >/tmp/daytime
cs: tcp!penny!13 -> /net/tcp/clone 131.173.250.21!13
tcp server: dialed net!penny!13
tcp server: running
tcp writer: 2
0000 780a |x. |
tcp writer: recv
tcp reader: 26
0000 54687520 4a756e20 31392030 303a3039 |Thu Jun 19 00:09|
0010 3a313620 31393937 0a0d |:16 1997.. |
tcp reader: send
tcp reader: 0
Thu Jun 19 00:09:16 1997
tcp reader: eof
perky$ ps
1 1 inferno release 3K Sh[$Sys]
2 1 root alt 6K Cs
15 1 inferno ready 2K Ps[$Sys]
One can see that twriter() and treader() are only created if there is an input and that cat correctly receives the end of file resulting from the implicit disconnect of the daytime connection.
For test purposes the bind flags can be specified explicitly. If necessary, a module Hd is loaded that can create hexadecimal dumps.
{08/tcp.b}
# serve a network connection on a local path/file
# tcp [-adrv] machine!port path/file

implement Tcp;

include "sys.m"; sys: Sys; FD: import Sys; stderr: ref FD;
include "draw.m"; Context: import Draw;
include "demon.m";
include "hd.m"; dump: Hd;

Tcp: module {
  init: fn (ctxt: ref Context, argv: list of string);
};

usage := "usage: tcp [-adrv] machine!port path/file";
vopt := 0;

init (nil: ref Context, argv: list of string) {
  sys = load Sys Sys->PATH;
  stderr = sys->fildes(2);
  demon := load Demon Demon->PATH;
  if (demon == nil)
    exits(nil, "cannot load Demon module: "+sys->sprint("%r"));

  mount := sys->MBEFORE;
  while ((argv = tl argv) != nil) {
    arg := hd argv;
    if (arg[0] != '-' || arg == "-") break;
    if (arg == "--") { argv = tl argv; break; }
    for (i := 1; i < len arg; i ++)
      case arg[i] {
      'a' => mount = sys->MAFTER;     # -a: mount after
      'd' => ++demon->dbg;            # -d: demon dbg
      'r' => mount = sys->MREPL;      # -r: mount replace
      'v' => ++vopt;                  # -v: verbose
      * => exits(nil, usage);
      }
  }
  if (argv == nil || len argv < 2) exits(nil, usage);
  addr := hd argv;                    # machine!port
  srv := hd tl argv;                  # path/file

  if (vopt) {
    dump = load Hd Hd->PATH;
    if (dump == nil)
      exits(nil, "cannot load Hd module: "+sys->sprint("%r"));
  }

  sys->pctl(sys->NEWPGRP, nil);
  input := chan of array of byte;
  output := chan of array of byte;
  ctl := chan of int;
  s := demon->demon(srv, mount, ctl, input, output);
  if (s != nil) exits(nil, s);
  spawn tserver(ctl, addr, input, output);
  if (vopt) sys->fprint(stderr, "tcp: serving %s\n", srv);
}

exits (ctl: chan of int, s: string) {
  sys->fprint(stderr, "tcp: %s\n", s);
  if (ctl != nil) ctl <-= 0;
  exit;
}
{}
So that threads in the service can be terminated with a message, exits() gets access to the ctl channel.
{08/tcp.b}
tserver (ctl: chan of int, addr: string, in, out: chan of array of byte) {
  msg := <- in;                       # first message
  (ok, c) := sys->dial(addr, nil);
  if (ok < 0) exits(ctl, "cannot dial "+addr+": "+sys->sprint("%r"));
  if (vopt) sys->fprint(stderr, "tcp server: dialed %s\n", addr);
  spawn treader(c.dfd, out);
  spawn twriter(msg, in, c.dfd);
  if (vopt) sys->fprint(stderr, "tcp server: running\n");
}
{}
tserver() waits for the first write access(!) and builds the network connection using dial(). From the resulting Connection only the bidirectional data file c.dfd is used.
tcpd works like tcp, but uses announce() and listen() to offer a service on a local TCP port:
| Inferno | Unix |
perky$ tcpd
tcpd: usage: tcpd [-adrv] path/file [machine!]port
perky$ tcpd -v /chan/e 12345
tcpd: serving /chan/e
cs: tcp!*!12345 -> /net/tcp/clone 12345
tcpd server: announced net!*!12345
perky$ cat /chan/e >/chan/e &
$ telnet perky 12345
Trying 131.173.161.211... Connected to perky.
Escape character is '^]'.
tcpd server: listened net!*!12345
tcpd server: peer 131.173.250.21!2771
tcpd server: running
tcpd writer: recv
hello, world
tcpd reader: 14
0000 68656c6c 6f2c2077 6f726c64 0d0a |hello, world.. |
tcpd reader: send
tcpd writer: 14
0000 68656c6c 6f2c2077 6f726c64 0d0a |hello, world.. |
hello, world
^]
telnet> q
Connection closed.
tcpd writer: recv
tcpd reader: 0
tcpd reader: send eof
tcpd reader: exit
tcpd writer: exit
perky$ ps
1 1 inferno release 3K Sh[$Sys]
2 1 root alt 6K Cs
48 1 inferno ready 1K Ps[$Sys]
There is a gotcha in listen(): although listen() sets up the connection, it is the caller's responsibility to create the necessary ref FD for the data file.
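The following sketch shows one way to deal with this, assuming the Sys calls announce() and listen() with the same (int, Connection) result convention as dial() above, and assuming that the data file of a connection is found at dir+"/data". The module name Once and the port number are hypothetical.

```limbo
implement Once;

include "sys.m"; sys: Sys;
include "draw.m";

Once: module {
  init: fn (ctxt: ref Draw->Context, argv: list of string);
};

init (nil: ref Draw->Context, nil: list of string) {
  sys = load Sys Sys->PATH;

  (ok, c) := sys->announce("tcp!*!12345");
  if (ok < 0) {
    sys->print("once: cannot announce: %r\n");
    return;
  }
  (lok, nc) := sys->listen(c);    # blocks until a client connects
  if (lok < 0) {
    sys->print("once: cannot listen: %r\n");
    return;
  }
  # listen() does not open the data file; the caller has to do that
  nc.dfd = sys->open(nc.dir+"/data", sys->ORDWR);
  if (nc.dfd == nil) {
    sys->print("once: cannot open %s/data: %r\n", nc.dir);
    return;
  }
  sys->fprint(nc.dfd, "hello\n"); # greet the single client and quit
}
```

Only after the open can nc.dfd be handed to threads such as treader() and twriter() in the same way tserver() hands over c.dfd after dial().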