When Geneva runs in distributed mode, jobs are deployed against a Kubernetes KubeRay instance that dynamically provisions a Ray cluster. Job execution time depends on sufficient CPU/GPU resources for computation and sufficient write bandwidth to store the output values. Tuning the performance of a job boils down to configuring the table or cluster resources.

Geneva defaults

Geneva sets the following defaults when creating a KubeRay cluster via GenevaClusterBuilder. These apply as both Kubernetes requests and limits.

Head Node

| Resource | Default |
| --- | --- |
| CPU | 4 |
| Memory | 8 GiB |
| GPU | 0 |
| Node Selector | geneva.lancedb.com/ray-head: true |
| Service Account | geneva-service-account |
The head node runs the Ray GCS (Global Control Store), the driver task, and the dashboard. It does not run UDF applier actors.

CPU Workers

| Resource | Default |
| --- | --- |
| CPU | 4 |
| Memory | 8 GiB |
| GPU | 0 |
| Node Selector | geneva.lancedb.com/ray-worker-cpu: true |
| Replicas | 1 (min: 0, max: 100) |
| Idle Timeout | 60 seconds |

GPU Workers

| Resource | Default |
| --- | --- |
| CPU | 8 |
| Memory | 16 GiB |
| GPU | 1 |
| Node Selector | geneva.lancedb.com/ray-worker-gpu: true |
| Replicas | 1 (min: 0, max: 100) |
| Idle Timeout | 60 seconds |

Scaling computation resources

Geneva jobs split computational work into smaller batches that are assigned to tasks and distributed across the cluster. As each task completes, it writes its output to a checkpoint file. If a job is interrupted or run again, Geneva checks whether a checkpoint for each computation is already present and only kicks off the computations that are missing. Computation capacity is usually the bottleneck for job execution, so to complete all of a job's tasks more quickly, increase the CPU/GPU resources available to the cluster.

Estimating CPU and memory requirements

CPU

Geneva can run up to concurrency tasks in parallel, and within each task it can run up to intra_applier_concurrency UDF applications concurrently. If your UDF requests udf.num_cpus CPUs, the peak CPU requirement is approximately:
total_cpus ≈ concurrency * intra_applier_concurrency * udf.num_cpus
concurrency and intra_applier_concurrency are parameters on Table.backfill(...). In most cases you do not need to change udf.num_cpus unless the UDF itself uses multiple threads. If your UDF is single-threaded but you want to speed it up with more parallelism, increase concurrency first; if that stops yielding meaningful improvements, then try increasing intra_applier_concurrency.
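As a quick sketch, the estimate above can be computed directly. The helper function is illustrative (not part of the Geneva API); its parameter names mirror Table.backfill(...) and the UDF decorator.

```python
# Rough peak-CPU estimate for a backfill job, per the formula above.
# Illustrative helper only; not a Geneva API.

def estimate_total_cpus(concurrency: int,
                        intra_applier_concurrency: int,
                        udf_num_cpus: float = 1.0) -> float:
    """Peak CPUs consumed by UDF applier actors."""
    return concurrency * intra_applier_concurrency * udf_num_cpus

# e.g. 8 applier processes, 2 UDF applications each, 1 CPU per application:
print(estimate_total_cpus(8, 2, 1.0))  # → 16.0
```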

Memory

Geneva defaults to checkpoint batches of up to 100 rows (checkpoint_size=100). Peak memory usage scales with both the input rows held for processing and the output rows buffered before they are written. A rough upper bound is:
memory_bytes ≈
  input_row_size  * min(checkpoint_size * worker_num_cpus, task_size) +
  output_row_size * min(checkpoint_size * intra_applier_concurrency, task_size)
  • worker_num_cpus is the number of CPUs on the Ray worker.
  • task_size is a tunable parameter (rows per task). It can be set on the UDF (e.g. @udf(..., task_size=...)) and on Table.backfill(..., task_size=...); backfill-level values take precedence. If task_size is larger than the number of rows in a fragment, it behaves the same as task_size = fragment_num_rows. By default it is: task_size = num_rows / (concurrency * intra_applier_concurrency * 2), where num_rows is the number of rows in the table, or the number of rows selected by your filter.
  • input_row_size / output_row_size are the average bytes per row after materialization in memory.
For primitive numeric columns (ints/floats), these sizes are usually small. For images, videos, embeddings, etc. you may need to explicitly provision more memory (e.g. via @udf(..., memory=...)) and/or reduce checkpoint_size / task_size. Typical per-row sizes:
  • Images: ~200KB–2MB (depends on dataset and encoding)
  • Videos: ~10MB–200MB
  • Embeddings: dimension * data_type_size bytes (e.g. float32 embeddings use 4 bytes per value, so a 1536-dim embedding is 1536 * 4 = 6144 bytes)
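The bound above can be evaluated for a concrete workload. The helper and the input numbers below (row sizes, worker CPUs, task size) are illustrative assumptions, not Geneva defaults beyond checkpoint_size=100.

```python
# Sketch of the peak-memory upper bound above; hypothetical helper, not Geneva API.

def estimate_memory_bytes(input_row_size, output_row_size,
                          checkpoint_size, worker_num_cpus,
                          intra_applier_concurrency, task_size):
    return (input_row_size * min(checkpoint_size * worker_num_cpus, task_size)
            + output_row_size * min(checkpoint_size * intra_applier_concurrency, task_size))

# Example: 1536-dim float32 embeddings as output (1536 * 4 = 6144 bytes/row),
# ~1 KiB input rows, checkpoint_size=100, an 8-CPU worker,
# intra_applier_concurrency=2, task_size=10_000 (all assumed values):
mem = estimate_memory_bytes(1024, 6144, 100, 8, 2, 10_000)
print(mem / 2**20)  # peak estimate in MiB
```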

Internal Actor Resource Overhead

In addition to UDF applier actors, each backfill job creates internal actors that consume resources:
| Component | CPU | Memory | Count |
| --- | --- | --- | --- |
| Driver task | 0.1 | – | 1 |
| JobTracker | 0.1 | 128 MiB | 1 |
| Writer actors | 0.1 | 1 GiB | 1 per applier |
| Queue actors | 0 | – | 1 per applier |
| Applier actors | UDF num_cpus * intra_applier_concurrency | UDF memory | concurrency |

Example

For a job with concurrency=4, intra_applier_concurrency=2, with a UDF that requests 1 CPU and 1GiB memory:
| Component | CPU | Memory | Count |
| --- | --- | --- | --- |
| Driver task | 0.1 | – | 1 |
| JobTracker | 0.1 | 128 MiB | 1 |
| Writer actors | 0.1 | 1 GiB | 4 |
| Queue actors | 0 | – | 4 |
| Applier actors | 1 * 2 = 2 | 1 GiB | 4 |
| Total | 0.1 + 0.1 + 4 * 0.1 + 4 * 2 = 8.6 | 128 MiB + 4 * 1 GiB + 4 * 1 GiB = 8.125 GiB | |
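The totals in the example can be recomputed from the per-actor overheads (the numeric constants come from the overhead table above):

```python
# Recompute the example's resource totals: concurrency=4,
# intra_applier_concurrency=2, UDF requesting 1 CPU and 1 GiB.
concurrency, intra, udf_cpus, udf_mem_gib = 4, 2, 1, 1.0

cpu_total = (0.1                                # driver task
             + 0.1                              # JobTracker
             + concurrency * 0.1                # one writer actor per applier
             + concurrency * udf_cpus * intra)  # applier actors
mem_gib = (128 / 1024                           # JobTracker: 128 MiB
           + concurrency * 1.0                  # writer actors: 1 GiB each
           + concurrency * udf_mem_gib)         # applier actors: UDF memory each

print(round(cpu_total, 1), mem_gib)  # → 8.6 8.125
```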

Overall Cluster Sizing

Cluster sizing varies dramatically by workload, but if you don't yet know how to estimate yours, the following sizes are a reasonable starting point:

| Cluster Size | Head Node | Workers |
| --- | --- | --- |
| Small (Dev / CI) | 1 CPU, 8 GB | 4 x 2 CPU, 8 GB |
| Medium (Staging) | 2 CPU, 8 GB | 4 x 4 CPU, 8 GB |
| Large (Production) | 4 CPU, 8 GB | 8+ x 8 CPU, 16 GB |

Validation Thresholds

The cluster builder validates memory configuration and warns or errors on suspicious values:
| Threshold | Value | Behavior |
| --- | --- | --- |
| GPU worker minimum memory | < 4 GiB | Error — build fails if below this |
| Large memory warning | > 100 GB | Warning — may exceed K8s node capacity |
| Memory-per-CPU ratio warning | > 16 GiB per CPU | Warning — unusual ratio, likely misconfigured |
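The checks behave roughly as sketched below. This is an illustrative re-implementation of the thresholds in the table, not the cluster builder's actual code; function and message names are made up.

```python
# Sketch of the cluster builder's memory validation thresholds (illustrative).
GIB = 2**30

def validate_memory(memory_bytes: int, num_cpus: int, is_gpu_worker: bool) -> list[str]:
    issues = []
    if is_gpu_worker and memory_bytes < 4 * GIB:
        issues.append("error: GPU workers need at least 4 GiB")  # build fails
    if memory_bytes > 100 * 10**9:                               # 100 GB
        issues.append("warning: may exceed K8s node capacity")
    if memory_bytes / num_cpus > 16 * GIB:
        issues.append("warning: unusual memory-per-CPU ratio")
    return issues

# A 2 GiB GPU worker trips the hard minimum:
print(validate_memory(2 * GIB, 1, is_gpu_worker=True))
```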

GKE node pools

GKE + KubeRay can autoscale the number of VM nodes on demand. Limitations on the amount of resources provisioned are configured via node pools. Node pools can be managed to scale vertically (type of machine) or horizontally (# of nodes). Properly applying Kubernetes labels to the node pool machines allows you to control resources for different jobs in your cluster.

Options on Table.backfill(..)

The Table.backfill(..) method has several optional arguments for tuning performance. To saturate the CPUs in the cluster, the main arguments to change are concurrency, which controls the number of task processes, and intra_applier_concurrency, which controls the number of task threads per task process. commit_granularity controls how frequently fragments are committed, so that partial results become visible to table readers sooner. A smaller checkpoint_size produces finer-grained checkpoints and more frequent proof of life while a job runs, which is useful when the computation on your data is expensive.
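A tuning call might look like the sketch below. The table handle `tbl`, the column name "embedding", and all parameter values are assumptions for illustration; only the parameter names come from this page.

```python
# Hypothetical backfill tuning; `tbl` is assumed to be a Geneva table handle
# with a UDF registered on the "embedding" column. Values are illustrative.
backfill_kwargs = dict(
    concurrency=8,                # applier task processes across the cluster
    intra_applier_concurrency=2,  # concurrent UDF applications per process
    checkpoint_size=50,           # smaller batches give more frequent proof of life
    commit_granularity=4,         # commit more often so readers see partial results
)
# tbl.backfill("embedding", **backfill_kwargs)

# Peak parallel UDF applications implied by these settings:
print(backfill_kwargs["concurrency"] * backfill_kwargs["intra_applier_concurrency"])  # → 16
```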

Balancing write bandwidth

While computation can be broken down into small tasks, new Lance column data for each fragment must be written out in a serialized fashion. Each fragment has a writer that waits for checkpointed results to arrive, sequences them, and then serially writes out the new data file. Writers can be a bottleneck if a Lance dataset has a small number of fragments, especially if the amount of data being written out is comparatively large. Maximizing parallel write throughput can be achieved by having more fragments than nodes in the cluster.

Symptom: Computation tasks complete but writers seem to hang

Certain jobs that take a small dataset and expand it can make the writer appear frozen. For example, a table may contain a list of URLs pointing to large media files; the list itself is small (< 100 MB) and fits into a single fragment. A UDF that downloads these files will fetch all the data and then attempt to write it out through a single writer, which can end up serially writing 500+ GB to a single file. To mitigate this, load your initial table so that it has multiple fragments; each fragment's new outputs can then be written in parallel for higher write throughput.
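One way to ensure the initial table lands in multiple fragments is to cap the rows per data file when writing the dataset. Lance's write_dataset accepts a max_rows_per_file option (check your pylance version); the sizing helper and the row counts below are our own illustration.

```python
# Size the initial write so the table splits into enough fragments for
# parallel writers. The helper is illustrative, not a Lance/Geneva API.
import math

def rows_per_fragment(num_rows: int, target_fragments: int) -> int:
    """Rows per data file so the table splits into ~target_fragments fragments."""
    return math.ceil(num_rows / target_fragments)

# e.g. 50,000 URL rows spread across ~16 fragments:
print(rows_per_fragment(50_000, 16))  # → 3125

# Hypothetical usage when creating the dataset (table is an Arrow table of URLs):
# import lance
# lance.write_dataset(table, "urls.lance",
#                     max_rows_per_file=rows_per_fragment(50_000, 16))
```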