source: subversion/applications/utils/mod_tile/renderd.py @ 13273

Last change on this file since 13273 was 13273, checked in by jonb, 11 years ago

renderd.py: Obtain map projection string automatically from <Map>. Add some more comments. Minor code tidy up

  • Property svn:executable set to *
File size: 18.6 KB
Line 
1#!/usr/bin/python
2#
3# mod_tile rendering daemon example written in Python.
4# The code is mostly a direct port of the C implementation.
5#
6# This is currently experimental and not intended as a replacement
7# of the C implementation, but works surpringly well. It should be
8# easier to produce custom variations of the rendering pipeline,
9# e.g. such as compositing tiles from multiple layers.
10#
11# It needs more work to make it more Pythonic, split it
12# into more appropriate classes, add documentation, fix bugs etc.
13#
14# I'm not yet convinced this is the best approach to integrating
15# Python with the render daemon. Two other options I'm considering:
16#
17# - Use the C renderd code with python binding allowing the replacement
18# of just the core tile rendering code (this is the bit that people
19# may want to tweak)
20#
21# - Split the functionality into a seperate queue handler daemon and
22# render daemon. This would remove a lot of the complexity around the
23# request handling which most people probably won't want to touch.
24# The queue handler might stay in C with a smaller python rendering daemon
25
26import sys, os
27import SocketServer
28import struct
29import thread
30import threading
31import socket
32import ConfigParser
33import mapnik
34import time
35import errno
36from math import pi,cos,sin,log,exp,atan
37
38import cairo
39import cStringIO
40
41MAX_ZOOM = 18
42METATILE = 8
43META_MAGIC = "META"
44
45class protocol:
46    # ENUM values for commandStatus field in protocol packet
47    Ignore = 0
48    Render = 1
49    Dirty = 2
50    Done = 3
51    NotDone = 4
52
53class ProtocolPacket:
54    def __init__(self, version, fields = ""):
55        self.version = version
56        self.xmlname = ""
57        self.x = 0
58        self.y = 0
59        self.z = 0
60        self.mx = 0
61        self.my = 0
62        self.commandStatus = protocol.Ignore
63        self.fields = fields
64
65    def len(self):
66        return struct.calcsize(self.fields)
67
68    def bad_request(self):
69        # Check that the requested (x,y,z) is invalid
70        x = self.x
71        y = self.y
72        z = self.z
73
74        if (z < 0) or (z > MAX_ZOOM):
75            return True
76        limit = (1 << z) -1
77        if (x < 0) or (x > limit):
78            return True
79        if (y < 0) or (y > limit):
80            return True
81        return False
82
83    def meta_tuple(self):
84        # This metatile tuple is used to identify duplicate request in the rendering queue
85        return (self.xmlname, self.mx, self.my, self.z)
86
87class ProtocolPacketV1(ProtocolPacket):
88    def __init__(self):
89        ProtocolPacket(1)
90        self.fields = "5i"
91
92    def receive(self, data, dest):
93        version, request, x, y, z = struct.unpack(self.fields, data)
94
95        if version != 1:
96            print "Received V1 packet with incorect version %d" % version
97        else:
98            #print "Got V1 request, command(%d), x(%d), y(%d), z(%d)" \
99            #    % (request, x, y, z)
100            self.commandStatus = request
101            self.x = x
102            self.y = y
103            self.z = z
104            self.xmlname = "Default"
105            # Calculate Meta-tile value for this x/y
106            self.mx = x & ~(METATILE-1)
107            self.my = y & ~(METATILE-1)
108            self.dest = dest
109
110
111    def send(self, status):
112        x = self.x
113        y = self.y
114        z = self.z
115        data = struct.pack(self.fields, (1, status, x, y, z))
116        try: 
117            self.dest.send(data)
118        except socket.error, e:
119               if e[0] != errno.EBADF:
120                   raise
121
122
123class ProtocolPacketV2(ProtocolPacket):
124    def __init__(self):
125        ProtocolPacket(2)
126        self.fields = "5i41sxxx"
127
128    def receive(self, data, dest):
129        version, request, x, y, z, xmlname = struct.unpack(self.fields, data)
130
131        if version != 2:
132            print "Received V2 packet with incorect version %d" % version
133        else:
134            #print "Got V2 request, command(%d), xmlname(%s), x(%d), y(%d), z(%d)" \
135            #    % (request, xmlname, x, y, z)
136            self.commandStatus = request
137            self.x = x
138            self.y = y
139            self.z = z
140            self.xmlname = xmlname.rstrip('\000') # Remove trailing NULs
141            # Calculate Meta-tile value for this x/y
142            self.mx = x & ~(METATILE-1)
143            self.my = y & ~(METATILE-1)
144            self.dest = dest
145
146    def send(self, status):
147        x = self.x
148        y = self.y
149        z = self.z
150        xmlname = self.xmlname
151        data = struct.pack(self.fields, 2, status, x, y, z, xmlname)
152        try:
153            self.dest.send(data)
154        except socket.error, e:
155               if e[0] != errno.EBADF:
156                   raise
157
158DEG_TO_RAD = pi/180
159RAD_TO_DEG = 180/pi
160
161
162class SphericalProjection:
163    def __init__(self,levels=18):
164        self.Bc = []
165        self.Cc = []
166        self.zc = []
167        self.Ac = []
168        c = 256
169        for d in range(0,levels+1):
170            e = c/2;
171            self.Bc.append(c/360.0)
172            self.Cc.append(c/(2 * pi))
173            self.zc.append((e,e))
174            self.Ac.append(c)
175            c *= 2
176
177    def minmax(self, a,b,c):
178        a = max(a,b)
179        a = min(a,c)
180        return a
181
182    def fromLLtoPixel(self,ll,zoom):
183         d = self.zc[zoom]
184         e = round(d[0] + ll[0] * self.Bc[zoom])
185         f = self.minmax(sin(DEG_TO_RAD * ll[1]),-0.9999,0.9999)
186         g = round(d[1] + 0.5*log((1+f)/(1-f))*-self.Cc[zoom])
187         return (e,g)
188
189    def fromPixelToLL(self,px,zoom):
190         e = self.zc[zoom]
191         f = (px[0] - e[0])/self.Bc[zoom]
192         g = (px[1] - e[1])/-self.Cc[zoom]
193         h = RAD_TO_DEG * ( 2 * atan(exp(g)) - 0.5 * pi)
194         return (f,h)
195
196
197class RenderThread:
198    def __init__(self, tile_path, styles, queue_handler):
199        self.tile_path = tile_path
200        self.queue_handler = queue_handler
201        self.maps = {}
202        self.prj = {}
203        for xmlname in styles:
204            #print "Creating Mapnik map object for %s with %s" % (xmlname, styles[xmlname])
205            m = mapnik.Map(256, 256)
206            self.maps[xmlname] = m
207            # Load XML style
208            mapnik.load_map(m, styles[xmlname], True)
209            # Obtain <Map> projection
210            self.prj[xmlname] = mapnik.Projection(m.srs)
211
212        # Projects between tile pixel co-ordinates and LatLong (EPSG:4326)
213        self.tileproj = SphericalProjection(MAX_ZOOM)
214
215    def render_with_agg(self, m, size):
216        # Render image with default Agg renderer
217        im = mapnik.Image(size, size)
218        mapnik.render(m, im)
219        return im
220
221    def render_with_cairo(self, m, size):
222        surface = cairo.ImageSurface(cairo.FORMAT_ARGB32, size, size)
223        mapnik.render(m, surface)
224        return mapnik.Image.from_cairo(surface)
225
226    def split_meta_image(self, im, sz, format = 'png256'):
227        # Split image up into NxN grid of tile images
228        tiles = {}
229        for yy in range(0,sz):
230            for xx in range(0,sz):
231                view = im.view(xx * 256 , yy * 256, 256, 256)
232                tile = view.tostring(format)
233                tiles[(xx, yy)] = tile
234
235        return tiles
236
237
238    def render_meta(self, m, style, x, y, z, sz):
239        # Calculate pixel positions of bottom-left & top-right
240        p0 = (x * 256, (y + sz) * 256)
241        p1 = ((x + sz) * 256, y * 256)
242
243        # Convert to LatLong (EPSG:4326)
244        l0 = self.tileproj.fromPixelToLL(p0, z);
245        l1 = self.tileproj.fromPixelToLL(p1, z);
246
247        # Convert to map projection (e.g. mercator co-ords EPSG:900913)
248        c0 = self.prj[style].forward(mapnik.Coord(l0[0],l0[1]))
249        c1 = self.prj[style].forward(mapnik.Coord(l1[0],l1[1]))
250
251        # Bounding box for the meta-tile
252        bbox = mapnik.Envelope(c0.x,c0.y, c1.x,c1.y)
253        render_size = 256 * sz
254        m.resize(render_size, render_size)
255        m.zoom_to_box(bbox)
256        m.buffer_size = 128
257
258        im = self.render_with_agg(m, render_size)
259        #im = self.render_with_cairo(m, render_size)
260        return self.split_meta_image(im, sz)
261
262    def render_request(self, t):
263        (xmlname, x, y, z) = t
264        # Calculate the meta tile size to use for this zoom level
265        size = min(METATILE, 1 << z)
266        try:
267            m = self.maps[xmlname]
268        except KeyError:
269            print "No map for: '%s'" % xmlname
270            return False
271        tiles = self.render_meta(m, xmlname, x, y, z, size)
272        self.meta_save(xmlname, x, y, z, size, tiles)
273
274        print "Done xmlname(%s) z(%d) x(%d-%d) y(%d-%d)" % \
275            (xmlname, z, x, x+size-1, y, y+size-1)
276
277        return True;
278
279    def xyz_to_meta(self, xmlname, x,y, z):
280        mask = METATILE -1
281        x &= ~mask
282        y &= ~mask
283        hashes = {}
284
285        for i in range(0,5):
286            hashes[i] = ((x & 0x0f) << 4) | (y & 0x0f)
287            x >>= 4
288            y >>= 4
289
290        meta = "%s/%s/%d/%u/%u/%u/%u/%u.meta" % (self.tile_path, xmlname, z, hashes[4], hashes[3], hashes[2], hashes[1], hashes[0])
291        return meta
292
293    def xyz_to_meta_offset(self, xmlname, x,y, z):
294        mask = METATILE -1
295        offset = (x & mask) * METATILE + (y & mask)
296        return offset
297
298
299    def meta_save(self, xmlname, x, y, z, size, tiles):
300        #print "Saving %d tiles" % (size * size)
301        meta_path = self.xyz_to_meta(xmlname, x, y, z)
302        d = os.path.dirname(meta_path)
303        if not os.path.exists(d):
304            try:
305                os.makedirs(d)
306            except OSError:
307                # Multiple threads can race when creating directories,
308                # ignore exception if the directory now exists
309                if not os.path.exists(d):
310                    raise
311
312        tmp = "%s.tmp.%d" % (meta_path, thread.get_ident())
313        f = open(tmp, "w")
314
315        f.write(struct.pack("4s4i", META_MAGIC, METATILE * METATILE, x, y, z))
316        offset = len(META_MAGIC) + 4 * 4
317        # Need to pre-compensate the offsets for the size of the offset/size table we are about to write
318        offset += (2 * 4) * (METATILE * METATILE)
319        # Collect all the tile sizes
320        sizes = {}
321        offsets = {}
322        for xx in range(0, size):
323            for yy in range(0, size):
324                mt = self.xyz_to_meta_offset(xmlname, x+xx, y+yy, z)
325                sizes[mt] = len(tiles[(xx, yy)])
326                offsets[mt] = offset
327                offset += sizes[mt]
328        # Write out the offset/size table
329        for mt in range(0, METATILE * METATILE):
330            if mt in sizes:
331                f.write(struct.pack("2i", offsets[mt], sizes[mt]))
332            else:
333                f.write(struct.pack("2i", 0, 0))
334        # Write out the tiles
335        for xx in range(0, size):
336            for yy in range(0, size):
337                f.write(tiles[(xx, yy)])
338
339        f.close()
340        os.rename(tmp, meta_path)
341        #print "Wrote: %s" % meta_path
342
343    def loop(self):
344        while True:
345            #Fetch a meta-tile to render
346            r = self.queue_handler.fetch()
347            rendered = self.render_request(r)
348            # Retrieve all requests for this meta-tile
349            requests = self.queue_handler.pop_requests(r)
350            for request in requests:
351                if (request.commandStatus == protocol.Render):
352                    if rendered == True:
353                        request.send(protocol.Done)
354                    else:
355                        request.send(protocol.NotDone)
356
357
358def start_renderers(num_threads, tile_path, styles, queue_handler):
359    for i in range(num_threads):
360        renderer = RenderThread(tile_path, styles, queue_handler)
361        render_thread = threading.Thread(target=renderer.loop)
362        render_thread.setDaemon(True)
363        render_thread.start()
364        print "Started render thread %s" % render_thread.getName()
365
366class RequestQueues:
367    def __init__(self, request_limit = 32, dirty_limit = 1000):
368        # We store requests in several lists
369        # - Incoming render requests are initally put into the request queue
370        # If the request queue is full then the new request is demoted to the dirty queue
371        # - Incoming 'dirty' requests are put into the dirty queue, or dropped if this is full
372        # - The render queue holds the requests which are in progress by the render threads
373        self.requests = {}
374        self.dirties = {}
375        self.rendering = {}
376
377        self.request_limit = request_limit
378        self.dirty_limit = dirty_limit
379        self.not_empty = threading.Condition()
380
381
382    def add(self, request):
383        self.not_empty.acquire()
384        try:
385            # Before adding this new request we first look if this tile is already pending
386            # If so, the new request is tacked on to the existing one
387            # FIXME: Add short-circuit for overload condition?
388            t = request.meta_tuple()
389            if t in self.rendering:
390                self.rendering[t].append(request)
391                return "rendering"
392            if t in self.requests:
393                self.requests[t].append(request)
394                return "requested"
395            if t in self.dirties:
396                self.dirties[t].append(request)
397                return "dirty"
398            # If we've reached here then there are no existing requests for this tile
399            if (request.commandStatus == protocol.Render) and (len(self.requests) < self.request_limit):
400                self.requests[t] = [request]
401                self.not_empty.notify()
402                return "requested"
403            if len(self.dirties) < self.dirty_limit:
404                self.dirties[t] = [request]
405                self.not_empty.notify()
406                return "dirty"
407            return "dropped"
408        finally:
409            self.not_empty.release()
410
411
412    def fetch(self):
413        # Fetches a request tuple from the request or dirty queue
414        # The requests are moved to the rendering queue while they are being rendered
415        self.not_empty.acquire()
416        try:
417            while (len(self.requests) == 0) and (len(self.dirties) == 0):
418                self.not_empty.wait()
419            # Pull request from one of the incoming queues
420            try:
421                item = self.requests.popitem()
422            except KeyError:
423                try:
424                    item = self.dirties.popitem()
425                except KeyError:
426                    print "Odd, queues empty"
427                    return
428
429            t = item[0]
430            self.rendering[t] = item[1]
431            return t
432        finally:
433            self.not_empty.release()
434
435    def pop_requests(self, t):
436        # Removes this tuple from the rendering queue
437        # and returns the list of request for the tuple
438        self.not_empty.acquire()
439        try:
440            return self.rendering.pop(t)
441        except KeyError:
442            # Should never happen. It implies the requests queues are broken
443            print "WARNING: Failed to locate request in rendering list!"
444        finally:
445            self.not_empty.release()
446
447
448class ThreadedUnixStreamHandler(SocketServer.BaseRequestHandler):
449
450    def rx_request(self, request):
451        if (request.commandStatus != protocol.Render) \
452           and (request.commandStatus != protocol.Dirty):
453               return
454
455        if request.bad_request():
456            if (request.commandStatus == protocol.Render):
457                request.send(protocol.NotDone)
458            return
459
460        #cur_thread = threading.currentThread()
461        #print "%s: xml(%s) z(%d) x(%d) y(%d)" % \
462        #    (cur_thread.getName(), request.xmlname, request.z, request.x, request.y)
463
464        status = self.server.queue_handler.add(request)
465        if status in ("rendering", "requested"):
466            # Request queued, response will be sent on completion
467            return
468
469        # The tile won't be rendered soon, tell the requestor straight away
470        if (request.commandStatus == protocol.Render):
471            request.send(protocol.NotDone)
472
473    def handle(self):
474        cur_thread = threading.currentThread()
475        #print "%s: New connection" % cur_thread.getName()
476        len_v1 = ProtocolPacketV1().len()
477        len_v2 = ProtocolPacketV2().len()
478        max_len = max(len_v1, len_v2)
479
480        while True:
481            try:
482                data = self.request.recv(max_len)
483            except socket.error, e:
484                if e[0] == errno.ECONNRESET:
485                    #print "Connection reset by peer"
486                    break
487                else:
488                    raise
489
490            if len(data) == len_v1:
491                req_v1 = ProtocolPacketV1()
492                req_v1.receive(data, self.request)
493                self.rx_request(req_v1)
494            if len(data) == len_v2:
495                req_v2 = ProtocolPacketV2()
496                req_v2.receive(data, self.request)
497                self.rx_request(req_v2)
498            elif len(data) == 0:
499                #print "%s: Connection closed" % cur_thread.getName()
500                break
501            else:
502                print "Invalid request length %d" % len(data)
503                break
504
505class ThreadedUnixStreamServer(SocketServer.ThreadingMixIn, SocketServer.UnixStreamServer):
506    def __init__(self, address, queue_handler, handler):
507        if(os.path.exists(address)):
508           os.unlink(address)
509        self.address = address
510        self.queue_handler = queue_handler
511        SocketServer.UnixStreamServer.__init__(self, address, handler)
512        self.daemon_threads = True
513
514def listener(address, queue_handler):
515    # Create the server
516    server = ThreadedUnixStreamServer(address, queue_handler, ThreadedUnixStreamHandler)
517    # The socket needs to be writeable by Apache
518    os.chmod(address, 0666)
519    # Loop forever servicing requests
520    server.serve_forever()
521
522def display_config(config):
523    for xmlname in config.sections():
524        print "Layer name: %s" % xmlname
525        #for opt in config.options(xmlname):
526        #   print "%s = %s" % (opt, config.get(xmlname, opt))
527        uri = config.get(xmlname, "uri")
528        xml = config.get(xmlname, "xml")
529        print "    URI(%s) = XML(%s)" % (uri, xml)
530
531def read_styles(config):
532    styles = {}
533    for xmlname in config.sections():
534        styles[xmlname] = config.get(xmlname, "xml")
535    return styles
536
537if __name__ == "__main__":
538    try:
539        cfg_file = os.environ['RENDERD_CFG']
540    except KeyError:
541        cfg_file = "/etc/renderd.conf"
542
543    # FIXME: Move more of these to config file?
544    RENDER_SOCKET = "/tmp/osm-renderd"
545    HASH_PATH = "/var/lib/mod_tile"
546    NUM_THREADS = 4
547
548    # Unifont has a better character coverage and is used as a fallback for DejaVu
549    # if you use a style based on the osm-template-fonset.xml
550    mapnik.FontEngine.instance().register_font("/home/jburgess/osm/fonts/unifont-5.1.20080706.ttf")
551
552    config = ConfigParser.ConfigParser()
553    config.read(cfg_file)
554    display_config(config)
555    styles = read_styles(config)
556
557    queue_handler = RequestQueues()
558    start_renderers(NUM_THREADS, HASH_PATH, styles, queue_handler)
559    listener(RENDER_SOCKET, queue_handler)
Note: See TracBrowser for help on using the repository browser.