sqlite3/ext/lsm1/lsm_log.c

1157 lines
38 KiB
C

/*
** 2011-08-13
**
** The author disclaims copyright to this source code. In place of
** a legal notice, here is a blessing:
**
** May you do good and not evil.
** May you find forgiveness for yourself and forgive others.
** May you share freely, never taking more than you give.
**
*************************************************************************
**
** This file contains the implementation of LSM database logging. Logging
** has one purpose in LSM - to make transactions durable.
**
** When data is written to an LSM database, it is initially stored in an
** in-memory tree structure. Since this structure is in volatile memory,
** if a power failure or application crash occurs it may be lost. To
** prevent loss of data in this case, each time a record is written to the
** in-memory tree an equivalent record is appended to the log on disk.
** If a power failure or application crash does occur, data can be recovered
** by reading the log.
**
** A log file consists of the following types of records representing data
** written into the database:
**
** LOG_WRITE: A key-value pair written to the database.
** LOG_DELETE: A delete key issued to the database.
** LOG_COMMIT: A transaction commit.
**
** And the following types of records for ancillary purposes..
**
** LOG_EOF: A record indicating the end of a log file.
** LOG_PAD1: A single byte padding record.
** LOG_PAD2: An N byte padding record (N>1).
** LOG_JUMP: A pointer to another offset within the log file.
**
** Each transaction written to the log contains one or more LOG_WRITE and/or
** LOG_DELETE records, followed by a LOG_COMMIT record. The LOG_COMMIT record
** contains an 8-byte checksum based on all previous data written to the
** log file.
**
** LOG CHECKSUMS & RECOVERY
**
** Checksums are found in two types of log records: LOG_COMMIT and
** LOG_CKSUM records. In order to recover content from a log, a client
** reads each record from the start of the log, calculating a checksum as
** it does. Each time a LOG_COMMIT or LOG_CKSUM is encountered, the
** recovery process verifies that the checksum stored in the log
** matches the calculated checksum. If it does not, the recovery process
** can stop reading the log.
**
** If a recovery process reads records (other than COMMIT or CKSUM)
** consisting of at least LSM_CKSUM_MAXDATA bytes, then the next record in
** the log must be either a LOG_CKSUM or LOG_COMMIT record. If it is
** not, the recovery process also stops reading the log.
**
** To recover the log file, it must be read twice. The first time to
** determine the location of the last valid commit record. And the second
** time to load data into the in-memory tree.
**
** Todo: Surely there is a better way...
**
** LOG WRAPPING
**
** If the log file were never deleted or wrapped, it would be possible to
** read it from start to end each time recovery is required (i.e. each time
** the number of database clients changes from 0 to 1). Effectively reading
** the entire history of the database each time. This would quickly become
** inefficient. Additionally, since the log file would grow without bound,
** it wastes storage space.
**
** Instead, part of each checkpoint written into the database file contains
** a log offset (and other information required to read the log starting at
** this offset) at which to begin recovery. Call this offset $O.
**
** Once a checkpoint has been written and synced into the database file, it
** is guaranteed that no recovery process will need to read any data before
** offset $O of the log file. It is therefore safe to begin overwriting
** any data that occurs before offset $O.
**
** This implementation separates the log into three regions mapped into
** the log file - regions 0, 1 and 2. During recovery, regions are read
** in ascending order (i.e. 0, then 1, then 2). Each region is zero or
** more bytes in size.
**
** |---1---|..|--0--|.|--2--|....
**
** New records are always appended to the end of region 2.
**
** Initially (when it is empty), all three regions are zero bytes in size.
** Each of them are located at the beginning of the file. As records are
** added to the log, region 2 grows, so that the log consists of a zero
** byte region 1, followed by a zero byte region 0, followed by an N byte
** region 2. After one or more checkpoints have been written to disk,
** the start point of region 2 is moved to $O. For example:
**
** A) ||.........|--2--|....
**
** (both regions 0 and 1 are 0 bytes in size at offset 0).
**
** Eventually, the log wraps around to write new records into the start.
** At this point, region 2 is renamed to region 0. Region 0 is renamed
** to region 2. After appending a few records to the new region 2, the
** log file looks like this:
**
** B) ||--2--|...|--0--|....
**
** (region 1 is still 0 bytes in size, located at offset 0).
**
** Any checkpoints made at this point may reduce the size of region 0.
** However, if they do not, and region 2 expands so that it is about to
** overwrite the start of region 0, then region 2 is renamed to region 1,
** and a new region 2 created at the end of the file following the existing
** region 0.
**
** C) |---1---|..|--0--|.|-2-|
**
** In this state records are appended to region 2 until checkpoints have
** contracted regions 0 and 1 until they are both zero bytes in size. They
** are then shifted to the start of the log file, leaving the system in
** the equivalent of state A above.
**
** Alternatively, state B may transition directly to state A if the size
** of region 0 is reduced to zero bytes before region 2 threatens to
** encroach upon it.
**
** LOG_PAD1 & LOG_PAD2 RECORDS
**
** PAD1 and PAD2 records may appear in a log file at any point. They allow
** a process writing the log file to align the beginning of transactions with
** the beginning of disk sectors, which increases robustness.
**
** RECORD FORMATS:
**
** LOG_EOF: * A single 0x00 byte.
**
** LOG_PAD1: * A single 0x01 byte.
**
** LOG_PAD2: * A single 0x02 byte, followed by
** * The number of unused bytes (N) as a varint,
** * An N byte block of unused space.
**
** LOG_COMMIT: * A single 0x03 byte.
** * An 8-byte checksum.
**
** LOG_JUMP: * A single 0x04 byte.
** * Absolute file offset to jump to, encoded as a varint.
**
** LOG_WRITE: * A single 0x06 or 0x07 byte,
** * The number of bytes in the key, encoded as a varint,
** * The number of bytes in the value, encoded as a varint,
** * If the first byte was 0x07, an 8 byte checksum.
** * The key data,
** * The value data.
**
** LOG_DELETE: * A single 0x08 or 0x09 byte,
** * The number of bytes in the key, encoded as a varint,
** * If the first byte was 0x09, an 8 byte checksum.
** * The key data.
**
** Varints are as described in lsm_varint.c (SQLite 4 format).
**
** CHECKSUMS:
**
** The checksum is calculated using two 32-bit unsigned integers, s0 and
** s1. The initial value for both is 42. It is updated each time a record
** is written into the log file by treating the encoded (binary) record as
** an array of 32-bit little-endian integers. Then, if x[] is the integer
** array, updating the checksum accumulators as follows:
**
** for i from 0 to n-1 step 2:
** s0 += x[i] + s1;
** s1 += x[i+1] + s0;
** endfor
**
** If the record is not an even multiple of 8-bytes in size it is padded
** with zeroes to make it so before the checksum is updated.
**
** The checksum stored in a COMMIT, WRITE or DELETE is based on all bytes
** up to the start of the 8-byte checksum itself, including the COMMIT,
** WRITE or DELETE fields that appear before the checksum in the record.
**
** VARINT FORMAT
**
** See lsm_varint.c.
*/
#ifndef _LSM_INT_H
# include "lsmInt.h"
#endif
/*
** Log record types. Each value is the first byte of the corresponding
** record in the log file. For WRITE, DELETE and DRANGE records, the value
** with the least-significant bit set (the *_CKSUM variant) identifies a
** record that embeds an 8-byte checksum; lsmLogWrite() depends on this
** relationship when it ORs the checksum flag into the base record type.
*/
#define LSM_LOG_EOF 0x00            /* End of log */
#define LSM_LOG_PAD1 0x01           /* Single byte of padding */
#define LSM_LOG_PAD2 0x02           /* Multi-byte padding record */
#define LSM_LOG_COMMIT 0x03         /* Transaction commit + checksum */
#define LSM_LOG_JUMP 0x04           /* Continue reading at another offset */
#define LSM_LOG_WRITE 0x06          /* Key/value pair */
#define LSM_LOG_WRITE_CKSUM 0x07    /* Key/value pair with checksum */
#define LSM_LOG_DELETE 0x08         /* Delete of a single key */
#define LSM_LOG_DELETE_CKSUM 0x09   /* Delete of a single key with checksum */
/* DRANGE records are encoded like WRITE records (key and value) and are
** replayed via lsmTreeDelete() - presumably the two blobs are the ends
** of a deleted key range (TODO confirm against lsmTreeDelete()). */
#define LSM_LOG_DRANGE 0x0A
#define LSM_LOG_DRANGE_CKSUM 0x0B
/* Require a checksum every 32KB. */
#define LSM_CKSUM_MAXDATA (32*1024)
/* Do not wrap a log file smaller than this in bytes. */
#define LSM_MIN_LOGWRAP (128*1024)
/*
** State used to append records to the log file during a single write
** transaction. Allocated (or reset) by lsmLogBegin() and released by
** lsmLogClose().
**
** szSector:
**   Commit records must be aligned to end on szSector boundaries. If
**   the safety-mode is set to NORMAL or OFF, this value is 1. Otherwise,
**   if the safety-mode is set to FULL, it is the size of the file-system
**   sectors as reported by lsmFsSectorSize().
**
** buf:
**   Must remain the final member. When lsmLogBegin() reuses an existing
**   LogWriter it zeroes every byte that precedes "buf" and keeps the
**   buffer allocation.
*/
struct LogWriter {
  u32 cksum0;                     /* Checksum 0 at offset iOff */
  u32 cksum1;                     /* Checksum 1 at offset iOff */
  int iCksumBuf;                  /* Bytes of buf that have been checksummed */
  i64 iOff;                       /* Offset at start of buffer buf */
  int szSector;                   /* Sector size for this transaction */
  LogRegion jump;                 /* Avoid writing to this region */
  i64 iRegion1End;                /* End of first region written by trans */
  i64 iRegion2Start;              /* Start of second regions written by trans */
  LsmString buf;                  /* Buffer containing data not yet written */
};
/*
** Decode the four bytes aIn[0]..aIn[3] as an unsigned 32-bit integer that
** is stored least-significant byte first, and return the decoded value.
*/
static u32 getU32le(u8 *aIn){
  u32 v = (u32)aIn[0];
  v |= ((u32)aIn[1] << 8);
  v |= ((u32)aIn[2] << 16);
  v |= ((u32)aIn[3] << 24);
  return v;
}
/*
** Fold the n bytes at z into the running checksum pair (*pCksum0,
** *pCksum1). The input is consumed as pairs of little-endian 32-bit
** integers (8 bytes per step). Buffer z need not be aligned, and n need
** not be a multiple of 8: a trailing partial group is copied into a
** zero-filled 8 byte scratch buffer before it is processed.
*/
static void logCksumUnaligned(
  char *z,                        /* Input buffer */
  int n,                          /* Size of input buffer in bytes */
  u32 *pCksum0,                   /* IN/OUT: Checksum value 1 */
  u32 *pCksum1                    /* IN/OUT: Checksum value 2 */
){
  u8 *aData = (u8 *)z;
  u32 s0 = *pCksum0;
  u32 s1 = *pCksum1;
  int nTail = n % 8;              /* Bytes in the trailing partial group */
  int nWhole = n - nTail;         /* Bytes processed as complete groups */
  int iByte = 0;

  assert( n>0 );

  /* Complete 8 byte groups. */
  while( iByte<nWhole ){
    s0 += getU32le(&aData[iByte]) + s1;
    s1 += getU32le(&aData[iByte+4]) + s0;
    iByte += 8;
  }

  /* Zero-padded trailing group, if any. */
  if( nTail!=0 ){
    u8 aLast[8] = {0, 0, 0, 0, 0, 0, 0, 0};
    assert( nTail<8 && nTail>0 );
    memcpy(aLast, &aData[nWhole], nTail);
    s0 += getU32le(&aLast[0]) + s1;
    s1 += getU32le(&aLast[4]) + s0;
  }

  *pCksum0 = s0;
  *pCksum1 = s1;
}
/*
** Bring the running checksum in *pLog up to date so that it covers the
** first nBuf bytes of the write buffer (pLog->buf). Bytes before
** pLog->iCksumBuf are already included and are not processed again.
**
** nBuf must be a multiple of 8 unless it is the full size of the buffer.
*/
static void logUpdateCksum(LogWriter *pLog, int nBuf){
  int iDone = pLog->iCksumBuf;    /* Bytes already checksummed */

  assert( (iDone % 8)==0 );
  assert( iDone<=nBuf );
  assert( (nBuf % 8)==0 || nBuf==pLog->buf.n );

  if( iDone<nBuf ){
    char *zNew = &pLog->buf.z[iDone];
    logCksumUnaligned(zNew, nBuf-iDone, &pLog->cksum0, &pLog->cksum1);
  }
  pLog->iCksumBuf = nBuf;
}
/*
** Return the offset of the first byte of the sector (of size
** pLog->szSector) that contains offset iOff.
*/
static i64 firstByteOnSector(LogWriter *pLog, i64 iOff){
  i64 szSector = pLog->szSector;
  return iOff - (iOff % szSector);
}
/*
** Return the offset of the last byte of the sector (of size
** pLog->szSector) that contains offset iOff.
*/
static i64 lastByteOnSector(LogWriter *pLog, i64 iOff){
  i64 iFirst = firstByteOnSector(pLog, iOff);
  return iFirst + (pLog->szSector - 1);
}
/*
** If possible, reclaim log file space. Log file space is reclaimed after
** a snapshot that points to the same data in the database file is synced
** into the db header.
**
** On success the regions in pDb->treehdr.log that lie entirely before the
** synced log offset are emptied, and the region containing that offset is
** trimmed so that it starts there. Returns LSM_OK or an LSM error code.
*/
static int logReclaimSpace(lsm_db *pDb){
  DbLog *pLog = &pDb->treehdr.log;
  int rc;
  int bRotrans;                   /* True if there exists some ro-trans */
  int iMeta;                      /* Meta-page holding the latest snapshot */
  i64 iSyncedId;                  /* Untrusted snapshot-id from meta-page */
  i64 iSnapshotId = 0;            /* Trusted id of the synced snapshot */
  i64 iOff = 0;                   /* Log offset stored in that snapshot */
  int iRegion;

  /* If some other connection has a read-only transaction open, log file
  ** space may not be reclaimed. */
  rc = lsmDetectRoTrans(pDb, &bRotrans);
  if( rc!=LSM_OK || bRotrans ) return rc;

  iMeta = (int)pDb->pShmhdr->iMetaPage;
  if( iMeta!=1 && iMeta!=2 ) return rc;

  /* The id read here may be untrustworthy due to a race (see comments
  ** above lsmFsReadSyncedId()), so it is only used to decide that there
  ** is nothing to reclaim. If it suggests otherwise, the result is
  ** confirmed using lsmCheckpointSynced(), which is trustworthy. */
  rc = lsmFsReadSyncedId(pDb, iMeta, &iSyncedId);
  if( rc!=LSM_OK || pLog->iSnapshotId==iSyncedId ) return rc;

  rc = lsmCheckpointSynced(pDb, &iSnapshotId, &iOff, 0);
  if( rc!=LSM_OK || pLog->iSnapshotId>=iSnapshotId ) return rc;

  /* Empty each region that precedes the one containing iOff. */
  iRegion = 0;
  while( iRegion<3 ){
    LogRegion *pRegion = &pLog->aRegion[iRegion];
    if( iOff>=pRegion->iStart && iOff<=pRegion->iEnd ) break;
    pRegion->iStart = 0;
    pRegion->iEnd = 0;
    iRegion++;
  }
  assert( iRegion<3 );
  pLog->aRegion[iRegion].iStart = iOff;
  pLog->iSnapshotId = iSnapshotId;

  return rc;
}
/*
** This function is called when a write-transaction is first opened. It
** is assumed that the caller is holding the client-mutex when it is
** called.
**
** Before returning, this function allocates (or resets, if one already
** exists) the LogWriter object that will be used to write to the log file
** during the write transaction, and selects the offset at which the
** transaction will start appending.
**
** LSM_OK is returned if no error occurs, otherwise an LSM error code. If
** logging is disabled (pDb->bUseLog==0) this is a no-op.
*/
int lsmLogBegin(lsm_db *pDb){
  int rc = LSM_OK;
  LogWriter *pNew;
  LogRegion *aReg;

  if( pDb->bUseLog==0 ) return LSM_OK;

  /* If the log file has not yet been opened, open it now. Also allocate
  ** the LogWriter structure, if it has not already been allocated. */
  rc = lsmFsOpenLog(pDb, 0);
  if( pDb->pLogWriter==0 ){
    pNew = lsmMallocZeroRc(pDb->pEnv, sizeof(LogWriter), &rc);
    if( pNew ){
      lsmStringInit(&pNew->buf, pDb->pEnv);
      rc = lsmStringExtend(&pNew->buf, 2);
    }
    pDb->pLogWriter = pNew;
  }else{
    /* Reuse the existing object. Zero every field except the trailing
    ** "buf" member (the assert() checks that it really is last), then
    ** mark the buffer as empty while keeping its allocation. */
    pNew = pDb->pLogWriter;
    assert( (u8 *)(&pNew[1])==(u8 *)(&((&pNew->buf)[1])) );
    memset(pNew, 0, ((u8 *)&pNew->buf) - (u8 *)pNew);
    pNew->buf.n = 0;
  }

  if( rc==LSM_OK ){
    /* The following call detects whether or not a new snapshot has been
    ** synced into the database file. If so, it updates the contents of
    ** the pDb->treehdr.log structure to reclaim any space in the log
    ** file that is no longer required.
    **
    ** TODO: Calling this every transaction is overkill. And since the
    ** call has to read and checksum a snapshot from the database file,
    ** it is expensive. It would be better to figure out a way so that
    ** this is only called occasionally - say for every 32KB written to
    ** the log file.
    */
    rc = logReclaimSpace(pDb);
  }
  if( rc!=LSM_OK ){
    lsmLogClose(pDb);
    return rc;
  }

  /* Set the effective sector-size for this transaction. Sectors are assumed
  ** to be one byte in size if the safety-mode is OFF or NORMAL, or as
  ** reported by lsmFsSectorSize if it is FULL. */
  if( pDb->eSafety==LSM_SAFETY_FULL ){
    pNew->szSector = lsmFsSectorSize(pDb->pFS);
    assert( pNew->szSector>0 );
  }else{
    pNew->szSector = 1;
  }

  /* There are now three scenarios:
  **
  **   1) Regions 0 and 1 are both zero bytes in size and region 2 begins
  **      at a file offset greater than LSM_MIN_LOGWRAP. In this case, wrap
  **      around to the start and write data into the start of the log file.
  **
  **   2) Region 1 is zero bytes in size and region 2 occurs earlier in the
  **      file than region 0. In this case, append data to region 2, but
  **      remember to jump over region 0 if required.
  **
  **   3) Region 2 is the last in the file. Append to it.
  */
  aReg = &pDb->treehdr.log.aRegion[0];

  assert( aReg[0].iEnd==0 || aReg[0].iEnd>aReg[0].iStart );
  assert( aReg[1].iEnd==0 || aReg[1].iEnd>aReg[1].iStart );

  pNew->cksum0 = pDb->treehdr.log.cksum0;
  pNew->cksum1 = pDb->treehdr.log.cksum1;

  if( aReg[0].iEnd==0 && aReg[1].iEnd==0 && aReg[2].iStart>=LSM_MIN_LOGWRAP ){
    /* Case 1. Wrap around to the start of the file. Write an LSM_LOG_JUMP
    ** into the log file in this case. Pad it out to 8 bytes using a PAD2
    ** record so that the checksums can be updated immediately. */
    u8 aJump[] = {
      LSM_LOG_PAD2, 0x04, 0x00, 0x00, 0x00, 0x00, LSM_LOG_JUMP, 0x00
    };

    /* Only checksum and write the buffer if the JUMP record was actually
    ** appended to it. Otherwise an empty buffer would be "written" and the
    ** log would wrap without a JUMP record for recovery to follow. */
    rc = lsmStringBinAppend(&pNew->buf, aJump, sizeof(aJump));
    if( rc==LSM_OK ){
      logUpdateCksum(pNew, pNew->buf.n);
      rc = lsmFsWriteLog(pDb->pFS, aReg[2].iEnd, &pNew->buf);
    }
    pNew->iCksumBuf = pNew->buf.n = 0;

    /* The old region 2 (now including the 8 byte JUMP) becomes region 0
    ** and the region to jump over. The new region 2 starts at offset 0. */
    aReg[2].iEnd += 8;
    pNew->jump = aReg[0] = aReg[2];
    aReg[2].iStart = aReg[2].iEnd = 0;
  }else if( aReg[1].iEnd==0 && aReg[2].iEnd<aReg[0].iEnd ){
    /* Case 2. */
    pNew->iOff = aReg[2].iEnd;
    pNew->jump = aReg[0];
  }else{
    /* Case 3. */
    assert( aReg[2].iStart>=aReg[0].iEnd && aReg[2].iStart>=aReg[1].iEnd );
    pNew->iOff = aReg[2].iEnd;
  }

  /* If there is a region to jump over, widen it to sector boundaries: the
  ** start is rounded down (unless that would place it at or before the
  ** current write offset) and the end is rounded up. */
  if( pNew->jump.iStart ){
    i64 iRound;
    assert( pNew->jump.iStart>pNew->iOff );

    iRound = firstByteOnSector(pNew, pNew->jump.iStart);
    if( iRound>pNew->iOff ) pNew->jump.iStart = iRound;
    pNew->jump.iEnd = lastByteOnSector(pNew, pNew->jump.iEnd);
  }

  assert( pDb->pLogWriter==pNew );
  return rc;
}
/*
** This function is called when a write-transaction is being closed.
** Parameter bCommit is true if the transaction is being committed,
** or false otherwise. The caller must hold the client-mutex to call
** this function.
**
** If the transaction is being committed, the log state stored in
** pDb->treehdr.log (end of region 2, checksums and, if the transaction
** jumped over part of the log, the extents of regions 1 and 2) is updated
** before returning. Otherwise this function does nothing.
**
** The LogWriter object itself is not freed here: lsmLogBegin() reuses an
** existing pDb->pLogWriter, and lsmLogClose() releases it.
*/
void lsmLogEnd(lsm_db *pDb, int bCommit){
  LogWriter *pWriter = pDb->pLogWriter;
  DbLog *pLog;

  if( pWriter==0 || !bCommit ) return;

  pLog = &pDb->treehdr.log;
  pLog->aRegion[2].iEnd = pWriter->iOff;
  pLog->cksum0 = pWriter->cksum0;
  pLog->cksum1 = pWriter->cksum1;

  /* A non-zero iRegion1End means the transaction had to jump over some
  ** other part of the log. What was written before the jump becomes
  ** region 1, and region 2 now begins at the jump target. */
  if( pWriter->iRegion1End==0 ) return;

  assert( pLog->aRegion[1].iEnd==0 );
  assert( pLog->aRegion[2].iStart<pWriter->iRegion1End );
  pLog->aRegion[1].iStart = pLog->aRegion[2].iStart;
  pLog->aRegion[1].iEnd = pWriter->iRegion1End;
  pLog->aRegion[2].iStart = pWriter->iRegion2Start;
}
/*
** Determine if it is necessary to add an LSM_LOG_JUMP to jump over the
** jump region (pLog->jump) before writing a record that requires nReq
** bytes. A jump is required if the jump region lies ahead of the current
** write position, but there is insufficient room before it to fit both
** the new record and the largest possible JUMP record with up to 7 bytes
** of padding (a total of 17 bytes).
**
** If a jump is taken, the buffered data plus the padded JUMP record are
** written to disk, the writer is repositioned just past the jump region
** and, if pbJump is not NULL, *pbJump is set to 1. If no jump is needed,
** nothing is modified.
**
** Returns LSM_OK on success or an LSM error code if appending to the
** buffer or writing to the log fails.
*/
static int jumpIfRequired(
  lsm_db *pDb,
  LogWriter *pLog,
  int nReq,
  int *pbJump
){
  i64 iCur = pLog->iOff + pLog->buf.n;  /* Offset of next byte to buffer */
  i64 iJump;                      /* Offset to jump to */
  u8 aJump[10];                   /* Encoded jump record */
  int nJump;                      /* Valid bytes in aJump[] */
  int nMod;                       /* Unpadded size modulo 8 */
  int rc;                         /* Return code */

  /* No jump region ahead of the write position, or enough room before it. */
  if( pLog->jump.iStart<=iCur ) return LSM_OK;
  if( pLog->jump.iStart>=(iCur + (nReq + 17)) ) return LSM_OK;

  /* Serialize the JUMP record */
  iJump = pLog->jump.iEnd+1;
  aJump[0] = LSM_LOG_JUMP;
  nJump = 1 + lsmVarintPut64(&aJump[1], iJump);

  /* Pad the buffer so that it is a multiple of 8 bytes in size once the
  ** JUMP record has been appended. This is not strictly required, but it
  ** makes keeping the running checksum up to date a little simpler. */
  nMod = (pLog->buf.n + nJump) % 8;
  if( nMod!=0 ){
    u8 aPad[7] = {0,0,0,0,0,0,0};
    int nPad = 8 - nMod;          /* Bytes of padding required */
    if( nPad==1 ){
      aPad[0] = LSM_LOG_PAD1;
    }else{
      aPad[0] = LSM_LOG_PAD2;
      aPad[1] = (u8)(nPad-2);
    }
    rc = lsmStringBinAppend(&pLog->buf, aPad, nPad);
    if( rc!=LSM_OK ) return rc;
  }

  /* Append the JUMP record, flush the buffer to disk and update the
  ** checksums. The next write to the log file (assuming there is no
  ** transaction rollback) will be to offset iJump, just past the jump
  ** region. */
  rc = lsmStringBinAppend(&pLog->buf, aJump, nJump);
  if( rc!=LSM_OK ) return rc;
  assert( (pLog->buf.n % 8)==0 );
  rc = lsmFsWriteLog(pDb->pFS, pLog->iOff, &pLog->buf);
  if( rc!=LSM_OK ) return rc;
  logUpdateCksum(pLog, pLog->buf.n);

  pLog->iRegion1End = (pLog->iOff + pLog->buf.n);
  pLog->iRegion2Start = iJump;
  pLog->iOff = iJump;
  pLog->iCksumBuf = pLog->buf.n = 0;
  if( pbJump ) *pbJump = 1;

  return LSM_OK;
}
/*
** Finish the running checksum over the current contents of the write
** buffer, append the two 32-bit checksum values to the buffer, then write
** the whole buffer to the log file at the current offset. Afterwards the
** buffer is empty and the write offset has advanced past the data.
**
** The caller must already have reserved room for the 8 checksum bytes at
** the end of the buffer. Returns the result of lsmFsWriteLog(); the offset
** and buffer state are updated whether or not the write succeeds.
*/
static int logCksumAndFlush(lsm_db *pDb){
  LogWriter *pLog = pDb->pLogWriter;
  u8 *aCksum;                     /* Where the checksum is stored */
  int rc;                         /* Return code */

  /* Calculate the checksum value. Append it to the buffer. */
  logUpdateCksum(pLog, pLog->buf.n);
  aCksum = (u8 *)&pLog->buf.z[pLog->buf.n];
  lsmPutU32(&aCksum[0], pLog->cksum0);
  lsmPutU32(&aCksum[4], pLog->cksum1);
  pLog->buf.n += 8;

  /* Write the contents of the buffer to disk. */
  rc = lsmFsWriteLog(pDb->pFS, pLog->iOff, &pLog->buf);
  pLog->iOff += pLog->buf.n;
  pLog->iCksumBuf = pLog->buf.n = 0;

  return rc;
}
/*
** Write the contents of the log-buffer to disk, followed by a record of
** type eType and its 8-byte checksum. As the assert() below shows, the
** only type currently supported is LSM_LOG_COMMIT.
**
** If the effective sector size is greater than 1, padding records are
** added first so that the COMMIT record ends on a sector boundary. If the
** safety-mode is FULL, the log file is synced after the write.
**
** Returns LSM_OK on success, or an LSM error code. An error from any step,
** including the jump over the reserved log region, aborts the commit.
*/
static int logFlush(lsm_db *pDb, int eType){
  int rc;
  int nReq;
  LogWriter *pLog = pDb->pLogWriter;

  assert( eType==LSM_LOG_COMMIT );
  assert( pLog );

  /* Commit record is always 9 bytes in size. */
  nReq = 9;
  if( eType==LSM_LOG_COMMIT && pLog->szSector>1 ) nReq += pLog->szSector + 17;
  rc = jumpIfRequired(pDb, pLog, nReq, 0);

  /* If the jump failed, the writer is still positioned before the jump
  ** region and the JUMP record may not be on disk. Continuing would append
  ** the COMMIT inside the region that must not be overwritten and could
  ** report success, so bail out here. */
  if( rc!=LSM_OK ) return rc;

  /* If this is a COMMIT, add padding to the log so that the COMMIT record
  ** is aligned against the end of a disk sector. In other words, add padding
  ** so that the first byte following the COMMIT record lies on a different
  ** sector. */
  if( eType==LSM_LOG_COMMIT && pLog->szSector>1 ){
    int nPad;                     /* Bytes of padding to add */

    /* Determine the value of nPad. */
    nPad = ((pLog->iOff + pLog->buf.n + 9) % pLog->szSector);
    if( nPad ) nPad = pLog->szSector - nPad;
    rc = lsmStringExtend(&pLog->buf, nPad);
    if( rc!=LSM_OK ) return rc;

    /* Emit PAD1 for a single byte, otherwise PAD2 records that each carry
    ** at most 200 bytes of 0x2B filler. */
    while( nPad ){
      if( nPad==1 ){
        pLog->buf.z[pLog->buf.n++] = LSM_LOG_PAD1;
        nPad = 0;
      }else{
        int n = LSM_MIN(200, nPad-2);
        pLog->buf.z[pLog->buf.n++] = LSM_LOG_PAD2;
        pLog->buf.z[pLog->buf.n++] = (char)n;
        nPad -= 2;
        memset(&pLog->buf.z[pLog->buf.n], 0x2B, n);
        pLog->buf.n += n;
        nPad -= n;
      }
    }
  }

  /* Make sure there is room in the log-buffer to add the CKSUM or COMMIT
  ** record. Then add the first byte of it. */
  rc = lsmStringExtend(&pLog->buf, 9);
  if( rc!=LSM_OK ) return rc;
  pLog->buf.z[pLog->buf.n++] = (char)eType;
  memset(&pLog->buf.z[pLog->buf.n], 0, 8);

  rc = logCksumAndFlush(pDb);

  /* If this is a commit and synchronous=full, sync the log to disk. */
  if( rc==LSM_OK && eType==LSM_LOG_COMMIT && pDb->eSafety==LSM_SAFETY_FULL ){
    rc = lsmFsSyncLog(pDb->pFS);
  }
  return rc;
}
/*
** Append a record of type eType (LSM_WRITE, LSM_DELETE or LSM_DRANGE) to
** the database log. DELETE records carry only the key and must be passed
** nVal<0; WRITE and DRANGE records carry both the key and the value.
**
** A checksum is embedded in the record (and the buffer flushed to disk)
** if writing it would leave more than LSM_CKSUM_MAXDATA bytes in the
** buffer, or if a jump over the reserved log region had to be taken first.
**
** Returns LSM_OK on success or an LSM error code. If logging is disabled
** (pDb->bUseLog==0) this is a no-op.
*/
int lsmLogWrite(
  lsm_db *pDb,                    /* Database handle */
  int eType,
  void *pKey, int nKey,           /* Database key to write to log */
  void *pVal, int nVal            /* Database value (or nVal<0) to write */
){
  int rc = LSM_OK;
  LogWriter *pLog;                /* Log object to write to */
  int nReq;                       /* Bytes of space required in log */
  int bCksum = 0;                 /* True to embed a checksum in this record */
  int bHasVal;                    /* True if the record includes a value */
  u8 *a;                          /* Write cursor within the log buffer */

  assert( eType==LSM_WRITE || eType==LSM_DELETE || eType==LSM_DRANGE );
  assert( LSM_LOG_WRITE==LSM_WRITE );
  assert( LSM_LOG_DELETE==LSM_DELETE );
  assert( LSM_LOG_DRANGE==LSM_DRANGE );
  assert( (eType==LSM_LOG_DELETE)==(nVal<0) );
  assert( LSM_LOG_WRITE_CKSUM == (LSM_LOG_WRITE | 0x0001) );
  assert( LSM_LOG_DELETE_CKSUM == (LSM_LOG_DELETE | 0x0001) );
  assert( LSM_LOG_DRANGE_CKSUM == (LSM_LOG_DRANGE | 0x0001) );

  if( pDb->bUseLog==0 ) return LSM_OK;
  pLog = pDb->pLogWriter;
  bHasVal = (eType!=LSM_LOG_DELETE);

  /* Determine how many bytes of space are required, assuming that a checksum
  ** will be embedded in this record (even though it may not be). */
  nReq = 1 + lsmVarintLen32(nKey) + 8 + nKey;
  if( bHasVal ) nReq += lsmVarintLen32(nVal) + nVal;

  /* Jump over the jump region if required. A checksum is included in the
  ** record if either the jump is taken, or the buffer would otherwise grow
  ** past LSM_CKSUM_MAXDATA bytes. */
  rc = jumpIfRequired(pDb, pLog, nReq, &bCksum);
  if( (pLog->buf.n+nReq) > LSM_CKSUM_MAXDATA ) bCksum = 1;
  if( rc!=LSM_OK ) return rc;

  rc = lsmStringExtend(&pLog->buf, nReq);
  if( rc!=LSM_OK ) return rc;

  /* Write the record header - the type byte followed by either 1 (for
  ** DELETE) or 2 (for WRITE and DRANGE) varints. */
  a = (u8 *)&pLog->buf.z[pLog->buf.n];
  *(a++) = (u8)eType | (u8)bCksum;
  a += lsmVarintPut32(a, nKey);
  if( bHasVal ) a += lsmVarintPut32(a, nVal);

  /* The checksum covers everything up to and including the header. Once
  ** it has been flushed the buffer is empty, so the key and value are
  ** written starting at the beginning of the buffer. */
  if( bCksum ){
    pLog->buf.n = (a - (u8 *)pLog->buf.z);
    rc = logCksumAndFlush(pDb);
    a = (u8 *)&pLog->buf.z[pLog->buf.n];
  }

  memcpy(a, pKey, nKey);
  a += nKey;
  if( bHasVal ){
    memcpy(a, pVal, nVal);
    a += nVal;
  }
  pLog->buf.n = a - (u8 *)pLog->buf.z;
  assert( pLog->buf.n<=pLog->buf.nAlloc );

  return rc;
}
/*
** Append an LSM_LOG_COMMIT record to the database log and flush it to
** disk. Returns LSM_OK immediately if logging is disabled.
*/
int lsmLogCommit(lsm_db *pDb){
  return pDb->bUseLog ? logFlush(pDb, LSM_LOG_COMMIT) : LSM_OK;
}
/*
** Store the current offset and other checksum related information in the
** structure *pMark. Later, *pMark can be passed to lsmLogSeek() to "rewind"
** the LogWriter object to the current log file offset. This is used when
** rolling back savepoint transactions.
**
** The checksum saved in *pMark covers the buffer up to the last 8-byte
** boundary. The (up to 7) buffered bytes after that boundary are copied
** into pMark->aBuf so that they can be restored later.
*/
void lsmLogTell(
  lsm_db *pDb,                    /* Database handle */
  LogMark *pMark                  /* Populate this object with current offset */
){
  LogWriter *pLog;
  int nAligned;                   /* Buffer size rounded down to 8 bytes */

  if( pDb->bUseLog==0 ) return;
  pLog = pDb->pLogWriter;

  /* Bring the running checksum up to date with the aligned prefix. */
  nAligned = pLog->buf.n & 0xFFFFFFF8;
  logUpdateCksum(pLog, nAligned);
  assert( pLog->iCksumBuf==nAligned );

  pMark->cksum0 = pLog->cksum0;
  pMark->cksum1 = pLog->cksum1;
  pMark->iOff = pLog->iOff + pLog->buf.n;

  /* Save the unaligned tail of the buffer. */
  pMark->nBuf = pLog->buf.n - nAligned;
  memcpy(pMark->aBuf, &pLog->buf.z[nAligned], pMark->nBuf);
}
/*
** Seek (rewind) back to the log file offset stored by an earlier call to
** lsmLogTell() in *pMark.
**
** If the 8-byte aligned offset at or below the mark still falls within the
** current write buffer, the buffer is simply truncated. Otherwise the
** buffer has been flushed past the mark, so it is reset to contain just
** the unaligned tail bytes saved in pMark->aBuf and the write offset is
** moved back accordingly. In both cases the checksum saved in *pMark is
** restored.
*/
void lsmLogSeek(
  lsm_db *pDb,                    /* Database handle */
  LogMark *pMark                  /* Object containing log offset to seek to */
){
  LogWriter *pLog;

  if( pDb->bUseLog==0 ) return;
  pLog = pDb->pLogWriter;

  assert( pMark->iOff<=pLog->iOff+pLog->buf.n );

  /* Round the mark down to an 8-byte boundary using a full-width mask.
  ** The literal 0xFFFFFFF8 is a 32-bit unsigned value, so using it here
  ** would clear the upper 32 bits of a 64-bit offset and select the wrong
  ** branch for log offsets of 4GiB or more. */
  if( (pMark->iOff & ~(i64)7)>=pLog->iOff ){
    pLog->buf.n = (int)(pMark->iOff - pLog->iOff);
    pLog->iCksumBuf = (pLog->buf.n & 0xFFFFFFF8);
  }else{
    pLog->buf.n = pMark->nBuf;
    memcpy(pLog->buf.z, pMark->aBuf, pMark->nBuf);
    pLog->iCksumBuf = 0;
    pLog->iOff = pMark->iOff - pMark->nBuf;
  }
  pLog->cksum0 = pMark->cksum0;
  pLog->cksum1 = pMark->cksum1;

  /* NOTE(review): these tests clear the recorded jump offsets when the mark
  ** lies *after* them. Confirm that this is the intended direction of the
  ** comparison for a rollback to a mark taken before the jump. */
  if( pMark->iOff > pLog->iRegion1End ) pLog->iRegion1End = 0;
  if( pMark->iOff > pLog->iRegion2Start ) pLog->iRegion2Start = 0;
}
/*
** This function does the work for an lsm_info(LOG_STRUCTURE) request.
**
** On success, *pzVal is set to point to a string allocated by
** lsmMallocPrintf() containing six space-separated integers: the start
** and end offsets of log regions 0, 1 and 2, in that order. On error,
** *pzVal is set to NULL and an LSM error code is returned.
*/
int lsmInfoLogStructure(lsm_db *pDb, char **pzVal){
  int rc = LSM_OK;
  char *zVal = 0;

  /* If there is no read or write transaction open, read the latest
  ** tree-header from shared-memory to report on. If necessary, update
  ** it based on the contents of the database header.
  **
  ** No locks are taken here - these are passive read operations only.
  */
  if( pDb->pCsr==0 && pDb->nTransOpen==0 ){
    rc = lsmTreeLoadHeader(pDb, 0);
    if( rc==LSM_OK ) rc = logReclaimSpace(pDb);
  }

  if( rc==LSM_OK ){
    LogRegion *aReg = pDb->treehdr.log.aRegion;
    zVal = lsmMallocPrintf(pDb->pEnv,
        "%d %d %d %d %d %d",
        (int)aReg[0].iStart, (int)aReg[0].iEnd,
        (int)aReg[1].iStart, (int)aReg[1].iEnd,
        (int)aReg[2].iStart, (int)aReg[2].iEnd
    );
    if( zVal==0 ) rc = LSM_NOMEM_BKPT;
  }

  *pzVal = zVal;
  return rc;
}
/*************************************************************************
** Begin code for log recovery.
**
** A LogReader is a buffered, checksumming cursor over the log file. Data
** is read from the file into "buf" in fixed-size chunks by logReaderBlob(),
** which also folds consumed bytes into the running checksum (cksum0,
** cksum1) before they are discarded from the buffer.
*/
typedef struct LogReader LogReader;
struct LogReader {
  FileSystem *pFS;                /* File system to read from */
  i64 iOff;                       /* File offset at end of buf content */
  int iBuf;                       /* Current read offset in buf */
  LsmString buf;                  /* Buffer containing file content */
  int iCksumBuf;                  /* Offset in buf corresponding to cksum[01] */
  u32 cksum0;                     /* Checksum 0 at offset iCksumBuf */
  u32 cksum1;                     /* Checksum 1 at offset iCksumBuf */
};
/*
** The buffer of reader p has been completely consumed. Fold the whole
** 8-byte groups that have not yet been checksummed into the running
** checksum, move any remaining (fewer than 8) unchecksummed bytes to the
** start of the buffer, then append the next chunk of the log file.
**
** Returns LSM_OK on success, or the error returned by lsmFsReadLog().
*/
static int logReaderRefill(LogReader *p){
  static const int LOG_READ_SIZE = 512;
  int nCarry = 0;                 /* Unchecksummed bytes to retain */
  int nCksum = p->iBuf - p->iCksumBuf;  /* Bytes requiring checksum */
  int rc;

  if( nCksum>0 ){
    nCarry = nCksum % 8;
    nCksum -= nCarry;
    if( nCksum>0 ){
      logCksumUnaligned(
          &p->buf.z[p->iCksumBuf], nCksum, &p->cksum0, &p->cksum1
      );
    }
  }
  if( nCarry>0 ) memcpy(p->buf.z, &p->buf.z[p->iBuf-nCarry], nCarry);
  p->buf.n = nCarry;
  p->iBuf = nCarry;

  rc = lsmFsReadLog(p->pFS, p->iOff, LOG_READ_SIZE, &p->buf);
  if( rc==LSM_OK ){
    p->iCksumBuf = 0;
    p->iOff += LOG_READ_SIZE;
  }
  return rc;
}

/*
** Read the next nBlob bytes from the log.
**
** If ppBlob is not NULL and the entire blob is available in the reader's
** internal buffer, *ppBlob is set to point directly into that buffer.
** Otherwise the data is accumulated in pBuf (which is truncated first)
** and, if ppBlob is not NULL, *ppBlob is set to point to pBuf->z once the
** whole blob has been read. If ppBlob is NULL the data is simply consumed
** (and copied into pBuf).
**
** This is a no-op if *pRc is not LSM_OK on entry. If an error occurs,
** *pRc is set to an LSM error code.
*/
static void logReaderBlob(
  LogReader *p,                   /* Log reader object */
  LsmString *pBuf,                /* Dynamic storage, if required */
  int nBlob,                      /* Number of bytes to read */
  u8 **ppBlob,                    /* OUT: Pointer to blob read */
  int *pRc                        /* IN/OUT: Error code */
){
  int rc = *pRc;                  /* Return code */
  int nLeft = nBlob;              /* Bytes still to be read */

  while( rc==LSM_OK && nLeft>0 ){
    int nAvail;                   /* Bytes of data available in p->buf */

    if( p->buf.n==p->iBuf ){
      rc = logReaderRefill(p);
      if( rc!=LSM_OK ) break;
    }
    nAvail = p->buf.n - p->iBuf;

    if( ppBlob && nLeft==nBlob && nBlob<=nAvail ){
      /* Nothing copied so far and the whole blob is buffered. */
      *ppBlob = (u8 *)&p->buf.z[p->iBuf];
      p->iBuf += nBlob;
      nLeft = 0;
    }else{
      int nCopy = LSM_MIN(nAvail, nLeft);
      if( nLeft==nBlob ) pBuf->n = 0;
      rc = lsmStringBinAppend(pBuf, (u8 *)&p->buf.z[p->iBuf], nCopy);
      nLeft -= nCopy;
      p->iBuf += nCopy;
      if( nLeft==0 && ppBlob ) *ppBlob = (u8*)pBuf->z;
    }
  }

  *pRc = rc;
}
/*
** Read a varint from the log and store its value in *piVal. This is a
** no-op if *pRc is not LSM_OK on entry.
**
** If the reader's buffer is exhausted, the size of the varint cannot be
** determined from its first byte yet. In that case 10 bytes are requested
** (which forces a refill) and the read position is then moved back over
** the bytes that were not part of the varint. Otherwise the size is taken
** from the first byte and exactly that many bytes are read.
*/
static void logReaderVarint(
  LogReader *p,
  LsmString *pBuf,
  int *piVal,                     /* OUT: Value read from log */
  int *pRc                        /* IN/OUT: Error code */
){
  u8 *aVarint;

  if( *pRc!=LSM_OK ) return;

  if( p->buf.n==p->iBuf ){
    logReaderBlob(p, 0, 10, &aVarint, pRc);
    if( LSM_OK==*pRc ) p->iBuf -= (10 - lsmVarintGet32(aVarint, piVal));
  }else{
    int nVarint = lsmVarintSize(p->buf.z[p->iBuf]);
    logReaderBlob(p, pBuf, nVarint, &aVarint, pRc);
    if( LSM_OK==*pRc ) lsmVarintGet32(aVarint, piVal);
  }
}
/*
** Read a single byte from the log into *pByte. If the read fails (or *pRc
** is already an error on entry), *pByte is left unmodified.
*/
static void logReaderByte(LogReader *p, u8 *pByte, int *pRc){
  u8 *aByte = 0;
  logReaderBlob(p, 0, 1, &aByte, pRc);
  if( aByte!=0 ) *pByte = aByte[0];
}
/*
** Read an 8-byte checksum from the log and compare it with the checksum
** calculated over the data read so far. *pbEof is set to 1 if the two do
** not match, or to 0 if they do. The stored checksum bytes themselves are
** excluded from the running checksum.
**
** This is a no-op if *pRc is not LSM_OK on entry. *pbEof is not modified
** if the read fails.
*/
static void logReaderCksum(LogReader *p, LsmString *pBuf, int *pbEof, int *pRc){
  u8 *aStored = 0;                /* Checksum as stored in the log */
  int nCksum;                     /* Buffered bytes not yet checksummed */

  if( *pRc!=LSM_OK ) return;

  /* Update in-memory (expected) checksums */
  nCksum = p->iBuf - p->iCksumBuf;
  assert( nCksum>=0 );
  logCksumUnaligned(&p->buf.z[p->iCksumBuf], nCksum, &p->cksum0, &p->cksum1);

  /* Skip the 8 checksum bytes for the purposes of the running checksum,
  ** in case the read below has to refill the buffer. */
  p->iCksumBuf = p->iBuf + 8;
  logReaderBlob(p, pBuf, 8, &aStored, pRc);
  assert( aStored || *pRc );

  /* Read the checksums from the log file. Set *pbEof if they do not match. */
  if( aStored ){
    u32 cksum0 = lsmGetU32(aStored);
    u32 cksum1 = lsmGetU32(&aStored[4]);
    *pbEof = !(cksum0==p->cksum0 && cksum1==p->cksum1);
    p->iCksumBuf = p->iBuf;
  }
}
/*
** Position LogReader *p at the start of region 2 of log *pLog, with an
** empty buffer and the running checksum set to the values stored in *pLog.
**
** bInitBuf must be true the first time this is called on *p, so that the
** buffer object is initialized. It must be false when an already
** initialized reader is being reset, in which case the buffer's existing
** allocation is kept.
*/
static void logReaderInit(
  lsm_db *pDb,                    /* Database handle */
  DbLog *pLog,                    /* Log object associated with pDb */
  int bInitBuf,                   /* True if p->buf is uninitialized */
  LogReader *p                    /* Initialize this LogReader object */
){
  if( bInitBuf ){
    lsmStringInit(&p->buf, pDb->pEnv);
  }
  p->buf.n = 0;
  p->iBuf = 0;
  p->iCksumBuf = 0;

  p->pFS = pDb->pFS;
  p->iOff = pLog->aRegion[2].iStart;
  p->cksum0 = pLog->cksum0;
  p->cksum1 = pLog->cksum1;
}
/*
** This function is called after reading the header of a LOG_DELETE or
** LOG_WRITE record that has no embedded checksum. Parameter nByte is the
** total size of the key and value that follow the header just read.
**
** Return true if, counting those nByte bytes, more than LSM_CKSUM_MAXDATA
** bytes would have been read since the last checksum. A record in that
** position should have contained a checksum.
*/
static int logRequireCksum(LogReader *p, int nByte){
  int nSinceCksum = p->iBuf + nByte - p->iCksumBuf;
  return (nSinceCksum > LSM_CKSUM_MAXDATA);
}
/*
** Recover the contents of the log file.
**
** The log is opened, the in-memory tree initialized and the log offset and
** checksum at which to start reading loaded from the snapshot in
** pDb->pShmhdr->aSnap2. If lsmFsOpenLog() reports the log as open (bOpen),
** it is then scanned in two passes:
**
**   Pass 0 counts the commit records with valid checksums.
**
**   Pass 1 re-reads the log from the same starting point, inserting each
**   record into the in-memory tree and updating the log regions in
**   pDb->treehdr.log whenever a JUMP record is followed. It stops after
**   the number of commits counted by pass 0.
**
** Finally region 2 is set to end at the point where reading stopped, the
** final checksum values are saved, and lsmFinishRecovery() is called.
**
** Returns LSM_OK on success, or an LSM error code.
*/
int lsmLogRecover(lsm_db *pDb){
  LsmString buf1;                 /* Key buffer */
  LsmString buf2;                 /* Value buffer */
  LogReader reader;               /* Log reader object */
  int rc = LSM_OK;                /* Return code */
  int nCommit = 0;                /* Number of transactions to recover */
  int iPass;
  int nJump = 0;                  /* Number of LSM_LOG_JUMP records in pass 0 */
  DbLog *pLog;
  int bOpen;

  rc = lsmFsOpenLog(pDb, &bOpen);
  if( rc!=LSM_OK ) return rc;

  rc = lsmTreeInit(pDb);
  if( rc!=LSM_OK ) return rc;

  pLog = &pDb->treehdr.log;
  lsmCheckpointLogoffset(pDb->pShmhdr->aSnap2, pLog);

  logReaderInit(pDb, pLog, 1, &reader);
  lsmStringInit(&buf1, pDb->pEnv);
  lsmStringInit(&buf2, pDb->pEnv);

  /* The outer for() loop normally runs at most twice. The first iteration
  ** is to count the number of committed transactions in the log. The second
  ** iterates through those transactions and updates the in-memory tree
  ** structure with their contents. The first iteration may be repeated once
  ** from offset 0 - see the end of the loop body. */
  if( bOpen ){
    for(iPass=0; iPass<2 && rc==LSM_OK; iPass++){
      int bEof = 0;

      while( rc==LSM_OK && !bEof ){
        u8 eType = 0;
        logReaderByte(&reader, &eType, &rc);

        switch( eType ){
          case LSM_LOG_PAD1:
            break;

          case LSM_LOG_PAD2: {
            /* Skip over the nPad bytes of unused space. */
            int nPad;
            logReaderVarint(&reader, &buf1, &nPad, &rc);
            logReaderBlob(&reader, &buf1, nPad, 0, &rc);
            break;
          }

          case LSM_LOG_DRANGE:
          case LSM_LOG_DRANGE_CKSUM:
          case LSM_LOG_WRITE:
          case LSM_LOG_WRITE_CKSUM: {
            int nKey;
            int nVal;
            u8 *aVal;
            logReaderVarint(&reader, &buf1, &nKey, &rc);
            logReaderVarint(&reader, &buf2, &nVal, &rc);

            /* Either verify the embedded checksum, or check that the
            ** record was not required to have one. Failing either test
            ** ends the readable part of the log. */
            if( eType==LSM_LOG_WRITE_CKSUM || eType==LSM_LOG_DRANGE_CKSUM ){
              logReaderCksum(&reader, &buf1, &bEof, &rc);
            }else{
              bEof = logRequireCksum(&reader, nKey+nVal);
            }
            if( bEof ) break;

            /* The key always ends up in buf1 (ppBlob is NULL). The value
            ** may be in buf2 or in the reader's own buffer. */
            logReaderBlob(&reader, &buf1, nKey, 0, &rc);
            logReaderBlob(&reader, &buf2, nVal, &aVal, &rc);
            if( iPass==1 && rc==LSM_OK ){
              if( eType==LSM_LOG_WRITE || eType==LSM_LOG_WRITE_CKSUM ){
                rc = lsmTreeInsert(pDb, (u8 *)buf1.z, nKey, aVal, nVal);
              }else{
                rc = lsmTreeDelete(pDb, (u8 *)buf1.z, nKey, aVal, nVal);
              }
            }
            break;
          }

          case LSM_LOG_DELETE:
          case LSM_LOG_DELETE_CKSUM: {
            int nKey; u8 *aKey;
            logReaderVarint(&reader, &buf1, &nKey, &rc);

            if( eType==LSM_LOG_DELETE_CKSUM ){
              logReaderCksum(&reader, &buf1, &bEof, &rc);
            }else{
              bEof = logRequireCksum(&reader, nKey);
            }
            if( bEof ) break;

            /* A delete is replayed as an insert with no value (nVal==-1). */
            logReaderBlob(&reader, &buf1, nKey, &aKey, &rc);
            if( iPass==1 && rc==LSM_OK ){
              rc = lsmTreeInsert(pDb, aKey, nKey, NULL, -1);
            }
            break;
          }

          case LSM_LOG_COMMIT:
            /* In pass 0 nCommit counts valid commits upwards from zero.
            ** It is negated before pass 1, so that in pass 1 it counts
            ** back up towards zero and reading stops once every commit
            ** found by pass 0 has been replayed. */
            logReaderCksum(&reader, &buf1, &bEof, &rc);
            if( bEof==0 ){
              nCommit++;
              assert( nCommit>0 || iPass==1 );
              if( nCommit==0 ) bEof = 1;
            }
            break;

          case LSM_LOG_JUMP: {
            int iOff = 0;
            logReaderVarint(&reader, &buf1, &iOff, &rc);
            if( rc==LSM_OK ){
              if( iPass==1 ){
                /* What has been read so far becomes region 1 (if region 2
                ** started at offset 0) or region 0 (otherwise). Region 2
                ** now starts at the jump target. */
                if( pLog->aRegion[2].iStart==0 ){
                  assert( pLog->aRegion[1].iStart==0 );
                  pLog->aRegion[1].iEnd = reader.iOff;
                }else{
                  assert( pLog->aRegion[0].iStart==0 );
                  pLog->aRegion[0].iStart = pLog->aRegion[2].iStart;
                  pLog->aRegion[0].iEnd = reader.iOff-reader.buf.n+reader.iBuf;
                }
                pLog->aRegion[2].iStart = iOff;
              }else{
                /* Pass 0 stops reading at the third JUMP record. */
                if( (nJump++)==2 ){
                  bEof = 1;
                }
              }

              /* Continue reading at the jump target. Truncating the buffer
              ** to the current read position discards the data that
              ** followed the JUMP record, so the next read refills it. */
              reader.iOff = iOff;
              reader.buf.n = reader.iBuf;
            }
            break;
          }

          default:
            /* Including LSM_LOG_EOF */
            bEof = 1;
            break;
        }
      }

      if( rc==LSM_OK && iPass==0 ){
        if( nCommit==0 ){
          if( pLog->aRegion[2].iStart==0 ){
            /* No commits found reading from offset 0. Setting iPass to 1
            ** makes the for() loop exit without running pass 1. */
            iPass = 1;
          }else{
            /* No commits found at the checkpointed offset. Zero the log
            ** offset and set iPass to -1 so that the for() loop runs
            ** pass 0 again, this time from offset 0. */
            pLog->aRegion[2].iStart = 0;
            iPass = -1;
            lsmCheckpointZeroLogoffset(pDb);
          }
        }
        /* Rewind the reader for the next pass, and negate the commit
        ** count (see the LSM_LOG_COMMIT case above). */
        logReaderInit(pDb, pLog, 0, &reader);
        nCommit = nCommit * -1;
      }
    }
  }

  /* Initialize DbLog object */
  if( rc==LSM_OK ){
    pLog->aRegion[2].iEnd = reader.iOff - reader.buf.n + reader.iBuf;
    pLog->cksum0 = reader.cksum0;
    pLog->cksum1 = reader.cksum1;
  }

  /* lsmFinishRecovery() is always called, but its return value is only
  ** used if no earlier error has occurred. */
  if( rc==LSM_OK ){
    rc = lsmFinishRecovery(pDb);
  }else{
    lsmFinishRecovery(pDb);
  }

  if( pDb->bRoTrans ){
    lsmFsCloseLog(pDb);
  }

  lsmStringClear(&buf1);
  lsmStringClear(&buf2);
  lsmStringClear(&reader.buf);
  return rc;
}
/*
** Free the LogWriter object belonging to database handle db, if there is
** one, along with its write buffer. db->pLogWriter is set to NULL.
*/
void lsmLogClose(lsm_db *db){
  LogWriter *pWriter = db->pLogWriter;
  if( pWriter==0 ) return;

  lsmFree(db->pEnv, pWriter->buf.z);
  lsmFree(db->pEnv, pWriter);
  db->pLogWriter = 0;
}