source: subversion/applications/utils/mod_tile/renderd.py @ 29196

Last change on this file since 29196 was 15889, checked in by jonb, 10 years ago

Update renderd.py to obtain more config parameters from renderd.conf

  • Property svn:executable set to *
File size: 18.8 KB
Line 
1#!/usr/bin/python
2#
3# mod_tile rendering daemon example written in Python.
4# The code is mostly a direct port of the C implementation.
5#
6# This is currently experimental and not intended as a replacement
7# of the C implementation, but works surpringly well. It should be
8# easier to produce custom variations of the rendering pipeline,
9# e.g. such as compositing tiles from multiple layers.
10#
11# It needs more work to make it more Pythonic, split it
12# into more appropriate classes, add documentation, fix bugs etc.
13#
14# I'm not yet convinced this is the best approach to integrating
15# Python with the render daemon. Two other options I'm considering:
16#
17# - Use the C renderd code with python binding allowing the replacement
18# of just the core tile rendering code (this is the bit that people
19# may want to tweak)
20#
21# - Split the functionality into a seperate queue handler daemon and
22# render daemon. This would remove a lot of the complexity around the
23# request handling which most people probably won't want to touch.
24# The queue handler might stay in C with a smaller python rendering daemon
25
26import sys, os
27import SocketServer
28import struct
29import thread
30import threading
31import socket
32import ConfigParser
33import mapnik
34import time
35import errno
36from math import pi,cos,sin,log,exp,atan
37from StringIO import StringIO
38
39import cairo
40import cStringIO
41
42MAX_ZOOM = 18
43METATILE = 8
44META_MAGIC = "META"
45
46class protocol:
47    # ENUM values for commandStatus field in protocol packet
48    Ignore = 0
49    Render = 1
50    Dirty = 2
51    Done = 3
52    NotDone = 4
53
54class ProtocolPacket:
55    def __init__(self, version, fields = ""):
56        self.version = version
57        self.xmlname = ""
58        self.x = 0
59        self.y = 0
60        self.z = 0
61        self.mx = 0
62        self.my = 0
63        self.commandStatus = protocol.Ignore
64        self.fields = fields
65
66    def len(self):
67        return struct.calcsize(self.fields)
68
69    def bad_request(self):
70        # Check that the requested (x,y,z) is invalid
71        x = self.x
72        y = self.y
73        z = self.z
74
75        if (z < 0) or (z > MAX_ZOOM):
76            return True
77        limit = (1 << z) -1
78        if (x < 0) or (x > limit):
79            return True
80        if (y < 0) or (y > limit):
81            return True
82        return False
83
84    def meta_tuple(self):
85        # This metatile tuple is used to identify duplicate request in the rendering queue
86        return (self.xmlname, self.mx, self.my, self.z)
87
88class ProtocolPacketV1(ProtocolPacket):
89    def __init__(self):
90        ProtocolPacket(1)
91        self.fields = "5i"
92
93    def receive(self, data, dest):
94        version, request, x, y, z = struct.unpack(self.fields, data)
95
96        if version != 1:
97            print "Received V1 packet with incorect version %d" % version
98        else:
99            #print "Got V1 request, command(%d), x(%d), y(%d), z(%d)" \
100            #    % (request, x, y, z)
101            self.commandStatus = request
102            self.x = x
103            self.y = y
104            self.z = z
105            self.xmlname = "default"
106            # Calculate Meta-tile value for this x/y
107            self.mx = x & ~(METATILE-1)
108            self.my = y & ~(METATILE-1)
109            self.dest = dest
110
111
112    def send(self, status):
113        x = self.x
114        y = self.y
115        z = self.z
116        data = struct.pack(self.fields, (1, status, x, y, z))
117        try: 
118            self.dest.send(data)
119        except socket.error, e:
120               if e[0] != errno.EBADF:
121                   raise
122
123
124class ProtocolPacketV2(ProtocolPacket):
125    def __init__(self):
126        ProtocolPacket(2)
127        self.fields = "5i41sxxx"
128
129    def receive(self, data, dest):
130        version, request, x, y, z, xmlname = struct.unpack(self.fields, data)
131
132        if version != 2:
133            print "Received V2 packet with incorect version %d" % version
134        else:
135            #print "Got V2 request, command(%d), xmlname(%s), x(%d), y(%d), z(%d)" \
136            #    % (request, xmlname, x, y, z)
137            self.commandStatus = request
138            self.x = x
139            self.y = y
140            self.z = z
141            self.xmlname = xmlname.rstrip('\000') # Remove trailing NULs
142            # Calculate Meta-tile value for this x/y
143            self.mx = x & ~(METATILE-1)
144            self.my = y & ~(METATILE-1)
145            self.dest = dest
146
147    def send(self, status):
148        x = self.x
149        y = self.y
150        z = self.z
151        xmlname = self.xmlname
152        data = struct.pack(self.fields, 2, status, x, y, z, xmlname)
153        try:
154            self.dest.send(data)
155        except socket.error, e:
156               if e[0] != errno.EBADF:
157                   raise
158
159DEG_TO_RAD = pi/180
160RAD_TO_DEG = 180/pi
161
162
163class SphericalProjection:
164    def __init__(self,levels=18):
165        self.Bc = []
166        self.Cc = []
167        self.zc = []
168        self.Ac = []
169        c = 256
170        for d in range(0,levels+1):
171            e = c/2;
172            self.Bc.append(c/360.0)
173            self.Cc.append(c/(2 * pi))
174            self.zc.append((e,e))
175            self.Ac.append(c)
176            c *= 2
177
178    def minmax(self, a,b,c):
179        a = max(a,b)
180        a = min(a,c)
181        return a
182
183    def fromLLtoPixel(self,ll,zoom):
184         d = self.zc[zoom]
185         e = round(d[0] + ll[0] * self.Bc[zoom])
186         f = self.minmax(sin(DEG_TO_RAD * ll[1]),-0.9999,0.9999)
187         g = round(d[1] + 0.5*log((1+f)/(1-f))*-self.Cc[zoom])
188         return (e,g)
189
190    def fromPixelToLL(self,px,zoom):
191         e = self.zc[zoom]
192         f = (px[0] - e[0])/self.Bc[zoom]
193         g = (px[1] - e[1])/-self.Cc[zoom]
194         h = RAD_TO_DEG * ( 2 * atan(exp(g)) - 0.5 * pi)
195         return (f,h)
196
197
198class RenderThread:
199    def __init__(self, tile_path, styles, queue_handler):
200        self.tile_path = tile_path
201        self.queue_handler = queue_handler
202        self.maps = {}
203        self.prj = {}
204        for xmlname in styles:
205            #print "Creating Mapnik map object for %s with %s" % (xmlname, styles[xmlname])
206            m = mapnik.Map(256, 256)
207            self.maps[xmlname] = m
208            # Load XML style
209            mapnik.load_map(m, styles[xmlname], True)
210            # Obtain <Map> projection
211            self.prj[xmlname] = mapnik.Projection(m.srs)
212
213        # Projects between tile pixel co-ordinates and LatLong (EPSG:4326)
214        self.tileproj = SphericalProjection(MAX_ZOOM)
215
216    def render_with_agg(self, m, size):
217        # Render image with default Agg renderer
218        im = mapnik.Image(size, size)
219        mapnik.render(m, im)
220        return im
221
222    def render_with_cairo(self, m, size):
223        surface = cairo.ImageSurface(cairo.FORMAT_ARGB32, size, size)
224        mapnik.render(m, surface)
225        return mapnik.Image.from_cairo(surface)
226
227    def split_meta_image(self, im, sz, format = 'png256'):
228        # Split image up into NxN grid of tile images
229        tiles = {}
230        for yy in range(0,sz):
231            for xx in range(0,sz):
232                view = im.view(xx * 256 , yy * 256, 256, 256)
233                tile = view.tostring(format)
234                tiles[(xx, yy)] = tile
235
236        return tiles
237
238
239    def render_meta(self, m, style, x, y, z, sz):
240        # Calculate pixel positions of bottom-left & top-right
241        p0 = (x * 256, (y + sz) * 256)
242        p1 = ((x + sz) * 256, y * 256)
243
244        # Convert to LatLong (EPSG:4326)
245        l0 = self.tileproj.fromPixelToLL(p0, z);
246        l1 = self.tileproj.fromPixelToLL(p1, z);
247
248        # Convert to map projection (e.g. mercator co-ords EPSG:900913)
249        c0 = self.prj[style].forward(mapnik.Coord(l0[0],l0[1]))
250        c1 = self.prj[style].forward(mapnik.Coord(l1[0],l1[1]))
251
252        # Bounding box for the meta-tile
253        bbox = mapnik.Envelope(c0.x,c0.y, c1.x,c1.y)
254        render_size = 256 * sz
255        m.resize(render_size, render_size)
256        m.zoom_to_box(bbox)
257        m.buffer_size = 128
258
259        im = self.render_with_agg(m, render_size)
260        #im = self.render_with_cairo(m, render_size)
261        return self.split_meta_image(im, sz)
262
263    def render_request(self, t):
264        (xmlname, x, y, z) = t
265        # Calculate the meta tile size to use for this zoom level
266        size = min(METATILE, 1 << z)
267        try:
268            m = self.maps[xmlname]
269        except KeyError:
270            print "No map for: '%s'" % xmlname
271            return False
272        tiles = self.render_meta(m, xmlname, x, y, z, size)
273        self.meta_save(xmlname, x, y, z, size, tiles)
274
275        print "Done xmlname(%s) z(%d) x(%d-%d) y(%d-%d)" % \
276            (xmlname, z, x, x+size-1, y, y+size-1)
277
278        return True;
279
280    def xyz_to_meta(self, xmlname, x,y, z):
281        mask = METATILE -1
282        x &= ~mask
283        y &= ~mask
284        hashes = {}
285
286        for i in range(0,5):
287            hashes[i] = ((x & 0x0f) << 4) | (y & 0x0f)
288            x >>= 4
289            y >>= 4
290
291        meta = "%s/%s/%d/%u/%u/%u/%u/%u.meta" % (self.tile_path, xmlname, z, hashes[4], hashes[3], hashes[2], hashes[1], hashes[0])
292        return meta
293
294    def xyz_to_meta_offset(self, xmlname, x,y, z):
295        mask = METATILE -1
296        offset = (x & mask) * METATILE + (y & mask)
297        return offset
298
299
300    def meta_save(self, xmlname, x, y, z, size, tiles):
301        #print "Saving %d tiles" % (size * size)
302        meta_path = self.xyz_to_meta(xmlname, x, y, z)
303        d = os.path.dirname(meta_path)
304        if not os.path.exists(d):
305            try:
306                os.makedirs(d)
307            except OSError:
308                # Multiple threads can race when creating directories,
309                # ignore exception if the directory now exists
310                if not os.path.exists(d):
311                    raise
312
313        tmp = "%s.tmp.%d" % (meta_path, thread.get_ident())
314        f = open(tmp, "w")
315
316        f.write(struct.pack("4s4i", META_MAGIC, METATILE * METATILE, x, y, z))
317        offset = len(META_MAGIC) + 4 * 4
318        # Need to pre-compensate the offsets for the size of the offset/size table we are about to write
319        offset += (2 * 4) * (METATILE * METATILE)
320        # Collect all the tile sizes
321        sizes = {}
322        offsets = {}
323        for xx in range(0, size):
324            for yy in range(0, size):
325                mt = self.xyz_to_meta_offset(xmlname, x+xx, y+yy, z)
326                sizes[mt] = len(tiles[(xx, yy)])
327                offsets[mt] = offset
328                offset += sizes[mt]
329        # Write out the offset/size table
330        for mt in range(0, METATILE * METATILE):
331            if mt in sizes:
332                f.write(struct.pack("2i", offsets[mt], sizes[mt]))
333            else:
334                f.write(struct.pack("2i", 0, 0))
335        # Write out the tiles
336        for xx in range(0, size):
337            for yy in range(0, size):
338                f.write(tiles[(xx, yy)])
339
340        f.close()
341        os.rename(tmp, meta_path)
342        #print "Wrote: %s" % meta_path
343
344    def loop(self):
345        while True:
346            #Fetch a meta-tile to render
347            r = self.queue_handler.fetch()
348            rendered = self.render_request(r)
349            # Retrieve all requests for this meta-tile
350            requests = self.queue_handler.pop_requests(r)
351            for request in requests:
352                if (request.commandStatus == protocol.Render):
353                    if rendered == True:
354                        request.send(protocol.Done)
355                    else:
356                        request.send(protocol.NotDone)
357
358
359def start_renderers(num_threads, tile_path, styles, queue_handler):
360    for i in range(num_threads):
361        renderer = RenderThread(tile_path, styles, queue_handler)
362        render_thread = threading.Thread(target=renderer.loop)
363        render_thread.setDaemon(True)
364        render_thread.start()
365        print "Started render thread %s" % render_thread.getName()
366
367class RequestQueues:
368    def __init__(self, request_limit = 32, dirty_limit = 1000):
369        # We store requests in several lists
370        # - Incoming render requests are initally put into the request queue
371        # If the request queue is full then the new request is demoted to the dirty queue
372        # - Incoming 'dirty' requests are put into the dirty queue, or dropped if this is full
373        # - The render queue holds the requests which are in progress by the render threads
374        self.requests = {}
375        self.dirties = {}
376        self.rendering = {}
377
378        self.request_limit = request_limit
379        self.dirty_limit = dirty_limit
380        self.not_empty = threading.Condition()
381
382
383    def add(self, request):
384        self.not_empty.acquire()
385        try:
386            # Before adding this new request we first look if this tile is already pending
387            # If so, the new request is tacked on to the existing one
388            # FIXME: Add short-circuit for overload condition?
389            t = request.meta_tuple()
390            if t in self.rendering:
391                self.rendering[t].append(request)
392                return "rendering"
393            if t in self.requests:
394                self.requests[t].append(request)
395                return "requested"
396            if t in self.dirties:
397                self.dirties[t].append(request)
398                return "dirty"
399            # If we've reached here then there are no existing requests for this tile
400            if (request.commandStatus == protocol.Render) and (len(self.requests) < self.request_limit):
401                self.requests[t] = [request]
402                self.not_empty.notify()
403                return "requested"
404            if len(self.dirties) < self.dirty_limit:
405                self.dirties[t] = [request]
406                self.not_empty.notify()
407                return "dirty"
408            return "dropped"
409        finally:
410            self.not_empty.release()
411
412
413    def fetch(self):
414        # Fetches a request tuple from the request or dirty queue
415        # The requests are moved to the rendering queue while they are being rendered
416        self.not_empty.acquire()
417        try:
418            while (len(self.requests) == 0) and (len(self.dirties) == 0):
419                self.not_empty.wait()
420            # Pull request from one of the incoming queues
421            try:
422                item = self.requests.popitem()
423            except KeyError:
424                try:
425                    item = self.dirties.popitem()
426                except KeyError:
427                    print "Odd, queues empty"
428                    return
429
430            t = item[0]
431            self.rendering[t] = item[1]
432            return t
433        finally:
434            self.not_empty.release()
435
436    def pop_requests(self, t):
437        # Removes this tuple from the rendering queue
438        # and returns the list of request for the tuple
439        self.not_empty.acquire()
440        try:
441            return self.rendering.pop(t)
442        except KeyError:
443            # Should never happen. It implies the requests queues are broken
444            print "WARNING: Failed to locate request in rendering list!"
445        finally:
446            self.not_empty.release()
447
448
449class ThreadedUnixStreamHandler(SocketServer.BaseRequestHandler):
450
451    def rx_request(self, request):
452        if (request.commandStatus != protocol.Render) \
453           and (request.commandStatus != protocol.Dirty):
454               return
455
456        if request.bad_request():
457            if (request.commandStatus == protocol.Render):
458                request.send(protocol.NotDone)
459            return
460
461        #cur_thread = threading.currentThread()
462        #print "%s: xml(%s) z(%d) x(%d) y(%d)" % \
463        #    (cur_thread.getName(), request.xmlname, request.z, request.x, request.y)
464
465        status = self.server.queue_handler.add(request)
466        if status in ("rendering", "requested"):
467            # Request queued, response will be sent on completion
468            return
469
470        # The tile won't be rendered soon, tell the requestor straight away
471        if (request.commandStatus == protocol.Render):
472            request.send(protocol.NotDone)
473
474    def handle(self):
475        cur_thread = threading.currentThread()
476        #print "%s: New connection" % cur_thread.getName()
477        len_v1 = ProtocolPacketV1().len()
478        len_v2 = ProtocolPacketV2().len()
479        max_len = max(len_v1, len_v2)
480
481        while True:
482            try:
483                data = self.request.recv(max_len)
484            except socket.error, e:
485                if e[0] == errno.ECONNRESET:
486                    #print "Connection reset by peer"
487                    break
488                else:
489                    raise
490
491            if len(data) == len_v1:
492                req_v1 = ProtocolPacketV1()
493                req_v1.receive(data, self.request)
494                self.rx_request(req_v1)
495            if len(data) == len_v2:
496                req_v2 = ProtocolPacketV2()
497                req_v2.receive(data, self.request)
498                self.rx_request(req_v2)
499            elif len(data) == 0:
500                #print "%s: Connection closed" % cur_thread.getName()
501                break
502            else:
503                print "Invalid request length %d" % len(data)
504                break
505
506class ThreadedUnixStreamServer(SocketServer.ThreadingMixIn, SocketServer.UnixStreamServer):
507    def __init__(self, address, queue_handler, handler):
508        if(os.path.exists(address)):
509           os.unlink(address)
510        self.address = address
511        self.queue_handler = queue_handler
512        SocketServer.UnixStreamServer.__init__(self, address, handler)
513        self.daemon_threads = True
514
515def listener(address, queue_handler):
516    # Create the server
517    server = ThreadedUnixStreamServer(address, queue_handler, ThreadedUnixStreamHandler)
518    # The socket needs to be writeable by Apache
519    os.chmod(address, 0666)
520    # Loop forever servicing requests
521    server.serve_forever()
522
523def display_config(config):
524    for xmlname in config.sections():
525        if xmlname != "renderd" and xmlname != "mapnik":
526            print "Layer name: %s" % xmlname
527            uri = config.get(xmlname, "uri")
528            xml = config.get(xmlname, "xml")
529            print "    URI(%s) = XML(%s)" % (uri, xml)
530
531def read_styles(config):
532    styles = {}
533    for xmlname in config.sections():
534        if xmlname != "renderd" and xmlname != "mapnik":
535            styles[xmlname] = config.get(xmlname, "xml")
536    return styles
537
538if __name__ == "__main__":
539    try:
540        cfg_file = os.environ['RENDERD_CFG']
541    except KeyError:
542        cfg_file = "/etc/renderd.conf"
543
544    # Unifont has a better character coverage and is used as a fallback for DejaVu
545    # if you use a style based on the osm-template-fonset.xml
546    mapnik.FontEngine.instance().register_font("/home/jburgess/osm/fonts/unifont-5.1.20080706.ttf")
547
548    default_cfg = StringIO("""
549[renderd]
550socketname=/tmp/osm-renderd
551num_threads=4
552tile_dir=/var/lib/mod_tile
553""")
554
555    config = ConfigParser.ConfigParser()
556    config.readfp(default_cfg)
557    config.read(cfg_file)
558    display_config(config)
559    styles = read_styles(config)
560
561    num_threads    = config.getint("renderd", "num_threads")
562    renderd_socket = config.get("renderd", "socketname")
563    tile_dir       = config.get("renderd", "tile_dir")
564
565    queue_handler = RequestQueues()
566    start_renderers(num_threads, tile_dir, styles, queue_handler)
567    listener(renderd_socket, queue_handler)
Note: See TracBrowser for help on using the repository browser.