[ZEPPELIN-3840] Zeppelin on Kubernetes #3240
Conversation
Force-pushed from f02151d to 64a56b5.
Right. I just pushed a commit fa36c18, and it allows users to override it.

```
$ docker build -t <tag> .
```

Finally, set the custom image `<tag>` just created as `image` and as the `ZEPPELIN_K8S_CONTAINER_IMAGE` env variable of the `zeppelin-server` container spec in the `zeppelin-server.yaml` file.
Probably one small section that lists the current limitations would be useful. The one probably worth highlighting is that interpreters other than Spark use a container image that is the same as the Zeppelin container image.
Added more information below.
This information is also very helpful
@@ -0,0 +1,143 @@
#
The name of this file mentions "pod", but the YAML contains other resources like svc and rbac. Perhaps a better name could be used, like 100-interpreter-spec.yaml?
File renamed.
- name: MASTER # default value of master property for spark interpreter.
  value: k8s://https://kubernetes.default.svc
- name: zeppelin-server-gateway
  image: nginx:1.14.0
Is it better to create a svc for Spark and use native Kubernetes redirection instead of nginx as a gateway?
What I'm trying to do here is make the Spark UI accessible when the user has access to the Zeppelin UI, without any extra configuration or command to run.
I'm not familiar with native Kubernetes redirection. Could you explain how it would be possible with native Kubernetes redirection?
Will this work for all modes (global, scoped, per-note, per-user)? I will try to check.
Implementing any security with an embedded gateway might be a concern.
After some exploration, the change required will not be as simple as what has been provided. The approach could be as follows (we have used it for other applications):
- For the Spark driver UI, create a service with type=ClusterIP, port=randomPort, targetPort=driver UI port (4040 by default).
- Create an Ingress resource with path, service and port. Some annotations might be ingress-controller specific. OR
- With this, the Spark driver UI is accessible directly at the K8s edge node / 'path'.
I guess this would require some customization to be done by the users anyway.
If I get a chance to explore other ways, I will get back.
This will work for all modes. In terms of security, since access to the Spark UI goes through the domain `4040-<Spark Interpreter pod name>.<service domain>`, I would say it's safe as long as the pod name of the Spark interpreter is unpredictable.
Speaking of security, there are other aspects to consider:
- Blocking communication between interpreter Pods.
- The Spark interpreter Pod has a Role with CRUD on any pod/service in the same namespace, which should be restricted to only the Spark executor Pods. Otherwise the Spark interpreter Pod will be able to access/terminate other interpreter Pods.
I'd like to address the above in separate PRs.
private final Map<String, String> envs;
private final String zeppelinServiceHost;
private final String zeppelinServiceRpcPort;
private final int podCreateTimeoutSec = 180;
Sometimes image pulling takes longer. It's better if it's configurable.
Changed the code to just use the interpreter connection timeout for waiting on pod creation, which is already configurable and designed to wait for
- interpreter process creation
- interpreter process ready and RPC server port open
And I think it makes sense to use this value to wait for
- interpreter pod creation
- interpreter process ready and RPC server port open.
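A minimal sketch of that idea (an assumed fragment inside K8sRemoteInterpreterProcess; `properties`, `kubectl`, `podName` and a `Kubectl.wait(resource, condition, timeoutSec)` helper are taken from this PR's code, while the property name and fallback value here are assumptions):

```java
// Reuse the already-configurable interpreter connection timeout as the
// pod-creation wait timeout, instead of a hard-coded podCreateTimeoutSec.
long connectTimeoutMs = Long.parseLong(
    properties.getProperty("zeppelin.interpreter.connect.timeout", "60000"));
long podWaitTimeoutSec = connectTimeoutMs / 1000;

// Wait for the interpreter pod to become Ready using the same timeout the
// user already tunes for interpreter connections.
kubectl.wait("pod/" + podName, "condition=Ready", podWaitTimeoutSec);
```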
Thanks for this change. This makes users more comfortable :)
this.zeppelinServiceRpcPort = zeppelinServiceRpcPort;
this.portForward = portForward;
this.sparkImage = sparkImage;
this.podName = interpreterGroupName.toLowerCase() + "-" + getRandomString(6);
Is it better to use timestamp instead of RandomString?
Could you explain a bit more why timestamp can be better?
This was just a thought: a random string might collide sometimes, while a timestamp would be less prone to collision.
Since the code generates a 6-character random string from the lowercase alphabet, the probability of collision is <number of running interpreter pods>/26^6 = <number of running interpreter pods>/308,915,776. I think the probability of collision is small enough in practice.
One worry with using a timestamp in the pod name is that the pod name becomes predictable, and that makes access to the Spark UI insecure, since the Spark UI is accessed through a subdomain constructed from the interpreter pod name.
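For illustration, a self-contained sketch of this naming scheme (the PR's actual helper may differ in details):

```java
import java.util.Random;

// Illustrative sketch of the interpreter pod-name suffix discussed above.
public class PodNameSuffix {
  private static final Random RANDOM = new Random();

  // 6 lower-case letters give 26^6 = 308,915,776 possible suffixes, so
  // collisions among a handful of running interpreter pods are very unlikely,
  // and the name stays unpredictable (unlike a timestamp).
  static String randomSuffix(int length) {
    StringBuilder sb = new StringBuilder(length);
    for (int i = 0; i < length; i++) {
      sb.append((char) ('a' + RANDOM.nextInt(26)));
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    String interpreterGroupName = "spark";
    System.out.println(interpreterGroupName.toLowerCase() + "-" + randomSuffix(6));
  }
}
```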
Agree. Let it be random string.
StringBuilder options = new StringBuilder();

options.append(" --master k8s://https://kubernetes.default.svc");
options.append(" --deploy-mode client");
With this, cluster mode for Spark may not work, though it may not be needed since the Spark interpreter itself is launched in a new pod. Usually driver memory might require changes depending on the application being run.
Updated to add the `--driver-memory` configuration when `spark.driver.memory` is defined.
* Check if i'm running inside of kubernetes or not.
* @return
*/
boolean isRunningOnKubernetes() {
Is this really needed? It appears redundant since the same check is happening in InterpreterSetting.java.
Yes, it is. I added a comment in the source code explaining why we need isRunningOnKubernetes() inside of K8sStandardInterpreterLauncher, although K8sStandardInterpreterLauncher is supposed to be used only in a Kubernetes cluster, which InterpreterSetting is supposed to detect beforehand.
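For context, a minimal sketch of the kind of check such a method can perform (the PR's actual implementation may differ):

```java
import java.io.File;

// Detect whether the current process runs inside a Kubernetes pod by checking
// for the service-account token that Kubernetes mounts into pods by default.
public class K8sDetect {
  static boolean isRunningOnKubernetes() {
    return new File("/var/run/secrets/kubernetes.io/serviceaccount/token").exists();
  }

  public static void main(String[] args) {
    System.out.println("Running on Kubernetes: " + isRunningOnKubernetes());
  }
}
```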
Thanks for the explanation. It is helpful.
@@ -0,0 +1,157 @@
/*
Is it possible to avoid the kubectl binary dependency by using the fabric8 Java APIs? This could possibly be a separate PR. Please let me know if you want me to create a PR.
If the Kubernetes Java client from fabric8 can handle a Kubernetes YAML spec file the same way kubectl handles it, why not? I tried to use the official Kubernetes Java client first and changed to kubectl because it does not have a `kubectl apply -f` equivalent.
Usually with the fabric8 API we have tried applying each resource individually. We will explore applying all resources at once; an initial reading of the code suggests it is supported. However, we will try and get back.
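A hypothetical sketch of what a fabric8-based replacement for `kubectl apply -f -` could look like (whether `createOrReplace()` covers every resource kind a user may add, including CRDs, is exactly the question discussed below):

```java
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;

// Load a (possibly multi-document) YAML spec and create-or-replace all
// resources it contains, roughly mirroring `kubectl apply -f -`.
public class Fabric8Apply {
  public static void apply(String yamlSpec) {
    try (KubernetesClient client = new DefaultKubernetesClient()) {
      client.load(new ByteArrayInputStream(yamlSpec.getBytes(StandardCharsets.UTF_8)))
            .createOrReplace();
    }
  }
}
```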
@Leemoonsoo My colleague has put an example of the possible replacement under zep-k8sctl.
@nrchakradhar Cool! Thanks for sharing.
One question: let's say a user is going to customize zeppelin-server.yaml and interpreter.yaml to mount volumes, add network restrictions, etc. The user might therefore add additional sections to the YAML, such as PersistentVolumeClaim, Secret, ConfigMap and so on. Kubernetes also allows CRDs, so there are types of resources we don't know about at this point.
But this code looks like it needs a modification whenever there's a new type of resource to apply. Is there any solution for this?
@Leemoonsoo Sorry for the delay in responding to this comment; I was busy with other activities.
We have tried multiple options to get functionality similar to kubectl apply, but fabric8 has some bugs/issues/pending enhancements. Please refer to the following Stack Overflow query for details:
For now we can keep the current implementation, and when the fabric8 changes are available we can probably revisit this.
The issue with the fabric8 library related to kubectl apply or delete of resources using a single API is addressed by #1300, which is merged to the master branch.
When it is available in the next release of fabric8io/kubernetes-client we can integrate it. Using the above fix from fabric8, the change has been validated in the master branch of zep-k8sctl.
Can you please let us know whether we should create new JIRA issues for follow-up, or whether we should follow some different procedure?
Great news. Please feel free to create a JIRA issue and let me know if there is anything I can help with.
Okay. Sure
Awesome! That is great news. Just out of curiosity, must the Zeppelin server in K8s be the one which has access to SPARK_HOME, or the Spark interpreter, which is now a pod? (Just to manage which one should be a gateway in a Cloudera cluster.)
The Spark interpreter is running inside its own Pod, and the Spark driver is running in the Spark interpreter. So the Spark interpreter Pod is the one that needs SPARK_HOME and access from/to the Spark master/executors in your cluster.
It would be great to cut a 0.9 release with this and all the other great improvements that are currently only available in master, so more folks can take advantage of those features and provide feedback sooner. Thank you!
@@ -816,6 +840,13 @@ public int getClusterHeartbeatTimeout() {
ZEPPELIN_CLUSTER_HEARTBEAT_INTERVAL("zeppelin.cluster.heartbeat.interval", 3000),
ZEPPELIN_CLUSTER_HEARTBEAT_TIMEOUT("zeppelin.cluster.heartbeat.timeout", 9000),

ZEPPELIN_K8S_MODE("zeppelin.k8s.mode", "auto"), // auto | on | off
Later, Zeppelin may also support YARN. Now YARN also supports Docker. Is it better to change this configuration to ZEPPELIN_RUN_MODE?
For example: ZEPPELIN_RUN_MODE("zeppelin.run.mode", "auto"), // auto | k8s | yarn | normal
Make sense. Updated.
It would seem that when setting the master to spark://spark-cluster:7077, it would just use local[*] instead. Is connecting to a cluster using client mode not yet supported? EDIT: Adding an additional field "spark.master" to the Spark interpreter conf and setting it to the Spark cluster master URL works (e.g. spark://spark-cluster:7077).
Sorry for responding late. Thanks @whai2kay for testing it out and confirming that Spark can be configured using the "spark.master" interpreter conf. This is just an initial implementation, and it can be a starting point for improvements, from applying a Kubernetes client library to network policies for security, and so on. I'd like to merge this PR to master so we can start working on further improvements, if there are no more comments.
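As one hedged sketch of such an improvement (not the PR's current code; `properties` and `options` are the fields of K8sRemoteInterpreterProcess shown in this diff), the launcher could honor a user-supplied master instead of always forcing the in-cluster one:

```java
// Only force the in-cluster master when the user has not set spark.master,
// so a value like spark://spark-cluster:7077 in the interpreter setting wins.
String userMaster = properties.getProperty("spark.master");
if (userMaster == null || userMaster.isEmpty()) {
  options.append(" --master k8s://https://kubernetes.default.svc");
} else {
  options.append(" --master ").append(userMaster);
}
```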
}

public String apply(String spec) throws IOException {
  return execAndGet(new String[]{"apply", "-f", "-"}, spec);
`kubectl apply -f` does not work if there is a `generateName` in the interpreter spec YAML. It's better to use `kubectl create -f` if resources are being created. Please find the discussion here.
I didn't know about `generateName`. Thanks for pointing this out with the discussion link.
I agree on using `create -f`. While this PR works as it is, I'd like to merge it first and then follow up with apply -> create in a separate PR. Does that sound like a plan?
Okay....
Created https://issues.apache.org/jira/browse/ZEPPELIN-3954. I'll follow up on this issue.
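For reference, the follow-up in ZEPPELIN-3954 would essentially amount to a method like this next to the existing `apply()` in `Kubectl` (a sketch, not the merged code):

```java
// Create resources instead of applying them, so specs that use generateName
// are accepted; execAndGet() is the existing helper shown in this diff.
public String create(String spec) throws IOException {
  return execAndGet(new String[]{"create", "-f", "-"}, spec);
}
```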
options.append(" --driver-memory " + properties.get("spark.driver.memory"));
}
options.append(" --conf spark.kubernetes.namespace=" + kubectl.getNamespace());
options.append(" --conf spark.executor.instances=1");
@Leemoonsoo Are there alternative means to specify spark.executor.instances? I don't see a way to do so, as this function builds the spark-submit options and they are appended to the end, i.e. these will override all other options as specified in https://github.com/apache/zeppelin/pull/3240/files#diff-6d1d3084f55bdd519e39ede4a619e73dR269
%spark.conf
spark.executor.instances <number>
inside a note will allow configuring this value.
But right, it'll be better not to override SPARK_SUBMIT_OPTIONS. The default value of spark.executor.instances is 2, and I think it's okay to remove this line and use the default value.
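A sketch of that direction (an assumed fragment inside the same option-building method, not the PR's final code):

```java
// Drop the hard-coded value and only forward spark.executor.instances when the
// user configured it; otherwise Spark's own default (2) applies.
String executorInstances = properties.getProperty("spark.executor.instances");
if (executorInstances != null && !executorInstances.isEmpty()) {
  options.append(" --conf spark.executor.instances=").append(executorInstances);
}
```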
private String getZeppelinServiceHost() throws IOException {
  if (isRunningOnKubernetes()) {
    return String.format("%s.%s.svc.cluster.local",
        getHostname(), // service name and pod name should be the same
@Leemoonsoo Great work on making Zeppelin work in Kubernetes!
I have a concern with this approach of matching Service and Pod names via hostname. This will not work for zeppelin pods that are provisioned via Deployments or StatefulSets as pod names are dynamically generated during creation.
Good point. Notebook serving (#3356) has exactly this problem, since TestTask runs Zeppelin using a Job and ServingTask runs Zeppelin using a Deployment.
In the serving implementation, K8sStandardInterpreterLauncher has been modified to look for a SERVICE_NAME env variable. Check https://github.com/apache/zeppelin/pull/3356/files#diff-dfad7c26921ec97aa7bca7fc3180408fL109. Does it make sense?
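Roughly, the fallback looks like this (a sketch based on the linked diff; exact names may differ):

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Prefer an explicitly provided SERVICE_NAME env variable (set in the
// Deployment/StatefulSet spec) and only fall back to the pod hostname, which
// matches the service name for the provided zeppelin-server.yaml.
public class ZeppelinServiceName {
  static String zeppelinServiceName() throws UnknownHostException {
    String serviceName = System.getenv("SERVICE_NAME");
    if (serviceName != null && !serviceName.isEmpty()) {
      return serviceName;
    }
    return InetAddress.getLocalHost().getHostName();
  }
}
```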
That should work 👍
CommandLine cmd = new CommandLine(kubectlCmd);
cmd.addArguments(args);

ExecuteWatchdog watchdog = new ExecuteWatchdog(60 * 1000);
@Leemoonsoo This value should be the same as the kubectl timeout flags. In the case of kubectl wait: https://github.com/apache/zeppelin/pull/3240/files#diff-7b231819fa2236eab55c934add3b56e8R69
This timeout may be reached before kubectl wait's timeout, causing an exception to be thrown:
INFO [2019-05-03 03:24:59,232] ({SchedulerFactory3} Kubectl.java[execAndGet]:121) - kubectl [wait, pod/spark-foqspk, --for=condition=Ready, --timeout=600s, --namespace=aikwei-sng]
WARN [2019-05-03 03:25:59,257] ({SchedulerFactory3} NotebookServer.java[onStatusChange]:1671) - Job paragraph_1556788652522_1837703941 is finished, status: ERROR, exception: null, result: %text org.apache.zeppelin.interpreter.InterpreterException: java.io.IOException:
at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.open(RemoteInterpreter.java:134)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.getFormType(RemoteInterpreter.java:298)
at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:417)
at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:76)
at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:121)
at org.apache.zeppelin.scheduler.RemoteScheduler$JobRunner.run(RemoteScheduler.java:187)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException:
at org.apache.zeppelin.interpreter.launcher.Kubectl.execAndGet(Kubectl.java:141)
at org.apache.zeppelin.interpreter.launcher.Kubectl.execAndGet(Kubectl.java:106)
at org.apache.zeppelin.interpreter.launcher.Kubectl.wait(Kubectl.java:65)
at org.apache.zeppelin.interpreter.launcher.K8sRemoteInterpreterProcess.start(K8sRemoteInterpreterProcess.java:92)
at org.apache.zeppelin.interpreter.ManagedInterpreterGroup.getOrCreateInterpreterProcess(ManagedInterpreterGroup.java:65)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.getOrCreateInterpreterProcess(RemoteInterpreter.java:110)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.internal_create(RemoteInterpreter.java:163)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.open(RemoteInterpreter.java:131)
... 13 more
Caused by: org.apache.commons.exec.ExecuteException: Process exited with an error: 143 (Exit value: 143)
at org.apache.commons.exec.DefaultExecutor.executeInternal(DefaultExecutor.java:404)
at org.apache.commons.exec.DefaultExecutor.execute(DefaultExecutor.java:166)
at org.apache.commons.exec.DefaultExecutor.execute(DefaultExecutor.java:153)
at org.apache.zeppelin.interpreter.launcher.Kubectl.execute(Kubectl.java:155)
at org.apache.zeppelin.interpreter.launcher.Kubectl.execAndGet(Kubectl.java:125)
... 20 more
Thanks for pointing it out!
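A sketch of the kind of fix this suggests: size the commons-exec watchdog from the same value passed to `kubectl wait --timeout`, plus a margin, instead of a fixed 60 seconds (an assumed helper, not the merged change):

```java
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteWatchdog;

// Build an executor whose watchdog outlives kubectl's own --timeout, so that
// kubectl is not killed early with exit value 143 as in the stack trace above.
public class KubectlExecutorFactory {
  static DefaultExecutor executorFor(long kubectlTimeoutSec) {
    DefaultExecutor executor = new DefaultExecutor();
    executor.setWatchdog(new ExecuteWatchdog((kubectlTimeoutSec + 30) * 1000L));
    return executor;
  }
}
```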
What type of PR is it?

This PR adds ability to run Zeppelin on Kubernetes. It aims

Key features are
- `kubectl` to run Zeppelin server

To do
How it works
Run Zeppelin Server on Kubernetes

`k8s/zeppelin-server.yaml` is provided to run Zeppelin Server with a few sidecars and configurations. This file is easy to publish (users can easily consume it using `curl`) and highly customizable, while it includes all the necessary things.

K8s Interpreter launcher
This PR adds a new module, `launcher-k8s-standard`, under the `zeppelin/zeppelin-plugins/launcher/k8s-standard/` directory. This launcher is automatically selected when Zeppelin is running on Kubernetes, and it handles both the Spark interpreter and all other interpreters.

The launcher launches an interpreter as a Pod using the template k8s/interpreter/100-interpreter-pod.yaml. The reason the filename has a `100-` prefix is that all files in the directory are consumed in alphabetical order by the launcher on interpreter start/stop. Users can drop more files here to extend/customize the interpreter, and the filename can be used to control the order. The template is rendered by jinjava.

Spark interpreter
When the interpreter group is `spark`, K8sRemoteInterpreterProcess automatically sets the necessary Spark configuration to use Spark on Kubernetes. The user doesn't have to configure anything. It uses client mode.

Spark UI
We could make the user manually configure port-forwarding or do something else to access the Spark UI, but that's not optimal. It is best when the Spark UI is automatically accessible once the user has access to the Zeppelin UI, without any extra configuration.

To enable this, the Zeppelin server Pod has a reverse proxy as a sidecar, and it splits traffic between the Zeppelin server and the Spark UI running in the other Pod. It assumes both `service.domain.com` and `*.service.domain.com` point to the nginx proxy address. `service.domain.com` is directed to the Zeppelin server, and `*.service.domain.com` is directed to interpreter Pods. `<port>-<interpreter pod svc name>.service.domain.com` is the convention for accessing any application running in an interpreter Pod. If the Spark interpreter Pod is running with the name `spark-axefeg` and the Spark UI is running on port 4040, then `4040-spark-axefeg.service.domain.com` is the address to access the Spark UI.

The default service domain is local.zeppelin-project.org:8080, where `local.zeppelin-project.org` and `*.local.zeppelin-project.org` point to 127.0.0.1, and it works with `kubectl port-forward`.
What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-3840
How should this be tested?
Prepare a Kubernetes cluster with enough resources (cpus > 5, mem > 6g). If you're using minikube, check your capacity using the `kubectl describe node` command before you start.

You'll need to build a Zeppelin docker image and a Spark docker image to test. Please follow the guide docs/quickstart/kubernetes.md.
To quickly try it without building docker images, I have uploaded pre-built images on Docker Hub: `moon/zeppelin:0.9.0-SNAPSHOT`, `moon/spark:2.4.0`. Try the following command.

And port forward.

And browse http://localhost:8080.

To clean up
Screenshots (if appropriate)
See this video https://youtu.be/7E4ZGn4pnTo
Future work
Questions: