Conversation
1. Training datasets will be created directly before training a coordinate: FixedEffectDataset is merged into FixedEffectCoordinate, RandomEffectDataset is merged into RandomEffectCoordinate, and random effect vector projection will be disabled.
2. Scores are changed to use DataFrames.
3. Residuals will be computed by using a UDF on the training DataFrame. For random effects, the per-entity models will first need to be joined to the DataFrame by REID. A single UDF will do all scoring for fixed and random effects at once.
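The single-UDF scoring step described in point 3 might be sketched as follows. This is a hedged illustration only: the `dotProduct` UDF, the `coefficients` column, and the `score` helper are hypothetical names, and it assumes the per-entity coefficients have already been joined onto the training DataFrame (by REID for random effects, broadcast for the fixed effect).

```scala
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, udf}

object ScoringSketch {

  // Hypothetical UDF: dot product of a feature vector with a coefficient
  // vector that was joined onto the training DataFrame beforehand.
  private val dotProduct = udf { (features: Vector, coefficients: Vector) =>
    var score = 0.0
    features.foreachActive { (index, value) =>
      score += value * coefficients(index)
    }
    score
  }

  // Appends a score column computed from the feature shard column and the
  // joined per-entity (or broadcast fixed effect) coefficients.
  def score(dataset: DataFrame, featureShardId: String, scoreField: String): DataFrame =
    dataset.withColumn(scoreField, dotProduct(col(featureShardId), col("coefficients")))
}
```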
ashelkovnykov left a comment:

Initial comments on WIP
photon-api/src/main/scala/com/linkedin/photon/ml/estimators/GameEstimator.scala
  // Create the optimization coordinates for each component model
- val coordinates: Map[CoordinateId, C forSome { type C <: Coordinate[_] }] =
+ val coordinates: Map[CoordinateId, C forSome { type C <: Coordinate }] =
There's nothing wrong here, but it might be easier to keep the Dataset objects, as we did for the tests, to wrap the DataFrame of training data (once it is generated) and the feature shard ID.
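One hedged reading of this suggestion (the class name is hypothetical): a thin wrapper that preserves the Dataset abstraction from the proof-of-concept tests while the underlying storage becomes a DataFrame plus a feature shard ID, so Coordinate type signatures can stay unchanged.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical thin wrapper: keeps the Dataset abstraction while the
// underlying training data is a DataFrame and a feature shard ID.
case class DataFrameDataset(data: DataFrame, featureShardId: String)
```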
photon-api/src/main/scala/com/linkedin/photon/ml/Constants.scala
photon-api/src/main/scala/com/linkedin/photon/ml/model/RandomEffectModel.scala
photon-lib/src/main/scala/com/linkedin/photon/ml/algorithm/Coordinate.scala
photon-api/src/main/scala/com/linkedin/photon/ml/model/FixedEffectModel.scala
photon-api/src/main/scala/com/linkedin/photon/ml/algorithm/RandomEffectCoordinate.scala
photon-api/src/main/scala/com/linkedin/photon/ml/supervised/model/GeneralizedLinearModel.scala
Forgot to comment: since all of these commits are related to one task and don't seem to have any logical separation, would you kindly squash them into one commit?
ashelkovnykov left a comment:

I skipped reviewing much of the scoring changes, as they looked like they were still early WIP and subject to many changes.
photon-api/src/main/scala/com/linkedin/photon/ml/util/ApiUtils.scala
    optimizationProblem: DistributedOptimizationProblem[Objective],
    featureShardId: FeatureShardId,
    inputColumnsNames: InputColumnsNames)
  extends Coordinate {
This isn't what I was picturing when writing the design document. I was thinking of something more like generateDataset in the proof-of-concept tests:

1. CoordinateDescent calls train in FixedEffectCoordinate with the training DataFrame
2. train calls generateDataset
3. generateDataset drops unnecessary columns and trains a FixedEffectModel
4. CoordinateDescent calls score in FixedEffectCoordinate with the training DataFrame and the FixedEffectModel
5. score returns a new DataFrame with a scores column
6. CoordinateDescent calls train in RandomEffectCoordinate with the scored DataFrame
7. train calls generateDataset
8. generateDataset merges the offset column with the scores column, then drops unnecessary columns and trains a RandomEffectModel
9. CoordinateDescent calls score in RandomEffectCoordinate with the scored DataFrame and the RandomEffectModel
10. score returns a new DataFrame with another scores column
11. etc.
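The flow above could be sketched roughly as follows. This is a hedged, simplified illustration with hypothetical signatures; the real Photon ML coordinates carry more state (optimization problems, trackers, column names).

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical simplified sketch: each coordinate trains on the incoming
// DataFrame and returns a new DataFrame with its scores column appended,
// which then feeds the next coordinate.
trait CoordinateSketch[M] {
  def train(data: DataFrame): M                   // drops unused columns, fits a model
  def score(data: DataFrame, model: M): DataFrame // appends a scores column
}

def runCoordinateDescent[M](
    coordinates: Seq[CoordinateSketch[M]],
    trainingData: DataFrame): DataFrame =
  coordinates.foldLeft(trainingData) { (data, coordinate) =>
    val model = coordinate.train(data) // e.g. FixedEffectModel, RandomEffectModel
    coordinate.score(data, model)      // next coordinate folds scores into the offset
  }
```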
What you described is exactly what is implemented, but I put the corresponding code logic in different methods (instead of the single one you suggested). See CoordinateDescent, lines 192-208:
logger.debug(s"Updating coordinate of class ${coordinate.getClass}")
// compute scores using the previous coordinate model and update offsets
prevModelOpt.map(model => coordinate.updateOffset(model))
// Train a new model
val (model, tracker) = initialModelOpt.map(
initialModel => Timed(s"Train new model using existing model as starting point") {
coordinate.trainModel(initialModel)
}).getOrElse(
Timed(s"Train new model") {
coordinate.trainModel()
})
// Log summary
logOptimizationSummary(logger, coordinateId, model, tracker)
    val modelBroadcast: Broadcast[GeneralizedLinearModel],
    val featureShardId: String)

These two lines should be indented once more.
...rc/main/scala/com/linkedin/photon/ml/optimization/game/RandomEffectOptimizationProblem.scala
FixedEffectCoordinate.updateOffset (and RandomEffectCoordinate.updateOffset) are used to compute scores instead of merging scores back into the original dataset.
    (rETypeOpt, coordinateOptConfig, lossFunctionFactory) match {
      case (
        fEDataset: FixedEffectDataset,
        None,

Why does this become "None"? Is this for the fixed effect case?
    if (hasOffsetField && hasCoordinateScoreField) {
      // offset = offset - old_coordinateScore + new_coordinateScore
      dataset.withColumn(offset, col(offset) - col(SCORE_FIELD))
      fixedEffectModel.computeScore(dataset, SCORE_FIELD)

I think the new score is saved in SCORE_FIELD.
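The offset bookkeeping under discussion (subtract the coordinate's previous score from the offset, recompute the score column, then add the new score back) can be sketched as follows. This is a hedged sketch with hypothetical names; it assumes the DataFrame already carries both an offset column and this coordinate's score column, and that `computeScore` overwrites `scoreField`.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical sketch of: offset = offset - old_coordinateScore + new_coordinateScore
def updateOffsetSketch(
    dataset: DataFrame,
    computeScore: DataFrame => DataFrame, // assumed to overwrite scoreField
    offsetField: String,
    scoreField: String): DataFrame = {

  // Remove this coordinate's previous contribution from the offset
  val withoutOldScore = dataset.withColumn(offsetField, col(offsetField) - col(scoreField))

  // Recompute the score column, then fold the new score into the offset
  computeScore(withoutOldScore)
    .withColumn(offsetField, col(offsetField) + col(scoreField))
}
```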
    if (modelsRDD.first()._2.coefficients.variancesOption.isDefined) {
      stringBuilder.append(s"\nVariance: ${modelsRDD.values.map(_.coefficients.variancesL2NormOption.get).stats()}")
    }
    //stringBuilder.append(s"\nLength: ${modelsRDD.values.map(_.coefficients.means.length).stats()}")

Why not delete these lines if they are not used?

Does this mean we no longer have stats in the log file?
      case (_, model: RandomEffectModel) => model.unpersistRDD()
      case _ =>
    }
    // gameModel.toMap.foreach {
  /**
-  * Creates a [[Coordinate]] of the appropriate type, given the input [[Dataset]],
+  * Creates a [[Coordinate]] of the appropriate type, given the input data set,

Let's keep "dataset" as one word.
   *
   * @tparam D Some type of [[Dataset]]
   * @param dataset The input data to use for training
   * @param featureShardId

Missing parameter description.
   * @tparam D Some type of [[Dataset]]
   * @param dataset The input data to use for training
   * @param featureShardId
   * @param inputColumnsNames

Missing parameter description.
   * @param varianceComputationType Should the trained coefficient variances be computed in addition to the means?
   * @param interceptIndexOpt The index of the intercept, if one is present
   * @return A [[Coordinate]] for the [[Dataset]] of type [[D]]
   * @param rETypeOpt

Missing parameter description.
    val lossFunctionFactory = lossFunctionFactoryConstructor(coordinateOptConfig)

-   (dataset, coordinateOptConfig, lossFunctionFactory) match {
+   (rETypeOpt, coordinateOptConfig, lossFunctionFactory) match {

Can we just match on (coordinateOptConfig, lossFunctionFactory)? This rETypeOpt seems to be redundant.
    val optimizationTracker = new FixedEffectOptimizationTracker(optimizationProblem.getStatesTracker)

    (updatedFixedEffectModel, optimizationTracker)
  override protected[algorithm] def updateOffset(model: DatumScoringModel) = {

Comments are missing for updateOffset.
  }

    new CoordinateDataScores(scores)
  def updateOffset(

Missing comments for updateOffset.
      fixedEffectModel.computeScore(dataset, SCORE_FIELD)
        .withColumn(offset, col(offset) + col(SCORE_FIELD))
    } else {
      throw new UnsupportedOperationException("It shouldn't happen!")

Can you make the error message more explicit?
object FixedEffectCoordinate {

  def SCORE_FIELD = "fixed_score"

Better to call it fixed_effect_score.
    }
  }

  def toDataFrame(input: RDD[(REType, GeneralizedLinearModel)]): DataFrame = {
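A hedged sketch of what such a conversion might look like, so the per-entity models can be joined to the training DataFrame by REID. The column names are assumptions, and the model is simplified to its coefficient means; the actual schema and model type in Photon ML may differ.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical sketch: flatten per-entity models to (entityId, coefficients)
// rows, joinable to the training DataFrame by REID.
def toDataFrameSketch(
    input: RDD[(String, Array[Double])], // (REType, model coefficient means)
    spark: SparkSession): DataFrame = {
  import spark.implicits._
  input.toDF("entityId", "coefficients")
}
```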
- import breeze.linalg.{Vector, cholesky, diag}
- import org.apache.spark.rdd.RDD
- import org.apache.spark.storage.StorageLevel
+ import breeze.linalg.Vector
+ import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector => SparkVector}
+ import org.apache.spark.rdd.RDD

      score
    })
This line is not necessary.
    var score = 0D

    coefficients match {
If the features are dense, then the coefficients are usually dense. If the features are sparse (for random effects), then the coefficients are sparse. So it seems that
features.foreachActive { case (index, value) => score += value * denseCoef(index) }
is good enough. Will there be cases where the coefficients are sparse but the features are dense?
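The suggested loop, sketched as a standalone function over Spark ML vectors, with the dense coefficients as a plain array for illustration (names are hypothetical). foreachActive visits only the non-zero entries of a sparse vector (and all entries of a dense one), so the same code handles both feature representations.

```scala
import org.apache.spark.ml.linalg.{Vector, Vectors}

// Sketch of the suggested scoring loop: iterate over the active (non-zero)
// entries of the feature vector, indexing into dense coefficients.
def dot(features: Vector, denseCoef: Array[Double]): Double = {
  var score = 0.0
  features.foreachActive { (index, value) => score += value * denseCoef(index) }
  score
}

// e.g. dot(Vectors.sparse(3, Array(0, 2), Array(1.0, 2.0)), Array(0.5, 1.0, 1.5))
// = 1.0 * 0.5 + 2.0 * 1.5 = 3.5
```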
-     .reduceByKey(_ + _)
-     .values
-     .stats()
+     .groupBy(idTag).agg(count("*").alias("cnt"))
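The DataFrame replacement for the RDD stats computation in this diff could be sketched as follows (hedged: the column name held by idTag and the helper name are assumptions).

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.count

// Sketch: replace RDD reduceByKey(_ + _).values.stats() with a DataFrame
// aggregation grouped by the entity ID column.
def countPerEntity(data: DataFrame, idTag: String): DataFrame =
  data.groupBy(idTag).agg(count("*").alias("cnt"))
```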
@@ -0,0 +1,25 @@
/*
 * Copyright 2017 LinkedIn Corp. All rights reserved.
import java.util.Random

import com.linkedin.photon.ml.Types.UniqueSampleId

Please reorder this import.
The following changes in the design doc are covered.