source: subversion/applications/utils/mod_tile/daemon.c @ 16729

Last change on this file since 16729 was 16729, checked in by jonb, 10 years ago

Make mod_tile treat font_dir_recurse as a boolean, it will now accept 0/1/n/y/true/false etc

File size: 20.4 KB
Line 
1#include <stdio.h>
2#include <stdlib.h>
3#include <unistd.h>
4#include <sys/types.h>
5#include <sys/socket.h>
6#include <sys/select.h>
7#include <sys/stat.h>
8#include <sys/un.h>
9#include <poll.h>
10#include <errno.h>
11#include <pthread.h>
12#include <signal.h>
13#include <string.h>
14#include <syslog.h>
15#include <getopt.h>
16
17#include "daemon.h"
18#include "gen_tile.h"
19#include "protocol.h"
20#include "render_config.h"
21#include "dir_utils.h"
22
23#define PIDFILE "/var/run/renderd/renderd.pid"
24
25extern "C" {
26#include "iniparser3.0b/src/iniparser.h"
27}
28
29static pthread_t *render_threads;
30static struct sigaction sigPipeAction;
31
32static struct item reqHead, dirtyHead, renderHead;
33static int reqNum, dirtyNum;
34static pthread_mutex_t qLock;
35static pthread_cond_t qCond;
36
37static stats_struct stats;
38static pthread_t stats_thread;
39
40static renderd_config config;
41
42struct item *fetch_request(void)
43{
44    struct item *item = NULL;
45
46    pthread_mutex_lock(&qLock);
47
48    while (reqNum == 0 && dirtyNum == 0) {
49        pthread_cond_wait(&qCond, &qLock);
50    }
51    if (reqNum) {
52        item = reqHead.next;
53        reqNum--;
54        stats.noReqRender++;
55    } else if (dirtyNum) {
56        item = dirtyHead.next;
57        dirtyNum--;
58        stats.noDirtyRender++;
59    }
60    if (item) {
61        item->next->prev = item->prev;
62        item->prev->next = item->next;
63
64        item->prev = &renderHead;
65        item->next = renderHead.next;
66        renderHead.next->prev = item;
67        renderHead.next = item;
68    }
69
70    pthread_mutex_unlock(&qLock);
71
72    return item;
73}
74
75void clear_requests(int fd)
76{
77    struct item *item, *dupes;
78
79    pthread_mutex_lock(&qLock);
80    item = reqHead.next;
81    while (item != &reqHead) {
82        if (item->fd == fd)
83            item->fd = FD_INVALID;
84
85        dupes = item->duplicates;
86        while (dupes) {
87            if (dupes->fd == fd)
88                dupes->fd = FD_INVALID;
89            dupes = dupes->duplicates;
90        }
91        item = item->next;
92    }
93
94    item = renderHead.next;
95    while (item != &renderHead) {
96        if (item->fd == fd)
97            item->fd = FD_INVALID;
98
99        dupes = item->duplicates;
100        while (dupes) {
101            if (dupes->fd == fd)
102                dupes->fd = FD_INVALID;
103            dupes = dupes->duplicates;
104        }
105        item = item->next;
106    }
107
108    pthread_mutex_unlock(&qLock);
109}
110
111static inline const char *cmdStr(enum protoCmd c)
112{
113    switch (c) {
114        case cmdIgnore:  return "Ignore";
115        case cmdRender:  return "Render";
116        case cmdDirty:   return "Dirty";
117        case cmdDone:    return "Done";
118        case cmdNotDone: return "NotDone";
119        default:         return "unknown";
120    }
121}
122
123void send_response(struct item *item, enum protoCmd rsp)
124{
125    struct protocol *req = &item->req;
126    int ret;
127
128    pthread_mutex_lock(&qLock);
129    item->next->prev = item->prev;
130    item->prev->next = item->next;
131    pthread_mutex_unlock(&qLock);
132
133    while (item) {
134        struct item *prev = item;
135        req = &item->req;
136        if ((item->fd != FD_INVALID) && (req->cmd == cmdRender)) {
137            req->cmd = rsp;
138            //fprintf(stderr, "Sending message %s to %d\n", cmdStr(rsp), item->fd);
139            ret = send(item->fd, req, sizeof(*req), 0);
140            if (ret != sizeof(*req))
141                perror("send error during send_done");
142        }
143        item = item->duplicates;
144        free(prev);
145    }
146}
147
148
149enum protoCmd pending(struct item *test)
150{
151    // check all queues and render list to see if this request already queued
152    // If so, add this new request as a duplicate
153    // call with qLock held
154    struct item *item;
155
156    item = renderHead.next;
157    while (item != &renderHead) {
158        if ((item->mx == test->mx) && (item->my == test->my) && (item->req.z == test->req.z) && (!strcmp(item->req.xmlname, test->req.xmlname))) {
159            // Add new test item in the list of duplicates
160            test->duplicates = item->duplicates;
161            item->duplicates = test;
162            return cmdIgnore;
163        }
164        item = item->next;
165    }
166
167    item = reqHead.next;
168    while (item != &reqHead) {
169        if ((item->mx == test->mx) && (item->my == test->my) && (item->req.z == test->req.z) && (!strcmp(item->req.xmlname, test->req.xmlname))) {
170            // Add new test item in the list of duplicates
171            test->duplicates = item->duplicates;
172            item->duplicates = test;
173            return cmdIgnore;
174        }
175        item = item->next;
176    }
177
178    item = dirtyHead.next;
179    while (item != &dirtyHead) {
180        if ((item->mx == test->mx) && (item->my == test->my) && (item->req.z == test->req.z) && (!strcmp(item->req.xmlname, test->req.xmlname)))
181            return cmdNotDone;
182        item = item->next;
183    }
184
185    return cmdRender;
186}
187
188
189enum protoCmd rx_request(const struct protocol *req, int fd)
190{
191    struct protocol *reqnew;
192    struct item *list = NULL, *item;
193    enum protoCmd pend;
194
195    // Upgrade version 1 to version 2
196    if (req->ver == 1) {
197        reqnew = (struct protocol *)malloc(sizeof(protocol));
198        memcpy(reqnew, req, sizeof(protocol_v1));
199        reqnew->xmlname[0] = 0;
200        req = reqnew;
201    }
202    else if (req->ver != 2) {
203        syslog(LOG_ERR, "Bad protocol version %d", req->ver);
204        return cmdIgnore;
205    }
206
207    syslog(LOG_DEBUG, "DEBUG: Got command %s fd(%d) xml(%s), z(%d), x(%d), y(%d)",
208            cmdStr(req->cmd), fd, req->xmlname, req->z, req->x, req->y);
209
210    if ((req->cmd != cmdRender) && (req->cmd != cmdDirty))
211        return cmdIgnore;
212
213    if (check_xyz(req->x, req->y, req->z))
214        return cmdNotDone;
215
216    item = (struct item *)malloc(sizeof(*item));
217    if (!item) {
218            syslog(LOG_ERR, "malloc failed");
219            return cmdNotDone;
220    }
221
222    item->req = *req;
223    item->duplicates = NULL;
224    item->fd = (req->cmd == cmdRender) ? fd : FD_INVALID;
225
226#ifdef METATILE
227    /* Round down request co-ordinates to the neareast N (should be a power of 2)
228     * Note: request path is no longer consistent but this will be recalculated
229     * when the metatile is being rendered.
230     */
231    item->mx = item->req.x & ~(METATILE-1);
232    item->my = item->req.y & ~(METATILE-1);
233#else
234    item->mx = item->req.x;
235    item->my = item->req.y;
236#endif
237
238    pthread_mutex_lock(&qLock);
239
240    if (dirtyNum == DIRTY_LIMIT) {
241        // The queue is severely backlogged. Drop request
242        stats.noReqDroped++;
243        pthread_mutex_unlock(&qLock);
244        free(item);
245        return cmdNotDone;
246    }
247
248    // Check for a matching request in the current rendering or dirty queues
249    pend = pending(item);
250    if (pend == cmdNotDone) {
251        // We found a match in the dirty queue, can not wait for it
252        pthread_mutex_unlock(&qLock);
253        free(item);
254        return cmdNotDone;
255    }
256    if (pend == cmdIgnore) {
257        // Found a match in render queue, item added as duplicate
258        pthread_mutex_unlock(&qLock);
259        return cmdIgnore;
260    }
261
262    // New request, add it to render or dirty queue
263    if ((req->cmd == cmdRender) && (reqNum < REQ_LIMIT)) {
264        list = &reqHead;
265        reqNum++;
266    } else if (dirtyNum < DIRTY_LIMIT) {
267        list = &dirtyHead;
268        dirtyNum++;
269        item->fd = FD_INVALID; // No response after render
270    }
271
272    if (list) {
273        item->next = list;
274        item->prev = list->prev;
275        item->prev->next = item;
276        list->prev = item;
277
278        pthread_cond_signal(&qCond);
279    } else
280        free(item);
281
282    pthread_mutex_unlock(&qLock);
283
284    return (list == &reqHead)?cmdIgnore:cmdNotDone;
285}
286
287
288void process_loop(int listen_fd)
289{
290    int num_connections = 0;
291    int connections[MAX_CONNECTIONS];
292
293    bzero(connections, sizeof(connections));
294
295    while (1) {
296        struct sockaddr_un in_addr;
297        socklen_t in_addrlen = sizeof(in_addr);
298        fd_set rd;
299        int incoming, num, nfds, i;
300
301        FD_ZERO(&rd);
302        FD_SET(listen_fd, &rd);
303        nfds = listen_fd+1;
304
305        for (i=0; i<num_connections; i++) {
306            FD_SET(connections[i], &rd);
307            nfds = MAX(nfds, connections[i]+1);
308        }
309
310        num = select(nfds, &rd, NULL, NULL, NULL);
311        if (num == -1)
312            perror("select()");
313        else if (num) {
314            //printf("Data is available now on %d fds\n", num);
315            if (FD_ISSET(listen_fd, &rd)) {
316                num--;
317                incoming = accept(listen_fd, (struct sockaddr *) &in_addr, &in_addrlen);
318                if (incoming < 0) {
319                    perror("accept()");
320                } else {
321                    if (num_connections == MAX_CONNECTIONS) {
322                        syslog(LOG_WARNING, "Connection limit(%d) reached. Dropping connection\n", MAX_CONNECTIONS);
323                        close(incoming);
324                    } else {
325                        connections[num_connections++] = incoming;
326                        syslog(LOG_DEBUG, "DEBUG: Got incoming connection, fd %d, number %d\n", incoming, num_connections);
327                    }
328                }
329            }
330            for (i=0; num && (i<num_connections); i++) {
331                int fd = connections[i];
332                if (FD_ISSET(fd, &rd)) {
333                    struct protocol cmd;
334                    int ret;
335
336                    // TODO: to get highest performance we should loop here until we get EAGAIN
337                    ret = recv(fd, &cmd, sizeof(cmd), MSG_DONTWAIT);
338                    if (ret == sizeof(cmd)) {
339                        enum protoCmd rsp = rx_request(&cmd, fd);
340
341                        if ((cmd.cmd == cmdRender) && (rsp == cmdNotDone)) {
342                            cmd.cmd = rsp;
343                            syslog(LOG_DEBUG, "DEBUG: Sending NotDone response(%d)\n", rsp);
344                            ret = send(fd, &cmd, sizeof(cmd), 0);
345                            if (ret != sizeof(cmd))
346                                perror("response send error");
347                        }
348                    } else if (!ret) {
349                        int j;
350
351                        num_connections--;
352                        syslog(LOG_DEBUG, "DEBUG: Connection %d, fd %d closed, now %d left\n", i, fd, num_connections);
353                        for (j=i; j < num_connections; j++)
354                            connections[j] = connections[j+1];
355                        clear_requests(fd);
356                        close(fd);
357                    } else {
358                        syslog(LOG_ERR, "Recv Error on fd %d", fd);
359                        break;
360                    }
361                }
362            }
363        } else {
364            syslog(LOG_ERR, "Select timeout");
365        }
366    }
367}
368
369/**
370 * Periodically write out current stats to a stats file. This information
371 * can then be used to monitor performance of renderd e.g. with a munin plugin
372 */
373void *stats_writeout_thread(void * arg) {
374    stats_struct lStats;
375    int dirtQueueLength;
376    int reqQueueLength;
377    int noFailedAttempts = 0;
378    char tmpName[PATH_MAX];
379
380    snprintf(tmpName, sizeof(tmpName), "%s.tmp", config.stats_filename);
381
382    syslog(LOG_DEBUG, "Starting stats thread");
383    while (1) {
384        pthread_mutex_lock(&qLock);
385        memcpy(&lStats, &stats, sizeof(stats_struct));
386        dirtQueueLength = dirtyNum;
387        reqQueueLength = reqNum;
388        pthread_mutex_unlock(&qLock);
389
390        FILE * statfile = fopen(tmpName, "w");
391        if (statfile == NULL) {
392            syslog(LOG_WARNING, "Failed to open stats file: %i", errno);
393            noFailedAttempts++;
394            if (noFailedAttempts > 3) {
395                syslog(LOG_ERR, "ERROR: Failed repeatedly to write stats, giving up");
396                break;
397            }
398            continue;
399        } else {
400            noFailedAttempts = 0;
401            fprintf(statfile, "ReqQueueLength: %i\n", reqQueueLength);
402            fprintf(statfile, "DirtQueueLength: %i\n", dirtQueueLength);
403            fprintf(statfile, "DropedRequest: %li\n", lStats.noReqDroped);
404            fprintf(statfile, "ReqRendered: %li\n", lStats.noReqRender);
405            fprintf(statfile, "DirtyRendered: %li\n", lStats.noDirtyRender);
406            fclose(statfile);
407            if (rename(tmpName, config.stats_filename)) {
408                syslog(LOG_WARNING, "Failed to overwrite stats file: %i", errno);
409                noFailedAttempts++;
410                if (noFailedAttempts > 3) {
411                    syslog(LOG_ERR, "ERROR: Failed repeatedly to overwrite stats, giving up");
412                    break;
413                }
414                continue;
415            }
416        }
417        sleep(10);
418    }
419    return NULL;
420}
421
422int main(int argc, char **argv)
423{
424    int fd, i;
425    struct sockaddr_un addr;
426    mode_t old;
427    int c;
428    int foreground=0;
429    int log_options;
430    char config_file_name[PATH_MAX] = RENDERD_CONFIG;
431
432    while (1) {
433        int option_index = 0;
434        static struct option long_options[] = {
435            {"config", 1, 0, 'c'},
436            {"foreground", 1, 0, 'f'},
437            {"help", 0, 0, 'h'},
438            {0, 0, 0, 0}
439        };
440
441        c = getopt_long(argc, argv, "hfc:", long_options, &option_index);
442        if (c == -1)
443            break;
444
445        switch (c) {
446            case 'f':
447                foreground=1;
448                break;
449            case 'c':
450                strncpy(config_file_name, optarg, PATH_MAX-1);
451                config_file_name[PATH_MAX-1] = 0;
452                break;
453            case 'h':
454                fprintf(stderr, "Usage: renderd [OPTION] ...\n");
455                fprintf(stderr, "Mapnik rendering daemon\n");
456                fprintf(stderr, "  -f, --foreground     run in foreground\n");
457                fprintf(stderr, "  -h, --help           display this help and exit\n");
458                fprintf(stderr, "  -c, --config=CONFIG  set location of config file (default /etc/renderd.conf)\n");
459                exit(0);
460            default:
461                fprintf(stderr, "unknown config option '%c'\n", c);
462                exit(1);
463        }
464    }
465
466    log_options = LOG_PID;
467#ifdef LOG_PERROR
468    if (foreground)
469        log_options |= LOG_PERROR;
470#endif
471    openlog("renderd", log_options, LOG_DAEMON);
472
473    syslog(LOG_INFO, "Rendering daemon started");
474
475    pthread_mutex_init(&qLock, NULL);
476    pthread_cond_init(&qCond, NULL);
477    reqHead.next = reqHead.prev = &reqHead;
478    dirtyHead.next = dirtyHead.prev = &dirtyHead;
479    renderHead.next = renderHead.prev = &renderHead;
480
481    stats.noDirtyRender = 0;
482    stats.noReqDroped = 0;
483    stats.noReqRender = 0;
484
485    xmlconfigitem maps[XMLCONFIGS_MAX];
486    bzero(maps, sizeof(xmlconfigitem) * XMLCONFIGS_MAX);
487
488    dictionary *ini = iniparser_load(config_file_name);
489    if (! ini) {
490        exit(1);
491    }
492
493    config.socketname = iniparser_getstring(ini, "renderd:socketname", (char *)RENDER_SOCKET);
494    config.num_threads = iniparser_getint(ini, "renderd:num_threads", NUM_THREADS);
495    config.tile_dir = iniparser_getstring(ini, "renderd:tile_dir", (char *)HASH_PATH);
496    config.stats_filename = iniparser_getstring(ini, "renderd:stats_file", NULL);
497    config.mapnik_plugins_dir = iniparser_getstring(ini, "mapnik:plugins_dir", (char *)MAPNIK_PLUGINS);
498    config.mapnik_font_dir = iniparser_getstring(ini, "mapnik:font_dir", (char *)FONT_DIR);
499    config.mapnik_font_dir_recurse = iniparser_getboolean(ini, "mapnik:font_dir_recurse", FONT_RECURSE);
500
501
502    int iconf = -1;
503    char buffer[PATH_MAX];
504    for (int section=0; section < iniparser_getnsec(ini); section++) {
505        char *name = iniparser_getsecname(ini, section);
506        if (strcmp(name, "renderd") && strcmp(name, "mapnik")) {
507            /* this is a map section */
508            iconf++;
509            if (strlen(name) >= XMLCONFIG_MAX) {
510                fprintf(stderr, "XML name too long: %s\n", name);
511                exit(7);
512            }
513
514            strcpy(maps[iconf].xmlname, name);
515            if (iconf >= XMLCONFIGS_MAX) {
516                fprintf(stderr, "Config: more than %d configurations found\n", XMLCONFIGS_MAX);
517                exit(7);
518            }
519            sprintf(buffer, "%s:uri", name);
520            char *ini_uri = iniparser_getstring(ini, buffer, (char *)"");
521            if (strlen(ini_uri) >= PATH_MAX) {
522                fprintf(stderr, "URI too long: %s\n", ini_uri);
523                exit(7);
524            }
525            strcpy(maps[iconf].xmluri, ini_uri);
526            sprintf(buffer, "%s:xml", name);
527            char *ini_xmlpath = iniparser_getstring(ini, buffer, (char *)"");
528            if (strlen(ini_xmlpath) >= PATH_MAX){
529                fprintf(stderr, "XML path too long: %s\n", ini_xmlpath);
530                exit(7);
531            }
532            sprintf(buffer, "%s:host", name);
533            char *ini_hostname = iniparser_getstring(ini, buffer, (char *) "");
534            if (strlen(ini_hostname) >= PATH_MAX) {
535                fprintf(stderr, "Host name too long: %s\n", ini_hostname);
536                exit(7);
537            }
538
539            sprintf(buffer, "%s:htcphost", name);
540            char *ini_htcpip = iniparser_getstring(ini, buffer, (char *) "");
541            if (strlen(ini_htcpip) >= PATH_MAX) {
542                fprintf(stderr, "HTCP host name too long: %s\n", ini_htcpip);
543                exit(7);
544            }
545            strcpy(maps[iconf].xmlfile, ini_xmlpath);
546            strcpy(maps[iconf].tile_dir, config.tile_dir);
547            strcpy(maps[iconf].host, ini_hostname);
548            strcpy(maps[iconf].htcpip, ini_htcpip);
549        }
550    }
551
552    syslog(LOG_INFO, "config renderd: socketname=%s\n", config.socketname);
553    syslog(LOG_INFO, "config renderd: num_threads=%d\n", config.num_threads);
554    syslog(LOG_INFO, "config renderd: tile_dir=%s\n", config.tile_dir);
555    syslog(LOG_INFO, "config renderd: stats_file=%s\n", config.stats_filename);
556    syslog(LOG_INFO, "config mapnik:  plugins_dir=%s\n", config.mapnik_plugins_dir);
557    syslog(LOG_INFO, "config mapnik:  font_dir=%s\n", config.mapnik_font_dir);
558    syslog(LOG_INFO, "config mapnik:  font_dir_recurse=%d\n", config.mapnik_font_dir_recurse);
559    for(iconf = 0; iconf < XMLCONFIGS_MAX; ++iconf) {
560        if (maps[iconf].xmlname[0] != 0) {
561         syslog(LOG_INFO, "config map %d:   name(%s) file(%s) uri(%s) htcp(%s) host(%s)",
562                 iconf, maps[iconf].xmlname, maps[iconf].xmlfile, maps[iconf].xmluri,
563                 maps[iconf].htcpip, maps[iconf].host);
564        }
565    }
566
567    fd = socket(PF_UNIX, SOCK_STREAM, 0);
568    if (fd < 0) {
569        fprintf(stderr, "failed to create unix socket: %s\n", strerror(errno));
570        exit(2);
571    }
572
573    bzero(&addr, sizeof(addr));
574    addr.sun_family = AF_UNIX;
575    strncpy(addr.sun_path, config.socketname, sizeof(addr.sun_path));
576
577    unlink(addr.sun_path);
578
579    old = umask(0); // Need daemon socket to be writeable by apache
580    if (bind(fd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
581        fprintf(stderr, "socket bind failed for `%s': %s\n", config.socketname, strerror(errno));
582        close(fd);
583        exit(3);
584    }
585    umask(old);
586
587    if (listen(fd, QUEUE_MAX) < 0) {
588        fprintf(stderr, "socket listen failed for %d\n", QUEUE_MAX);
589        close(fd);
590        exit(4);
591    }
592
593#if 0
594    if (fcntl(fd, F_SETFD, O_RDWR | O_NONBLOCK) < 0) {
595        fprintf(stderr, "setting socket non-block failed\n");
596        close(fd);
597        exit(5);
598    }
599#endif
600
601    //sigPipeAction.sa_handler = pipe_handler;
602    sigPipeAction.sa_handler = SIG_IGN;
603    if (sigaction(SIGPIPE, &sigPipeAction, NULL) < 0) {
604        fprintf(stderr, "failed to register signal handler\n");
605        close(fd);
606        exit(6);
607    }
608
609    render_init(config.mapnik_plugins_dir, config.mapnik_font_dir, config.mapnik_font_dir_recurse);
610
611    /* unless the command line said to run in foreground mode, fork and detach from terminal */
612    if (foreground) {
613        fprintf(stderr, "Running in foreground mode...\n");
614    } else {
615        if (daemon(0, 0) != 0) {
616            fprintf(stderr, "can't daemonize: %s\n", strerror(errno));
617        }
618        /* write pid file */
619        FILE *pidfile = fopen(PIDFILE, "w");
620        if (pidfile) {
621            (void) fprintf(pidfile, "%d\n", getpid());
622            (void) fclose(pidfile);
623        }
624    }
625
626    if (config.stats_filename != NULL) {
627        if (pthread_create(&stats_thread, NULL, stats_writeout_thread, NULL)) {
628            syslog(LOG_WARNING, "Could not create stats writeout thread");
629        }
630    } else {
631        syslog(LOG_INFO, "No stats file specified in config. Stats reporting disabled");
632    }
633
634    render_threads = (pthread_t *) malloc(sizeof(pthread_t) * config.num_threads);
635    for(i=0; i<config.num_threads; i++) {
636        if (pthread_create(&render_threads[i], NULL, render_thread, (void *)maps)) {
637            fprintf(stderr, "error spawning render thread\n");
638            close(fd);
639            exit(7);
640        }
641    }
642
643    process_loop(fd);
644
645    unlink(config.socketname);
646    close(fd);
647    return 0;
648}
Note: See TracBrowser for help on using the repository browser.