source: subversion/applications/utils/mod_tile/daemon.c @ 6996

Last change on this file since 6996 was 6996, checked in by jonb, 12 years ago

mod_tile: Add shortcut to drop request if queue is full. Prevent sending responses to dirty requests since mod_tile is not expecting a response. If this is not done then the unix pipe will eventually fill up and block, causing a deadlock.

File size: 11.4 KB
Line 
1#include <stdio.h>
2#include <stdlib.h>
3#include <unistd.h>
4#include <sys/types.h>
5#include <sys/socket.h>
6#include <sys/select.h>
7#include <sys/stat.h>
8#include <sys/un.h>
9#include <poll.h>
10#include <errno.h>
11#include <pthread.h>
12#include <signal.h>
13
14
15#include "gen_tile.h"
16#include "protocol.h"
17#include "render_config.h"
18#include "dir_utils.h"
19
20static pthread_t render_threads[NUM_THREADS];
21static struct sigaction sigPipeAction;
22
23static struct item reqHead, dirtyHead, renderHead;
24static int reqNum, dirtyNum;
25static pthread_mutex_t qLock;
26static pthread_cond_t qCond;
27
28struct item *fetch_request(void)
29{
30    struct item *item = NULL;
31
32    pthread_mutex_lock(&qLock);
33
34    while (reqNum == 0 && dirtyNum == 0) {
35        pthread_cond_wait(&qCond, &qLock);
36    }
37    if (reqNum) {
38        item = reqHead.next;
39        reqNum--;
40    } else if (dirtyNum) {
41        item = dirtyHead.next;
42        dirtyNum--;
43    }
44    if (item) {
45        item->next->prev = item->prev;
46        item->prev->next = item->next;
47
48        item->prev = &renderHead;
49        item->next = renderHead.next;
50        renderHead.next->prev = item;
51        renderHead.next = item;
52    }
53
54    pthread_mutex_unlock(&qLock);
55
56    return item;
57}
58
59void clear_requests(int fd)
60{
61    struct item *item, *dupes;
62
63    pthread_mutex_lock(&qLock);
64    item = reqHead.next;
65    while (item != &reqHead) {
66        if (item->fd == fd)
67            item->fd = FD_INVALID;
68
69        dupes = item->duplicates;
70        while (dupes) {
71            if (dupes->fd == fd)
72                dupes->fd = FD_INVALID;
73            dupes = dupes->duplicates;
74        }
75        item = item->next;
76    }
77
78    item = renderHead.next;
79    while (item != &renderHead) {
80        if (item->fd == fd)
81            item->fd = FD_INVALID;
82
83        dupes = item->duplicates;
84        while (dupes) {
85            if (dupes->fd == fd)
86                dupes->fd = FD_INVALID;
87            dupes = dupes->duplicates;
88        }
89        item = item->next;
90    }
91
92    pthread_mutex_unlock(&qLock);
93}
94
95static inline const char *cmdStr(enum protoCmd c)
96{
97    switch (c) {
98        case cmdIgnore:  return "Ignore";
99        case cmdRender:  return "Render";
100        case cmdDirty:   return "Dirty";
101        case cmdDone:    return "Done";
102        case cmdNotDone: return "NotDone";
103        default:         return "unknown";
104    }
105}
106
107void send_response(struct item *item, enum protoCmd rsp)
108{
109    struct protocol *req = &item->req;
110    int ret;
111
112    pthread_mutex_lock(&qLock);
113    item->next->prev = item->prev;
114    item->prev->next = item->next;
115    pthread_mutex_unlock(&qLock);
116
117    while (item) {
118        struct item *prev = item;
119        req = &item->req;
120        if ((item->fd != FD_INVALID) && (req->cmd == cmdRender)) {
121            req->cmd = rsp;
122            //fprintf(stderr, "Sending message %s to %d\n", cmdStr(rsp), item->fd);
123            ret = send(item->fd, req, sizeof(*req), 0);
124            if (ret != sizeof(*req))
125                perror("send error during send_done");
126        }
127        item = item->duplicates;
128        free(prev);
129    }
130}
131
132
133enum protoCmd pending(struct item *test)
134{
135    // check all queues and render list to see if this request already queued
136    // If so, add this new request as a duplicate
137    // call with qLock held
138    struct item *item;
139
140    item = renderHead.next;
141    while (item != &renderHead) {
142        if ((item->mx == test->mx) && (item->my == test->my) && (item->req.z == test->req.z)) {
143            // Add new test item in the list of duplicates
144            test->duplicates = item->duplicates;
145            item->duplicates = test;
146            return cmdIgnore;
147        }
148        item = item->next;
149    }
150
151    item = reqHead.next;
152    while (item != &reqHead) {
153        if ((item->mx == test->mx) && (item->my == test->my) && (item->req.z == test->req.z)) {
154            // Add new test item in the list of duplicates
155            test->duplicates = item->duplicates;
156            item->duplicates = test;
157            return cmdIgnore;
158        }
159        item = item->next;
160    }
161
162    item = dirtyHead.next;
163    while (item != &dirtyHead) {
164        if ((item->mx == test->mx) && (item->my == test->my) && (item->req.z == test->req.z))
165            return cmdNotDone;
166        item = item->next;
167    }
168
169    return cmdRender;
170}
171
172
173enum protoCmd rx_request(const struct protocol *req, int fd)
174{
175    struct item *list = NULL, *item;
176    enum protoCmd pend;
177
178    if (req->ver != 1) {
179        fprintf(stderr, "Bad protocol version %d\n", req->ver);
180        return cmdIgnore;
181    }
182
183    fprintf(stderr, "%s fd(%d) z(%d), x(%d), y(%d)\n",
184            cmdStr(req->cmd), fd, req->z, req->x, req->y);
185
186    if ((req->cmd != cmdRender) && (req->cmd != cmdDirty))
187        return cmdIgnore;
188
189    if (check_xyz(req->x, req->y, req->z))
190        return cmdNotDone;
191
192    item = (struct item *)malloc(sizeof(*item));
193    if (!item) {
194            fprintf(stderr, "malloc failed\n");
195            return cmdNotDone;
196    }
197
198    item->req = *req;
199    item->duplicates = NULL;
200    item->fd = (req->cmd == cmdRender) ? fd : FD_INVALID;
201
202#ifdef METATILE
203    /* Round down request co-ordinates to the neareast N (should be a power of 2)
204     * Note: request path is no longer consistent but this will be recalculated
205     * when the metatile is being rendered.
206     */
207    item->mx = item->req.x & ~(METATILE-1);
208    item->my = item->req.y & ~(METATILE-1);
209#else
210    item->mx = item->req.x;
211    item->my = item->req.y;
212#endif
213
214    pthread_mutex_lock(&qLock);
215
216    if (dirtyNum == DIRTY_LIMIT) {
217        // The queue is severely backlogged. Drop request
218        pthread_mutex_unlock(&qLock);
219        free(item);
220        return cmdNotDone;
221    }
222
223    // Check for a matching request in the current rendering or dirty queues
224    pend = pending(item);
225    if (pend == cmdNotDone) {
226        // We found a match in the dirty queue, can not wait for it
227        pthread_mutex_unlock(&qLock);
228        free(item);
229        return cmdNotDone;
230    }
231    if (pend == cmdIgnore) {
232        // Found a match in render queue, item added as duplicate
233        pthread_mutex_unlock(&qLock);
234        return cmdIgnore;
235    }
236
237    // New request, add it to render or dirty queue
238    if ((req->cmd == cmdRender) && (reqNum < REQ_LIMIT)) {
239        list = &reqHead;
240        reqNum++;
241    } else if (dirtyNum < DIRTY_LIMIT) {
242        list = &dirtyHead;
243        dirtyNum++;
244        item->fd = FD_INVALID; // No response after render
245    }
246
247    if (list) {
248        item->next = list;
249        item->prev = list->prev;
250        item->prev->next = item;
251        list->prev = item;
252
253        pthread_cond_signal(&qCond);
254    } else
255        free(item);
256
257    pthread_mutex_unlock(&qLock);
258
259    return (list == &reqHead)?cmdIgnore:cmdNotDone;
260}
261
262
263void process_loop(int listen_fd)
264{
265    int num_connections = 0;
266    int connections[MAX_CONNECTIONS];
267
268    bzero(connections, sizeof(connections));
269
270    while (1) {
271        struct sockaddr_un in_addr;
272        socklen_t in_addrlen = sizeof(in_addr);
273        fd_set rd;
274        int incoming, num, nfds, i;
275
276        FD_ZERO(&rd);
277        FD_SET(listen_fd, &rd);
278        nfds = listen_fd+1;
279
280        for (i=0; i<num_connections; i++) {
281            FD_SET(connections[i], &rd);
282            nfds = MAX(nfds, connections[i]+1);
283        }
284
285        num = select(nfds, &rd, NULL, NULL, NULL);
286        if (num == -1)
287            perror("select()");
288        else if (num) {
289            //printf("Data is available now on %d fds\n", num);
290            if (FD_ISSET(listen_fd, &rd)) {
291                num--;
292                incoming = accept(listen_fd, (struct sockaddr *) &in_addr, &in_addrlen);
293                if (incoming < 0) {
294                    perror("accept()");
295                } else {
296                    if (num_connections == MAX_CONNECTIONS) {
297                        fprintf(stderr, "Connection limit(%d) reached. Dropping connection\n", MAX_CONNECTIONS);
298                        close(incoming);
299                    } else {
300                        connections[num_connections++] = incoming;
301                        fprintf(stderr, "Got incoming connection, fd %d, number %d\n", incoming, num_connections);
302                    }
303                }
304            }
305            for (i=0; num && (i<num_connections); i++) {
306                int fd = connections[i];
307                if (FD_ISSET(fd, &rd)) {
308                    struct protocol cmd;
309                    int ret;
310
311                    //fprintf(stderr, "New command from fd %d, number %d, to go %d\n", fd, i, num);
312                    // TODO: to get highest performance we should loop here until we get EAGAIN
313                    ret = recv(fd, &cmd, sizeof(cmd), MSG_DONTWAIT);
314                    if (ret == sizeof(cmd)) {
315                        enum protoCmd rsp = rx_request(&cmd, fd);
316
317                        if ((cmd.cmd == cmdRender) && (rsp == cmdNotDone)) {
318                            cmd.cmd = rsp;
319                            fprintf(stderr, "Sending NotDone response(%d)\n", rsp);
320                            ret = send(fd, &cmd, sizeof(cmd), 0);
321                            if (ret != sizeof(cmd))
322                                perror("response send error");
323                        }
324                    } else if (!ret) {
325                        int j;
326
327                        num_connections--;
328                        fprintf(stderr, "Connection %d, fd %d closed, now %d left\n", i, fd, num_connections);
329                        for (j=i; j < num_connections; j++)
330                            connections[j] = connections[j+1];
331                        clear_requests(fd);
332                        close(fd);
333                    } else {
334                        fprintf(stderr, "Recv Error on fd %d\n", fd);
335                        break;
336                    }
337                }
338            }
339        } else
340            fprintf(stderr, "Select timeout\n");
341    }
342}
343
344
345int main(void)
346{
347    const char *spath = RENDER_SOCKET;
348    int fd, i;
349    struct sockaddr_un addr;
350    mode_t old;
351
352    fprintf(stderr, "Rendering daemon\n");
353
354    pthread_mutex_init(&qLock, NULL);
355    pthread_cond_init(&qCond, NULL);
356    reqHead.next = reqHead.prev = &reqHead;
357    dirtyHead.next = dirtyHead.prev = &dirtyHead;
358    renderHead.next = renderHead.prev = &renderHead;
359
360    fd = socket(PF_UNIX, SOCK_STREAM, 0);
361    if (fd < 0) {
362        fprintf(stderr, "failed to create unix socket\n");
363        exit(2);
364    }
365
366    bzero(&addr, sizeof(addr));
367    addr.sun_family = AF_UNIX;
368    strncpy(addr.sun_path, spath, sizeof(addr.sun_path));
369
370    unlink(addr.sun_path);
371
372    old = umask(0); // Need daemon socket to be writeable by apache
373    if (bind(fd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
374        fprintf(stderr, "socket bind failed for: %s\n", spath);
375        close(fd);
376        exit(3);
377    }
378    umask(old);
379
380    if (listen(fd, QUEUE_MAX) < 0) {
381        fprintf(stderr, "socket listen failed for %d\n", QUEUE_MAX);
382        close(fd);
383        exit(4);
384    }
385
386#if 0
387    if (fcntl(fd, F_SETFD, O_RDWR | O_NONBLOCK) < 0) {
388        fprintf(stderr, "setting socket non-block failed\n");
389        close(fd);
390        exit(5);
391    }
392#endif
393
394    //sigPipeAction.sa_handler = pipe_handler;
395    sigPipeAction.sa_handler = SIG_IGN;
396    if (sigaction(SIGPIPE, &sigPipeAction, NULL) < 0) {
397        fprintf(stderr, "failed to register signal handler\n");
398        close(fd);
399        exit(6);
400    }
401
402    render_init();
403
404    for(i=0; i<NUM_THREADS; i++) {
405        if (pthread_create(&render_threads[i], NULL, render_thread, NULL)) {
406            fprintf(stderr, "error spawning render thread\n");
407            close(fd);
408            exit(7);
409        }
410    }
411    process_loop(fd);
412
413    unlink(spath);
414    close(fd);
415    return 0;
416}
Note: See TracBrowser for help on using the repository browser.