source: subversion/applications/utils/mod_tile/daemon.c @ 15864

Last change on this file since 15864 was 15864, checked in by jochen, 11 years ago

Fixes to debian scripts

renderd starts in background mode now by default (use -fforeground for old behaviour)
File size: 17.5 KB
Line 
1#include <stdio.h>
2#include <stdlib.h>
3#include <unistd.h>
4#include <sys/types.h>
5#include <sys/socket.h>
6#include <sys/select.h>
7#include <sys/stat.h>
8#include <sys/un.h>
9#include <poll.h>
10#include <errno.h>
11#include <pthread.h>
12#include <signal.h>
13#include <string.h>
14#include <syslog.h>
15#include <getopt.h>
16
17#include "daemon.h"
18#include "gen_tile.h"
19#include "protocol.h"
20#include "render_config.h"
21#include "dir_utils.h"
22
23#define PIDFILE "/var/run/renderd/renderd.pid"
24
25extern "C" {
26#include "iniparser3.0b/src/iniparser.h"
27}
28
29static pthread_t *render_threads;
30static struct sigaction sigPipeAction;
31
32static struct item reqHead, dirtyHead, renderHead;
33static int reqNum, dirtyNum;
34static pthread_mutex_t qLock;
35static pthread_cond_t qCond;
36
37static renderd_config config;
38
39struct item *fetch_request(void)
40{
41    struct item *item = NULL;
42
43    pthread_mutex_lock(&qLock);
44
45    while (reqNum == 0 && dirtyNum == 0) {
46        pthread_cond_wait(&qCond, &qLock);
47    }
48    if (reqNum) {
49        item = reqHead.next;
50        reqNum--;
51    } else if (dirtyNum) {
52        item = dirtyHead.next;
53        dirtyNum--;
54    }
55    if (item) {
56        item->next->prev = item->prev;
57        item->prev->next = item->next;
58
59        item->prev = &renderHead;
60        item->next = renderHead.next;
61        renderHead.next->prev = item;
62        renderHead.next = item;
63    }
64
65    pthread_mutex_unlock(&qLock);
66
67    return item;
68}
69
70void clear_requests(int fd)
71{
72    struct item *item, *dupes;
73
74    pthread_mutex_lock(&qLock);
75    item = reqHead.next;
76    while (item != &reqHead) {
77        if (item->fd == fd)
78            item->fd = FD_INVALID;
79
80        dupes = item->duplicates;
81        while (dupes) {
82            if (dupes->fd == fd)
83                dupes->fd = FD_INVALID;
84            dupes = dupes->duplicates;
85        }
86        item = item->next;
87    }
88
89    item = renderHead.next;
90    while (item != &renderHead) {
91        if (item->fd == fd)
92            item->fd = FD_INVALID;
93
94        dupes = item->duplicates;
95        while (dupes) {
96            if (dupes->fd == fd)
97                dupes->fd = FD_INVALID;
98            dupes = dupes->duplicates;
99        }
100        item = item->next;
101    }
102
103    pthread_mutex_unlock(&qLock);
104}
105
106static inline const char *cmdStr(enum protoCmd c)
107{
108    switch (c) {
109        case cmdIgnore:  return "Ignore";
110        case cmdRender:  return "Render";
111        case cmdDirty:   return "Dirty";
112        case cmdDone:    return "Done";
113        case cmdNotDone: return "NotDone";
114        default:         return "unknown";
115    }
116}
117
118void send_response(struct item *item, enum protoCmd rsp)
119{
120    struct protocol *req = &item->req;
121    int ret;
122
123    pthread_mutex_lock(&qLock);
124    item->next->prev = item->prev;
125    item->prev->next = item->next;
126    pthread_mutex_unlock(&qLock);
127
128    while (item) {
129        struct item *prev = item;
130        req = &item->req;
131        if ((item->fd != FD_INVALID) && (req->cmd == cmdRender)) {
132            req->cmd = rsp;
133            //fprintf(stderr, "Sending message %s to %d\n", cmdStr(rsp), item->fd);
134            ret = send(item->fd, req, sizeof(*req), 0);
135            if (ret != sizeof(*req))
136                perror("send error during send_done");
137        }
138        item = item->duplicates;
139        free(prev);
140    }
141}
142
143
144enum protoCmd pending(struct item *test)
145{
146    // check all queues and render list to see if this request already queued
147    // If so, add this new request as a duplicate
148    // call with qLock held
149    struct item *item;
150
151    item = renderHead.next;
152    while (item != &renderHead) {
153        if ((item->mx == test->mx) && (item->my == test->my) && (item->req.z == test->req.z) && (!strcmp(item->req.xmlname, test->req.xmlname))) {
154            // Add new test item in the list of duplicates
155            test->duplicates = item->duplicates;
156            item->duplicates = test;
157            return cmdIgnore;
158        }
159        item = item->next;
160    }
161
162    item = reqHead.next;
163    while (item != &reqHead) {
164        if ((item->mx == test->mx) && (item->my == test->my) && (item->req.z == test->req.z) && (!strcmp(item->req.xmlname, test->req.xmlname))) {
165            // Add new test item in the list of duplicates
166            test->duplicates = item->duplicates;
167            item->duplicates = test;
168            return cmdIgnore;
169        }
170        item = item->next;
171    }
172
173    item = dirtyHead.next;
174    while (item != &dirtyHead) {
175        if ((item->mx == test->mx) && (item->my == test->my) && (item->req.z == test->req.z) && (!strcmp(item->req.xmlname, test->req.xmlname)))
176            return cmdNotDone;
177        item = item->next;
178    }
179
180    return cmdRender;
181}
182
183
184enum protoCmd rx_request(const struct protocol *req, int fd)
185{
186    struct protocol *reqnew;
187    struct item *list = NULL, *item;
188    enum protoCmd pend;
189
190    // Upgrade version 1 to version 2
191    if (req->ver == 1) {
192        reqnew = (struct protocol *)malloc(sizeof(protocol));
193        memcpy(reqnew, req, sizeof(protocol_v1));
194        reqnew->xmlname[0] = 0;
195        req = reqnew;
196    }
197    else if (req->ver != 2) {
198        syslog(LOG_ERR, "Bad protocol version %d", req->ver);
199        return cmdIgnore;
200    }
201
202    syslog(LOG_DEBUG, "DEBUG: Got command %s fd(%d) xml(%s), z(%d), x(%d), y(%d)",
203            cmdStr(req->cmd), fd, req->xmlname, req->z, req->x, req->y);
204
205    if ((req->cmd != cmdRender) && (req->cmd != cmdDirty))
206        return cmdIgnore;
207
208    if (check_xyz(req->x, req->y, req->z))
209        return cmdNotDone;
210
211    item = (struct item *)malloc(sizeof(*item));
212    if (!item) {
213            syslog(LOG_ERR, "malloc failed");
214            return cmdNotDone;
215    }
216
217    item->req = *req;
218    item->duplicates = NULL;
219    item->fd = (req->cmd == cmdRender) ? fd : FD_INVALID;
220
221#ifdef METATILE
222    /* Round down request co-ordinates to the neareast N (should be a power of 2)
223     * Note: request path is no longer consistent but this will be recalculated
224     * when the metatile is being rendered.
225     */
226    item->mx = item->req.x & ~(METATILE-1);
227    item->my = item->req.y & ~(METATILE-1);
228#else
229    item->mx = item->req.x;
230    item->my = item->req.y;
231#endif
232
233    pthread_mutex_lock(&qLock);
234
235    if (dirtyNum == DIRTY_LIMIT) {
236        // The queue is severely backlogged. Drop request
237        pthread_mutex_unlock(&qLock);
238        free(item);
239        return cmdNotDone;
240    }
241
242    // Check for a matching request in the current rendering or dirty queues
243    pend = pending(item);
244    if (pend == cmdNotDone) {
245        // We found a match in the dirty queue, can not wait for it
246        pthread_mutex_unlock(&qLock);
247        free(item);
248        return cmdNotDone;
249    }
250    if (pend == cmdIgnore) {
251        // Found a match in render queue, item added as duplicate
252        pthread_mutex_unlock(&qLock);
253        return cmdIgnore;
254    }
255
256    // New request, add it to render or dirty queue
257    if ((req->cmd == cmdRender) && (reqNum < REQ_LIMIT)) {
258        list = &reqHead;
259        reqNum++;
260    } else if (dirtyNum < DIRTY_LIMIT) {
261        list = &dirtyHead;
262        dirtyNum++;
263        item->fd = FD_INVALID; // No response after render
264    }
265
266    if (list) {
267        item->next = list;
268        item->prev = list->prev;
269        item->prev->next = item;
270        list->prev = item;
271
272        pthread_cond_signal(&qCond);
273    } else
274        free(item);
275
276    pthread_mutex_unlock(&qLock);
277
278    return (list == &reqHead)?cmdIgnore:cmdNotDone;
279}
280
281
282void process_loop(int listen_fd)
283{
284    int num_connections = 0;
285    int connections[MAX_CONNECTIONS];
286
287    bzero(connections, sizeof(connections));
288
289    while (1) {
290        struct sockaddr_un in_addr;
291        socklen_t in_addrlen = sizeof(in_addr);
292        fd_set rd;
293        int incoming, num, nfds, i;
294
295        FD_ZERO(&rd);
296        FD_SET(listen_fd, &rd);
297        nfds = listen_fd+1;
298
299        for (i=0; i<num_connections; i++) {
300            FD_SET(connections[i], &rd);
301            nfds = MAX(nfds, connections[i]+1);
302        }
303
304        num = select(nfds, &rd, NULL, NULL, NULL);
305        if (num == -1)
306            perror("select()");
307        else if (num) {
308            //printf("Data is available now on %d fds\n", num);
309            if (FD_ISSET(listen_fd, &rd)) {
310                num--;
311                incoming = accept(listen_fd, (struct sockaddr *) &in_addr, &in_addrlen);
312                if (incoming < 0) {
313                    perror("accept()");
314                } else {
315                    if (num_connections == MAX_CONNECTIONS) {
316                        syslog(LOG_WARNING, "Connection limit(%d) reached. Dropping connection\n", MAX_CONNECTIONS);
317                        close(incoming);
318                    } else {
319                        connections[num_connections++] = incoming;
320                        syslog(LOG_DEBUG, "DEBUG: Got incoming connection, fd %d, number %d\n", incoming, num_connections);
321                    }
322                }
323            }
324            for (i=0; num && (i<num_connections); i++) {
325                int fd = connections[i];
326                if (FD_ISSET(fd, &rd)) {
327                    struct protocol cmd;
328                    int ret;
329
330                    // TODO: to get highest performance we should loop here until we get EAGAIN
331                    ret = recv(fd, &cmd, sizeof(cmd), MSG_DONTWAIT);
332                    if (ret == sizeof(cmd)) {
333                        enum protoCmd rsp = rx_request(&cmd, fd);
334
335                        if ((cmd.cmd == cmdRender) && (rsp == cmdNotDone)) {
336                            cmd.cmd = rsp;
337                            syslog(LOG_DEBUG, "DEBUG: Sending NotDone response(%d)\n", rsp);
338                            ret = send(fd, &cmd, sizeof(cmd), 0);
339                            if (ret != sizeof(cmd))
340                                perror("response send error");
341                        }
342                    } else if (!ret) {
343                        int j;
344
345                        num_connections--;
346                        syslog(LOG_DEBUG, "DEBUG: Connection %d, fd %d closed, now %d left\n", i, fd, num_connections);
347                        for (j=i; j < num_connections; j++)
348                            connections[j] = connections[j+1];
349                        clear_requests(fd);
350                        close(fd);
351                    } else {
352                        syslog(LOG_ERR, "Recv Error on fd %d", fd);
353                        break;
354                    }
355                }
356            }
357        } else {
358            syslog(LOG_ERR, "Select timeout");
359        }
360    }
361}
362
363
364int main(int argc, char **argv)
365{
366    int fd, i;
367    struct sockaddr_un addr;
368    mode_t old;
369    int c;
370    int foreground=0;
371    char config_file_name[PATH_MAX] = RENDERD_CONFIG;
372
373    while (1) {
374        int option_index = 0;
375        static struct option long_options[] = {
376            {"config", 1, 0, 'c'},
377            {"foreground", 1, 0, 'f'},
378            {"help", 0, 0, 'h'},
379            {0, 0, 0, 0}
380        };
381
382        c = getopt_long(argc, argv, "hfc:", long_options, &option_index);
383        if (c == -1)
384            break;
385
386        switch (c) {
387            case 'f':
388                foreground=1;
389                break;
390            case 'c':
391                strncpy(config_file_name, optarg, PATH_MAX-1);
392                config_file_name[PATH_MAX-1] = 0;
393                break;
394            case 'h':
395                fprintf(stderr, "Usage: renderd [OPTION] ...\n");
396                fprintf(stderr, "Mapnik rendering daemon\n");
397                fprintf(stderr, "  -f, --foreground     run in foreground\n");
398                fprintf(stderr, "  -h, --help           display this help and exit\n");
399                fprintf(stderr, "  -c, --config=CONFIG  set location of config file (default /etc/renderd.conf)\n");
400                exit(0);
401            default:
402                fprintf(stderr, "unknown config option '%c'\n", c);
403                exit(1);
404        }
405    }
406
407    openlog("renderd", LOG_PID, LOG_DAEMON);
408    syslog(LOG_INFO, "Rendering daemon started");
409
410    pthread_mutex_init(&qLock, NULL);
411    pthread_cond_init(&qCond, NULL);
412    reqHead.next = reqHead.prev = &reqHead;
413    dirtyHead.next = dirtyHead.prev = &dirtyHead;
414    renderHead.next = renderHead.prev = &renderHead;
415
416    xmlconfigitem maps[XMLCONFIGS_MAX];
417    bzero(maps, sizeof(xmlconfigitem) * XMLCONFIGS_MAX);
418
419    dictionary *ini = iniparser_load(config_file_name);
420    if (! ini) {
421        exit(1);
422    }
423
424    config.socketname = iniparser_getstring(ini, "renderd:socketname", (char *)RENDER_SOCKET);
425    config.num_threads = iniparser_getint(ini, "renderd:num_threads", NUM_THREADS);
426    config.tile_dir = iniparser_getstring(ini, "renderd:tile_dir", (char *)HASH_PATH);
427    config.mapnik_plugins_dir = iniparser_getstring(ini, "mapnik:plugins_dir", (char *)MAPNIK_PLUGINS);
428    config.mapnik_font_dir = iniparser_getstring(ini, "mapnik:font_dir", (char *)FONT_DIR);
429
430    int iconf = -1;
431    char buffer[PATH_MAX];
432    for (int section=0; section < iniparser_getnsec(ini); section++) {
433        char *name = iniparser_getsecname(ini, section);
434        if (strcmp(name, "renderd") && strcmp(name, "mapnik")) {
435            /* this is a map section */
436            iconf++;
437            if (strlen(name) >= XMLCONFIG_MAX) {
438                fprintf(stderr, "XML name too long: %s\n", name);
439                exit(7);
440            }
441
442            strcpy(maps[iconf].xmlname, name);
443            if (iconf >= XMLCONFIGS_MAX) {
444                fprintf(stderr, "Config: more than %d configurations found\n", XMLCONFIGS_MAX);
445                exit(7);
446            }
447            sprintf(buffer, "%s:uri", name);
448            char *ini_uri = iniparser_getstring(ini, buffer, (char *)"");
449            if (strlen(ini_uri) >= PATH_MAX) {
450                fprintf(stderr, "URI too long: %s\n", ini_uri);
451                exit(7);
452            }
453            strcpy(maps[iconf].xmluri, ini_uri);
454            sprintf(buffer, "%s:xml", name);
455            char *ini_xmlpath = iniparser_getstring(ini, buffer, (char *)"");
456            if (strlen(ini_xmlpath) >= PATH_MAX){
457                fprintf(stderr, "XML path too long: %s\n", ini_xmlpath);
458                exit(7);
459            }
460            sprintf(buffer, "%s:host", name);
461            char *ini_hostname = iniparser_getstring(ini, buffer, (char *) "");
462            if (strlen(ini_hostname) >= PATH_MAX) {
463                fprintf(stderr, "Host name too long: %s\n", ini_hostname);
464                exit(7);
465            }
466
467            sprintf(buffer, "%s:htcphost", name);
468            char *ini_htcpip = iniparser_getstring(ini, buffer, (char *) "");
469            if (strlen(ini_htcpip) >= PATH_MAX) {
470                fprintf(stderr, "HTCP host name too long: %s\n", ini_htcpip);
471                exit(7);
472            }
473            strcpy(maps[iconf].xmlfile, ini_xmlpath);
474            strcpy(maps[iconf].tile_dir, config.tile_dir);
475            strcpy(maps[iconf].host, ini_hostname);
476            strcpy(maps[iconf].htcpip, ini_htcpip);
477        }
478    }
479
480    syslog(LOG_INFO, "config renderd: socketname=%s\n", config.socketname);
481    syslog(LOG_INFO, "config renderd: num_threads=%d\n", config.num_threads);
482    syslog(LOG_INFO, "config renderd: tile_dir=%s\n", config.tile_dir);
483    syslog(LOG_INFO, "config mapnik:  plugins_dir=%s\n", config.mapnik_plugins_dir);
484    syslog(LOG_INFO, "config mapnik:  font_dir=%s\n", config.mapnik_font_dir);
485    syslog(LOG_INFO, "config mapnik:  font_dir_recurse=%d\n", config.mapnik_font_dir_recurse);
486    for(iconf = 0; iconf < XMLCONFIGS_MAX; ++iconf) {
487         syslog(LOG_INFO, "config map %d:   name(%s) file(%s) uri(%s) htcp(%s) host(%s)",
488                 iconf, maps[iconf].xmlname, maps[iconf].xmlfile, maps[iconf].xmluri,
489                 maps[iconf].htcpip, maps[iconf].host);
490    }
491
492    fd = socket(PF_UNIX, SOCK_STREAM, 0);
493    if (fd < 0) {
494        fprintf(stderr, "failed to create unix socket\n");
495        exit(2);
496    }
497
498    bzero(&addr, sizeof(addr));
499    addr.sun_family = AF_UNIX;
500    strncpy(addr.sun_path, config.socketname, sizeof(addr.sun_path));
501
502    unlink(addr.sun_path);
503
504    old = umask(0); // Need daemon socket to be writeable by apache
505    if (bind(fd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
506        fprintf(stderr, "socket bind failed for: %s\n", config.socketname);
507        close(fd);
508        exit(3);
509    }
510    umask(old);
511
512    if (listen(fd, QUEUE_MAX) < 0) {
513        fprintf(stderr, "socket listen failed for %d\n", QUEUE_MAX);
514        close(fd);
515        exit(4);
516    }
517
518#if 0
519    if (fcntl(fd, F_SETFD, O_RDWR | O_NONBLOCK) < 0) {
520        fprintf(stderr, "setting socket non-block failed\n");
521        close(fd);
522        exit(5);
523    }
524#endif
525
526    //sigPipeAction.sa_handler = pipe_handler;
527    sigPipeAction.sa_handler = SIG_IGN;
528    if (sigaction(SIGPIPE, &sigPipeAction, NULL) < 0) {
529        fprintf(stderr, "failed to register signal handler\n");
530        close(fd);
531        exit(6);
532    }
533
534    render_init(config.mapnik_plugins_dir, config.mapnik_font_dir, config.mapnik_font_dir_recurse);
535
536    /* unless the command line said to run in foreground mode, fork and detach from terminal */
537    if (foreground) {
538        fprintf(stderr, "Running in foreground mode...\n");
539    } else {
540        if (daemon(0, 0) != 0) {
541            fprintf(stderr, "can't daemonize: %s\n", strerror(errno));
542        }
543        /* write pid file */
544        FILE *pidfile = fopen(PIDFILE, "w");
545        if (pidfile) {
546            (void) fprintf(pidfile, "%d\n", getpid());
547            (void) fclose(pidfile);
548        }
549    }
550
551    render_threads = (pthread_t *) malloc(sizeof(pthread_t) * config.num_threads);
552    for(i=0; i<config.num_threads; i++) {
553        if (pthread_create(&render_threads[i], NULL, render_thread, (void *)maps)) {
554            fprintf(stderr, "error spawning render thread\n");
555            close(fd);
556            exit(7);
557        }
558    }
559
560    process_loop(fd);
561
562    unlink(config.socketname);
563    close(fd);
564    return 0;
565}
Note: See TracBrowser for help on using the repository browser.