Lines Matching +full:queue +full:- +full:group
2 * Copyright (c) 1998-2009, 2011, 2012, 2014 Proofpoint, Inc. and its suppliers.
4 * Copyright (c) 1983, 1995-1997 Eric P. Allman. All rights reserved.
17 SM_RCSID("@(#)$Id: queue.c,v 8.1000 2013-11-22 20:51:56 ca Exp $")
53 /* XREF: op.me: QUEUE FILE FORMAT: V */
54 #define QF_VERSION 8 /* version number of this queue format */
59 /* Naming convention: qgrp: index of queue group, qg: QUEUEGROUP */
62 ** Work queue.
74 int w_qgrp; /* queue group located in */
75 int w_qdir; /* queue directory located in */
76 struct work *w_next; /* next in queue */
81 static WORK *WorkQ; /* queue of things to be done */
92 if (Current_LA_time < now - GET_NEW_LA_TIME) \
100 ** DoQueueRun indicates that a queue run is needed.
104 static bool volatile DoQueueRun; /* non-interrupt time queue run needed */
107 ** Work group definition structure.
108 ** Each work group contains one or more queue groups. This is done
109 ** to manage the number of queue group runners active at the same time
111 ** The number of queue groups that can be run on the next work run
112 ** is kept track of. The queue groups are run in a round robin.
117 int wg_numqgrp; /* number of queue groups in work grp */
119 int wg_curqgrp; /* current queue group */
120 QUEUEGRP **wg_qgs; /* array of queue groups */
122 time_t wg_lowqintvl; /* lowest queue interval */
133 "@(#)$Debug: leak_q - trace memory leaks during queue processing $");
173 ** on a file system in which one or more queue directories reside.
186 static FILESYS FileSys[MAXFILESYS]; /* queue file systems */
196 ** size -- size of shared memory segment
197 ** pid -- pid of owner, should be a unique id to avoid misinterpretations
199 ** tag -- should be a unique id to avoid misinterpretations by others.
201 ** NumFileSys -- number of file systems.
202 ** FileSys -- (array of) structure for used file systems.
203 ** RSATmpCnt -- counter for number of uses of ephemeral RSA key.
204 ** [OCC -- ...]
205 ** QShm -- (array of) structure for information about queue directories.
210 ** Queue data in shared memory
222 static FILESYS *PtrFileSys; /* pointer to queue file system array */
224 static QUEUE_SHM_T *QShm; /* pointer to shared queue data */
263 ** HASH_Q -- simple hash function
266 ** p -- string to hash.
267 ** h -- hash start value (from previous run).
314 ** L Solaris Content-Length: header (obsolete)
320 ** r final recipient (Final-Recipient: DSN field)
324 ** V queue file version
333 ** QUEUEUP -- queue a message up for future transmission.
336 ** e -- the envelope to queue up.
337 ** flags -- QUP_FL_*:
338 ** QUP_FL_ANNOUNCE -- tell when queueing up.
339 ** QUP_FL_MSYNC -- fsync() if SuperSafe interactive mode.
340 ** QUP_FL_UNLOCK -- invoke unlockqueue().
347 ** The queue file is left locked.
358 int tfd = -1;
387 newid = (e->e_id == NULL) || !bitset(EF_INQUEUE, e->e_flags);
389 tfp = e->e_lockfp;
394 ** already exists and hence prevent problems if a queue-id
412 syserr("!queueup: cannot create queue file %s, euid=%ld, fd=%d, fp=%p",
416 e->e_lockfp = tfp;
420 /* if newid, write the queue file directly (instead of temp file) */
434 sm_syslog(LOG_ALERT, e->e_id,
455 sm_syslog(LOG_ALERT, e->e_id,
461 tfd = -1;
481 syserr("!queueup: cannot create queue temp file %s, uid=%ld",
488 qid_printqueue(e->e_qgrp, e->e_qdir),
499 printaddr(sm_debug_file(), e->e_sendqueue, true);
506 if (e->e_lockfp == NULL)
509 dumpfd(sm_io_getinfo(e->e_lockfp, SM_IO_WHAT_FD, NULL),
518 if (bitset(EF_HAS_DF, e->e_flags))
520 if (e->e_dfp != NULL &&
523 sm_io_setinfo(e->e_dfp, SM_BF_COMMIT, NULL) < 0 &&
529 if (e->e_dfp != NULL &&
533 sm_syslog(LOG_INFO, e->e_id,
534 "queueup: fsync(e->e_dfp)");
536 if (fsync(sm_io_getinfo(e->e_dfp, SM_IO_WHAT_FD,
555 if (e->e_dfp != NULL &&
556 sm_io_getinfo(e->e_dfp, SM_IO_WHAT_ISTYPE, BF_FILE_TYPE))
571 e->e_dfino = -1;
574 e->e_dfdev = stbuf.st_dev;
575 e->e_dfino = ST_INODE(stbuf);
577 e->e_flags |= EF_HAS_DF;
581 (*e->e_putbody)(&mcibuf, e, NULL);
589 sm_syslog(LOG_INFO, e->e_id,
606 e->e_putbody = putbody;
615 /* output queue version number (must be first!) */
619 (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "T%ld\n", (long) e->e_ctime);
622 (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "K%ld\n", (long) e->e_dtime);
625 (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "N%d\n", e->e_ntries);
628 (void) sm_io_fprintf(tfp, SM_TIME_DEFAULT, "P%ld\n", e->e_msgpriority);
631 ** If data file is in a different directory than the queue file,
635 if (e->e_dfqgrp != e->e_qgrp)
638 Queue[e->e_dfqgrp]->qg_qpaths[e->e_dfqdir].qp_name);
642 if (e->e_dfino != -1)
645 (long) major(e->e_dfdev),
646 (long) minor(e->e_dfdev),
647 (ULONGLONG_T) e->e_dfino);
651 if (e->e_bodytype != NULL)
653 denlstring(e->e_bodytype, true, false));
656 if (e->e_quarmsg != NULL)
658 denlstring(e->e_quarmsg, true, false));
661 if (e->e_message != NULL)
663 denlstring(e->e_message, true, false));
667 if (bitset(EF_WARNING, e->e_flags))
669 if (bitset(EF_RESPONSE, e->e_flags))
671 if (bitset(EF_HAS8BIT, e->e_flags))
673 if (bitset(EF_DELETE_BCC, e->e_flags))
675 if (bitset(EF_RET_PARAM, e->e_flags))
677 if (bitset(EF_NO_BODY_RETN, e->e_flags))
679 if (bitset(EF_SPLIT, e->e_flags))
682 if (e->e_smtputf8)
693 if (bitnset(M_UDBENVELOPE, e->e_from.q_mailer->m_flags))
694 p = e->e_sender;
696 p = e->e_from.q_paddr;
700 /* output ESMTP-supplied "original" information */
701 if (e->e_envid != NULL)
703 denlstring(e->e_envid, true, false));
706 if (e->e_auth_param != NULL)
708 denlstring(e->e_auth_param, true, false));
709 if (e->e_dlvr_flag != 0)
711 (char) e->e_dlvr_flag, e->e_deliver_by);
715 for (q = e->e_sendqueue; q != NULL; q = q->q_next)
717 q->q_flags &= ~QQUEUED;
718 if (!QS_IS_UNDELIVERED(q->q_state))
722 if (q->q_message != NULL)
724 denlstring(q->q_message, true,
728 if (q->q_orcpt != NULL)
730 denlstring(q->q_orcpt, true,
732 if (q->q_finalrcpt != NULL)
734 denlstring(q->q_finalrcpt, true,
737 if (bitset(QPRIMARY, q->q_flags))
739 if (bitset(QHASNOTIFY, q->q_flags))
741 if (bitset(QPINGONSUCCESS, q->q_flags))
743 if (bitset(QPINGONFAILURE, q->q_flags))
745 if (bitset(QPINGONDELAY, q->q_flags))
747 if (bitset(QINTBCC, q->q_flags))
749 if (bitset(QMXSECURE, q->q_flags))
751 if (q->q_alias != NULL &&
752 bitset(QALIAS, q->q_alias->q_flags))
756 if (bitset(QDYNMAILER, q->q_flags))
760 denlstring(q->q_paddr, true, false));
765 if (e->e_quarmsg != NULL)
768 e->e_to = q->q_paddr;
771 logdelivery(q->q_mailer, NULL, q->q_status,
773 e->e_to = NULL;
777 ** This is only "valid" when the msg is safely in the queue,
781 q->q_flags |= QQUEUED;
792 ** Expand macros completely here. Queue run will deal with
796 ** We set up a "null mailer" -- i.e., a mailer that will have
802 nullmailer.m_se_rwset = nullmailer.m_sh_rwset = -1;
808 macdefine(&e->e_macro, A_PERM, 'g', "\201f");
809 for (h = e->e_header; h != NULL; h = h->h_link)
811 if (h->h_value == NULL)
814 /* don't output resent headers on non-resent messages */
815 if (bitset(H_RESENT, h->h_flags) &&
816 !bitset(EF_RESENT, e->e_flags))
820 if (bitset(H_DEFAULT, h->h_flags))
822 (void) expand(h->h_value, buf, sizeof(buf), e);
833 if (h->h_macro != '\0')
835 if (bitset(0200, h->h_macro))
838 macname(bitidx(h->h_macro)));
841 "$%c", h->h_macro);
843 else if (!bitzerop(h->h_mflags) &&
844 bitset(H_CHECK|H_ACHECK, h->h_flags))
850 if (bitnset(j, h->h_mflags))
857 if (bitset(H_DEFAULT, h->h_flags) &&
858 !bitset(H_BINDLATE, h->h_flags))
861 h->h_field,
864 else if (bitset(H_FROM|H_RCPT, h->h_flags) &&
865 !bitset(H_BINDLATE, h->h_flags))
867 bool oldstyle = bitset(EF_OLDSTYLE, e->e_flags);
872 if (bitset(H_FROM, h->h_flags))
874 commaize(h, h->h_value, oldstyle, &mcibuf, e,
882 h->h_field,
883 denlstring(h->h_value, false,
891 ** Write a terminator record -- this is to prevent
928 if (e->e_qfletter != '\0' &&
929 e->e_qfletter != new)
934 e->e_qfletter, new);
937 if (unlink(queuename(e, e->e_qfletter)) < 0)
941 sm_syslog(LOG_ERR, e->e_id,
943 queuename(e, e->e_qfletter),
948 e->e_qfletter = new;
961 syserr("!queueup: cannot fsync queue temp file %s",
967 /* close and unlock old (locked) queue file */
968 if (e->e_lockfp != NULL)
969 (void) sm_io_close(e->e_lockfp, SM_TIME_DEFAULT);
970 e->e_lockfp = tfp;
974 sm_syslog(LOG_DEBUG, e->e_id, "queueup %s", qf);
980 sm_syslog(LOG_DEBUG, e->e_id, "queueup %s", tf);
982 e->e_qfletter = queue_letter(e, ANYQFL_LETTER);
986 e->e_flags |= EF_INQUEUE;
989 sm_dprintf("<<<<< done queueing %s <<<<<\n\n", e->e_id);
991 if (SM_TRIGGER == e->e_sendmode && !SM_IS_EMPTY(e->e_id))
998 e->e_qgrp, e->e_qdir, e->e_id);
1000 sm_syslog(LOG_DEBUG, e->e_id, "queueup: mode=%c, id=%s, unlock=%d, snd=%d",
1001 e->e_sendmode, e->e_id, QUP_UNLOCK, i);
1007 ** A queue runner will eventually pick it up.
1010 sm_syslog(LOG_ERR, e->e_id, "queueup: notify_snd=%d",
1022 ** PRINTCTLADDR -- print control address to file.
1025 ** a -- address.
1026 ** tfp -- file pointer.
1049 if (a == NULL || a->q_alias == NULL || tfp == NULL)
1068 user = q->q_ruser != NULL ? q->q_ruser : q->q_user;
1069 uid = q->q_uid;
1070 gid = q->q_gid;
1072 a = a->q_alias;
1076 strcmp(lastctladdr->q_paddr, a->q_paddr) == 0)
1088 denlstring(a->q_paddr, true, false));
1092 ** RUNNERS_SIGTERM -- propagate a SIGTERM to queue runner process
1094 ** This propagates the signal to the child processes that are queue
1095 ** runners. This is for a queue runner "cleanup". After all of the
1096 ** child queue runner processes are signaled (it should be SIGTERM
1105 ** sig -- the signal number being sent
1153 ** RUNNERS_SIGHUP -- propagate a SIGHUP to queue runner process
1155 ** This propagates the signal to the child processes that are queue
1156 ** runners. This is for a queue runner "cleanup". After all of the
1157 ** child queue runner processes are signaled (it should be SIGHUP
1166 ** sig -- the signal number being sent
1205 ** MARK_WORK_GROUP_RESTART -- mark a work group as needing a restart
1210 ** wgrp -- the work group id to restart.
1211 ** reason -- why (signal?), -1 to turn off restart
1237 ** RESTART_MARKED_WORK_GROUPS -- restart work groups marked as needing restart
1270 "restart queue runner=%d due to signal 0x%x", in restart_marked_work_groups()
1281 ** RESTART_WORK_GROUP -- restart a specific work group
1284 ** If the requested work group has been restarted too many times log
1288 ** wgrp -- the work group id to restart
1307 WorkGrp[wgrp].wg_restart = -1;
1317 "ERROR: persistent queue runner=%d restarted too many times, queue runner lost",
1322 ** SCHEDULE_QUEUE_RUNS -- schedule the next queue run for a work group.
1325 ** runall -- schedule even if individual bit is not set.
1326 ** wgrp -- the work group id to schedule.
1327 ** didit -- the queue run was performed for this work group.
1353 ** code that "walks" through a work queue group.
1367 qgrp = WorkGrp[wgrp].wg_qgs[cgrp]->qg_index;
1368 if (Queue[qgrp]->qg_queueintvl > 0)
1369 qintvl = Queue[qgrp]->qg_queueintvl;
1375 lastsched = Queue[qgrp]->qg_nextrun;
1377 if ((runall || Queue[qgrp]->qg_nextrun <= now) && qintvl > 0)
1386 ** Only set a new time if a queue run was performed
1387 ** for this queue group. If the queue was not run,
1393 Queue[qgrp]->qg_nextrun += qintvl;
1400 (long) Queue[qgrp]->qg_queueintvl,
1402 (long) Queue[qgrp]->qg_nextrun, sched);
1412 ** CHECKQUEUERUNNER -- check whether a queue group hasn't been run.
1414 ** Use this if events may get lost and hence queue runners may not
1415 ** be started and mail will pile up in a queue.
1421 ** true if a queue run is necessary.
1424 ** may schedule a queue run.
1435 for (qgrp = 0; qgrp < NumQueue && Queue[qgrp] != NULL; qgrp++) in checkqueuerunner()
1439 if (Queue[qgrp]->qg_queueintvl > 0) in checkqueuerunner()
1440 qintvl = Queue[qgrp]->qg_queueintvl; in checkqueuerunner()
1445 if (Queue[qgrp]->qg_nextrun <= now - qintvl) in checkqueuerunner()
1451 "checkqueuerunner: queue %d should have been run at %s, queue interval %ld", in checkqueuerunner()
1453 arpadate(ctime(&Queue[qgrp]->qg_nextrun)), in checkqueuerunner()
1467 ** RUNQUEUE -- run the jobs in the queue.
1469 ** Gets the stuff out of the queue in some presumably logical
1473 ** forkflag -- true if the queue scanning should be done in
1474 ** a child process. We double-fork so it is not our
1477 ** verbose -- if true, print out status information.
1478 ** persistent -- persistent queue runner?
1479 ** runall -- run all groups or only a subset (DoQueueRun)?
1482 ** true if the queue run successfully began.
1485 ** runs things in the mail queue using run_work_group().
1486 ** maybe schedules next queue run.
1489 static ENVELOPE QueueEnvelope; /* the queue run envelope */
1490 static time_t LastQueueTime = 0; /* last time a queue ID assigned */
1491 static pid_t LastQueuePid = -1; /* last PID which had a queue ID */
1517 sm_dprintf("runqueue() heap group #%d\n", sm_heap_group());
1521 /* queue run has been started, don't do any more this time */
1524 /* more than one queue or more than one directory per queue */
1526 (WorkGrp[0].wg_qgs[0]->qg_numqueues > 1 || NumWorkGroups > 1 ||
1531 ** For controlling queue runners via signals sent to this process.
1536 ** finished spinning off queue runners, may go back to doing something
1538 ** clean up the child queue runners. Only install 'runners_sig*' once
1556 ** of the next queue group's additional queue runners (maximum)
1560 ** may have fewer queue runners, this would be "unfair",
1561 ** i.e., this work group might "starve" then.
1582 ** This gives a round-robin fairness to queue runs.
1586 ** decrements CurRunners if the queue runners terminate.
1588 ** (too few jobs in the queue) this value is larger than
1589 ** the actual number of queue runners. The discrepancy can
1590 ** increase if some queue runners "hang" for a long time.
1619 CurRunners -= WorkGrp[curnum].wg_maxact;
1632 /* schedule left over queue runs */
1653 ** SKIP_DOMAINS -- Skip 'skip' number of domains in the WorkQ.
1660 ** skip -- number of domains in WorkQ to skip.
1677 if (WorkQ->w_next != NULL)
1679 if (WorkQ->w_host != NULL &&
1680 WorkQ->w_next->w_host != NULL)
1682 if (!SM_STRCASEEQ(WorkQ->w_host,
1683 WorkQ->w_next->w_host))
1688 if ((WorkQ->w_host != NULL &&
1689 WorkQ->w_next->w_host == NULL) ||
1690 (WorkQ->w_host == NULL &&
1691 WorkQ->w_next->w_host != NULL))
1695 WorkQ = WorkQ->w_next;
1702 ** RUNNER_WORK -- have a queue runner do its work
1704 ** Have a queue runner do its work on a list of entries (WorkQ).
1712 ** e -- envelope.
1713 ** sequenceno -- 'th process to run WorkQ.
1714 ** didfork -- did the calling process fork()?
1715 ** skip -- process only each skip'th item.
1716 ** njobs -- number of jobs in WorkQ.
1722 ** runs things in the mail queue.
1759 sm_dprintf("runner_work(): heap group #%d\n",
1779 ** It is set 'skip' ahead (the number of parallel queue
1781 ** works on every 'skip'th (N-th) item.
1783 ** In the case of the BYHOST Queue Sort Order, the 'item'
1784 ** is a domain, so we work on every 'skip'th (N-th) domain.
1792 if (WorkQ->w_next != NULL)
1794 if (WorkQ->w_host != NULL &&
1795 WorkQ->w_next->w_host != NULL)
1797 if (!SM_STRCASEEQ(WorkQ->w_host,
1798 WorkQ->w_next->w_host))
1801 WorkQ = WorkQ->w_next;
1805 if ((WorkQ->w_host != NULL &&
1806 WorkQ->w_next->w_host == NULL) ||
1807 (WorkQ->w_host == NULL &&
1808 WorkQ->w_next->w_host != NULL))
1811 WorkQ = WorkQ->w_next;
1815 WorkQ = WorkQ->w_next;
1822 WorkQ = WorkQ->w_next;
1825 e->e_to = NULL;
1836 char *msg = "Aborting queue run: load average too high";
1844 if (shouldqueue(w->w_pri, w->w_ctime))
1851 message("Skipping %s/%s (sequence %d of %d) and flushing rest of queue",
1852 qid_printqueue(w->w_qgrp,
1853 w->w_qdir),
1854 w->w_name + 2, sequenceno,
1858 "runqueue: Flushing queue from %s/%s (pri %ld, LA %d, %d of %d)",
1859 qid_printqueue(w->w_qgrp,
1860 w->w_qdir),
1861 w->w_name + 2, w->w_pri,
1868 qid_printqueue(w->w_qgrp, w->w_qdir),
1869 w->w_name + 2, sequenceno, njobs);
1877 qid_printqueue(w->w_qgrp, w->w_qdir),
1878 w->w_name + 2, sequenceno, njobs);
1888 qid_printqueue(w->w_qgrp, w->w_qdir),
1889 w->w_name + 2, (int) CurrentPid);
1891 (void) dowork(w->w_qgrp, w->w_qdir, w->w_name + 2,
1895 sm_free(w->w_name); /* XXX */
1896 if (w->w_host != NULL)
1897 sm_free(w->w_host); /* XXX */
1909 sl = tTdlevel(76) - 100;
1930 ** RUN_WORK_GROUP -- run the jobs in a queue group from a work group.
1932 ** Gets the stuff out of the queue in some presumably logical
1936 ** wgrp -- work group to process.
1937 ** flags -- RWG_* flags
1940 ** true if the queue run successfully began.
1943 ** runs things in the mail queue.
1946 /* Minimum sleep time for persistent queue runners */
1969 ** the queue.
1977 char *msg = "Skipping queue run -- load average too high";
1995 char *msg = "Skipping queue run -- too many children";
2017 if (pid == -1)
2019 const char *msg = "Skipping queue run -- fork() failed";
2032 /* parent -- pick up intermediate zombie */
2035 /* wgrp only used when queue runners are persistent */
2036 proc_list_add(pid, "Queue runner", PROC_QUEUE,
2038 bitset(RWG_PERSISTENT, flags) ? wgrp : -1,
2045 /* child -- clean up signals */
2065 proc_list_add(CurrentPid, "Queue runner child process",
2066 PROC_QUEUE_CHILD, 0, -1, NULL);
2093 e->e_flags = BlankEnvelope.e_flags;
2094 e->e_parent = NULL;
2104 ** If we are running part of the queue, always ignore stored
2118 ** Here is where we choose the queue group from the work group.
2127 ** Run a queue group if:
2128 ** RWG_RUNALL bit is set or the bit for this group is set.
2135 ** Find the next queue group within the work group that
2139 qgrp = WorkGrp[wgrp].wg_qgs[WorkGrp[wgrp].wg_curqgrp]->qg_index;
2143 (Queue[qgrp]->qg_nextrun <= now &&
2144 Queue[qgrp]->qg_nextrun != (time_t) -1))
2148 e->e_id = NULL;
2155 qdir = Queue[qgrp]->qg_curnum; /* round-robin init of queue position */
2165 /* tweak niceness of queue runs */
2166 if (Queue[qgrp]->qg_nice > 0)
2167 (void) nice(Queue[qgrp]->qg_nice);
2170 /* XXX running queue group... */
2171 sm_setproctitle(true, CurEnv, "running queue: %s",
2181 ** Start making passes through the queue.
2182 ** First, read and sort the entire queue.
2186 for (i = 0; i < Queue[qgrp]->qg_numqueues; i++)
2191 QSHM_ENTRIES(Queue[qgrp]->qg_qpaths[qdir].qp_idx) = h;
2193 /* If there are no more items in this queue advance */
2196 /* A round-robin advance */
2198 qdir %= Queue[qgrp]->qg_numqueues;
2207 njobs = sortq(Queue[qgrp]->qg_maxlist);
2208 Queue[qgrp]->qg_curnum = qdir; /* update */
2210 if (!Verbose && bitnset(QD_FORK, Queue[qgrp]->qg_flags))
2218 ** will process every N-th item. The parent will wait for all
2220 ** queue group within the work group. This saves us forking
2221 ** a new runner-child for each work item.
2223 ** explicit "don't run this queue" setting.
2226 maxrunners = Queue[qgrp]->qg_maxqrun;
2229 ** If no runners are configured for this group but
2230 ** the queue is "forced" then let's use 1 runner.
2259 /* parent -- clean out connection cache */
2269 WorkQ = WorkQ->w_next;
2272 proc_list_add(pid, "Queue child runner process",
2273 PROC_QUEUE_CHILD, 0, -1, NULL);
2281 /* child -- Reset global flags */
2299 ** SMTP processes (whether -bd or -bs) set
2321 /* child -- error messages to the transcript */
2336 ** seeing if there is another queue group in the
2337 ** work group to process.
2353 else if (Queue[qgrp]->qg_maxqrun > 0 || bitset(RWG_FORCE, flags))
2369 /* Are there still more queues in the work group to process? */
2374 e->e_flags = BlankEnvelope.e_flags;
2378 /* No more queues in work group to process. Now check persistent. */
2382 sm_setproctitle(true, NULL, "running queue: %s",
2422 (void) sm_io_fprintf(out, SM_TIME_DEFAULT, "----------------------\n");
2424 sm_debug_level(&SmHeapCheck) - 1);
2440 ** In a persistent queue runner the code is repeated over
2443 ** Hence the queue runners would just idle around when once
2444 ** CurrentLA caused all entries in a queue to be ignored.
2451 e->e_flags = BlankEnvelope.e_flags;
2456 e->e_id = NULL;
2464 ** DOQUEUERUN -- do a queue run?
2474 ** RUNQUEUEEVENT -- Sets a flag to indicate that a queue run should be done.
2488 ** this function because of non-restartable/continuable system
2504 ** Set the general bit that we want a queue run,
2519 ** GATHERQ -- gather messages from the message queue(s) into the work queue.
2522 ** qgrp -- the index of the queue group.
2523 ** qdir -- the index of the queue directory.
2524 ** doall -- if set, include everything in the queue (even
2528 ** full -- (optional) to be set 'true' if WorkList is full
2529 ** more -- (optional) to be set 'true' if there are still more
2530 ** messages in this queue not added to WorkList
2531 ** pnentries -- (optional) total number of entries in queue
2534 ** The number of requests in the queue (not necessarily
2571 wn = WorkListCount - 1;
2578 Queue[qgrp]->qg_qpaths[qdir].qp_name,
2580 Queue[qgrp]->qg_qpaths[qdir].qp_subdirs)
2591 check->queue_negate ? "!" : "",
2592 check->queue_match);
2593 check = check->queue_next;
2600 check->queue_negate ? "!" : "",
2601 check->queue_match);
2602 check = check->queue_next;
2609 check->queue_negate ? "!" : "",
2610 check->queue_match);
2611 check = check->queue_next;
2620 check->queue_negate ? "!" : "",
2621 check->queue_match);
2622 check = check->queue_next;
2627 /* open the queue directory */
2652 sm_dprintf("gatherq: checking %s..", d->d_name);
2656 d->d_name[0] == NORMQF_LETTER) ||
2658 d->d_name[0] == QUARQF_LETTER) ||
2660 d->d_name[0] == LOSEQF_LETTER)) &&
2661 d->d_name[1] == 'f'))
2670 if (strlen(d->d_name) >= MAXQFNAME)
2675 d->d_name, MAXQFNAME);
2679 d->d_name, MAXQFNAME);
2687 if (strcontainedin(false, check->queue_match,
2688 d->d_name) != check->queue_negate)
2691 check = check->queue_next;
2717 (void) sm_strlcpyn(qf, sizeof(qf), 3, qd, "/", d->d_name);
2724 d->d_name);
2725 wn--;
2731 if (!((d->d_name[0] == DATAFL_LETTER ||
2732 d->d_name[0] == NORMQF_LETTER ||
2733 d->d_name[0] == QUARQF_LETTER ||
2734 d->d_name[0] == LOSEQF_LETTER ||
2735 d->d_name[0] == XSCRPT_LETTER) &&
2736 d->d_name[1] == 'f' && d->d_name[2] == '\0'))
2738 qid_printqueue(qgrp, qdir), d->d_name);
2739 wn--;
2752 w->w_qgrp = qgrp;
2753 w->w_qdir = qdir;
2754 w->w_name = newstr(d->d_name);
2755 w->w_host = NULL;
2756 w->w_lock = w->w_tooyoung = false;
2757 w->w_pri = 0;
2758 w->w_ctime = 0;
2759 w->w_mtime = sbuf.st_mtime;
2772 d->d_name, sm_errstring(errno));
2774 wn--;
2777 w->w_qgrp = qgrp;
2778 w->w_qdir = qdir;
2779 w->w_name = newstr(d->d_name);
2780 w->w_host = NULL;
2783 w->w_lock = !lockfile(sm_io_getinfo(cf, SM_IO_WHAT_FD,
2785 w->w_name, NULL,
2788 w->w_tooyoung = false;
2790 /* make sure jobs in creation don't clog queue */
2791 w->w_pri = 0x7fffffff;
2792 w->w_ctime = 0;
2793 w->w_mtime = sbuf.st_mtime;
2837 w->w_pri = atol(&lbuf[1]);
2842 w->w_ctime = atol(&lbuf[1]);
2852 w->w_name);
2867 check->queue_match,
2869 check->queue_negate)
2872 check = check->queue_next;
2880 if (w->w_host == NULL &&
2886 w->w_host = newstr(&p[1]);
2889 w->w_host = strrev(&p[1]);
2890 str = makelower_a(&w->w_host, NULL);
2891 ASSIGN_IFDIFF(w->w_host, str);
2913 check->queue_match,
2915 check->queue_negate)
2918 check = check->queue_next;
2929 check->queue_match,
2931 check->queue_negate)
2934 check = check->queue_next;
2946 delay = MIN(lasttry - w->w_ctime,
2948 age = curtime() - lasttry;
2950 w->w_tooyoung = true;
2954 age = curtime() - (time_t) atol(&lbuf[1]);
2957 w->w_tooyoung = true;
2962 w->w_tooyoung = false;
2969 if ((!doall && (shouldqueue(w->w_pri, w->w_ctime) ||
2970 w->w_tooyoung)) ||
2977 sm_dprintf("skipping %s (%x)\n", w->w_name, i);
2978 sm_free(w->w_name); /* XXX */
2979 if (w->w_host != NULL)
2980 sm_free(w->w_host); /* XXX */
2981 wn--;
2989 i = wn - WorkListCount;
3006 ** SORTQ -- sort the work list
3015 ** max -- maximum number of items to be placed in WorkQ
3040 nw = w->w_next;
3041 sm_free(w->w_name); /* XXX */
3042 if (w->w_host != NULL)
3043 sm_free(w->w_host); /* XXX */
3056 ** important items might get truncated off -- not what we want.
3085 w->w_host == NULL)
3088 w->w_host != NULL &&
3090 w->w_host))
3115 ** Sort based on queue filename.
3124 ** use a random index into the queue file name to start
3136 ** Simple sort based on modification time of queue file.
3156 ** Simple sort based on queue priority only.
3163 /* Check if the per queue group item limit will be exceeded */
3170 ** Only take the most important items up to the per queue group
3174 for (i = wc; --i >= 0; )
3177 w->w_qgrp = WorkList[i].w_qgrp;
3178 w->w_qdir = WorkList[i].w_qdir;
3179 w->w_name = WorkList[i].w_name;
3180 w->w_host = WorkList[i].w_host;
3181 w->w_lock = WorkList[i].w_lock;
3182 w->w_tooyoung = WorkList[i].w_tooyoung;
3183 w->w_pri = WorkList[i].w_pri;
3184 w->w_ctime = WorkList[i].w_ctime;
3185 w->w_mtime = WorkList[i].w_mtime;
3186 w->w_next = WorkQ;
3191 for (i = WorkListCount; --i >= wc; )
3206 for (w = WorkQ; w != NULL; w = w->w_next)
3208 if (w->w_host != NULL)
3210 w->w_name, w->w_pri, w->w_host);
3213 w->w_name, w->w_pri);
3220 ** GROW_WLIST -- make the work list larger
3223 ** qgrp -- the index for the queue group.
3224 ** qdir -- the index for the queue directory.
3277 ** WORKCMPF0 -- simple priority-only compare function.
3280 ** av -- the first argument.
3281 ** bv -- the second argument.
3284 ** -1 if av < bv
3294 long pa = ((WORK *)av)->w_pri;
3295 long pb = ((WORK *)bv)->w_pri;
3302 return -1;
3305 ** WORKCMPF1 -- first compare function for ordering work based on host name.
3310 ** av -- the first argument.
3311 ** bv -- the second argument.
3329 if (a->w_host != NULL && b->w_host == NULL)
3331 else if (a->w_host == NULL && b->w_host != NULL)
3332 return -1;
3333 if (a->w_host != NULL && b->w_host != NULL &&
3334 (i = sm_strcasecmp(a->w_host, b->w_host)) != 0)
3338 if (a->w_lock != b->w_lock)
3339 return b->w_lock - a->w_lock;
3345 ** WORKCMPF2 -- second compare function for ordering work based on host name.
3350 ** av -- the first argument.
3351 ** bv -- the second argument.
3369 if (a->w_lock != b->w_lock)
3370 return a->w_lock - b->w_lock;
3373 if (a->w_host != NULL && b->w_host == NULL)
3375 else if (a->w_host == NULL && b->w_host != NULL)
3376 return -1;
3377 if (a->w_host != NULL && b->w_host != NULL &&
3378 (i = sm_strcasecmp(a->w_host, b->w_host)) != 0)
3385 ** WORKCMPF3 -- simple submission-time-only compare function.
3388 ** av -- the first argument.
3389 ** bv -- the second argument.
3392 ** -1 if av < bv
3405 if (a->w_ctime > b->w_ctime)
3407 else if (a->w_ctime < b->w_ctime)
3408 return -1;
3413 ** WORKCMPF4 -- compare based on file name
3416 ** av -- the first argument.
3417 ** bv -- the second argument.
3420 ** -1 if av < bv
3433 return strcmp(a->w_name, b->w_name);
3436 ** WORKCMPF5 -- compare based on assigned random number
3439 ** av -- the first argument.
3440 ** bv -- the second argument.
3443 ** randomly 1/-1
3455 if (strlen(a->w_name) < randi || strlen(b->w_name) < randi)
3456 return -1;
3457 return a->w_name[randi] - b->w_name[randi];
3460 ** WORKCMPF6 -- simple modification-time-only compare function.
3463 ** av -- the first argument.
3464 ** bv -- the second argument.
3467 ** -1 if av < bv
3480 if (a->w_mtime > b->w_mtime)
3482 else if (a->w_mtime < b->w_mtime)
3483 return -1;
3489 ** WORKCMPF7 -- compare function for ordering work based on shuffled host name.
3494 ** av -- the first argument.
3495 ** bv -- the second argument.
3513 if (a->w_lock != b->w_lock)
3514 return a->w_lock - b->w_lock;
3517 if (a->w_host != NULL && b->w_host == NULL)
3519 else if (a->w_host == NULL && b->w_host != NULL)
3520 return -1;
3521 if (a->w_host != NULL && b->w_host != NULL &&
3522 (i = sm_strshufflecmp(a->w_host, b->w_host)) != 0)
3530 ** STRREV -- reverse string
3537 ** fwd -- the string to reverse.
3553 rev[cnt] = fwd[len - cnt - 1];
3591 ShuffledAlphabet[i] = ShuffledAlphabet[i + 'a' - 'A']; in init_shuffle_alphabet()
3612 return (ShuffledAlphabet[*us1] - ShuffledAlphabet[*--us2]);
3617 ** DOWORK -- do a work request.
3620 ** qgrp -- the index of the queue group for the job.
3621 ** qdir -- the index of the queue directory for the job.
3622 ** id -- the ID of the job to run.
3623 ** forkflag -- if set, run this in background.
3624 ** requeueflag -- if set, reinstantiate the queue quickly.
3625 ** This is used when expanding aliases in the queue.
3628 ** e - the envelope in which to run it.
3631 ** process id of process that is running the queue job.
3676 /* parent -- clean out connection cache */
3707 /* child -- error messages to the transcript */
3741 e->e_flags |= EF_QUEUERUN|EF_GLOBALERRS;
3743 e->e_errormode = EM_MAIL;
3744 e->e_id = id;
3745 e->e_qgrp = qgrp;
3746 e->e_qdir = qdir;
3754 sm_setproctitle(true, e, "%s from queue", qid_printname(e));
3756 sm_syslog(LOG_DEBUG, e->e_id, "dowork, pid=%d",
3760 e->e_header = NULL;
3762 /* read the queue control file -- return if locked */
3765 if (tTd(40, 4) && e->e_id != NULL)
3768 e->e_id = NULL;
3778 e->e_rpool = NULL;
3783 e->e_flags |= EF_INQUEUE;
3804 e->e_rpool = NULL;
3805 e->e_message = NULL;
3808 e->e_id = NULL;
3813 ** DOWORKLIST -- process a list of envelopes as work requests
3819 ** el -- envelope to be processed including its siblings.
3820 ** forkflag -- if set, run this in background.
3821 ** requeueflag -- if set, reinstantiate the queue quickly.
3822 ** This is used when expanding aliases in the queue.
3827 ** process id of process that is running the queue job.
3869 /* parent -- clean out connection cache */
3900 /* child -- error messages to the transcript */
3940 sm_syslog(LOG_DEBUG, el->e_id, "doworklist, pid=%d",
3943 for (ei = el; ei != NULL; ei = ei->e_sibling)
3948 if (WILL_BE_QUEUED(ei->e_sendmode))
3951 ei->e_quarmsg != NULL)
3960 e.e_id = ei->e_id;
3961 e.e_qgrp = ei->e_qgrp;
3962 e.e_qdir = ei->e_qdir;
3964 sm_setproctitle(true, &e, "%s from queue", qid_printname(&e));
3970 /* read the queue control file -- return if locked */
3990 ei->e_id = NULL;
4002 ** READQF -- read queue file and set up environment.
4005 ** e -- the envelope of the job to run.
4006 ** openonly -- only open the qf (returned as e_lockfp)
4009 ** true if it successfully read the queue file.
4013 ** The queue file is returned locked.
4067 "%s: locked\n", e->e_id);
4069 sm_dprintf("%s: locked\n", e->e_id);
4071 sm_syslog(LOG_DEBUG, e->e_id, "queueup: locked");
4121 "%s: changed\n", e->e_id);
4123 sm_dprintf("%s: changed\n", e->e_id);
4125 sm_syslog(LOG_DEBUG, e->e_id, "changed");
4131 ** Check the queue file for plausibility to avoid attacks.
4143 ** If this qf file results from a set-group-ID binary, then
4144 ** we check whether the directory is group-writable,
4145 ** the queue file mode contains the group-writable bit, and
4147 ** Notice: this requires that the set-group-ID binary is used to
4148 ** run the queue!
4183 sm_syslog(LOG_ALERT, e->e_id,
4184 "bogus queue file, uid=%ld, gid=%ld, mode=%o",
4190 e->e_flags |= EF_INQUEUE;
4199 /* must be a bogus file -- if also old, just remove it */
4212 ** Race condition -- we got a file just as it was being
4224 ** in case TrustedUser manipulates the queue.
4228 e->e_flags |= EF_UNSAFE;
4232 e->e_flags |= EF_UNSAFE;
4235 /* good file -- save this lock */
4236 e->e_lockfp = qfp;
4244 macdefine(&e->e_macro, A_PERM, 'i', e->e_id);
4247 e->e_flags |= EF_GLOBALERRS;
4250 e->e_qfletter = queue_letter(e, ANYQFL_LETTER);
4251 e->e_dfqgrp = e->e_qgrp;
4252 e->e_dfqdir = e->e_qdir;
4254 macdefine(&e->e_macro, A_TEMP, macid("{queue}"),
4255 qid_printqueue(e->e_qgrp, e->e_qdir));
4257 e->e_dfino = -1;
4258 e->e_msgsize = -1;
4274 syserr("SECURITY ALERT: extra or bogus data in queue file: %s",
4276 err = "bogus queue line";
4284 e->e_auth_param = sm_rpool_strdup_x(e->e_rpool, &bp[1]);
4291 e->e_bodytype = sm_rpool_strdup_x(e->e_rpool, &bp[1]);
4299 /* obsolete -- ignore */
4307 /* forbid queue groups in MSP? */
4312 qgrp < NumQueue && Queue[qgrp] != NULL;
4316 qdir < Queue[qgrp]->qg_numqueues;
4320 Queue[qgrp]->qg_qpaths[qdir].qp_name)
4323 e->e_dfqgrp = qgrp;
4324 e->e_dfqdir = qdir;
4329 err = "bogus queue file directory";
4344 err = "bogus queue line";
4352 e->e_flags |= EF_HAS8BIT;
4356 e->e_flags |= EF_DELETE_BCC;
4360 e->e_flags |= EF_RET_PARAM;
4364 e->e_flags |= EF_NO_BODY_RETN;
4368 e->e_flags |= EF_RESPONSE;
4372 e->e_flags |= EF_SPLIT;
4376 e->e_flags |= EF_WARNING;
4381 e->e_smtputf8 = true;
4389 e->e_quarmsg = sm_rpool_strdup_x(e->e_rpool, &bp[1]);
4390 macdefine(&e->e_macro, A_PERM,
4391 macid("{quarantine}"), e->e_quarmsg);
4399 ** better than before. "-3" to skip H?? at least.
4402 hdrsize += strlen(bp) - 3;
4411 e->e_dtime = atol(&buf[1]);
4414 case 'L': /* Solaris Content-Length: */
4420 e->e_ntries = atoi(&buf[1]);
4424 if (e->e_ntries > 0 && e->e_dtime <= now &&
4425 now < e->e_dtime + MinQueueAge)
4429 howlong = pintvl(now - e->e_dtime, true);
4434 e->e_id, howlong);
4437 e->e_id, howlong);
4439 sm_syslog(LOG_DEBUG, e->e_id,
4442 e->e_id = NULL;
4448 macdefine(&e->e_macro, A_TEMP,
4453 if (e->e_ntries == 0)
4467 e->e_msgpriority = atol(&bp[1]) + WkTimeFact;
4471 orcpt = sm_rpool_strdup_x(e->e_rpool, &bp[1]);
4475 frcpt = sm_rpool_strdup_x(e->e_rpool, &bp[1]);
4510 ctladdr->q_flags |= QALIAS;
4532 macdefine(&e->e_macro, A_PERM, macid("{addr_type}"),
4543 if (ISVALIDQGRP(e->e_qgrp))
4544 q->q_qgrp = e->e_qgrp;
4545 q->q_alias = ctladdr;
4547 q->q_flags &= ~Q_PINGFLAGS;
4548 q->q_flags |= qflags;
4549 q->q_finalrcpt = frcpt;
4550 q->q_orcpt = orcpt;
4555 (void) recipient(q, &e->e_sendqueue, 0, e);
4559 macdefine(&e->e_macro, A_PERM, macid("{addr_type}"),
4564 setsender(sm_rpool_strdup_x(e->e_rpool, &bp[1]),
4569 e->e_ctime = atol(&bp[1]);
4572 case 'V': /* queue file version number */
4576 syserr("Version number in queue file (%d) greater than max (%d)",
4578 err = "unsupported queue file version";
4584 e->e_envid = sm_rpool_strdup_x(e->e_rpool, &bp[1]);
4585 macdefine(&e->e_macro, A_PERM,
4586 macid("{dsn_envid}"), e->e_envid);
4591 /* format: flag (1 char) space long-integer */
4592 e->e_dlvr_flag = buf[1];
4593 e->e_deliver_by = strtol(&buf[3], NULL, 10);
4600 macdefine(&e->e_macro, A_PERM, r,
4601 sm_rpool_strdup_x(e->e_rpool, ep));
4621 ** If we haven't read any lines, this queue file is empty.
4628 e->e_flags |= EF_CLRQUEUE|EF_FATALERRS|EF_RESPONSE;
4632 /* Check to make sure we have a complete queue file read */
4635 syserr("readqf: %s: incomplete queue file read", qf);
4642 if (e->e_from.q_mailer == NULL)
4644 syserr("readqf: %s: sender not specified in queue file", qf);
4652 if (bitset(EF_RET_PARAM, e->e_flags))
4654 if (bitset(EF_NO_BODY_RETN, e->e_flags))
4655 macdefine(&e->e_macro, A_PERM,
4658 macdefine(&e->e_macro, A_PERM,
4667 e->e_dfp = sm_io_open(SmFtStdio, SM_TIME_DEFAULT, p, SM_IO_RDONLY_B,
4669 if (e->e_dfp == NULL)
4675 e->e_flags |= EF_HAS_DF;
4676 if (fstat(sm_io_getinfo(e->e_dfp, SM_IO_WHAT_FD, NULL), &st)
4679 e->e_msgsize = st.st_size + hdrsize;
4680 e->e_dfdev = st.st_dev;
4681 e->e_dfino = ST_INODE(st);
4683 PRT_NONNEGL(e->e_msgsize));
4684 macdefine(&e->e_macro, A_TEMP, macid("{msg_size}"),
4705 e->e_lockfp = NULL;
4706 e->e_flags |= EF_INQUEUE;
4711 ** PRTSTR -- print a string, "unprintable" characters are shown as \oct
4714 ** s -- string to print
4715 ** ml -- maximum length of output
4745 while (ml-- > 0 && ((c = *s++) != '\0'))
4749 if (ml-- > 0)
4759 if ((ml -= 3) > 0)
4766 ** PRINTNQE -- print out number of entries in the mail queue
4769 ** out -- output file pointer.
4770 ** prefix -- string to output in front of each line.
4792 "%sNOTCONFIGURED:-1\r\n", prefix);
4795 for (i = 0; i < NumQueue && Queue[i] != NULL; i++)
4801 for (j = 0; j < Queue[i]->qg_numqueues; j++)
4808 n = QSHM_ENTRIES(Queue[i]->qg_qpaths[j].qp_idx);
4846 "%sNOTAVAILABLE:-1\r\n", prefix);
4850 ** PRINTQUEUE -- print out a representation of the mail queue
4859 ** Prints a listing of the mail queue on the standard output.
4867 for (i = 0; i < NumQueue && Queue[i] != NULL; i++) in printqueue()
4872 for (j = 0; j < Queue[i]->qg_numqueues; j++) in printqueue()
4886 ** PRINT_SINGLE_QUEUE -- print out a representation of a single mail queue
4889 ** qgrp -- the index of the queue group.
4890 ** qdir -- the queue directory.
4893 ** number of requests in mail queue.
4896 ** Prints a listing of the mail queue on the standard output.
4941 Queue[qgrp]->qg_qpaths[qdir].qp_name,
4943 Queue[qgrp]->qg_qpaths[qdir].qp_subdirs)
4946 Queue[qgrp]->qg_qpaths[qdir].qp_name,
4948 Queue[qgrp]->qg_qpaths[qdir].qp_subdirs)
4953 ** Check for permission to print the queue
4972 while (--n >= 0)
4982 usrerr("510 You are not permitted to see the queue");
4989 ** Read and order the queue.
4993 (void) sortq(Queue[qgrp]->qg_maxlist);
5017 ")\n-----Q-ID----- --Size-- -Priority- ---Q-Time--- --------Sender/Recipient--------\n");
5020 ")\n-----Q-ID----- --Size-- -----Q-Time----- ------------Sender/Recipient-----------\n");
5021 for (w = WorkQ; w != NULL; w = w->w_next)
5040 w->w_name + 2);
5041 (void) sm_strlcpyn(qf, sizeof(qf), 3, qd, "/", w->w_name);
5059 w->w_name[0] = DATAFL_LETTER;
5060 (void) sm_strlcpyn(qf, sizeof(qf), 3, qddf, "/", w->w_name);
5074 e.e_id = w->w_name + 2;
5077 dfsize = -1;
5088 if (w->w_lock)
5092 else if (w->w_tooyoung)
5093 (void) sm_io_fprintf(smioout, SM_TIME_DEFAULT, "-");
5094 else if (shouldqueue(w->w_pri, w->w_ctime))
5115 case 'V': /* queue file version */
5121 i = sizeof(statmsg) - 1;
5128 i = sizeof(quarmsg) - 1;
5135 i = sizeof(bodytype) - 1;
5147 w->w_pri,
5193 "\n\t\t\t\t\t\t(---%.64s---)",
5261 ** QUEUE_LETTER -- get the proper queue letter for the current QueueMode.
5264 ** e -- envelope to build it in/from.
5265 ** type -- the file type, used as the first character
5280 if (e->e_quarmsg != NULL)
5309 ** QUEUENAME -- build a file name in the queue directory for this envelope.
5312 ** e -- envelope to build it in/from.
5313 ** type -- the file type, used as the first character
5317 ** a pointer to the queue name (in a static buffer).
5321 ** assign an id code with assign_queueid(). If no queue
5336 if (e->e_id == NULL)
5349 /* Assign a queue group/directory if needed */
5355 ** chooses a queue, and sometimes we need to write to the
5357 ** to choose a queue.
5360 if (e->e_xfqgrp == NOQGRP || e->e_xfqdir == NOQDIR)
5362 if (e->e_qgrp != NOQGRP && e->e_qdir != NOQDIR)
5364 e->e_xfqgrp = e->e_qgrp;
5365 e->e_xfqdir = e->e_qdir;
5369 e->e_xfqgrp = 0;
5370 if (Queue[e->e_xfqgrp]->qg_numqueues <= 1)
5371 e->e_xfqdir = 0;
5374 e->e_xfqdir = get_rand_mod(
5375 Queue[e->e_xfqgrp]->qg_numqueues);
5379 qd = e->e_xfqdir;
5380 qg = e->e_xfqgrp;
5384 if (e->e_qgrp == NOQGRP || e->e_qdir == NOQDIR)
5392 qd = e->e_dfqdir;
5393 qg = e->e_dfqgrp;
5397 qd = e->e_qdir;
5398 qg = e->e_qgrp;
5404 (void) sm_strlcpyn(buf, sizeof(buf), 2, pref, e->e_id);
5410 if (bitset(QP_SUBDF, Queue[qg]->qg_qpaths[qd].qp_subdirs))
5419 if (bitset(QP_SUBQF, Queue[qg]->qg_qpaths[qd].qp_subdirs))
5424 if (bitset(QP_SUBXF, Queue[qg]->qg_qpaths[qd].qp_subdirs))
5431 sm_abort("queuename: bad queue file type %d", type);
5435 Queue[qg]->qg_qpaths[qd].qp_name,
5436 sub, pref, e->e_id);
5445 ** INIT_QID_ALG -- Initialize the (static) parameters that are used to
5446 ** generate a queue ID.
5457 ** is NOT triggered which will cause the same queue id to be generated.
5470 LastQueuePid = -1; in init_qid_alg()
5474 ** ASSIGN_QUEUEID -- assign a queue ID for this envelope.
5477 ** This code assumes that nothing will remain in the queue for
5479 ** name do not already exist in the queue.
5483 ** e -- envelope to set it in.
5495 ** usually only 0-59. However (Linux):
5514 char idbuf[MAXQFNAME - 2];
5517 if (e->e_id != NULL)
5540 ** Generate a new sequence number between 0 and QIC_LEN_SQR-1.
5541 ** This lets us generate up to QIC_LEN_SQR unique queue ids
5543 ** a single message can consume many queue ids.
5553 idbuf[0] = QueueIdChars[tm->tm_year % QIC_LEN];
5554 idbuf[1] = QueueIdChars[tm->tm_mon];
5555 idbuf[2] = QueueIdChars[tm->tm_mday];
5556 idbuf[3] = QueueIdChars[tm->tm_hour];
5557 idbuf[4] = QueueIdChars[tm->tm_min % QIC_LEN_R];
5558 idbuf[5] = QueueIdChars[tm->tm_sec % QIC_LEN_R];
5562 (void) sm_snprintf(&idbuf[8], sizeof(idbuf) - 8, "%07d",
5565 (void) sm_snprintf(&idbuf[8], sizeof(idbuf) - 8, "%06d",
5567 e->e_id = sm_rpool_strdup_x(e->e_rpool, idbuf);
5568 macdefine(&e->e_macro, A_PERM, 'i', e->e_id);
5571 e->e_qgrp = NOQGRP; /* too early to do anything else */
5572 e->e_qdir = NOQDIR;
5573 e->e_xfqgrp = NOQGRP;
5577 e->e_qfletter = '\0';
5581 e->e_id, (void *)e);
5583 sm_syslog(LOG_DEBUG, e->e_id, "assigned id");
5586 ** SYNC_QUEUE_TIME -- Assure exclusive PID in any given second
5592 ** This will interfere with the queue file naming system.
5615 ** UNLOCKQUEUE -- unlock the queue entry for a specified envelope
5618 ** e -- the envelope to unlock.
5624 ** unlocks the queue for `e'.
5633 e->e_id == NULL ? "NOQUEUE" : e->e_id);
5636 SM_CLOSE_FP(e->e_lockfp);
5638 /* don't create a queue id if we don't already have one */
5639 if (e->e_id == NULL)
5644 sm_syslog(LOG_DEBUG, e->e_id, "unlock");
5649 ** SETCTLUSER -- create a controlling address
5655 ** user -- the user name of the controlling user.
5656 ** qfver -- the version stamp of this queue file.
5657 ** e -- envelope
5661 ** using storage allocated from e->e_rpool.
5686 a = (ADDRESS *) sm_rpool_malloc_x(e->e_rpool, sizeof(*a));
5692 a->q_user = sm_rpool_strdup_x(e->e_rpool, p);
5697 a->q_user = sm_rpool_strdup_x(e->e_rpool, user);
5701 a->q_uid = atoi(p);
5703 a->q_gid = atoi(p);
5708 a->q_flags |= QGOODUID;
5712 o[-1] = ':';
5717 if (*pw->pw_dir == '\0')
5718 a->q_home = NULL;
5719 else if (strcmp(pw->pw_dir, "/") == 0)
5720 a->q_home = "";
5722 a->q_home = sm_rpool_strdup_x(e->e_rpool, pw->pw_dir);
5723 a->q_uid = pw->pw_uid;
5724 a->q_gid = pw->pw_gid;
5725 a->q_flags |= QGOODUID;
5729 a->q_flags |= QPRIMARY; /* flag as a "ctladdr" */
5730 a->q_mailer = LocalMailer;
5732 a->q_paddr = sm_rpool_strdup_x(e->e_rpool, a->q_user);
5734 a->q_paddr = sm_rpool_strdup_x(e->e_rpool, p);
5738 ** LOSEQFILE -- rename queue file with LOSEQF_LETTER & try to let someone know
5741 ** e -- the envelope (e->e_id will be used).
5742 ** why -- reported to whomever can hear.
5757 if (e == NULL || e->e_id == NULL)
5762 if (!bitset(EF_INQUEUE, e->e_flags))
5767 /* if already lost, no need to re-lose */
5775 sm_syslog(LOG_ALERT, e->e_id,
5778 SM_CLOSE_FP(e->e_dfp);
5779 e->e_flags &= ~EF_HAS_DF;
5782 ** NAME2QID -- translate a queue group name to a queue group id
5785 ** queuename -- name of queue group.
5788 ** queue group id if found.
5801 return s->s_quegrp->qg_index;
5804 ** QID_PRINTNAME -- create externally printable version of queue id
5807 ** e -- the envelope.
5823 if (e->e_id == NULL)
5826 id = e->e_id;
5828 if (e->e_qdir == NOQDIR)
5832 Queue[e->e_qgrp]->qg_qpaths[e->e_qdir].qp_name,
5837 ** QID_PRINTQUEUE -- create full version of queue directory for data files
5840 ** qgrp -- index in queue group.
5841 ** qdir -- the short version of the queue directory
5844 ** the full pathname to the queue (might point to a static var)
5856 return Queue[qgrp]->qg_qdir;
5858 if (strcmp(Queue[qgrp]->qg_qpaths[qdir].qp_name, ".") == 0)
5861 subdir = Queue[qgrp]->qg_qpaths[qdir].qp_name;
5864 Queue[qgrp]->qg_qdir,
5868 Queue[qgrp]->qg_qpaths[qdir].qp_subdirs)
5874 ** PICKQDIR -- Pick a queue directory from a queue group
5877 ** qg -- queue group
5878 ** fsize -- file size in bytes
5879 ** e -- envelope, or NULL
5882 ** NOQDIR if no queue directory in qg has enough free space to
5884 ** a randomly selected queue directory which resides on a
5905 if (qg->qg_numqueues <= 1)
5908 qdir = get_rand_mod(qg->qg_numqueues);
5918 ** Now iterate over the queue directories,
5925 QPATHS *qp = &qg->qg_qpaths[i];
5930 needed += fsize / FILE_SYS_BLKSIZE(qp->qp_fsysidx)
5931 + ((fsize % FILE_SYS_BLKSIZE(qp->qp_fsysidx)
5935 fsavail = FILE_SYS_AVAIL(qp->qp_fsysidx);
5946 fsavail = freediskspace(FILE_SYS_NAME(qp->qp_fsysidx),
5957 if (qg->qg_numqueues > 0)
5958 i = (i + 1) % qg->qg_numqueues;
5962 sm_syslog(LOG_ALERT, e->e_id,
5964 CurHostName == NULL ? "SMTP-DAEMON" : CurHostName,
5966 qg->qg_qdir, avail);
5970 ** SETNEWQUEUE -- Sets a new queue group and directory
5972 ** Assign a queue group and directory to an envelope and store the
5973 ** directory in e->e_qdir.
5976 ** e -- envelope to assign a queue for.
5983 ** On success, e->e_qgrp and e->e_qdir are non-negative.
5985 ** e->qgrp = NOQGRP, e->e_qdir = NOQDIR
5997 if (e->e_qgrp == NOQGRP)
6002 ** Use the queue group of the "first" recipient, as set by
6004 ** use the queue group of the mailer of the first recipient.
6006 ** queue group.
6013 q = e->e_sendqueue;
6015 (QS_IS_BADADDR(q->q_state) || QS_IS_DEAD(q->q_state)))
6017 q = q->q_next;
6020 e->e_qgrp = 0;
6021 else if (q->q_qgrp >= 0)
6022 e->e_qgrp = q->q_qgrp;
6023 else if (q->q_mailer != NULL &&
6024 ISVALIDQGRP(q->q_mailer->m_qgrp))
6025 e->e_qgrp = q->q_mailer->m_qgrp;
6027 e->e_qgrp = 0;
6028 e->e_dfqgrp = e->e_qgrp;
6031 if (ISVALIDQDIR(e->e_qdir) && ISVALIDQDIR(e->e_dfqdir))
6035 qid_printqueue(e->e_qgrp, e->e_qdir));
6040 e->e_qdir = pickqdir(Queue[e->e_qgrp], e->e_msgsize, e);
6041 if (e->e_qdir == NOQDIR)
6043 e->e_qgrp = NOQGRP;
6044 if (!bitset(EF_FATALERRS, e->e_flags))
6046 e->e_flags |= EF_FATALERRS;
6051 sm_dprintf("setnewqueue: Assigned queue directory %s\n",
6052 qid_printqueue(e->e_qgrp, e->e_qdir));
6054 if (e->e_xfqgrp == NOQGRP || e->e_xfqdir == NOQDIR)
6056 e->e_xfqgrp = e->e_qgrp;
6057 e->e_xfqdir = e->e_qdir;
6059 e->e_dfqdir = e->e_qdir;
6063 ** CHKQDIR -- check a queue directory
6066 ** name -- name of queue directory
6067 ** sff -- flags for safefile()
6070 ** is it a queue directory?
6135 "queue directory \"%s\": Not safe: %s",
6142 ** MULTIQUEUE_CACHE -- cache a list of paths to queues.
6144 ** Each potential queue is checked as the cache is built.
6150 ** basedir -- base of all queue directories.
6151 ** blen -- strlen(basedir).
6152 ** qg -- queue group.
6153 ** qn -- number of queue directories already cached.
6154 ** phash -- pointer to hash value over queue dirs.
6160 ** new number of queue directories.
6188 if (qg->qg_numqueues != 0 && qg->qg_qpaths != NULL)
6190 for (i = 0; i < qg->qg_numqueues; i++)
6192 if (qg->qg_qpaths[i].qp_name != NULL)
6193 (void) sm_free(qg->qg_qpaths[i].qp_name); /* XXX */
6195 (void) sm_free((char *) qg->qg_qpaths); /* XXX */
6196 qg->qg_qpaths = NULL;
6197 qg->qg_numqueues = 0;
6209 if (!SM_IS_DIR_START(qg->qg_qdir))
6216 syserr("QueuePath %s not absolute", qg->qg_qdir);
6222 len = sm_strlcpy(qpath, qg->qg_qdir, sizeof(qpath));
6226 qg->qg_qdir, (int) sizeof(qpath));
6233 (strncmp(basedir, qpath, blen - 1) != 0 || len != blen - 1))
6242 if (blen < len && SM_FIRST_DIR_DELIM(qg->qg_qdir + blen) != NULL)
6246 if (sm_strlcpy(prefix, qg->qg_qdir + blen, sizeof(prefix)) >=
6250 qg->qg_qdir, (int) sizeof(qpath));
6260 SM_ASSERT(len >= blen - 1);
6261 cp = &qpath[len - 1];
6287 (void) sm_strlcpy(qpath + 1, qpath, sizeof(qpath) - 1);
6292 len = strlen(cp); /* Last component of queue directory */
6307 /* It is always basedir: we don't need to store it per group */
6308 /* XXX: optimize this! -> one more global? */
6309 qg->qg_qdir = newstr(basedir);
6310 qg->qg_qdir[blen - 1] = '\0'; /* cut off trailing / */
6330 syserr("can not opendir(%s/%s)", qg->qg_qdir, prefix);
6333 qg->qg_qdir, prefix,
6341 if (strcmp(d->d_name, ".") == 0 ||
6342 strcmp(d->d_name, "..") == 0)
6345 i = strlen(d->d_name);
6346 if (i < len || strncmp(d->d_name, cp, len) != 0)
6350 d->d_name);
6355 i = sizeof(relpath) - off;
6356 if (sm_strlcpy(relpath + off, d->d_name, i) >= i)
6362 if (qg->qg_qpaths == NULL)
6365 qg->qg_qpaths = (QPATHS *)xalloc((sizeof(*qg->qg_qpaths)) *
6367 qg->qg_numqueues = 0;
6371 qg->qg_qpaths = (QPATHS *)sm_realloc((char *)qg->qg_qpaths,
6372 (sizeof(*qg->qg_qpaths)) *
6373 (qg->qg_numqueues +
6375 if (qg->qg_qpaths == NULL)
6384 qg->qg_qpaths[qg->qg_numqueues].qp_subdirs = QP_NOSUB;
6389 qg->qg_qpaths[qg->qg_numqueues].qp_subdirs |= flag; \
6396 /* assert(strlen(d->d_name) < MAXPATHLEN - 14) */
6397 /* maybe even - 17 (subdirs) */
6400 qg->qg_qpaths[qg->qg_numqueues].qp_name =
6403 qg->qg_qpaths[qg->qg_numqueues].qp_name =
6404 newstr(d->d_name);
6408 qg->qg_numqueues, relpath,
6409 qg->qg_qpaths[qg->qg_numqueues].qp_subdirs);
6411 qg->qg_qpaths[qg->qg_numqueues].qp_idx = qn;
6414 qg->qg_numqueues++;
6416 slotsleft--;
6423 if (qg->qg_numqueues == 0)
6425 qg->qg_qpaths = (QPATHS *) xalloc(sizeof(*qg->qg_qpaths));
6439 qg->qg_qpaths[0].qp_subdirs = QP_NOSUB;
6440 qg->qg_numqueues = 1;
6444 (void) sm_strlcpyn(subdir, sizeof(subdir), 3, qg->qg_qdir, "/", name); \
6446 qg->qg_qpaths[0].qp_subdirs |= flag; \
6453 if (qg->qg_qdir[blen - 1] != '\0' &&
6454 qg->qg_qdir[blen] != '\0')
6461 qg->qg_qpaths[0].qp_name = newstr(qg->qg_qdir + blen);
6462 qg->qg_qdir[blen - 1] = '\0';
6465 qg->qg_qpaths[0].qp_name = newstr(".");
6468 qg->qg_qpaths[0].qp_idx = qn;
6469 *phash = hash_q(qg->qg_qpaths[0].qp_name, *phash);
6477 ** FILESYS_FIND -- find entry in FileSys table, or add new one
6483 ** If the directory does not exist, -1 is returned.
6486 ** name -- name of directory (must be persistent!)
6487 ** path -- pathname of directory (name plus maybe "/df")
6488 ** add -- add to structure if not found.
6493 ** FSF_TOO_MANY: too many filesystems (-> syserr())
6494 ** FSF_STAT_FAIL: can't stat() filesystem (-> syserr())
6500 #define FSF_NOT_FOUND (-1)
6501 #define FSF_STAT_FAIL (-2)
6502 #define FSF_TOO_MANY (-3)
6515 syserr("cannot stat queue directory %s", path);
6537 syserr("too many queue file systems (%d max)", MAXFILESYS);
6552 ** FILESYS_SETUP -- set up mapping from queue directories to file systems
6555 ** free space available in a set of queue directories.
6558 ** add -- initialize structure if necessary.
6564 ** FSF_STAT_FAIL: can't stat() filesystem (-> syserr())
6565 ** FSF_TOO_MANY: too many filesystems (-> syserr())
6579 for (i = 0; i < NumQueue && Queue[i] != NULL; i++)
6581 for (j = 0; j < Queue[i]->qg_numqueues; ++j)
6583 QPATHS *qp = &Queue[i]->qg_qpaths[j];
6586 (void) sm_strlcpyn(qddf, sizeof(qddf), 2, qp->qp_name,
6587 (bitset(QP_SUBDF, qp->qp_subdirs)
6589 fs = filesys_find(qp->qp_name, qddf, add);
6591 qp->qp_fsysidx = fs;
6593 qp->qp_fsysidx = 0;
6602 ** FILESYS_UPDATE -- update amount of free space on all file systems
6605 ** available on all queue directory file systems.
6652 fs->fs_avail = 0; in filesys_update()
6653 fs->fs_blksize = 1024; /* avoid divide by zero */ in filesys_update()
6658 fs->fs_avail = avail; in filesys_update()
6659 fs->fs_blksize = blksize; in filesys_update()
6666 ** FILESYS_FREE -- check whether there is at least one fs with enough space.
6669 ** fsize -- file size in bytes
6701 ** DISK_STATUS -- show amount of free space in queue directories
6704 ** out -- output file pointer.
6705 ** prefix -- string to output in front of each line.
6729 free = -1;
6741 ** INIT_SEM -- initialize semaphore system
6744 ** owner -- is this the owner of semaphores?
6751 static int SemId = -1; /* Semaphore Id */
6783 (long) SemKey, SemId, sm_errstring(-SemId));
6802 ** STOP_SEM -- stop semaphore system
6805 ** owner -- is this the owner of semaphores?
6835 ** OCC_EXCEEDED -- is an outgoing connection limit exceeded?
6838 ** e -- envelope
6839 ** mci -- mail connection information
6840 ** host -- name of host
6841 ** addr -- address of host
6886 mci->mci_flags |= MCIF_OCC_INCR;
6891 ** OCC_CLOSE -- "close" an outgoing connection: update connection status
6894 ** e -- envelope
6895 ** mci -- mail connection information
6896 ** host -- name of host
6897 ** addr -- address of host
6917 if (mci == NULL || mci->mci_state == MCIS_CLOSED ||
6918 bitset(MCIF_CACHED, mci->mci_flags) ||
6919 !bitset(MCIF_OCC_INCR, mci->mci_flags))
6921 mci->mci_flags &= ~MCIF_OCC_INCR;
6927 (void) conn_limits(e, now, addr, SM_CLFL_EXC, occ, -1, -1);
6934 ** UPD_QS -- update information about queue when adding/deleting an entry
6937 ** e -- envelope.
6938 ** count -- add/remove entry (+1/0/-1: add/no change/remove)
6939 ** space -- update the space available as well.
6941 ** where -- caller (for logging)
6948 ** Changes number of entries in queue directory.
6967 if (e->e_qgrp == NOQGRP || e->e_qdir == NOQDIR)
6969 idx = Queue[e->e_qgrp]->qg_qpaths[e->e_qdir].qp_idx;
6982 fidx = Queue[e->e_qgrp]->qg_qpaths[e->e_qdir].qp_fsysidx;
6991 s = e->e_msgsize / FILE_SYS_BLKSIZE(fidx);
6999 FILE_SYS_AVAIL(fidx) -= s;
7007 ** WRITE_KEY_FILE -- record some key into a file.
7010 ** keypath -- file name.
7011 ** key -- key to write.
7048 fd = keyf->f_file;
7049 if (fd >= 0 && fchown(fd, RunAsUid, -1) < 0)
7067 ** READ_KEY_FILE -- read a key from a file.
7070 ** keypath -- file name.
7071 ** key -- default key.
7108 ** INIT_SHM -- initialize shared memory structure
7117 ** qn -- number of queue directories.
7118 ** owner -- owner of shared memory.
7119 ** hash -- identifies data that is stored in shared memory.
7141 #define SEL_SHM_KEY ((key_t) -1)
7169 /* allow read/write access for group? */
7234 ** Read it from the pid-file? That does
7254 QShm[i].qs_entries = -1;
7274 ** SETUP_QUEUES -- set up all queue groups
7277 ** owner -- owner of shared memory?
7299 ** Determine basedir for all queue directories.
7300 ** All queue directories must be (first level) subdirectories
7310 if (len >= sizeof(basedir) - 1)
7313 len, (int) sizeof(basedir) - 1);
7318 if (basedir[len - 1] == '*')
7336 len = cp - basedir;
7338 else if (!SM_IS_DIR_DELIM(basedir[len - 1]))
7346 SM_ASSERT(basedir[len - 1] == '/');
7366 /* initialize for queue runs */
7369 for (i = 0; i < NumQueue && Queue[i] != NULL; i++)
7370 Queue[i]->qg_nextrun = now;
7389 ** Check queue directory permissions.
7390 ** Can we write to a group writable queue directory?
7398 syserr("can not write to queue directory %s (RunAsGid=%ld, required=%ld)",
7404 syserr("dangerous permissions=%o on queue directory %s",
7409 "dangerous permissions=%o on queue directory %s",
7419 /* initial number of queue directories */
7421 for (i = 0; i < NumQueue && Queue[i] != NULL; i++)
7422 qn = multiqueue_cache(basedir, len, Queue[i], qn, &hashval);
7453 ** CLEANUP_SHM -- do some cleanup work for shared memory etc
7456 ** owner -- owner of shared memory?
7482 ** CLEANUP_QUEUES -- do some cleanup work for queues
7498 ** SET_DEF_QUEUEVAL -- set default values for a queue group.
7501 ** qg -- queue group
7502 ** all -- set all values (true for default group)?
7508 ** sets default values for the queue group.
7516 if (bitnset(QD_DEFINED, qg->qg_flags))
7519 qg->qg_qdir = QueueDir;
7521 qg->qg_sortorder = QueueSortOrder;
7523 qg->qg_maxqrun = all ? MaxRunnersPerQueue : -1;
7524 qg->qg_nice = NiceQueueRun;
7527 ** MAKEQUEUE -- define a new queue.
7530 ** line -- description of queue. This is in labeled fields.
7532 ** F -- the flags associated with the queue
7533 ** I -- the interval between running the queue
7534 ** J -- the maximum # of jobs in work list
7535 ** [M -- the maximum # of jobs in a queue run]
7536 ** N -- the niceness at which to run
7537 ** P -- the path to the queue
7538 ** S -- the queue sorting order
7539 ** R -- number of parallel queue runners
7540 ** r -- max recipients per envelope
7541 ** The first word is the canonical name of the queue.
7542 ** qdef -- this is a 'Q' definition from .cf
7548 ** enters the queue into the queue table.
7562 /* allocate a queue and set up defaults */
7568 syserr("name required for queue");
7572 /* collect the queue name */
7579 qg->qg_name = newstr(line);
7598 syserr("queue %s: `=' expected", qg->qg_name);
7607 /* install the field into the queue struct */
7612 syserr("queue %s: empty path name",
7613 qg->qg_name);
7615 qg->qg_qdir = newstr(p);
7621 setbitn(*p, qg->qg_flags);
7626 ** One for persistent queue runners,
7627 ** one for "normal" queue runs?
7630 case 'I': /* interval between running the queue */
7631 qg->qg_queueintvl = convtime(p, 'm');
7635 qg->qg_nice = atoi(p);
7638 case 'R': /* maximum # of runners for the group */
7644 qg->qg_maxqrun = MaxQueueChildren;
7647 qg->qg_name, i,
7651 qg->qg_maxqrun = i;
7655 qg->qg_maxlist = atoi(p);
7659 qg->qg_maxrcpt = atoi(p);
7663 case 'S': /* queue sorting order */
7668 qg->qg_sortorder = QSO_BYHOST;
7673 qg->qg_sortorder = QSO_BYPRIORITY;
7678 qg->qg_sortorder = QSO_BYTIME;
7683 qg->qg_sortorder = QSO_BYFILENAME;
7688 qg->qg_sortorder = QSO_BYMODTIME;
7693 qg->qg_sortorder = QSO_RANDOM;
7699 qg->qg_sortorder = QSO_BYSHUFFLE;
7705 qg->qg_sortorder = QSO_NONE;
7709 syserr("Invalid queue sort order \"%s\"", p);
7715 syserr("Q%s: unknown queue equate %c=",
7716 qg->qg_name, fcode);
7724 if (qg->qg_nice != NiceQueueRun)
7728 qg->qg_name);
7735 syserr("too many queue groups defined (%d max)",
7740 if (qg->qg_qdir == NULL)
7744 syserr("QueueDir must be defined before queue groups");
7747 qg->qg_qdir = newstr(QueueDir);
7750 if (qg->qg_maxqrun > 1 && !bitnset(QD_FORK, qg->qg_flags))
7753 "Warning: Q=%s: R=%d: multiple queue runners specified\n\tbut flag '%c' is not set\n",
7754 qg->qg_name, qg->qg_maxqrun, QD_FORK);
7757 /* enter the queue into the symbol table */
7760 "Adding %s to stab, path: %s", qg->qg_name,
7761 qg->qg_qdir);
7762 s = stab(qg->qg_name, ST_QUEUE, ST_ENTER);
7763 if (s->s_quegrp != NULL)
7765 i = s->s_quegrp->qg_index;
7768 sm_free(s->s_quegrp); /* XXX */
7772 Queue[i] = s->s_quegrp = qg;
7773 qg->qg_index = i;
7775 /* set default value for max queue runners */
7776 if (qg->qg_maxqrun < 0)
7779 qg->qg_maxqrun = MaxRunnersPerQueue;
7781 qg->qg_maxqrun = 1;
7784 setbitn(QD_DEFINED, qg->qg_flags);
7788 ** HASHFQN -- calculate a hash value for a fully qualified host name
7791 ** fqn -- an all lower-case host.domain string
7792 ** buckets -- the number of buckets (queue directories)
7796 ** -1 on error
7810 return -1;
7814 ** This is the best as of Feb 19, 1996 --bcx
7834 ** A structure for sorting Queue according to maxqrun without
7835 ** screwing up Queue itself.
7841 int sg_maxqrun; /* max queue runners */
7852 if (((SORTQGRP_T *)a)->sg_maxqrun < ((SORTQGRP_T *)b)->sg_maxqrun)
7854 else if (((SORTQGRP_T *)a)->sg_maxqrun > ((SORTQGRP_T *)b)->sg_maxqrun)
7855 return -1;
7861 ** MAKEWORKGROUPS -- balance queue groups into work groups per MaxQueueChildren
7863 ** Take the now defined queue groups and assign them to work groups.
7865 ** queue runners such that MaxQueueChildren is not exceeded. This may
7866 ** result in more than one queue group per work group. In such a case
7867 ** the number of running queue groups in that work group will have no
7868 ** more than the work group maximum number of runners (a "fair" portion
7869 ** of MaxQueueRun). All queue groups within a work group will get a
7889 if (NumQueue == 1 && strcmp(Queue[0]->qg_name, "mqueue") == 0) in makeworkgroups()
7892 ** There is only the "mqueue" queue group (a default) in makeworkgroups()
7894 ** this queue group the maximum allowable queue runners. in makeworkgroups()
7896 ** 1 runner per queue capping it at MaxQueueChildren. in makeworkgroups()
7898 ** for the "mqueue" queue group (where N is kept less than in makeworkgroups()
7905 WorkGrp[0].wg_qgs[0] = Queue[0]; in makeworkgroups()
7907 Queue[0]->qg_numqueues > MaxQueueChildren) in makeworkgroups()
7910 WorkGrp[0].wg_runners = Queue[0]->qg_numqueues; in makeworkgroups()
7912 Queue[0]->qg_wgrp = 0; in makeworkgroups()
7916 Queue[0]->qg_maxqrun > MaxQueueChildren) in makeworkgroups()
7917 Queue[0]->qg_maxqrun = MaxQueueChildren; in makeworkgroups()
7918 WorkGrp[0].wg_maxact = Queue[0]->qg_maxqrun; in makeworkgroups()
7919 WorkGrp[0].wg_lowqintvl = Queue[0]->qg_queueintvl; in makeworkgroups()
7925 si[i].sg_maxqrun = Queue[i]->qg_maxqrun; in makeworkgroups()
7951 ** We now know the number of work groups to pack the queue groups in makeworkgroups()
7952 ** into. The queue groups in 'Queue' are sorted from highest in makeworkgroups()
7953 ** to lowest for the number of runners per queue group. in makeworkgroups()
7954 ** We put the queue groups with the largest number of runners in makeworkgroups()
7966 i, j, h, Queue[h]->qg_maxqrun, IS_BOUNCE_QUEUE(h)); in makeworkgroups()
7969 /* a to-and-fro packing scheme, continue from last position */ in makeworkgroups()
7972 dir = -1; in makeworkgroups()
7973 j = NumWorkGroups - 1; in makeworkgroups()
7995 WorkGrp[j].wg_qgs[WorkGrp[j].wg_numqgrp] = Queue[h]; in makeworkgroups()
7997 WorkGrp[j].wg_runners += Queue[h]->qg_maxqrun; in makeworkgroups()
7998 Queue[h]->qg_wgrp = j; in makeworkgroups()
8004 Queue[h]->qg_maxqrun > MaxQueueChildren) in makeworkgroups()
8005 Queue[h]->qg_maxqrun = MaxQueueChildren; in makeworkgroups()
8006 WorkGrp[j].wg_maxact = Queue[h]->qg_maxqrun; in makeworkgroups()
8011 ** qg1: 2m, qg2: 3m, minimum: 2m, when do queue runs for in makeworkgroups()
8016 if (Queue[h]->qg_queueintvl > 0 && in makeworkgroups()
8017 WorkGrp[j].wg_lowqintvl < Queue[h]->qg_queueintvl) in makeworkgroups()
8018 WorkGrp[j].wg_lowqintvl = Queue[h]->qg_queueintvl; in makeworkgroups()
8029 WorkGrp[i].wg_qgs[j]->qg_name); in makeworkgroups()
8040 ** DUP_DF -- duplicate envelope data file
8046 ** If the old and new queue directories are on different file systems,
8047 ** then the new data file link is created in the old queue directory,
8048 ** and the new queue file will contain a 'd' record pointing to the
8052 ** old -- old envelope.
8053 ** new -- new envelope.
8060 ** On fatal failure, EF_FATALERRS is set in old->e_flags.
8074 if (!bitset(EF_HAS_DF, old->e_flags))
8083 SM_REQUIRE(ISVALIDQGRP(old->e_qgrp) && ISVALIDQDIR(old->e_qdir));
8084 SM_REQUIRE(ISVALIDQGRP(new->e_qgrp) && ISVALIDQDIR(new->e_qdir));
8089 if (old->e_dfp != NULL)
8091 r = sm_io_setinfo(old->e_dfp, SM_BF_COMMIT, NULL);
8095 old->e_flags |= EF_FATALERRS;
8108 SM_REQUIRE(ISVALIDQGRP(old->e_dfqgrp) && ISVALIDQDIR(old->e_dfqdir));
8109 SM_REQUIRE(ISVALIDQGRP(new->e_dfqgrp) && ISVALIDQDIR(new->e_dfqdir));
8111 ofs = Queue[old->e_dfqgrp]->qg_qpaths[old->e_dfqdir].qp_fsysidx;
8112 nfs = Queue[new->e_dfqgrp]->qg_qpaths[new->e_dfqdir].qp_fsysidx;
8117 new->e_flags |= EF_HAS_DF;
8125 ** Can't link across queue directories, so try to create a hard
8126 ** link in the same queue directory as the old df file.
8130 new->e_dfqgrp = old->e_dfqgrp;
8131 new->e_dfqdir = old->e_dfqdir;
8135 new->e_flags |= EF_HAS_DF;
8142 sm_syslog(LOG_ERR, old->e_id,
8149 ** SPLIT_ENV -- Allocate a new envelope based on a given envelope.
8152 ** e -- envelope.
8153 ** sendqueue -- sendqueue for new envelope.
8154 ** qgrp -- index of queue group.
8155 ** qdir -- queue directory.
8173 ee = (ENVELOPE *) sm_rpool_malloc_x(e->e_rpool, sizeof(*ee));
8175 ee->e_message = NULL; /* XXX use original message? */
8176 ee->e_id = NULL;
8178 ee->e_sendqueue = sendqueue;
8179 ee->e_flags &= ~(EF_INQUEUE|EF_CLRQUEUE|EF_FATALERRS
8181 ee->e_flags |= EF_NORECEIPT; /* XXX really? */
8182 ee->e_from.q_state = QS_SENDER;
8183 ee->e_dfp = NULL;
8184 ee->e_lockfp = NULL;
8185 if (e->e_xfp != NULL)
8186 ee->e_xfp = sm_io_dup(e->e_xfp);
8188 /* failed to dup e->e_xfp, start a new transcript */
8189 if (ee->e_xfp == NULL)
8192 ee->e_qgrp = ee->e_dfqgrp = qgrp;
8193 ee->e_qdir = ee->e_dfqdir = qdir;
8194 ee->e_errormode = EM_MAIL;
8195 ee->e_statmsg = NULL;
8196 if (e->e_quarmsg != NULL)
8197 ee->e_quarmsg = sm_rpool_strdup_x(ee->e_rpool,
8198 e->e_quarmsg);
8208 ee->e_header = copyheader(e->e_header, ee->e_rpool);
8209 ee->e_errorqueue = copyqueue(e->e_errorqueue, ee->e_rpool);
8222 ** This function splits an envelope across multiple queue groups
8223 ** based on the queue group of each recipient.
8226 ** e -- envelope.
8234 ** On success, e->e_sibling points to a list of zero or more
8236 ** on disk. But the queue files are not created.
8238 ** On failure, e->e_sibling is not changed.
8239 ** The order of recipients in e->e_sendqueue is permuted.
8255 return (*pq1)->q_qgrp - (*pq2)->q_qgrp;
8267 fs1 = Queue[(*pe1)->e_qgrp]->qg_qpaths[(*pe1)->e_qdir].qp_fsysidx;
8268 fs2 = Queue[(*pe2)->e_qgrp]->qg_qpaths[(*pe2)->e_qdir].qp_fsysidx;
8270 return -1;
8289 SM_REQUIRE(ISVALIDQGRP(e->e_qgrp));
8291 /* Count addresses and assign queue groups. */
8294 for (q = e->e_sendqueue; q != NULL; q = q->q_next)
8296 if (QS_IS_DEAD(q->q_state))
8301 if (QS_IS_BADADDR(q->q_state) ||
8302 QS_IS_SENT(q->q_state))
8303 q->q_qgrp = e->e_qgrp;
8304 else if (!ISVALIDQGRP(q->q_qgrp))
8306 /* call ruleset which should return a queue group */
8307 i = rscap(RS_QUEUEGROUP, q->q_user, NULL, e, &pvp,
8317 q->q_qgrp = i;
8321 "queue group name %s -> %d",
8327 "can't find queue group name %s, selection ignored",
8330 if (q->q_mailer != NULL &&
8331 ISVALIDQGRP(q->q_mailer->m_qgrp))
8334 q->q_qgrp = q->q_mailer->m_qgrp;
8336 else if (ISVALIDQGRP(e->e_qgrp))
8337 q->q_qgrp = e->e_qgrp;
8339 q->q_qgrp = 0;
8347 /* sort the addresses by queue group */
8348 addrs = sm_rpool_malloc_x(e->e_rpool, naddrs * sizeof(ADDRESS *));
8349 for (i = 0, q = e->e_sendqueue; q != NULL; q = q->q_next)
8351 if (QS_IS_DEAD(q->q_state))
8357 /* split into multiple envelopes, by queue group */
8360 e->e_sendqueue = NULL;
8363 if (i == naddrs - 1 || addrs[i]->q_qgrp != addrs[i + 1]->q_qgrp)
8364 addrs[i]->q_next = NULL;
8366 addrs[i]->q_next = addrs[i + 1];
8368 /* same queue group as original envelope? */
8369 if (addrs[i]->q_qgrp == e->e_qgrp)
8371 if (e->e_sendqueue == NULL)
8372 e->e_sendqueue = addrs[i];
8376 /* different queue group than original envelope */
8377 if (es == NULL || addrs[i]->q_qgrp != es->e_qgrp)
8379 ee = split_env(e, addrs[i], addrs[i]->q_qgrp, NOQDIR);
8389 /* assign a queue directory to each additional envelope */
8394 es->e_qdir = pickqdir(Queue[es->e_qgrp], es->e_msgsize, es);
8400 /* sort the additional envelopes by queue file system */
8412 if (!dup_df(splits[i - 1], splits[i]))
8416 /* success: prepend the new envelopes to the e->e_sibling list */
8420 es->e_sibling = e->e_sibling;
8421 e->e_sibling = es;
8434 e->e_sendqueue = addrs[0];
8435 for (i = 0; i < naddrs - 1; ++i)
8436 addrs[i]->q_next = addrs[i + 1];
8437 addrs[naddrs - 1]->q_next = NULL;
8445 ** envelopes within the same queue directory, if the number of
8446 ** recipients exceeds the limit for the queue group.
8449 ** e -- envelope.
8471 if (!ISVALIDQGRP(e->e_qgrp) || bitset(EF_SPLIT, e->e_flags))
8475 maxrcpt = Queue[e->e_qgrp]->qg_maxrcpt;
8481 for (q = e->e_sendqueue; q != NULL; q = q->q_next)
8483 if (QS_IS_DEAD(q->q_state))
8496 addrs = sm_rpool_malloc_x(e->e_rpool, nrcpt * sizeof(ADDRESS *));
8497 for (i = 0, q = e->e_sendqueue; q != NULL; q = q->q_next)
8499 if (QS_IS_DEAD(q->q_state))
8514 if (QS_IS_BADADDR(addrs[i]->q_state) ||
8515 QS_IS_SENT(addrs[i]->q_state) ||
8516 QS_IS_DEAD(addrs[i]->q_state)) /* for paranoia's sake */
8530 if (nrcpt - ndead <= maxrcpt)
8534 for (i = 0; i < nrcpt - 1; ++i)
8535 addrs[i]->q_next = addrs[i + 1];
8536 addrs[nrcpt - 1]->q_next = NULL;
8537 e->e_sendqueue = addrs[0];
8556 firstsibling = e->e_sibling;
8561 addrs[i - 1]->q_next = NULL;
8562 ee = split_env(e, addrs[i], e->e_qgrp, e->e_qdir);
8570 ee = ee->e_sibling;
8574 e->e_sibling = firstsibling;
8575 for (i = 0; i < nrcpt - 1; ++i)
8576 addrs[i]->q_next = addrs[i + 1];
8582 /* prepend the new envelope to e->e_sibling */
8583 ee->e_sibling = e->e_sibling;
8584 e->e_sibling = ee;
8588 if (j >= l - strlen(ee->e_id) - 3)
8607 ee->e_id,
8608 l - j);
8612 ee->e_id,
8613 l - j);
8617 if (nrcpt - i <= maxrcpt)
8625 sm_syslog(LOG_NOTICE, e->e_id,
8627 maxrcpt, nrcpt - ndead, nsplit,
8641 ** e -- envelope.
8658 if (OpMode == SM_VERIFY || !ISVALIDQGRP(e->e_qgrp) ||
8659 bitset(EF_SPLIT, e->e_flags))
8664 firstsibling = ee = e->e_sibling;
8681 next = ee->e_sibling;
8684 e->e_sibling = firstsibling;
8688 ee->e_flags |= EF_SPLIT;
8691 if (j >= l - strlen(ee->e_id) - 3)
8709 ee->e_id, l - j);
8712 ee->e_id, l - j);
8720 sm_syslog(LOG_NOTICE, e->e_id, "split: count=%d, id%s=%s",
8721 n - 1, n > 2 ? "s" : "", lsplits);
8726 e->e_flags |= EF_SPLIT;
8731 ** QUARANTINE_QUEUE_ITEM -- {un,}quarantine a single envelope
8736 ** qgrp -- queue group for the item
8737 ** qdir -- queue directory in the given queue group
8738 ** e -- envelope information for the item
8739 ** reason -- quarantine reason, NULL means unquarantine.
8745 ** Changes quarantine tag in queue file and renames it.
8792 oldqfp = e->e_lockfp;
8794 /* open the new queue file */
8852 e->e_id, &bp[1]);
8854 sm_syslog(LOG_INFO, e->e_id, "unquarantine");
8864 e->e_id, reason);
8876 e->e_id, &bp[1],
8881 sm_syslog(LOG_INFO, e->e_id, "quarantine=%s",
8901 e->e_id, reason);
8905 sm_syslog(LOG_INFO, e->e_id, "quarantine=%s",
8997 sm_syslog(LOG_DEBUG, e->e_id,
9063 /* for soft-updates */
9069 /* for soft-updates */
9090 ** QUARANTINE_QUEUE -- {un,}quarantine matching items in the queue
9092 ** Read all matching queue items, add/remove quarantine
9096 ** reason -- quarantine reason, "." means unquarantine.
9097 ** qgrplimit -- limit to single queue group unless NOQGRP
9103 ** Lots of changes to the queue.
9124 for (qgrp = 0; qgrp < NumQueue && Queue[qgrp] != NULL; qgrp++)
9131 for (qdir = 0; qdir < Queue[qgrp]->qg_numqueues; qdir++)