"""
Ported to Python 3.
"""

from six import ensure_str

from allmydata.uri import from_string
from allmydata.util import base32, log, dictutil
from allmydata.util.happinessutil import servers_of_happiness
from allmydata.check_results import CheckAndRepairResults, CheckResults

from allmydata.mutable.common import MODE_CHECK, MODE_WRITE, CorruptShareError
from allmydata.mutable.servermap import ServerMap, ServermapUpdater
from allmydata.mutable.retrieve import Retrieve # for verifying

class MutableChecker:
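    """
    Check the health of a mutable file: update a servermap in MODE_CHECK,
    judge recoverability, optionally verify every share, and summarize the
    outcome as a CheckResults instance.
    """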
    SERVERMAP_MODE = MODE_CHECK

    def __init__(self, node, storage_broker, history, monitor):
        self._node = node
        self._storage_broker = storage_broker
        self._history = history
        self._monitor = monitor
        self.bad_shares = [] # list of (server,shnum,failure)
        self._storage_index = self._node.get_storage_index()
        self.need_repair = False
        self.responded = set() # set of (binary) nodeids

    def check(self, verify=False, add_lease=False):
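        """
        Check the health of this mutable file. If 'verify' is True, also
        download and verify the contents of every share; 'add_lease' is
        passed through to the servermap updater. Returns a Deferred that
        fires with a CheckResults instance.
        """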
        servermap = ServerMap()
        # Updating the servermap in MODE_CHECK will stand a good chance
        # of finding all of the shares, and getting a good idea of
        # recoverability, etc, without verifying.
        u = ServermapUpdater(self._node, self._storage_broker, self._monitor,
                             servermap, self.SERVERMAP_MODE,
                             add_lease=add_lease)
        if self._history:
            self._history.notify_mapupdate(u.get_status())
        d = u.update()
        d.addCallback(self._got_mapupdate_results)
        if verify:
            d.addCallback(self._verify_all_shares)
        d.addCallback(lambda res: servermap)
        d.addCallback(self._make_checker_results)
        return d

    def _got_mapupdate_results(self, servermap):
        # The file is healthy if there is exactly one recoverable version,
        # it has at least N distinct shares, and there are no unrecoverable
        # versions: all existing shares will be for the same version.
        self._monitor.raise_if_cancelled()
        self.best_version = None
        num_recoverable = len(servermap.recoverable_versions())
        if num_recoverable:
            self.best_version = servermap.best_recoverable_version()

        # The file is unhealthy and needs to be repaired if:
        # - There are unrecoverable versions.
        if servermap.unrecoverable_versions():
            self.need_repair = True
        # - There isn't exactly one recoverable version.
        if num_recoverable != 1:
            self.need_repair = True
        # - The best recoverable version is missing some shares.
        if self.best_version:
            available_shares = servermap.shares_available()
            (num_distinct_shares, k, N) = available_shares[self.best_version]
            if num_distinct_shares < N:
                self.need_repair = True

        return servermap

    def _verify_all_shares(self, servermap):
        # read every byte of each share
        #
        # This logic is going to be very nearly the same as the
        # downloader. I bet we could pass the downloader a flag that
        # makes it do this, and piggyback onto that instead of
        # duplicating a bunch of code.
        #
        # Like:
        #  r = Retrieve(blah, blah, blah, verify=True)
        #  d = r.download()
        #  (wait, wait, wait, d.callback)
        #
        # Then, when it has finished, we can check the servermap (which
        # we provided to Retrieve) to figure out which shares are bad,
        # since the Retrieve process will have updated the servermap as
        # it went along.
        #
        # By passing the verify=True flag to the constructor, we are
        # telling the downloader a few things.
        #
        # 1. It needs to download all N shares, not just K shares.
        # 2. It doesn't need to decrypt or decode the shares, only
        #    verify them.
        if not self.best_version:
            return

        r = Retrieve(self._node, self._storage_broker, servermap,
                     self.best_version, verify=True)
        d = r.download()
        d.addCallback(self._process_bad_shares)
        return d


    def _process_bad_shares(self, bad_shares):
        if bad_shares:
            self.need_repair = True
        self.bad_shares = bad_shares


    def _count_shares(self, smap, version):
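        """
        Return a dict of share counters for the given version, as reported
        by the servermap: "count-shares-good", "count-shares-needed",
        "count-shares-expected", "count-good-share-hosts", and
        "count-wrong-shares".
        """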
        available_shares = smap.shares_available()
        (num_distinct_shares, k, N) = available_shares[version]
        counters = {}
        counters["count-shares-good"] = num_distinct_shares
        counters["count-shares-needed"] = k
        counters["count-shares-expected"] = N
        good_hosts = smap.all_servers_for_version(version)
        counters["count-good-share-hosts"] = len(good_hosts)
        vmap = smap.make_versionmap()
        counters["count-wrong-shares"] = sum([len(shares)
                                              for verinfo,shares in vmap.items()
                                              if verinfo != version])

        return counters

    def _make_checker_results(self, smap):
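        """
        Build a CheckResults for this file from the given servermap:
        summarize recoverable and unrecoverable versions, count shares and
        hosts, fold in any corrupt shares found during verification, and
        compute servers-of-happiness.
        """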
        self._monitor.raise_if_cancelled()
        healthy = True
        report = []
        summary = []
        vmap = smap.make_versionmap()
        recoverable = smap.recoverable_versions()
        unrecoverable = smap.unrecoverable_versions()

        if recoverable:
            report.append("Recoverable Versions: " +
                          "/".join(["%d*%s" % (len(vmap[v]),
                                               smap.summarize_version(v))
                                    for v in recoverable]))
        if unrecoverable:
            report.append("Unrecoverable Versions: " +
                          "/".join(["%d*%s" % (len(vmap[v]),
                                               smap.summarize_version(v))
                                    for v in unrecoverable]))
        if unrecoverable:
            healthy = False
            summary.append("some versions are unrecoverable")
            report.append("Unhealthy: some versions are unrecoverable")
        if len(recoverable) == 0:
            healthy = False
            summary.append("no versions are recoverable")
            report.append("Unhealthy: no versions are recoverable")
        if len(recoverable) > 1:
            healthy = False
            summary.append("multiple versions are recoverable")
            report.append("Unhealthy: there are multiple recoverable versions")

        if recoverable:
            best_version = smap.best_recoverable_version()
            report.append("Best Recoverable Version: " +
                          smap.summarize_version(best_version))
            counters = self._count_shares(smap, best_version)
            s = counters["count-shares-good"]
            k = counters["count-shares-needed"]
            N = counters["count-shares-expected"]
            if s < N:
                healthy = False
                report.append("Unhealthy: best version has only %d shares "
                              "(encoding is %d-of-%d)" % (s, k, N))
                summary.append("%d shares (enc %d-of-%d)" % (s, k, N))
        elif unrecoverable:
            healthy = False
            # find a k and N from somewhere
            first = list(unrecoverable)[0]
            # not exactly the best version, but that doesn't matter too much
            counters = self._count_shares(smap, first)
        else:
            # couldn't find anything at all
            counters = {
                "count-shares-good": 0,
                "count-shares-needed": 3, # arbitrary defaults
                "count-shares-expected": 10,
                "count-good-share-hosts": 0,
                "count-wrong-shares": 0,
            }

        corrupt_share_locators = []
        problems = []
        if self.bad_shares:
            report.append("Corrupt Shares:")
            summary.append("Corrupt Shares:")
        for (server, shnum, f) in sorted(self.bad_shares, key=id):
            serverid = server.get_serverid()
            locator = (server, self._storage_index, shnum)
            corrupt_share_locators.append(locator)
            s = "%s-sh%d" % (ensure_str(server.get_name()), shnum)
            if f.check(CorruptShareError):
                ft = f.value.reason
            else:
                ft = str(f)
            report.append(" %s: %s" % (s, ft))
            summary.append(s)
            p = (serverid, self._storage_index, shnum, f)
            problems.append(p)
            msg = ("CorruptShareError during mutable verify, "
                   "serverid=%(serverid)s, si=%(si)s, shnum=%(shnum)d, "
                   "where=%(where)s")
            log.msg(format=msg, serverid=server.get_name(),
                    si=base32.b2a(self._storage_index),
                    shnum=shnum,
                    where=ft,
                    level=log.WEIRD, umid="EkK8QA")

        sharemap = dictutil.DictOfSets()
        for verinfo in vmap:
            for (shnum, server, timestamp) in vmap[verinfo]:
                shareid = "%s-sh%d" % (smap.summarize_version(verinfo), shnum)
                sharemap.add(shareid, server)
        if healthy:
            summary = "Healthy"
        else:
            summary = "Unhealthy: " + " ".join(summary)

        count_happiness = servers_of_happiness(sharemap)

        cr = CheckResults(from_string(self._node.get_uri()),
                          self._storage_index,
                          healthy=healthy, recoverable=bool(recoverable),
                          count_happiness=count_happiness,
                          count_shares_needed=counters["count-shares-needed"],
                          count_shares_expected=counters["count-shares-expected"],
                          count_shares_good=counters["count-shares-good"],
                          count_good_share_hosts=counters["count-good-share-hosts"],
                          count_recoverable_versions=len(recoverable),
                          count_unrecoverable_versions=len(unrecoverable),
                          servers_responding=list(smap.get_reachable_servers()),
                          sharemap=sharemap,
                          count_wrong_shares=counters["count-wrong-shares"],
                          list_corrupt_shares=corrupt_share_locators,
                          count_corrupt_shares=len(corrupt_share_locators),
                          list_incompatible_shares=[],
                          count_incompatible_shares=0,
                          summary=summary,
                          report=report,
                          share_problems=problems,
                          servermap=smap.copy())
        return cr


class MutableCheckAndRepairer(MutableChecker):
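    """
    Check a mutable file and, if it is unhealthy and the node is writeable,
    attempt to repair it. The result is a CheckAndRepairResults holding both
    the pre-repair and post-repair check results.
    """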
    SERVERMAP_MODE = MODE_WRITE # needed to get the privkey

    def __init__(self, node, storage_broker, history, monitor):
        MutableChecker.__init__(self, node, storage_broker, history, monitor)
        self.cr_results = CheckAndRepairResults(self._storage_index)
        self.need_repair = False

    def check(self, verify=False, add_lease=False):
        d = MutableChecker.check(self, verify, add_lease)
        d.addCallback(self._stash_pre_repair_results)
        d.addCallback(self._maybe_repair)
        d.addCallback(lambda res: self.cr_results)
        return d

    def _stash_pre_repair_results(self, pre_repair_results):
        self.cr_results.pre_repair_results = pre_repair_results
        return pre_repair_results

    def _maybe_repair(self, pre_repair_results):
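        """
        If the pre-repair check decided a repair is needed and the file is
        writeable, ask the node to repair itself and record the post-repair
        results; otherwise reuse the pre-repair results as the post-repair
        results.
        """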
        crr = self.cr_results
        self._monitor.raise_if_cancelled()
        if not self.need_repair:
            crr.post_repair_results = pre_repair_results
            return
        if self._node.is_readonly():
            # ticket #625: we cannot yet repair read-only mutable files
            crr.post_repair_results = pre_repair_results
            crr.repair_attempted = False
            return
        crr.repair_attempted = True
        d = self._node.repair(pre_repair_results, monitor=self._monitor)
        def _repair_finished(rr):
            crr.repair_successful = rr.get_successful()
            crr.post_repair_results = self._make_checker_results(rr.servermap)
            crr.repair_results = rr # TODO?
            return
        def _repair_error(f):
            # I'm not sure if I want to pass through a failure or not.
            crr.repair_successful = False
            crr.repair_failure = f # TODO?
            #crr.post_repair_results = ??
            return f
        d.addCallbacks(_repair_finished, _repair_error)
        return d