Skip to content

Commit

Permalink
Refactor xWrite to WAL
Browse files Browse the repository at this point in the history
Signed-off-by: Cole Miller <[email protected]>
  • Loading branch information
cole-miller committed Jan 25, 2024
1 parent e58036b commit 20c03cf
Showing 1 changed file with 151 additions and 91 deletions.
242 changes: 151 additions & 91 deletions src/vfs2.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,22 @@ struct vfs2_file
char *
wal_prev_fixed_name; /* e.g. /path/to/my.db-xwal2 */

/* All pending_txn fields pertain to a transaction that
* has at least one frame in the WAL and is the last
* transaction represented in the WAL. Writing a frame
* either updates the pending transaction or starts a
* new transaction. A frame starts a new transaction if
* it is written at the end of the WAL and the
* physically preceding frame has a nonzero commit
* marker. */
uint32_t pending_txn_start; /* in frames, zero-based */
uint32_t pending_txn_len;
dqlite_vfs_frame *pending_txn_frames;
dqlite_vfs_frame
*pending_txn_frames; /* for vfs2_poll */
uint32_t
pending_txn_last_frame_commit; /* commit marker for
the physical last
frame */
} wal;
/* if this file object is a main file */
struct
Expand All @@ -116,6 +129,17 @@ struct vfs2_file
* sorta-committed transaction that has not yet been
* through Raft. */
struct vfs2_wal_index_basic_hdr pending_txn_hdr;
/* When the WAL is restarted (or started for the first
* time), we capture the initial WAL index header in
* prev_txn_hdr.
*
* When we get SQLITE_FCNTL_COMMIT_PHASETWO, we copy the
* WAL index header from shm into pending_txn_hdr, then
* overwrite the shm with prev_txn_hdr to hide the
* transaction.
*
* When we get vfs2_apply, we overwrite both
* prev_txn_hdr and the shm with pending_txn_hdr. */

/* shm implementation, incl. locks */
void **all_regions;
Expand Down Expand Up @@ -220,106 +244,135 @@ static int vfs2_read(sqlite3_file *file, void *buf, int amt, sqlite3_int64 ofst)
return xfile->orig->pMethods->xRead(xfile->orig, buf, amt, ofst);
}

static int vfs2_write(sqlite3_file *file,
const void *buf,
int amt,
sqlite3_int64 ofst)
static void vfs2_wal_pre_write(struct vfs2_file *wal,
const void *buf,
int amt,
sqlite3_int64 ofst)
{
if (ofst > 0) {
return;
}
/* Trying to overwrite the WAL header: this is a WAL
* reset. */
assert(amt == VFS2_WAL_HDR_SIZE);
/* We expect that the corresponding database file has
* been opened already. */
struct vfs2_file *db = lookup_partner_file(wal);
assert(db != NULL);
/* WAL swap (in-memory part) */
sqlite3_file *tmp = wal->orig;
char *tmp_name = wal->wal.wal_cur_fixed_name;
wal->orig = wal->wal.wal_prev;
wal->wal.wal_cur_fixed_name = wal->wal.wal_prev_fixed_name;
wal->wal.wal_prev = tmp;
wal->wal.wal_prev_fixed_name = tmp_name;

wal->wal.pending_txn_start = 0;
wal->wal.pending_txn_len = 0;

/* Copy the WAL index header that SQLite has written so
* that we can restore it later. This relies on SQLite
* writing the WAL index header before restarting the
* WAL, an assumption that can be verified by looking at
* the source code. */
assert(db->db_shm.all_regions_len > 0);
union vfs2_shm_region0 *region0 = db->db_shm.all_regions[0];
db->db_shm.prev_txn_hdr = region0->hdr.basic[0];
}

static int vfs2_wal_post_write(struct vfs2_file *wal,
const void *buf,
int amt,
sqlite3_int64 ofst)
{
int rv;
struct vfs2_file *xfile = (struct vfs2_file *)file;

if (xfile->flags & SQLITE_OPEN_WAL) {
sqlite3_int64 next_frame_ofst =
VFS2_WAL_HDR_SIZE +
(VFS2_WAL_FRAME_HDR_SIZE + xfile->vfs_data->page_size) *
(xfile->wal.pending_txn_start +
xfile->wal.pending_txn_len);
if (ofst == 0) {
/* Trying to overwrite the WAL header: this is a WAL
* reset. */
assert(amt == VFS2_WAL_HDR_SIZE);
/* We expect that the corresponding database file has
* been opened already. */
struct vfs2_file *db = lookup_partner_file(xfile);
assert(db != NULL);
/* WAL swap (in-memory part) */
sqlite3_file *tmp = xfile->orig;
char *tmp_name = xfile->wal.wal_cur_fixed_name;
xfile->orig = xfile->wal.wal_prev;
xfile->wal.wal_cur_fixed_name =
xfile->wal.wal_prev_fixed_name;
xfile->wal.wal_prev = tmp;
xfile->wal.wal_prev_fixed_name = tmp_name;
/* WAL swap (on-disk part) */
rv = unlink(xfile->wal.moving_name);
if (rv != 0) {
return SQLITE_IOERR;
}
/* If we crash between unlink and link, we'll see the
* conventionally-named WAL is missing at startup, and
* we can't determine which of -xwal1 and -xwal2 is more
* recent. Fortunately, this is not a correctness issue.
* See vfs2_open_wal for how this situation is handled.
*/
rv = link(xfile->wal.wal_cur_fixed_name,
xfile->wal.moving_name);
if (rv != 0) {
return SQLITE_IOERR;
}

xfile->wal.pending_txn_start = 0;
xfile->wal.pending_txn_len = 0;

/* Copy the WAL index header that SQLite has written so
* that we can restore it later. This relies on SQLite
* writing the WAL index header before restarting the
* WAL, an assumption that can be verified by looking at
* the source code. */
assert(db->db_shm.all_regions_len > 0);
union vfs2_shm_region0 *region0 =
db->db_shm.all_regions[0];
db->db_shm.prev_txn_hdr = region0->hdr.basic[0];
} else if (amt == VFS2_WAL_FRAME_HDR_SIZE &&
ofst == next_frame_ofst) {
/* Extend the current transaction. */
xfile->wal.pending_txn_frames =
sqlite3_realloc(xfile->wal.pending_txn_frames,
xfile->wal.pending_txn_len + 1);
if (xfile->wal.pending_txn_frames == NULL) {
return SQLITE_NOMEM;
if (ofst == 0) {
/* WAL swap (on-disk part) */
rv = unlink(wal->wal.moving_name);
if (rv != 0) {
return SQLITE_IOERR;
}
/* If we crash between unlink and link, we'll see the
* conventionally-named WAL is missing at startup, and
* we can't determine which of -xwal1 and -xwal2 is more
* recent. Fortunately, this is not a correctness issue.
* See vfs2_open_wal for how this situation is handled.
*/
rv = link(wal->wal.wal_cur_fixed_name, wal->wal.moving_name);
if (rv != 0) {
return SQLITE_IOERR;
}
} else if (amt == VFS2_WAL_FRAME_HDR_SIZE) {
sqlite3_int64 next_frame_ofst;
if (ofst == next_frame_ofst) {
if (wal->wal.pending_txn_last_frame_commit > 0) {
/* Start of a new transaction. */
} else {
/* Extend the current transaction. FIXME
* reallocating every time seems bad */
wal->wal.pending_txn_frames = sqlite3_realloc(
wal->wal.pending_txn_frames,
sizeof(struct dqlite_vfs_frame) *
(wal->wal.pending_txn_len + 1));
if (wal->wal.pending_txn_frames == NULL) {
return SQLITE_NOMEM;
}
dqlite_vfs_frame *frame =
&wal->wal.pending_txn_frames
[wal->wal.pending_txn_len];
frame->page_number = ByteGetBe32(buf);
frame->data = NULL;
wal->wal.pending_txn_len++;
}
dqlite_vfs_frame *frame =
&xfile->wal.pending_txn_frames
[xfile->wal.pending_txn_len];
frame->page_number = ByteGetBe32(buf);
frame->data = NULL;
xfile->wal.pending_txn_len++;
} else if (amt == VFS2_WAL_FRAME_HDR_SIZE) {
wal->wal.pending_txn_last_frame_commit =
ByteGetBe32((const uint8_t *)buf + 4);
} else {
/* Overwriting a previously-written frame in the current
* transaction. */
sqlite3_int64 x = (ofst - VFS2_WAL_HDR_SIZE) /
(VFS2_WAL_FRAME_HDR_SIZE +
xfile->vfs_data->page_size) -
xfile->wal.pending_txn_start;
wal->vfs_data->page_size) -
wal->wal.pending_txn_start;
dqlite_vfs_frame *frame =
&xfile->wal.pending_txn_frames[x];
&wal->wal.pending_txn_frames[x];
frame->page_number = ByteGetBe32(buf);
} else if (amt == xfile->vfs_data->page_size) {
sqlite3_int64 x = (ofst - VFS2_WAL_FRAME_HDR_SIZE -
VFS2_WAL_HDR_SIZE) /
(VFS2_WAL_FRAME_HDR_SIZE +
xfile->vfs_data->page_size) -
xfile->wal.pending_txn_start;
assert(x < xfile->wal.pending_txn_len);
dqlite_vfs_frame *frame =
&xfile->wal.pending_txn_frames[x];
sqlite3_free(frame->data);
frame->data = sqlite3_malloc(amt);
if (frame->data == NULL) {
return SQLITE_NOMEM;
}
memcpy(frame->data, buf, amt);
}
} else if (amt == wal->vfs_data->page_size) {
sqlite3_int64 x =
(ofst - VFS2_WAL_FRAME_HDR_SIZE - VFS2_WAL_HDR_SIZE) /
(VFS2_WAL_FRAME_HDR_SIZE + wal->vfs_data->page_size) -
wal->wal.pending_txn_start;
assert(x < wal->wal.pending_txn_len);
dqlite_vfs_frame *frame = &wal->wal.pending_txn_frames[x];
sqlite3_free(frame->data);
frame->data = sqlite3_malloc(amt);
if (frame->data == NULL) {
return SQLITE_NOMEM;
}
memcpy(frame->data, buf, amt);
}
return xfile->orig->pMethods->xWrite(xfile->orig, buf, amt, ofst);
return SQLITE_OK;
}

static int vfs2_write(sqlite3_file *file,
const void *buf,
int amt,
sqlite3_int64 ofst)
{
int rv;
struct vfs2_file *xfile = (struct vfs2_file *)file;
if (xfile->flags & SQLITE_OPEN_WAL) {
vfs2_wal_pre_write(xfile, buf, amt, ofst);
}
rv = xfile->orig->pMethods->xWrite(xfile->orig, buf, amt, ofst);
if (rv != SQLITE_OK) {
return rv;
}
if (xfile->flags & SQLITE_OPEN_WAL) {
return vfs2_wal_post_write(xfile, buf, amt, ofst);
}
return SQLITE_OK;
}

static int vfs2_truncate(sqlite3_file *file, sqlite3_int64 size)
Expand Down Expand Up @@ -894,4 +947,11 @@ int vfs2_poll(sqlite3_file *file, dqlite_vfs_frame **frames, unsigned *n)
if (!(xfile->flags & SQLITE_OPEN_MAIN_DB)) {
return 1;
}
struct vfs2_file *wal = lookup_partner_file(xfile);
if (wal == NULL) {
return 1;
}

*n = wal->wal.pending_txn_len;
*frames = wal->wal.pending_txn_frames;
}

0 comments on commit 20c03cf

Please sign in to comment.