Bug Summary

File: ubik/recovery.c
Location: line 737, column 6
Description: Value stored to 'code' is never read

Annotated Source Code

1/*
2 * Copyright 2000, International Business Machines Corporation and others.
3 * All Rights Reserved.
4 *
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
8 */
9
10#include <afsconfig.h>
11#include <afs/param.h>
12
13#include <roken.h>
14
15#include <lock.h>
16#include <rx/xdr.h>
17#include <rx/rx.h>
18#include <afs/afsutil.h>
19#include <afs/cellconfig.h>
20
21#define UBIK_INTERNALS
22#include "ubik.h"
23#include "ubik_int.h"
24
25/*! \file
26 * This module is responsible for determining when the system has
27 * recovered to the point that it can handle new transactions. It
28 * replays logs, polls to determine the current dbase after a crash,
29 * and distributes the new database to the others.
30 *
31 * The sync site associates a version number with each database. It
32 * broadcasts the version associated with its current dbase in every
33 one of its beacon messages. When the sync site sends a dbase to a
34 * server, it also sends the db's version. A non-sync site server can
35 * tell if it has the right dbase version by simply comparing the
36 * version from the beacon message \p uvote_dbVersion with the version
37 * associated with the database \p ubik_dbase->version. The sync site
38 * itself simply has one counter to keep track of all of this (again
39 * \p ubik_dbase->version).
40 *
41 * sync site: routine called when the sync site loses its quorum; this
42 * procedure is called "up" from the beacon package. It resyncs the
43 * dbase and nudges the recovery daemon to try to propagate out the
44 * changes. It also resets the recovery daemon's state, since
45 * recovery must potentially find a new dbase to propagate out. This
46 * routine should not do anything with variables used by non-sync site
47 * servers.
48 */
49
50/*!
51 * if this flag is set, then ubik will use only the primary address
52 * (the address specified in the CellServDB) to contact other
53 * ubik servers. Ubik recovery will not try opening connections
54 * to the alternate interface addresses.
55 */
56int ubikPrimaryAddrOnly;
57
58int
59urecovery_ResetState(void)
60{
61 urecovery_state = 0;
62#if !defined(AFS_PTHREAD_ENV)
63 /* No corresponding LWP_WaitProcess found anywhere for this -- klm */
64 LWP_NoYieldSignal(&urecovery_state)LWP_INTERNALSIGNAL(&urecovery_state, 0);
65#endif
66 return 0;
67}
68
69/*!
70 * \brief sync site
71 *
72 * routine called when a non-sync site server goes down; restarts recovery
73 * process to send missing server the new db when it comes back up for
74 * non-sync site servers.
75 *
76 * \note This routine should not do anything with variables used by non-sync site servers.
77 */
78int
79urecovery_LostServer(struct ubik_server *ts)
80{
81 ubeacon_ReinitServer(ts);
82#if !defined(AFS_PTHREAD_ENV)
83 /* No corresponding LWP_WaitProcess found anywhere for this -- klm */
84 LWP_NoYieldSignal(&urecovery_state)LWP_INTERNALSIGNAL(&urecovery_state, 0);
85#endif
86 return 0;
87}
88
89/*!
90 * return true iff we have a current database (called by both sync
91 * sites and non-sync sites) How do we determine this? If we're the
92 * sync site, we wait until recovery has finished fetching and
93 * re-labelling its dbase (it may still be trying to propagate it out
94 * to everyone else; that's THEIR problem). If we're not the sync
95 * site, then we must have a dbase labelled with the right version,
96 * and we must have a currently-good sync site.
97 */
98int
99urecovery_AllBetter(struct ubik_dbase *adbase, int areadAny)
100{
101 afs_int32 rcode;
102
103 ubik_dprint_25("allbetter checking\n");
104 rcode = 0;
105
106
107 if (areadAny) {
108 if (ubik_dbase->version.epoch > 1)
109 rcode = 1; /* Happy with any good version of database */
110 }
111
112 /* Check if we're sync site and we've got the right data */
113 else if (ubeacon_AmSyncSite() && (urecovery_state & UBIK_RECHAVEDB4)) {
114 rcode = 1;
115 }
116
117 /* next, check if we're aux site, and we've ever been sent the
118 * right data (note that if a dbase update fails, we won't think
119 * that the sync site is still the sync site, 'cause it won't talk
120 * to us until a timeout period has gone by. When we recover, we
121 * leave this clear until we get a new dbase */
122 else if (uvote_HaveSyncAndVersion(ubik_dbase->version)) {
123 rcode = 1;
124 }
125
126 ubik_dprint_25("allbetter: returning %d\n", rcode);
127 return rcode;
128}
129
130/*!
131 * \brief abort all transactions on this database
132 */
133int
134urecovery_AbortAll(struct ubik_dbase *adbase)
135{
136 struct ubik_trans *tt;
137 for (tt = adbase->activeTrans; tt; tt = tt->next) {
138 udisk_abort(tt);
139 }
140 return 0;
141}
142
143/*!
144 * \brief this routine aborts the current remote transaction, if any, if the tid is wrong
145 */
146int
147urecovery_CheckTid(struct ubik_tid *atid, int abortalways)
148{
149 if (ubik_currentTrans) {
150 /* there is remote write trans, see if we match, see if this
151 * is a new transaction */
152 if (atid->epoch != ubik_currentTrans->tid.epoch
153 || atid->counter > ubik_currentTrans->tid.counter || abortalways) {
154 /* don't match, abort it */
155 /* If the thread is not waiting for lock - ok to end it */
156 if (ubik_currentTrans->locktype != LOCKWAIT3) {
157 udisk_end(ubik_currentTrans);
158 }
159 ubik_currentTrans = (struct ubik_trans *)0;
160 }
161 }
162 return 0;
163}
164
165/*!
166 * \brief replay logs
167 *
168 * log format is defined here, and implicitly in disk.c
169 *
170 * 4 byte opcode, followed by parameters, each 4 bytes long. All integers
171 * are in logged in network standard byte order, in case we want to move logs
172 * from machine-to-machine someday.
173 *
174 * Begin transaction: opcode \n
175 * Commit transaction: opcode, version (8 bytes) \n
176 * Truncate file: opcode, file number, length \n
177 * Abort transaction: opcode \n
178 * Write data: opcode, file, position, length, <length> data bytes \n
179 *
180 * A very simple routine, it just replays the log. Note that this is a new-value only log, which
181 * implies that no uncommitted data is written to the dbase: one writes data to the log, including
182 * the commit record, then we allow data to be written through to the dbase. In our particular
183 * implementation, once a transaction is done, we write out the pages to the database, so that
184 * our buffer package doesn't have to know about stable and uncommitted data in the memory buffers:
185 * any changed data while there is an uncommitted write transaction can be zapped during an
186 * abort and the remaining dbase on the disk is exactly the right dbase, without having to read
187 * the log.
188 */
189static int
190ReplayLog(struct ubik_dbase *adbase)
191{
192 afs_int32 opcode;
193 afs_int32 code, tpos;
194 int logIsGood;
195 afs_int32 len, thisSize, tfile, filePos;
196 afs_int32 buffer[4];
197 afs_int32 syncFile = -1;
198 afs_int32 data[1024];
199
200 /* read the lock twice, once to see whether we have a transaction to deal
201 * with that committed, (theoretically, we should support more than one
202 * trans in the log at once, but not yet), and once replaying the
203 * transactions. */
204 tpos = 0;
205 logIsGood = 0;
206 /* for now, assume that all ops in log pertain to one transaction; see if there's a commit */
207 while (1) {
208 code =
209 (*adbase->read) (adbase, LOGFILE(-1), (char *)&opcode, tpos,
210 sizeof(afs_int32));
211 if (code != sizeof(afs_int32))
212 break;
213 opcode = ntohl(opcode)(__builtin_constant_p(opcode) ? ((((__uint32_t)(opcode)) >>
24) | ((((__uint32_t)(opcode)) & (0xff << 16)) >>
8) | ((((__uint32_t)(opcode)) & (0xff << 8)) <<
8) | (((__uint32_t)(opcode)) << 24)) : __bswap32_var(opcode
))
;
214 if (opcode == LOGNEW100) {
215 /* handle begin trans */
216 tpos += sizeof(afs_int32);
217 } else if (opcode == LOGABORT102)
218 break;
219 else if (opcode == LOGEND101) {
220 logIsGood = 1;
221 break;
222 } else if (opcode == LOGTRUNCATE104) {
223 tpos += 4;
224 code =
225 (*adbase->read) (adbase, LOGFILE(-1), (char *)buffer, tpos,
226 2 * sizeof(afs_int32));
227 if (code != 2 * sizeof(afs_int32))
228 break; /* premature eof or io error */
229 tpos += 2 * sizeof(afs_int32);
230 } else if (opcode == LOGDATA103) {
231 tpos += 4;
232 code =
233 (*adbase->read) (adbase, LOGFILE(-1), (char *)buffer, tpos,
234 3 * sizeof(afs_int32));
235 if (code != 3 * sizeof(afs_int32))
236 break;
237 /* otherwise, skip over the data bytes, too */
238 tpos += ntohl(buffer[2])(__builtin_constant_p(buffer[2]) ? ((((__uint32_t)(buffer[2])
) >> 24) | ((((__uint32_t)(buffer[2])) & (0xff <<
16)) >> 8) | ((((__uint32_t)(buffer[2])) & (0xff <<
8)) << 8) | (((__uint32_t)(buffer[2])) << 24)) :
__bswap32_var(buffer[2]))
+ 3 * sizeof(afs_int32);
239 } else {
240 ubik_print("corrupt log opcode (%d) at position %d\n", opcode,
241 tpos);
242 break; /* corrupt log! */
243 }
244 }
245 if (logIsGood) {
246 /* actually do the replay; log should go all the way through the commit record, since
247 * we just read it above. */
248 tpos = 0;
249 logIsGood = 0;
250 syncFile = -1;
251 while (1) {
252 code =
253 (*adbase->read) (adbase, LOGFILE(-1), (char *)&opcode, tpos,
254 sizeof(afs_int32));
255 if (code != sizeof(afs_int32))
256 break;
257 opcode = ntohl(opcode)(__builtin_constant_p(opcode) ? ((((__uint32_t)(opcode)) >>
24) | ((((__uint32_t)(opcode)) & (0xff << 16)) >>
8) | ((((__uint32_t)(opcode)) & (0xff << 8)) <<
8) | (((__uint32_t)(opcode)) << 24)) : __bswap32_var(opcode
))
;
258 if (opcode == LOGNEW100) {
259 /* handle begin trans */
260 tpos += sizeof(afs_int32);
261 } else if (opcode == LOGABORT102)
262 panic("log abort\n");
263 else if (opcode == LOGEND101) {
264 struct ubik_version version;
265 tpos += 4;
266 code =
267 (*adbase->read) (adbase, LOGFILE(-1), (char *)buffer, tpos,
268 2 * sizeof(afs_int32));
269 if (code != 2 * sizeof(afs_int32))
270 return UBADLOG(5384L);
271 version.epoch = ntohl(buffer[0])(__builtin_constant_p(buffer[0]) ? ((((__uint32_t)(buffer[0])
) >> 24) | ((((__uint32_t)(buffer[0])) & (0xff <<
16)) >> 8) | ((((__uint32_t)(buffer[0])) & (0xff <<
8)) << 8) | (((__uint32_t)(buffer[0])) << 24)) :
__bswap32_var(buffer[0]))
;
272 version.counter = ntohl(buffer[1])(__builtin_constant_p(buffer[1]) ? ((((__uint32_t)(buffer[1])
) >> 24) | ((((__uint32_t)(buffer[1])) & (0xff <<
16)) >> 8) | ((((__uint32_t)(buffer[1])) & (0xff <<
8)) << 8) | (((__uint32_t)(buffer[1])) << 24)) :
__bswap32_var(buffer[1]))
;
273 code = (*adbase->setlabel) (adbase, 0, &version);
274 if (code)
275 return code;
276 ubik_print("Successfully replayed log for interrupted "
277 "transaction; db version is now %ld.%ld\n",
278 (long) version.epoch, (long) version.counter);
279 logIsGood = 1;
280 break; /* all done now */
281 } else if (opcode == LOGTRUNCATE104) {
282 tpos += 4;
283 code =
284 (*adbase->read) (adbase, LOGFILE(-1), (char *)buffer, tpos,
285 2 * sizeof(afs_int32));
286 if (code != 2 * sizeof(afs_int32))
287 break; /* premature eof or io error */
288 tpos += 2 * sizeof(afs_int32);
289 code =
290 (*adbase->truncate) (adbase, ntohl(buffer[0])(__builtin_constant_p(buffer[0]) ? ((((__uint32_t)(buffer[0])
) >> 24) | ((((__uint32_t)(buffer[0])) & (0xff <<
16)) >> 8) | ((((__uint32_t)(buffer[0])) & (0xff <<
8)) << 8) | (((__uint32_t)(buffer[0])) << 24)) :
__bswap32_var(buffer[0]))
,
291 ntohl(buffer[1])(__builtin_constant_p(buffer[1]) ? ((((__uint32_t)(buffer[1])
) >> 24) | ((((__uint32_t)(buffer[1])) & (0xff <<
16)) >> 8) | ((((__uint32_t)(buffer[1])) & (0xff <<
8)) << 8) | (((__uint32_t)(buffer[1])) << 24)) :
__bswap32_var(buffer[1]))
);
292 if (code)
293 return code;
294 } else if (opcode == LOGDATA103) {
295 tpos += 4;
296 code =
297 (*adbase->read) (adbase, LOGFILE(-1), (char *)buffer, tpos,
298 3 * sizeof(afs_int32));
299 if (code != 3 * sizeof(afs_int32))
300 break;
301 tpos += 3 * sizeof(afs_int32);
302 /* otherwise, skip over the data bytes, too */
303 len = ntohl(buffer[2])(__builtin_constant_p(buffer[2]) ? ((((__uint32_t)(buffer[2])
) >> 24) | ((((__uint32_t)(buffer[2])) & (0xff <<
16)) >> 8) | ((((__uint32_t)(buffer[2])) & (0xff <<
8)) << 8) | (((__uint32_t)(buffer[2])) << 24)) :
__bswap32_var(buffer[2]))
; /* total number of bytes to copy */
304 filePos = ntohl(buffer[1])(__builtin_constant_p(buffer[1]) ? ((((__uint32_t)(buffer[1])
) >> 24) | ((((__uint32_t)(buffer[1])) & (0xff <<
16)) >> 8) | ((((__uint32_t)(buffer[1])) & (0xff <<
8)) << 8) | (((__uint32_t)(buffer[1])) << 24)) :
__bswap32_var(buffer[1]))
;
305 tfile = ntohl(buffer[0])(__builtin_constant_p(buffer[0]) ? ((((__uint32_t)(buffer[0])
) >> 24) | ((((__uint32_t)(buffer[0])) & (0xff <<
16)) >> 8) | ((((__uint32_t)(buffer[0])) & (0xff <<
8)) << 8) | (((__uint32_t)(buffer[0])) << 24)) :
__bswap32_var(buffer[0]))
;
306 /* try to minimize file syncs */
307 if (syncFile != tfile) {
308 if (syncFile >= 0)
309 code = (*adbase->sync) (adbase, syncFile);
310 else
311 code = 0;
312 syncFile = tfile;
313 if (code)
314 return code;
315 }
316 while (len > 0) {
317 thisSize = (len > sizeof(data) ? sizeof(data) : len);
318 /* copy sizeof(data) buffer bytes at a time */
319 code =
320 (*adbase->read) (adbase, LOGFILE(-1), (char *)data, tpos,
321 thisSize);
322 if (code != thisSize)
323 return UBADLOG(5384L);
324 code =
325 (*adbase->write) (adbase, tfile, (char *)data, filePos,
326 thisSize);
327 if (code != thisSize)
328 return UBADLOG(5384L);
329 filePos += thisSize;
330 tpos += thisSize;
331 len -= thisSize;
332 }
333 } else {
334 ubik_print("corrupt log opcode (%d) at position %d\n",
335 opcode, tpos);
336 break; /* corrupt log! */
337 }
338 }
339 if (logIsGood) {
340 if (syncFile >= 0)
341 code = (*adbase->sync) (adbase, syncFile);
342 if (code)
343 return code;
344 } else {
345 ubik_print("Log read error on pass 2\n");
346 return UBADLOG(5384L);
347 }
348 }
349
350 /* now truncate the log, we're done with it */
351 code = (*adbase->truncate) (adbase, LOGFILE(-1), 0);
352 return code;
353}
354
355/*! \brief
356 * Called at initialization to figure out version of the dbase we really have.
357 *
358 * This routine is called after replaying the log; it reads the restored labels.
359 */
360static int
361InitializeDB(struct ubik_dbase *adbase)
362{
363 afs_int32 code;
364
365 code = (*adbase->getlabel) (adbase, 0, &adbase->version);
366 if (code) {
367 /* try setting the label to a new value */
368 UBIK_VERSION_LOCK;
369 adbase->version.epoch = 1; /* value for newly-initialized db */
370 adbase->version.counter = 1;
371 code = (*adbase->setlabel) (adbase, 0, &adbase->version);
372 if (code) {
373 /* failed, try to set it back */
374 adbase->version.epoch = 0;
375 adbase->version.counter = 0;
376 (*adbase->setlabel) (adbase, 0, &adbase->version);
377 }
378#ifdef AFS_PTHREAD_ENV
379 CV_BROADCAST(&adbase->version_cond);
380#else
381 LWP_NoYieldSignal(&adbase->version)LWP_INTERNALSIGNAL(&adbase->version, 0);
382#endif
383 UBIK_VERSION_UNLOCK;
384 }
385 return 0;
386}
387
388/*!
389 * \brief initialize the local ubik_dbase
390 *
391 * We replay the logs and then read the resulting file to figure out what version we've really got.
392 */
393int
394urecovery_Initialize(struct ubik_dbase *adbase)
395{
396 afs_int32 code;
397
398 DBHOLD(adbase)do { ; if (!(&((adbase)->versionLock))->excl_locked
&& !(&((adbase)->versionLock))->readers_reading
) (&((adbase)->versionLock)) -> excl_locked = 2; else
Afs_Lock_Obtain(&((adbase)->versionLock), 2); ; } while
(0)
;
399 code = ReplayLog(adbase);
400 if (code)
401 goto done;
402 code = InitializeDB(adbase);
403done:
404 DBRELE(adbase)do { ; (&((adbase)->versionLock))->excl_locked &=
~2; if ((&((adbase)->versionLock))->wait_states) Afs_Lock_ReleaseR
(&((adbase)->versionLock)); ; } while (0)
;
405 return code;
406}
407
408/*!
409 * \brief Main interaction loop for the recovery manager
410 *
411 * The recovery light-weight process only runs when you're the
412 * synchronization site. It performs the following tasks, if and only
413 * if the prerequisite tasks have been performed successfully (it
414 * keeps track of which ones have been performed in its bit map,
415 * \p urecovery_state).
416 *
417 * First, it is responsible for probing that all servers are up. This
418 * is the only operation that must be performed even if this is not
419 * yet the sync site, since otherwise this site may not notice that
420 * enough other machines are running to even elect this guy to be the
421 * sync site.
422 *
423 * After that, the recovery process does nothing until the beacon and
424 * voting modules manage to get this site elected sync site.
425 *
426 * After becoming sync site, recovery first attempts to find the best
427 * database available in the network (it must do this in order to
428 * ensure finding the latest committed data). After finding the right
429 * database, it must fetch this dbase to the sync site.
430 *
431 * After fetching the dbase, it relabels it with a new version number,
432 * to ensure that everyone recognizes this dbase as the most recent
433 * dbase.
434 *
435 * One the dbase has been relabelled, this machine can start handling
436 * requests. However, the recovery module still has one more task:
437 * propagating the dbase out to everyone who is up in the network.
438 */
439void *
440urecovery_Interact(void *dummy)
441{
442 afs_int32 code, tcode;
443 struct ubik_server *bestServer = NULL((void *)0);
444 struct ubik_server *ts;
445 int dbok, doingRPC, now;
446 afs_int32 lastProbeTime;
447 /* if we're the sync site, the best db version we've found yet */
448 static struct ubik_version bestDBVersion;
449 struct ubik_version tversion;
450 struct timeval tv;
451 int length, tlen, offset, file, nbytes;
452 struct rx_call *rxcall;
453 char tbuffer[1024];
454 struct ubik_stat ubikstat;
455 struct in_addr inAddr;
456 char hoststr[16];
457 char pbuffer[1028];
458 int fd = -1;
459 afs_int32 pass;
460
461 afs_pthread_setname_self("recovery")(void)0;
462
463 /* otherwise, begin interaction */
464 urecovery_state = 0;
465 lastProbeTime = 0;
466 while (1) {
467 /* Run through this loop every 4 seconds */
468 tv.tv_sec = 4;
469 tv.tv_usec = 0;
470#ifdef AFS_PTHREAD_ENV
471 select(0, 0, 0, 0, &tv);
472#else
473 IOMGR_Select(0, 0, 0, 0, &tv);
474#endif
475
476 ubik_dprint("recovery running in state %x\n", urecovery_state);
477
478 /* Every 30 seconds, check all the down servers and mark them
479 * as up if they respond. When a server comes up or found to
480 * not be current, then re-find the the best database and
481 * propogate it.
482 */
483 if ((now = FT_ApproxTime()) > 30 + lastProbeTime) {
484
485 for (ts = ubik_servers, doingRPC = 0; ts; ts = ts->next) {
486 UBIK_BEACON_LOCK;
487 if (!ts->up) {
488 UBIK_BEACON_UNLOCK;
489 doingRPC = 1;
490 code = DoProbe(ts);
491 if (code == 0) {
492 UBIK_BEACON_LOCK;
493 ts->up = 1;
494 UBIK_BEACON_UNLOCK;
495 DBHOLD(ubik_dbase)do { ; if (!(&((ubik_dbase)->versionLock))->excl_locked
&& !(&((ubik_dbase)->versionLock))->readers_reading
) (&((ubik_dbase)->versionLock)) -> excl_locked = 2
; else Afs_Lock_Obtain(&((ubik_dbase)->versionLock), 2
); ; } while (0)
;
496 urecovery_state &= ~UBIK_RECFOUNDDB2;
497 DBRELE(ubik_dbase)do { ; (&((ubik_dbase)->versionLock))->excl_locked &=
~2; if ((&((ubik_dbase)->versionLock))->wait_states
) Afs_Lock_ReleaseR(&((ubik_dbase)->versionLock)); ; }
while (0)
;
498 }
499 } else {
500 UBIK_BEACON_UNLOCK;
501 DBHOLD(ubik_dbase)do { ; if (!(&((ubik_dbase)->versionLock))->excl_locked
&& !(&((ubik_dbase)->versionLock))->readers_reading
) (&((ubik_dbase)->versionLock)) -> excl_locked = 2
; else Afs_Lock_Obtain(&((ubik_dbase)->versionLock), 2
); ; } while (0)
;
502 if (!ts->currentDB)
503 urecovery_state &= ~UBIK_RECFOUNDDB2;
504 DBRELE(ubik_dbase)do { ; (&((ubik_dbase)->versionLock))->excl_locked &=
~2; if ((&((ubik_dbase)->versionLock))->wait_states
) Afs_Lock_ReleaseR(&((ubik_dbase)->versionLock)); ; }
while (0)
;
505 }
506 }
507
508 if (doingRPC)
509 now = FT_ApproxTime();
510 lastProbeTime = now;
511 }
512
513 /* Mark whether we are the sync site */
514 DBHOLD(ubik_dbase)do { ; if (!(&((ubik_dbase)->versionLock))->excl_locked
&& !(&((ubik_dbase)->versionLock))->readers_reading
) (&((ubik_dbase)->versionLock)) -> excl_locked = 2
; else Afs_Lock_Obtain(&((ubik_dbase)->versionLock), 2
); ; } while (0)
;
515 if (!ubeacon_AmSyncSite()) {
516 urecovery_state &= ~UBIK_RECSYNCSITE1;
517 DBRELE(ubik_dbase)do { ; (&((ubik_dbase)->versionLock))->excl_locked &=
~2; if ((&((ubik_dbase)->versionLock))->wait_states
) Afs_Lock_ReleaseR(&((ubik_dbase)->versionLock)); ; }
while (0)
;
518 continue; /* nothing to do */
519 }
520 urecovery_state |= UBIK_RECSYNCSITE1;
521
522 /* If a server has just come up or if we have not found the
523 * most current database, then go find the most current db.
524 */
525 if (!(urecovery_state & UBIK_RECFOUNDDB2)) {
526 DBRELE(ubik_dbase)do { ; (&((ubik_dbase)->versionLock))->excl_locked &=
~2; if ((&((ubik_dbase)->versionLock))->wait_states
) Afs_Lock_ReleaseR(&((ubik_dbase)->versionLock)); ; }
while (0)
;
527 bestServer = (struct ubik_server *)0;
528 bestDBVersion.epoch = 0;
529 bestDBVersion.counter = 0;
530 for (ts = ubik_servers; ts; ts = ts->next) {
531 UBIK_BEACON_LOCK;
532 if (!ts->up) {
533 UBIK_BEACON_UNLOCK;
534 continue; /* don't bother with these guys */
535 }
536 UBIK_BEACON_UNLOCK;
537 if (ts->isClone)
538 continue;
539 UBIK_ADDR_LOCK;
540 code = DISK_GetVersion(ts->disk_rxcid, &ts->version);
541 UBIK_ADDR_UNLOCK;
542 if (code == 0) {
543 /* perhaps this is the best version */
544 if (vcmp(ts->version, bestDBVersion)((ts->version).epoch == (bestDBVersion).epoch? ((ts->version
).counter - (bestDBVersion).counter) : ((ts->version).epoch
- (bestDBVersion).epoch))
> 0) {
545 /* new best version */
546 bestDBVersion = ts->version;
547 bestServer = ts;
548 }
549 }
550 }
551 /* take into consideration our version. Remember if we,
552 * the sync site, have the best version. Also note that
553 * we may need to send the best version out.
554 */
555 DBHOLD(ubik_dbase)do { ; if (!(&((ubik_dbase)->versionLock))->excl_locked
&& !(&((ubik_dbase)->versionLock))->readers_reading
) (&((ubik_dbase)->versionLock)) -> excl_locked = 2
; else Afs_Lock_Obtain(&((ubik_dbase)->versionLock), 2
); ; } while (0)
;
556 if (vcmp(ubik_dbase->version, bestDBVersion)((ubik_dbase->version).epoch == (bestDBVersion).epoch? ((ubik_dbase
->version).counter - (bestDBVersion).counter) : ((ubik_dbase
->version).epoch - (bestDBVersion).epoch))
>= 0) {
557 bestDBVersion = ubik_dbase->version;
558 bestServer = (struct ubik_server *)0;
559 urecovery_state |= UBIK_RECHAVEDB4;
560 } else {
561 /* Clear the flag only when we know we have to retrieve
562 * the db. Because urecovery_AllBetter() looks at it.
563 */
564 urecovery_state &= ~UBIK_RECHAVEDB4;
565 }
566 urecovery_state |= UBIK_RECFOUNDDB2;
567 urecovery_state &= ~UBIK_RECSENTDB0x10;
568 }
569 if (!(urecovery_state & UBIK_RECFOUNDDB2)) {
570 DBRELE(ubik_dbase)do { ; (&((ubik_dbase)->versionLock))->excl_locked &=
~2; if ((&((ubik_dbase)->versionLock))->wait_states
) Afs_Lock_ReleaseR(&((ubik_dbase)->versionLock)); ; }
while (0)
;
571 continue; /* not ready */
572 }
573
574 /* If we, the sync site, do not have the best db version, then
575 * go and get it from the server that does.
576 */
577 if ((urecovery_state & UBIK_RECHAVEDB4) || !bestServer) {
578 urecovery_state |= UBIK_RECHAVEDB4;
579 } else {
580 /* we don't have the best version; we should fetch it. */
581 urecovery_AbortAll(ubik_dbase);
582
583 /* Rx code to do the Bulk fetch */
584 file = 0;
585 offset = 0;
586 UBIK_ADDR_LOCK;
587 rxcall = rx_NewCall(bestServer->disk_rxcid);
588
589 ubik_print("Ubik: Synchronize database with server %s\n",
590 afs_inet_ntoa_r(bestServer->addr[0], hoststr));
591 UBIK_ADDR_UNLOCK;
592
593 code = StartDISK_GetFile(rxcall, file);
594 if (code) {
595 ubik_dprint("StartDiskGetFile failed=%d\n", code);
596 goto FetchEndCall;
597 }
598 nbytes = rx_Read(rxcall, (char *)&length, sizeof(afs_int32))rx_ReadProc(rxcall, (char *)&length, sizeof(afs_int32));
599 length = ntohl(length)(__builtin_constant_p(length) ? ((((__uint32_t)(length)) >>
24) | ((((__uint32_t)(length)) & (0xff << 16)) >>
8) | ((((__uint32_t)(length)) & (0xff << 8)) <<
8) | (((__uint32_t)(length)) << 24)) : __bswap32_var(length
))
;
600 if (nbytes != sizeof(afs_int32)) {
601 ubik_dprint("Rx-read length error=%d\n", code = BULK_ERROR1);
602 code = EIO5;
603 goto FetchEndCall;
604 }
605
606 /* give invalid label during file transit */
607 UBIK_VERSION_LOCK;
608 tversion.epoch = 0;
609 code = (*ubik_dbase->setlabel) (ubik_dbase, file, &tversion);
610 UBIK_VERSION_UNLOCK;
611 if (code) {
612 ubik_dprint("setlabel io error=%d\n", code);
613 goto FetchEndCall;
614 }
615 snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP",
616 ubik_dbase->pathName, (file<0)?"SYS":"",
617 (file<0)?-file:file);
618 fd = open(pbuffer, O_CREAT0x0200 | O_RDWR0x0002 | O_TRUNC0x0400, 0600);
619 if (fd < 0) {
620 code = errno(* __error());
621 goto FetchEndCall;
622 }
623 code = lseek(fd, HDRSIZE64, 0);
624 if (code != HDRSIZE64) {
625 close(fd);
626 goto FetchEndCall;
627 }
628
629 pass = 0;
630 while (length > 0) {
631 tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
632#ifndef AFS_PTHREAD_ENV
633 if (pass % 4 == 0)
634 IOMGR_Poll();
635#endif
636 nbytes = rx_Read(rxcall, tbuffer, tlen)rx_ReadProc(rxcall, tbuffer, tlen);
637 if (nbytes != tlen) {
638 ubik_dprint("Rx-read bulk error=%d\n", code = BULK_ERROR1);
639 code = EIO5;
640 close(fd);
641 goto FetchEndCall;
642 }
643 nbytes = write(fd, tbuffer, tlen);
644 pass++;
645 if (nbytes != tlen) {
646 code = UIOERROR(5379L);
647 close(fd);
648 goto FetchEndCall;
649 }
650 offset += tlen;
651 length -= tlen;
652 }
653 code = close(fd);
654 if (code)
655 goto FetchEndCall;
656 code = EndDISK_GetFile(rxcall, &tversion);
657 FetchEndCall:
658 tcode = rx_EndCall(rxcall, code);
659 if (!code)
660 code = tcode;
661 if (!code) {
662 /* we got a new file, set up its header */
663 urecovery_state |= UBIK_RECHAVEDB4;
664 UBIK_VERSION_LOCK;
665 memcpy(&ubik_dbase->version, &tversion,
666 sizeof(struct ubik_version));
667 snprintf(tbuffer, sizeof(tbuffer), "%s.DB%s%d",
668 ubik_dbase->pathName, (file<0)?"SYS":"",
669 (file<0)?-file:file);
670#ifdef AFS_NT40_ENV
671 snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD",
672 ubik_dbase->pathName, (file<0)?"SYS":"",
673 (file<0)?-file:file);
674 code = unlink(pbuffer);
675 if (!code)
676 code = rename(tbuffer, pbuffer);
677 snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP",
678 ubik_dbase->pathName, (file<0)?"SYS":"",
679 (file<0)?-file:file);
680#endif
681 if (!code)
682 code = rename(pbuffer, tbuffer);
683 if (!code) {
684 (*ubik_dbase->open) (ubik_dbase, file);
685 /* after data is good, sync disk with correct label */
686 code =
687 (*ubik_dbase->setlabel) (ubik_dbase, 0,
688 &ubik_dbase->version);
689 }
690 UBIK_VERSION_UNLOCK;
691#ifdef AFS_NT40_ENV
692 snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD",
693 ubik_dbase->pathName, (file<0)?"SYS":"",
694 (file<0)?-file:file);
695 unlink(pbuffer);
696#endif
697 }
698 if (code) {
699 unlink(pbuffer);
700 /*
701 * We will effectively invalidate the old data forever now.
702 * Unclear if we *should* but we do.
703 */
704 UBIK_VERSION_LOCK;
705 ubik_dbase->version.epoch = 0;
706 ubik_dbase->version.counter = 0;
707 UBIK_VERSION_UNLOCK;
708 ubik_print("Ubik: Synchronize database failed (error = %d)\n",
709 code);
710 } else {
711 ubik_print("Ubik: Synchronize database completed\n");
712 urecovery_state |= UBIK_RECHAVEDB4;
713 }
714 udisk_Invalidate(ubik_dbase, 0); /* data has changed */
715#ifdef AFS_PTHREAD_ENV
716 CV_BROADCAST(&ubik_dbase->version_cond);
717#else
718 LWP_NoYieldSignal(&ubik_dbase->version)LWP_INTERNALSIGNAL(&ubik_dbase->version, 0);
719#endif
720 }
721 if (!(urecovery_state & UBIK_RECHAVEDB4)) {
722 DBRELE(ubik_dbase)do { ; (&((ubik_dbase)->versionLock))->excl_locked &=
~2; if ((&((ubik_dbase)->versionLock))->wait_states
) Afs_Lock_ReleaseR(&((ubik_dbase)->versionLock)); ; }
while (0)
;
723 continue; /* not ready */
724 }
725
726 /* If the database was newly initialized, then when we establish quorum, write
727 * a new label. This allows urecovery_AllBetter() to allow access for reads.
728 * Setting it to 2 also allows another site to come along with a newer
729 * database and overwrite this one.
730 */
731 if (ubik_dbase->version.epoch == 1) {
732 urecovery_AbortAll(ubik_dbase);
733 UBIK_VERSION_LOCK;
734 version_globals.ubik_epochTime = 2;
735 ubik_dbase->version.epoch = version_globals.ubik_epochTime;
736 ubik_dbase->version.counter = 1;
737 code =
Value stored to 'code' is never read
738 (*ubik_dbase->setlabel) (ubik_dbase, 0, &ubik_dbase->version);
739 UBIK_VERSION_UNLOCK;
740 udisk_Invalidate(ubik_dbase, 0); /* data may have changed */
741#ifdef AFS_PTHREAD_ENV
742 CV_BROADCAST(&ubik_dbase->version_cond);
743#else
744 LWP_NoYieldSignal(&ubik_dbase->version)LWP_INTERNALSIGNAL(&ubik_dbase->version, 0);
745#endif
746 }
747
748 /* Check the other sites and send the database to them if they
749 * do not have the current db.
750 */
751 if (!(urecovery_state & UBIK_RECSENTDB0x10)) {
752 /* now propagate out new version to everyone else */
753 dbok = 1; /* start off assuming they all worked */
754
755 /*
756 * Check if a write transaction is in progress. We can't send the
757 * db when a write is in progress here because the db would be
758 * obsolete as soon as it goes there. Also, ops after the begin
759 * trans would reach the recepient and wouldn't find a transaction
760 * pending there. Frankly, I don't think it's possible to get past
761 * the write-lock above if there is a write transaction in progress,
762 * but then, it won't hurt to check, will it?
763 */
764 if (ubik_dbase->flags & DBWRITING1) {
765 struct timeval tv;
766 int safety = 0;
767 long cur_usec = 50000;
768 while ((ubik_dbase->flags & DBWRITING1) && (safety < 500)) {
769 DBRELE(ubik_dbase)do { ; (&((ubik_dbase)->versionLock))->excl_locked &=
~2; if ((&((ubik_dbase)->versionLock))->wait_states
) Afs_Lock_ReleaseR(&((ubik_dbase)->versionLock)); ; }
while (0)
;
770 /* sleep for a little while */
771 tv.tv_sec = 0;
772 tv.tv_usec = cur_usec;
773#ifdef AFS_PTHREAD_ENV
774 select(0, 0, 0, 0, &tv);
775#else
776 IOMGR_Select(0, 0, 0, 0, &tv);
777#endif
778 cur_usec += 10000;
779 safety++;
780 DBHOLD(ubik_dbase)do { ; if (!(&((ubik_dbase)->versionLock))->excl_locked
&& !(&((ubik_dbase)->versionLock))->readers_reading
) (&((ubik_dbase)->versionLock)) -> excl_locked = 2
; else Afs_Lock_Obtain(&((ubik_dbase)->versionLock), 2
); ; } while (0)
;
781 }
782 }
783
784 for (ts = ubik_servers; ts; ts = ts->next) {
785 UBIK_ADDR_LOCK;
786 inAddr.s_addr = ts->addr[0];
787 UBIK_ADDR_UNLOCK;
788 UBIK_BEACON_LOCK;
789 if (!ts->up) {
790 UBIK_BEACON_UNLOCK;
791 ubik_dprint("recovery cannot send version to %s\n",
792 afs_inet_ntoa_r(inAddr.s_addr, hoststr));
793 dbok = 0;
794 continue;
795 }
796 UBIK_BEACON_UNLOCK;
797 ubik_dprint("recovery sending version to %s\n",
798 afs_inet_ntoa_r(inAddr.s_addr, hoststr));
799 if (vcmp(ts->version, ubik_dbase->version)((ts->version).epoch == (ubik_dbase->version).epoch? ((
ts->version).counter - (ubik_dbase->version).counter) :
((ts->version).epoch - (ubik_dbase->version).epoch))
!= 0) {
800 ubik_dprint("recovery stating local database\n");
801
802 /* Rx code to do the Bulk Store */
803 code = (*ubik_dbase->stat) (ubik_dbase, 0, &ubikstat);
804 if (!code) {
805 length = ubikstat.size;
806 file = offset = 0;
807 UBIK_ADDR_LOCK;
808 rxcall = rx_NewCall(ts->disk_rxcid);
809 UBIK_ADDR_UNLOCK;
810 code =
811 StartDISK_SendFile(rxcall, file, length,
812 &ubik_dbase->version);
813 if (code) {
814 ubik_dprint("StartDiskSendFile failed=%d\n",
815 code);
816 goto StoreEndCall;
817 }
818 while (length > 0) {
819 tlen =
820 (length >
821 sizeof(tbuffer) ? sizeof(tbuffer) : length);
822 nbytes =
823 (*ubik_dbase->read) (ubik_dbase, file,
824 tbuffer, offset, tlen);
825 if (nbytes != tlen) {
826 ubik_dprint("Local disk read error=%d\n",
827 code = UIOERROR(5379L));
828 goto StoreEndCall;
829 }
830 nbytes = rx_Write(rxcall, tbuffer, tlen)rx_WriteProc(rxcall, tbuffer, tlen);
831 if (nbytes != tlen) {
832 ubik_dprint("Rx-write bulk error=%d\n", code =
833 BULK_ERROR1);
834 goto StoreEndCall;
835 }
836 offset += tlen;
837 length -= tlen;
838 }
839 code = EndDISK_SendFile(rxcall);
840 StoreEndCall:
841 code = rx_EndCall(rxcall, code);
842 }
843 if (code == 0) {
844 /* we set a new file, process its header */
845 ts->version = ubik_dbase->version;
846 ts->currentDB = 1;
847 } else
848 dbok = 0;
849 } else {
850 /* mark file up to date */
851 ts->currentDB = 1;
852 }
853 }
854 if (dbok)
855 urecovery_state |= UBIK_RECSENTDB0x10;
856 }
857 DBRELE(ubik_dbase)do { ; (&((ubik_dbase)->versionLock))->excl_locked &=
~2; if ((&((ubik_dbase)->versionLock))->wait_states
) Afs_Lock_ReleaseR(&((ubik_dbase)->versionLock)); ; }
while (0)
;
858 }
859 return NULL((void *)0);
860}
861
862/*!
863 * \brief send a Probe to all the network address of this server
864 *
865 * \return 0 if success, else return 1
866 */
867int
868DoProbe(struct ubik_server *server)
869{
870 struct rx_connection *conns[UBIK_MAX_INTERFACE_ADDR256];
871 struct rx_connection *connSuccess = 0;
872 int i, j, success_i = -1;
873 afs_uint32 addr;
874 char buffer[32];
875 char hoststr[16];
876
877 UBIK_ADDR_LOCK;
878 for (i = 0; (addr = server->addr[i]) && (i < UBIK_MAX_INTERFACE_ADDR256);
879 i++) {
880 conns[i] =
881 rx_NewConnection(addr, ubik_callPortal, DISK_SERVICE_ID51,
882 addr_globals.ubikSecClass, addr_globals.ubikSecIndex);
883
884 /* user requirement to use only the primary interface */
885 if (ubikPrimaryAddrOnly) {
886 i = 1;
887 break;
888 }
889 }
890 UBIK_ADDR_UNLOCK;
891 osi_Assert(i)(void)((i) || (osi_AssertFailU("i", "recovery.c", 891), 0)); /* at least one interface address for this server */
892
893 multi_Rx(conns, i)do { struct multi_handle *multi_h; int multi_i; int multi_i0;
afs_int32 multi_error; struct rx_call *multi_call; multi_h =
multi_Init(conns, i); for (multi_i0 = multi_i = 0; ; multi_i
= multi_i0 )
{
894 multi_DISK_Probe()if (multi_h->nextReady == multi_h->firstNotReady &&
multi_i < multi_h->nConns) { multi_call = multi_h->
calls[multi_i]; if (multi_call) { StartDISK_Probe(multi_call)
; rx_FlushWrite(multi_call); } multi_i0++; continue; } if ((multi_i
= multi_Select(multi_h)) < 0) break; multi_call = multi_h
->calls[multi_i]; multi_error = rx_EndCall(multi_call, EndDISK_Probe
(multi_call)); multi_h->calls[multi_i] = (struct rx_call *
) 0
;
895 if (!multi_error) { /* first success */
896 success_i = multi_i;
897
898 multi_Abortbreak;
899 }
900 } multi_End_Ignoremulti_Finalize_Ignore(multi_h); } while (0);
901
902 if (success_i >= 0) {
903 UBIK_ADDR_LOCK;
904 addr = server->addr[success_i]; /* successful interface addr */
905
906 if (server->disk_rxcid) /* destroy existing conn */
907 rx_DestroyConnection(server->disk_rxcid);
908 if (server->vote_rxcid)
909 rx_DestroyConnection(server->vote_rxcid);
910
911 /* make new connections */
912 server->disk_rxcid = conns[success_i];
913 server->vote_rxcid = rx_NewConnection(addr, ubik_callPortal,
914 VOTE_SERVICE_ID50, addr_globals.ubikSecClass,
915 addr_globals.ubikSecIndex);
916
917 connSuccess = conns[success_i];
918 strcpy(buffer, afs_inet_ntoa_r(server->addr[0], hoststr));
919
920 ubik_print("ubik:server %s is back up: will be contacted through %s\n",
921 buffer, afs_inet_ntoa_r(addr, hoststr));
922 UBIK_ADDR_UNLOCK;
923 }
924
925 /* Destroy all connections except the one on which we succeeded */
926 for (j = 0; j < i; j++)
927 if (conns[j] != connSuccess)
928 rx_DestroyConnection(conns[j]);
929
930 if (!connSuccess)
931 ubik_dprint("ubik:server %s still down\n",
932 afs_inet_ntoa_r(server->addr[0], hoststr));
933
934 if (connSuccess)
935 return 0; /* success */
936 else
937 return 1; /* failure */
938}