Copyright ©1997-1998 by Axel T. Schreiner.  All Rights Reserved.



8
Threads and channels

Threads

Inferno only supports threads, i.e., processes with a shared address space and shared resources. Using pctl() a thread can

FORKNS NEWNS split it's namespace from it's creator or start with an empty one.
FORKFD NEWFD control it's file connections alone or start over only with those passed explicitly.
NEWPGRP start a new process group.

Threads in the same process group are terminated together if killgrp is written into the ctl file of one of the threads. sh issues FORKNS upon startup and FORKFD when starting descendants.

The notions of process and thread are mixed up. Limbo only refers to threads that are created using spawn and terminate themselves with exit or return, or which are terminated by kill , i.e., by input into the ctl file . Inferno only refers to processes. I do not see a difference, however.

An address space consists of modules with reference semantics. They are loaded explicitly with load and disappear once there are no more references. Because modules contain global variables, threads must coordinate their accesses, see Monitor .

Channels

A channel is a storage space for one value with a defined type that can, however, only be used for send and receive operations. Channels use reference semantics. There are the following operations:

typ: type (int, string); type declaration
x: chan of typ; channel declaration
x = chan of typ channel creation
x <-= (10, "hallo") send, space is now occupied
(i, s) := <- x receive, space is now available

The punchline is that sending and receiving must take place in different threads because both operations block until the transfer really takes place, i.e., until a sender and a receiver meet at the channel.



demo

demo illustrates the various operations:
{08/demo.b}
implement Demo;

include "sys.m"; sys: Sys;
include "draw.m"; Context: import Draw;

Demo: module {
init: fn (ctxt: ref Context, argv: list of string);
};

typ: type (int, string);

init (nil: ref Context, nil: list of string) {
sys = load Sys Sys->PATH;
a := array[3] of { * => chan of typ }; create 3 channels
spawn rcv(a); create receiver
a[2] <-= (1, "axel");
a[1] <-= (2, "was");
a[0] <-= (3, "here");

}
rcv (a: array of chan of typ) {
(i, s) := <- a[2]; explicit contact
sys->print("%d %s\n", i, s);
waiting: while ()
for (n := 0; n < len a; ++ n)
alt {
(i, s) = <- a[n] => contact if possible

sys->print("%d: %d %s\n", n, i, s);
break waiting;
* =>
sys->print("%d: nope\n", n);
}
(n, (i, s)) = <- a; contact with any element
sys->print("%d: %d %s\n", n, i, s);

}
{}
The syntax array[] of { ... } creates values, i.e., here we get an array with 3 channels. The receiver then gets 3 messages by way of different channels.

perky$ demo
1 axel
0: nichts
1: 2 was
0: 3 here

alt { ... } expects channels in it's tags, controls the first one in each tag and selects a block which has a non-blocking tag. If * is specified, alt does not block if all tags would block, but instead selects the block for *.



If a receive operation is applied to an array of channels, the result is atuple with the index of a non-blocking element and the value received on this channel.

Monitors

Channels can be used to synchronize threads which compete for global variables. The following idea originated in the Limbo language definition:
{08/mon.m}
# mon := Monitor.new()
# mon.lock()  # first call succeeds, subsequent block
# mon.test()  # returns true if attempt to lock successful
# mon.unlock()  # releases lock established by lock/test
# mon.destroy()  # terminates partner thread

Mon: module {
PATH: con "mon.dis";

Monitor: adt {
ch: chan of int; # synchronizing channel
ctl: chan of int; # to abort monitor thread
fix: chan of int; ## repair a bug

new: fn (): ref Monitor;
lock: fn (m: self ref Monitor);
test: fn (m: self ref Monitor): int;
unlock: fn (m: self ref Monitor);
destroy: fn (m: self ref Monitor);
};
};
{}
For synchronization one creates an instance of Monitor:
{08/mon.b}
implement Mon;

include "mon.m";

Monitor.new (): ref Monitor {
result := ref Monitor(chan of int, nil, nil);## create instance
spawn main(result); create partner thread
return result;

}
{}
A thread is created which can take turns to act as a partner for lock(), test(), and unlock(), and which is terminated in destroy().

There is, however, an internal bug (##): If two alt statements attempt to send to the same channels and if one alt as in test() can avoid to send, the other alt subsequently no longer tries to send.



lock() and unlock() are receive and send operations on the channel:
{08/mon.b}
Monitor.lock (m: self ref Monitor) {
m.ch <-= 0;   # blocks until <- in unlock
}

Monitor.unlock (m: self ref Monitor) {
<- m.ch;   # blocks until <-= in lock
}
{}
test() works like lock()but uses an alt statement to avoid blocking:
{08/mon.b}
Monitor.test (m: self ref Monitor): int {
alt {
m.ch <-= 0 => return 1; # could lock
* =>  m.fix <-= 0; ## reset main, if necessary
return 0; # failed to lock
}
}
{}
main() alternately executes unlock() and lock() but additionally monitors ctl:
{08/mon.b}
main (m: ref Monitor) {
m.ctl = chan of int; m.fix = chan of int;
loop: while () {
alt {
<- m.ch => ;  # unlock
<- m.ctl => break loop;
<- m.fix => continue loop; ## reset, if necessary
}
while () alt {   ## loop for reset
m.ch <-= 0 => continue loop; # lock
<- m.ctl => break loop;
<- m.fix => ;  ## reset, if necessary
}
}
m.ctl = nil;
}
{}
Thanks to unlock() in main() the first call to lock() in a client thread will be successful. Further calls to lock() block until unlock() in a client either meets lock() in main() or in another client. Basically it's two cogwheels slowly rolling along each other.

destroy() sends to ctl and thus terminates the other thread in which main() is executed:
{08/mon.b}
Monitor.destroy (m: self ref Monitor) {
if (m.ctl != nil)
m.ctl <-= 0;
}

{}

Monitor assumes that the participating processes cooperate. If a thread is aborted that successfully executed lock() the other threads waiting to lock() will block forever. If destroy() is not called, a thread remains behind -- as is the case with pipeline problems in the shell. (Inferno 2.0 has a device for pipes.)

The dining philosophers

A classic (if somewhat unhygienic) synchronization problem concerns some philosophers who think and eat at a round table, where for eating each requires the two forks to the right and left of his place at the table:

One can employ a monitor as a butler who manages admission to the table but that does not lead to efficient spaghetti consumption.

If a monitor is used for each fork the system can deadlock: each philospher reaches for a fork and blocks reaching for the second one.

If a fork can be obtained conditionally, the following strategy has promise:

grab left fork
if right fork is available:
eat
release right fork
release left fork

Thus, the system won't deadlock, but a philosopher can still die of starvation (while heavily banging his left fork). If the thinkers occasionally confuse left and right, their risk diminishes...



The philosopher

Philosopher is an adt that alternates between thinking and enjoying a multi-course meal using two forks modeled as a Monitor each. If asked, he happily comments on his activities:
{08/phils.b adt}
Philosopher: adt {
me: int;   # identity
fork: array of ref Monitor; # left and right fork

enjoy: fn (p: self ref Philosopher, ncourses, talk: int);
};

Philosopher.enjoy (p: self ref Philosopher, ncourses, talk: int) {
course: for (n := 1; n <= ncourses; ++ n) {
if (talk) sys->print("%d thinks\n", p.me);
sleep();
for ((l, r) := (p.me % 2, !(p.me % 2)); ; (l,r) = (r,l)) {
p.fork[l].lock();
if (p.fork[r].test()) {
if (talk) sys->print("%d eats\n", p.me);
sleep();
p.fork[r].unlock();
p.fork[l].unlock();
continue course;
} else if (talk) sys->print("%d missed\n", p.me);
p.fork[l].unlock();
}
}
if (talk) sys->print("%d leaves\n", p.me);
}

sleep() {
sys->sleep(random->randomint() & 1023);
}
{}
Whether a Philosopher assumes fork[0] or fork[1] to be his left fork, depends on his serial number me. He changes his mind whenever he is not successful.

sleep() demonstrates how the Random module and lastly #c/random can be used to delay for a randomly chosen number of milliseconds.



The meal
{08/phils.b}
implement Phils;

include "sys.m"; sys: Sys; stderr: ref Sys->FD;
include "draw.m"; Context: import Draw;
include "mon.m"; mon: Mon; Monitor: import mon;
include "keyring.m";
include "security.m"; random: Random;

Phils: module {
init: fn (ctxt: ref Context, argv: list of string);
};

usage () {
sys->fprint(stderr, "usage: phils [-tw] #phils #courses\n");
exit;
}
{08/phils.b init}
init (nil: ref Context, argv: list of string) {
sys = load Sys Sys->PATH;
stderr = sys->fildes(2);

mon = load Mon Mon->PATH;
if (mon == nil) {
sys->fprint(stderr, "phils: cannot load Mon: %r\n");
exit;
}
random = load Random Random->PATH;
if (random == nil) {
sys->fprint(stderr, "phils: cannot load Random: %r\n");
exit;
}

topt, wopt: int = 0;
while ((argv = tl argv) != nil) {
arg := hd argv;
if (arg[0] != '-') break;
for (i := 1; i < len arg; i ++)
case arg[i] {
't' => ++ topt; # -t: talk
'w' => ++ wopt; # -w: wait
* => usage();
}
}
if (argv == nil || len argv < 2) usage();
n := int hd argv;   # number of philosophers
if (n <= 1) usage();
nc := int hd tl argv;   # number of courses
if (nc < 1) usage();



fork := array[n] of { * => Monitor.new() };
phil := array[n] of
{ 0 => ref Philosopher(0, array[] of {fork[0], fork[n-1]}) };
for (p := 1; p < n; ++ p)
phil[p] = ref Philosopher(p, array[] of {fork[p-1], fork[p]});

wait: ref sys->FD;
if (wopt) {
pid := string sys->pctl(0, nil);
wait = sys->open("/prog/"+pid+"/wait", sys->OREAD);
if (wait == nil) {
sys->fprint(stderr, "phils: %s: cannot open: %r\n",
"/prog/"+pid+"/wait");
exit;
}
}

for (p = 0; p < n; ++ p)
spawn phil[p].enjoy(nc, topt);

if (wopt) {
buf := array[512] of byte;
for (p = 0; p < n; ++ p) {
l := sys->read(wait, buf, len buf);
sys->print("%s\n", string buf[0:l]);
}
for (f := 0; f < n; ++ f)
fork[f].destroy();
}
}
{}
First the necessary modules are loaded and the number of philosophers and courses are read from the command line. Then the forks are set up, passed to the philosopher and enjoy() is run in various threads.

If one wants to wait until threads are terminated, the creator first has to get read access to the wait file . Every thread created afterwards upon exit leaves exactly one message in this file which can be retrieved with read().

In this case the forks' threads can be removed. As an alternative, all threads, philosophers and forks, could have been collected in a separate process group and eliminated altogether by writing killgrp into any of the ctl files.



Channels and files

file2chan("path", "file", flags) creates a file #sfile/file and then binds the directory #sfile according to the usual flags at path into the namespace. As a result there is a new file path/file which apparently can only be eliminated using unmount(nil, "path") . /chan for path and a number for file are typical values; this is used to make pipes .

The punchline is the result of file2chan(): One gets two channels to manage read, write, and close operations for the file. Open operations cannot be recognized as such.

The following arrangement installs a file for an echo server, i.e., whatever is written into the file can be read back:

reader() and writer() are two threads that send data written to path/file over a channel data and make it available for reading from path/file.

server() is another thread that terminates it's process group if it receives something from the channel ctl. The threads executing reader(), writer(), and server() all belong to the same process group. If writer() notices that the last client has disconnected from path/file, it sends on ctl and thus terminates the threads.



demon

The arrangement is useful enough to be packaged as a module:
{08/demon.m}
Demon: module {
PATH: con "demon.dis";

demon: fn (srv: string, mount: int, ctl: chan of int,
first, last: chan of array of byte): string;
dbg: int;
};
{}
srv and mount are the information for file2chan(). ctl may be nil, than server() does not exist and reader() and writer() are not terminated. first and last represent data but in such a fashion that more threads can be inserted inbetween.

echod

echod shows how easily a service can be set up with demon():
{08/echod.b}
# provide echo service on a local path/file
# echod path/file

implement Echod;

include "sys.m"; sys: Sys;
include "draw.m"; Context: import Draw;
include "demon.m";

Echod: module {
init: fn (ctxt: ref Context, argv: list of string);
};
init (nil: ref Context, argv: list of string) {
sys = load Sys Sys->PATH;
demon := load Demon Demon->PATH;
if (demon == nil)
exits("cannot load Demon module: "+sys->sprint("%r"));
demon->dbg = len argv > 2; # extra argument for debugging

if (argv == nil || len argv < 2)
exits("usage: echod path/file [-d]");

sys->pctl(sys->NEWPGRP, nil); # new process group
data := chan of array of byte; # channel for echo service
s := demon->demon(hd tl argv, sys->MBEFORE, chan of int, data, data);
if (s != nil) exits(s);
}     # server in background...



exits (s: string) {
sys->fprint(sys->fildes(2), "echod: %s\n", s);
exit;
}
{}
Because ctl is not nil the service terminates once the last access to the file ends.

Demon
{08/demon.b}
implement Demon;

include "sys.m"; sys: Sys; FD, FileIO: import Sys; stderr: ref FD;
include "regex.m";
include "demon.m";

demon (srv: string, mount: int, ctl: chan of int,
first, last: chan of array of byte): string {
if (sys == nil) sys = load Sys Sys->PATH;
if (dbg) stderr = sys->fildes(2);

regex := load Regex Regex->PATH;
if (regex == nil)
return "cannot load Regex module: "+sys->sprint("%r");

# path/file
(beg, nil) := regex->execute(regex->compile("[^/]/[^/]+$"), srv);
if (beg < 0)
return "unsuitable path: "+srv;

io := sys->file2chan(srv[0:beg+1], srv[beg+2:], mount);
if (io == nil)
return srv+": cannot create: "+sys->sprint("%r");

if (ctl != nil)
spawn server(io, first, last, ctl);
else {
spawn reader(io, first);
spawn writer(last, io, nil);
}
return nil;
}
{}
A regular expression is used to split srv into dirname and basename which have to be passed separately to file2chan(); in reality #sfilename/filename is bound to dirname.



demon() or server() start reader() and writer() as threads. server() then waits for ctl and terminates the process group:
{08/demon.b}
server (io: ref FileIO, first, last: chan of array of byte, ctl: chan of int) {
pgrp := "#p/"+string sys->pctl(0, nil)+"/ctl";

spawn reader(io, first);
spawn writer(last, io, ctl);
if (dbg) sys->fprint(stderr, "demon: serving\n");

<- ctl;
fd := sys->open(pgrp, sys->OWRITE);
if (fd != nil)
sys->fprint(fd, "killgrp"); # terminate process group
if (dbg) sys->fprint(stderr, "demon: killed\n");
}
{}
reader() waits for input and sends it into the channel first. Once the last channel closes it's input connection nil must definitely be sent as a last message to first; otherwise the service might wait forever for first.
{08/demon.b}
reader (io: ref FileIO, first: chan of array of byte) {
loop: while () {
if (dbg) sys->fprint(stderr, "demon reader: recv\n");
(nil, data, nil, wc) := <- io.write;
if (wc == nil) break;  # close on srv
retry: for (retry := 0;;) alt {
wc <-= (len data, nil) => # reply successful write...
if (dbg) sys->fprint(stderr,
"demon reader: %d\n", len data);
break retry;
* =>    # reply could be invalid
if (retry++ < 1) sys->sleep(500);
else {
if (dbg) sys->fprint(stderr,
"demon reader: %d: fail\n", len data);
break loop;
}
}
first <-= data;   # ...and pass data on
}
if (dbg) sys->fprint(stderr, "demon reader: eof\n");
first <-= nil;    # eof
if (dbg) sys->fprint(stderr, "demon reader: exit\n");
}
{}
alt is supposed to keep reader() from getting permanently blocked if the last client is terminated during a write operation.



Unfortunately, writer() is very complicated. One has to simultaneously recognize that a client closes it's connection or that data is available on last -- this requires alt. There could be several requests from different clients which should be answered FIFO. If data has been taken from last or if bytes are leftover from before, last should no longer be queried in order to slow the service down.

setup() receives a FIFO list of queries from the client and potentially information about bytes that are already available and ensures that either bytes and at least one query is available or that it is clear that the last client has closed the connection:
{08/demon.b}
reply: type (array of byte, string);   # rc <-=
tupel: type (chan of reply, int);   # rc, count
transfer: type (list of tupel, int, array of byte); # queue, bytes, data
close: transfer = (nil, 0, nil);   # preemptive

setup (queue: list of tupel, bytes: int, data: array of byte,
last: chan of array of byte, io: ref FileIO): transfer {
while (bytes < 0) {    # need data and request
if (dbg) sys->fprint(stderr, "demon writer: wait\n");
alt {
(nil, count, nil, rc) := <- io.read => # got request
if (rc == nil) return close; # closed srv
if (dbg) sys->fprint(stderr,
"demon writer: req %x/%d\n", rc, count);
queue = rev((rc, count)::rev(queue));
data = <- last =>   # got data
if (data == nil) bytes = 0; # got eof
else bytes = len data;  # got buffer
if (dbg) sys->fprint(stderr,
"demon writer: data %d\n", bytes);
}
}
if (len queue > 0)    # can dequeue request
return (queue, bytes, data);
if (dbg) sys->fprint(stderr, "demon writer: recv\n");
(nil, count, nil, rc) := <- io.read;  # block for request
if (rc == nil) return close;   # closed srv
if (dbg) sys->fprint(stderr, "demon writer: req %x/%d\n", rc, count);
return ((rc, count)::nil, bytes, data);
}
{}



rev() reverses a tupel list:
{08/demon.b}
rev (q: list of tupel): list of tupel {
if (len q <= 1) return q;
for (p := hd q::nil; (q = tl q) != nil; p = hd q::p)
;
return p;
}
{}
Given setup(), the writer() is only concerned with transmitting:
{08/demon.b}
writer (last: chan of array of byte, io: ref FileIO, ctl: chan of int) {
queue: list of tupel;   # request fifo, hd is oldest
bytes := -1; data: array of byte; # available data

loop: while () {
(queue, bytes, data) = setup(queue, bytes, data, last, io);
if (queue == nil) break;
(rc, count) := hd queue; queue = tl queue;
if (dbg) sys->fprint(stderr,
"demon writer: reply %x/%d %d\n", rc, count, bytes);
if (bytes > count)  # must save leftover bytes
for (retry := 0;;) alt {
rc <-= (data[0:count], nil) =>
data = data[count:]; bytes -= count;
continue loop;
* =>   # reply could be invalid
if (retry++ < 1) sys->sleep(500);
else {
if (dbg) sys->fprint(stderr,
"demon writer: failed\n");
continue loop;
}
}
else    # can send all
for (retry = 0;;) alt {
rc <-= (data, nil) =>
if (bytes == 0)
break loop; # send eof just once
bytes = -1;
continue loop;
* =>   # reply could be invalid
if (retry++ < 1) sys->sleep(500);
else {
if (dbg) sys->fprint(stderr,
"demon writer: failed\n");
continue loop;
}
}
}
if (dbg) sys->fprint(stderr, "demon writer: exit\n");
if (ctl != nil) ctl <-= 0;  # report demise
}
{}



One definitely has to consider that a reply channel may have become invalid because the corresponding client was terminated; in this case the data is passed to the next query.

echod trace

perky$ echod /tmp/x -d
demon: serving
demon writer: wait
demon reader: recv
perky$ cat /tmp/x & cat /tmp/x &
demon writer: req 812b86c/8192
demon writer: wait
demon writer: req 812b92c/8192
demon writer: wait
perky$ echo hello, world > /tmp/x
demon reader: 7
demon reader: recv
demon writer: data 7
demon writer: reply 812b86c/8192 7
demon writer: wait
hello,
demon writer: req 812b8ec/8192
demon writer: wait
demon reader: 6
demon reader: recv
demon writer: data 6
demon writer: reply 812b92c/8192 6
demon writer: wait
world
demon writer: req 812b86c/8192
demon writer: wait



perky$ ps
...
78       76    inferno    release     1K Cat[$Sys]
79       77    inferno    release     1K Cat[$Sys]
perky$ kill 78; echo nice going > /tmp/x
demon reader: 5
demon reader: recv
demon writer: data 5
demon writer: reply 812b8ec/8192 5
nice
demon writer: wait
demon writer: req 812b92c/8192
demon writer: wait
demon reader: 6
demon writer: data 6
demon writer: reply 812b86c/8192 6
demon reader: recv
demon writer: failed
demon writer: reply 812b92c/8192 6
demon writer: wait
going
demon writer: req 81275cc/8192
demon writer: wait
perky$ kill 79
demon writer: exit
demon reader: eof
demon: killed

One notices that reader() and writer() eventually find out that the last client closes the connection.



tcp

demon() manages a somewhat larger structure:

As a service the following threads can be inserted:

twriter() transports messages from the channel first to a network connection, treader() transports input from the network connection to the channel last. Thus, a network connection is available as a local file.

However, the network connection has to be constructed by reader(), i.e., during the first input to the local file, and a connection to the local file must continue to exist, otherwise writer() in demon() will terminate it all.



Example: daytime service from port 13

perky$ tcp
tcp: usage: tcp [-adrv] machine!port path/file
perky$ tcp -v penny!13 /tmp/daytime
tcp: serving /tmp/daytime
perky$ cat /tmp/daytime & ps
...
5        4    inferno       recv     4K Demon
6        4    inferno       recv     4K Tcp
7        4    inferno       recv     4K Demon
8        4    inferno        alt     4K Demon
11        9    inferno    release     1K Cat[$Sys]
perky$ echo x >/tmp/daytime
cs: tcp!penny!13 -> /net/tcp/clone 131.173.250.21!13
tcp server: dialed net!penny!13
tcp server: running
tcp writer: 2
0000 780a                                |x.              |
tcp writer: recv
tcp reader: 26
0000 54687520 4a756e20 31392030 303a3039 |Thu Jun 19 00:09|
0010 3a313620 31393937 0a0d              |:16 1997..      |
tcp reader: send
tcp reader: 0
Thu Jun 19 00:09:16 1997
tcp reader: eof
perky$ ps
1        1    inferno    release     3K Sh[$Sys]
2        1       root        alt     6K Cs
15        1    inferno      ready     2K Ps[$Sys]

One can see that twriter() and treader() are only created if there is an input and that cat correctly receives the end of file resulting from the implicit disconnect of the  daytime connection.



Implementation

For test purposes the bind flags can be specified explicitly. If necessary, a module Hd is loaded that can create hexadecimal dumps.
{08/tcp.b}
# serve a network connection on a local path/file
# tcp [-adrv] machine!port path/file

implement Tcp;

include "sys.m"; sys: Sys; FD: import Sys; stderr: ref FD;
include "draw.m"; Context: import Draw;
include "demon.m";
include "hd.m";  dump: Hd;

Tcp: module {
init: fn (ctxt: ref Context, argv: list of string);
};

usage := "usage: tcp [-adrv] machine!port path/file";
vopt := 0;

init (nil: ref Context, argv: list of string) {
sys = load Sys Sys->PATH;
stderr = sys->fildes(2);

demon := load Demon Demon->PATH;
if (demon == nil)
exits(nil, "cannot load Demon module: "+sys->sprint("%r"));

mount := sys->MBEFORE;
while ((argv = tl argv) != nil) {
arg := hd argv;
if (arg[0] != '-' || arg == "-") break;
if (arg == "--") { argv = tl argv; break; }
for (i := 1; i < len arg; i ++)
case arg[i] {
'a' => mount = sys->MAFTER; # -a: mount after
'd' => ++demon->dbg;  # -d: demon dbg
'r' => mount = sys->MREPL; # -r: mount replace
'v' => ++vopt;   # -v: verbose
* => exits(nil, usage);
}
}
if (argv == nil || len argv < 2) exits(nil, usage);

if (vopt) {
dump = load Hd Hd->PATH;
if (dump == nil)
exits(nil,"cannot load Hd module: "+sys->sprint("%r"));
}



addr := "net!" + hd argv;  # net!machine!port
srv := hd tl argv;   # path/file

sys->pctl(sys->NEWPGRP, nil);
input:= chan of array of byte;
output:= chan of array of byte;
ctl := chan of int;

s := demon->demon(srv, mount, ctl, input, output);
if (s != nil) exits(nil, s);

spawn tserver(ctl, addr, input, output);
if (vopt) sys->fprint(stderr, "tcp: serving %s\n", srv);
}

exits (ctl: chan of int, s: string) {
sys->fprint(stderr, "tcp: %s\n", s);
if (ctl != nil) ctl <-= 0;
exit;
}
{}
So that threads in the service can be terminated with a message, exits() gets access to the ctl channel.
{08/tcp.b}
tserver (ctl: chan of int, addr: string, in, out: chan of array of byte) {
msg := <- in;    # first message
(ok, c) := sys->dial(addr, nil);
if (ok < 0) exits(ctl, "cannot dial "+addr+": "+sys->sprint("%r"));
if (vopt) sys->fprint(stderr, "tcp server: dialed %s\n", addr);

spawn treader(c.dfd, out);
spawn twriter(msg, in, c.dfd);
if (vopt) sys->fprint(stderr, "tcp server: running\n");
}
{}
tserver() waits for the first write access(!) and builds the network connection using dial(). From the resulting Connection only the bidirectional data file c.dfd is used.



{08/tcp.b}
treader (in: ref FD, out: chan of array of byte) {
msg := array[8192] of byte;
loop: while ()
case n := sys->read(in, msg, len msg) {
-1 => sys->fprint(stderr, "tcp: input error: %r\n");
break loop;
0 => if (vopt) sys->fprint(stderr, "tcp reader: 0\n");
break loop;
* => if (vopt) sys->fprint(stderr, "tcp reader: %d\n%s\n",
n, dump->dump(4, 16, 0, msg[0:n]));
if (vopt) sys->fprint(stderr, "tcp reader: send\n");
out <-= msg[0:n];
}
out <-= nil;   # send eof to client
if (vopt) sys->fprint(stderr, "tcp reader: eof\n");
}
{}
treader() reads from the network connection and writes the complete input to the channel. In case of error nil has to be written into the channel.
{08/tcp.b}
twriter (msg: array of byte, in: chan of array of byte, out: ref FD) {
while (msg != nil) {
if (vopt) sys->fprint(stderr, "tcp writer: %d\n%s\n",
len msg, dump->dump(4, 16, 0, msg));
if (sys->write(out, msg, len msg) != len msg)
exits(nil, "output error: "+sys->sprint("%r"));
if (vopt) sys->fprint(stderr, "tcp writer: recv\n");
msg = <- in;
}
if (vopt) sys->fprint(stderr, "tcp writer: exit\n");
}
{}
twriter() reads from the channel and writes to the network connection. Because tserver() is one channel input ahead, twriter() has to write first. No access to ctl is required because reader() has to have a chance to send an end of file to the client.


tcpd

tcpd works like tcp, but uses announce() and listen(), to offer a service on a local TCP port:

Inferno Unix

perky$ tcpd
tcpd: usage: tcpd [-adrv] path/file [machine!]port
perky$ tcpd -v /chan/e 12345
tcpd: serving /chan/e
cs: tcp!*!12345 -> /net/tcp/clone 12345
tcpd server: announced net!*!12345
perky$ cat /chan/e >/chan/e &
$ telnet perky 12345
Trying 131.173.161.211... Connected to perky.
Escape character is '^]'.
tcpd server: listened net!*!12345
tcpd server: peer 131.173.250.21!2771
tcpd server: running
tcpd writer: recv
hello, world
tcpd reader: 14
0000 68656c6c 6f2c2077 6f726c64 0d0a     |hello, world..  |
tcpd reader: send
tcpd writer: 14
0000 68656c6c 6f2c2077 6f726c64 0d0a     |hello, world..  |
hello, world
^]
telnet> q
Connection closed.
tcpd writer: recv
tcpd reader: 0
tcpd reader: send eof
tcpd reader: exit
tcpd writer: exit
perky$ ps
1        1    inferno    release     3K Sh[$Sys]
2        1       root        alt     6K Cs
48        1    inferno      ready     1K Ps[$Sys]

There is a gotcha! in listen(): while a connection is constructed it is the caller's responsibility to create the necessary ref FD.

14/Apr/1998