From 6803cad2f3e8036e6d45452bb52886438fe13868 Mon Sep 17 00:00:00 2001
From: Iwan Kawrakow
Date: Thu, 25 Dec 2025 07:18:51 +0000
Subject: [PATCH] Scheduler changes

Three related scheduler changes:

- Make the worker threads, per-backend statuses, per-backend split lists,
  and the needs_sync/own_cpy flags members of ggml_backend_sched, so they
  persist across graph evaluations instead of being recreated on every
  call to ggml_backend_sched_compute_splits().

- Move the logic that stages non-host split inputs into a pre-allocated
  per-backend buffer out of ggml_backend_sched_compute_splits() into a
  new helper, ggml_sched_prepare_graph(), invoked once after a successful
  graph allocation.

- Record in sched->has_reduce whether the split graph contains a
  GGML_OP_REDUCE node, and take the multi-threaded evaluation path only
  when it does.
---
 ggml/src/ggml-backend.cpp | 222 +++++++++++++++++++++-----------------
 1 file changed, 123 insertions(+), 99 deletions(-)

diff --git a/ggml/src/ggml-backend.cpp b/ggml/src/ggml-backend.cpp
index 550d8dc4..4058751f 100644
--- a/ggml/src/ggml-backend.cpp
+++ b/ggml/src/ggml-backend.cpp
@@ -1170,9 +1170,16 @@ struct ggml_backend_sched {
 
     uint32_t op_offload[(GGML_OP_COUNT + 31)/32];
 
+    std::vector<std::thread> workers;
+    std::vector<ggml_status> statuses;
+    std::vector<std::vector<ggml_backend_sched_split *>> backend_splits;
+    std::array<bool, GGML_SCHED_MAX_BACKENDS> needs_sync;
+    std::array<bool, GGML_SCHED_MAX_BACKENDS> own_cpy;
+
     bool only_active_experts;
     bool split_mode_graph;
     bool debug;
+    bool has_reduce = false;
 };
 
 void ggml_backend_sched_set_op_offload(ggml_backend_sched_t sched, enum ggml_op op, bool on_or_off) {
@@ -1394,6 +1401,7 @@ static void ggml_backend_sched_split_graph(ggml_backend_sched_t sched, struct gg
     sched->n_splits = 0;
     sched->n_graph_inputs = 0;
     sched->is_reset = false;
+    sched->has_reduce = false;
 
     struct ggml_init_params params = {
         /* .mem_size = */ sched->context_buffer_size,
@@ -1698,6 +1706,9 @@ static void ggml_backend_sched_split_graph(ggml_backend_sched_t sched, struct gg
 
         // check if we should start a new split based on the sources of the current node
         bool need_new_split = false;
+        if (node->op == GGML_OP_REDUCE) {
+            sched->has_reduce = true;
+        }
         if ((node->op == GGML_OP_ADD && node->op_params[0] == 0xff) ||
              node->op == GGML_OP_REDUCE ||
              node->op == GGML_OP_FAKE_CPY ||
@@ -2137,97 +2148,18 @@ static ggml_status ggml_backend_sched_eval(ggml_backend_sched_t sched, ggml_back
 
 static enum ggml_status ggml_backend_sched_compute_splits(ggml_backend_sched_t sched) {
-    std::array<bool, GGML_SCHED_MAX_BACKENDS> needs_sync{{true}};
-    std::array<bool, GGML_SCHED_MAX_BACKENDS> own_cpy{{false}};
+    //if (!sched->split_mode_graph) {
+    //    for (auto & item : sched->own_cpy ) item = false;
+    //    for (auto & item : sched->needs_sync) item = true;
+    //}
+    for (auto & item : sched->needs_sync) item = true;
 
-    if (sched->split_mode_graph) {
-        auto tensor_size = [] (const ggml_tensor * t) {
-            auto nbytes = ggml_nbytes(t);
-            nbytes = 256*((nbytes + 255)/256);
-            return nbytes;
-        };
-        //auto tim1 = std::chrono::steady_clock::now();
-        std::vector<std::vector<ggml_backend_sched_split *>> backend_splits(sched->n_backends);
-        for (int i = 0; i < sched->n_splits; i++) {
-            backend_splits[sched->splits[i].backend_id].push_back(&sched->splits[i]);
-        }
-        for (int backend_id = 0; backend_id < sched->n_backends; ++backend_id) {
-            if (ggml_backend_is_cpu(ggml_backend_sched_get_backend(sched, backend_id))) continue;
-            if (backend_splits[backend_id].empty()) continue;
-            size_t input_size = 0;
-            size_t max_input_size = 0;
-            int last_split = 0;
-            bool can_alloc = true;
-            for (int i = 0; i < int(backend_splits[backend_id].size()); ++i) {
-                auto split = backend_splits[backend_id][i];
-                if (split->n_inputs < 1) continue;
-                size_t this_size = 0;
-                for (int j = 0; j < split->n_inputs; ++j) {
-                    if (!ggml_backend_buffer_is_host(split->inputs[j]->buffer)) {
-                        this_size += tensor_size(split->inputs[j]);
-                    }
-                }
-                if (input_size + this_size > sched->max_extra_alloc) {
-                    if (i - last_split < 3) {
-                        can_alloc = false;
-                        break;
-                    }
-                    max_input_size = std::max(max_input_size, input_size);
-                    input_size = 0;
-                    last_split = i - 1;
-                }
-                input_size += this_size;
-            }
-            max_input_size = std::max(max_input_size, input_size);
-            if (!can_alloc || !max_input_size) continue;
-            if (sched->input_memory_bufs[backend_id] && sched->input_memory_bufs[backend_id]->size < max_input_size) {
-                ggml_backend_buffer_free(sched->input_memory_bufs[backend_id]);
-                sched->input_memory_bufs[backend_id] = nullptr;
-            }
-            if (!sched->input_memory_bufs[backend_id]) {
-                sched->input_memory_bufs[backend_id] = ggml_backend_alloc_buffer(sched->backends[backend_id], max_input_size);
-            }
-            auto ptr = (char *)ggml_backend_buffer_get_base(sched->input_memory_bufs[backend_id]);
-            input_size = 0;
-            for (int i = 0; i < int(backend_splits[backend_id].size()); ++i) {
-                auto split = backend_splits[backend_id][i];
-                size_t this_size = 0;
-                for (int j = 0; j < split->n_inputs; ++j) {
-                    if (!ggml_backend_buffer_is_host(split->inputs[j]->buffer)) {
-                        this_size += tensor_size(split->inputs[j]);
-                    }
-                }
-                if (input_size + this_size > max_input_size) {
-                    ptr = (char *)ggml_backend_buffer_get_base(sched->input_memory_bufs[backend_id]);
-                    input_size = 0;
-                }
-                for (int j = 0; j < split->n_inputs; ++j) {
-                    if (ggml_backend_buffer_is_host(split->inputs[j]->buffer)) continue;
-                    auto input_cpy = tensor_copy(split->inputs[j], backend_id, sched->cur_copy);
-                    for (int k = 0; k < split->graph.n_nodes; ++k) {
-                        auto node = split->graph.nodes[k];
-                        for (int l = 0; l < GGML_MAX_SRC; ++l) {
-                            if (node->src[l] && node->src[l]->data == input_cpy->data) node->src[l]->data = ptr;
-                        }
-                    }
-                    input_cpy->data = ptr;
-                    ptr += tensor_size(split->inputs[j]);
-                }
-                input_size += this_size;
-            }
-            needs_sync[backend_id] = false;
-            own_cpy[backend_id] = true;
-        }
-    }
-
-    if (sched->n_backends > 3 && sched->split_mode_graph) {
+    if (sched->n_backends > 3 && sched->split_mode_graph && sched->has_reduce) {
         std::barrier barrier(sched->n_backends, [] () {});
 
-        std::vector<std::thread> workers;
-        workers.reserve(sched->n_backends);
-        std::vector<ggml_status> statuses(sched->n_backends, GGML_STATUS_SUCCESS);
+        for (auto & s : sched->statuses) s = GGML_STATUS_SUCCESS;
 
-        auto compute = [sched, &needs_sync, &own_cpy, &barrier, &statuses] (int ith) {
+        auto compute = [sched, &barrier] (int ith) {
 
             struct ggml_backend_sched_split * splits = sched->splits;
@@ -2251,18 +2183,18 @@ static enum ggml_status ggml_backend_sched_compute_splits(ggml_backend_sched_t s
 
                 if (ith == split_backend_id) {
                     // copy the input tensors to the split backend
-                    ggml_backend_sched_copy_inputs(sched, split, needs_sync, ids, unique_ids, last_ids_tensor);
+                    ggml_backend_sched_copy_inputs(sched, split, sched->needs_sync, ids, unique_ids, last_ids_tensor);
 
-                    if (split->n_inputs > 0 && !own_cpy[split_backend_id]) {
-                        needs_sync[split_backend_id] = true;
+                    if (split->n_inputs > 0 && !sched->own_cpy[split_backend_id]) {
+                        sched->needs_sync[split_backend_id] = true;
                     } else {
                         for (int j = 0; j < split->n_inputs; ++j) {
                            if (ggml_backend_buffer_is_host(split->inputs[j]->buffer)) {
-                                needs_sync[split_backend_id] = true;
+                                sched->needs_sync[split_backend_id] = true;
                            }
                         }
                     }
-                    statuses[ith] = ggml_backend_sched_eval(sched, split_backend, split);
+                    sched->statuses[ith] = ggml_backend_sched_eval(sched, split_backend, split);
                 }
 
                 if (split->graph.nodes[0]->op == GGML_OP_REDUCE) {
@@ -2281,9 +2213,10 @@ static enum ggml_status ggml_backend_sched_compute_splits(ggml_backend_sched_t s
             }
         };
 
-        for (int i = 0; i < sched->n_backends; ++i) workers.emplace_back(compute, i);
-        for (auto & w : workers) w.join();
-        for (auto status : statuses) {
+        for (int i = 0; i < sched->n_backends; ++i) sched->workers.emplace_back(compute, i);
+        for (auto & w : sched->workers) w.join();
+        sched->workers.clear();
+        for (auto status : sched->statuses) {
            if (status != GGML_STATUS_SUCCESS) return status;
        }
        return GGML_STATUS_SUCCESS;
@@ -2305,14 +2238,14 @@ static enum ggml_status ggml_backend_sched_compute_splits(ggml_backend_sched_t s
         ggml_backend_t split_backend = sched->backends[split_backend_id];
 
         // copy the input tensors to the split backend
-        ggml_backend_sched_copy_inputs(sched, split, needs_sync, ids, unique_ids, last_ids_tensor);
+        ggml_backend_sched_copy_inputs(sched, split, sched->needs_sync, ids, unique_ids, last_ids_tensor);
 
-        if (split->n_inputs > 0 && !own_cpy[split_backend_id]) {
-            needs_sync[split_backend_id] = true;
+        if (split->n_inputs > 0 && !sched->own_cpy[split_backend_id]) {
+            sched->needs_sync[split_backend_id] = true;
         } else {
             for (int j = 0; j < split->n_inputs; ++j) {
                 if (ggml_backend_buffer_is_host(split->inputs[j]->buffer)) {
-                    needs_sync[split_backend_id] = true;
+                    sched->needs_sync[split_backend_id] = true;
                 }
             }
         }
@@ -2384,6 +2317,10 @@ ggml_backend_sched_t ggml_backend_sched_new(
 
     sched->galloc = ggml_gallocr_new_n(sched->bufts, n_backends);
 
+    sched->workers.reserve(sched->n_backends);
+    sched->statuses.resize(sched->n_backends, GGML_STATUS_SUCCESS);
+    sched->backend_splits.resize(sched->n_backends);
+
     ggml_backend_sched_reset(sched);
 
     return sched;
@@ -2466,6 +2403,92 @@ enum ggml_status ggml_backend_sched_graph_compute(ggml_backend_sched_t sched, st
     return err;
 }
 
+static void ggml_sched_prepare_graph(ggml_backend_sched_t sched) {
+
+    for (auto & item : sched->own_cpy ) item = false;
+    for (auto & item : sched->needs_sync) item = true;
+
+    if (sched->split_mode_graph) {
+        auto tensor_size = [] (const ggml_tensor * t) {
+            auto nbytes = ggml_nbytes(t);
+            nbytes = 256*((nbytes + 255)/256);
+            return nbytes;
+        };
+        //auto tim1 = std::chrono::steady_clock::now();
+        for (auto & b : sched->backend_splits) b.clear();
+        for (int i = 0; i < sched->n_splits; i++) {
+            sched->backend_splits[sched->splits[i].backend_id].push_back(&sched->splits[i]);
+        }
+        for (int backend_id = 0; backend_id < sched->n_backends; ++backend_id) {
+            if (ggml_backend_is_cpu(ggml_backend_sched_get_backend(sched, backend_id))) continue;
+            if (sched->backend_splits[backend_id].empty()) continue;
+            size_t input_size = 0;
+            size_t max_input_size = 0;
+            int last_split = 0;
+            bool can_alloc = true;
+            for (int i = 0; i < int(sched->backend_splits[backend_id].size()); ++i) {
+                auto split = sched->backend_splits[backend_id][i];
+                if (split->n_inputs < 1) continue;
+                size_t this_size = 0;
+                for (int j = 0; j < split->n_inputs; ++j) {
+                    if (!ggml_backend_buffer_is_host(split->inputs[j]->buffer)) {
+                        this_size += tensor_size(split->inputs[j]);
+                    }
+                }
+                if (input_size + this_size > sched->max_extra_alloc) {
+                    if (i - last_split < 3) {
+                        can_alloc = false;
+                        break;
+                    }
+                    max_input_size = std::max(max_input_size, input_size);
+                    input_size = 0;
+                    last_split = i - 1;
+                }
+                input_size += this_size;
+            }
+            max_input_size = std::max(max_input_size, input_size);
+            if (!can_alloc || !max_input_size) continue;
+            if (sched->input_memory_bufs[backend_id] && sched->input_memory_bufs[backend_id]->size < max_input_size) {
+                ggml_backend_buffer_free(sched->input_memory_bufs[backend_id]);
+                sched->input_memory_bufs[backend_id] = nullptr;
+            }
+            if (!sched->input_memory_bufs[backend_id]) {
+                sched->input_memory_bufs[backend_id] = ggml_backend_alloc_buffer(sched->backends[backend_id], max_input_size);
+            }
+            auto ptr = (char *)ggml_backend_buffer_get_base(sched->input_memory_bufs[backend_id]);
+            input_size = 0;
+            for (int i = 0; i < int(sched->backend_splits[backend_id].size()); ++i) {
+                auto split = sched->backend_splits[backend_id][i];
+                size_t this_size = 0;
+                for (int j = 0; j < split->n_inputs; ++j) {
+                    if (!ggml_backend_buffer_is_host(split->inputs[j]->buffer)) {
+                        this_size += tensor_size(split->inputs[j]);
+                    }
+                }
+                if (input_size + this_size > max_input_size) {
+                    ptr = (char *)ggml_backend_buffer_get_base(sched->input_memory_bufs[backend_id]);
+                    input_size = 0;
+                }
+                for (int j = 0; j < split->n_inputs; ++j) {
+                    if (ggml_backend_buffer_is_host(split->inputs[j]->buffer)) continue;
+                    auto input_cpy = tensor_copy(split->inputs[j], backend_id, sched->cur_copy);
+                    for (int k = 0; k < split->graph.n_nodes; ++k) {
+                        auto node = split->graph.nodes[k];
+                        for (int l = 0; l < GGML_MAX_SRC; ++l) {
+                            if (node->src[l] && node->src[l]->data == input_cpy->data) node->src[l]->data = ptr;
+                        }
+                    }
+                    input_cpy->data = ptr;
+                    ptr += tensor_size(split->inputs[j]);
+                }
+                input_size += this_size;
+            }
+            sched->needs_sync[backend_id] = false;
+            sched->own_cpy[backend_id] = true;
+        }
+    }
+}
+
 enum ggml_status ggml_backend_sched_graph_compute_async(ggml_backend_sched_t sched, struct ggml_cgraph * graph) {
     if (!sched->is_reset && !sched->is_alloc) {
         ggml_backend_sched_reset(sched);
@@ -2475,6 +2498,7 @@ enum ggml_status ggml_backend_sched_graph_compute_async(ggml_backend_sched_t sch
         if (!ggml_backend_sched_alloc_graph(sched, graph)) {
             return GGML_STATUS_ALLOC_FAILED;
         }
+        ggml_sched_prepare_graph(sched);
     }
 
     return ggml_backend_sched_compute_splits(sched);
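
Note on the concurrency pattern above (illustration only, not part of the
patch): the multi-threaded path launches one worker thread per backend, and
the std::barrier exists so that no worker runs past a GGML_OP_REDUCE split
before the per-backend partial results it consumes are ready. Below is a
minimal standalone C++20 sketch of that pattern. `Split`, `is_reduce`, and
the hard-coded schedule are illustrative placeholders, not ggml API; the
real scheduler also copies split inputs and records per-backend statuses,
and since the exact synchronization points around GGML_OP_REDUCE are not
visible in the hunks above, the barrier placement here is an assumption.

    // sketch.cpp: one worker per backend; all workers walk the same split
    // list and synchronize on a barrier before any reduce split executes.
    // Build with: g++ -std=c++20 -pthread sketch.cpp
    #include <barrier>
    #include <cstdio>
    #include <thread>
    #include <vector>

    struct Split {
        int  backend_id; // which backend executes this split
        bool is_reduce;  // combines partial results from all backends
    };

    int main() {
        const int n_backends = 4;
        // Hypothetical schedule: four independent splits, then a reduce
        // on backend 0 that consumes all four partial results.
        std::vector<Split> splits = {
            {0, false}, {1, false}, {2, false}, {3, false},
            {0, true},
        };

        std::barrier barrier(n_backends, [] () noexcept {});

        auto compute = [&] (int ith) {
            for (const auto & split : splits) {
                if (split.is_reduce) {
                    // every worker arrives here, so the reduce cannot start
                    // until all backends finished the preceding splits
                    barrier.arrive_and_wait();
                }
                if (ith == split.backend_id) {
                    std::printf("backend %d: run split (reduce=%d)\n", ith, int(split.is_reduce));
                }
            }
        };

        std::vector<std::thread> workers;
        workers.reserve(n_backends);
        for (int i = 0; i < n_backends; ++i) workers.emplace_back(compute, i);
        for (auto & w : workers) w.join();
        return 0;
    }

Two smaller observations: keeping `workers` as a member of the sched struct
and clearing it after the joins avoids reallocating the vector on every
evaluation, though a std::thread is still created and joined per backend per
call; and tensor_size() in ggml_sched_prepare_graph() pads every staged input
to a 256-byte boundary (256*((nbytes + 255)/256)), so e.g. a 1000-byte tensor
reserves 1024 bytes of the per-backend input buffer.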