Skip to content

Commit

Permalink
Merge pull request percona#2010 from venkatesh-prasad-v/8.4-post-push…
Browse files Browse the repository at this point in the history
…-4173

[8.4] PXC-4173: PXC node stalls with parallel replication workers executing DDLs via async node
  • Loading branch information
venkatesh-prasad-v authored Jan 28, 2025
2 parents 2b45465 + c8d3348 commit 63d2342
Show file tree
Hide file tree
Showing 10 changed files with 96 additions and 84 deletions.
7 changes: 4 additions & 3 deletions sql/binlog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1927,7 +1927,7 @@ bool MYSQL_BIN_LOG::write_transaction(THD *thd, binlog_cache_data *cache_data,
goto end;

ret = DBUG_EVALUATE_IF("simulate_write_trans_without_gtid", false,
gtid_event.write(writer));
gtid_event.write(writer));
#else
bool ret = DBUG_EVALUATE_IF("simulate_write_trans_without_gtid", false,
gtid_event.write(writer));
Expand Down Expand Up @@ -12545,8 +12545,9 @@ TC_LOG::enum_result wsrep_thd_binlog_commit(THD *thd, bool all) {
if (all) {
CONDITIONAL_SYNC_POINT_FOR_TIMESTAMP("before_commit_in_tc");
}
return trx_coordinator::commit_in_engines(thd, all) ? TC_LOG::RESULT_ABORTED
: TC_LOG::RESULT_SUCCESS;
return trx_coordinator::commit_in_engines(thd, all)
? TC_LOG::RESULT_ABORTED
: TC_LOG::RESULT_SUCCESS;
}
}

Expand Down
15 changes: 9 additions & 6 deletions sql/log_event.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,9 @@

#ifdef WITH_WSREP
#include "service_wsrep.h"
#include "sql/wsrep_async_monitor.h"
#include "wsrep_mysqld.h"
#include "wsrep_xid.h"
#include "sql/wsrep_async_monitor.h"
#endif /* WITH_WSREP */

#define window_size Log_throttle::LOG_THROTTLE_WINDOW_SIZE
Expand Down Expand Up @@ -2679,7 +2679,8 @@ Slave_worker *Log_event::get_slave_worker(Relay_log_info *rli) {
Gtid_log_event *gtid_log_ev = static_cast<Gtid_log_event *>(this);
rli->started_processing(gtid_log_ev);
#ifdef WITH_WSREP
Wsrep_async_monitor *wsrep_async_monitor {rli->get_wsrep_async_monitor()};
Wsrep_async_monitor *wsrep_async_monitor{
rli->get_wsrep_async_monitor()};
if (wsrep_async_monitor) {
auto seqno = gtid_log_ev->sequence_number;
wsrep_async_monitor->schedule(seqno);
Expand Down Expand Up @@ -11146,10 +11147,12 @@ static enum_tbl_map_status check_table_map(Relay_log_info const *rli,
#ifdef WITH_WSREP
// This transaction is anyways going to be skipped. So skip the transaction
// in the async monitor as well
if (WSREP(rli->info_thd) && rli->info_thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER
&& !thd_is_wsrep_applier && res == FILTERED_OUT) {
Slave_worker *sw = static_cast<Slave_worker *>(const_cast<Relay_log_info *>(rli));
Wsrep_async_monitor *wsrep_async_monitor {sw->get_wsrep_async_monitor()};
if (WSREP(rli->info_thd) &&
rli->info_thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER &&
!thd_is_wsrep_applier && res == FILTERED_OUT) {
Slave_worker *sw =
static_cast<Slave_worker *>(const_cast<Relay_log_info *>(rli));
Wsrep_async_monitor *wsrep_async_monitor{sw->get_wsrep_async_monitor()};
if (wsrep_async_monitor) {
auto seqno = sw->sequence_number();
assert(seqno > 0);
Expand Down
9 changes: 5 additions & 4 deletions sql/rpl_gtid_execution.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
#include "sql/sql_parse.h" // stmt_causes_implicit_commit
#include "sql/system_variables.h"
#ifdef WITH_WSREP
#include "sql/rpl_rli_pdb.h" // Slave_worker
#include "sql/rpl_rli_pdb.h" // Slave_worker
#include "sql/wsrep_async_monitor.h"
#endif /* WITH_WSREP */

Expand Down Expand Up @@ -386,10 +386,11 @@ static inline void skip_statement(THD *thd) {
*/
Commit_stage_manager::get_instance().finish_session_ticket(thd);
#ifdef WITH_WSREP
/* Despite the transaction was skipped, it needs to be updated in the Wsrep_async_monitor */
/* Despite the transaction was skipped, it needs to be updated in the
* Wsrep_async_monitor */
if (thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER) {
Slave_worker *sw = dynamic_cast<Slave_worker*>(thd->rli_slave);
Wsrep_async_monitor *wsrep_async_monitor {sw->get_wsrep_async_monitor()};
Slave_worker *sw = dynamic_cast<Slave_worker *>(thd->rli_slave);
Wsrep_async_monitor *wsrep_async_monitor{sw->get_wsrep_async_monitor()};
if (wsrep_async_monitor) {
auto seqno = sw->sequence_number();
assert(seqno > 0);
Expand Down
4 changes: 1 addition & 3 deletions sql/rpl_rli.h
Original file line number Diff line number Diff line change
Expand Up @@ -1791,9 +1791,7 @@ class Relay_log_info : public Rpl_info {
}

#ifdef WITH_WSREP
Wsrep_async_monitor* get_wsrep_async_monitor() {
return wsrep_async_monitor;
}
Wsrep_async_monitor *get_wsrep_async_monitor() { return wsrep_async_monitor; }
void set_wsrep_async_monitor(Wsrep_async_monitor *monitor) {
wsrep_async_monitor = monitor;
}
Expand Down
19 changes: 9 additions & 10 deletions sql/sql_parse.cc
Original file line number Diff line number Diff line change
Expand Up @@ -427,10 +427,10 @@ inline bool check_database_filters(THD *thd, const char *db,
#ifdef WITH_WSREP
// This transaction is anyways going to be skipped. So skip the transaction
// in the async monitor as well
if (WSREP(thd) && thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER
&& !thd->wsrep_applier && !db_ok) {
Slave_worker *sw = dynamic_cast<Slave_worker*>(thd->rli_slave);
Wsrep_async_monitor *wsrep_async_monitor {sw->get_wsrep_async_monitor()};
if (WSREP(thd) && thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER &&
!thd->wsrep_applier && !db_ok) {
Slave_worker *sw = dynamic_cast<Slave_worker *>(thd->rli_slave);
Wsrep_async_monitor *wsrep_async_monitor{sw->get_wsrep_async_monitor()};
if (wsrep_async_monitor) {
auto seqno = sw->sequence_number();
assert(seqno > 0);
Expand Down Expand Up @@ -1863,14 +1863,14 @@ static bool block_write_while_in_rolling_upgrade(THD *thd) {
would not block writes, but for clear flow, let's check if server state
is initialized, and if it is not yet, do not block writes.
2. Background wsrep applier (like slave thread) */
if (!thd->wsrep_cs().server_state().is_initialized() || (WSREP(thd) && thd->wsrep_applier))
if (!thd->wsrep_cs().server_state().is_initialized() ||
(WSREP(thd) && thd->wsrep_applier))
return false;

bool block = false;
LEX *lex = thd->lex;
if (sql_command_flags[lex->sql_command] & CF_CHANGES_DATA) {
bool multi_version_cluster =
wsrep_protocol_version < WsrepVersion::V4;
bool multi_version_cluster = wsrep_protocol_version < WsrepVersion::V4;
if (multi_version_cluster ||
DBUG_EVALUATE_IF("simulate_wsrep_multiple_major_versions", true,
false)) {
Expand Down Expand Up @@ -5143,9 +5143,8 @@ int mysql_execute_command(THD *thd, bool first_level) {
REFRESH_USER_RESOURCES | REFRESH_ERROR_LOG | REFRESH_SLOW_LOG |
REFRESH_GENERAL_LOG | REFRESH_ENGINE_LOG | REFRESH_RELAY_LOG |
/* Percona Server specific */
REFRESH_TABLE_STATS |
REFRESH_INDEX_STATS | REFRESH_USER_STATS | REFRESH_CLIENT_STATS |
REFRESH_THREAD_STATS)) {
REFRESH_TABLE_STATS | REFRESH_INDEX_STATS | REFRESH_USER_STATS |
REFRESH_CLIENT_STATS | REFRESH_THREAD_STATS)) {
WSREP_TO_ISOLATION_BEGIN_WRTCHK(WSREP_MYSQL_DB, NULL, NULL)
}
#endif /* WITH_WSREP */
Expand Down
46 changes: 27 additions & 19 deletions sql/sys_vars.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8486,9 +8486,10 @@ static Sys_var_ulonglong Sys_wsrep_trx_fragment_size(
"wsrep_trx_fragment_size",
"Size of transaction fragments for streaming replication (measured in "
"units of 'wsrep_trx_fragment_unit')",
HINT_UPDATEABLE SESSION_VAR(wsrep_trx_fragment_size), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(0, WSREP_MAX_WS_SIZE), DEFAULT(0), BLOCK_SIZE(1),
NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(wsrep_trx_fragment_size_check),
HINT_UPDATEABLE SESSION_VAR(wsrep_trx_fragment_size),
CMD_LINE(REQUIRED_ARG), VALID_RANGE(0, WSREP_MAX_WS_SIZE), DEFAULT(0),
BLOCK_SIZE(1), NO_MUTEX_GUARD, NOT_IN_BINLOG,
ON_CHECK(wsrep_trx_fragment_size_check),
ON_UPDATE(wsrep_trx_fragment_size_update));

extern const char *wsrep_fragment_units[];
Expand All @@ -8497,9 +8498,10 @@ static Sys_var_enum Sys_wsrep_trx_fragment_unit(
"wsrep_trx_fragment_unit",
"Unit for streaming replication transaction fragments' size: bytes, "
"rows, statements",
HINT_UPDATEABLE SESSION_VAR(wsrep_trx_fragment_unit), CMD_LINE(REQUIRED_ARG),
wsrep_fragment_units, DEFAULT(WSREP_FRAG_BYTES), NO_MUTEX_GUARD,
NOT_IN_BINLOG, ON_CHECK(0), ON_UPDATE(wsrep_trx_fragment_unit_update));
HINT_UPDATEABLE SESSION_VAR(wsrep_trx_fragment_unit),
CMD_LINE(REQUIRED_ARG), wsrep_fragment_units, DEFAULT(WSREP_FRAG_BYTES),
NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
ON_UPDATE(wsrep_trx_fragment_unit_update));

extern const char *wsrep_SR_store_types[];
static Sys_var_enum Sys_wsrep_SR_store(
Expand All @@ -8509,8 +8511,8 @@ static Sys_var_enum Sys_wsrep_SR_store(

static Sys_var_bool Sys_wsrep_dirty_reads(
"wsrep_dirty_reads", "Allow reads from a node is not in primary component",
HINT_UPDATEABLE SESSION_VAR(wsrep_dirty_reads), CMD_LINE(OPT_ARG), DEFAULT(false),
NO_MUTEX_GUARD, NOT_IN_BINLOG);
HINT_UPDATEABLE SESSION_VAR(wsrep_dirty_reads), CMD_LINE(OPT_ARG),
DEFAULT(false), NO_MUTEX_GUARD, NOT_IN_BINLOG);

static Sys_var_uint Sys_wsrep_ignore_apply_errors(
"wsrep_ignore_apply_errors", "Ignore replication errors",
Expand Down Expand Up @@ -8554,25 +8556,31 @@ static Sys_var_bool Sys_pxc_encrypt_cluster_traffic(
READ_ONLY GLOBAL_VAR(pxc_encrypt_cluster_traffic), CMD_LINE(OPT_ARG),
DEFAULT(true), NO_MUTEX_GUARD, NOT_IN_BINLOG);

static const char *wsrep_encrypt_modes[] = {"OFF", "ON", "NONE",
NullS};
static const char *wsrep_encrypt_modes[] = {"OFF", "ON", "NONE", NullS};
static Sys_var_enum Sys_wsrep_gcache_encrypt(
"wsrep_gcache_encrypt", "Encrypt GCache. This variable is solely for testing purposes. "
"wsrep_gcache_encrypt",
"Encrypt GCache. This variable is solely for testing purposes. "
"wsrep_provider_options should be used in production.",
READ_ONLY GLOBAL_VAR(wsrep_gcache_encrypt), CMD_LINE(OPT_ARG), wsrep_encrypt_modes,
DEFAULT(WSREP_ENCRYPT_MODE_NONE), NO_MUTEX_GUARD, NOT_IN_BINLOG);
READ_ONLY GLOBAL_VAR(wsrep_gcache_encrypt), CMD_LINE(OPT_ARG),
wsrep_encrypt_modes, DEFAULT(WSREP_ENCRYPT_MODE_NONE), NO_MUTEX_GUARD,
NOT_IN_BINLOG);

static Sys_var_enum Sys_wsrep_disk_pages_encrypt(
"wsrep_disk_pages_encrypt", "Encrypt WriteSet cache. This variable is solely for testing purposes. "
"wsrep_disk_pages_encrypt",
"Encrypt WriteSet cache. This variable is solely for testing purposes. "
"wsrep_provider_options should be used in production.",
READ_ONLY GLOBAL_VAR(wsrep_disk_pages_encrypt), CMD_LINE(OPT_ARG), wsrep_encrypt_modes,
DEFAULT(WSREP_ENCRYPT_MODE_NONE), NO_MUTEX_GUARD, NOT_IN_BINLOG);
READ_ONLY GLOBAL_VAR(wsrep_disk_pages_encrypt), CMD_LINE(OPT_ARG),
wsrep_encrypt_modes, DEFAULT(WSREP_ENCRYPT_MODE_NONE), NO_MUTEX_GUARD,
NOT_IN_BINLOG);

static Sys_var_bool Sys_wsrep_async_monitor(
"wsrep_use_async_monitor",
"Use Async Monitors to avoid deadlock of replicated transactions in a multi-threaded replica. "
"Deadlock is possible when the PXC replica tries to commit the replicated transactions in galera "
"in a different order than its order in the relay log. Use 'wsrep_use_async_monitor' to avoid "
"Use Async Monitors to avoid deadlock of replicated transactions in a "
"multi-threaded replica. "
"Deadlock is possible when the PXC replica tries to commit the replicated "
"transactions in galera "
"in a different order than its order in the relay log. Use "
"'wsrep_use_async_monitor' to avoid "
"such deadlocks."
"This variable is only allowed to be changed through command line.",
READ_ONLY GLOBAL_VAR(wsrep_use_async_monitor), CMD_LINE(OPT_ARG),
Expand Down
15 changes: 9 additions & 6 deletions sql/wsrep_async_monitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */

#ifdef WITH_WSREP
#include "sql/mysqld.h"
#include "sql/wsrep_async_monitor.h"
#include <cassert>
#include "sql/mysqld.h"

// Method for main thread to add scheduled seqnos
void Wsrep_async_monitor::schedule(seqno_t seqno) {
Expand All @@ -34,16 +34,17 @@ void Wsrep_async_monitor::enter(seqno_t seqno) {

// Wait until this transaction is at the head of the scheduled queue
m_cond.wait(lock, [this, seqno] {

// Here we need to remove skipped transactions

// Imagine a scenario where scheduled seqnos is 1,2(skip),3 and threads enter in
// an out of order manner.
// Imagine a scenario where scheduled seqnos is 1,2(skip),3 and threads
// enter in an out of order manner.
//
// - 3 enters the monitor first, since it is not in the front of the queue, it
// - 3 enters the monitor first, since it is not in the front of the queue,
// it
// goes to cond_wait
// - 2 enters the monitor, adds 2 to skipped_seqnos
// - 1 enters the monitor, since it is in the front, it acquires the monitor,
// - 1 enters the monitor, since it is in the front, it acquires the
// monitor,
// does its job and leaves the monitor by removing itself from the
// scheduled_seqnos and signals 3.
// - 3 wakes up, but it will see that 2 in the front of the
Expand Down Expand Up @@ -80,7 +81,9 @@ void Wsrep_async_monitor::leave(seqno_t seqno) {
// : std::to_string(scheduled_seqnos.front()))
// << " but got " << seqno << "." << std::endl;
assert(false && "Sequence number mismatch in leave()");
#ifndef EXTRA_CODE_FOR_UNIT_TESTING
unireg_abort(1);
#endif
}

// Remove seqnos from skipped_seqnos.
Expand Down
42 changes: 21 additions & 21 deletions sql/wsrep_mysqld.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1081,7 +1081,9 @@ void wsrep_deinit_server() {
Wsrep_server_state::destroy();
}

static void override_galera_option(const std::string& option, const std::string& new_value, std::string& options) {
static void override_galera_option(const std::string &option,
const std::string &new_value,
std::string &options) {
size_t option_pos = options.find(option + "=");
if (option_pos == std::string::npos) {
option_pos = options.find(option + " ");
Expand All @@ -1097,7 +1099,7 @@ static void override_galera_option(const std::string& option, const std::string&
// erase the option form the original string
std::string modified_options = options.substr(0, option_pos);
if (separator_pos != std::string::npos) {
modified_options += options.substr(separator_pos+1);
modified_options += options.substr(separator_pos + 1);
}

// add the new value of the option at the end
Expand All @@ -1106,17 +1108,20 @@ static void override_galera_option(const std::string& option, const std::string&
options = modified_options;
}

static void setup_galera_encryption_params(char* provider_options, size_t buf_size) {
static void setup_galera_encryption_params(char *provider_options,
size_t buf_size) {
std::string options(provider_options);
static const std::string yes("yes");
static const std::string no("no");

if (wsrep_gcache_encrypt != WSREP_ENCRYPT_MODE_NONE) {
const std::string& val = wsrep_gcache_encrypt == WSREP_ENCRYPT_MODE_ON ? yes : no;
const std::string &val =
wsrep_gcache_encrypt == WSREP_ENCRYPT_MODE_ON ? yes : no;
override_galera_option("gcache.encryption", val, options);
}
if (wsrep_disk_pages_encrypt != WSREP_ENCRYPT_MODE_NONE) {
const std::string& val = wsrep_disk_pages_encrypt == WSREP_ENCRYPT_MODE_ON ? yes : no;
const std::string &val =
wsrep_disk_pages_encrypt == WSREP_ENCRYPT_MODE_ON ? yes : no;
override_galera_option("allocator.disk_pages_encryption", val, options);
}

Expand Down Expand Up @@ -2248,8 +2253,7 @@ static bool wsrep_can_run_in_toi(THD *thd, const char *db, const char *table,
If any of the remaining tables refer to temporary table error
is returned to client, so TOI can be skipped
*/
for (Table_ref *it = first_table->next_global; it;
it = it->next_global) {
for (Table_ref *it = first_table->next_global; it; it = it->next_global) {
if (find_temporary_table(thd, it)) {
return false;
}
Expand Down Expand Up @@ -2856,11 +2860,10 @@ static int wsrep_RSU_begin(THD *thd, const char *, const char *) {
"was executed with wsrep_OSU_method = RSU. "
"Query: %s",
WSREP_QUERY(thd));
push_warning_printf(
thd, Sql_condition::SL_WARNING, ER_UNKNOWN_ERROR,
"The statement was neither written to the binary log "
"nor any GTID was generated as the statement "
"was executed with wsrep_OSU_method = RSU.");
push_warning_printf(thd, Sql_condition::SL_WARNING, ER_UNKNOWN_ERROR,
"The statement was neither written to the binary log "
"nor any GTID was generated as the statement "
"was executed with wsrep_OSU_method = RSU.");
thd->disable_binlog_guard =
std::make_shared<Disable_binlog_guard>(thd, true);
}
Expand All @@ -2875,17 +2878,15 @@ static void wsrep_RSU_end(THD *thd) {
thd->disable_binlog_guard.reset();
}

void thd_enter_async_monitor(THD* thd) {

void thd_enter_async_monitor(THD *thd) {
// Only replica worker threads are allowed to enter
if (thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER) {

// If the thread is already killed, leave it to the called to handle it.
if (thd->killed != THD::NOT_KILLED || thd->wsrep_applier) {
return;
}
Slave_worker *sw = dynamic_cast<Slave_worker*>(thd->rli_slave);
Wsrep_async_monitor *wsrep_async_monitor {sw->get_wsrep_async_monitor()};
Slave_worker *sw = dynamic_cast<Slave_worker *>(thd->rli_slave);
Wsrep_async_monitor *wsrep_async_monitor{sw->get_wsrep_async_monitor()};
if (wsrep_async_monitor) {
auto seqno = sw->sequence_number();
assert(seqno > 0);
Expand All @@ -2896,13 +2897,12 @@ void thd_enter_async_monitor(THD* thd) {
}
}

void thd_leave_async_monitor(THD* thd) {

void thd_leave_async_monitor(THD *thd) {
if (thd->wsrep_applier) return;

if (thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER) {
Slave_worker * sw = dynamic_cast<Slave_worker*>(thd->rli_slave);
Wsrep_async_monitor *wsrep_async_monitor {sw->get_wsrep_async_monitor()};
Slave_worker *sw = dynamic_cast<Slave_worker *>(thd->rli_slave);
Wsrep_async_monitor *wsrep_async_monitor{sw->get_wsrep_async_monitor()};
if (wsrep_async_monitor) {
auto seqno = sw->sequence_number();
assert(seqno > 0);
Expand Down
Loading

0 comments on commit 63d2342

Please sign in to comment.