| 1 | # -*- python -*- |
|---|
| 2 | |
|---|
| 3 | |
|---|
| 4 | """ |
|---|
| 5 | Run this tool with twistd in its own directory, with a file named 'urls.txt' |
|---|
| 6 | describing which nodes to query. Make sure to copy diskwatcher.py into the |
|---|
| 7 | same directory. It will request disk-usage numbers from the nodes once per |
|---|
| 8 | hour (or slower), and store them in a local database. It will compute |
|---|
| 9 | usage-per-unit time values over several time ranges and make them available |
|---|
| 10 | through an HTTP query (using ./webport). It will also provide an estimate of |
|---|
| 11 | how much time is left before the grid's storage is exhausted. |
|---|
| 12 | |
|---|
| 13 | There are munin plugins (named tahoe_doomsday and tahoe_diskusage) to graph |
|---|
| 14 | the values this tool computes. |
|---|
| 15 | |
|---|
| 16 | Each line of urls.txt points to a single node. Each node should have its own |
|---|
| 17 | dedicated disk: if multiple nodes share a disk, only list one of them in |
|---|
| 18 | urls.txt (otherwise that space will be double-counted, confusing the |
|---|
| 19 | results). Each line should be in the form: |
|---|
| 20 | |
|---|
| 21 | http://host:webport/statistics?t=json |
|---|
| 22 | |
|---|
| 23 | """ |
|---|
| 24 | |
|---|
| 25 | # TODO: |
|---|
| 26 | # built-in graphs on web interface |
|---|
| 27 | |
|---|
| 28 | |
|---|
| 29 | import os.path, urllib, time |
|---|
| 30 | from datetime import timedelta |
|---|
| 31 | from twisted.application import internet, service, strports |
|---|
| 32 | from twisted.web import server, resource, http, client |
|---|
| 33 | from twisted.internet import defer |
|---|
| 34 | from twisted.python import log |
|---|
| 35 | import json |
|---|
| 36 | from axiom.attributes import AND |
|---|
| 37 | from axiom.store import Store |
|---|
| 38 | from epsilon import extime |
|---|
| 39 | from diskwatcher import Sample |
|---|
| 40 | |
|---|
| 41 | #from axiom.item import Item |
|---|
| 42 | #from axiom.attributes import text, integer, timestamp |
|---|
| 43 | |
|---|
| 44 | #class Sample(Item): |
|---|
| 45 | # url = text() |
|---|
| 46 | # when = timestamp() |
|---|
| 47 | # used = integer() |
|---|
| 48 | # avail = integer() |
|---|
| 49 | |
|---|
| 50 | #s = Store("history.axiom") |
|---|
| 51 | #ns = Store("new-history.axiom") |
|---|
| 52 | #for sa in s.query(Sample): |
|---|
| 53 | # diskwatcher.Sample(store=ns, |
|---|
| 54 | # url=sa.url, when=sa.when, used=sa.used, avail=sa.avail) |
|---|
| 55 | #print "done" |
|---|
| 56 | |
|---|
| 57 | HOUR = 3600 |
|---|
| 58 | DAY = 24*3600 |
|---|
| 59 | WEEK = 7*DAY |
|---|
| 60 | MONTH = 30*DAY |
|---|
| 61 | YEAR = 365*DAY |
|---|
| 62 | |
|---|
class DiskWatcher(service.MultiService, resource.Resource):
    """Poll storage nodes for disk-usage stats and serve growth estimates.

    Node URLs are read from 'urls.txt' (one statistics URL per line, '#'
    starts a comment). Each poll records a Sample row (url, when, total,
    used, avail) in a local Axiom store ('history.axiom'). The object is
    both a MultiService (timer + HTTP listener children) and the web
    resource that answers queries: GET / returns a plain-text summary,
    GET /?t=json returns the same numbers as JSON.
    """

    # seconds between polls of every node
    POLL_INTERVAL = 1*HOUR
    # display name -> timespan (seconds) over which growth is averaged
    AVERAGES = {#"60s": 60,
                #"5m": 5*60,
                #"30m": 30*60,
                "1hr": 1*HOUR,
                "1day": 1*DAY,
                "2wk": 2*WEEK,
                "4wk": 4*WEEK,
                }

    def __init__(self):
        assert os.path.exists("diskwatcher.tac") # run from the right directory
        self.growth_cache = {} # timespan -> (time.time() computed, bytes/sec)
        service.MultiService.__init__(self)
        resource.Resource.__init__(self)
        self.store = Store("history.axiom")
        self.store.whenFullyUpgraded().addCallback(self._upgrade_complete)
        service.IService(self.store).setServiceParent(self) # let upgrader run
        ts = internet.TimerService(self.POLL_INTERVAL, self.poll)
        ts.setServiceParent(self)

    def _upgrade_complete(self, ignored):
        print("Axiom store upgrade complete")

    def startService(self):
        """Start the child services and bring up the HTTP listener.

        The listen port comes from the './webport' file when present;
        otherwise an ephemeral port is chosen and written back to that
        file so later runs (and the munin plugins) can find it.
        """
        service.MultiService.startService(self)

        try:
            # use a context manager so the handle is closed promptly
            with open("webport", "r") as f:
                desired_webport = f.read().strip()
        except EnvironmentError:
            desired_webport = None
        webport = desired_webport or "tcp:0"
        root = self
        serv = strports.service(webport, server.Site(root))
        serv.setServiceParent(self)
        if not desired_webport:
            got_port = serv._port.getHost().port
            with open("webport", "w") as f:
                f.write("tcp:%d\n" % got_port)

    def get_urls(self):
        """Yield each node-statistics URL from urls.txt.

        '#' comments and surrounding whitespace are stripped; blank
        lines are skipped.
        """
        with open("urls.txt", "r") as f:
            for url in f:
                if "#" in url:
                    url = url[:url.find("#")]
                url = url.strip()
                if not url:
                    continue
                yield url

    def poll(self):
        """Timer callback: fetch stats from every node."""
        log.msg("polling..")
        #return self.poll_synchronous()
        return self.poll_asynchronous()

    def poll_asynchronous(self):
        # this didn't actually seem to work any better than poll_synchronous:
        # logs are more noisy, and I got frequent DNS failures. But with a
        # lot of servers to query, this is probably the better way to go. A
        # significant advantage of this approach is that we can use a
        # timeout= argument to tolerate hanging servers.
        dl = []
        for url in self.get_urls():
            when = extime.Time()
            d = client.getPage(url, timeout=60)
            d.addCallback(self.got_response, when, url)
            dl.append(d)
        d = defer.DeferredList(dl)
        def _done(res):
            fetched = len([1 for (success, value) in res if success])
            log.msg("fetched %d of %d" % (fetched, len(dl)))
        d.addCallback(_done)
        return d

    def poll_synchronous(self):
        """Fetch stats from each node in turn, blocking on each request."""
        attempts = 0
        fetched = 0
        for url in self.get_urls():
            attempts += 1
            try:
                when = extime.Time()
                # if a server accepts the connection and then hangs, this
                # will block forever
                data_json = urllib.urlopen(url).read()
                self.got_response(data_json, when, url)
                fetched += 1
            except Exception:
                # was a bare 'except:', which also swallowed SystemExit
                # and KeyboardInterrupt; log.err() records the current one
                log.msg("error while fetching: %s" % url)
                log.err()
        log.msg("fetched %d of %d" % (fetched, attempts))

    def got_response(self, data_json, when, url):
        """Parse one node's ?t=json stats body and record a Sample."""
        data = json.loads(data_json)
        total = data[u"stats"][u"storage_server.disk_total"]
        used = data[u"stats"][u"storage_server.disk_used"]
        avail = data[u"stats"][u"storage_server.disk_avail"]
        print("%s : total=%s, used=%s, avail=%s" % (url,
                                                    total, used, avail))
        Sample(store=self.store,
               url=unicode(url), when=when, total=total, used=used, avail=avail)

    def calculate_growth_timeleft(self):
        """Return [(name, timespan, growth, timeleft)] for each AVERAGES
        entry that has data, shortest timespan first.

        'growth' is bytes/second; 'timeleft' is seconds until the
        currently-available space runs out at that rate, or None when
        growth is zero or negative.
        """
        timespans = []
        total_avail_space = self.find_total_available_space()
        pairs = [ (timespan,name)
                  for name,timespan in self.AVERAGES.items() ]
        pairs.sort()
        for (timespan,name) in pairs:
            growth = self.growth(timespan)
            print(name, total_avail_space, growth)
            if growth is not None:
                timeleft = None
                if growth > 0:
                    timeleft = total_avail_space / growth
                timespans.append( (name, timespan, growth, timeleft) )
        return timespans

    def _latest_sample(self, url, criterion):
        """Return the newest Sample for 'url' matching the extra Axiom
        query 'criterion', or None when there is no such sample."""
        results = list(self.store.query(Sample,
                                        AND(Sample.url == url, criterion),
                                        sort=Sample.when.descending,
                                        limit=1))
        return results[0] if results else None

    def _sum_recent(self, attribute):
        """Sum the named Sample attribute ('total', 'used', or 'avail')
        over all servers that 1) are listed in urls.txt and 2) have
        responded within the last two poll intervals."""
        now = extime.Time()
        recent = now - timedelta(seconds=2*self.POLL_INTERVAL)
        total = 0
        for url in self.get_urls():
            latest = self._latest_sample(unicode(url), Sample.when > recent)
            if latest is not None:
                total += getattr(latest, attribute)
        return total

    def find_total_space(self):
        # sum of disk-total stats for recently-responding listed servers
        # (the old comment here said "disk-avail", but this method has
        # always summed the 'total' column)
        return self._sum_recent("total")

    def find_total_available_space(self):
        # sum of disk-avail stats for recently-responding listed servers
        return self._sum_recent("avail")

    def find_total_used_space(self):
        # sum of disk-used stats for recently-responding listed servers
        return self._sum_recent("used")


    def growth(self, timespan):
        """Calculate the bytes-per-second growth of the total disk-used stat,
        over a period of TIMESPAN seconds (i.e. between the most recent
        sample and the latest one that's at least TIMESPAN seconds ago),
        summed over all nodes which 1) are listed in urls.txt, 2) have
        responded recently, and 3) have a response at least as old as
        TIMESPAN. If there are no nodes which meet these criteria, we'll
        return None; this is likely to happen for the longer timespans (4wk)
        until the gatherer has been running and collecting data for that
        long."""

        # a note about workload: for our oldest storage servers, as of
        # 25-Jan-2009, the first DB query here takes about 40ms per server
        # URL (some take as little as 10ms). There are about 110 servers, and
        # two queries each, so the growth() function takes about 7s to run
        # for each timespan. We track 4 timespans, and find_total_*_space()
        # takes about 2.3s to run, so calculate_growth_timeleft() takes about
        # 27s. Each HTTP query thus takes 27s, and we have six munin plugins
        # which perform HTTP queries every 5 minutes. By adding growth_cache(),
        # I hope to reduce this: the first HTTP query will still take 27s,
        # but the subsequent five should be about 2.3s each.

        # we're allowed to cache this value for 3 minutes
        if timespan in self.growth_cache:
            (when, value) = self.growth_cache[timespan]
            if time.time() - when < 3*60:
                return value

        td = timedelta(seconds=timespan)
        now = extime.Time()
        then = now - td
        recent = now - timedelta(seconds=2*self.POLL_INTERVAL)

        total_growth = 0.0
        num_nodes = 0

        for url in self.get_urls():
            url = unicode(url)
            latest = self._latest_sample(url, Sample.when > recent)
            if latest is None:
                continue # no recent sample: skip this node
            # newest sample that is at least 'timespan' old
            old = self._latest_sample(url, Sample.when < then)
            if old is None:
                continue # no old-enough sample: skip this node
            duration = latest.when.asPOSIXTimestamp() - old.when.asPOSIXTimestamp()
            if not duration:
                print("only one sample from", url)
                continue

            rate = float(latest.used - old.used) / duration
            total_growth += rate
            num_nodes += 1

        if not num_nodes:
            return None
        self.growth_cache[timespan] = (time.time(), total_growth)
        return total_growth

    def getChild(self, path, req):
        # serve the summary at the site root; defer everything else
        if path == "":
            return self
        return resource.Resource.getChild(self, path, req)

    def abbreviate_time(self, s):
        """Render a duration in seconds as a rough human-readable string,
        e.g. '3 hours' or '2 months'; None becomes 'unknown'."""
        def _plural(count, unit):
            count = int(count)
            if count == 1:
                return "%d %s" % (count, unit)
            return "%d %ss" % (count, unit)
        if s is None:
            return "unknown"
        if s < 120:
            return _plural(s, "second")
        if s < 3*HOUR:
            return _plural(s/60, "minute")
        if s < 2*DAY:
            return _plural(s/HOUR, "hour")
        if s < 2*MONTH:
            return _plural(s/DAY, "day")
        if s < 4*YEAR:
            return _plural(s/MONTH, "month")
        return _plural(s/YEAR, "year")

    def abbreviate_space2(self, s, SI=True):
        """Render a byte count with a unit prefix: decimal (kB/MB/...)
        when SI is true, binary (kiB/MiB/...) otherwise."""
        if s is None:
            return "unknown"
        if SI:
            U = 1000.0
            isuffix = "B"
        else:
            U = 1024.0
            isuffix = "iB"
        def r(count, suffix):
            return "%.2f %s%s" % (count, suffix, isuffix)

        if s < 1024: # 1000-1023 get emitted as bytes, even in SI mode
            return r(s, "")
        if s < U*U:
            return r(s/U, "k")
        if s < U*U*U:
            return r(s/(U*U), "M")
        if s < U*U*U*U:
            return r(s/(U*U*U), "G")
        if s < U*U*U*U*U:
            return r(s/(U*U*U*U), "T")
        return r(s/(U*U*U*U*U), "P")

    def abbreviate_space(self, s):
        """Render a byte count in both SI and binary units."""
        return "(%s, %s)" % (self.abbreviate_space2(s, True),
                             self.abbreviate_space2(s, False))

    def render(self, req):
        """Answer an HTTP query: t=html (default) for plain text,
        t=json for machine-readable output."""
        t = req.args.get("t", ["html"])[0]
        ctype = "text/plain"
        data = ""
        if t == "html":
            data = ""
            for (name, timespan, growth, timeleft) in self.calculate_growth_timeleft():
                data += "%f bytes per second (%sps), %s remaining (over %s)\n" % \
                        (growth, self.abbreviate_space2(growth, True),
                         self.abbreviate_time(timeleft), name)
            used = self.find_total_used_space()
            data += "total used: %d bytes %s\n" % (used,
                                                   self.abbreviate_space(used))
            total = self.find_total_space()
            data += "total space: %d bytes %s\n" % (total,
                                                    self.abbreviate_space(total))
        elif t == "json":
            current = {"rates": self.calculate_growth_timeleft(),
                       "total": self.find_total_space(),
                       "used": self.find_total_used_space(),
                       "available": self.find_total_available_space(),
                       }
            data = json.dumps(current, indent=True)
        else:
            req.setResponseCode(http.BAD_REQUEST)
            data = "Unknown t= %s\n" % t
        req.setHeader("content-type", ctype)
        return data
|---|
| 384 | |
|---|
| 385 | application = service.Application("disk-watcher") |
|---|
| 386 | DiskWatcher().setServiceParent(application) |
|---|