com.oscaro/clj-k8s

1.29.1.0


Clojure kubernetes client API stub

dependencies

org.clojure/clojure
1.10.1
clj-http
3.12.3
cheshire
5.11.0
io.forward/yaml
1.0.11
prismatic/schema
1.4.1
org.flatland/ordered
1.15.11
com.google.auth/google-auth-library-oauth2-http
1.16.1
org.apache.httpcomponents/httpclient
4.5.14
org.apache.httpcomponents/httpcore
4.4.16
com.fasterxml.jackson.core/jackson-core
2.15.0



(this space intentionally left almost blank)
 
(ns clj-k8s.utils
  (:require [clojure.string :as str]))

=====================================

Utils

(defn find-named [x xs]
  (some #(when (= (:name %) x) %) xs))
(defmacro not-found->nil
  [& body]
  `(try
     ~@body
     (catch Throwable t#
       (if (= 404 (-> t# ex-data :status))
         nil
         (throw t#)))))
(defn ->label-selector
  [labels]
  (if (string? labels)
    labels
    (->> labels
         (map (fn [[k v]] (format "%s=%s" (name k) v)))
         (str/join ","))))

Kubernetes Related Predicates

(defn- current-condition
  [{:keys [conditions]}]
  (->> conditions
       (filter #(= (:status %) "True"))
       first))

Returns true if the job is still running

(defn running?
  [{:keys [status]}]
  (boolean (and status (nil? (current-condition status)))))

Returns true if the job was successful

(defn succeeded?
  [{:keys [status]}]
  (= (:type (current-condition status)) "Complete"))

Returns true if the job failed

(defn failed?
  [{:keys [status]}]
  (or (nil? status) (= (:type (current-condition status)) "Failed")))
 
(ns clj-k8s.models
  (:require [schema.core :as s]))

==========================================

Models

(s/defschema KubeClientSpec
  {:token s/Str
   :base-url s/Str
   (s/optional-key :namespace) s/Str})
(s/defschema KubeClient
  {:base-url s/Str
   :auths {s/Str s/Str}
   :namespace s/Str})
 
(ns clj-k8s.gke
  (:import (com.google.auth.oauth2 GoogleCredentials)
           (java.util List)))

=====================================

Google Kubernetes Engine

See: https://developers.google.com/identity/protocols/googlescopes#containerv1

(defonce gke-scopes
  ["https://www.googleapis.com/auth/cloud-platform"])
(def ^GoogleCredentials creds
  (delay
    (let [app-creds (GoogleCredentials/getApplicationDefault)]
      (if (.createScopedRequired app-creds)
        (.createScoped app-creds ^List gke-scopes)
        app-creds))))

Refresh the token by discarding the cached token and metadata and requesting the new ones if expired

(defn refresh-goog-token!
  [^GoogleCredentials creds]
  (.refreshIfExpired creds)
  creds)

Fetches the google access token

(def get-google-access-token
  (fn []
    (some-> @creds
            refresh-goog-token!
            .getAccessToken
            .getTokenValue)))
 
(ns clj-k8s.api
  (:require [yaml.core :as yaml]
            [schema.core :as s]
            [clj-k8s.gke :as gke]
            [clj-k8s.utils :refer [find-named not-found->nil
                                   ->label-selector]]
            [clj-k8s.models :refer :all]
            [clojure.java.io :as io]
            [clojure.string :as str]
            [kubernetes.core :as k]
            [kubernetes.api.version :as kv]
            [kubernetes.api.core-v- :as kc]
            [kubernetes.api.batch-v- :as kb]))

=====================================

Public API

Constants

(defonce ^:private default-ns "default")

For application running in Kubernetes context only

(defonce ^:private default-k8s-svc-host (System/getenv "KUBERNETES_SERVICE_HOST"))
(defonce ^:private default-k8s-svc-port (System/getenv "KUBERNETES_SERVICE_PORT"))

See: https://cloud.google.com/docs/authentication/application-default-credentials?hl=fr

(defonce ^:private default-cluster-service-account-dir
  (or (System/getenv "GOOGLE_APPLICATION_CREDENTIALS")
      "/var/run/secrets/kubernetes.io/serviceaccount"))

Kubernetes Token

(defonce ^:private token-from-env (str (System/getenv "KUBERNETES_TOKEN")))

Honor KUBECONFIG before generating fallback one from the user HOME

(defonce ^:private default-kube-config-path
  (or (System/getenv "KUBECONFIG")
      (str (System/getenv "HOME") "/.kube/config")))

Detect if the application is currently running on k8s platform, or not

(def running-from-kube?
  (and (.exists (io/file default-cluster-service-account-dir))
       (every? some? [default-k8s-svc-host default-k8s-svc-port])))

Detect if the current context is a GKE cluster. Logic is based on gke_* naming scheme

(defn- is-current-gke?
  [current-context]
  (str/starts-with? current-context "gke_"))

Authentification

Generic client builder

There is several ways to instantiate the client:

  1. Without any arguments, the function will fetch several env variables to build the client. This method is useful for applications that run directly in a Kubernetes Pod, or for executions context where env variables are used for configurations, required env variables are:

    • KUBERNETES_SERVICE_HOST - the Kubernetes Service Endpoint
    • KUBERNETES_SERVICE_HOST - the Kubernetes Service Endpoint Port
    • KUBERNETES_TOKEN - the actual Kubernetes token to be used See: https://docs.selectel.com/cloud/managed-kubernetes/instructions/service-account-token
  2. The more straightforward, and easy to setup is from a map spec directly, every elements are explicitly used for client building (probably this will be fetched from your component management system like Integrant.

  3. The last one is from, configuration file directly. The builder will auto-detect if the current context is a GKE cluster. If so, you'll need to be sure that the GOOGLE_APPLICATION_CREDENTIALS variable is set and corretly pointing to your SA for fetching token from API.

    WARN: If not running on GKE, you'll need to explicitly setup your token variable to use with your current context.

    Note: This methods is used by default if the builder isn't call with arguments AND all variables are not satisfied.

(defn mk-client
  {:added "1.25.8.2"}
  ([]
   (if (and default-k8s-svc-host default-k8s-svc-port (not-empty token-from-env))
     (s/validate KubeClient
                 {:base-url default-k8s-svc-host
                  :auths     {"BearerToken" (str "Bearer " token-from-env)}
                  :namespace default-ns})
     (mk-client default-kube-config-path)))
  ([kube-config]
   (if (map? kube-config)
     (let [{:keys [base-url token namespace]} (s/validate KubeClientSpec kube-config)]
       (s/validate KubeClient {:base-url base-url
                               :auths     {"BearerToken" (str "Bearer " token)}
                               :namespace (or namespace default-ns)}))
     (mk-client kube-config token-from-env)))
  ([kube-config token]
   (let [{:keys [clusters contexts current-context]}
         (yaml/from-file kube-config)
         context (find-named current-context contexts)
         cluster (find-named (get-in context [:context :cluster]) clusters)
         token (if (is-current-gke? current-context)
                 (gke/get-google-access-token)
                 token)]
     {:base-url  (get-in cluster [:cluster :server])
      :auths     {"BearerToken" (str "Bearer " token)}
      :namespace (get-in context [:context :namespace] default-ns)})))

Api Utils

A helper macro to wrap api-context with default values.

(defmacro with-api-context
  [api-context & body]
  `(let [api-context# ~api-context
         api-context# (-> k/*api-context*
                          (merge api-context#)
                          (assoc :auths (merge (:auths k/*api-context*) (:auths api-context#))))]
     (binding [k/*api-context* api-context#]
       ~@body)))
(defn active-ns [spec]
  (with-api-context spec
    (:namespace k/*api-context*)))

Version

Retrieve remote cluster build informations

(defn cluster-version
  {:added "1.25.8.2"}
  ([spec]
   (with-api-context spec
     (kv/get-code-version))))

Namespaces

Create a new namespace

(defn create-namespace
  {:added "1.25.8.2"}
  ([spec ns-spec] (create-namespace spec ns-spec {}))
  ([spec ns-spec opts]
   (let [ns-spec (if (string? ns-spec)
                   {:metadata {:name ns-spec}} ns-spec)]
     (with-api-context spec
       (kc/create-core-v1-namespace ns-spec opts)))))

Retrieve informations about the selected namespace

(defn get-namespace
  {:added "1.25.8.2"}
  ([spec n] (get-namespace spec n {}))
  ([spec n opts]
   (with-api-context spec
     (not-found->nil
      (kc/read-core-v1-namespace n opts)))))

Delete the selected namespace

(defn delete-namespace
  {:added "1.25.8.2"}
  ([spec ns] (delete-namespace spec ns {}))
  ([spec ns opts]
   (with-api-context spec
     (not-found->nil
      (kc/delete-core-v1-namespace ns opts)))))

Endpoints

Fetches the specified endpoints or returns nil if not found

(defn get-endpoints
  {:added "1.25.8.2"}
  ([spec ns] (get-endpoints spec ns {}))
  ([spec ep-name {:keys [namespace] :or {namespace default-ns} :as opts}]
   (with-api-context spec
     (not-found->nil
      (kc/read-core-v1-namespaced-endpoints ep-name namespace opts)))))

Create a new endpoint

(defn create-endpoint
  {:added "1.25.8.2"}
  ([spec ep-spec] (create-endpoint spec ep-spec {}))
  ([spec ep-spec {:keys [namespace] :or {namespace default-ns} :as opts}]
   (with-api-context spec
     (not-found->nil
      (kc/create-core-v1-namespaced-endpoints ep-spec namespace opts)))))

Pods

List or watch pods

(defn list-pods
  {:added "1.25.8.2"}
  ([spec] (list-pods spec {}))
  ([spec {:keys [namespace all-namespaces]
          :or {namespace default-ns} :as opts}]
   (with-api-context spec
     (let [opts (update opts :label-selector ->label-selector)]
       (:items
        (if all-namespaces
          (kc/list-core-v1-pod-for-all-namespaces opts)
          (kc/list-core-v1-namespaced-pod namespace opts)))))))

Retrieve pod informations

(defn get-pod
  {:added "1.25.8.2"}
  ([spec pod-name] (get-pod spec pod-name {}))
  ([spec pod-name {:keys [namespace] :or {namespace default-ns} :as opts}]
   (with-api-context spec
     (not-found->nil
      (kc/read-core-v1-namespaced-pod pod-name namespace opts)))))

Delete a pod by his name

(defn delete-pod
  {:added "1.25.8.2"}
  ([spec pod-name] (get-pod spec pod-name {}))
  ([spec pod-name {:keys [namespace] :or {namespace default-ns} :as opts}]
   (with-api-context spec
     (not-found->nil
      (kc/delete-core-v1-namespaced-pod pod-name namespace opts)))))

Reads the logs of a specific pod

(defn pod-logs
  {:added "1.25.8.2"}
  ([spec n] (pod-logs spec n {}))
  ([spec n {:keys [namespace] :or {namespace default-ns} :as opts}]
   (with-api-context spec
     (not-found->nil
      (kc/read-core-v1-namespaced-pod-log n namespace opts)))))

Jobs Batch

Fetches the specified job or returns nil if not found

(defn get-job
  {:added "1.25.8.2"}
  ([spec n] (get-job spec n {}))
  ([spec n {:keys [namespace] :or {namespace default-ns} :as opts}]
   (with-api-context spec
     (not-found->nil
      (kb/read-batch-v1-namespaced-job n namespace opts)))))

List or watch jobs

(defn list-jobs
  {:added "1.25.8.2"}
  ([spec] (list-jobs spec {}))
  ([spec {:keys [namespace all-namespaces] :or {namespace default-ns} :as opts}]
   (with-api-context spec
     (let [opts (update opts :label-selector ->label-selector)]
       (:items
        (if all-namespaces
          (kb/list-batch-v1-job-for-all-namespaces opts)
          (kb/list-batch-v1-namespaced-job namespace opts)))))))

Submits a job for execution

(defn submit-job
  {:added "1.25.8.2"}
  ([spec job-spec] (submit-job spec job-spec {}))
  ([spec job-spec opts]
   (with-api-context spec
     (let [job-ns (get-in job-spec [:metadata :namespace] default-ns)]
       (kb/create-batch-v1-namespaced-job job-ns job-spec opts)))))

Deletes a job

(defn delete-job
  {:added "1.25.8.2"}
  ([spec n] (delete-job spec n {}))
  ([spec n {:keys [namespace] :or {namespace default-ns} :as opts}]
   (with-api-context spec
     (let [opts (merge {:propagation-policy "Foreground"} opts)]
       (kb/delete-batch-v1-namespaced-job n namespace opts)))))

Fetches all of the pods belonging to a specific job

(defn job-pods
  {:added "1.25.8.2"}
  ([spec n] (job-pods spec n {}))
  ([spec n opts]
   (with-api-context spec
     (if-let [job (get-job spec n opts)]
       (->> (get-in job [:spec :selector :matchLabels :controller-uid])
            (str "controller-uid=")
            (assoc opts :label-selector)
            (kc/list-core-v1-namespaced-pod (get-in job [:metadata :namespace]))
            :items)
       []))))