summaryrefslogtreecommitdiff
path: root/tools/hi-q.c
diff options
context:
space:
mode:
authorJohn Denker <jsd@av8n.com>2012-07-23 19:42:23 (GMT)
committerJohn Denker <jsd@av8n.com>2012-07-23 19:42:23 (GMT)
commitfdaeef07dffa8894672da5e51f63d467d452c7c9 (patch)
treef0f2e77d7ff0520dc9d34ee0fecb67ea2c311bc7 /tools/hi-q.c
parent8eeff9d54790fdc8b3d9e069d191487417102476 (diff)
much more logical about keeping track of pipes and how they are used
Diffstat (limited to 'tools/hi-q.c')
-rw-r--r--tools/hi-q.c195
1 files changed, 122 insertions, 73 deletions
diff --git a/tools/hi-q.c b/tools/hi-q.c
index 114570f..5ee7688 100644
--- a/tools/hi-q.c
+++ b/tools/hi-q.c
@@ -64,6 +64,7 @@ foo_sa(CONFIG, 78, "configuration error") ;\
foo_sa(TOOBIG, 98, "message was too big to process (see --max-size)"
+typedef enum {MSG, ENV} channeler;
#define bufsize 16384
@@ -198,6 +199,7 @@ void slurp(const int inch, const int ouch){
ssize_t todo;
for (;;) {
ssize_t got = read(inch, buf, bufsize);
+ //xx cerr << "slurp: read returns " << got << endl;
if (got == 0) { // EoF
break;
}
@@ -210,6 +212,7 @@ void slurp(const int inch, const int ouch){
todo = got;
while (todo) {
ssize_t sent = write(ouch, buf, todo);
+ //xx cerr << "slurp: write returns " << sent << endl;
if (sent < 0 && errno != EINTR) {
fprintf(stderr, "hi-q: output error on fd%d : ", ouch);
perror(0);
@@ -282,6 +285,22 @@ string basename(const string path){
return path;
}
+void attach(const int pipe_end, const int fd, const int kidno){
+ cerr << "attaching current pipe_end " << pipe_end
+ << " to " << fd
+ << " for " << kidno << endl;
+ if (pipe_end != fd) {
+ int rslt = dup2(pipe_end, fd);
+ if (rslt < 0) {
+ fprintf(stderr, "hi-q: dup2(%d,%d) failed for kid %d : ", pipe_end, fd, kidno);
+ perror(0);
+ exit(ex_syserr);
+ }
+ close(pipe_end);
+ }
+
+}
+
int main(int argc, char** argv) {
{
progname = *argv;
@@ -299,7 +318,6 @@ bar
int kidstatus;
int rslt;
- int loose_end = 0; // our original stdin
typedef vector<string> VS;
vector<jobber> filter;
@@ -395,11 +413,28 @@ bar
// to close it and dup() something useful onto it.
map<int,int> iiofpid;
-
+ map<channeler,int> next_read;
+ next_read[MSG] = 0; // our original stdin
+ next_read[ENV] = -1; // no kid is (yet) empowered to read envelope info
+ int slurp_read(1); // our original non-standard input
+ int slurp_write = -1; // effectively next_write[ENV];
+ map<channeler,int> current_read;
+ map<channeler,int> cur_write; // current kid writes here
+ cur_write[MSG] = -1;
+ cur_write[ENV] = -1;
+
+// important loop to start all kids
for (unsigned int ii=0; ii < nkids; ii++){ /* loop starting all kids */
- //xx cerr << "top of loop ... loose end " << loose_end << " for " << ii << endl;
- if (loose_end > 20) exit(99);
- int kid_end;
+ current_read = next_read;
+
+ cerr << "top of loop: "
+ << " cr.MSG: " << current_read[MSG]
+ << " cr.ENV: " << current_read[ENV]
+ << " w.MSG: " << cur_write[MSG]
+ << " w.ENV: " << cur_write[ENV]
+ << " for " << ii << endl;
+ if (current_read[MSG] > 20) exit(99);
+ if (current_read[ENV] > 20) exit(99);
int datapipe[2];
@@ -407,17 +442,15 @@ bar
case series:
case qq:
case sa:
-// connect *old* loose end to this kid's stdin
- //xx cerr << "moving old loose end " << loose_end << " to 0 for " << ii << endl;
- if (loose_end) {
- close(0);
- dup2(loose_end, 0);
- close(loose_end);
- }
-// Create a pipe, which will be used to connect
-// this child's fd1 to the next child's fd0 ...
-// except for the last kid, which reads both fd0 and fd1,
+// Create a new pipe.
+// Pipe must be created here (in the parent).
+// The intended bindings must be figured out shortly below.
+// Some of the bindings must be hooked up later (in the child),
+// while others are used by the parent (e.g. envelope slurp).
+// This pipe will be used (by the children) to connect
+// this child's output to the next child's input ...
+// except for the special kid, which reads both fd0 and fd1,
// while writing nothing.
rslt = pipe(datapipe);
if (rslt < 0) {
@@ -425,6 +458,10 @@ bar
perror(0);
exeunt(ex_syserr);
}
+ if (1) cerr << "new pipe"
+ << " reading: " << datapipe[rEnd]
+ << " writing: " << datapipe[wEnd]
+ << endl;
break;
case postspam:
case stub:
@@ -435,20 +472,23 @@ bar
exeunt(ex_syserr);
}
-// For N-1 kids, the loose end feeds forward.
-// It will be written by this kid and read by the next kid.
-// For the special kid, the loose end will be its nonstandard input.
-// It will be written by us (hi-q) and read by the last kid.
-
+// figure out the intended bindings:
switch (filter[ii].mode) {
- case series:
case sa:
- loose_end = datapipe[rEnd];
- kid_end = datapipe[wEnd];
+ case series:
+ cur_write[MSG] = datapipe[wEnd];
+ next_read[MSG] = datapipe[rEnd];
break;
case qq:
- loose_end = datapipe[wEnd]; // reverse of normal "series" case
- kid_end = datapipe[rEnd]; // reverse of normal "series" case
+ if (slurp_write >= 0){
+ cerr << "???? multiple qq jobs?" << endl;
+ }
+ slurp_write= datapipe[wEnd];
+ current_read[ENV] = datapipe[rEnd];
+ next_read[ENV] = -1;
+ next_read[MSG] = -1;
+ cur_write[ENV] = -1;
+ cur_write[MSG] = -1;
break;
case postspam:
case stub:
@@ -467,7 +507,7 @@ bar
}
iiofpid[kidpid[ii]] = ii;
if (!kidpid[ii]) { /*** child code ***/
- if (verbose) cerr << "top of kid ... loose end " << loose_end << " for " << ii << endl;
+ if (verbose) cerr << "top of kid ... loose end " << current_read[MSG] << " for " << ii << endl;
pid_t kidgroup(0); // process group for all kids is
// equal to pid of kid#0
@@ -513,31 +553,19 @@ bar
}
}
- if (0) cerr << "before closing loose end " << loose_end
- << " and kid end " << kid_end
- << " for " << ii << endl;
switch (filter[ii].mode){
- case sa:
case qq:
+ attach(current_read[MSG], 0, ii);
+ attach(current_read[ENV], 1, ii);
+ break;
+ case sa:
case series:
- close(loose_end); // the reading end is none of this kid's business
- // except last kid: writing end
-
- // Note this does an implicit close on the previously-open fd1:
- rslt = dup2(kid_end, 1); // the writing end is stdout for this kid
- // except last kid: nonstandard input
- if (rslt < 0) {
- fprintf(stderr, "hi-q: kid %d: dup2(%d,1) failed: ", ii, kid_end);
- perror(0);
- exit(ex_syserr);
- }
- close(kid_end); // use fd1 instead now
- // OK, at this point this kid is set up to read fd0 and write fd1
- // (except last kid reads fd1 as well as fd0).
+ attach(current_read[MSG], 0, ii);
+ attach(cur_write[MSG], 1, ii);
break;
case stub:
case postspam:
- // nothing to do
+ // nothing to hook up; no pipe was even created.
break;
case fail:
cerr << "should never happen: invalid filter" << endl;
@@ -545,6 +573,12 @@ bar
break;
}
+// in all modes:
+// close envelope channel in kid space
+// (leaving it open in parent space)
+ close(current_read[ENV]);
+ close(slurp_write);
+
//// probe_fd();
int ntok = filter[ii].cmd.size();
@@ -576,7 +610,10 @@ bar
perror(0);
exeunt(ex_syserr);
}
- close(kid_end);
+
+// these tricks are for kid:
+ close(cur_write[MSG]);
+ close(cur_write[ENV]);
// Let kid #0 run a little ways:
if (ii==0) {
@@ -598,6 +635,7 @@ bar
} /* end loop starting all kids */
// here with the whole pipeline of kids launched
+// parent program continues
close(resync[wEnd]); // important, so that block gets released
close(resync[rEnd]); // less important, just housecleaning
@@ -730,35 +768,46 @@ bar
// Here if all filters agree this is not spam.
// Now it is safe to transfer the envelope information:
- slurp(1, loose_end);
- close(1);
- close(loose_end);
+
+ if (0) cerr << "about to slurp: "
+ << " cr.MSG: " << current_read[MSG]
+ << " cr.ENV: " << current_read[ENV]
+ << " w.MSG: " << cur_write[MSG]
+ << " w.ENV: " << cur_write[ENV]
+ << " slurp_read: " << slurp_read
+ << " slurp_write: " << slurp_write
+ << endl;
+
+ slurp(slurp_read, slurp_write);
+ close(slurp_write);
+ close(slurp_read);
// now that the envelope information has been transfered,
// wait for the last kid in the usual way
- {
- for(;;) {
- waitpid(special_pid, &kidstatus, WUNTRACED);
- if (WIFEXITED(kidstatus)) {
- int sts = WEXITSTATUS(kidstatus);
- cerr << progid
- << " says: qq program"
- << " i.e. " << basename(filter[nkids-1].cmd[0])
- << "[" << kidpid[nkids-1] << "]"
- << " returned status " << sts
- << endl;
- return sts;
- } else if (WIFSIGNALED(kidstatus)) {
- cerr << progid
- << " says: qq program"
- << " i.e. " << basename(filter[nkids-1].cmd[0])
- << "[" << kidpid[nkids-1] << "]"
- << " was killed by signal " << WTERMSIG(kidstatus)
- << endl;
- return ex_syserr;
- } else {
- /* paused, not dead */
- }
+
+ for(;;) {
+ waitpid(special_pid, &kidstatus, WUNTRACED);
+ if (WIFEXITED(kidstatus)) {
+ int sts = WEXITSTATUS(kidstatus);
+ cerr << progid
+ << " says: qq program"
+ << " i.e. " << basename(filter[nkids-1].cmd[0])
+ << "[" << kidpid[nkids-1] << "]"
+ << " returned status " << sts
+ << endl;
+ return sts;
+ } else if (WIFSIGNALED(kidstatus)) {
+ cerr << progid
+ << " says: qq program"
+ << " i.e. " << basename(filter[nkids-1].cmd[0])
+ << "[" << kidpid[nkids-1] << "]"
+ << " was killed by signal " << WTERMSIG(kidstatus)
+ << endl;
+ return ex_syserr;
+ } else {
+ /* paused, not dead */
}
- }
+ } /* loop until all kids accounted for */
+ // should never get here;
+ // exit from within loop is the only way out
}