source: subversion/applications/utils/mod_tile/renderd.py @ 13125

Last change on this file since 13125 was 13125, checked in by jonb, 11 years ago

Remove some unused imports, move minmax inside the only class which uses it

  • Property svn:executable set to *
File size: 18.6 KB
Line 
1#!/usr/bin/python
2#
3# mod_tile rendering daemon example written in Python.
4# The code is mostly a direct port of the C implementation.
5# It needs more work to make it more Pythonic, split it
6# into more appropriate classes, add documentation, fix bugs etc.
7#
8# This is currently experimental and not intended as a replacement
9# of the C implementation! It should allow more people to produce
10# custom variations of the rendering pipeline, e.g. such as compositing
11# tiles from multiple layers.
12#
13# The code functions but I'm not yet convinced this is the correct
14# approach to integrating Python with the render daemon. Two other
15# options I'm considering:
16#
17# - Use the C renderd code with python binding allowing the replacement
18# of just the core tile rendering code (this is the bit that people
19# may want to tweak)
20#
21# - Split the functionality into a seperate queue handler daemon and
22# render daemon. This would remove a lot of the complexity around the
23# request handling which most people probably won't want to touch.
24# The queue handler might stay in C with a smaller python rendering daemon
25
26import sys, os
27import SocketServer
28import struct
29import thread
30import threading
31import socket
32import ConfigParser
33import mapnik
34import time
35import errno
36from math import pi,cos,sin,log,exp,atan
37
38MAX_ZOOM = 18
39METATILE = 8
40META_MAGIC = "META"
41
42class protocol:
43    # ENUM values for commandStatus field in protocol packet
44    Ignore = 0
45    Render = 1
46    Dirty = 2
47    Done = 3
48    NotDone = 4
49
50class ProtocolPacket:
51    def __init__(self, version, fields = ""):
52        self.version = version
53        self.xmlname = ""
54        self.x = 0
55        self.y = 0
56        self.z = 0
57        self.mx = 0
58        self.my = 0
59        self.commandStatus = protocol.Ignore
60        self.fields = fields
61
62    def len(self):
63        return struct.calcsize(self.fields)
64
65    def bad_request(self):
66        x = self.x
67        y = self.y
68        z = self.z
69
70        if (z < 0) or (z > MAX_ZOOM):
71            return True
72        limit = (1 << z) -1
73        if (x < 0) or (x > limit):
74            return True
75        if (y < 0) or (y > limit):
76            return True
77        return False
78
79    def meta_tuple(self):
80        # This metatile tuple is used to identify duplicate request in the rendering queue
81        return (self.xmlname, self.mx, self.my, self.z)
82
83class ProtocolPacketV1(ProtocolPacket):
84    def __init__(self):
85        ProtocolPacket(1)
86        self.fields = "5i"
87
88    def receive(self, data, dest):
89        version, request, x, y, z = struct.unpack(self.fields, data)
90
91        if version != 1:
92            print "Received V1 packet with incorect version %d" % version
93        else:
94            #print "Got V1 request, command(%d), x(%d), y(%d), z(%d)" \
95            #    % (request, x, y, z)
96            self.commandStatus = request
97            self.x = x
98            self.y = y
99            self.z = z
100            self.xmlname = "Default"
101            # Calculate Meta-tile value for this x/y
102            self.mx = x & (METATILE-1)
103            self.my = y & (METATILE-1)
104            self.dest = dest
105
106
107    def send(self, status):
108        x = self.x
109        y = self.y
110        z = self.z
111        data = struct.pack(self.fields, (1, status, x, y, z))
112        try: 
113            self.dest.send(data)
114        except socket.error, e:
115               if e[0] == errno.EBADF:
116                   print "Got EBADF in socket send"
117               else:
118                   raise
119
120
121class ProtocolPacketV2(ProtocolPacket):
122    def __init__(self):
123        ProtocolPacket(2)
124        self.fields = "5i41sxxx"
125
126    def receive(self, data, dest):
127        version, request, x, y, z, xmlname = struct.unpack(self.fields, data)
128
129        if version != 2:
130            print "Received V2 packet with incorect version %d" % version
131        else:
132            #print "Got V2 request, command(%d), xmlname(%s), x(%d), y(%d), z(%d)" \
133            #    % (request, xmlname, x, y, z)
134            self.commandStatus = request
135            self.x = x
136            self.y = y
137            self.z = z
138            self.xmlname = xmlname.rstrip('\000') # Remove trailing NULs
139            # Calculate Meta-tile value for this x/y
140            self.mx = x & ~(METATILE-1)
141            self.my = y & ~(METATILE-1)
142            self.dest = dest
143
144    def send(self, status):
145        x = self.x
146        y = self.y
147        z = self.z
148        xmlname = self.xmlname
149        data = struct.pack(self.fields, 2, status, x, y, z, xmlname)
150        try:
151            self.dest.send(data)
152        except socket.error, e:
153               if e[0] == errno.EBADF:
154                   print "Got EBADF in socket send"
155               else:
156                   raise
157
158DEG_TO_RAD = pi/180
159RAD_TO_DEG = 180/pi
160
161
162class SphericalProjection:
163    def __init__(self,levels=18):
164        self.Bc = []
165        self.Cc = []
166        self.zc = []
167        self.Ac = []
168        c = 256
169        for d in range(0,levels):
170            e = c/2;
171            self.Bc.append(c/360.0)
172            self.Cc.append(c/(2 * pi))
173            self.zc.append((e,e))
174            self.Ac.append(c)
175            c *= 2
176
177    def minmax(self, a,b,c):
178        a = max(a,b)
179        a = min(a,c)
180        return a
181
182    def fromLLtoPixel(self,ll,zoom):
183         d = self.zc[zoom]
184         e = round(d[0] + ll[0] * self.Bc[zoom])
185         f = self.minmax(sin(DEG_TO_RAD * ll[1]),-0.9999,0.9999)
186         g = round(d[1] + 0.5*log((1+f)/(1-f))*-self.Cc[zoom])
187         return (e,g)
188
189    def fromPixelToLL(self,px,zoom):
190         e = self.zc[zoom]
191         f = (px[0] - e[0])/self.Bc[zoom]
192         g = (px[1] - e[1])/-self.Cc[zoom]
193         h = RAD_TO_DEG * ( 2 * atan(exp(g)) - 0.5 * pi)
194         return (f,h)
195
196
197class RenderThread:
198    def __init__(self, tile_path, styles, queue_handler):
199        self.tile_path = tile_path
200        self.queue_handler = queue_handler
201        self.maps = {}
202        METATILE = 8
203        RENDER_SIZE = 256 * (METATILE + 1)
204        for xmlname in styles:
205            #print "Creating Mapnik map object for %s with %s" % (xmlname, styles[xmlname])
206            m = mapnik.Map(RENDER_SIZE, RENDER_SIZE)
207            self.maps[xmlname] = m
208            mapnik.load_map(m, styles[xmlname])
209
210        # Projects between tile pixel co-ordinates and LatLong (EPSG:4326)
211        self.gprj = SphericalProjection(MAX_ZOOM)
212        # This is the Spherical mercator projection (EPSG:900913)
213        self.prj = mapnik.Projection("+proj=merc +a=6378137 +b=6378137 +lat_ts=0.0 +lon_0=0.0 +x_0=0.0 +y_0=0 +k=1.0 +units=m +nadgrids=@null +no_defs +over")
214
215    def render_meta(self, m, style, x, y, z, sz):
216        # Calculate pixel positions of bottom-left & top-right
217        p0 = (x * 256, (y + sz) * 256)
218        p1 = ((x + sz) * 256, y * 256)
219
220        # Convert to LatLong (EPSG:4326)
221        l0 = self.gprj.fromPixelToLL(p0, z);
222        l1 = self.gprj.fromPixelToLL(p1, z);
223
224        # Convert to mercator co-ords (EPSG:900913)
225        c0 = self.prj.forward(mapnik.Coord(l0[0],l0[1]))
226        c1 = self.prj.forward(mapnik.Coord(l1[0],l1[1]))
227
228        # Bounding box for the meta-tile
229        bbox = mapnik.Envelope(c0.x,c0.y, c1.x,c1.y)
230        # Expand tile to provide a gutter which avoids features getting lost at edge of metatile
231        scale = (sz+1.0)/sz
232        bbox.width(bbox.width() * scale)
233        bbox.height(bbox.height() * scale)
234        # Calculate meta tile size in pixels
235        render_size = 256 * (sz + 1)
236        #m.resize(render_size, render_size);
237        m.zoom_to_box(bbox)
238
239        # Render image
240        im = mapnik.Image(render_size, render_size)
241        mapnik.render(m, im)
242
243        tiles = {}
244
245        # Split image up into NxN grid of tile
246        for yy in range(0,sz):
247            for xx in range(0,sz):
248                # Position of tile, offset due to gutter
249                yoff = 128 + yy * 256
250                xoff = 128 + xx * 256
251                view = im.view(xoff, yoff, 256, 256)
252                tile = view.tostring('png256')
253                #print "Got view of z(%d) x(%d) y(%d), len(%d)" % (z, x+xx, y+yy, len(tile))
254                tiles[(xx, yy)] = tile
255
256        return tiles
257
258    def render_request(self, r):
259        # Calculate the meta tile size to use for this zoom level
260        size = min(METATILE, 1 << r.z)
261        xmlname = r.xmlname
262        x = r.mx
263        y = r.my
264        z = r.z
265        try:
266            m = self.maps[xmlname]
267        except KeyError:
268            print "No map for: '%s'" % xmlname
269            return False
270        tiles = self.render_meta(m, xmlname, x, y, z, size)
271        status = self.meta_save(xmlname, x, y, z, size, tiles)
272
273        if status == True:
274            print "Done xmlname(%s) z(%d) x(%d-%d) y(%d-%d)" % \
275            (xmlname, r.z, x, x+size-1, y, y+size-1)
276        else:
277            print "FAILED xmlname(%s) z(%d) x(%d-%d) y(%d-%d)" % \
278            (xmlname, z, x, x+size-1, y, y+size-1)
279
280        return status;
281
282    def xyz_to_meta(self, xmlname, x,y, z):
283        mask = METATILE -1
284        x &= ~mask
285        y &= ~mask
286        hashes = {}
287
288        for i in range(0,5):
289            hashes[i] = ((x & 0x0f) << 4) | (y & 0x0f)
290            x >>= 4
291            y >>= 4
292
293        meta = "%s/%s/%d/%u/%u/%u/%u/%u.meta" % (self.tile_path, xmlname, z, hashes[4], hashes[3], hashes[2], hashes[1], hashes[0])
294        return meta
295
296    def xyz_to_meta_offset(self, xmlname, x,y, z):
297        mask = METATILE -1
298        offset = (x & mask) * METATILE + (y & mask)
299        return offset
300
301
302    def meta_save(self, xmlname, x, y, z, size, tiles):
303        #print "Saving %d tiles" % (size * size)
304        meta_path = self.xyz_to_meta(xmlname, x, y, z)
305        d = os.path.dirname(meta_path)
306        if not os.path.exists(d):
307            try:
308                os.makedirs(d)
309            except OSError:
310                # Multiple threads can race when creating directories,
311                # ignore exception if the directory now exists
312                if not os.path.exists(d):
313                    raise
314
315        tmp = "%s.tmp.%d" % (meta_path, thread.get_ident())
316        f = open(tmp, "w")
317
318        f.write(struct.pack("4s4i", META_MAGIC, METATILE * METATILE, x, y, z))
319        offset = len(META_MAGIC) + 4 * 4
320        # Need to pre-compensate the offsets for the size of the offset/size table we are about to write
321        offset += (2 * 4) * (METATILE * METATILE)
322        # Collect all the tile sizes
323        sizes = {}
324        offsets = {}
325        for xx in range(0, size):
326            for yy in range(0, size):
327                mt = self.xyz_to_meta_offset(xmlname, x+xx, y+yy, z)
328                sizes[mt] = len(tiles[(xx, yy)])
329                offsets[mt] = offset
330                offset += sizes[mt]
331        # Write out the offset/size table
332        for mt in range(0, METATILE * METATILE):
333            if mt in sizes:
334                f.write(struct.pack("2i", offsets[mt], sizes[mt]))
335            else:
336                f.write(struct.pack("2i", 0, 0))
337        # Write out the tiles
338        for xx in range(0, size):
339            for yy in range(0, size):
340                f.write(tiles[(xx, yy)])
341
342        f.close()
343        os.rename(tmp, meta_path)
344        print "Wrote: %s" % meta_path
345
346        return True
347
348    def loop(self):
349        while True:
350            #Fetch a meta-tile to render
351            r = self.queue_handler.fetch()
352            rendered = self.render_request(r)
353            # Retrieve all requests for this meta-tile
354            requests = self.queue_handler.pop_requests(r)
355            for request in requests:
356                if (request.commandStatus == protocol.Render):
357                    if rendered == True:
358                        request.send(protocol.Done)
359                    else:
360                        request.send(protocol.NotDone)
361            #time.sleep(1)
362        print "Dummy render thread, exiting. Path %s" % self.tile_path
363
364
365def start_renderers(num_threads, tile_path, styles, queue_handler):
366    for i in range(num_threads):
367        renderer = RenderThread(tile_path, styles, queue_handler)
368        render_thread = threading.Thread(target=renderer.loop)
369        render_thread.setDaemon(True)
370        render_thread.start()
371        print "Started render thread %s" % render_thread.getName()
372
373class RequestQueues:
374    def __init__(self, request_limit = 32, dirty_limit = 1000):
375        # Queues are used as follows:
376        # - Incoming render requests are initally put into the request queue
377        # If the request queue is full then the new request is demoted dirty queue
378        # - Incoming 'dirty' requests are put into the dirty queue or overflow from render queue
379        # - The render queue holds the requests which are in progress by the render threads
380        self.requests = {}
381        self.dirties = {}
382        self.rendering = {}
383
384        self.request_limit = request_limit
385        self.dirty_limit = dirty_limit
386        self.cond = threading.Condition()
387
388
389    def add(self, request):
390        self.cond.acquire()
391        try:
392            # FIXME: Add short-circuit for overload condition?
393            if request.meta_tuple() in self.rendering:
394                self.rendering[request.meta_tuple()].append(request)
395                return "rendering"
396            elif request.meta_tuple() in self.requests:
397                self.requests[request.meta_tuple()].append(request)
398                return "requested"
399            elif request.meta_tuple() in self.dirties:
400                self.dirties[request.meta_tuple()].append(request)
401                return "dirty"
402            elif request.commandStatus == protocol.Render and len(self.requests) < self.request_limit:
403                self.requests[request.meta_tuple()] = [request]
404                self.cond.notify()
405                return "requested"
406            elif len(self.dirties) < self.dirty_limit:
407                self.dirties[request.meta_tuple()] = [request]
408                self.cond.notify()
409                return "dirty"
410            else:
411                return "dropped"
412        finally:
413            self.cond.release()
414
415
416    def fetch(self):
417        self.cond.acquire()
418        try:
419            while len(self.requests) == 0 and len(self.dirties) == 0:
420                self.cond.wait()
421            # Pull request from one of the incoming queues
422            try:
423                item = self.requests.popitem()
424            except KeyError:
425                try:
426                    item = self.dirties.popitem()
427                except KeyError:
428                    print "Odd, queues empty"
429                    return
430
431            # Push request list on to the list of items being rendered
432            k = item[0]
433            v = item[1] # This is a list of all requests for this meta-tile
434            self.rendering[k] = v
435            # Return the first request from the list
436            return v[0]
437        finally:
438            self.cond.release()
439
440    def pop_requests(self, request):
441        self.cond.acquire()
442        try:
443            return self.rendering.pop(request.meta_tuple())
444        except KeyError:
445            # It is not yet clear why this happens, there should always be
446            # an entry in the rendering queue for each active meta -tile request
447            print "WARNING: Failed to locate request in rendering list!"
448            return (request,)
449        finally:
450            self.cond.release()
451
452
453class ThreadedUnixStreamHandler(SocketServer.BaseRequestHandler):
454
455    def rx_request(self, request):
456        if (request.commandStatus != protocol.Render) \
457           and (request.commandStatus != protocol.Dirty):
458               return
459
460        if request.bad_request():
461            if (request.commandStatus == protocol.Render):
462                request.send(protocol.NotDone)
463            return
464
465        cur_thread = threading.currentThread()
466        print "%s: xml(%s) z(%d) x(%d) y(%d)" % \
467            (cur_thread.getName(), request.xmlname, request.z, request.x, request.y)
468
469        status = self.server.queue_handler.add(request)
470        if status in ("rendering", "requested"):
471            # Request queued, response will be sent on completion
472            return
473
474        # The tile won't be rendered soon
475        if (request.commandStatus == protocol.Render):
476            request.send(protocol.NotDone)
477
478    def handle(self):
479        cur_thread = threading.currentThread()
480        print "%s: New connection" % cur_thread.getName()
481        req_v1 = ProtocolPacketV1()
482        req_v2 = ProtocolPacketV2()
483        max_len = max(req_v1.len(), req_v2.len())
484
485        while True:
486            try:
487                data = self.request.recv(max_len)
488            except socket.error, e:
489                if e[0] == errno.ECONNRESET:
490                    print "Connection reset by peer"
491                    break
492                else:
493                    raise
494
495            if len(data) == req_v1.len():
496                req_v1.receive(data, self.request)
497                self.rx_request(req_v1)
498            if len(data) == req_v2.len():
499                req_v2.receive(data, self.request)
500                self.rx_request(req_v2)
501            elif len(data) == 0:
502                print "%s: Connection closed" % cur_thread.getName()
503                break
504            else:
505                print "Invalid request length %d" % len(data)
506                break
507
508class ThreadedUnixStreamServer(SocketServer.ThreadingMixIn, SocketServer.UnixStreamServer):
509    def __init__(self, address, queue_handler, handler):
510        if(os.path.exists(address)):
511           os.unlink(address)
512        self.address = address
513        self.queue_handler = queue_handler
514        SocketServer.UnixStreamServer.__init__(self, address, handler)
515        self.daemon_threads = True
516
517def listener(address, queue_handler):
518    # Create the server
519    server = ThreadedUnixStreamServer(address, queue_handler, ThreadedUnixStreamHandler)
520    # The socket needs to be writeable by Apache
521    os.chmod(address, 0666)
522    # Loop forever servicing requests
523    server.serve_forever()
524
525def display_config(config):
526    for xmlname in config.sections():
527        print "Layer name: %s" % xmlname
528        #for opt in config.options(xmlname):
529        #   print "%s = %s" % (opt, config.get(xmlname, opt))
530        uri = config.get(xmlname, "uri")
531        xml = config.get(xmlname, "xml")
532        print "    URI(%s) = XML(%s)" % (uri, xml)
533
534def read_styles(config):
535    styles = {}
536    for xmlname in config.sections():
537        styles[xmlname] = config.get(xmlname, "xml")
538    return styles
539
540if __name__ == "__main__":
541    try:
542        cfg_file = os.environ['RENDERD_CFG']
543    except KeyError:
544        cfg_file = "/etc/renderd.conf"
545
546    # FIXME: Move more of these to config file?
547    RENDER_SOCKET = "/tmp/osm-renderd"
548    HASH_PATH = "/var/lib/mod_tile"
549    NUM_THREADS = 4
550
551    config = ConfigParser.ConfigParser()
552    config.read(cfg_file)
553    display_config(config)
554    styles = read_styles(config)
555
556    queue_handler = RequestQueues()
557    start_renderers(NUM_THREADS, HASH_PATH, styles, queue_handler)
558    listener(RENDER_SOCKET, queue_handler)
Note: See TracBrowser for help on using the repository browser.