mirror of https://github.com/dapr/java-sdk.git
				
				
				
			Workaround to get state for actors given runtime API response issue. (#165)
This commit is contained in:
		
							parent
							
								
									62f43e3f38
								
							
						
					
					
						commit
						50675532b2
					
				|  | @ -7,11 +7,14 @@ package io.dapr.actors.runtime; | ||||||
| 
 | 
 | ||||||
| import com.fasterxml.jackson.core.JsonFactory; | import com.fasterxml.jackson.core.JsonFactory; | ||||||
| import com.fasterxml.jackson.core.JsonGenerator; | import com.fasterxml.jackson.core.JsonGenerator; | ||||||
|  | import com.fasterxml.jackson.databind.ObjectMapper; | ||||||
| import io.dapr.actors.ActorId; | import io.dapr.actors.ActorId; | ||||||
| import io.dapr.serializer.DaprObjectSerializer; | import io.dapr.serializer.DaprObjectSerializer; | ||||||
| import io.dapr.serializer.StringContentType; | import io.dapr.serializer.StringContentType; | ||||||
|  | 
 | ||||||
| import java.io.ByteArrayOutputStream; | import java.io.ByteArrayOutputStream; | ||||||
| import java.io.IOException; | import java.io.IOException; | ||||||
|  | 
 | ||||||
| import reactor.core.publisher.Mono; | import reactor.core.publisher.Mono; | ||||||
| 
 | 
 | ||||||
| /** | /** | ||||||
|  | @ -19,6 +22,11 @@ import reactor.core.publisher.Mono; | ||||||
|  */ |  */ | ||||||
| class DaprStateAsyncProvider { | class DaprStateAsyncProvider { | ||||||
| 
 | 
 | ||||||
|  |   /** | ||||||
|  |    * Used to fix problem from Dapr's state response. | ||||||
|  |    */ | ||||||
|  |   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); | ||||||
|  | 
 | ||||||
|   /** |   /** | ||||||
|    * Shared Json Factory as per Jackson's documentation, used only for this class. |    * Shared Json Factory as per Jackson's documentation, used only for this class. | ||||||
|    */ |    */ | ||||||
|  | @ -56,7 +64,7 @@ class DaprStateAsyncProvider { | ||||||
| 
 | 
 | ||||||
|     return result.flatMap(s -> { |     return result.flatMap(s -> { | ||||||
|       try { |       try { | ||||||
|         T response = this.stateSerializer.deserialize(s, clazz); |         T response = this.stateSerializer.deserialize(fixDaprStateResponse(s), clazz); | ||||||
|         if (response == null) { |         if (response == null) { | ||||||
|           return Mono.empty(); |           return Mono.empty(); | ||||||
|         } |         } | ||||||
|  | @ -129,7 +137,7 @@ class DaprStateAsyncProvider { | ||||||
|         generator.writeObjectFieldStart("request"); |         generator.writeObjectFieldStart("request"); | ||||||
|         generator.writeStringField("key", stateChange.getStateName()); |         generator.writeStringField("key", stateChange.getStateName()); | ||||||
|         if ((stateChange.getChangeKind() == ActorStateChangeKind.UPDATE) |         if ((stateChange.getChangeKind() == ActorStateChangeKind.UPDATE) | ||||||
|             || (stateChange.getChangeKind() == ActorStateChangeKind.ADD)) { |           || (stateChange.getChangeKind() == ActorStateChangeKind.ADD)) { | ||||||
|           byte[] data = this.stateSerializer.serialize(stateChange.getValue()); |           byte[] data = this.stateSerializer.serialize(stateChange.getValue()); | ||||||
|           if (data != null) { |           if (data != null) { | ||||||
|             if (this.isStateString) { |             if (this.isStateString) { | ||||||
|  | @ -164,4 +172,29 @@ class DaprStateAsyncProvider { | ||||||
| 
 | 
 | ||||||
|     return this.daprClient.saveActorStateTransactionally(actorType, actorId.toString(), payload); |     return this.daprClient.saveActorStateTransactionally(actorType, actorId.toString(), payload); | ||||||
|   } |   } | ||||||
| } | 
 | ||||||
|  |   /** | ||||||
|  |    * Workaround for a bug in Dapr's runtime where actor state is saved as a JSON string and | ||||||
|  |    * returned as-is without deserializing first. | ||||||
|  |    * | ||||||
|  |    * @param raw Bytes received from Dapr. | ||||||
|  |    * @return Corrected byte[]. | ||||||
|  |    */ | ||||||
|  |   private byte[] fixDaprStateResponse(byte[] raw) { | ||||||
|  |     if (raw == null) { | ||||||
|  |       return raw; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     if (raw.length == 0) { | ||||||
|  |       return raw; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     try { | ||||||
|  |       return OBJECT_MAPPER.readValue(raw, String.class).getBytes(); | ||||||
|  |     } catch (IOException e) { | ||||||
|  |       // We could not fix it, so iit goes as-is. | ||||||
|  |       return raw; | ||||||
|  |     } | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|  | } | ||||||
		Loading…
	
		Reference in New Issue