source: subversion/applications/utils/mod_tile/renderd.py @ 13240

Last change on this file since 13240 was 13240, checked in by jonb, 11 years ago

renderd.py: Remove more unnecessary code

  • Property svn:executable set to *
File size: 18.2 KB
Line 
1#!/usr/bin/python
2#
3# mod_tile rendering daemon example written in Python.
4# The code is mostly a direct port of the C implementation.
5# It needs more work to make it more Pythonic, split it
6# into more appropriate classes, add documentation, fix bugs etc.
7#
8# This is currently experimental and not intended as a replacement
9# of the C implementation! It should allow more people to produce
10# custom variations of the rendering pipeline, e.g. such as compositing
11# tiles from multiple layers.
12#
13# The code functions but I'm not yet convinced this is the correct
14# approach to integrating Python with the render daemon. Two other
15# options I'm considering:
16#
17# - Use the C renderd code with python binding allowing the replacement
18# of just the core tile rendering code (this is the bit that people
19# may want to tweak)
20#
21# - Split the functionality into a seperate queue handler daemon and
22# render daemon. This would remove a lot of the complexity around the
23# request handling which most people probably won't want to touch.
24# The queue handler might stay in C with a smaller python rendering daemon
25
26import sys, os
27import SocketServer
28import struct
29import thread
30import threading
31import socket
32import ConfigParser
33import mapnik
34import time
35import errno
36from math import pi,cos,sin,log,exp,atan
37
38import cairo
39import cStringIO
40
41MAX_ZOOM = 18
42METATILE = 8
43META_MAGIC = "META"
44
45class protocol:
46    # ENUM values for commandStatus field in protocol packet
47    Ignore = 0
48    Render = 1
49    Dirty = 2
50    Done = 3
51    NotDone = 4
52
53class ProtocolPacket:
54    def __init__(self, version, fields = ""):
55        self.version = version
56        self.xmlname = ""
57        self.x = 0
58        self.y = 0
59        self.z = 0
60        self.mx = 0
61        self.my = 0
62        self.commandStatus = protocol.Ignore
63        self.fields = fields
64
65    def len(self):
66        return struct.calcsize(self.fields)
67
68    def bad_request(self):
69        x = self.x
70        y = self.y
71        z = self.z
72
73        if (z < 0) or (z > MAX_ZOOM):
74            return True
75        limit = (1 << z) -1
76        if (x < 0) or (x > limit):
77            return True
78        if (y < 0) or (y > limit):
79            return True
80        return False
81
82    def meta_tuple(self):
83        # This metatile tuple is used to identify duplicate request in the rendering queue
84        return (self.xmlname, self.mx, self.my, self.z)
85
86class ProtocolPacketV1(ProtocolPacket):
87    def __init__(self):
88        ProtocolPacket(1)
89        self.fields = "5i"
90
91    def receive(self, data, dest):
92        version, request, x, y, z = struct.unpack(self.fields, data)
93
94        if version != 1:
95            print "Received V1 packet with incorect version %d" % version
96        else:
97            #print "Got V1 request, command(%d), x(%d), y(%d), z(%d)" \
98            #    % (request, x, y, z)
99            self.commandStatus = request
100            self.x = x
101            self.y = y
102            self.z = z
103            self.xmlname = "Default"
104            # Calculate Meta-tile value for this x/y
105            self.mx = x & ~(METATILE-1)
106            self.my = y & ~(METATILE-1)
107            self.dest = dest
108
109
110    def send(self, status):
111        x = self.x
112        y = self.y
113        z = self.z
114        data = struct.pack(self.fields, (1, status, x, y, z))
115        try: 
116            self.dest.send(data)
117        except socket.error, e:
118               if e[0] != errno.EBADF:
119                   raise
120
121
122class ProtocolPacketV2(ProtocolPacket):
123    def __init__(self):
124        ProtocolPacket(2)
125        self.fields = "5i41sxxx"
126
127    def receive(self, data, dest):
128        version, request, x, y, z, xmlname = struct.unpack(self.fields, data)
129
130        if version != 2:
131            print "Received V2 packet with incorect version %d" % version
132        else:
133            #print "Got V2 request, command(%d), xmlname(%s), x(%d), y(%d), z(%d)" \
134            #    % (request, xmlname, x, y, z)
135            self.commandStatus = request
136            self.x = x
137            self.y = y
138            self.z = z
139            self.xmlname = xmlname.rstrip('\000') # Remove trailing NULs
140            # Calculate Meta-tile value for this x/y
141            self.mx = x & ~(METATILE-1)
142            self.my = y & ~(METATILE-1)
143            self.dest = dest
144
145    def send(self, status):
146        x = self.x
147        y = self.y
148        z = self.z
149        xmlname = self.xmlname
150        data = struct.pack(self.fields, 2, status, x, y, z, xmlname)
151        try:
152            self.dest.send(data)
153        except socket.error, e:
154               if e[0] != errno.EBADF:
155                   raise
156
157DEG_TO_RAD = pi/180
158RAD_TO_DEG = 180/pi
159
160
161class SphericalProjection:
162    def __init__(self,levels=18):
163        self.Bc = []
164        self.Cc = []
165        self.zc = []
166        self.Ac = []
167        c = 256
168        for d in range(0,levels+1):
169            e = c/2;
170            self.Bc.append(c/360.0)
171            self.Cc.append(c/(2 * pi))
172            self.zc.append((e,e))
173            self.Ac.append(c)
174            c *= 2
175
176    def minmax(self, a,b,c):
177        a = max(a,b)
178        a = min(a,c)
179        return a
180
181    def fromLLtoPixel(self,ll,zoom):
182         d = self.zc[zoom]
183         e = round(d[0] + ll[0] * self.Bc[zoom])
184         f = self.minmax(sin(DEG_TO_RAD * ll[1]),-0.9999,0.9999)
185         g = round(d[1] + 0.5*log((1+f)/(1-f))*-self.Cc[zoom])
186         return (e,g)
187
188    def fromPixelToLL(self,px,zoom):
189         e = self.zc[zoom]
190         f = (px[0] - e[0])/self.Bc[zoom]
191         g = (px[1] - e[1])/-self.Cc[zoom]
192         h = RAD_TO_DEG * ( 2 * atan(exp(g)) - 0.5 * pi)
193         return (f,h)
194
195
196class RenderThread:
197    def __init__(self, tile_path, styles, queue_handler):
198        self.tile_path = tile_path
199        self.queue_handler = queue_handler
200        self.maps = {}
201        for xmlname in styles:
202            #print "Creating Mapnik map object for %s with %s" % (xmlname, styles[xmlname])
203            m = mapnik.Map(256, 256)
204            self.maps[xmlname] = m
205            mapnik.load_map(m, styles[xmlname], True)
206
207        # Projects between tile pixel co-ordinates and LatLong (EPSG:4326)
208        self.gprj = SphericalProjection(MAX_ZOOM)
209        # This is the Spherical mercator projection (EPSG:900913)
210        self.prj = mapnik.Projection("+proj=merc +a=6378137 +b=6378137 +lat_ts=0.0 +lon_0=0.0 +x_0=0.0 +y_0=0 +k=1.0 +units=m +nadgrids=@null +no_defs +over")
211
212    def render_with_agg(self, m, size):
213        # Render image with default Agg renderer
214        im = mapnik.Image(size, size)
215        mapnik.render(m, im)
216        return im
217
218    def render_with_cairo(self, m, size):
219        surface = cairo.ImageSurface(cairo.FORMAT_ARGB32, size, size)
220        mapnik.render(m, surface)
221        return mapnik.Image.from_cairo(surface)
222
223    def split_meta_image(self, im, sz, format = 'png256'):
224        # Split image up into NxN grid of tile images
225        tiles = {}
226        for yy in range(0,sz):
227            for xx in range(0,sz):
228                view = im.view(xx * 256 , yy * 256, 256, 256)
229                tile = view.tostring(format)
230                tiles[(xx, yy)] = tile
231
232        return tiles
233
234
235    def render_meta(self, m, style, x, y, z, sz):
236        # Calculate pixel positions of bottom-left & top-right
237        p0 = (x * 256, (y + sz) * 256)
238        p1 = ((x + sz) * 256, y * 256)
239
240        # Convert to LatLong (EPSG:4326)
241        l0 = self.gprj.fromPixelToLL(p0, z);
242        l1 = self.gprj.fromPixelToLL(p1, z);
243
244        # Convert to mercator co-ords (EPSG:900913)
245        c0 = self.prj.forward(mapnik.Coord(l0[0],l0[1]))
246        c1 = self.prj.forward(mapnik.Coord(l1[0],l1[1]))
247
248        # Bounding box for the meta-tile
249        bbox = mapnik.Envelope(c0.x,c0.y, c1.x,c1.y)
250        render_size = 256 * sz
251        m.resize(render_size, render_size)
252        m.zoom_to_box(bbox)
253        m.buffer_size = 128
254
255        im = self.render_with_agg(m, render_size)
256        #im = self.render_with_cairo(m, render_size)
257        return self.split_meta_image(im, sz)
258
259    def render_request(self, r):
260        # Calculate the meta tile size to use for this zoom level
261        size = min(METATILE, 1 << r.z)
262        xmlname = r.xmlname
263        x = r.mx
264        y = r.my
265        z = r.z
266        try:
267            m = self.maps[xmlname]
268        except KeyError:
269            print "No map for: '%s'" % xmlname
270            return False
271        tiles = self.render_meta(m, xmlname, x, y, z, size)
272        self.meta_save(xmlname, x, y, z, size, tiles)
273
274        print "Done xmlname(%s) z(%d) x(%d-%d) y(%d-%d)" % \
275        (xmlname, r.z, x, x+size-1, y, y+size-1)
276
277        return True;
278
279    def xyz_to_meta(self, xmlname, x,y, z):
280        mask = METATILE -1
281        x &= ~mask
282        y &= ~mask
283        hashes = {}
284
285        for i in range(0,5):
286            hashes[i] = ((x & 0x0f) << 4) | (y & 0x0f)
287            x >>= 4
288            y >>= 4
289
290        meta = "%s/%s/%d/%u/%u/%u/%u/%u.meta" % (self.tile_path, xmlname, z, hashes[4], hashes[3], hashes[2], hashes[1], hashes[0])
291        return meta
292
293    def xyz_to_meta_offset(self, xmlname, x,y, z):
294        mask = METATILE -1
295        offset = (x & mask) * METATILE + (y & mask)
296        return offset
297
298
299    def meta_save(self, xmlname, x, y, z, size, tiles):
300        #print "Saving %d tiles" % (size * size)
301        meta_path = self.xyz_to_meta(xmlname, x, y, z)
302        d = os.path.dirname(meta_path)
303        if not os.path.exists(d):
304            try:
305                os.makedirs(d)
306            except OSError:
307                # Multiple threads can race when creating directories,
308                # ignore exception if the directory now exists
309                if not os.path.exists(d):
310                    raise
311
312        tmp = "%s.tmp.%d" % (meta_path, thread.get_ident())
313        f = open(tmp, "w")
314
315        f.write(struct.pack("4s4i", META_MAGIC, METATILE * METATILE, x, y, z))
316        offset = len(META_MAGIC) + 4 * 4
317        # Need to pre-compensate the offsets for the size of the offset/size table we are about to write
318        offset += (2 * 4) * (METATILE * METATILE)
319        # Collect all the tile sizes
320        sizes = {}
321        offsets = {}
322        for xx in range(0, size):
323            for yy in range(0, size):
324                mt = self.xyz_to_meta_offset(xmlname, x+xx, y+yy, z)
325                sizes[mt] = len(tiles[(xx, yy)])
326                offsets[mt] = offset
327                offset += sizes[mt]
328        # Write out the offset/size table
329        for mt in range(0, METATILE * METATILE):
330            if mt in sizes:
331                f.write(struct.pack("2i", offsets[mt], sizes[mt]))
332            else:
333                f.write(struct.pack("2i", 0, 0))
334        # Write out the tiles
335        for xx in range(0, size):
336            for yy in range(0, size):
337                f.write(tiles[(xx, yy)])
338
339        f.close()
340        os.rename(tmp, meta_path)
341        #print "Wrote: %s" % meta_path
342
343    def loop(self):
344        while True:
345            #Fetch a meta-tile to render
346            r = self.queue_handler.fetch()
347            rendered = self.render_request(r)
348            # Retrieve all requests for this meta-tile
349            requests = self.queue_handler.pop_requests(r)
350            for request in requests:
351                if (request.commandStatus == protocol.Render):
352                    if rendered == True:
353                        request.send(protocol.Done)
354                    else:
355                        request.send(protocol.NotDone)
356
357
358def start_renderers(num_threads, tile_path, styles, queue_handler):
359    for i in range(num_threads):
360        renderer = RenderThread(tile_path, styles, queue_handler)
361        render_thread = threading.Thread(target=renderer.loop)
362        render_thread.setDaemon(True)
363        render_thread.start()
364        print "Started render thread %s" % render_thread.getName()
365
366class RequestQueues:
367    def __init__(self, request_limit = 32, dirty_limit = 1000):
368        # Queues are used as follows:
369        # - Incoming render requests are initally put into the request queue
370        # If the request queue is full then the new request is demoted dirty queue
371        # - Incoming 'dirty' requests are put into the dirty queue or overflow from render queue
372        # - The render queue holds the requests which are in progress by the render threads
373        self.requests = {}
374        self.dirties = {}
375        self.rendering = {}
376
377        self.request_limit = request_limit
378        self.dirty_limit = dirty_limit
379        self.mutex = threading.Lock()
380        self.not_empty = threading.Condition(self.mutex)
381
382
383    def add(self, request):
384        self.mutex.acquire()
385        try:
386            # FIXME: Add short-circuit for overload condition?
387            t = request.meta_tuple()
388            if t in self.rendering:
389                self.rendering[t].append(request)
390                return "rendering"
391            elif t in self.requests:
392                self.requests[t].append(request)
393                return "requested"
394            elif t in self.dirties:
395                self.dirties[t].append(request)
396                return "dirty"
397            elif (request.commandStatus == protocol.Render) and (len(self.requests) < self.request_limit):
398                self.requests[t] = [request]
399                self.not_empty.notify()
400                return "requested"
401            elif len(self.dirties) < self.dirty_limit:
402                self.dirties[t] = [request]
403                self.not_empty.notify()
404                return "dirty"
405            else:
406                return "dropped"
407        finally:
408            self.mutex.release()
409
410
411    def fetch(self):
412        self.not_empty.acquire()
413        try:
414            while (len(self.requests) == 0) and (len(self.dirties) == 0):
415                self.not_empty.wait()
416            # Pull request from one of the incoming queues
417            try:
418                item = self.requests.popitem()
419            except KeyError:
420                try:
421                    item = self.dirties.popitem()
422                except KeyError:
423                    print "Odd, queues empty"
424                    return
425
426            # Push request list on to the list of items being rendered
427            k = item[0]
428            v = item[1] # This is a list of all requests for this meta-tile
429            self.rendering[k] = v
430            # Return the first request from the list
431            return v[0]
432        finally:
433            self.not_empty.release()
434
435    def pop_requests(self, request):
436        self.mutex.acquire()
437        try:
438            return self.rendering.pop(request.meta_tuple())
439        except KeyError:
440            # It is not yet clear why this happens, there should always be
441            # an entry in the rendering queue for each active meta -tile request
442            print "WARNING: Failed to locate request in rendering list!"
443            return (request,)
444        finally:
445            self.mutex.release()
446
447
448class ThreadedUnixStreamHandler(SocketServer.BaseRequestHandler):
449
450    def rx_request(self, request):
451        if (request.commandStatus != protocol.Render) \
452           and (request.commandStatus != protocol.Dirty):
453               return
454
455        if request.bad_request():
456            if (request.commandStatus == protocol.Render):
457                request.send(protocol.NotDone)
458            return
459
460        cur_thread = threading.currentThread()
461        #print "%s: xml(%s) z(%d) x(%d) y(%d)" % \
462        #    (cur_thread.getName(), request.xmlname, request.z, request.x, request.y)
463
464        status = self.server.queue_handler.add(request)
465        if status in ("rendering", "requested"):
466            # Request queued, response will be sent on completion
467            return
468
469        # The tile won't be rendered soon
470        if (request.commandStatus == protocol.Render):
471            request.send(protocol.NotDone)
472
473    def handle(self):
474        cur_thread = threading.currentThread()
475        #print "%s: New connection" % cur_thread.getName()
476        req_v1 = ProtocolPacketV1()
477        req_v2 = ProtocolPacketV2()
478        max_len = max(req_v1.len(), req_v2.len())
479
480        while True:
481            try:
482                data = self.request.recv(max_len)
483            except socket.error, e:
484                if e[0] == errno.ECONNRESET:
485                    #print "Connection reset by peer"
486                    break
487                else:
488                    raise
489
490            if len(data) == req_v1.len():
491                req_v1.receive(data, self.request)
492                self.rx_request(req_v1)
493            if len(data) == req_v2.len():
494                req_v2.receive(data, self.request)
495                self.rx_request(req_v2)
496            elif len(data) == 0:
497                #print "%s: Connection closed" % cur_thread.getName()
498                break
499            else:
500                print "Invalid request length %d" % len(data)
501                break
502
503class ThreadedUnixStreamServer(SocketServer.ThreadingMixIn, SocketServer.UnixStreamServer):
504    def __init__(self, address, queue_handler, handler):
505        if(os.path.exists(address)):
506           os.unlink(address)
507        self.address = address
508        self.queue_handler = queue_handler
509        SocketServer.UnixStreamServer.__init__(self, address, handler)
510        self.daemon_threads = True
511
512def listener(address, queue_handler):
513    # Create the server
514    server = ThreadedUnixStreamServer(address, queue_handler, ThreadedUnixStreamHandler)
515    # The socket needs to be writeable by Apache
516    os.chmod(address, 0666)
517    # Loop forever servicing requests
518    server.serve_forever()
519
520def display_config(config):
521    for xmlname in config.sections():
522        print "Layer name: %s" % xmlname
523        #for opt in config.options(xmlname):
524        #   print "%s = %s" % (opt, config.get(xmlname, opt))
525        uri = config.get(xmlname, "uri")
526        xml = config.get(xmlname, "xml")
527        print "    URI(%s) = XML(%s)" % (uri, xml)
528
529def read_styles(config):
530    styles = {}
531    for xmlname in config.sections():
532        styles[xmlname] = config.get(xmlname, "xml")
533    return styles
534
535if __name__ == "__main__":
536    try:
537        cfg_file = os.environ['RENDERD_CFG']
538    except KeyError:
539        cfg_file = "/etc/renderd.conf"
540
541    # FIXME: Move more of these to config file?
542    RENDER_SOCKET = "/tmp/osm-renderd"
543    HASH_PATH = "/var/lib/mod_tile"
544    NUM_THREADS = 4
545
546    mapnik.FontEngine.instance().register_font("/home/jburgess/osm/fonts/unifont-5.1.20080706.ttf")
547
548    config = ConfigParser.ConfigParser()
549    config.read(cfg_file)
550    display_config(config)
551    styles = read_styles(config)
552
553
554    queue_handler = RequestQueues()
555    start_renderers(NUM_THREADS, HASH_PATH, styles, queue_handler)
556    listener(RENDER_SOCKET, queue_handler)
Note: See TracBrowser for help on using the repository browser.