OpenBSD-4.6/usr.sbin/smtpd/runner.c

Compare this file to the similar file:
Show the results in this format:

/*	$OpenBSD: runner.c,v 1.53 2009/06/29 10:11:07 martynas Exp $	*/

/*
 * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
 * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
 * Copyright (c) 2008-2009 Jacek Masiulaniec <jacekm@dobremiasto.net>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <sys/types.h>
#include <sys/queue.h>
#include <sys/tree.h>
#include <sys/param.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/time.h>

#include <netinet/in.h>
#include <arpa/inet.h>

#include <ctype.h>
#include <dirent.h>
#include <err.h>
#include <errno.h>
#include <event.h>
#include <fcntl.h>
#include <libgen.h>
#include <netdb.h>
#include <pwd.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#include "smtpd.h"

__dead void	runner_shutdown(void);
void		runner_sig_handler(int, short, void *);
void		runner_dispatch_parent(int, short, void *);
void	        runner_dispatch_control(int, short, void *);
void	        runner_dispatch_queue(int, short, void *);
void	        runner_dispatch_mda(int, short, void *);
void		runner_dispatch_mta(int, short, void *);
void		runner_dispatch_lka(int, short, void *);
void		runner_setup_events(struct smtpd *);
void		runner_disable_events(struct smtpd *);

void		runner_reset_flags(void);
void		runner_process_offline(struct smtpd *);

void		runner_timeout(int, short, void *);

void		runner_process_queue(struct smtpd *);
void		runner_process_runqueue(struct smtpd *);
void		runner_process_batchqueue(struct smtpd *);

void		runner_batch_dispatch(struct smtpd *, struct batch *, time_t);

int		runner_message_schedule(struct message *, time_t);

void		runner_purge_run(void);
void		runner_purge_message(char *);

int		runner_check_loop(struct message *);

struct batch	*batch_record(struct smtpd *, struct message *);
struct batch	*batch_lookup(struct smtpd *, struct message *);

int		runner_force_envelope_schedule(char *);
int		runner_force_message_schedule(char *);

void
runner_sig_handler(int sig, short event, void *p)
{
	switch (sig) {
	case SIGINT:
	case SIGTERM:
		runner_shutdown();
		break;
	default:
		fatalx("runner_sig_handler: unexpected signal");
	}
}

void
runner_dispatch_parent(int sig, short event, void *p)
{
	struct smtpd		*env = p;
	struct imsgev		*iev;
	struct imsgbuf		*ibuf;
	struct imsg		 imsg;
	ssize_t			 n;

	iev = env->sc_ievs[PROC_PARENT];
	ibuf = &iev->ibuf;

	if (event & EV_READ) {
		if ((n = imsg_read(ibuf)) == -1)
			fatal("imsg_read_error");
		if (n == 0) {
			/* this pipe is dead, so remove the event handler */
			event_del(&iev->ev);
			event_loopexit(NULL);
			return;
		}
	}

	if (event & EV_WRITE) {
		if (msgbuf_write(&ibuf->w) == -1)
			fatal("msgbuf_write");
	}

	for (;;) {
		if ((n = imsg_get(ibuf, &imsg)) == -1)
			fatalx("runner_dispatch_parent: imsg_get error");
		if (n == 0)
			break;

		switch (imsg.hdr.type) {
		case IMSG_PARENT_ENQUEUE_OFFLINE:
			runner_process_offline(env);
			break;
		default:
			log_warnx("runner_dispatch_parent: got imsg %d",
			    imsg.hdr.type);
			fatalx("runner_dispatch_parent: unexpected imsg");
		}
		imsg_free(&imsg);
	}
	imsg_event_add(iev);
}

void
runner_dispatch_control(int sig, short event, void *p)
{
	struct smtpd		*env = p;
	struct imsgev		*iev;
	struct imsgbuf		*ibuf;
	struct imsg		 imsg;
	ssize_t			 n;

	iev = env->sc_ievs[PROC_CONTROL];
	ibuf = &iev->ibuf;

	if (event & EV_READ) {
		if ((n = imsg_read(ibuf)) == -1)
			fatal("imsg_read_error");
		if (n == 0) {
			/* this pipe is dead, so remove the event handler */
			event_del(&iev->ev);
			event_loopexit(NULL);
			return;
		}
	}

	if (event & EV_WRITE) {
		if (msgbuf_write(&ibuf->w) == -1)
			fatal("msgbuf_write");
	}

	for (;;) {
		if ((n = imsg_get(ibuf, &imsg)) == -1)
			fatalx("runner_dispatch_control: imsg_get error");
		if (n == 0)
			break;

		switch (imsg.hdr.type) {
		case IMSG_MDA_PAUSE:
			env->sc_opts |= SMTPD_MDA_PAUSED;
			break;
		case IMSG_MTA_PAUSE:
			env->sc_opts |= SMTPD_MTA_PAUSED;
			break;
		case IMSG_MDA_RESUME:
			env->sc_opts &= ~SMTPD_MDA_PAUSED;
			break;
		case IMSG_MTA_RESUME:
			env->sc_opts &= ~SMTPD_MTA_PAUSED;
			break;
		case IMSG_RUNNER_SCHEDULE: {
			struct sched *s = imsg.data;

			IMSG_SIZE_CHECK(s);

			s->ret = 0;
			if (valid_message_uid(s->mid))
				s->ret = runner_force_envelope_schedule(s->mid);
			else if (valid_message_id(s->mid))
				s->ret = runner_force_message_schedule(s->mid);

			imsg_compose_event(iev, IMSG_RUNNER_SCHEDULE, 0, 0, -1, s, sizeof(*s));
			break;
		}
		default:
			log_warnx("runner_dispatch_control: got imsg %d",
			    imsg.hdr.type);
			fatalx("runner_dispatch_control: unexpected imsg");
		}
		imsg_free(&imsg);
	}
	imsg_event_add(iev);
}

void
runner_dispatch_queue(int sig, short event, void *p)
{
	struct smtpd		*env = p;
	struct imsgev		*iev;
	struct imsgbuf		*ibuf;
	struct imsg		 imsg;
	ssize_t			 n;

	iev = env->sc_ievs[PROC_QUEUE];
	ibuf = &iev->ibuf;

	if (event & EV_READ) {
		if ((n = imsg_read(ibuf)) == -1)
			fatal("imsg_read_error");
		if (n == 0) {
			/* this pipe is dead, so remove the event handler */
			event_del(&iev->ev);
			event_loopexit(NULL);
			return;
		}
	}

	if (event & EV_WRITE) {
		if (msgbuf_write(&ibuf->w) == -1)
			fatal("msgbuf_write");
	}

	for (;;) {
		if ((n = imsg_get(ibuf, &imsg)) == -1)
			fatalx("runner_dispatch_queue: imsg_get error");
		if (n == 0)
			break;

		switch (imsg.hdr.type) {
		case IMSG_RUNNER_UPDATE_ENVELOPE: {
			struct message	*m = imsg.data;

			IMSG_SIZE_CHECK(m);

			env->stats->runner.active--;
			queue_message_update(m);
			break;
		}
		default:
			log_warnx("runner_dispatch_queue: got imsg %d",
			    imsg.hdr.type);
			fatalx("runner_dispatch_queue: unexpected imsg");
		}
		imsg_free(&imsg);
	}
	imsg_event_add(iev);
}

void
runner_dispatch_mda(int sig, short event, void *p)
{
	struct smtpd		*env = p;
	struct imsgev		*iev;
	struct imsgbuf		*ibuf;
	struct imsg		 imsg;
	ssize_t			 n;

	iev = env->sc_ievs[PROC_MDA];
	ibuf = &iev->ibuf;

	if (event & EV_READ) {
		if ((n = imsg_read(ibuf)) == -1)
			fatal("imsg_read_error");
		if (n == 0) {
			/* this pipe is dead, so remove the event handler */
			event_del(&iev->ev);
			event_loopexit(NULL);
			return;
		}
	}

	if (event & EV_WRITE) {
		if (msgbuf_write(&ibuf->w) == -1)
			fatal("msgbuf_write");
	}

	for (;;) {
		if ((n = imsg_get(ibuf, &imsg)) == -1)
			fatalx("runner_dispatch_mda: imsg_get error");
		if (n == 0)
			break;

		switch (imsg.hdr.type) {
		default:
			log_warnx("runner_dispatch_mda: got imsg %d",
			    imsg.hdr.type);
			fatalx("runner_dispatch_mda: unexpected imsg");
		}
		imsg_free(&imsg);
	}
	imsg_event_add(iev);
}

void
runner_dispatch_mta(int sig, short event, void *p)
{
	struct smtpd		*env = p;
	struct imsgev		*iev;
	struct imsgbuf		*ibuf;
	struct imsg		 imsg;
	ssize_t			 n;

	iev = env->sc_ievs[PROC_MTA];
	ibuf = &iev->ibuf;

	if (event & EV_READ) {
		if ((n = imsg_read(ibuf)) == -1)
			fatal("imsg_read_error");
		if (n == 0) {
			/* this pipe is dead, so remove the event handler */
			event_del(&iev->ev);
			event_loopexit(NULL);
			return;
		}
	}

	if (event & EV_WRITE) {
		if (msgbuf_write(&ibuf->w) == -1)
			fatal("msgbuf_write");
	}

	for (;;) {
		if ((n = imsg_get(ibuf, &imsg)) == -1)
			fatalx("runner_dispatch_mta: imsg_get error");
		if (n == 0)
			break;

		switch (imsg.hdr.type) {

		default:
			log_warnx("runner_dispatch_mta: got imsg %d",
			    imsg.hdr.type);
			fatalx("runner_dispatch_mta: unexpected imsg");
		}
		imsg_free(&imsg);
	}
	imsg_event_add(iev);
}

void
runner_dispatch_lka(int sig, short event, void *p)
{
	struct smtpd		*env = p;
	struct imsgev		*iev;
	struct imsgbuf		*ibuf;
	struct imsg		 imsg;
	ssize_t			 n;

	iev = env->sc_ievs[PROC_LKA];
	ibuf = &iev->ibuf;

	if (event & EV_READ) {
		if ((n = imsg_read(ibuf)) == -1)
			fatal("imsg_read_error");
		if (n == 0) {
			/* this pipe is dead, so remove the event handler */
			event_del(&iev->ev);
			event_loopexit(NULL);
			return;
		}
	}

	if (event & EV_WRITE) {
		if (msgbuf_write(&ibuf->w) == -1)
			fatal("msgbuf_write");
	}

	for (;;) {
		if ((n = imsg_get(ibuf, &imsg)) == -1)
			fatalx("runner_dispatch_lka: imsg_get error");
		if (n == 0)
			break;

		switch (imsg.hdr.type) {
		default:
			log_warnx("runner_dispatch_lka: got imsg %d",
			    imsg.hdr.type);
			fatalx("runner_dispatch_lka: unexpected imsg");
		}
		imsg_free(&imsg);
	}
	imsg_event_add(iev);
}

void
runner_shutdown(void)
{
	log_info("runner handler");
	_exit(0);
}

void
runner_setup_events(struct smtpd *env)
{
	struct timeval	 tv;

	evtimer_set(&env->sc_ev, runner_timeout, env);
	tv.tv_sec = 0;
	tv.tv_usec = 10;
	evtimer_add(&env->sc_ev, &tv);
}

void
runner_disable_events(struct smtpd *env)
{
	evtimer_del(&env->sc_ev);
}

pid_t
runner(struct smtpd *env)
{
	pid_t		 pid;
	struct passwd	*pw;

	struct event	 ev_sigint;
	struct event	 ev_sigterm;

	struct peer peers[] = {
		{ PROC_PARENT,	runner_dispatch_parent },
		{ PROC_CONTROL,	runner_dispatch_control },
		{ PROC_MDA,	runner_dispatch_mda },
		{ PROC_MTA,	runner_dispatch_mta },
		{ PROC_QUEUE,	runner_dispatch_queue },
		{ PROC_LKA,	runner_dispatch_lka },
	};

	switch (pid = fork()) {
	case -1:
		fatal("runner: cannot fork");
	case 0:
		break;
	default:
		return (pid);
	}

	purge_config(env, PURGE_EVERYTHING);

	pw = env->sc_pw;

#ifndef DEBUG
	if (chroot(PATH_SPOOL) == -1)
		fatal("runner: chroot");
	if (chdir("/") == -1)
		fatal("runner: chdir(\"/\")");
#else
#warning disabling privilege revocation and chroot in DEBUG MODE
#endif

	smtpd_process = PROC_RUNNER;
	setproctitle("%s", env->sc_title[smtpd_process]);

#ifndef DEBUG
	if (setgroups(1, &pw->pw_gid) ||
	    setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) ||
	    setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid))
		fatal("runner: cannot drop privileges");
#endif

	SPLAY_INIT(&env->batch_queue);

	event_init();

	signal_set(&ev_sigint, SIGINT, runner_sig_handler, env);
	signal_set(&ev_sigterm, SIGTERM, runner_sig_handler, env);
	signal_add(&ev_sigint, NULL);
	signal_add(&ev_sigterm, NULL);
	signal(SIGPIPE, SIG_IGN);
	signal(SIGHUP, SIG_IGN);

	config_pipes(env, peers, nitems(peers));
	config_peers(env, peers, nitems(peers));

	unlink(PATH_QUEUE "/envelope.tmp");
	runner_reset_flags();
	runner_process_offline(env);

	runner_setup_events(env);
	event_dispatch();
	runner_shutdown();

	return (0);
}

void
runner_process_offline(struct smtpd *env)
{
	char		 path[MAXPATHLEN];
	struct qwalk	*q;

	q = qwalk_new(PATH_OFFLINE);

	if (qwalk(q, path))
		imsg_compose_event(env->sc_ievs[PROC_PARENT],
		    IMSG_PARENT_ENQUEUE_OFFLINE, 0, 0, -1, path,
		    strlen(path) + 1);

	qwalk_close(q);
}

void
runner_reset_flags(void)
{
	char		 path[MAXPATHLEN];
	struct message	 message;
	struct qwalk	*q;

	q = qwalk_new(PATH_QUEUE);

	while (qwalk(q, path)) {
		while (! queue_load_envelope(&message, basename(path)))
			sleep(1);

		message.flags &= ~F_MESSAGE_SCHEDULED;
		message.flags &= ~F_MESSAGE_PROCESSING;

		while (! queue_update_envelope(&message))
			sleep(1);
	}

	qwalk_close(q);
}

void
runner_timeout(int fd, short event, void *p)
{
	struct smtpd		*env = p;
	struct timeval		 tv;

	runner_purge_run();

	runner_process_queue(env);
	runner_process_runqueue(env);
	runner_process_batchqueue(env);

	tv.tv_sec = 1;
	tv.tv_usec = 0;
	evtimer_add(&env->sc_ev, &tv);
}

void
runner_process_queue(struct smtpd *env)
{
	char		 path[MAXPATHLEN];
	char		 rqpath[MAXPATHLEN];
	struct message	 message;
	time_t		 now;
	struct qwalk	*q;

	now = time(NULL);
	q = qwalk_new(PATH_QUEUE);

	while (qwalk(q, path)) {
		if (! queue_load_envelope(&message, basename(path)))
			continue;

		if (message.type & T_MDA_MESSAGE)
			if (env->sc_opts & SMTPD_MDA_PAUSED)
				continue;

		if (message.type & T_MTA_MESSAGE)
			if (env->sc_opts & SMTPD_MTA_PAUSED)
				continue;

		if (! runner_message_schedule(&message, now))
			continue;

		if (runner_check_loop(&message)) {
			log_debug("TODO: generate mailer daemon");
			queue_remove_envelope(&message);
			continue;
		}

		message.flags |= F_MESSAGE_SCHEDULED;
		message.flags &= ~F_MESSAGE_FORCESCHEDULE;
		queue_update_envelope(&message);

		if (! bsnprintf(rqpath, sizeof(rqpath), "%s/%s", PATH_RUNQUEUE,
			basename(path)))
			fatalx("runner_process_queue: snprintf");

		if (symlink(path, rqpath) == -1) {
			if (errno == EEXIST)
				continue;
			if (errno == ENOSPC)
				break;
			fatal("runner_process_queue: symlink");
		}
	}
	
	qwalk_close(q);
}

void
runner_process_runqueue(struct smtpd *env)
{
	char		 path[MAXPATHLEN];
	struct message	 message;
	time_t		 tm;
	struct batch	*batchp;
	struct message	*messagep;
	struct qwalk	*q;

	tm = time(NULL);

	q = qwalk_new(PATH_RUNQUEUE);

	while (qwalk(q, path)) {
		unlink(path);

		if (! queue_load_envelope(&message, basename(path)))
			continue;

		if (message.flags & F_MESSAGE_PROCESSING)
			continue;

		message.lasttry = tm;
		message.flags &= ~F_MESSAGE_SCHEDULED;
		message.flags |= F_MESSAGE_PROCESSING;

		if (! queue_update_envelope(&message))
			continue;

		messagep = calloc(1, sizeof (struct message));
		if (messagep == NULL)
			fatal("runner_process_runqueue: calloc");
		*messagep = message;

		batchp = batch_lookup(env, messagep);
		if (batchp != NULL)
			messagep->batch_id = batchp->id;

		batchp = batch_record(env, messagep);
		if (messagep->batch_id == 0)
			messagep->batch_id = batchp->id;
	}

	qwalk_close(q);
}

void
runner_process_batchqueue(struct smtpd *env)
{
	time_t curtime;
	struct batch *batchp, *nxt;

	curtime = time(NULL);
	for (batchp = SPLAY_MIN(batchtree, &env->batch_queue);
	     batchp != NULL;
	     batchp = nxt) {
		nxt = SPLAY_NEXT(batchtree, &env->batch_queue, batchp);

		runner_batch_dispatch(env, batchp, curtime);

		SPLAY_REMOVE(batchtree, &env->batch_queue, batchp);
		bzero(batchp, sizeof(struct batch));
		free(batchp);
	}
}

void
runner_batch_dispatch(struct smtpd *env, struct batch *batchp, time_t curtime)
{
	u_int8_t proctype;
	struct message *messagep;

	if ((batchp->type & (T_MDA_BATCH|T_MTA_BATCH)) == 0)
		fatal("runner_batch_dispatch: unknown batch type");

	if (batchp->type & T_MDA_BATCH)
		proctype = PROC_MDA;
	else if (batchp->type & T_MTA_BATCH)
		proctype = PROC_MTA;

	imsg_compose_event(env->sc_ievs[proctype], IMSG_BATCH_CREATE, 0, 0, -1,
	    batchp, sizeof (struct batch));

	while ((messagep = TAILQ_FIRST(&batchp->messages))) {
		imsg_compose_event(env->sc_ievs[proctype], IMSG_BATCH_APPEND, 0, 0,
		    -1, messagep, sizeof (struct message));
		TAILQ_REMOVE(&batchp->messages, messagep, entry);
		bzero(messagep, sizeof(struct message));
		free(messagep);
	}

	imsg_compose_event(env->sc_ievs[proctype], IMSG_BATCH_CLOSE, 0, 0, -1,
	    batchp, sizeof(struct batch));
}

int
runner_message_schedule(struct message *messagep, time_t tm)
{
	time_t delay;

	if (messagep->flags & (F_MESSAGE_SCHEDULED|F_MESSAGE_PROCESSING))
		return 0;

	if (messagep->flags & F_MESSAGE_FORCESCHEDULE)
		return 1;

	/* Batch has been in the queue for too long and expired */
	if (tm - messagep->creation >= SMTPD_QUEUE_EXPIRY) {
		queue_remove_envelope(messagep);
		return 0;
	}

	if (messagep->lasttry == 0)
		return 1;

	delay = SMTPD_QUEUE_MAXINTERVAL;

	if (messagep->type & T_MDA_MESSAGE) {
		if (messagep->status & S_MESSAGE_LOCKFAILURE) {
			if (messagep->retry < 128)
				return 1;
			delay = (messagep->retry * 60) + arc4random_uniform(60);
		}
		else {
			if (messagep->retry < 5)
				return 1;
			
			if (messagep->retry < 15)
				delay = (messagep->retry * 60) + arc4random_uniform(60);
		}
	}

	if (messagep->type & T_MTA_MESSAGE) {
		if (messagep->retry < 3)
			delay = SMTPD_QUEUE_INTERVAL;
		else if (messagep->retry <= 7) {
			delay = SMTPD_QUEUE_INTERVAL * (1 << (messagep->retry - 3));
			if (delay > SMTPD_QUEUE_MAXINTERVAL)
				delay = SMTPD_QUEUE_MAXINTERVAL;
		}
	}

	if (tm >= messagep->lasttry + delay)
		return 1;

	return 0;
}

int
runner_force_envelope_schedule(char *mid)
{
	struct message message;

	if (! queue_load_envelope(&message, mid))
		return 0;

	if (! message.flags & (F_MESSAGE_PROCESSING|F_MESSAGE_SCHEDULED))
		return 1;

	message.flags |= F_MESSAGE_FORCESCHEDULE;

	if (! queue_update_envelope(&message))
		return 0;

	return 1;
}

int
runner_force_message_schedule(char *mid)
{
	char path[MAXPATHLEN];
	DIR *dirp;
	struct dirent *dp;

	if (! bsnprintf(path, MAXPATHLEN, "%s/%d/%s/envelopes",
		PATH_QUEUE, queue_hash(mid), mid))
		return 0;

	dirp = opendir(path);
	if (dirp == NULL)
		return 0;

	while ((dp = readdir(dirp)) != NULL) {
		if (valid_message_uid(dp->d_name))
			runner_force_envelope_schedule(dp->d_name);
	}
	closedir(dirp);

	return 1;
}

void
runner_purge_run(void)
{
	char		 path[MAXPATHLEN];
	struct qwalk	*q;

	q = qwalk_new(PATH_PURGE);

	while (qwalk(q, path))
		runner_purge_message(basename(path));

	qwalk_close(q);
}

void
runner_purge_message(char *msgid)
{
	char rootdir[MAXPATHLEN];
	char evpdir[MAXPATHLEN];
	char evppath[MAXPATHLEN];
	char msgpath[MAXPATHLEN];
	DIR *dirp;
	struct dirent *dp;
	
	if (! bsnprintf(rootdir, sizeof(rootdir), "%s/%s", PATH_PURGE, msgid))
		fatal("runner_purge_message: snprintf");

	if (! bsnprintf(evpdir, sizeof(evpdir), "%s%s", rootdir,
		PATH_ENVELOPES))
		fatal("runner_purge_message: snprintf");
	
	if (! bsnprintf(msgpath, sizeof(msgpath), "%s/message", rootdir))
		fatal("runner_purge_message: snprintf");

	if (unlink(msgpath) == -1)
		if (errno != ENOENT)
			fatal("runner_purge_message: unlink");

	dirp = opendir(evpdir);
	if (dirp == NULL) {
		if (errno == ENOENT)
			goto delroot;
		fatal("runner_purge_message: opendir");
	}
	while ((dp = readdir(dirp)) != NULL) {
		if (strcmp(dp->d_name, ".") == 0 ||
		    strcmp(dp->d_name, "..") == 0)
			continue;
		if (! bsnprintf(evppath, sizeof(evppath), "%s/%s", evpdir,
			dp->d_name))
			fatal("runner_purge_message: snprintf");

		if (unlink(evppath) == -1)
			if (errno != ENOENT)
				fatal("runner_purge_message: unlink");
	}
	closedir(dirp);

	if (rmdir(evpdir) == -1)
		if (errno != ENOENT)
			fatal("runner_purge_message: rmdir");

delroot:
	if (rmdir(rootdir) == -1)
		if (errno != ENOENT)
			fatal("runner_purge_message: rmdir");
}

struct batch *
batch_record(struct smtpd *env, struct message *messagep)
{
	struct batch *batchp;
	struct path *path;

	batchp = NULL;
	if (messagep->batch_id != 0) {
		batchp = batch_by_id(env, messagep->batch_id);
		if (batchp == NULL)
			fatalx("batch_record: internal inconsistency.");
	}
	if (batchp == NULL) {
		batchp = calloc(1, sizeof(struct batch));
		if (batchp == NULL)
			fatal("batch_record: calloc");

		batchp->id = queue_generate_id();

		(void)strlcpy(batchp->message_id, messagep->message_id,
		    sizeof(batchp->message_id));
		TAILQ_INIT(&batchp->messages);
		SPLAY_INSERT(batchtree, &env->batch_queue, batchp);

		if (messagep->type & T_DAEMON_MESSAGE) {
			batchp->type = T_DAEMON_BATCH;
			path = &messagep->sender;
		}
		else {
			path = &messagep->recipient;
		}
		batchp->rule = path->rule;

		(void)strlcpy(batchp->hostname, path->domain,
		    sizeof(batchp->hostname));

		if (IS_MAILBOX(path->rule.r_action) ||
		    IS_EXT(path->rule.r_action)) {
			batchp->type |= T_MDA_BATCH;
		}
		else {
			batchp->type |= T_MTA_BATCH;
		}
	}

	TAILQ_INSERT_TAIL(&batchp->messages, messagep, entry);
	env->stats->runner.active++;
	return batchp;
}

struct batch *
batch_lookup(struct smtpd *env, struct message *message)
{
	struct batch *batchp;
	struct batch lookup;

	/* We only support delivery of one message at a time, in MDA */
	if (message->type & T_MDA_MESSAGE)
		return NULL;

	/* If message->batch_id != 0, we can retrieve batch by id */
	if (message->batch_id != 0) {
		lookup.id = message->batch_id;
		return SPLAY_FIND(batchtree, &env->batch_queue, &lookup);
	}

	/* We do not know the batch_id yet, maybe it was created but we could not
	 * be notified, or it just does not exist. Let's scan to see if we can do
	 * a match based on our message_id and flags.
	 */
	SPLAY_FOREACH(batchp, batchtree, &env->batch_queue) {

		if (batchp->type != message->type)
			continue;

		if (strcasecmp(batchp->message_id, message->message_id) != 0)
			continue;

		if (batchp->type & T_MTA_BATCH)
			if (strcasecmp(batchp->hostname, message->recipient.domain) != 0)
				continue;

		break;
	}

	return batchp;
}

int
batch_cmp(struct batch *s1, struct batch *s2)
{
	/*
	 * do not return u_int64_t's
	 */
	if (s1->id < s2->id)
		return (-1);

	if (s1->id > s2->id)
		return (1);

	return (0);
}

int
runner_check_loop(struct message *messagep)
{
	int fd;
	FILE *fp;
	char *buf, *lbuf;
	size_t len;
	struct path chkpath;
	int ret = 0;
	int rcvcount = 0;

	fd = queue_open_message_file(messagep->message_id);
	if ((fp = fdopen(fd, "r")) == NULL)
		fatal("fdopen");

	lbuf = NULL;
	while ((buf = fgetln(fp, &len))) {
		if (buf[len - 1] == '\n')
			buf[len - 1] = '\0';
		else {
			/* EOF without EOL, copy and add the NUL */
			if ((lbuf = malloc(len + 1)) == NULL)
				err(1, NULL);
			memcpy(lbuf, buf, len);
			lbuf[len] = '\0';
			buf = lbuf;
		}

		if (strchr(buf, ':') == NULL && !isspace(*buf))
			break;

		if (strncasecmp("Received: ", buf, 10) == 0) {
			rcvcount++;
			if (rcvcount == MAX_HOPS_COUNT) {
				log_debug("LOOP DETECTED THROUGH RECEIVED LINES COUNT");
				ret = 1;
				break;
			}
		}

		else if (strncasecmp("X-OpenSMTPD-Loop: ", buf, 18) == 0) {
			bzero(&chkpath, sizeof (struct path));
			if (! recipient_to_path(&chkpath, buf + 18))
				continue;

			if (strcasecmp(chkpath.user, messagep->recipient.user) == 0 &&
			    strcasecmp(chkpath.domain, messagep->recipient.domain) == 0) {
				log_debug("LOOP DETECTED THROUGH X-OPENSMTPD-LOOP HEADER: %s@%s",
				    chkpath.user, chkpath.domain);
				ret = 1;
				break;
			}
		}
	}
	free(lbuf);

	fclose(fp);
	return ret;
}

SPLAY_GENERATE(batchtree, batch, b_nodes, batch_cmp);