source: subversion/applications/utils/mod_tile/daemon.c @ 13125

Last change on this file since 13125 was 12950, checked in by twain, 11 years ago

mod_tile: New apache directives and options for multiple tile sets

File size: 13.5 KB
Line 
1#include <stdio.h>
2#include <stdlib.h>
3#include <unistd.h>
4#include <sys/types.h>
5#include <sys/socket.h>
6#include <sys/select.h>
7#include <sys/stat.h>
8#include <sys/un.h>
9#include <poll.h>
10#include <errno.h>
11#include <pthread.h>
12#include <signal.h>
13
14#include "daemon.h"
15#include "gen_tile.h"
16#include "protocol.h"
17#include "render_config.h"
18#include "dir_utils.h"
19
20static pthread_t render_threads[NUM_THREADS];
21static struct sigaction sigPipeAction;
22
23static struct item reqHead, dirtyHead, renderHead;
24static int reqNum, dirtyNum;
25static pthread_mutex_t qLock;
26static pthread_cond_t qCond;
27
28struct item *fetch_request(void)
29{
30    struct item *item = NULL;
31
32    pthread_mutex_lock(&qLock);
33
34    while (reqNum == 0 && dirtyNum == 0) {
35        pthread_cond_wait(&qCond, &qLock);
36    }
37    if (reqNum) {
38        item = reqHead.next;
39        reqNum--;
40    } else if (dirtyNum) {
41        item = dirtyHead.next;
42        dirtyNum--;
43    }
44    if (item) {
45        item->next->prev = item->prev;
46        item->prev->next = item->next;
47
48        item->prev = &renderHead;
49        item->next = renderHead.next;
50        renderHead.next->prev = item;
51        renderHead.next = item;
52    }
53
54    pthread_mutex_unlock(&qLock);
55
56    return item;
57}
58
59void clear_requests(int fd)
60{
61    struct item *item, *dupes;
62
63    pthread_mutex_lock(&qLock);
64    item = reqHead.next;
65    while (item != &reqHead) {
66        if (item->fd == fd)
67            item->fd = FD_INVALID;
68
69        dupes = item->duplicates;
70        while (dupes) {
71            if (dupes->fd == fd)
72                dupes->fd = FD_INVALID;
73            dupes = dupes->duplicates;
74        }
75        item = item->next;
76    }
77
78    item = renderHead.next;
79    while (item != &renderHead) {
80        if (item->fd == fd)
81            item->fd = FD_INVALID;
82
83        dupes = item->duplicates;
84        while (dupes) {
85            if (dupes->fd == fd)
86                dupes->fd = FD_INVALID;
87            dupes = dupes->duplicates;
88        }
89        item = item->next;
90    }
91
92    pthread_mutex_unlock(&qLock);
93}
94
95static inline const char *cmdStr(enum protoCmd c)
96{
97    switch (c) {
98        case cmdIgnore:  return "Ignore";
99        case cmdRender:  return "Render";
100        case cmdDirty:   return "Dirty";
101        case cmdDone:    return "Done";
102        case cmdNotDone: return "NotDone";
103        default:         return "unknown";
104    }
105}
106
107void send_response(struct item *item, enum protoCmd rsp)
108{
109    struct protocol *req = &item->req;
110    int ret;
111
112    pthread_mutex_lock(&qLock);
113    item->next->prev = item->prev;
114    item->prev->next = item->next;
115    pthread_mutex_unlock(&qLock);
116
117    while (item) {
118        struct item *prev = item;
119        req = &item->req;
120        if ((item->fd != FD_INVALID) && (req->cmd == cmdRender)) {
121            req->cmd = rsp;
122            //fprintf(stderr, "Sending message %s to %d\n", cmdStr(rsp), item->fd);
123            ret = send(item->fd, req, sizeof(*req), 0);
124            if (ret != sizeof(*req))
125                perror("send error during send_done");
126        }
127        item = item->duplicates;
128        free(prev);
129    }
130}
131
132
133enum protoCmd pending(struct item *test)
134{
135    // check all queues and render list to see if this request already queued
136    // If so, add this new request as a duplicate
137    // call with qLock held
138    struct item *item;
139
140    item = renderHead.next;
141    while (item != &renderHead) {
142        if ((item->mx == test->mx) && (item->my == test->my) && (item->req.z == test->req.z) && (!strcmp(item->req.xmlname, test->req.xmlname))) {
143            // Add new test item in the list of duplicates
144            test->duplicates = item->duplicates;
145            item->duplicates = test;
146            return cmdIgnore;
147        }
148        item = item->next;
149    }
150
151    item = reqHead.next;
152    while (item != &reqHead) {
153        if ((item->mx == test->mx) && (item->my == test->my) && (item->req.z == test->req.z) && (!strcmp(item->req.xmlname, test->req.xmlname))) {
154            // Add new test item in the list of duplicates
155            test->duplicates = item->duplicates;
156            item->duplicates = test;
157            return cmdIgnore;
158        }
159        item = item->next;
160    }
161
162    item = dirtyHead.next;
163    while (item != &dirtyHead) {
164        if ((item->mx == test->mx) && (item->my == test->my) && (item->req.z == test->req.z) && (!strcmp(item->req.xmlname, test->req.xmlname)))
165            return cmdNotDone;
166        item = item->next;
167    }
168
169    return cmdRender;
170}
171
172
173enum protoCmd rx_request(const struct protocol *req, int fd)
174{
175    struct protocol *reqnew;
176    struct item *list = NULL, *item;
177    enum protoCmd pend;
178
179    // Upgrade version 1 to version 2
180    if (req->ver == 1) {
181        reqnew = (struct protocol *)malloc(sizeof(protocol));
182        memcpy(reqnew, req, sizeof(protocol_v1));
183        reqnew->xmlname[0] = 0;
184        req = reqnew;
185    }
186    else if (req->ver != 2) {
187        fprintf(stderr, "Bad protocol version %d\n", req->ver);
188        return cmdIgnore;
189    }
190
191    fprintf(stderr, "%s fd(%d) xml(%s), z(%d), x(%d), y(%d)\n",
192            cmdStr(req->cmd), fd, req->xmlname, req->z, req->x, req->y);
193
194    if ((req->cmd != cmdRender) && (req->cmd != cmdDirty))
195        return cmdIgnore;
196
197    if (check_xyz(req->x, req->y, req->z))
198        return cmdNotDone;
199
200    item = (struct item *)malloc(sizeof(*item));
201    if (!item) {
202            fprintf(stderr, "malloc failed\n");
203            return cmdNotDone;
204    }
205
206    item->req = *req;
207    item->duplicates = NULL;
208    item->fd = (req->cmd == cmdRender) ? fd : FD_INVALID;
209
210#ifdef METATILE
211    /* Round down request co-ordinates to the neareast N (should be a power of 2)
212     * Note: request path is no longer consistent but this will be recalculated
213     * when the metatile is being rendered.
214     */
215    item->mx = item->req.x & ~(METATILE-1);
216    item->my = item->req.y & ~(METATILE-1);
217#else
218    item->mx = item->req.x;
219    item->my = item->req.y;
220#endif
221
222    pthread_mutex_lock(&qLock);
223
224    if (dirtyNum == DIRTY_LIMIT) {
225        // The queue is severely backlogged. Drop request
226        pthread_mutex_unlock(&qLock);
227        free(item);
228        return cmdNotDone;
229    }
230
231    // Check for a matching request in the current rendering or dirty queues
232    pend = pending(item);
233    if (pend == cmdNotDone) {
234        // We found a match in the dirty queue, can not wait for it
235        pthread_mutex_unlock(&qLock);
236        free(item);
237        return cmdNotDone;
238    }
239    if (pend == cmdIgnore) {
240        // Found a match in render queue, item added as duplicate
241        pthread_mutex_unlock(&qLock);
242        return cmdIgnore;
243    }
244
245    // New request, add it to render or dirty queue
246    if ((req->cmd == cmdRender) && (reqNum < REQ_LIMIT)) {
247        list = &reqHead;
248        reqNum++;
249    } else if (dirtyNum < DIRTY_LIMIT) {
250        list = &dirtyHead;
251        dirtyNum++;
252        item->fd = FD_INVALID; // No response after render
253    }
254
255    if (list) {
256        item->next = list;
257        item->prev = list->prev;
258        item->prev->next = item;
259        list->prev = item;
260
261        pthread_cond_signal(&qCond);
262    } else
263        free(item);
264
265    pthread_mutex_unlock(&qLock);
266
267    return (list == &reqHead)?cmdIgnore:cmdNotDone;
268}
269
270
271void process_loop(int listen_fd)
272{
273    int num_connections = 0;
274    int connections[MAX_CONNECTIONS];
275
276    bzero(connections, sizeof(connections));
277
278    while (1) {
279        struct sockaddr_un in_addr;
280        socklen_t in_addrlen = sizeof(in_addr);
281        fd_set rd;
282        int incoming, num, nfds, i;
283
284        FD_ZERO(&rd);
285        FD_SET(listen_fd, &rd);
286        nfds = listen_fd+1;
287
288        for (i=0; i<num_connections; i++) {
289            FD_SET(connections[i], &rd);
290            nfds = MAX(nfds, connections[i]+1);
291        }
292
293        num = select(nfds, &rd, NULL, NULL, NULL);
294        if (num == -1)
295            perror("select()");
296        else if (num) {
297            //printf("Data is available now on %d fds\n", num);
298            if (FD_ISSET(listen_fd, &rd)) {
299                num--;
300                incoming = accept(listen_fd, (struct sockaddr *) &in_addr, &in_addrlen);
301                if (incoming < 0) {
302                    perror("accept()");
303                } else {
304                    if (num_connections == MAX_CONNECTIONS) {
305                        fprintf(stderr, "Connection limit(%d) reached. Dropping connection\n", MAX_CONNECTIONS);
306                        close(incoming);
307                    } else {
308                        connections[num_connections++] = incoming;
309                        fprintf(stderr, "Got incoming connection, fd %d, number %d\n", incoming, num_connections);
310                    }
311                }
312            }
313            for (i=0; num && (i<num_connections); i++) {
314                int fd = connections[i];
315                if (FD_ISSET(fd, &rd)) {
316                    struct protocol cmd;
317                    int ret;
318
319                    //fprintf(stderr, "New command from fd %d, number %d, to go %d\n", fd, i, num);
320                    // TODO: to get highest performance we should loop here until we get EAGAIN
321                    ret = recv(fd, &cmd, sizeof(cmd), MSG_DONTWAIT);
322                    if (ret == sizeof(cmd)) {
323                        enum protoCmd rsp = rx_request(&cmd, fd);
324
325                        if ((cmd.cmd == cmdRender) && (rsp == cmdNotDone)) {
326                            cmd.cmd = rsp;
327                            fprintf(stderr, "Sending NotDone response(%d)\n", rsp);
328                            ret = send(fd, &cmd, sizeof(cmd), 0);
329                            if (ret != sizeof(cmd))
330                                perror("response send error");
331                        }
332                    } else if (!ret) {
333                        int j;
334
335                        num_connections--;
336                        fprintf(stderr, "Connection %d, fd %d closed, now %d left\n", i, fd, num_connections);
337                        for (j=i; j < num_connections; j++)
338                            connections[j] = connections[j+1];
339                        clear_requests(fd);
340                        close(fd);
341                    } else {
342                        fprintf(stderr, "Recv Error on fd %d\n", fd);
343                        break;
344                    }
345                }
346            }
347        } else
348            fprintf(stderr, "Select timeout\n");
349    }
350}
351
352
353int main(void)
354{
355    const char *spath = RENDER_SOCKET;
356    int fd, i;
357    struct sockaddr_un addr;
358    mode_t old;
359
360    fprintf(stderr, "Rendering daemon\n");
361
362    pthread_mutex_init(&qLock, NULL);
363    pthread_cond_init(&qCond, NULL);
364    reqHead.next = reqHead.prev = &reqHead;
365    dirtyHead.next = dirtyHead.prev = &dirtyHead;
366    renderHead.next = renderHead.prev = &renderHead;
367
368    fd = socket(PF_UNIX, SOCK_STREAM, 0);
369    if (fd < 0) {
370        fprintf(stderr, "failed to create unix socket\n");
371        exit(2);
372    }
373
374    bzero(&addr, sizeof(addr));
375    addr.sun_family = AF_UNIX;
376    strncpy(addr.sun_path, spath, sizeof(addr.sun_path));
377
378    unlink(addr.sun_path);
379
380    old = umask(0); // Need daemon socket to be writeable by apache
381    if (bind(fd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
382        fprintf(stderr, "socket bind failed for: %s\n", spath);
383        close(fd);
384        exit(3);
385    }
386    umask(old);
387
388    if (listen(fd, QUEUE_MAX) < 0) {
389        fprintf(stderr, "socket listen failed for %d\n", QUEUE_MAX);
390        close(fd);
391        exit(4);
392    }
393
394#if 0
395    if (fcntl(fd, F_SETFD, O_RDWR | O_NONBLOCK) < 0) {
396        fprintf(stderr, "setting socket non-block failed\n");
397        close(fd);
398        exit(5);
399    }
400#endif
401
402    //sigPipeAction.sa_handler = pipe_handler;
403    sigPipeAction.sa_handler = SIG_IGN;
404    if (sigaction(SIGPIPE, &sigPipeAction, NULL) < 0) {
405        fprintf(stderr, "failed to register signal handler\n");
406        close(fd);
407        exit(6);
408    }
409
410    xmlconfigitem maps[XMLCONFIGS_MAX];
411    bzero(maps, sizeof(xmlconfigitem) * XMLCONFIGS_MAX);
412
413    FILE * hini ;
414    char line[INILINE_MAX];
415    char key[INILINE_MAX];
416    char value[INILINE_MAX];
417
418    // Load the config
419    if ((hini=fopen(RENDERD_CONFIG, "r"))==NULL) {
420        fprintf(stderr, "Config: cannot open %s\n", RENDERD_CONFIG);
421        exit(7);
422    }
423
424    int iconf = -1;
425    while (fgets(line, INILINE_MAX, hini)!=NULL) {
426        if (line[0] == '#') continue;
427        if (line[strlen(line)-1] == '\n') line[strlen(line)-1] = 0;
428        if (line[0] == '[') {
429            iconf++;
430            if (iconf >= XMLCONFIGS_MAX) {
431                fprintf(stderr, "Config: more than %d configurations found\n", XMLCONFIGS_MAX);
432                exit(7);
433            }
434            if (strlen(line) >= XMLCONFIG_MAX){
435                fprintf(stderr, "XML name too long: %s\n", line);
436                exit(7);
437            }
438            sscanf(line, "[%[^]]", maps[iconf].xmlname);
439        } else if (sscanf(line, "%[^=]=%[^;#]", key, value) == 2
440               ||  sscanf(line, "%[^=]=\"%[^\"]\"", key, value) == 2) {
441
442            if (!strcmp(key, "XML")){
443                if (strlen(value) >= PATH_MAX){
444                    fprintf(stderr, "XML path too long: %s\n", value);
445                    exit(7);
446                }
447                strcpy(maps[iconf].xmlfile, value);
448            }
449        }
450    }
451    fclose(hini);
452
453    for(iconf = 0; iconf < XMLCONFIGS_MAX; ++iconf) {
454         printf("config %d: name(%s) file(%s)\n", iconf, maps[iconf].xmlname, maps[iconf].xmlfile);
455    }
456
457    render_init();
458
459    for(i=0; i<NUM_THREADS; i++) {
460        if (pthread_create(&render_threads[i], NULL, render_thread, (void *)maps)) {
461            fprintf(stderr, "error spawning render thread\n");
462            close(fd);
463            exit(7);
464        }
465    }
466    process_loop(fd);
467
468    unlink(spath);
469    close(fd);
470    return 0;
471}
Note: See TracBrowser for help on using the repository browser.