From fb053008f7022fb4cdfb1776e3fda079fafdf0e2 Mon Sep 17 00:00:00 2001 From: Marta Iborra Date: Fri, 22 Jul 2022 13:01:38 +0200 Subject: [PATCH 1/2] Support remote access to arrays. Preliminary commit --- include/libiarray/iarray.h | 10 +++ src/iarray_server.c | 130 +++++++++++++++++++++++++++++++++++++ 2 files changed, 140 insertions(+) create mode 100644 src/iarray_server.c diff --git a/include/libiarray/iarray.h b/include/libiarray/iarray.h index 5e44d5b..bd06341 100644 --- a/include/libiarray/iarray.h +++ b/include/libiarray/iarray.h @@ -824,6 +824,16 @@ typedef void (*zhandler_ptr) (char *zarr_urlpath, int64_t *slice_start, int64_t INA_API(ina_rc_t) iarray_add_zproxy_postfilter(iarray_container_t *src, char *zarr_urlpath, zhandler_ptr zhandler); +/* server accessor */ +typedef int32_t (*rhandler_ptr) (char *server_urlpath, char *array_urlpath, int64_t nchunk, + int32_t start, int32_t nitems, int32_t destsize, uint8_t *cblock); +INA_API(ina_rc_t) iarray_add_request_postfilter(iarray_container_t *src, char *server_urlpath, char *urlpath, + rhandler_ptr request_handler); + +INA_API(ina_rc_t) iarray_server_job(iarray_context_t *ctx, iarray_container_t *a, int64_t nchunk, + int32_t start, int32_t nitems, int32_t size, uint8_t *dest, int32_t *block_size); + + INA_API(ina_rc_t) iarray_opt_gemv(iarray_context_t *ctx, iarray_container_t *a, iarray_container_t *b, diff --git a/src/iarray_server.c b/src/iarray_server.c new file mode 100644 index 0000000..1275db1 --- /dev/null +++ b/src/iarray_server.c @@ -0,0 +1,130 @@ +/* + * Copyright ironArray SL 2021. + * + * All rights reserved. + * + * This software is the confidential and proprietary information of ironArray SL + * ("Confidential Information"). You shall not disclose such Confidential + * Information and shall use it only in accordance with the terms of the license agreement. + * + */ + +#include "iarray_private.h" +#include +#include + + +typedef struct { + char *server_urlpath; + //!< urlpath to server with arrays. + char *urlpath; + //!< urlpath to array with data. + int32_t blocksize; + //!< schunk's blocksize + rhandler_ptr request_handler; + //!< Function pointer to request handler +} request_postparams_udata; + + +ina_rc_t request_postfilter(blosc2_postfilter_params *postparams) +{ + request_postparams_udata *udata = postparams->user_data; + + int block_nitems = udata->blocksize / postparams->typesize; + uint8_t* cblock = malloc(postparams->size + BLOSC_MAX_OVERHEAD); + int32_t csize = udata->request_handler(udata->server_urlpath, udata->urlpath, postparams->nchunk, + postparams->nblock * block_nitems, + block_nitems, postparams->size, cblock); + if (csize < 0) { + IARRAY_TRACE1(iarray.tracing, "could not get block from server"); + return IARRAY_ERR_BLOSC_FAILED; + } + + blosc2_dparams dparams = BLOSC2_DPARAMS_DEFAULTS; + blosc2_context *dctx = blosc2_create_dctx(dparams); + int rc = blosc2_decompress_ctx(dctx, cblock, csize, postparams->out, postparams->size); + blosc2_free_ctx(dctx); + free(cblock); + if (rc != postparams->size) { + IARRAY_TRACE1(iarray.tracing, "Error decompressing block"); + return IARRAY_ERR_BLOSC_FAILED; + } + + return INA_SUCCESS; +} + + +INA_API(ina_rc_t) iarray_server_job(iarray_context_t *ctx, iarray_container_t *a, int64_t nchunk, + int32_t start, int32_t nitems, int32_t size, uint8_t *dest, int32_t *block_size) +{ + INA_UNUSED(ctx); + uint8_t *chunk; + bool needs_free; + int csize = blosc2_schunk_get_lazychunk(a->catarr->sc, nchunk, &chunk, &needs_free); + if (csize < 0) { + IARRAY_TRACE1(iarray.tracing, "Error getting lazy chunk"); + return IARRAY_ERR_BLOSC_FAILED; + } + + uint8_t *dc_block = ina_mem_alloc(size); + int bsize = blosc2_getitem_ctx(a->catarr->sc->dctx, chunk, csize, start, nitems, dc_block, size); + if (needs_free) { + free(chunk); + } + if (bsize < 0) { + IARRAY_TRACE1(iarray.tracing, "Error getting block"); + return IARRAY_ERR_BLOSC_FAILED; + } + + blosc2_cparams cparams = BLOSC2_CPARAMS_DEFAULTS; + cparams.compcode = BLOSC_ZSTD; + cparams.clevel = 1; + blosc2_context *blosc_ctx = blosc2_create_cctx(cparams); + + csize = blosc2_compress_ctx(blosc_ctx, dc_block, bsize, dest, size); + ina_mem_free(dc_block); + blosc2_free_ctx(blosc_ctx); + if (csize <= 0) { + IARRAY_TRACE1(iarray.tracing, "Error compressing the block"); + return IARRAY_ERR_BLOSC_FAILED; + } + *block_size = csize; + + return INA_SUCCESS; +} + + +INA_API(ina_rc_t) iarray_add_request_postfilter(iarray_container_t *src, char *server_urlpath, char *urlpath, + rhandler_ptr request_handler) +{ + INA_VERIFY_NOT_NULL(src); + + // Create params + blosc2_dparams *dparams; + blosc2_schunk_get_dparams(src->catarr->sc, &dparams); + + dparams->postfilter = (blosc2_postfilter_fn)request_postfilter; + + blosc2_postfilter_params *postparams = malloc(sizeof(blosc2_postfilter_params)); + request_postparams_udata *rpostparams = malloc(sizeof(request_postparams_udata)); + + // Fill the user_data + rpostparams->server_urlpath = malloc(strlen(server_urlpath) + 1); + strcpy(rpostparams->server_urlpath, server_urlpath); + + rpostparams->urlpath = malloc(strlen(urlpath) + 1); + strcpy(rpostparams->urlpath, urlpath); + + rpostparams->blocksize = src->catarr->sc->blocksize; + rpostparams->request_handler = request_handler; + + postparams->user_data = (void*)rpostparams; + dparams->postparams = postparams; + + // Create new context since postparams is empty in the old one + blosc2_free_ctx(src->catarr->sc->dctx); + src->catarr->sc->dctx = blosc2_create_dctx(*dparams); + free(dparams); + + return INA_SUCCESS; +} From 36f81453055c9d1b7772ae6598cb4235ab5176df Mon Sep 17 00:00:00 2001 From: Marta Iborra Date: Fri, 29 Jul 2022 12:02:42 +0200 Subject: [PATCH 2/2] use ina_mem --- src/iarray_server.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/iarray_server.c b/src/iarray_server.c index 1275db1..f6f87f1 100644 --- a/src/iarray_server.c +++ b/src/iarray_server.c @@ -31,7 +31,7 @@ ina_rc_t request_postfilter(blosc2_postfilter_params *postparams) request_postparams_udata *udata = postparams->user_data; int block_nitems = udata->blocksize / postparams->typesize; - uint8_t* cblock = malloc(postparams->size + BLOSC_MAX_OVERHEAD); + uint8_t* cblock = ina_mem_alloc(postparams->size + BLOSC_MAX_OVERHEAD); int32_t csize = udata->request_handler(udata->server_urlpath, udata->urlpath, postparams->nchunk, postparams->nblock * block_nitems, block_nitems, postparams->size, cblock); @@ -44,7 +44,7 @@ ina_rc_t request_postfilter(blosc2_postfilter_params *postparams) blosc2_context *dctx = blosc2_create_dctx(dparams); int rc = blosc2_decompress_ctx(dctx, cblock, csize, postparams->out, postparams->size); blosc2_free_ctx(dctx); - free(cblock); + ina_mem_free(cblock); if (rc != postparams->size) { IARRAY_TRACE1(iarray.tracing, "Error decompressing block"); return IARRAY_ERR_BLOSC_FAILED;