Skip to content

Commit 27f3c80

Browse files
implausiblezawata
authored and committed
thread checkout: add threading to checkout_create_the_new
1 parent 0739689 commit 27f3c80

File tree

1 file changed

+220
-0
lines changed

1 file changed

+220
-0
lines changed

src/checkout.c

Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1967,11 +1967,231 @@ static int checkout_create_the_new__single(
19671967
return 0;
19681968
}
19691969

1970+
#ifdef GIT_THREADS
1971+
1972+
/*
 * Outcome of one delta processed by a checkout worker thread.
 *
 * One of these is heap-allocated per delta by the worker and handed to the
 * parent thread through a shared vector.
 */
typedef struct {
	int error;     /* result of checkout_blob(); 0 when the delta was skipped */
	size_t index;  /* position of the delta in the diff's delta vector */
	bool skipped;  /* true when the worker did not attempt this delta */
} checkout_progress_pair;
1977+
1978+
typedef struct {
1979+
git_thread thread;
1980+
const unsigned int *actions;
1981+
checkout_data *cd;
1982+
1983+
git_cond *cond;
1984+
git_mutex *mutex;
1985+
1986+
git_atomic32 *delta_index;
1987+
git_atomic32 *error;
1988+
git_vector *progress_pairs;
1989+
} thread_params;
1990+
1991+
static void *checkout_create_the_new__thread(void *arg)
1992+
{
1993+
thread_params *worker = arg;
1994+
size_t i;
1995+
checkout_buffers *buffers = git__malloc(sizeof(checkout_buffers));
1996+
1997+
// TODO if the thread fails to allocate, signal and have the parent thread check the return value
1998+
// TODO deduplicate this setup with checkout_data_init
1999+
git_buf_init(&buffers->target_path, 0);
2000+
git_buf_init(&buffers->tmp, 0);
2001+
git_tlsdata_set(worker->cd->buffers, buffers);
2002+
git_buf_puts(&buffers->target_path, worker->cd->opts.target_directory);
2003+
git_path_to_dir(&buffers->target_path);
2004+
buffers->target_len = git_buf_len(&buffers->target_path);
2005+
2006+
while ((i = git_atomic32_add(worker->delta_index, 1)) <
2007+
git_vector_length(&worker->cd->diff->deltas)) {
2008+
checkout_progress_pair *progress_pair;
2009+
git_diff_delta *delta = git_vector_get(&worker->cd->diff->deltas, i);
2010+
2011+
if (delta == NULL || git_atomic32_get(worker->error) != 0)
2012+
return NULL;
2013+
2014+
progress_pair = (checkout_progress_pair *)git__malloc(
2015+
sizeof(checkout_progress_pair));
2016+
if (progress_pair == NULL) {
2017+
git_atomic32_set(worker->error, -1);
2018+
git_cond_signal(worker->cond);
2019+
return NULL;
2020+
}
2021+
2022+
/* We skip symlink operations, because we handle them
2023+
* in the main thread to avoid a symlink security flaw.
2024+
*/
2025+
if (!S_ISLNK(delta->new_file.mode) &&
2026+
worker->actions[i] & CHECKOUT_ACTION__UPDATE_BLOB) {
2027+
/* We will retry failed operations in the calling thread to handle
2028+
* the case where might encounter a file locking error due to
2029+
* multithreading and name collisions.
2030+
*/
2031+
progress_pair->index = i;
2032+
progress_pair->error = checkout_blob(worker->cd, &delta->new_file);
2033+
progress_pair->skipped = false;
2034+
} else {
2035+
progress_pair->index = i;
2036+
progress_pair->error = 0;
2037+
progress_pair->skipped = true;
2038+
}
2039+
2040+
git_mutex_lock(worker->mutex);
2041+
git_vector_insert(worker->progress_pairs, progress_pair);
2042+
git_cond_signal(worker->cond);
2043+
git_mutex_unlock(worker->mutex);
2044+
}
2045+
2046+
return NULL;
2047+
}
2048+
2049+
static int checkout_create_the_new__parallel(
2050+
unsigned int *actions,
2051+
checkout_data *data)
2052+
{
2053+
thread_params *p;
2054+
size_t i, num_threads = git__online_cpus(), last_index = 0, current_index = 0,
2055+
num_deltas = git_vector_length(&data->diff->deltas);
2056+
int ret;
2057+
checkout_progress_pair *progress_pair;
2058+
git_atomic32 delta_index, error;
2059+
git_diff_delta *delta;
2060+
git_vector errored_pairs, progress_pairs, temp;
2061+
git_cond cond;
2062+
git_mutex mutex;
2063+
2064+
if (
2065+
(ret = git_vector_init(&progress_pairs, num_deltas, NULL)) < 0 ||
2066+
(ret = git_vector_init(&errored_pairs, num_deltas, NULL)) < 0 ||
2067+
(ret = git_vector_init(&temp, num_deltas, NULL)) < 0)
2068+
return ret;
2069+
2070+
p = git__mallocarray(num_threads, sizeof(*p));
2071+
GIT_ERROR_CHECK_ALLOC(p);
2072+
2073+
git_cond_init(&cond);
2074+
git_mutex_init(&mutex);
2075+
git_mutex_lock(&mutex);
2076+
2077+
git_atomic32_set(&delta_index, -1);
2078+
git_atomic32_set(&error, 0);
2079+
2080+
/* Initialize worker threads */
2081+
for (i = 0; i < num_threads; ++i) {
2082+
p[i].actions = actions;
2083+
p[i].cd = data;
2084+
p[i].cond = &cond;
2085+
p[i].mutex = &mutex;
2086+
p[i].error = &error;
2087+
p[i].delta_index = &delta_index;
2088+
p[i].progress_pairs = &progress_pairs;
2089+
}
2090+
2091+
/* Start worker threads */
2092+
for (i = 0; i < num_threads; ++i) {
2093+
ret = git_thread_create(&p[i].thread, checkout_create_the_new__thread, &p[i]);
2094+
2095+
/* On error, we will cleanly exit any started worker threads,
2096+
* and then return with our error code */
2097+
if (ret) {
2098+
git_atomic32_set(&error, -1);
2099+
git_error_set(GIT_ERROR_THREAD, "unable to create thread");
2100+
git_mutex_unlock(&mutex);
2101+
/* Only clean up the number of threads we have started */
2102+
num_threads = i;
2103+
ret = -1;
2104+
goto cleanup;
2105+
}
2106+
}
2107+
2108+
while (last_index < num_deltas) {
2109+
if ((ret = git_atomic32_get(&error)) != 0) {
2110+
git_mutex_unlock(&mutex);
2111+
goto cleanup;
2112+
}
2113+
2114+
current_index = git_vector_length(&progress_pairs);
2115+
2116+
if (last_index == current_index) {
2117+
git_cond_wait(&cond, &mutex);
2118+
current_index = git_vector_length(&progress_pairs);
2119+
}
2120+
2121+
git_vector_clear(&temp);
2122+
for (; last_index < current_index; ++last_index) {
2123+
progress_pair = git_vector_get(&progress_pairs,
2124+
last_index);
2125+
delta = git_vector_get(&data->diff->deltas, last_index);
2126+
2127+
if (progress_pair->skipped)
2128+
continue;
2129+
2130+
/* We will retry errored checkouts synchronously after all the workers
2131+
* complete
2132+
*/
2133+
if (progress_pair->error < 0) {
2134+
git_vector_insert(&errored_pairs, progress_pair);
2135+
continue;
2136+
}
2137+
2138+
git_vector_insert(&temp, delta);
2139+
}
2140+
2141+
git_mutex_unlock(&mutex);
2142+
2143+
for (i = 0; i < git_vector_length(&temp); ++i) {
2144+
delta = git_vector_get(&temp, i);
2145+
data->completed_steps++;
2146+
report_progress(data, delta->new_file.path);
2147+
}
2148+
2149+
git_mutex_lock(&mutex);
2150+
}
2151+
2152+
git_mutex_unlock(&mutex);
2153+
2154+
git_vector_foreach(&errored_pairs, i, progress_pair) {
2155+
delta = git_vector_get(&data->diff->deltas, progress_pair->index);
2156+
if ((ret = checkout_create_the_new_perform(data, actions[progress_pair->index],
2157+
delta, NO_SYMLINKS)) < 0)
2158+
goto cleanup;
2159+
}
2160+
2161+
/* After we create everything else, we need to create all the symlinks
2162+
* to ensure that we don't accidentally write data through symlinks into
2163+
* the .git directory.
2164+
*/
2165+
git_vector_foreach(&data->diff->deltas, i, delta) {
2166+
if ((ret = checkout_create_the_new_perform(data, actions[i], delta,
2167+
SYMLINKS_ONLY)) < 0)
2168+
goto cleanup;
2169+
}
2170+
2171+
cleanup:
2172+
for (i = 0; i < num_threads; ++i) {
2173+
git_thread_join(&p[i].thread, NULL);
2174+
}
2175+
2176+
git__free(p);
2177+
git_vector_free(&errored_pairs);
2178+
git_vector_free(&temp);
2179+
git_vector_free_deep(&progress_pairs);
2180+
git_cond_free(&cond);
2181+
git_mutex_free(&mutex);
2182+
2183+
return ret;
2184+
}
2185+
2186+
#endif
2187+
19702188
static int checkout_create_the_new(
19712189
unsigned int *actions,
19722190
checkout_data *data)
19732191
{
19742192
#ifdef GIT_THREADS
2193+
if (git__online_cpus() > 1)
2194+
return checkout_create_the_new__parallel(actions, data);
19752195
#endif
19762196
return checkout_create_the_new__single(actions, data);
19772197
}

0 commit comments

Comments
 (0)