Remove fasttensors, add platform-agnostic multithreaded ST loader

This commit is contained in:
turboderp
2024-09-17 00:33:16 +02:00
parent 144c576bdb
commit b25210778c
17 changed files with 431 additions and 911 deletions

View File

@@ -2,7 +2,7 @@ from __future__ import annotations
import torch
import math
from exllamav2.fasttensors import STFile, cleanup_stfiles
from exllamav2.stloader import STFile, cleanup_stfiles
from exllamav2.architecture import ExLlamaV2ArchParams
import os, glob, json
from typing import Any, Dict, List, TypeVar, Union, cast
@@ -61,7 +61,7 @@ class ExLlamaV2Config:
no_flash_attn: bool # Implementation will automatically use flash-attn-2 when available, set True to override
no_xformers: bool # Implementation will automatically use xformers for sm<80 when available, unless flash-attn-2 is available, set True to override
no_sdpa: bool # Do not use Torch SDPA even if causal_lower_right bias is available (seems to be unreliable on ROCm (?))
fasttensors: bool # Use alternative .safetensors loader (aio on Linux, cstdio on Windows). Not always faster but can address excessive use of system RAM in some situations
fasttensors: bool # Deprecated, has no effect
load_in_q4: bool # Load float linear layers in Q4 format (for test/dev purposes, not performant)
no_graphs: bool # Do not use CUDA graphs
@@ -323,7 +323,7 @@ class ExLlamaV2Config:
raise ValueError(f" ## No .safetensors files found in {self.model_dir}")
for st_file in self.tensor_files:
f = STFile.open(st_file, fast = self.fasttensors, keymap = self.arch.keymap)
f = STFile.open(st_file, keymap = self.arch.keymap)
for key in f.get_dict():
self.tensor_file_map[key] = st_file

View File

@@ -31,7 +31,7 @@ parser.add_argument("-l", "--length", type = int, default = 2048, help = "Max no
parser.add_argument("-ml", "--measurement_length", type = int, default = 2048, help = "Max no. tokens per sample when measuring")
parser.add_argument("-so", "--status_output", action = "store_true", help = "Include machine-parseable status updates in console output")
parser.add_argument("-hsol", "--hidden_state_offload_layers", type = int, default = 0, help = "Number of hidden/target states to keep in VRAM. Speed-up but increases VRAM usage")
parser.add_argument("-fst", "--fast_safetensors", action = "store_true", help = "Use fast-safetensors to load layers of the unquantized model. This can help alleviate some out-of-memory issues, especially on Windows.")
parser.add_argument("-fst", "--fast_safetensors", action = "store_true", help = "Deprecated (does nothing)")
args = parser.parse_args()
@@ -113,7 +113,6 @@ job = {"in_dir": args.in_dir,
"rope_scale": args.rope_scale,
"rope_alpha": args.rope_alpha,
"output_measurement": output_measurement,
"fast_safetensors": args.fast_safetensors,
"progress": "begin"}
if args.measurement is not None:
@@ -162,8 +161,6 @@ if job["output_measurement"] is None:
else:
print(f" -- Measurement will be saved to {job['output_measurement']}")
print(f" !! Conversion script will end after measurement pass")
if job.get("fast_safetensors"):
print(f" -- Enabled fast_safetensors option.")
if job['rope_scale']: print(f" -- RoPE scale: {job['rope_scale']:.2f}")
if job['rope_alpha']: print(f" -- RoPE alpha: {job['rope_alpha']:.2f}")
@@ -194,10 +191,6 @@ config.arch_compat_overrides()
tokenizer = ExLlamaV2Tokenizer(config)
# Set fast_safetensors in config
if job.get("fast_safetensors"): config.fasttensors = True
# Set scaling for input model
if job["rope_scale"] is not None: config.scale_pos_emb = job["rope_scale"]

View File

@@ -1,517 +0,0 @@
#include "safetensors.h"
#include <c10/cuda/CUDAGuard.h>
#include "util.h"
#ifdef __linux__
#include <aio.h>
#include <atomic>
#include <thread>
#include <limits>
#include <cerrno>
#include <iostream>
#include <fstream>
#include <fcntl.h>
#include <unistd.h>
#endif
#include <cstdio>
#include <cuda_runtime.h>
#define MAX_BLOCK_SIZE (128*1024)
#define MAX_PAGES 4
#define PAGESIZE (16*1024*1024)
#define Q_DEPTH 1
#define PINNED_MEMORY (MAX_PAGES * PAGESIZE)
//#define ST_DEBUG
void* pinned_buffer = nullptr;
void* aligned_buffer = nullptr;
struct STPage
{
public:
int file_descriptor;
#ifdef __linux__
size_t file_a;
size_t file_b;
long access;
std::atomic<int> locks;
char* ptr;
#endif
};
STPage pages[MAX_PAGES];
long serial = 1;
// Lazily allocate the single pinned (page-locked) host buffer shared by all
// cache pages, aligned up to MAX_BLOCK_SIZE for O_DIRECT reads, and reset the
// page cache. Linux-only; a no-op elsewhere or if already allocated.
void safetensors_pinned_buffer()
{
#ifdef __linux__
    if (pinned_buffer) return;
    // Over-allocate by one block so the base pointer can be aligned up
    cudaMallocHost((void**) &pinned_buffer, PINNED_MEMORY + MAX_BLOCK_SIZE);
    TORCH_CHECK(pinned_buffer, "Unable to allocate pinned memory");
    // Round up to the next MAX_BLOCK_SIZE boundary
    aligned_buffer = (void*) ((char *)(((uintptr_t)pinned_buffer + MAX_BLOCK_SIZE - 1) & ~(MAX_BLOCK_SIZE - 1)));

    // Reset all pages: no file, never accessed, unlocked
    for (int i = 0; i < MAX_PAGES; i++)
    {
        pages[i].file_descriptor = -1;
        pages[i].file_a = 0;
        pages[i].file_b = 0;
        pages[i].access = -1;
        pages[i].locks.store(0);
        pages[i].ptr = ((char*) aligned_buffer) + i * PAGESIZE;
    }
#endif
}
// Release the shared pinned buffer allocated by safetensors_pinned_buffer().
// Safe to call when no buffer is allocated.
void safetensors_free_pinned_buffer()
{
#ifdef __linux__
    if (!pinned_buffer) return;
    cudaFreeHost((void*) pinned_buffer);
    pinned_buffer = nullptr;
    aligned_buffer = nullptr;
#endif
}
// Return the cache page holding bytes [file_a, file_b) of the given file,
// loading it via POSIX aio if not already resident. Pages are managed as a
// small LRU cache; pages with a nonzero lock count (an async device copy in
// flight) are never evicted. Linux-only.
STPage* get_cache_page(size_t file_descriptor, size_t block_size, size_t filesize, size_t file_a, size_t file_b)
{
#ifdef __linux__

    #ifdef ST_DEBUG
    printf("-- get cache page\n");
    DBGX3(file_descriptor, file_a, file_b);
    DBGX2(block_size, filesize);
    #endif

    // Find existing page in cache
    for (int i = 0; i < MAX_PAGES; i++)
    {
        if (static_cast<size_t>(pages[i].file_descriptor) == file_descriptor &&
            pages[i].file_a == file_a &&
            pages[i].file_b == file_b)
        {
            pages[i].access = serial++;  // touch for LRU
            return &pages[i];
        }
    }

    // Find page to evict: least recently used among unlocked pages. Spins
    // until one becomes unlocked (locks are released by the CUDA stream
    // callback once a pending copy completes).
    int oldest_i = -1;
    long oldest = std::numeric_limits<long>::max();
    while (oldest_i == -1)
    {
        for (int i = 0; i < MAX_PAGES; i++)
        {
            if (pages[i].locks.load() > 0) continue;
            if (pages[i].access < oldest)
            {
                oldest_i = i;
                oldest = pages[i].access;
            }
        }
    }

    #ifdef ST_DEBUG
    printf("-- evict page\n");
    DBGX(oldest_i);
    #endif

    // Load page
    #ifdef ST_DEBUG
    printf("-- load page\n");
    DBGX3(file_descriptor, file_a, file_b);
    #endif

    int p = oldest_i;
    pages[p].access = serial++;
    pages[p].file_a = file_a;
    pages[p].file_b = file_b;
    pages[p].file_descriptor = file_descriptor;

    // Split the page into Q_DEPTH async reads of q_chunk bytes each.
    // Requested lengths are kept block-aligned for O_DIRECT; read_lens[]
    // records the true (unpadded) number of bytes expected back.
    aiocb aiocb_list[Q_DEPTH];
    size_t read_lens[Q_DEPTH];

    int num_chunks = 0;
    size_t q_chunk = PAGESIZE / Q_DEPTH;
    size_t q_a = file_a;
    char* page_ptr = pages[p].ptr;
    for (int i = 0; i < Q_DEPTH; ++i)
    {
        size_t q_b = q_a + q_chunk;
        if (q_b > filesize) q_b = filesize;
        size_t read_len = q_b - q_a;
        read_lens[i] = read_len;
        //read_len = DIVIDE(read_len, 2 * block_size) * 2 * block_size;
        read_len = q_chunk;

        memset(&aiocb_list[i], 0, sizeof(aiocb));
        aiocb_list[i].aio_fildes = file_descriptor;
        aiocb_list[i].aio_buf = page_ptr;
        aiocb_list[i].aio_nbytes = read_len;
        aiocb_list[i].aio_offset = q_a;

        #ifdef ST_DEBUG
        DBGX3(q_a, q_b, read_len);
        DBGX2(filesize, read_lens[i]);
        #endif

        aio_read(&aiocb_list[i]);
        num_chunks++;
        if (q_b >= filesize) break;  // last chunk reached EOF
        page_ptr += q_chunk;
        q_a += q_chunk;
    }

    // Wait for all queued reads and verify each returned the expected length
    q_a = file_a;
    for (int i = 0; i < num_chunks; ++i)
    {
        struct aiocb *aiocb_active[1];
        aiocb_active[0] = &aiocb_list[i];
        aio_suspend(aiocb_active, 1, NULL);
        int err = aio_error(&aiocb_list[i]);
        #ifdef ST_DEBUG
        DBGX(err);
        #endif
        TORCH_CHECK(err == 0, "Async read error (1)");
        ssize_t bytes_read = aio_return(&aiocb_list[i]);
        #ifdef ST_DEBUG
        DBGX2(bytes_read, read_lens[i]);
        #endif
        TORCH_CHECK(bytes_read == static_cast<ssize_t>(read_lens[i]), "Async read error (2)");
    }

    return &pages[p];

#else
    TORCH_CHECK(false, "fasttensors only supported on Linux");
    return NULL;
#endif
}
// Open a safetensors file for direct I/O and return an opaque handle
// (an STFile pointer cast to uintptr_t) for use from Python. Linux-only.
uintptr_t safetensors_open(const char* filename)
{
#ifdef __linux__
    STFile* f = new STFile(filename);
    return reinterpret_cast<uintptr_t> (f);
#else
    TORCH_CHECK(false, "fasttensors only supported on Linux");
    return 0;
#endif
}
// Destroy an STFile previously returned by safetensors_open(), closing its
// file descriptor.
void safetensors_close(uintptr_t handle)
{
#ifdef __linux__
    STFile* f = reinterpret_cast<STFile*> (handle);
    delete f;
#else
    TORCH_CHECK(false, "fasttensors only supported on Linux");
#endif
}
// Open the file with O_DIRECT (bypasses the page cache; reads must be
// block-aligned) and record its size and the filesystem block size.
STFile::STFile(const char* filename)
{
#ifdef __linux__
    file_descriptor = open(filename, O_RDONLY | O_DIRECT);
    TORCH_CHECK(file_descriptor != -1, "Safetensors file I/O error");

    struct stat sb;
    auto res = fstat(file_descriptor, &sb);
    TORCH_CHECK(res != -1, "Safetensors fstat failed");
    filesize = sb.st_size;
    block_size = sb.st_blksize;
    // File size rounded up to a whole number of filesystem blocks
    padded_size = DIVIDE(filesize, block_size) * block_size;
    // The pinned buffer alignment assumes blocks never exceed MAX_BLOCK_SIZE
    TORCH_CHECK(block_size <= MAX_BLOCK_SIZE, "Block size too large")
#else
    TORCH_CHECK(false, "fasttensors only supported on Linux");
#endif
}
// Close the underlying file descriptor.
STFile::~STFile()
{
#ifdef __linux__
    close(file_descriptor);
#else
    TORCH_CHECK(false, "fasttensors only supported on Linux");
#endif
}
// CUDA stream callback: release the page lock taken before an async
// host-to-device copy, making the page evictable again.
void dec_lock(cudaStream_t stream, cudaError_t status, void *user_data)
{
#ifdef __linux__
    STPage* p = (STPage*) user_data;
    p->locks--;
#endif
}
// Copy `length` bytes starting at file offset `offset` into `target`, going
// through the page cache. Walks the PAGESIZE-aligned pages overlapping the
// requested range; for GPU targets each chunk is queued with cudaMemcpyAsync
// and the source page stays locked until the stream callback fires.
void STFile::load
(
    torch::Tensor target,
    size_t offset,
    size_t length,
    bool gpu
)
{
#ifdef __linux__
    safetensors_pinned_buffer();  // ensure the shared pinned cache exists

    #ifdef ST_DEBUG
    printf("-- load tensor\n");
    DBGX2(offset, length);
    DBGI(length);
    #endif

    // First page boundary at or below `offset`
    size_t file_b = offset / PAGESIZE * PAGESIZE;
    /* doesn't appear to be utilized rn
    size_t file_c = DIVIDE(offset + length, PAGESIZE) * PAGESIZE;
    */

    // Loop over pages
    size_t file_a = file_b;
    size_t tensor_offset = 0;
    while (tensor_offset < length)
    {
        file_a = file_b;
        file_b += PAGESIZE;
        STPage* page = get_cache_page(file_descriptor, block_size, filesize, file_a, file_b);

        // Intersection of [offset, offset + length) with this page
        ssize_t left = offset - file_a;
        if (left < 0) left = 0;
        ssize_t right = offset + length - file_a;
        if (right > PAGESIZE) right = PAGESIZE;
        ssize_t copy_len = right - left;

        #ifdef ST_DEBUG
        printf("-- copy chunk\n");
        DBGX3(left, right, copy_len);
        DBGX(tensor_offset);
        DBGI(copy_len);
        #endif

        char* src = page->ptr + left;
        char* dst = ((char*) target.data_ptr()) + tensor_offset;
        if (gpu)
        {
            // Keep the page resident until the async copy has completed
            page->locks++;
            cudaMemcpyAsync(dst, src, copy_len, cudaMemcpyHostToDevice);
            cudaStreamAddCallback(NULL, dec_lock, (void*) page, 0);
        }
        else
        {
            memcpy(dst, src, copy_len);
        }
        //cudaDeviceSynchronize();
        tensor_offset += copy_len;
    }
#else
    TORCH_CHECK(false, "fasttensors only supported on Linux");
#endif
}
// Python-facing entry point: load `length` bytes at `offset` from the file
// behind `handle` into `target`, picking the host or device copy path based
// on the tensor's device. Linux-only.
void safetensors_load
(
    uintptr_t handle,
    torch::Tensor target,
    size_t offset,
    size_t length
)
{
#ifdef __linux__
    STFile* f = reinterpret_cast<STFile*> (handle);
    c10::optional<torch::Device> device = torch::device_of(target);
    if (device.has_value() && device->type() == torch::kCPU)
    {
        f->load(target, offset, length, false);
    }
    else
    {
        // Make the tensor's device current for the async copies
        const at::cuda::OptionalCUDAGuard device_guard(device);
        f->load(target, offset, length, true);
    }
#else
    TORCH_CHECK(false, "fasttensors only supported on Linux");
#endif
}
// Fallback routines for Windows
void* read_buffer;
int read_buffer_refcount = 0;
#define READ_BUFFER_SIZE (1024*1024)
// Fallback (portable cstdio) loader: open the file and return the FILE* as an
// opaque handle. The shared staging buffer is refcounted across open files
// and allocated on first open.
uintptr_t safetensors_open_fb(const char* filename)
{
    FILE* file = fopen(filename, "rb");
    TORCH_CHECK(file != nullptr, "Can't open safetensors file");
    read_buffer_refcount++;
    if (read_buffer_refcount == 1)
    {
        read_buffer = malloc(READ_BUFFER_SIZE);
    }
    return reinterpret_cast<uintptr_t> (file);
}
// Close a fallback-loader handle and free the shared staging buffer once the
// last open file is closed.
void safetensors_close_fb(uintptr_t handle)
{
    FILE* file = reinterpret_cast<FILE*> (handle);
    fclose(file);
    read_buffer_refcount--;
    if (read_buffer_refcount == 0)
    {
        free(read_buffer);
        read_buffer = NULL;
    }
}
// Fallback tensor read: seek to `beg` and read `size` bytes into `target`.
// CPU targets are read directly into the tensor's storage; CUDA targets are
// staged through the shared read_buffer in READ_BUFFER_SIZE chunks and copied
// with synchronous cudaMemcpy.
void safetensors_read_fb(uintptr_t handle, size_t beg, size_t size, torch::Tensor target)
{
    TORCH_CHECK(read_buffer, "No read buffer");

    FILE* file = reinterpret_cast<FILE*> (handle);
    char* output = (char*) target.data_ptr();
    c10::optional<torch::Device> device = torch::device_of(target);
    bool target_cpu = (device.has_value() && device->type() == torch::kCPU);

    // 64-bit-safe seek on either platform
    #ifdef __linux__
    int r = fseek(file, beg, SEEK_SET);
    #else
    int r = _fseeki64(file, static_cast<__int64>(beg), SEEK_SET);
    #endif
    TORCH_CHECK(!r, "Error seeking safetensors file");

    if (target_cpu)
    {
        size_t bytes_read = fread(output, 1, size, file);
        TORCH_CHECK(bytes_read == size, "Error reading safetensors file (EOF)");
    }
    else
    {
        const at::cuda::OptionalCUDAGuard device_guard(device);
        size_t remaining = size;
        while (remaining)
        {
            size_t chunk = READ_BUFFER_SIZE;
            if (remaining < chunk) chunk = remaining;
            size_t bytes_read = fread(read_buffer, 1, chunk, file);
            TORCH_CHECK(bytes_read == chunk, "Error reading safetensors file (EOF)");
            cudaError_t cr = cudaMemcpy(output, read_buffer, chunk, cudaMemcpyHostToDevice);
            TORCH_CHECK(cr == cudaSuccess, "Failed to copy tensor data to device memory");
            output += chunk;
            remaining -= chunk;
        }
    }
}
// In-place column permutation of a 2D int32 tensor: for every row,
// row[c] = old_row[index[c]]. `index` must have one entry per column.
void tensor_remap
(
    torch::Tensor tensor,
    torch::Tensor index
)
{
    TORCH_CHECK_SHAPES(tensor, 1, index, 0, 1);
    TORCH_CHECK_DTYPE(tensor, kInt);
    TORCH_CHECK_DTYPE(index, kInt);

    int rows = tensor.size(0);
    int cols = tensor.size(1);
    // Scratch copy of one row, so the permutation can be applied in place
    uint32_t* temp = (uint32_t*) calloc(cols, sizeof(int));
    uint32_t* a = (uint32_t*) tensor.data_ptr();
    uint32_t* idx = (uint32_t*) index.data_ptr();

    for (int r = 0; r < rows; ++r)
    {
        memcpy(temp, a, sizeof(uint32_t) * cols);
        for (int c = 0; c < cols; ++c)
        {
            *a++ = temp[idx[c]];
        }
    }
    free(temp);
}
// In-place column permutation for 4-bit packed data: each int32 word packs 8
// consecutive 4-bit values, so logical element i lives in bits ((i & 7) * 4)
// of word i / 8. `index` has one entry per 4-bit column (8x the tensor's
// int32 column count).
void tensor_remap_4bit
(
    torch::Tensor tensor,
    torch::Tensor index
)
{
    TORCH_CHECK_SHAPES(index, 0, tensor, 1, 8);
    TORCH_CHECK_DTYPE(tensor, kInt);
    TORCH_CHECK_DTYPE(index, kInt);

    int rows = tensor.size(0);
    int cols = index.size(0);
    // Scratch copy of one packed row (cols / 8 int32 words)
    uint32_t* temp = (uint32_t*) calloc(cols / 8, sizeof(int));
    uint32_t* a = (uint32_t*) tensor.data_ptr();
    uint32_t* idx = (uint32_t*) index.data_ptr();

    for (int r = 0; r < rows; ++r)
    {
        memcpy(temp, a, sizeof(uint32_t) * cols / 8);
        for (int c = 0; c < cols;)
        {
            // Re-pack 8 permuted 4-bit values into one output word
            uint32_t rv = 0;
            for (int b = 0; b < 8; ++b, ++c)
            {
                uint32_t i = idx[c];
                uint32_t v = (temp[i / 8] >> ((i & 7) * 4) & 0x0f);
                rv |= v << (b * 4);
            }
            *a++ = rv;
        }
    }
    free(temp);
}

View File

@@ -1,63 +0,0 @@
#ifndef _safetensors_h
#define _safetensors_h

#include <torch/extension.h>
#include <cuda_runtime.h>
#include <cuda_fp16.h>
#include <ATen/cuda/CUDAContext.h>
#include <cstdint>
#include <cstdio>

// Handle to an open .safetensors file for the direct-I/O (Linux aio) loader
class STFile
{
public:
    STFile(const char* filename);
    ~STFile();

    // Read `length` bytes at file offset `offset` into `target`; `gpu`
    // selects an async host-to-device copy instead of a host memcpy
    void load
    (
        torch::Tensor target,
        size_t offset,
        size_t length,
        bool gpu
    );

    int file_descriptor;
    size_t filesize;
    size_t padded_size;    // filesize rounded up to a whole block
    size_t block_size;     // filesystem block size (O_DIRECT alignment)
    void* aligned_buffer;
};

// Shared pinned staging buffer management
void safetensors_pinned_buffer();
void safetensors_free_pinned_buffer();

// Direct-I/O loader entry points (Linux only)
uintptr_t safetensors_open(const char* filename);
void safetensors_close(uintptr_t handle);
void safetensors_load
(
    uintptr_t handle,
    torch::Tensor target,
    size_t offset,
    size_t length
);

// Portable cstdio fallback loader
uintptr_t safetensors_open_fb(const char* filename);
void safetensors_close_fb(uintptr_t handle);
void safetensors_read_fb(uintptr_t handle, size_t beg, size_t size, torch::Tensor target);

// In-place column permutations for (4-bit packed) int32 tensors
void tensor_remap
(
    torch::Tensor tensor,
    torch::Tensor index
);
void tensor_remap_4bit
(
    torch::Tensor tensor,
    torch::Tensor index
);

#endif

View File

@@ -10,7 +10,7 @@
#include "ext_quant.h"
#include "ext_sampling.h"
#include "ext_safetensors.h"
#include "ext_stloader.h"
#include "ext_qmatrix.h"
#include "ext_qattn.h"
#include "ext_qmlp.h"
@@ -45,16 +45,9 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m)
m.def("dump_profile_results", &dump_profile_results, "dump_profile_results");
m.def("partial_strings_match", &partial_strings_match, "partial_strings_match");
// safetensors
// safetensors/stloader
m.def("safetensors_open", &safetensors_open, "safetensors_open");
m.def("safetensors_open_fb", &safetensors_open_fb, "safetensors_open_fb");
m.def("safetensors_close", &safetensors_close, "safetensors_close");
m.def("safetensors_close_fb", &safetensors_close_fb, "safetensors_close_fb");
m.def("safetensors_load", &safetensors_load, "safetensors_load");
m.def("safetensors_pinned_buffer", &safetensors_pinned_buffer, "safetensors_pinned_buffer");
m.def("safetensors_free_pinned_buffer", &safetensors_free_pinned_buffer, "safetensors_free_pinned_buffer");
m.def("safetensors_read_fb", &safetensors_read_fb, "safetensors_read_fb");
m.def("stloader_read", &stloader_read, "stloader_read");
m.def("tensor_remap", &tensor_remap, "tensor_remap");
m.def("tensor_remap_4bit", &tensor_remap_4bit, "tensor_remap_4bit");

View File

@@ -1,18 +0,0 @@
#include <torch/extension.h>
#include <c10/cuda/CUDAGuard.h>
#include <ATen/cuda/CUDAContext.h>
#include <cuda_runtime.h>
#include <cuda_fp16.h>
#include <cstdint>
#include <cstdio>
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include "config.h"
#include "ext_safetensors.h"
#include "cpp/safetensors.h"
#include "cpp/util.h"

View File

@@ -1,2 +0,0 @@
#include "cpp/safetensors.h"

View File

@@ -0,0 +1,208 @@
#include "ext_stloader.h"
#include "cpp/util.h"

#include <atomic>
#include <condition_variable>
#include <deque>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
void stloader_read
(
const char* filename,
size_t offset,
size_t size,
torch::Tensor target
)
{
c10::optional<torch::Device> device = torch::device_of(target);
bool target_cpu = (device.has_value() && device->type() == torch::kCPU);
// Buffers
uint8_t* load_buffer;
uint8_t* cuda_buffer;
if (target_cpu)
{
load_buffer = (uint8_t*) target.data_ptr();
cuda_buffer = nullptr;
}
else
{
load_buffer = (uint8_t*) malloc(size);
TORCH_CHECK(load_buffer, "Can't allocate buffer for tensor");
cuda_buffer = (uint8_t*) target.data_ptr();
}
// Synchronization
Py_BEGIN_ALLOW_THREADS
volatile bool load_failed = false;
std::mutex mtx;
std::deque<std::pair<size_t, size_t>> dq;
std::condition_variable cv;
// Load chunks
auto load_worker = [&] (size_t pos_a)
{
FILE* file = fopen(filename, "rb");
if (!file) goto error;
while (pos_a < size && !load_failed)
{
size_t pos_b = pos_a + STLOADER_BLOCK_SIZE;
if (pos_b > size) pos_b = size;
#ifdef __linux__
ssize_t br = pread(fileno(file), load_buffer + pos_a, pos_b - pos_a, offset + pos_a);
if (br != pos_b - pos_a) goto error;
int sr = fseek(file, offset + pos_a, SEEK_SET);
#else
int sr = _fseeki64(file, static_cast<__int64>(offset + pos_a), SEEK_SET);
if (sr) goto error;
size_t br = fread(load_buffer + pos_a, 1, pos_b - pos_a, file);
if (br != pos_b - pos_a) goto error;
#endif
{
std::lock_guard<std::mutex> lock(mtx);
dq.push_back(std::pair<size_t, size_t>(pos_a, pos_b));
cv.notify_one();
}
// DBGX3(pos_a, pos_b, br);
pos_a += STLOADER_THREADS * STLOADER_BLOCK_SIZE;
}
fclose(file);
return;
error:
load_failed = true;
};
// Copy chunks to device
auto copy_worker = [&] ()
{
size_t total_blocks = DIVIDE(size, STLOADER_BLOCK_SIZE);
while (total_blocks && !load_failed)
{
size_t pos_a, pos_b;
{
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [&dq] { return !dq.empty(); });
auto pop = dq.front();
dq.pop_front();
total_blocks--;
pos_a = std::get<0>(pop);
pos_b = std::get<1>(pop);
while (!dq.empty() && std::get<0>(dq.front()) == pos_b)
{
pop = dq.front();
dq.pop_front();
pos_b = std::get<1>(pop);
total_blocks--;
}
}
// DBGX2(pos_a, pos_b);
cudaError_t cr = cudaMemcpyAsync
(
cuda_buffer + pos_a,
load_buffer + pos_a,
pos_b - pos_a,
cudaMemcpyHostToDevice
);
if (cr != cudaSuccess) goto error;
}
return;
error:
load_failed = true;
};
std::vector<std::thread> threads;
for (size_t i = 0; i < STLOADER_THREADS && i * STLOADER_BLOCK_SIZE < size; ++i)
threads.emplace_back(load_worker, i * STLOADER_BLOCK_SIZE);
if (cuda_buffer)
threads.emplace_back(copy_worker);
for (auto& thread : threads)
thread.join();
TORCH_CHECK(!load_failed, "I/O error reading tensor");
if (!target_cpu)
{
free(load_buffer);
cudaDeviceSynchronize();
}
Py_END_ALLOW_THREADS
}
// In-place column permutation of a 2D int32 tensor: for every row,
// row[c] = old_row[index[c]]. `index` must have one entry per column.
void tensor_remap
(
    torch::Tensor tensor,
    torch::Tensor index
)
{
    TORCH_CHECK_SHAPES(tensor, 1, index, 0, 1);
    TORCH_CHECK_DTYPE(tensor, kInt);
    TORCH_CHECK_DTYPE(index, kInt);

    int rows = tensor.size(0);
    int cols = tensor.size(1);
    // Scratch copy of one row, so the permutation can be applied in place
    uint32_t* temp = (uint32_t*) calloc(cols, sizeof(int));
    uint32_t* a = (uint32_t*) tensor.data_ptr();
    uint32_t* idx = (uint32_t*) index.data_ptr();

    for (int r = 0; r < rows; ++r)
    {
        memcpy(temp, a, sizeof(uint32_t) * cols);
        for (int c = 0; c < cols; ++c)
        {
            *a++ = temp[idx[c]];
        }
    }
    free(temp);
}
// In-place column permutation for 4-bit packed data: each int32 word packs 8
// consecutive 4-bit values, so logical element i lives in bits ((i & 7) * 4)
// of word i / 8. `index` has one entry per 4-bit column (8x the tensor's
// int32 column count).
void tensor_remap_4bit
(
    torch::Tensor tensor,
    torch::Tensor index
)
{
    TORCH_CHECK_SHAPES(index, 0, tensor, 1, 8);
    TORCH_CHECK_DTYPE(tensor, kInt);
    TORCH_CHECK_DTYPE(index, kInt);

    int rows = tensor.size(0);
    int cols = index.size(0);
    // Scratch copy of one packed row (cols / 8 int32 words)
    uint32_t* temp = (uint32_t*) calloc(cols / 8, sizeof(int));
    uint32_t* a = (uint32_t*) tensor.data_ptr();
    uint32_t* idx = (uint32_t*) index.data_ptr();

    for (int r = 0; r < rows; ++r)
    {
        memcpy(temp, a, sizeof(uint32_t) * cols / 8);
        for (int c = 0; c < cols;)
        {
            // Re-pack 8 permuted 4-bit values into one output word
            uint32_t rv = 0;
            for (int b = 0; b < 8; ++b, ++c)
            {
                uint32_t i = idx[c];
                uint32_t v = (temp[i / 8] >> ((i & 7) * 4) & 0x0f);
                rv |= v << (b * 4);
            }
            *a++ = rv;
        }
    }
    free(temp);
}

View File

@@ -0,0 +1,31 @@
// NOTE(review): this header has no include guard; consider adding #pragma once
// if it is ever included from more than one translation unit per target.

#include <torch/extension.h>
#include <cuda_runtime.h>
#include <cuda_fp16.h>
#include <cstdint>
#include <cstdio>
#include <c10/cuda/CUDAGuard.h>
#include <ATen/cuda/CUDAContext.h>

// Chunk size per read, and number of parallel reader threads
#define STLOADER_BLOCK_SIZE (1*1024*1024)
#define STLOADER_THREADS 8

// Multithreaded read of `size` bytes at `offset` from `filename` into
// `target` (CPU or CUDA tensor)
void stloader_read
(
    const char* filename,
    size_t offset,
    size_t size,
    torch::Tensor target
);

// In-place column permutations for (4-bit packed) int32 tensors
void tensor_remap
(
    torch::Tensor tensor,
    torch::Tensor index
);
void tensor_remap_4bit
(
    torch::Tensor tensor,
    torch::Tensor index
);

View File

@@ -213,7 +213,7 @@ if build_jit:
"ext_qmlp.cpp",
"ext_quant.cpp",
"ext_rope.cpp",
"ext_safetensors.cpp",
"ext_stloader.cpp",
"ext_sampling.cpp",
"ext_element.cpp",
"ext_tp.cpp",
@@ -250,7 +250,6 @@ if build_jit:
"cpp/generator.cpp",
"cpp/sampling.cpp",
"cpp/sampling_avx2.cpp",
"cpp/safetensors.cpp"
]
sources = [os.path.join(sources_dir, s) for s in sources_]

View File

@@ -1,252 +0,0 @@
from __future__ import annotations
import torch
from safetensors import safe_open
import numpy as np
import json
from exllamav2.ext import exllamav2_ext as ext_c
import os
def convert_dtype(dt: str):
    """
    Translate a safetensors datatype string into a torch dtype.

    :param dt:
        Datatype string as used by safetensors
    :return:
        Torch type, element size in bytes
    """
    known = {
        "I32": (torch.int, 4),
        "I16": (torch.short, 2),
        "F16": (torch.float16, 2),
        "BF16": (torch.bfloat16, 2),
        "F32": (torch.float, 4),
    }
    if dt not in known:
        raise ValueError(f"Unknown dtype {dt}")
    return known[dt]
global_stfiles = []
global_cm = {}
global_tensorcache = []
num_cached_tensors = 4
def cleanup_stfiles():
    """
    Close every cached STFile (releasing direct-I/O and fallback handles),
    exit any safe_open context managers kept in global_cm, and free the pinned
    transfer buffer on the C++ side.
    """
    global global_stfiles, global_cm
    for stf in global_stfiles:
        stf.close()
    global_stfiles = []
    # Context managers were entered manually in STFile.get_cm, so exit them
    # manually here
    for f in global_cm.values():
        f.__exit__(None, None, None)
    global_cm = {}
    ext_c.safetensors_free_pinned_buffer()
class STFile:
    """
    Cached view of a .safetensors file. Depending on platform and the `fast`
    flag, tensors are read through the Linux direct-I/O extension, the cstdio
    fallback extension (Windows), or the reference safetensors safe_open
    loader.
    """

    filename: str               # path of the .safetensors file
    header: dict                # tensor key -> {"dtype", "shape", "data_offsets"}
    header_size: int            # file offset where tensor data begins
    metadata: object            # "__metadata__" entry from the header, if any
    handle: int                 # C++ handle for the direct-I/O loader (0 if unused)
    handle_fb: int              # C++ handle for the cstdio fallback loader (0 if unused)
    fast: bool                  # True when using the direct-I/O (Linux aio) path
    st_context = None
    tensor_remap: dict | None   # remapped key -> original key, when a keymap is applied

    def __init__(
        self,
        filename: str,
        fast: bool = True,
        keymap: list[tuple[str, str]] = None
    ):
        global global_stfiles

        self.metadata = None
        self.handle = 0
        self.handle_fb = 0
        self.filename = filename
        self.st_context = None
        self.tensor_remap = None
        self.read_dict()

        if keymap:
            # Rewrite header keys according to keymap; a pattern starting with
            # "$" only matches at the beginning of a key. tensor_remap maps
            # each new key back to the original on-disk key.
            self.tensor_remap = {}
            nheader = {}
            for key in self.header.keys():
                nkey = key
                for z in keymap:
                    if z[0].startswith("$") and nkey.startswith(z[0][1:]):
                        nkey = ("$" + nkey).replace(z[0], z[1])
                    else:
                        nkey = nkey.replace(z[0], z[1])
                nheader[nkey] = self.header[key]
                self.tensor_remap[nkey] = key
            self.header = nheader

        # TODO: Create platform-independent multithreaded loader to replace direct IO loader on Linux and fallback
        # cstdio loader on Windows

        # On Windows the direct-I/O path is unavailable; downgrade `fast` to
        # the cstdio fallback loader instead
        if fast and os.name == "nt":
            self.fast_fb = True
            fast = False
        else:
            self.fast_fb = False
        self.fast = fast

        if self.fast:
            self.handle = ext_c.safetensors_open(filename)
        if self.fast_fb:
            self.handle_fb = ext_c.safetensors_open_fb(filename)

        # Register for cleanup_stfiles()
        global_stfiles.append(self)

    @staticmethod
    def open(
        filename,
        fast = True,
        keymap: list[tuple[str, str]] = None
    ) -> STFile:
        """
        Open safetensors file, scan header and retain handle. Files are
        cached: a second open() of the same path returns the existing
        instance (note that `fast` and `keymap` are ignored in that case).

        :param filename:
            Filename

        :param fast:
            Use fast (direct I/O) codepath

        :param keymap:
            List of (a, b) tuples for string replacements in key index

        :return:
            STFile object
        """
        global global_stfiles
        for f in global_stfiles:
            if f.filename == filename: return f
        return STFile(filename, fast, keymap)

    def close(self):
        """
        Close file handle (if necessary)
        """
        if self.fast: ext_c.safetensors_close(self.handle)
        if self.fast_fb: ext_c.safetensors_close_fb(self.handle_fb)

    def read_dict(self):
        # safetensors layout: 8-byte little-endian header length, JSON header,
        # then raw tensor data. data_offsets are relative to header_size.
        with open(self.filename, "rb") as fp:
            header_size = np.fromfile(fp, dtype = np.int64, count = 1).item()
            header_json = fp.read(header_size)
            self.header = json.loads(header_json.decode("utf-8"))
            self.header_size = fp.tell()
            if "__metadata__" in self.header:
                self.metadata = self.header["__metadata__"]
                del self.header["__metadata__"]

    def get_dict(self) -> dict:
        # Header dict (after any keymap remapping), excluding __metadata__
        return self.header

    def get_metadata(self) -> object:
        return self.metadata

    def measure(self, key):
        """
        :param key:
            Tensor key

        :return:
            Byte size of tensor
        """
        v = self.header[key]
        data_offsets = v["data_offsets"]
        length = data_offsets[1] - data_offsets[0]
        return length

    def get_cm(self, device):
        # Return a cached safe_open context for (file, device), entering it on
        # first use; cleanup_stfiles() exits all cached contexts
        global global_cm

        cm_key = self.filename + "::" + device

        if cm_key in global_cm:
            return global_cm[cm_key]

        f = safe_open(self.filename, framework = "pt", device = device)
        f.__enter__()
        global_cm[cm_key] = f
        return f

    def get_tensor(
        self,
        key: str,
        device,
        not_fast: bool = False,
        cached: bool = False,
        out_dtype = None
    ) -> torch.Tensor:
        """
        Load a tensor by key onto the given device, via whichever loader this
        file was opened with (or safe_open when `not_fast` is set). With
        `cached`, results are kept in a small global LRU of recently loaded
        tensors.
        """
        global global_tensorcache

        torch.cuda.synchronize()
        if device != "cpu":
            torch.cuda.set_stream(torch.cuda.default_stream(device))

        # safe_open needs the original on-disk key, not the remapped one
        if self.tensor_remap and (not_fast or not self.fast):
            key = self.tensor_remap[key]

        if cached:
            cachekey = self.filename + "::" + key + "::" + device
            for (k, v) in global_tensorcache:
                if k == cachekey: return v

        if not_fast or (not self.fast and not self.fast_fb):
            # Reference loader, via a cached safe_open context
            f = self.get_cm(device)
            tensor = f.get_tensor(key)

        elif self.fast_fb:
            # cstdio fallback loader (Windows)
            h = self.header[key]
            dtype, esize = convert_dtype(h["dtype"])
            beg, end = h["data_offsets"]
            size = end - beg
            numel = size // esize
            shape = h["shape"]
            tensor = torch.zeros(shape, dtype = dtype, device = device)
            # NOTE(review): is_contiguous is a method; this assert tests the
            # bound-method object and is always truthy — should be is_contiguous()
            assert tensor.is_contiguous, "Non-contiguous tensor"
            ext_c.safetensors_read_fb(self.handle_fb, beg + self.header_size, size, tensor)

        else:
            # Direct-I/O loader (Linux aio)
            v = self.header[key]
            dtt, dts = convert_dtype(v["dtype"])
            sh = v["shape"]
            data_offsets = v["data_offsets"]
            offset = data_offsets[0] + self.header_size
            length = data_offsets[1] - data_offsets[0]
            assert np.prod(sh) * dts == length, f"Tensor shape doesn't match storage size: {key}"
            if device != "cpu":
                torch.cuda.set_stream(torch.cuda.default_stream(device))
            tensor = torch.empty(sh, device = device, dtype = dtt)
            ext_c.safetensors_load(self.handle, tensor, offset, length)

        if out_dtype:
            tensor = tensor.to(out_dtype)

        if cached:
            # Evict oldest entry once the cache is full
            if len(global_tensorcache) >= num_cached_tensors:
                global_tensorcache = global_tensorcache[1:]
            global_tensorcache.append((cachekey, tensor))

        torch.cuda.synchronize()
        return tensor

View File

@@ -46,7 +46,7 @@ from exllamav2.parallel_decoder import ExLlamaV2ParallelDecoder
from exllamav2.embedding import ExLlamaV2Embedding
from exllamav2.pos_embedding import ExLlamaV2PosEmbedding
from exllamav2.compat import safe_move_tensor
from exllamav2.fasttensors import cleanup_stfiles
from exllamav2.stloader import cleanup_stfiles
from exllamav2.device import ExLlamaV2DeviceContext, set_device_streams
from exllamav2.tensor_p import TPContext, BROADCAST_VC
import gc

View File

@@ -22,7 +22,7 @@ def add_args(parser):
parser.add_argument("-lm", "--low_mem", action = "store_true", help = "Enable VRAM optimizations, potentially trading off speed")
parser.add_argument("-ept", "--experts_per_token", type = int, help = "Override MoE model's default number of experts per token")
parser.add_argument("-lq4", "--load_q4", action = "store_true", help = "Load weights in Q4 mode")
parser.add_argument("-fst", "--fast_safetensors", action = "store_true", help = "Use alternative safetensors loader (with direct I/O when available)")
parser.add_argument("-fst", "--fast_safetensors", action = "store_true", help = "Deprecated (does nothing)")
parser.add_argument("-ic", "--ignore_compatibility", action = "store_true", help = "Do not override model config options in case of compatibility issues")
parser.add_argument("-chunk", "--chunk_size", type = int, help = "Chunk size ('input length')")
@@ -43,7 +43,6 @@ def print_options(args):
if args.no_sdpa: print_opts += ["no_sdpa"]
if args.no_graphs: print_opts += ["no_graphs"]
if args.low_mem: print_opts += ["low_mem"]
if hasattr(args, "fast_safetensors") and args.fast_safetensors: print_opts += ["fast_safetensors"]
if args.experts_per_token is not None: print_opts += [f"experts_per_token: {args.experts_per_token}"]
if args.load_q4: print_opts += ["load_q4"]
if args.ignore_compatibility: print_opts += ["ignore_compatibility"]
@@ -94,7 +93,6 @@ def init(
config = ExLlamaV2Config()
config.model_dir = args.model_dir
config.fasttensors = hasattr(args, "fast_safetensors") and args.fast_safetensors
config.prepare()
# Set config options

View File

@@ -2,7 +2,7 @@ from __future__ import annotations
import torch
import torch.nn as nn
from exllamav2.config import ExLlamaV2Config
from exllamav2.fasttensors import STFile
from exllamav2.stloader import STFile
from exllamav2.compat import safe_move_tensor
from typing import TYPE_CHECKING
@@ -81,7 +81,7 @@ class ExLlamaV2Module:
submap_i[v].append(k)
for v, ks in submap_i.items():
stfile = STFile.open(v, fast = self.model.config.fasttensors, keymap = self.model.config.arch.keymap)
stfile = STFile.open(v, keymap = self.model.config.arch.keymap)
for k in ks:
if measure:
size += stfile.measure(key + "." + k)
@@ -160,9 +160,8 @@ class ExLlamaV2Module:
filename = cfg.tensor_file_map.get(key)
if not filename: continue
stfile = STFile.open(filename, fast = cfg.fasttensors, keymap = cfg.arch.keymap)
# tensor = stfile.get_tensor(key, device = self.device()).half()
tensor = stfile.get_tensor(key, device = "cpu", cached = True, out_dtype = torch.half)
stfile = STFile.open(filename, keymap = cfg.arch.keymap)
tensor = stfile.get_tensor(key, device = "cpu", out_dtype = torch.half)
if cfg.arch.orig_weights_transposed and len(tensor.shape) == 2:
tensor = tensor.T

167
exllamav2/stloader.py Normal file
View File

@@ -0,0 +1,167 @@
from __future__ import annotations
import torch
import numpy as np
import json
from exllamav2.ext import none_tensor, exllamav2_ext as ext_c
import os
def convert_dtype(dt: str):
    """
    Convert a safetensors datatype string to a torch dtype.

    :param dt:
        Datatype string as used by safetensors headers

    :return:
        Torch type, element size in bytes

    :raises ValueError:
        If the dtype string is not recognized
    """
    # Element sizes per the safetensors format spec. Extends the original
    # table (I32/I16/F16/BF16/F32) with the remaining spec dtypes that torch
    # can represent; existing mappings are unchanged.
    table = {
        "BOOL": (torch.bool, 1),
        "I8":   (torch.int8, 1),
        "U8":   (torch.uint8, 1),
        "I16":  (torch.short, 2),
        "F16":  (torch.float16, 2),
        "BF16": (torch.bfloat16, 2),
        "I32":  (torch.int, 4),
        "F32":  (torch.float, 4),
        "I64":  (torch.long, 8),
        "F64":  (torch.double, 8),
    }
    try:
        return table[dt]
    except KeyError:
        raise ValueError(f"Unknown dtype {dt}")
global_stfiles = {}
def cleanup_stfiles():
    """
    Drop all cached STFile instances. The loader keeps no OS-level file
    handles open between reads, so resetting the registry is the only cleanup
    required.
    """
    global global_stfiles
    global_stfiles = {}
# ext_c.stfiles_free_pinned_buffer()
class STFile:
    """
    Wrapper around a single .safetensors file. Parses the JSON header up
    front and loads individual tensors on demand through the
    ext_c.stloader_read extension.
    """

    filename: str                     # Path to the .safetensors file
    header: dict                      # Tensor key -> {"dtype", "shape", "data_offsets"}
    header_size: int                  # File offset where the tensor data section begins
    metadata: object                  # Contents of "__metadata__" entry, if present, else None
    handle: int                       # Reserved for a native file handle (currently unused)
    tensor_remap: dict | None         # Remapped key -> original key, set by remap_dict

    def __init__(
        self,
        filename: str,
        keymap: list[tuple[str, str]] | None = None
    ):
        self.filename = filename
        self.metadata = None
        self.handle = 0
        self.tensor_remap = None
        self.read_dict()
        if keymap:
            self.remap_dict(keymap)

    @staticmethod
    def open(
        filename,
        keymap: list[tuple[str, str]] | None = None
    ) -> STFile:
        """
        Open safetensors file, scan header and retain handle. Files are
        cached in global_stfiles, so repeated opens of the same path return
        the same STFile instance (the keymap of the first open wins).

        :param filename:
            Filename

        :param keymap:
            List of (a, b) tuples for string replacements in key index

        :return:
            STFile object
        """
        global global_stfiles
        if filename not in global_stfiles:
            global_stfiles[filename] = STFile(filename, keymap)
        return global_stfiles[filename]

    def close(self):
        """
        Close file handle (if necessary). Currently a no-op placeholder: no
        per-file handle is retained, so this only sanity-checks that the file
        is still registered (i.e. cleanup_stfiles hasn't run).
        """
        assert self.filename in global_stfiles, \
            f"Can't close {self.filename}: already closed"

    def read_dict(self):
        """
        Parse the safetensors header: an 8-byte little-endian length prefix
        followed by a JSON dict. Tensor data_offsets are relative to the end
        of the JSON blob, recorded here as self.header_size.
        """
        # NOTE(review): np.int64 is native-endian; this assumes a
        # little-endian host, as the safetensors length prefix is LE.
        with open(self.filename, "rb") as fp:
            header_size = np.fromfile(fp, dtype = np.int64, count = 1).item()
            header_json = fp.read(header_size)
            self.header = json.loads(header_json.decode("utf-8"))
            self.header_size = fp.tell()
            # Metadata is not a tensor; split it out of the key index
            if "__metadata__" in self.header:
                self.metadata = self.header["__metadata__"]
                del self.header["__metadata__"]

    def remap_dict(self, keymap: list[tuple[str, str]]):
        """
        Rewrite tensor keys using (a, b) replacement pairs. A pattern
        starting with "$" anchors the match to the beginning of the key.
        Records the reverse mapping in self.tensor_remap.

        :param keymap:
            List of (pattern, replacement) tuples
        """
        self.tensor_remap = {}
        nheader = {}
        for key in self.header.keys():
            nkey = key
            for z in keymap:
                if z[0].startswith("$") and nkey.startswith(z[0][1:]):
                    # Prepend "$" so the anchored pattern matches literally
                    nkey = ("$" + nkey).replace(z[0], z[1])
                else:
                    nkey = nkey.replace(z[0], z[1])
            nheader[nkey] = self.header[key]
            self.tensor_remap[nkey] = key
        self.header = nheader

    def get_dict(self) -> dict:
        # Tensor key index (after any remapping), excluding "__metadata__"
        return self.header

    def get_metadata(self) -> object:
        # "__metadata__" dict from the header, or None if absent
        return self.metadata

    def measure(self, key):
        """
        :param key:
            Tensor key

        :return:
            Byte size of tensor
        """
        v = self.header[key]
        data_offsets = v["data_offsets"]
        length = data_offsets[1] - data_offsets[0]
        return length

    def get_tensor(
        self,
        key: str,
        device,
        out_dtype = None
    ) -> torch.Tensor:
        """
        Load tensor from file

        :param key:
            Tensor name

        :param device:
            Target device

        :param out_dtype:
            Force output datatype

        :return:
            torch.Tensor
        """
        h = self.header[key]
        dtype, esize = convert_dtype(h["dtype"])
        beg, end = h["data_offsets"]
        size = end - beg
        shape = h["shape"]
        tensor = torch.zeros(shape, dtype = dtype, device = device)
        # Bugfix: is_contiguous is a method; the bare attribute is always
        # truthy, so the original assert could never fire
        assert tensor.is_contiguous(), "Non-contiguous tensor"
        ext_c.stloader_read(
            self.filename,
            beg + self.header_size,
            size,
            tensor
        )
        if out_dtype:
            tensor = tensor.to(out_dtype)
        return tensor

View File

@@ -41,7 +41,7 @@ setup_kwargs = {
"exllamav2/exllamav2_ext/ext_qmlp.cpp",
"exllamav2/exllamav2_ext/ext_quant.cpp",
"exllamav2/exllamav2_ext/ext_rope.cpp",
"exllamav2/exllamav2_ext/ext_safetensors.cpp",
"exllamav2/exllamav2_ext/ext_stloader.cpp",
"exllamav2/exllamav2_ext/ext_sampling.cpp",
"exllamav2/exllamav2_ext/ext_element.cpp",
"exllamav2/exllamav2_ext/ext_tp.cpp",
@@ -78,7 +78,6 @@ setup_kwargs = {
"exllamav2/exllamav2_ext/cpp/generator.cpp",
"exllamav2/exllamav2_ext/cpp/sampling.cpp",
"exllamav2/exllamav2_ext/cpp/sampling_avx2.cpp",
"exllamav2/exllamav2_ext/cpp/safetensors.cpp"
],
extra_compile_args=extra_compile_args,
libraries=["cublas"] if windows else [],

View File

@@ -2,32 +2,18 @@ import sys, os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import torch
from exllamav2.fasttensors import STFile
from exllamav2.ext import exllamav2_ext as ext_c
from exllamav2.stloader import STFile
import time
# Single tensor
# stfile = "/mnt/str/models/llama2-70b-exl2/3.5bpw/output-00002-of-00004.safetensors"
# stfile_size = os.path.getsize(stfile)
# sttest = STFile(stfile)
# key = "model.layers.45.self_attn.o_proj.q_weight"
# print(key)
# a = sttest.get_tensor(key, device="cuda:0")
# b = sttest.get_tensor(key, device="cuda:0", not_fast = True)
# assert a.equal(b), ":<"
# Multi file
stfiles = \
[
"/mnt/str/models/llama2-70b-exl2/4.0bpw/output-00001-of-00005.safetensors",
"/mnt/str/models/llama2-70b-exl2/4.0bpw/output-00002-of-00005.safetensors",
"/mnt/str/models/llama2-70b-exl2/4.0bpw/output-00003-of-00005.safetensors",
"/mnt/str/models/llama2-70b-exl2/4.0bpw/output-00004-of-00005.safetensors",
"/mnt/str/models/llama2-70b-exl2/4.0bpw/output-00005-of-00005.safetensors"
"/mnt/str/models/llama3-70b-exl2/4.0bpw/output-00001-of-00005.safetensors",
"/mnt/str/models/llama3-70b-exl2/4.0bpw/output-00002-of-00005.safetensors",
"/mnt/str/models/llama3-70b-exl2/4.0bpw/output-00003-of-00005.safetensors",
"/mnt/str/models/llama3-70b-exl2/4.0bpw/output-00004-of-00005.safetensors",
"/mnt/str/models/llama3-70b-exl2/4.0bpw/output-00005-of-00005.safetensors"
]
for stfile in stfiles:
@@ -45,7 +31,6 @@ for stfile in stfiles:
tensors2 = {}
t = time.time()
ext_c.safetensors_pinned_buffer()
t = time.time() - t
print(f"Time: {t:.4f} s")
@@ -56,21 +41,21 @@ for stfile in stfiles:
t = time.time()
for k in keys:
tensor = sttest.get_tensor(k, device = "cuda:0")
tensors1[k] = tensor
tensors2[k] = tensor
t = time.time() - t
print(f"Time: {t:.4f} s, {stfile_size / t / 1024**3:.4f} GB/s")
t = time.time()
for k in keys:
tensor = sttest.get_tensor(k, device = "cuda:0", not_fast = True)
tensors2[k] = tensor
tensor = sttest.get_tensor(k, device = "cpu")
tensors1[k] = tensor
t = time.time() - t
print(f"Time: {t:.4f} s, {stfile_size / t / 1024**3:.4f} GB/s")
for k in sttest.get_dict().keys():
a = tensors1[k]
b = tensors2[k]
assert a.equal(b), k
assert a.cuda().equal(b), k
print("ok")