source: trunk/factory/src/factory.py @ 370

Last change on this file since 370 was 370, checked in by sander, 10 years ago

Do not DDoS the server on a continuing but recoverable error

  • Property svn:executable set to *
File size: 7.1 KB
Line 
1#!/usr/bin/env python
2# Officeshots.org - Test your office documents in different applications
3# Copyright (C) 2009 Stichting Lone Wolves
4# Written by Sander Marechal <s.marechal@jejik.com>
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14# GNU Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public License
17# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19"""
20This is the core Factory class and main method of the Officeshots
21factory. Execute it with -h or --help to see the available options
22"""
23
24import os
25import sys
26import time
27import socket
28import logging
29import ConfigParser
30
31from optparse import OptionParser
32from backends import BackendException
33from xmlrpclib import ServerProxy, Error, Fault
34from xml.parsers.expat import ExpatError
35
36LOGLEVELS = {'debug': logging.DEBUG,
37             'info': logging.INFO,
38             'warning': logging.WARNING,
39             'error': logging.ERROR,
40             'critical': logging.CRITICAL}
41
42class Factory:
43        """
44        The core factory class communicates with the Officeshots server and passes
45        requests on to any of the available workers
46        """
47
48        def configure(self, options):
49                self.options = options
50                self.config = ConfigParser.RawConfigParser()
51                self.config.read(os.path.abspath(self.options.config_file))
52
53                # Configure logging
54                if self.options.debug:
55                        logging.basicConfig(format = self.config.get('global', 'log_format'), level = logging.DEBUG)
56                else:
57                        level = LOGLEVELS.get(self.config.get('global', 'log_level'), logging.NOTSET)
58                        try:
59                                logging.basicConfig(
60                                                format = self.config.get('global', 'log_format'),
61                                                filename = self.config.get('global', 'log_file'),
62                                                filemode = 'a',
63                                                level = level
64                                )
65                        except IOError, e:
66                                print "Logfile IO error. Please make sure that the log_file setting is correct in config.ini"
67                                sys.exit(1)
68
69                # Load factory name
70                self.name = self.config.get('global', 'factory_name')
71
72                # Configure the XMLRPC proxy
73                transport_name = self.config.get('global', 'transport')
74                transport = self.load('transports.' + transport_name, 'SSLTransport')
75                if transport is None:
76                        print "Transport %s could not be loaded" % transport_name
77                        sys.exit(1)
78
79                transport = transport(
80                        self.config.get('global', 'tls_key_file'),
81                        self.config.get('global', 'tls_certificate_file')
82                )
83                self.proxy = ServerProxy(self.config.get('global', 'xmlrpc_endpoint'), transport=transport, verbose=self.options.debug)
84
85                # Load all the backends
86                self.backends = []
87                sections = [s.strip() for s in self.config.get('global', 'backends').split(',')]
88
89                for section in sections:
90                        if len(section) == 0:
91                                continue
92
93                        backend_name = self.config.get(section, 'backend')
94                        if backend_name is None:
95                                continue
96
97                        backend = self.load('backends.' + backend_name.lower(), backend_name)
98                        if backend is None:
99                                continue
100
101                        backend = backend(self.options, self.config, section)
102                        try:
103                                backend.initialize()
104                        except BackendException, e:
105                                logging.warning('Error initializing backend %s for %s: ' + str(e), backend_name, section)
106                                continue
107
108                        self.backends.append(backend)
109
110                if len(self.backends) == 0:
111                        logging.critical('No backends could be loaded')
112                        return False
113
114                # Configuration succeeded
115                return True
116
117        def load(self, package, class_name):
118                """
119                A convenience function to import class_name from package
120                """
121                try:
122                        module = __import__(package, globals(), locals(), class_name)
123                except ImportError, e:
124                        logging.warning('Error importing %s from %s. ' + str(e), class_name, package)
125                        return None
126               
127                return getattr(module, class_name)
128
129
130
131        def systemload(self):
132                """
133                Return the average system load
134                """
135                try:
136                        return max(os.getloadavg())
137                except (AttributeError, OSError):
138                        return None
139
140        def loop(self):
141                """
142                A single iteration of the main loop.
143                Return False to terminate the application
144                """
145                # Keep an eye on system load
146                load = self.systemload()
147                maxload = self.config.getfloat('global', 'load_max')
148                if load > maxload:
149                        logging.debug("Systemload %.2f exceeds limit %.2f. Sleeping." % (load, maxload))
150                        time.sleep(60)
151                        return True
152
153                # Poll for a job. Sleep for a minute if there's no work
154                try:
155                        job = self.proxy.jobs.poll(self.name)
156                except socket.error, ex:
157                        logging.warning(ex)
158                        logging.warning("Cannot connect to server. Sleeping.")
159                        time.sleep(60)
160                        return True
161                except Fault, ex:
162                        logging.error("XML-RPC fault. Poll failed. Sleeping.")
163                        time.sleep(60)
164                        return True
165                except ExpatError, ex:
166                        logging.error("XML parsing fault. Server probably sent an error in HTML. Poll failed. Sleeping.")
167                        time.sleep(60)
168                        return True
169
170                if len(job) == 0:
171                        logging.debug('No jobs found. Sleeping.')
172                        time.sleep(60)
173                        return True
174
175                # We have work. Find a backend to pass it off to
176                for backend in self.backends:
177                        if backend.can_process(job):
178                                try:
179                                        (format, document) = backend.process(job)
180                                except BackendException, ex:
181                                        logging.warning(ex)
182                                        if not ex.recoverable:
183                                                logging.warning('Removing backend')
184                                                self.backends.remove(backend)
185                                                if len(self.backends) == 0:
186                                                        logging.critical('No more active backends.')
187                                                        return False
188                                        time.sleep(60)
189                                        return True
190
191                                try:
192                                        self.proxy.jobs.finish(self.name, job['job'], format, document)
193                                except socket.error, ex:
194                                        logging.warning("Cannot connect to server. Job cannot be finished. Sleeping.")
195                                        time.sleep(60)
196                                        return True
197                                except Fault, ex:
198                                        logging.error("XML-RPC fault. Finishing failed. Sleeping.")
199                                        time.sleep(60)
200                                        return True
201                                except ExpatError, ex:
202                                        logging.error("XML parsing fault. Server probably sent an error in HTML. Poll failed. Sleeping.")
203                                        time.sleep(60)
204                                        return True
205
206                                logging.info('Processed job %s', job['job'])
207                                return True
208
209                logging.warning('No suitable backend found for job')
210                logging.debug('Application: %s, version: %s, doctype: %s, format: %s' % (job['application'], job['version'], job['doctype'], job['format']))
211
212                # TODO: Do something smart about that, like deactivating the related worker on the server
213                return True
214
215
216        def run(self):
217                """
218                This is the main execution loop
219                """
220                logging.info('Starting factory server.')
221                while self.loop():
222                        pass
223
224
225
226if __name__ == "__main__":
227        parser = OptionParser(usage='Usage: %prog [options]')
228        parser.add_option('-c', '--config-file', action='store', type='string', dest='config_file',
229                        default='../conf/config.ini', help='Full path to the configuration file to read.')
230        parser.add_option('-d', '--debug', action='store_true', dest='debug',
231                        help='When in debug mode all errors will be written to the console and logging will be set to debug.')
232        (options, args) = parser.parse_args()
233
234        factory = Factory()
235        if factory.configure(options):
236                factory.run()
Note: See TracBrowser for help on using the repository browser.