source: subversion/applications/utils/mod_tile/daemon.c @ 15800

Last change on this file since 15800 was 15800, checked in by jochen, 10 years ago

renderd uses syslog now instead of logging messages to stderr
renderd has -c, --config option to set config file at runtime

File size: 15.9 KB
Line 
1#include <stdio.h>
2#include <stdlib.h>
3#include <unistd.h>
4#include <sys/types.h>
5#include <sys/socket.h>
6#include <sys/select.h>
7#include <sys/stat.h>
8#include <sys/un.h>
9#include <poll.h>
10#include <errno.h>
11#include <pthread.h>
12#include <signal.h>
13#include <string.h>
14#include <syslog.h>
15#include <getopt.h>
16
17#include "daemon.h"
18#include "gen_tile.h"
19#include "protocol.h"
20#include "render_config.h"
21#include "dir_utils.h"
22
23extern "C" {
24#include "iniparser3.0b/src/iniparser.h"
25}
26
27static pthread_t *render_threads;
28static struct sigaction sigPipeAction;
29
30static struct item reqHead, dirtyHead, renderHead;
31static int reqNum, dirtyNum;
32static pthread_mutex_t qLock;
33static pthread_cond_t qCond;
34
35static renderd_config config;
36
37struct item *fetch_request(void)
38{
39    struct item *item = NULL;
40
41    pthread_mutex_lock(&qLock);
42
43    while (reqNum == 0 && dirtyNum == 0) {
44        pthread_cond_wait(&qCond, &qLock);
45    }
46    if (reqNum) {
47        item = reqHead.next;
48        reqNum--;
49    } else if (dirtyNum) {
50        item = dirtyHead.next;
51        dirtyNum--;
52    }
53    if (item) {
54        item->next->prev = item->prev;
55        item->prev->next = item->next;
56
57        item->prev = &renderHead;
58        item->next = renderHead.next;
59        renderHead.next->prev = item;
60        renderHead.next = item;
61    }
62
63    pthread_mutex_unlock(&qLock);
64
65    return item;
66}
67
68void clear_requests(int fd)
69{
70    struct item *item, *dupes;
71
72    pthread_mutex_lock(&qLock);
73    item = reqHead.next;
74    while (item != &reqHead) {
75        if (item->fd == fd)
76            item->fd = FD_INVALID;
77
78        dupes = item->duplicates;
79        while (dupes) {
80            if (dupes->fd == fd)
81                dupes->fd = FD_INVALID;
82            dupes = dupes->duplicates;
83        }
84        item = item->next;
85    }
86
87    item = renderHead.next;
88    while (item != &renderHead) {
89        if (item->fd == fd)
90            item->fd = FD_INVALID;
91
92        dupes = item->duplicates;
93        while (dupes) {
94            if (dupes->fd == fd)
95                dupes->fd = FD_INVALID;
96            dupes = dupes->duplicates;
97        }
98        item = item->next;
99    }
100
101    pthread_mutex_unlock(&qLock);
102}
103
104static inline const char *cmdStr(enum protoCmd c)
105{
106    switch (c) {
107        case cmdIgnore:  return "Ignore";
108        case cmdRender:  return "Render";
109        case cmdDirty:   return "Dirty";
110        case cmdDone:    return "Done";
111        case cmdNotDone: return "NotDone";
112        default:         return "unknown";
113    }
114}
115
116void send_response(struct item *item, enum protoCmd rsp)
117{
118    struct protocol *req = &item->req;
119    int ret;
120
121    pthread_mutex_lock(&qLock);
122    item->next->prev = item->prev;
123    item->prev->next = item->next;
124    pthread_mutex_unlock(&qLock);
125
126    while (item) {
127        struct item *prev = item;
128        req = &item->req;
129        if ((item->fd != FD_INVALID) && (req->cmd == cmdRender)) {
130            req->cmd = rsp;
131            //fprintf(stderr, "Sending message %s to %d\n", cmdStr(rsp), item->fd);
132            ret = send(item->fd, req, sizeof(*req), 0);
133            if (ret != sizeof(*req))
134                perror("send error during send_done");
135        }
136        item = item->duplicates;
137        free(prev);
138    }
139}
140
141
142enum protoCmd pending(struct item *test)
143{
144    // check all queues and render list to see if this request already queued
145    // If so, add this new request as a duplicate
146    // call with qLock held
147    struct item *item;
148
149    item = renderHead.next;
150    while (item != &renderHead) {
151        if ((item->mx == test->mx) && (item->my == test->my) && (item->req.z == test->req.z) && (!strcmp(item->req.xmlname, test->req.xmlname))) {
152            // Add new test item in the list of duplicates
153            test->duplicates = item->duplicates;
154            item->duplicates = test;
155            return cmdIgnore;
156        }
157        item = item->next;
158    }
159
160    item = reqHead.next;
161    while (item != &reqHead) {
162        if ((item->mx == test->mx) && (item->my == test->my) && (item->req.z == test->req.z) && (!strcmp(item->req.xmlname, test->req.xmlname))) {
163            // Add new test item in the list of duplicates
164            test->duplicates = item->duplicates;
165            item->duplicates = test;
166            return cmdIgnore;
167        }
168        item = item->next;
169    }
170
171    item = dirtyHead.next;
172    while (item != &dirtyHead) {
173        if ((item->mx == test->mx) && (item->my == test->my) && (item->req.z == test->req.z) && (!strcmp(item->req.xmlname, test->req.xmlname)))
174            return cmdNotDone;
175        item = item->next;
176    }
177
178    return cmdRender;
179}
180
181
182enum protoCmd rx_request(const struct protocol *req, int fd)
183{
184    struct protocol *reqnew;
185    struct item *list = NULL, *item;
186    enum protoCmd pend;
187
188    // Upgrade version 1 to version 2
189    if (req->ver == 1) {
190        reqnew = (struct protocol *)malloc(sizeof(protocol));
191        memcpy(reqnew, req, sizeof(protocol_v1));
192        reqnew->xmlname[0] = 0;
193        req = reqnew;
194    }
195    else if (req->ver != 2) {
196        syslog(LOG_ERR, "Bad protocol version %d", req->ver);
197        return cmdIgnore;
198    }
199
200    syslog(LOG_DEBUG, "DEBUG: Got command %s fd(%d) xml(%s), z(%d), x(%d), y(%d)",
201            cmdStr(req->cmd), fd, req->xmlname, req->z, req->x, req->y);
202
203    if ((req->cmd != cmdRender) && (req->cmd != cmdDirty))
204        return cmdIgnore;
205
206    if (check_xyz(req->x, req->y, req->z))
207        return cmdNotDone;
208
209    item = (struct item *)malloc(sizeof(*item));
210    if (!item) {
211            syslog(LOG_ERR, "malloc failed");
212            return cmdNotDone;
213    }
214
215    item->req = *req;
216    item->duplicates = NULL;
217    item->fd = (req->cmd == cmdRender) ? fd : FD_INVALID;
218
219#ifdef METATILE
220    /* Round down request co-ordinates to the neareast N (should be a power of 2)
221     * Note: request path is no longer consistent but this will be recalculated
222     * when the metatile is being rendered.
223     */
224    item->mx = item->req.x & ~(METATILE-1);
225    item->my = item->req.y & ~(METATILE-1);
226#else
227    item->mx = item->req.x;
228    item->my = item->req.y;
229#endif
230
231    pthread_mutex_lock(&qLock);
232
233    if (dirtyNum == DIRTY_LIMIT) {
234        // The queue is severely backlogged. Drop request
235        pthread_mutex_unlock(&qLock);
236        free(item);
237        return cmdNotDone;
238    }
239
240    // Check for a matching request in the current rendering or dirty queues
241    pend = pending(item);
242    if (pend == cmdNotDone) {
243        // We found a match in the dirty queue, can not wait for it
244        pthread_mutex_unlock(&qLock);
245        free(item);
246        return cmdNotDone;
247    }
248    if (pend == cmdIgnore) {
249        // Found a match in render queue, item added as duplicate
250        pthread_mutex_unlock(&qLock);
251        return cmdIgnore;
252    }
253
254    // New request, add it to render or dirty queue
255    if ((req->cmd == cmdRender) && (reqNum < REQ_LIMIT)) {
256        list = &reqHead;
257        reqNum++;
258    } else if (dirtyNum < DIRTY_LIMIT) {
259        list = &dirtyHead;
260        dirtyNum++;
261        item->fd = FD_INVALID; // No response after render
262    }
263
264    if (list) {
265        item->next = list;
266        item->prev = list->prev;
267        item->prev->next = item;
268        list->prev = item;
269
270        pthread_cond_signal(&qCond);
271    } else
272        free(item);
273
274    pthread_mutex_unlock(&qLock);
275
276    return (list == &reqHead)?cmdIgnore:cmdNotDone;
277}
278
279
280void process_loop(int listen_fd)
281{
282    int num_connections = 0;
283    int connections[MAX_CONNECTIONS];
284
285    bzero(connections, sizeof(connections));
286
287    while (1) {
288        struct sockaddr_un in_addr;
289        socklen_t in_addrlen = sizeof(in_addr);
290        fd_set rd;
291        int incoming, num, nfds, i;
292
293        FD_ZERO(&rd);
294        FD_SET(listen_fd, &rd);
295        nfds = listen_fd+1;
296
297        for (i=0; i<num_connections; i++) {
298            FD_SET(connections[i], &rd);
299            nfds = MAX(nfds, connections[i]+1);
300        }
301
302        num = select(nfds, &rd, NULL, NULL, NULL);
303        if (num == -1)
304            perror("select()");
305        else if (num) {
306            //printf("Data is available now on %d fds\n", num);
307            if (FD_ISSET(listen_fd, &rd)) {
308                num--;
309                incoming = accept(listen_fd, (struct sockaddr *) &in_addr, &in_addrlen);
310                if (incoming < 0) {
311                    perror("accept()");
312                } else {
313                    if (num_connections == MAX_CONNECTIONS) {
314                        syslog(LOG_WARNING, "Connection limit(%d) reached. Dropping connection\n", MAX_CONNECTIONS);
315                        close(incoming);
316                    } else {
317                        connections[num_connections++] = incoming;
318                        syslog(LOG_DEBUG, "DEBUG: Got incoming connection, fd %d, number %d\n", incoming, num_connections);
319                    }
320                }
321            }
322            for (i=0; num && (i<num_connections); i++) {
323                int fd = connections[i];
324                if (FD_ISSET(fd, &rd)) {
325                    struct protocol cmd;
326                    int ret;
327
328                    // TODO: to get highest performance we should loop here until we get EAGAIN
329                    ret = recv(fd, &cmd, sizeof(cmd), MSG_DONTWAIT);
330                    if (ret == sizeof(cmd)) {
331                        enum protoCmd rsp = rx_request(&cmd, fd);
332
333                        if ((cmd.cmd == cmdRender) && (rsp == cmdNotDone)) {
334                            cmd.cmd = rsp;
335                            syslog(LOG_DEBUG, "DEBUG: Sending NotDone response(%d)\n", rsp);
336                            ret = send(fd, &cmd, sizeof(cmd), 0);
337                            if (ret != sizeof(cmd))
338                                perror("response send error");
339                        }
340                    } else if (!ret) {
341                        int j;
342
343                        num_connections--;
344                        syslog(LOG_DEBUG, "DEBUG: Connection %d, fd %d closed, now %d left\n", i, fd, num_connections);
345                        for (j=i; j < num_connections; j++)
346                            connections[j] = connections[j+1];
347                        clear_requests(fd);
348                        close(fd);
349                    } else {
350                        syslog(LOG_ERR, "Recv Error on fd %d", fd);
351                        break;
352                    }
353                }
354            }
355        } else {
356            syslog(LOG_ERR, "Select timeout");
357        }
358    }
359}
360
361
362int main(int argc, char **argv)
363{
364    int fd, i;
365    struct sockaddr_un addr;
366    mode_t old;
367    int c;
368    char config_file_name[PATH_MAX] = RENDERD_CONFIG;
369
370    while (1) {
371        int option_index = 0;
372        static struct option long_options[] = {
373            {"config", 1, 0, 'c'},
374            {"help", 0, 0, 'h'},
375            {0, 0, 0, 0}
376        };
377
378        c = getopt_long(argc, argv, "hc:", long_options, &option_index);
379        if (c == -1)
380            break;
381
382        switch (c) {
383            case 'c':
384                strncpy(config_file_name, optarg, PATH_MAX-1);
385                config_file_name[PATH_MAX-1] = 0;
386                break;
387            case 'h':
388                fprintf(stderr, "Usage: renderd [OPTION] ...\n");
389                fprintf(stderr, "Mapnik rendering daemon\n");
390                fprintf(stderr, "  -h, --help           display this help and exit\n");
391                fprintf(stderr, "  -c, --config=CONFIG  set location of config file (default /etc/renderd.conf)\n");
392                exit(0);
393            default:
394                fprintf(stderr, "unknown config option '%c'\n", c);
395                exit(1);
396        }
397    }
398
399    openlog("renderd", LOG_PID, LOG_DAEMON);
400    syslog(LOG_INFO, "Rendering daemon started");
401
402    pthread_mutex_init(&qLock, NULL);
403    pthread_cond_init(&qCond, NULL);
404    reqHead.next = reqHead.prev = &reqHead;
405    dirtyHead.next = dirtyHead.prev = &dirtyHead;
406    renderHead.next = renderHead.prev = &renderHead;
407
408    xmlconfigitem maps[XMLCONFIGS_MAX];
409    bzero(maps, sizeof(xmlconfigitem) * XMLCONFIGS_MAX);
410
411    dictionary *ini = iniparser_load(config_file_name);
412    if (! ini) {
413        exit(1);
414    }
415
416    config.socketname = iniparser_getstring(ini, "renderd:socketname", (char *)RENDER_SOCKET);
417    config.num_threads = iniparser_getint(ini, "renderd:num_threads", NUM_THREADS);
418    config.tile_dir = iniparser_getstring(ini, "renderd:tile_dir", (char *)HASH_PATH);
419    config.mapnik_plugins_dir = iniparser_getstring(ini, "mapnik:plugins_dir", (char *)MAPNIK_PLUGINS);
420    config.mapnik_font_dir = iniparser_getstring(ini, "mapnik:font_dir", (char *)FONT_DIR);
421
422    int iconf = -1;
423    char buffer[PATH_MAX];
424    for (int section=0; section < iniparser_getnsec(ini); section++) {
425        char *name = iniparser_getsecname(ini, section);
426        if (strcmp(name, "renderd") && strcmp(name, "mapnik")) {
427            /* this is a map section */
428            iconf++;
429            if (strlen(name) >= XMLCONFIG_MAX) {
430                fprintf(stderr, "XML name too long: %s\n", name);
431                exit(7);
432            }
433            strcpy(maps[iconf].xmlname, name);
434            if (iconf >= XMLCONFIGS_MAX) {
435                fprintf(stderr, "Config: more than %d configurations found\n", XMLCONFIGS_MAX);
436                exit(7);
437            }
438            sprintf(buffer, "%s:uri", name);
439            char *ini_uri = iniparser_getstring(ini, buffer, (char *)"");
440            if (strlen(ini_uri) >= PATH_MAX) {
441                fprintf(stderr, "URI too long: %s\n", ini_uri);
442                exit(7);
443            }
444            strcpy(maps[iconf].xmluri, ini_uri);
445            sprintf(buffer, "%s:xml", name);
446            char *ini_xmlpath = iniparser_getstring(ini, buffer, (char *)"");
447            if (strlen(ini_xmlpath) >= PATH_MAX){
448                fprintf(stderr, "XML path too long: %s\n", ini_xmlpath);
449                exit(7);
450            }
451            strcpy(maps[iconf].xmlfile, ini_xmlpath);
452        }
453    }
454   
455    syslog(LOG_INFO, "config renderd: socketname=%s\n", config.socketname);
456    syslog(LOG_INFO, "config renderd: num_threads=%d\n", config.num_threads);
457    syslog(LOG_INFO, "config renderd: tile_dir=%s\n", config.tile_dir);
458    syslog(LOG_INFO, "config mapnik:  plugins_dir=%s\n", config.mapnik_plugins_dir);
459    syslog(LOG_INFO, "config mapnik:  font_dir=%s\n", config.mapnik_font_dir);
460    syslog(LOG_INFO, "config mapnik:  font_dir_recurse=%d\n", config.mapnik_font_dir_recurse);
461    for(iconf = 0; iconf < XMLCONFIGS_MAX; ++iconf) {
462         syslog(LOG_INFO, "config map %d:   name(%s) file(%s) uri(%s)\n", iconf, maps[iconf].xmlname, maps[iconf].xmlfile, maps[iconf].xmluri);
463    }
464
465    fd = socket(PF_UNIX, SOCK_STREAM, 0);
466    if (fd < 0) {
467        fprintf(stderr, "failed to create unix socket\n");
468        exit(2);
469    }
470
471    bzero(&addr, sizeof(addr));
472    addr.sun_family = AF_UNIX;
473    strncpy(addr.sun_path, config.socketname, sizeof(addr.sun_path));
474
475    unlink(addr.sun_path);
476
477    old = umask(0); // Need daemon socket to be writeable by apache
478    if (bind(fd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
479        fprintf(stderr, "socket bind failed for: %s\n", config.socketname);
480        close(fd);
481        exit(3);
482    }
483    umask(old);
484
485    if (listen(fd, QUEUE_MAX) < 0) {
486        fprintf(stderr, "socket listen failed for %d\n", QUEUE_MAX);
487        close(fd);
488        exit(4);
489    }
490
491#if 0
492    if (fcntl(fd, F_SETFD, O_RDWR | O_NONBLOCK) < 0) {
493        fprintf(stderr, "setting socket non-block failed\n");
494        close(fd);
495        exit(5);
496    }
497#endif
498
499    //sigPipeAction.sa_handler = pipe_handler;
500    sigPipeAction.sa_handler = SIG_IGN;
501    if (sigaction(SIGPIPE, &sigPipeAction, NULL) < 0) {
502        fprintf(stderr, "failed to register signal handler\n");
503        close(fd);
504        exit(6);
505    }
506
507    render_init(config.mapnik_plugins_dir, config.mapnik_font_dir, config.mapnik_font_dir_recurse);
508
509    render_threads = (pthread_t *) malloc(sizeof(pthread_t) * config.num_threads);
510    for(i=0; i<config.num_threads; i++) {
511        if (pthread_create(&render_threads[i], NULL, render_thread, (void *)maps)) {
512            fprintf(stderr, "error spawning render thread\n");
513            close(fd);
514            exit(7);
515        }
516    }
517    process_loop(fd);
518
519    unlink(config.socketname);
520    close(fd);
521    return 0;
522}
Note: See TracBrowser for help on using the repository browser.